使用asyncio在MicroPython v.1.12上创建异步设备驱动程序

在研究MicroPython达到其目的的可能性时,我遇到了asyncio库的一种实现,并且在与库的作者Piter Hinch进行了简短的通信之后,我意识到我需要更深入地了解使用异步编程方法的原理,基本概念和典型错误。 此外,面向初学者的部分仅适合我。

本指南面向具有不同asyncio经验水平的用户,其中包括面向初学者的特殊部分。

目录内容
0.简介
0.1 .___在空设备(硬件)上安装uasyncio
1.规划联合计划执行
1.1 .___模块
2. uasyncio
2.1 .___程序结构:事件处理周期
2.2 .___协程
2.2.1 .______对协程进行排队以参与计划
2.2.2 .______ 启动函数回调callback
2.2.3 .______ 注意:协程作为相关方法。 返回值。
2.3 .___延迟
3.同步及其类
3.1 .___锁定锁定
3.1.1 .______ 锁定和超时
3.2 .___ 事件
3.2.1 .______事件
3.3 .___屏障屏障
3.4 .___ 信号量
3.4.1 .______ 受限信号量
3.5 .___队列队列
3.6 .___其他同步类
4. asyncio的类开发
4.1 .___类使用await
4.1.1 .______ 在上下文管理器中使用
4.1.2 .______ 等待协程
4.2 .___异步迭代器
4.3 .___异步上下文管理器
5.超时和任务取消导致的例外
5.1 .___例外
5.2 .___由于超时和取消任务而导致的异常
5.2.1 .______ 取消任务
5.2.2 带有超时的协程
6.与硬件设备的交互
6.1 .___同步问题
6.2 .___带有协程的轮询设备
6.3 .___使用流引擎
6.3.1 .______ UART驱动程序示例
6.4 .___流设备的驱动程序开发
6.5 .___完整示例: aremote.py用于IR遥控接收器的驱动程序。
6.6 .___温湿度传感器HTU21D的驱动程序。
7.技巧和窍门
7.1 .___程序冻结
7.2 .___ uasyncio保存状态
7.3 .___垃圾收集
7.4 .___测试
7.5 .___常见错误。 可能很难找到。
7.6 .___使用套接字编程( 套接字
7.6.1 .______ WiFi问题
7.7 .___事件循环构造函数的参数
8.初学者注意事项
8.1 .___问题1:事件循环
8.2 .___问题2:锁定方法
8.3 .___ uasyncio方法
8.4 .___在uasyncio中进行规划
8.5 .___为什么要协作而不是基于线程的调度( _thread )?
8.6 .___互动
8.7 .___ 轮询

0.简介

本文档的大部分内容假定您对异步编程有所了解。 对于初学者,可以在第7节中找到介绍。

MicroPython的uasyncio库包含asyncio Python库的一个子集,旨在用于微控制器。 这样,它占用少量RAM,并且配置为使用零RAM分配快速切换上下文。

本文档介绍了uasyncio的使用,重点是为硬件设备创建驱动程序。

目的是设计驱动程序,以使应用程序在驱动程序等待设备响应时继续运行。 同时,应用程序对其他事件和用户交互保持敏感。

异步应用的另一个重要领域是网络编程:在Internet上您可以找到有关此主题的足够信息。

请注意, MicroPython基于Python 3.4,带有最少的Python 3.5附加组件。 除以下详述外,不支持早于3.4的asyncio版本的功能。 本文档定义了此子集中支持的功能。

本指南的目的是介绍与CPython V3.5及更高版本兼容的编程风格。

0.1在空设备上安装uasyncio (硬件)

建议使用固件MicroPython V1.11或更高版本。 在许多平台上,不需要安装,因为uasyncio®已在程序集中进行编译。 要检查,只需输入REPL

import uasyncio 

以下说明介绍了未预装模块的情况。 队列同步模块是可选的,但运行此处给出的示例是必需的。

互联网连接设备

在连接到Internet并运行固件V1.11或更高版本的设备上,可以使用内置的upip版本进行安装。 确保设备已连接到网络:

 import upip upip.install ( 'micropython-uasyncio' ) upip.install ( 'micropython-uasyncio.synchro' ) upip.install ( 'micropython-uasyncio.queues' ) 

来自upip的错误消息不是很有用。 如果出现无法理解的错误,请再次检查Internet连接。

没有互联网连接的硬件( micropip

如果您的设备没有 Internet连接(例如Pyboard V1.x ),最简单的方法是开始在计算机上安装micropip.py到您选择的目录,然后将生成的目录结构复制到目标设备。 micropip.py实用程序在Python 3.2或更高版本上运行,并在Linux,Windows和OSX上运行。 在此处可以找到更多信息。

典型电话:

 $ micropip.py install -p ~/rats micropython-uasyncio $ micropip.py install -p ~/rats micropython-uasyncio.synchro $ micropip.py install -p ~/rats micropython-uasyncio.queues 

没有Internet连接的设备(复制源)

如果不使用micropip.py ,则必须从源复制文件。 以下说明描述了如何将最少数量的文件复制到目标设备,以及需要将uasyncio压缩为字节码形式的已编译程序集以减少占用空间的情况。 对于与官方固件兼容的最新版本,必须从官方micropython-lib网站复制文件。

使用以下命令将库克隆到计算机

 $ git clone https://github.com/micropython/micropython-lib.git 

在目标设备上,创建uasyncio目录(lib目录中的可选目录),并将以下文件复制到其中:

•uasyncio / uasyncio / __ init__.py
•uasyncio.core / uasyncio / core.py
•uasyncio.synchro / uasyncio / synchron.py
•uasyncio.queues / uasyncio / queues.py


通过将uasyncio目录及其内容放置在modules目录的端口中并重新编译内容,可以将这些uasyncio模块压缩为字节码。

1.联合规划

联合执行多个任务的技术已在嵌入式系统中广泛使用,与线程调度( _thread )调度相比,它提供了更少的开销,避免了与真正的异步线程相关的许多陷阱。

1.1模块

以下是可以在目标设备上运行的模块的列表。

图书馆

1. asyn.py提供锁,事件,屏障,信号量,BoundedSemaphore,条件,收集同步原语。 通过NamedTaskCancellable类提供对取消任务的支持。

2. aswitch.py表示配对开关和按钮的类,以及可能重复延迟的程序对象。 按钮是对开关的概括,它们提供逻辑状态而不是物理状态,以及双击和长按触发的事件。

演示程序

前两个最有用,因为它们在访问Pyboard硬件时会给出可见的结果。

  1. aledflash.py异步闪烁四个Pyboard指示器10秒钟。 uasyncio的最简单演示。 导入运行。
  2. apoll.py Pyboard加速度计的设备驱动程序。 演示使用协程查询设备。 工作20秒。 导入运行。 需要PyboardV1.x。
  3. astests.py aswitch模块的测试/演示程序。
  4. asyn_demos.py取消任务的简单演示。
  5. roundrobin.py循环计划的演示。 也是绩效计划的基准。
  6. awaitable.py带有等待的类的演示。 一种实现轮询接口的设备驱动程序的方法。
  7. chain.pyPython文档复制。 协程链演示。
  8. aqtest.py演示uasyncio库的Queue类。
  9. aremote.py NEC IR协议的示例设备驱动程序。
  10. auart.py通过Pyboard UART进行流输入输出的演示。
  11. auart_hd.py使用Pyboard UART使用半双工协议与设备进行通信。 适用于设备,例如,使用AT调制解调器命令集。
  12. iorw.py演示使用流I / O的读取器/写入器设备。

测试程序

  1. asyntest.py测试asyn.py中的同步类。
  2. cantest.py职位取消测试。

效用

1. check_async_code.py该实用程序是用Python3编写的,用于检测可能很难找到的特定编码错误。 请参阅第7.5节。

控制权

基准目录包含用于检查和表征uasyncio调度程序的脚本。


2. uasyncio

异步概念基于联合执行多个任务的计划组织,在本文中称为协程

2.1程序结构:事件循环

考虑以下示例:

 import uasyncio as asyncio async def bar (): count = 0, while True : count + = 1 print ( count ) await asyncio.sleep ( 1 ) #  1 loop = asyncio.get_event_loop () loop.create_task ( bar ()) #     loop.run_forever () 

程序将继续执行,直到调用loop.run_forever为止。 此时,执行由调度程序控制。 loop.run_forever之后的行将永远不会执行。 调度程序执行条形码,因为它已在loop.create_task调度程序中排队。 在这个简单的例子中,只有一个协程 。 如果还有其他内容,调度程序将在暂停期间执行它们。

大多数嵌入式应用程序都有一个连续的事件循环。 也可以使用run_until_complete事件循环方法以允许完成的方式启动事件循环。 它主要用于测试。 可以在astests.py模块中找到示例。

事件循环实例是由对asyncio.get_event_loop()的第一次调用创建的单个对象,带有两个可选的整数参数,指示两个队列中协程的数量-开始和等待。 通常,两个参数将具有相同的值,至少等于应用程序中同时执行的协程数。 通常,默认值16就足够了,如果使用非默认值,请参见事件循环构造函数的参数(第7.7节)。

如果协程需要调用事件循环方法(通常为create_task ),则调用asyncio.get_event_loop() (不带参数)将有效地返回它。

2.2协程

协程的创建如下:

 async def foo ( delay_secs ): await asyncio.sleep ( delay_secs ) print ( 'Hello' ) 

协程可以使用await语句启动其他协程。 协程必须至少包含一个wait语句。 这将导致协程在完成之前执行,然后执行下一条语句。 考虑一个例子:

 await asyncio.sleep ( delay_secs ) await asyncio.sleep ( 0 ) 

第一行使代码暂停一个延迟时间,而其他协程将这段时间用于执行它们。 延迟0会导致所有暂挂的协程以循环顺序执行,直到执行下一行为止。 请参见roundrobin.py的示例。

2.2.1。 计划协程的队列

  • EventLoop.create_task参数:协程运行。 调度程序将协程式排队以便尽快启动。 create_task调用立即返回。 使用带有必要参数的函数调用的语法指定参数中的协程。
  • EventLoop.run_until_complete参数:协程运行。 调度程序将协程式排队以便尽快启动。 使用带有必要参数的函数调用的语法指定参数中的协程。 协程完成后,将返回un_until_complete调用:此方法提供了一种退出调度程序的方法。
  • await参数:要运行的协程,使用函数调用语法指定。 尽快启动协程。 待处理的协程被阻塞,直到预期的协程之一完成。

以上与CPython兼容。 注释(第2.2.3节) 讨论了其他uasyncio方法。

2.2.2启动回调函数

回调应该是旨在在短时间内执行的Python函数。 这是由于协程在整个功能执行期间将无法工作的事实。

以下EventLoop方法使用回调:

  1. call_soon-尽快致电。 Args:要运行的回调回调, * args任何位置参数后都可以带逗号。
  2. call_later-延迟几秒钟后调用。 Args: 延迟,回调,* args
  3. call_later_ms-延迟毫秒后调用。 Args: 延迟,回调,* args

 loop = asyncio.get_event_loop () loop.call_soon ( foo , 5 ) #    'foo'      5. loop.call_later ( 2 , foo , 5 ) #   2 . loop.call_later_ms ( 50 , foo , 5 ) #   50 . loop.run_forever () 

2.2.3注意事项

协程可以包含带有任意返回值的return语句。 要获得此值:

 result = await my_coro () 

协程可以受方法限制,并且必须至少包含一个await语句。

2.3延误

组织协程的延迟有两种选择。 对于较长的延迟以及在持续时间不需要精确的情况下,可以使用:

 async def foo( delay_secs , delay_ms ): await asyncio.sleep ( delay_secs ) print ( 'Hello' ) await asyncio.sleep_ms ( delay_ms ) 

在此类延迟期间,调度程序将执行其他协程。 这可能会引入时间不确定性,因为调用协程仅在执行当前正在运行的协程时才会启动。 延迟量取决于应用程序开发人员,但可能约为数十或数百毫秒。 与硬件设备的交互(第6节)中将进一步讨论。

使用utime函数-sleep_mssleep_us可以执行非常精确的延迟。 它们最适合短延迟,因为在延迟进行期间,调度程序将无法执行其他协程。

3.同步

通常,需要确保协程之间的同步。 一个常见的示例是在几个协程同时要求访问同一资源时避免所谓的“竞争条件”。 在astests.py中提供了一个示例,并在文档中进行了讨论。 当每个协程等待对方完成时,另一个危险是“死亡拥抱”。

在简单的应用程序中,可以使用全局标志或相关变量来实现同步。 一种更优雅的方法是使用同步类。 asyn.py模块提供Event,Barrier,SemaphoreConditios类的“微型”实现,仅用于asyncio 。 除非另有说明,否则它们不是面向线程的,不应与_thread模块或中断处理程序一起使用。 Lock类也已实现,它是正式实现的替代方法。

协程生产者和协程消耗者出现另一个同步问题。 协程生产者生成协程消费者使用的数据。 为此, asyncio提供了Queue类。 协程生产者将数据放入队列中,而协程消费者正在等待其完成(按计划安排其他操作)。 Queue类可保证按接收顺序删除项目。 另外,如果生产者协程必须等到消费者协程准备访问数据,则可以使用Barrier类。

下面是这些类的简要概述。 完整文档中有更多详细信息。

3.1

锁定可确保对共享资源的唯一访问。 下面的代码示例创建Lock的实例,该实例将传递给想要访问共享资源的所有客户端。 每个协程试图捕获锁,并暂停执行直到成功:

 import uasyncio as asyncio from uasyncio.synchro import Lock async def task(i, lock): while 1: await lock.acquire() print("Acquired lock in task", i) await asyncio.sleep(0.5) lock.release() async def killer(): await asyncio.sleep(10) loop = asyncio.get_event_loop() lock = Lock() # The global Lock instance loop.create_task(task(1, lock)) loop.create_task(task(2, lock)) loop.create_task(task(3, lock)) loop.run_until_complete(killer()) #  10s 

3.1.1锁定和超时

在撰写本文时(2018年1月5日), uasycio Lock类的开发尚未正式完成。 如果协程有超时(第5.2.2节) ,则在触发触发时等待锁定时,超时将无效。 直到收到锁,它才会收到TimeoutError 。 取消任务也是如此。

asyn.py模块提供Lock类,在这些情况下可以使用。 该类的实现不如正式类有效,但根据CPython版本支持其他接口,包括使用上下文管理器。

3.2事件

事件为一个或几个协程暂停提供了机会,而另一个则发出了延续的信号。 Event实例可用于使用它的所有协程:

 import asyn event = asyn.Event () 

协程通过声明await事件来等待事件 ,此后执行暂停,直到其他协程声明event.set()为止。 完整的信息

如果在循环构造中发出event.set(),则可能会出现问题。 该代码必须等待,直到所有待处理的对象都可以访问该事件,然后才能再次进行设置。 如果某个Coro希望发生事件,可以通过接收清除事件的Coro事件来实现:

 async def eventwait ( event ): await event event.clear() 

触发事件的协程检查它是否已被服务:

 async def foo ( event ): while True : #   - while event.is_set (): await asyncio.sleep ( 1 ) # ,  coro   event.set () 

如果多个角色正在等待一个事件的同步,则可以使用确认事件解决问题。 每个Coro需要一个单独的事件。

 async def eventwait (  , ack_event ): await event ack_event.set () 

asyntest.pyevent_test函数中给出了一个示例。 这很麻烦在大多数情况下-即使有一个等待的Coro-如下所示的Barrier类也提供了一种更简单的方法。
事件还可以提供中断处理程序和coro之间的通信方式。 处理程序维护硬件并设置事件,该事件已由正常模式下的coro检查。

3.2.1事件值

event.set()方法可以采用任何类型的可选数据值。 等待事件的Coro可以使用event.value()来获取它。 请注意, event.clear()将设置为None 。 在事件设置中,此方法的典型用法是发出event.set(utime.ticks_ms()) 。 任何等待事件的事件可以确定发生的延迟,例如,以对此进行补偿。

3.3 障碍

Barrier类有两种用途。

首先,它可以暂停协程直到一个或几个其他协程完成。

其次,它允许几个协程在某个点相遇。 例如,生产者和消费者可以在生产者拥有数据并且消费者准备使用它的时刻进行同步。 在执行时, 屏障可能会在移除屏障之前发出附加的回调,并且所有未决事件可能会继续。

回调可以是函数或协程。 在大多数应用程序中,最有可能会使用该功能:可以确保在完成之前,移除障碍之前执行该功能。

一个示例是asyntest.py中的barrier_test函数。 在该程序的代码段中:

 import asyn def callback(text): print(text) barrier = asyn.Barrier(3, callback, ('Synch',)) async def report(): for i in range(5): print('{} '.format(i), end='') await barrier 

报告协程的几个实例将打印其结果并暂停,直到其他实例也完成并等待障碍继续。 此时,正在进行回调。 完成后,原始协程将恢复。

3.4信号量

信号量限制了可以访问资源的协程的数量。 它可以用来限制可以同时运行的特定协程的实例数量。 这是使用访问计数器完成的,访问计数器由构造函数初始化,并在协程程序每次收到信号量时减少。

在上下文管理器中使用它的最简单方法是:

 import asyn sema = asyn.Semaphore(3) async def foo(sema): async with sema: #    

一个示例是asyntest.py中semaphore_test函数。

3.4.1( Limited )信号量

它与Semaphore类的工作方式相似,不同之处在于,如果释放方法导致访问计数器超过其初始值, 则会设置ValueError

3.5排队

Queue类由官方uasycio维护,而aqtest.py示例程序演示了其用法。 队列创建如下:

 from uasyncio.queues import Queue q = Queue () 

典型的制造商协程可以按以下方式工作:

 async def producer(q): while True: result = await slow_process() #       await q.put(result) #  ,        

消费者协程可以按以下方式工作:

 async def consumer(q): while True: result = await(q.get()) # ,  q  print('Result was {}'.format(result)) 

当可以限制队列的大小并且可以轮询状态时, Queue类提供了重要的附加功能。可以控制队列为空的行为(如果大小受限制)和队列已满的行为。有关此文档,请参见代码。

3.6其他同步类 asyn.py

提供了CPython其他一些功能的微型实现Condition允许协程通知等待锁定资源的其他协程。收到通知后,他们将可以访问资源并依次解锁。通知协程可能会限制要通知的协程的数量。班级聚会



允许您运行协程列表。后者完成后,将返回结果列表。此“微”实现使用不同的语法。超时可以应用于任何协程。

4为异步 开发类

在开发设备驱动程序的上下文中,目标是确保它们不阻塞地工作。协程驱动程序必须确保在驱动程序等待设备执行硬件操作时执行其他协程。例如,等待数据到达UART的任务或用户按下按钮的操作应允许安排其他事件,直到事件发生为止。

4.1使用await 等待的类协程

可以在等待对象时暂停执行值得期待的。在CPython下,通过实现生成器返回的特殊__await__方法创建自定义的等待等待用法如下:

 import uasyncio as asyncio class Foo(): def __await__(self): for n in range(5): print('__await__ called') yield from asyncio.sleep(1) #     return 42 __iter__ = __await__ # .   async def bar(): foo = Foo() # Foo - awaitable  print('waiting for foo') res = await foo #   print('done', res) loop = asyncio.get_event_loop() loop.run_until_complete(bar()) 

目前MicroPython不支持__await__问题#2678)和解决方案中使用__iter__。字符串__iter__ = __await__提供了CPythonMicroPython之间的可移植性。代码示例,请参阅类事件,道闸,撤销,条件asyn.py

4.1.1在上下文管理器中使用

期望的对象可以在同步或异步上下文管理器中使用,提供必要的特殊方法。语法:

 with await awaitable as a: #  'as'   #    async with awaitable as a: #    (.) #  - 

为此,__await__生成器必须返回self。这将传递给as子句中的任何变量,并且还允许特殊方法起作用。请参阅asyn.Conditionasyntest.condition_test,其中Condition使用await,并且可以在同步上下文管理器中使用。

4.1.2等待协同程序

语言Python的要求__await__是发电机的功能。在MicroPython中,生成器和协程是相同的,因此解决方案是使用coro(args)的yield

本指南的目的是提供可移植到CPython 3.5或更高版本的代码。在CPython中,生成器和协程的含义不同。在CPython中,协程具有生成器检索__await__特殊方法。这是便携式的:

 up = False #   MicroPython? try: import uasyncio as asyncio up = True #    sys.implementation.name except ImportError: import asyncio async def times_two(n): # Coro   await asyncio.sleep(1) return 2 * n class Foo(): def __await__(self): res = 1 for n in range(5): print('__await__ called') if up: # MicroPython res = yield from times_two(res) else: # CPython res = yield from times_two(res).__await__() return res __iter__ = __await__ async def bar(): foo = Foo() # foo is awaitable print('waiting for foo') res = await foo #   print('done', res) loop = asyncio.get_event_loop() loop.run_until_complete(bar()) 

请注意,CPython允许__await__,从asyncio.sleep(1)产生我仍然不知道如何实现这一目标。4.2异步迭代器异步迭代器提供了一种返回有限或无限值序列的方法,并且可以用作检索来自只读设备的顺序数据元素的方法。异步迭代器在其下一个方法中调用异步代码该课程必须满足以下要求:





  • 它有一个异步def中定义__aiter__方法并返回一个异步迭代器。
  • 它有一个__anext__方法,它本身就是一个协程-通过异步def定义并包含至少一个await语句要停止迭代,它必须引发StopAsyncIteration异常。

序列值使用async检索,如下所示:

 class AsyncIterable: def __init__(self): self.data = (1, 2, 3, 4, 5) self.index = 0 async def __aiter__(self): return self async def __anext__(self): data = await self.fetch_data() if data: return data else: raise StopAsyncIteration async def fetch_data(self): await asyncio.sleep(0.1) #     if self.index >= len(self.data): return None x = self.data[self.index] self.index += 1 return x async def run(): ai = AsyncIterable() async for x in ai: print(x) 

4.3异步上下文管理器

可以将类设计为支持异步上下文管理器,这些异步上下文管理器具有作为共同程序的进入和退出过程。一个例子是上述的。它具有__aenter__协程,这在异步操作上在逻辑上是必需的。为了支持上下文管理器的异步协议,其__aexit__方法也必须是协程,这可以通过包含await asyncio.sleep(0)来实现。可从协程内部使用以下语法访问此类:

 async def bar ( lock ): async with lock: print ( « bar » ) 

与常规上下文管理器一样,保证在上下文管理器照常完成工作并通过异常时调用exit方法。为了实现此目标,必须使用特殊方法__aenter____aexit__,它们必须定义为等待另一个协程或等待对象的协程。这个例子来自Lock

  async def __aenter__(self): await self.acquire() # a coro    async def return self async def __aexit__(self, *args): self.release() #   await asyncio.sleep_ms(0) 

如果async with包含一个as变量子句,则该变量获取__aenter__返回的值

为确保正确的行为,固件必须为V1.9.10或更高版本。

5.超时和由于任务取消引起的异常

这些主题相关:uasyncio包括取消任务并对任务应用超时,以特殊方式为任务引发异常。

5.1例外情况

如果协程中发生过外显子,则必须在此协程中或在协程中对其进行处理以等待其完成。这样可以确保该异常不适用于调度程序。如果发生异常,则调度程序将通过将异常传递给调度程序启动的代码来停止工作。因此,为避免调度程序停止,使用loop.create_task()启动的协程必须捕获内部的任何异常。在协程中

使用throwclose 抛出异常是不合理的。这破坏了uasyncio,导致协程启动,并可能在协程仍在执行队列中时退出

上面的示例说明了这种情况。如果允许工作到最后,它将按预期工作。

 import uasyncio as asyncio async def foo(): await asyncio.sleep(3) print('About to throw exception.') 1/0 async def bar(): try: await foo() except ZeroDivisionError: print('foo  -   0') # ! raise #     . except KeyboardInterrupt: print('foo was interrupted by ctrl-c') #   ! raise async def shutdown(): print('Shutdown is running.') #     await asyncio.sleep(1) print('done') loop = asyncio.get_event_loop() try: loop.run_until_complete(bar()) except ZeroDivisionError: loop.run_until_complete(shutdown()) except KeyboardInterrupt: print('Keyboard interrupt at loop level.') loop.run_until_complete(shutdown()) 

但是,发出键盘中断会使异常进入事件循环。这是因为uasyncio.sleep的执行被传递到事件循环。因此,需要清晰代码来响应键盘中断的应用程序必须在事件循环级别捕获异常。

5.2取消和超时

如上所述,这些函数通过使用特殊的MicroPython方法coroutine pend_throw以特殊的方式为任务抛出异常来工作。它的工作方式取决于版本。在正式的uasyncio v.2.0中,直到下一个计划任务才处理异常。如果任务预期进入睡眠状态,则会造成延迟输入输出 超时可能会超出其标称期限。其他任务的撤消任务无法确定撤消完成的时间。

当前有一个解决方法和两个解决方案。

  • 解决方法asyn提供了一种等待任务或任务组被取消的方法。请参阅取消作业(第5.2.1节)。
  • Paul Sokolovsky库提供uasyncio v2.4,但这需要其Pycopy固件
  • Fast_io uasyncio解决了这个问题了Python(优雅地少)和运行官方固件。

此处使用的异常的层次结构是Exception-CanceledError-TimeoutError

5.2.1取消作业

uasyncio提供取消(Coro)功能。这通过抛出异常来使用pend_throw协程来工作。它也适用于嵌套的协程。用法如下:

 async def foo(): while True: #  -  10 secs await asyncio.sleep(10) async def bar(loop): foo_instance = foo() #   coro loop.create_task(foo_instance) # code omitted asyncio.cancel(foo_instance) 

如果此示例在uasyncio v2.0下运行,则在取消bar问题时,直到下一个计划的foo都不会生效而取消foo可能会延迟10秒。如果foo正在等待I / O,则将导致另一个延迟源。无论延迟发生在何处,bar将无法确定foo是否已取消。在某些用例中,这很重要。使用Paul Sokolovskyfast_io库时,使用sleep(0)足够了:



 async def foo(): while True: #  -  10 secs await asyncio.sleep(10) async def bar(loop): foo_instance = foo() #   coro loop.create_task(foo_instance) #    asyncio.cancel(foo_instance) await asyncio.sleep(0) #    

如果foo(以及任何待处理的协程foo)从未返回睡眠并且不等待I / O,这也将在uasyncio v2.0中工作当由create_task运行并处于待机模式的协程被取消时,可能会发生使粗心大意的行为考虑以下代码段:



 async def foo(): while True: #  -  10 secs await asyncio.sleep(10) async def foo_runner(foo_instance): await foo_instance print('   ') async def bar(loop): foo_instance = foo() loop.create_task(foo_runner(foo_instance)) #    asyncio.cancel(foo_instance) 

FOO被取消,它是从调度器队列中删除; 因为它没有返回语句,所以调用过程foo_runner从不恢复。建议始终在要撤消的函数的最外部范围中捕获异常:

 async def foo(): try: while True: await asyncio.sleep(10) await my_coro except asyncio.CancelledError: return 

在这种情况下,my_coro不需要捕获异常,因为它将被传播到调用通道并在此捕获。

注意事项如果在调度程序之外使用协程,则禁止使用协程的关闭抛出方法。这会破坏调度程序,迫使协程即使未调度代码也要执行代码。这可能会产生不良后果。

5.2.2具有超时

协程超时是使用uasyncio方法.wait_for().wait_for_ms()实现的。它们分别以协程和等待时间(以秒或毫秒为单位)作为参数。如果超时到期,TimeoutError将使用pend_throw扔进协程用户或调用者必须捕获此异常。由于上述原因,这是必要的:如果超时到期,则取消超时。除非捕获并返回了错误,否则调用者可以继续的唯一方法是捕获异常本身。

在协程程序捕获到异常的地方,如果异常未在外部范围中捕获,则我无法清除失败,如下所示:

 import uasyncio as asyncio async def forever(): try: print('Starting') while True: await asyncio.sleep_ms(300) print('Got here') except asyncio.TimeoutError: print('Got timeout') # And return async def foo(): await asyncio.wait_for(forever(), 5) await asyncio.sleep(2) loop = asyncio.get_event_loop() loop.run_until_complete(foo()) 

另外,您可以拦截调用函数:

 import uasyncio as asyncio async def forever(): print('Starting') while True: await asyncio.sleep_ms(300) print('Got here') async def foo(): try: await asyncio.wait_for(forever(), 5) except asyncio.TimeoutError: pass print('Timeout elapsed.') await asyncio.sleep(2) loop = asyncio.get_event_loop() loop.run_until_complete(foo()) 

Uasyncio v2.0的注意事项

这不适用于Paul Sokolovskyfast_io库

如果协程启动等待asyncio.sleep(t)并延迟了很长的时间t,协程将不会重新启动,直到t到期为止。如果在睡眠结束之前已经超时在重新加载协程时发生TimeoutError-即当t到期时。从调用者的角度实时而言,他的TimeoutError响应将被延迟。

如果这对于应用程序很重要,请在等待循环中的短暂延迟时创建较长的延迟。协程asyn.sleep支持这一点。

6与设备

的交互uasyncio与外部异步事件之间交互的基础是轮询。需要快速响应的硬件可能会使用中断。但是,中断例程(ISR)与用户协程之间的交互将基于轮询。例如,ISR可以调用事件或设置全局标志,而每次调度请求时,等待结果的协程会轮询对象。

可以两种方式进行询问,即显式或隐式。后者是使用流I / O完成的一种机制,是一种用于UART套接字等流设备的系统。在最简单的显式轮询中,以下代码可能包含:

 async def poll_my_device(): global my_flag #   ISR while True: if my_flag: my_flag = False # service the device await asyncio.sleep(0) 

可以使用Event类的实例变量或使用await的类的实例代替全局标志。显式调查将在下面讨论。

隐式轮询包括开发一个驱动程序,该驱动程序将用作流I / O设备,例如UART流I / O 套接字 ,该驱动程序使用select.poll Python系统对设备进行轮询:由于轮询是在C语言中执行的,因此它比C语言更快,更高效明确的民意调查。在第6.3节中讨论了流I / O的使用。

由于其有效性,隐式轮询为最快的I / O设备驱动程序提供了一个优势:可以为通常不视为流设备的许多设备创建流驱动程序。这将在6.4节中详细讨论。

6.1同步问题

当前的显式和隐式轮询均基于周期性计划。假设I / O与N个自定义协程同时工作,每个协程以零延迟运行。提供I / O后,将在安排所有用户操作后立即对其进行轮询。设计时应考虑估计的延迟。 I / O通道可能需要使用ISR服务设备从缓冲区和协程进行实时缓冲,以较慢的时间填充或释放缓冲区。

还必须考虑超越的可能性:这种情况是当协程实际询问的某件事在协程实际计划之前多次发生的情况。

另一个时序问题是延迟准确性。如果协程出现问题

 await asyncio.sleep_ms ( t ) #   

调度程序保证执行将至少暂停t ms。实际延迟可能大于t,这取决于当前系统负载。如果此时其他协程正在等待非零延迟的完成,则将立即安排执行下一行。但是,如果其他协程也等待执行(因为它们发出了零延迟,或者因为它们的时间也已到期),则可以安排它们更早地执行。这将同步不确定性引入了sleep()sleep_ms()函数。可以通过将所有此类协程的运行时值相加来确定此溢出的最坏情况值,以确定到调度程序的最坏情况传输时间。

在这种情况下uasyncio 的fast_io版本提供了一种确保在调度程序的每次迭代时都将轮询流I / O的方法。希望正式的uasyncio会在适当的时候接受相关的修订。

6.2使用协程查询设备

这是一种简单的方法,最适用于可以以较低速度查询的设备。这主要是由于以下事实:以短(或零)轮询间隔进行轮询可能导致以下事实:协程消耗的处理器时间多于进入该间隔所需的时间。apoll.py

示例通过查询Pyboard加速度计演示了这种方法间隔为100毫秒。它执行简单的过滤以忽略噪声,如果没有移动,则每两秒钟打印一条消息。aswitch.py

示例提供了用于开关和按钮设备的驱动程序。下面显示了能够读取和写入的设备的示例驱动程序。为了便于测试,Pyboard UART 4模拟了条件设备。驱动程序实现RecordOrientedUart

,其中数据以字节实例组成的可变长度记录形式提供。该对象在发送之前添加定界符,并缓冲传入的数据,直到收到添加的定界符。与流输入/输出相比,这只是使用UART的演示,并且效率低下。

为了演示异步传输,我们假设仿真设备具有一种验证传输完成以及应用程序需要我们等待的方法。在此示例中,所有假设都不成立,但是代码通过asyncio.sleep(0.1)来伪造它

首先,请不要忘记连接Pyboard X1和X2 的输出(UART Txd和Rxd)

 import uasyncio as asyncio from pyb import UART class RecordOrientedUart(): DELIMITER = b'\0' def __init__(self): self.uart = UART(4, 9600) self.data = b'' def __iter__(self): # Not __await__ issue #2678 data = b'' while not data.endswith(self.DELIMITER): yield from asyncio.sleep(0) # ,  : while not self.uart.any(): yield from asyncio.sleep(0) # timing may mean this is never called data = b''.join((data, self.uart.read(self.uart.any()))) self.data = data async def send_record(self, data): data = b''.join((data, self.DELIMITER)) self.uart.write(data) await self._send_complete() #          #        await asyncio.sleep(0) async def _send_complete(self): await asyncio.sleep(0.1) def read_record(self): # Synchronous: await the object before calling return self.data[0:-1] # Discard delimiter async def run(): foo = RecordOrientedUart() rx_data = b'' await foo.send_record(b'A line of text.') for _ in range(20): await foo #  coros       foo rx_data = foo.read_record() print('Got: {}'.format(rx_data)) await foo.send_record(rx_data) rx_data = b'' loop = asyncio.get_event_loop() loop.run_until_complete(run()) 

6.3使用流机构(

本实施例表明在单个UART的微处理器输入和输出同时Pyboard

首先,连接Pyboard X1和X2 的输出(UART Txd和Rxd)

 import uasyncio as asyncio from pyb import UART uart = UART(4, 9600) async def sender(): swriter = asyncio.StreamWriter(uart, {}) while True: await swriter.awrite('Hello uart\n') await asyncio.sleep(2) async def receiver(): sreader = asyncio.StreamReader(uart) while True: res = await sreader.readline() print('Received', res) loop = asyncio.get_event_loop() loop.create_task(sender()) loop.create_task(receiver()) loop.run_forever() 

支持代码可以uasyncio库的__init__.py找到该机制之所以有效,是因为设备驱动程序(用C编写)实现了以下方法:ioctl,read,readlinewrite第6.4节:编写流设备驱动程序,详细介绍了如何使用Python编写此类驱动程序

UART可以随时接收数据。每当调度程序获得控制权时,流I / O机制都会检查未决的传入字符。当协程运行时,中断例程将缓冲传入的字符;当协程让位给调度程序时,它们将被删除。因此,必须设计UART应用程序,以使协程最小化两次传输到调度程序之间的时间,以避免缓冲区溢出和数据丢失。这可以通过使用较大的UART读取缓冲区或较低的数据速率来改善。另外,如果数据源支持,硬件流控制将提供解决方案。

6.3.1 UART驱动的实施例

方案auart_hd.py图1示出了与半双工设备(诸如响应AT调制解调器命令集的设备)的通信方法。半双工意味着设备永远不会发送未经请求的数据:其传输总是响应于从主机接收到的命令而执行。

通过在具有两个有线连接Pyboard上运行测试来仿真设备

(非常简化的)仿真设备通过发送四行数据,每行之间都有一个暂停来模拟慢速处理,从而对任何命令做出响应。

该向导将发送命令,但事先不知道将返回多少行数据。它启动一个重新启动计时器,该计时器在每次接收到一行时重新启动。当计时器到期时,假定设备已完成传输并返回接收到的线路列表。

还演示了设备故障情况,这是通过在等待响应之前跳过传输来实现的。超时后,将返回一个空列表。有关更多详细信息,请参见代码注释。

6.4发展流的(驱动器)单元

流输入/输出机构(流I / O),用于控制流传输的操作的I / O设备,诸如UART和插座(插座)该机制可以由任何定期轮询的设备的驱动程序使用,方法是委派给使用select的调度程序,该调度程序轮询队列中任何设备的就绪状态。这比执行几个协程操作更有效,每个协程操作都会轮询设备,部分原因是select是用C编写的,并且还因为执行轮询的协程被延迟到被轮询对象返回就绪状态为止。

能够服务于流输入/输出机制的设备驱动程序应优选支持StreamReader,StreamWriter方法。可读设备必须提供以下方法中的至少一种。请注意,这些是同步方法。ioctl方法(请参见下文)确保仅在有数据时才调用它们。应该使用尽可能多的数据尽快返回方法。

readline()返回尽可能多的字符,最多返回任何换行符。如果使用StreamReader.readline(), 则为必需;

读取(n)返回尽可能多的字符,但不超过n。如果使用StreamReader.read()StreamReader.readexactly(),则为必需

创建的驱动程序应提供以下立即返回的同步方法:使用参数buf,off,sz进行

写入其中:buf是写缓冲区。off-偏移到要写入的第一个字符的缓冲区。sz-请求的要写入的字符数。返回值是实际写入的字符数(如果设备运行缓慢,则可能为1)。ioctl方法可确保仅在设备准备好接收数据时才调用它。









所有设备都必须提供一种ioctl方法该方法可轮询设备以确定其可用性状态。读/写驱动程序的典型示例:

 import io MP_STREAM_POLL_RD = const(1) MP_STREAM_POLL_WR = const(4) MP_STREAM_POLL = const(3) MP_STREAM_ERROR = const(-1) class MyIO(io.IOBase): #    def ioctl(self, req, arg): # see ports/stm32/uart.c ret = MP_STREAM_ERROR if req == MP_STREAM_POLL: ret = 0 if arg & MP_STREAM_POLL_RD: if hardware_has_at_least_one_char_to_read: ret |= MP_STREAM_POLL_RD if arg & MP_STREAM_POLL_WR: if hardware_can_accept_at_least_one_write_character: ret |= MP_STREAM_POLL_WR return ret 

下面是对MillisecTimer等待延迟的描述

 import uasyncio as asyncio import utime import io MP_STREAM_POLL_RD = const(1) MP_STREAM_POLL = const(3) MP_STREAM_ERROR = const(-1) class MillisecTimer(io.IOBase): def __init__(self): self.end = 0 self.sreader = asyncio.StreamReader(self) def __iter__(self): await self.sreader.readline() def __call__(self, ms): self.end = utime.ticks_add(utime.ticks_ms(), ms) return self def readline(self): return b'\n' def ioctl(self, req, arg): ret = MP_STREAM_ERROR if req == MP_STREAM_POLL: ret = 0 if arg & MP_STREAM_POLL_RD: if utime.ticks_diff(utime.ticks_ms(), self.end) >= 0: ret |= MP_STREAM_POLL_RD return ret 

可以如下使用:

 async def timer_test ( n ): timer = ms_timer.MillisecTimer () await timer ( 30 ) #  30  

与正式的uasyncio相比,与awaitio.sleep_ms()相比这种实现没有任何优势当协程程序期望零延迟时,使用fast_io可以在正常使用模式中提供更准确的延迟。

您可以使用I / O调度将事件与回调关联。这比轮询周期更有效,因为直到ioctl返回就绪状态时才计划轮询。接下来,当回调改变状态时执行一个回调。

 import uasyncio as asyncio import io MP_STREAM_POLL_RD = const(1) MP_STREAM_POLL = const(3) MP_STREAM_ERROR = const(-1) class PinCall(io.IOBase): def __init__(self, pin, *, cb_rise=None, cbr_args=(), cb_fall=None, cbf_args=()): self.pin = pin self.cb_rise = cb_rise self.cbr_args = cbr_args self.cb_fall = cb_fall self.cbf_args = cbf_args self.pinval = pin.value() self.sreader = asyncio.StreamReader(self) loop = asyncio.get_event_loop() loop.create_task(self.run()) async def run(self): while True: await self.sreader.read(1) def read(self, _): v = self.pinval if v and self.cb_rise is not None: self.cb_rise(*self.cbr_args) return b'\n' if not v and self.cb_fall is not None: self.cb_fall(*self.cbf_args) return b'\n' def ioctl(self, req, arg): ret = MP_STREAM_ERROR if req == MP_STREAM_POLL: ret = 0 if arg & MP_STREAM_POLL_RD: v = self.pin.value() if v != self.pinval: self.pinval = v ret = MP_STREAM_POLL_RD return ret 

再说一次-在官方uasyncio上,延迟可能会很高。根据应用程序设计,fast_io版本可能更有效。

演示程序iorw.py展示了一个完整的例子。请注意,在用正式的uasyncio撰写本文时,由于存在错误,因此无法正常工作。有两种解决方案。解决方法是编写两个单独的驱动程序,一个用于只读,另一个用于只读。第二种是使用fast_io,它可以解决此问题。

在正式的uasyncio中,很少计划输入/输出

6.5完整示例:aremote.py

该驱动程序旨在接收/解码来自红外遥控器的信号。 aremote.py驱动程序本身。以下注意事项与 asyncio的使用有关

联系人上的中断会记录状态更改的时间(以微秒为单位)并设置事件,从而跳过第一次状态更改发生的时间。协程会发生事件,报告数据包的持续时间,然后在调用用户指定的回调之前解码存储的数据。

将时间传递给事件实例使协程可以在设置延迟时段时补偿任何异步延迟。

6.6 HTU21D环境传感器

驱动 HTU21D芯片提供的温度和湿度的精确测量。

该芯片需要大约120 ms的时间来接收两个数据元素。驱动程序异步工作,在读取数据之前启动 await asyncio.sleep(t)的接收和使用,更新温度和湿度变量,该变量可以随时访问,从而允许在芯片驱动程序运行时启动其他协程。

7.提示和技巧

7.1程序冻结冻结

通常是由于任务被无条件阻止而发生的:这将导致整个系统冻结。进行显影时,使用协程器定期打开内置LED很有用。这样可以确认调度程序仍在运行。

7.2 uasyncio保存状态 在REPL中

使用uasyncio启动程序时,请在启动之间执行软复位(ctrl-D)。由于uasyncio会在两次启动之间保持状态,因此在下次启动时可能会发生不可预测的行为。

7.3垃圾收集

您可以通过首先指定import gc来执行协程

 gc.collect () gc.treshold ( gc.mem_free () // 4 + gc.mem_alloc ()) 

这次讨论的目的,在这里,在堆(堆)的部分。

7.4测试

建议确保设备驱动程序在必要时保持控制,可以通过运行一个或多个虚拟协程副本来启动消息打印周期,并检查其是否在驱动程序处于待机模式时运行,以进行控制:

 async def rr(n): while True: print('Roundrobin ', n) await asyncio.sleep(0) 

作为可能出现的危险类型的一个示例,在上面的示例中,RecordOrientedUart __await__方法最初写为:

 def __await__(self): data = b'' while not data.endswith(self.DELIMITER): while not self.uart.any(): yield from asyncio.sleep(0) data = b''.join((data, self.uart.read(self.uart.any()))) self.data = data 

结果,执行被拉伸到接收到整个记录为止,以及uart.any()始终返回接收到的字符数不为零的事实。在通话时,可能已经收到所有字符。可以使用外部循环解决这种情况:

 def __await__(self): data = b'' while not data.endswith(self.DELIMITER): yield from asyncio.sleep(0) # ,  : while not self.uart.any(): yield from asyncio.sleep(0) #        data = b''.join((data, self.uart.read(self.uart.any()))) self.data = data 

可能值得注意的是,如果数据以较低的速度而不是使用反馈测试发送到UART,则该错误不会很明显。欢迎来到实时编程的乐趣。

7.5常见错误

如果函数或方法是由async def定义的,并且随后被调用为常规(同步)调用,则MicroPython不会显示错误消息。这是设计使然。通常,这导致程序无提示地无法正常工作:

 async def foo(): # code loop.create_task(foo) #  1 1: foo     foo() #  2: . 

我有一个建议,建议使用fast_io解决方案1中的问题check_async_code.py

模块尝试检测可疑程序的使用情况。它是用Python3编写的,旨在在PC上工作。在根据本指南概述的指南编写的脚本中使用,并使用async def声明了协程。该模块采用一个参数,即源MicroPython文件(或--help)的路径

请注意,它有点粗鲁,打算在语法正确的文件中使用,该文件默认情况下不会启动。使用诸如pylint之类的工具进行常规语法检查(pylint当前没有此错误)。

该脚本会产生误报。根据计划,协程是第一级的对象;可以将它们传递给函数并存储在数据结构中。根据程序的逻辑,可以保存函数或执行结果。脚本无法确定意图。它旨在忽略在确定其他要考虑的案例时似乎正确的案例。假设将协程声明为异步def的foo

 loop.run_until_complete(foo()) #   bar(foo) #     ,      bar(foo()) z = (foo,) z = (foo(),) foo() #  :   . 

我觉得它很有用,但总是欢迎改进。

7.6编程与插座(插座

有到编程插座两种基本方法uasyncio。默认情况下,套接字被阻止,直到指定的读取或写入操作完成。Uasyncio支持阻止使用套接字select.poll以防止它们阻塞调度。在大多数情况下,此机制最容易使用。客户端和服务器代码的示例可以在client_server目录中找到Userver 通过显式轮询服务器套接字使用select.poll应用程序

客户端套接字在uasyncio流引擎直接使用它的意义上隐式使用它。

请注意,socket.getaddrinfo当前被阻止。示例代码中的时间将最少,但是如果需要DNS查找,则阻止时间可能很长。

套接字编程的第二种方法是使用非阻塞套接字。这增加了复杂性,但是在某些应用程序中是必需的,尤其是在通过WiFi连接的情况下(请参见下文)。

在撰写本文时(2019年3月),正在开发对无阻塞套接字的TLS支持。它的确切状态(对我来说)是未知的。

使用无阻塞插座需要注意一些细节。如果由于服务器延迟而发生非阻塞读取,则不能保证将返回所有(或任何一个)请求的数据。同样,条目可能不会完成。

因此,异步读取和写入方法必须迭代执行非阻塞操作,直到读取或写入所需的数据为止。实际上,可能需要超时来处理服务器中断。
另一个问题是,ESP32端口存在一些问题,需要进行非常讨厌的入侵才能实现无错误操作。我还没有测试是否仍然如此。Sock_nonblock.py
模块说明了所需的方法。这不是一个可行的演示,决策可能取决于应用程序。

7.6.1 WiFi问题 在检测WiFi中断时

uasyncio流机制不是最佳选择。我发现有必要使用非阻塞套接字来提供故障安全操作,并在发生故障时重新连接客户端。

文件描述了我在保持开放的插座长时间WiFi应用所遇到的问题,并概述了决定。

厘米提供强大的异步MQTT客户端,可在WiFi故障期间提供消息完整性。描述了在无线客户端和有线服务器之间的一个简单的异步全双工串行链接,它保证了消息的传递。

7.7事件循环构造函数的参数

如果您需要创建一个值与默认值不同的事件循环,则可能会发生一个小错误。在使用asyncio运行任何其他代码之前,必须声明这样的循环,因为此代码中可能需要这些值。否则,代码将使用默认值初始化:

 import uasyncio as asyncio import some_module bar = some_module.Bar() #   get_event_loop() #     loop = asyncio.get_event_loop(runq_len=40, waitq_len=40) 

鉴于导入模块可以执行代码,最安全的方法是在导入uasyncio之后立即实例化事件循环

 import uasyncio as asyncio loop = asyncio.get_event_loop(runq_len=40, waitq_len=40) import some_module bar = some_module.Bar() # get_event_loop()    

编写供其他程序使用的模块时,我宁愿避免在导入时运行uasyncio代码编写函数和方法以等待事件循环作为参数。然后确保仅顶级应用程序调用get_event_loop

 import uasyncio as asyncio import my_module #      loop = asyncio.get_event_loop(runq_len=40, waitq_len=40) bar = my_module.Bar(loop) 

这里讨论这个问题

8面向初学者的

注释这些注释面向异步代码中的初学者,首先描述了计划人员试图解决的问题,并概述了uasyncio的解决方案。

8.5节讨论了uasyncio和_ 线程模块的相对优点,以及为什么您可能更喜欢在主动调度(_thread)中使用uasyncio协程

8.1问题1:事件循环

典型的固件应用程序连续工作,同时应响应外部事件,这些事件可能包括ADC电压的变化,硬件中断的出现,UART中接收到的符号或套接字上可用的数据。这些事件是异步发生的,并且代码应该能够响应,而不管它们发生的顺序如何。另外,可能需要与时间有关的任务,例如LED闪烁。

显而易见的方法是使用uasycio事件循环。此示例不是实际的代码,但用于说明事件循环的一般形式。

 def event_loop(): led_1_time = 0 led_1_period = 20 led_2_time = 0 led_2_period = 30 switch_state = switch.state() #    while True: time_now = utime.time() if time_now >= led_1_time: #  LED #1 led1.toggle() led_1_time = time_now + led_1_period if time_now >= led_2_time: #  LED #2 led2.toggle() led_2_time = time_now + led_2_period #    LEDs if switch.value() != switch_state: switch_state = switch.value() #  - if uart.any(): #    UART 

这样的循环适用于简单的示例,但是随着事件数量的增加,代码很快变得很麻烦。通过将大多数程序逻辑放在一个位置,而不是将代码链接到受控对象,它们还违反了面向对象编程的原理。我们要为可插入模块并导入的闪烁LED开发一个类。 OOP的LED闪烁方法可能如下所示:

 import pyb class LED_flashable(): def __init__(self, led_no): self.led = pyb.LED(led_no) def flash(self, period): while True: self.led.toggle() # -     period, #          

uasyncio中的调度程序允许您创建此类。

8.2问题2:阻塞方法

假设您需要从套接字读取一定数量的字节。如果默认情况下使用阻塞套接字调用socket.read(n),它将“阻塞”(即它将无法终止),直到收到n个字节为止。在此期间,应用程序将不会响应其他事件。

使用非阻塞uasyncio套接字您可以编写异步读取方法。需要数据的任务将被(有必要)阻止,直到接收到它为止,但是在此期间将执行其他任务,这将使应用程序保持响应状态。

8.3。 Uasyncio进近

下一课有一个可以打开和关闭的LED,您也可以任何速度闪烁。LED_async实例使用run方法,该方法可用于连续操作。可以使用on(),off()flash(secs)方法控制LED的行为

 import pyb import uasyncio as asyncio class LED_async(): def __init__(self, led_no): self.led = pyb.LED(led_no) self.rate = 0 loop = asyncio.get_event_loop() loop.create_task(self.run()) async def run(self): while True: if self.rate <= 0: await asyncio.sleep_ms(200) else: self.led.toggle() await asyncio.sleep_ms(int(500 / self.rate)) def flash(self, rate): self.rate = rate def on(self): self.led.on() self.rate = 0 def off(self): self.led.off() self.rate = 0 

应当注意,on(),off()flash()是普通的同步方法。它们改变了LED的行为,但立即返回。闪烁发生在“后台”。下一节将对此进行详细说明。

该类符合OOP原则,该原则在于将与设备关联的逻辑存储在该类中。同时,使用uasyncio可确保应用程序可以在LED闪烁时响应其他事件。下面的程序以四个不同频率的Pyboard LED闪烁,并且还响应USR按钮,从而完成了该程序。

 import pyb import uasyncio as asyncio from led_async import LED_async # ,   async def killer(): # ,      sw = pyb.Switch() while not sw.value(): await asyncio.sleep_ms(100) leds = [LED_async(n) for n in range(1, 4)] for n, led in enumerate(leds): led.flash(0.7 + n/4) loop = asyncio.get_event_loop() loop.run_until_complete(killer()) 

与事件循环的第一个示例不同,与开关关联的逻辑的功能与LED的功能不同。注意用于启动调度程序的代码:

 loop = asyncio.get_event_loop() loop.run_until_complete(killer()) #    #       killer (), #   . 

8.4在uasyncio中进行规划

Python 3.5MicroPython支持异步函数的概念,也称为协程或任务。协程必须至少包含一个等待声明

 async def hello(): for _ in range(10): print('Hello world.') await asyncio.sleep(1) 

此功能以一秒钟的间隔打印十次消息。当功能因预期延迟而暂停时,异步调度程序将执行其他任务,从而产生了同时运行它们的错觉。

当协程发出await asyncio.sleep_ms()asyncio.sleep()时,当前任务将暂停并放置在按时间排序的队列中,执行将继续到队列顶部的任务。队列的设计方式是,即使指定的睡眠模式为零,也将执行其他相关任务,直到电流恢复为止。这是“诚实的通知”计划。一种常见的做法是运行asyncio.sleep(0)循环。这样任务就不会延迟执行。以下是一个忙等待循环,等待另一个任务来设置全局标志变量las,它垄断了处理器,阻止了其他协程的启动:

 async def bad_code(): global flag while not flag: pass #  flag = False #     

这里的问题是,直到flagis False循环控制权传递给调度程序,否则其他任务将不会启动。正确的方法:

 async def good_code(): global flag while not flag: await asyncio.sleep(0) #  flag = False #     

出于同样的原因,设置延迟(例如utime.sleep(1))是一种不好的做法,因为它会阻塞其他任务1 s;因此,设置延迟会导致延迟。使用await asyncio.sleep(1)更正确
请注意,uasyncio sleepsleep_ms方法产生的延迟实际上可能超过了指定的时间。这是由于在延迟期间将执行其他任务。延迟时间过后,执行任务将等待或终止之前,执行将不会继续。行为规范的协程将始终声明等待定期地。在需要精确延迟的地方,尤其是当延迟小于几毫秒时,可能有必要使用utime.sleep_us(us)

8.5为什么要协作而不是基于线程的调度(_thread)?

对于协程协程的想法,初学者的最初反应通常令人失望。当然,流媒体计划更好吗?如果Python虚拟机可以为我做到这一点,为什么还要明确让位控制呢?

对于嵌入式系统,协作模型具有两个优点。
首先是重量轻。可能会有大量的协程,因为与预定线程不同,挂起的协程占用更少的空间。
其次,这避免了与流调度相关的一些细微问题。

实际上,协作多任务处理被广泛使用,尤其是在用户界面应用程序中。

为了捍卫流媒体计划模型,我将展示一个优势:如果有人写

 for x in range ( 1000000 ): #  -  

它不会阻止其他任务。协作模型假定循环应显式地为每个任务的控制提供一定数量的迭代,例如,将代码放入协程中并定期发出await asyncio.sleep(0)

las,与缺点相比,这种优点显得苍白。其中一些描述在编写中断处理程序的文档中。在流调度模型中,每个线程可以中断任何其他线程,从而更改可以在其他线程中使用的数据。通常,查找和修复由于没有给出结果的错误而导致的锁定要比检测在流规划中的模型框架中编写的代码中有时可能发生的细微且很少遇到的错误要容易得多。

简而言之,如果您编写MicroPython协程,则可以确保变量不会被另一个协程突然更改:您的协程具有完全控制权,直到它返回等待asyncio.sleep(0)为止

请记住,中断处理程序是抢占式的。这适用于可能在代码中任何地方发生的硬件和软件中断。

有关流式传输计划问题的雄辩讨论可以在这里找到

8.6交互

在非平凡的应用程序中,协程必须进行交互。可以使用常规的Python方法。这些方法包括使用全局变量或将协程声明为对象方法:它们可以共享实例变量。或者,可以将可变对象作为参数传递给协程。

流计划模型要求专家确保类提供安全的连接;在协作模型中,很少需要这样做。

8.7。轮询(轮询

一些硬件设备,诸如加速度计Pyboard,不支持中断,因此应被轮询(即定期检查)。轮询也可以与中断处理程序一起使用:中断处理程序维护设备并设置标志。协程会轮询该标志-如果已设置该标志,则将处理数据并重置该标志。最好的方法是使用Event

Source: https://habr.com/ru/post/zh-CN484472/


All Articles