在本文中,我邀请读者与我一起进行一段激动人心的旅程,以了解asyncio的原理 ,以了解其如何实现异步代码执行。 我们对回调进行处理,并通过几个关键抽象直接进入协程,从而完成一系列事件。 如果您的python地图还没有这些吸引力,欢迎加入cat。
对于初学者-关于我们之前分布的区域的简要参考
根据pep3153 , asyncio是一个异步I / O库,其创建目的是为创建异步框架提供标准化框架。 pep3156还归因于她需要提供非常简单的集成到已经存在的异步框架(Twisted,Tornado,Gevent)中。 正如我们现在所看到的,这些目标已成功实现-出现了一个新的基于asyncio的框架: aiohttp (在Tornado中) AsyncioMainLoop是版本5.0中的默认事件循环,在Twisted asyncioreactor中从版本16.5.0起可用,并且有一个用于Gevent的第三方aiogevent库。
asyncio是一个混合库,它同时使用两种方法来实现异步代码执行:在回调上是经典的,在协程上是相对较新的(至少对于python)。 它基于三个主要的抽象,它们类似于第三方框架中存在的抽象:
- 可插拔事件循环
可插拔事件循环。 可插拔意味着可以用两行代码替换为另一行实现同一接口的代码。 现在在libuv( uvloop )之上有cython实现 并且在Rust( asyncio-tokio )中 。 - 未来的
操作结果,将在将来提供。 有必要在协程中接收回调执行的结果。 - 工作任务
Future的特殊子类,可在事件循环上运行协程。
走吧
事件周期是库的主要组成部分,沿着贯穿它的道路,数据被传递到其任何组成部分。 它既大又复杂,因此首先考虑其精简版本。
骑我们的小回调,我们通过call_soon上路 ,进入队列,并在短暂等待后显示。
错误回拨集
值得一提的是,回调是危险的事情-如果它们使您陷入困境,则python解释器将无法帮助您了解发生此情况的位置。 如果您不相信我 ,请以同样的方式进行maybe_print 回调 ,该回调大约完成一半的时间。
下面是上一个示例的完整回溯。 由于may_print函数是由事件循环启动的,而不是直接从starting_point启动的 ,因此在run_until_complete方法中,回溯结束于此 。 使用这种回溯无法确定starting_point在代码中的位置,如果starting_point位于代码库中的多个位置,这将使调试变得非常复杂。
$: python3 base_loop.py >>
连续调用堆栈不仅需要显示完整的追溯,而且还需要实现其他语言功能。 例如,异常处理基于此。 下面的示例不起作用,因为到starting_point开始时,该主函数已经被执行:
以下示例也不起作用。 main函数中的上下文管理器将在开始处理文件之前打开和关闭文件。
缺少连续调用堆栈限制了熟悉的语言功能的使用。 为了部分地避免asyncio中的此缺点,我不得不添加许多与它要解决的任务没有直接关系的其他代码。 大部分示例中都缺少此代码-如果没有这些代码,它们将非常复杂。
从事件的周期到外部世界再到
事件循环通过操作系统通过事件与外界进行通信。 知道如何使用它的代码由称为选择器的标准库模块提供。 它使您可以告诉操作系统我们正在等待某种事件,然后询问是否发生了。 在下面的示例中,预期的事件将是读取套接字可用性。
来自外界的信使将其消息或包裹留在选择器中,然后选择器将其传递给收件人。 现在可以使用事件循环从套接字读取。 如果运行此代码并使用netcat进行连接,它将如实显示将发送给它的所有内容。
$: nc localhost 8086 $: python3 event_loop.py "Hi there!" b'"Hi there!"\n' "Hello!" b'"Hello!"\n' "Answer me, please!" b'"Answer me, please!"\n'
在本文的开头,有人说asyncio是一个混合库,其中协程在回调之上工作。 为了实现此功能,使用了两个剩余的主要抽象: Task和Future 。 接下来,将显示这些抽象的代码,然后使用它们的事件周期执行协程。
未来的
下面是Future类的代码。 它是必需的,以便在corutin中可以等到回调完成并获得其结果。
工作任务
这是Future的特殊子类。 需要在回调事件循环上运行协程。
可以与Future一起使用的一系列事件
继续前进
现在让我们看看协程main将如何执行:
履行 __________________________________________________________________ class EventLoop: def run_until_complete(self, callback, *args): # task._step self.call_soon(callback, *args) while self._ready or self.selector.get_map(): ntodo = len(self._ready) for _ in range(ntodo): callback, args = self._ready.popleft() # callback(*args) # task._step() ___________________________________________________________________ clsss Task: def _step(self, exc=None): try: if exc is None: # None result = self._coro.send(None) else: ___________________________________________________________________ async def main(loop): # # sock = socket.socket() sock.bind(('localhost', 8080)) sock.listen(100) sock.setblocking(False) # conn, addr = await loop.sock_accept(sock) result = await loop.sock_recv(conn, 1000) print(result) ___________________________________________________________________ class EventLoop: def sock_accept(self, sock, fut=None): # Future fut = fut if fut else Future(loop=self) try: # conn, address = sock.accept() conn.setblocking(False) except (BlockingIOError, InterruptedError): # # # Future self.selector.register( sock, EVENT_READ, (self.sock_accept, sock, fut) ) except Exception as exc: -------------------------------------------- self.selector.unregister(sock) # Future return fut ___________________________________________________________________ async def main(loop): sock = socket.socket() sock.bind(('localhost', 8080)) sock.listen(100) sock.setblocking(False) # await __await__ Future conn, addr = await loop.sock_accept(sock) result = await loop.sock_recv(conn, 1000) print(result) ___________________________________________________________________ class Future: def __await__(self): # Future if self._state == 'PENDING': yield self return self.result() ___________________________________________________________________ class Task(Future): def _step(self, exc=None): try: if exc is None: # None result = self._coro.send(None) # result = fut -------------------------------- else: # Future # wakeup if isinstance(result, Future): result.add_done_callback(self._wakeup) elif result is None: self._loop.call_soon(self._step) # - Task Future # # ___________________________________________________________________ class EventLoop: def run_until_complete(self, callback, *args): self.call_soon(callback, *args) while self._ready or self.selector.get_map(): ntodo = len(self._ready) for _ in range(ntodo): callback, args = self._ready.popleft() callback(*args) for key, events in self.selector.select(timeout=0): # callback, *args = key.data self.call_soon(callback, *args) # loop.sock_accept(sock, fut) ___________________________________________________________________ class EventLoop: def sock_accept(self, sock, fut=None): fut = fut if fut else Future(loop=self) try: # conn, address = sock.accept() conn.setblocking(False) except (BlockingIOError, InterruptedError): -------------------------------- else: # Future fut.set_result((conn, address)) self.selector.unregister(sock) return fut ___________________________________________________________________ class Future: def set_result(self, result): # self._result = result # self._state = 'FINISHED' # self._schedule_callbacks() def _schedule_callbacks(self): for callback in self._callbacks: # task.wakeup self._loop.call_soon(callback, self) # (task.wakeup, fut) self._callbacks[:] = [] ___________________________________________________________________ class EventLoop: def run_until_complete(self, callback, *args): self.call_soon(callback, *args) while self._ready or self.selector.get_map(): ntodo = len(self._ready) for _ in range(ntodo): callback, args = self._ready.popleft() # # task.wakeup callback(*args) # task.wakeup(fut) ___________________________________________________________________ class Task(Future): def _wakeup(self, future): try: future.result() except Exception as exc: self._step(exc) else: # Future task._step self._step() def _step(self, exc=None): try: if exc is None: # None result = self._coro.send(None) else: ___________________________________________________________________ async def main(loop): sock = socket.socket() sock.bind(('localhost', 8080)) sock.listen(100) sock.setblocking(False) # await __awai__ conn, addr = await loop.sock_accept(sock) result = await loop.sock_recv(conn, 1000) print(result) ___________________________________________________________________ class Future: def __await__(self): if self._state == 'PENDING': yield self # Future return self.result() ___________________________________________________________________ async def main(loop): sock = socket.socket() sock.bind(('localhost', 8080)) sock.listen(100) sock.setblocking(False) # Future conn addr conn, addr = await loop.sock_accept(sock) result = await loop.sock_recv(conn, 1000) print(result)
以这种简单的方式,asyncio执行协程。
总结
创建异步的目标已成功实现。 它不仅解决了兼容性问题,而且引起了社区对竞争性编程的极大兴趣。 新的文章和图书馆开始出现,就像雨后的蘑菇一样。 另外,异步也影响了语言本身:本机协程和新的异步 / 等待关键字已添加到其中。 上一次在2003年添加新关键字时,它是yield关键字。
创建asyncio的目标之一是提供与现有异步框架(Twisted,Tornado,Gevent)的极其简单的集成。 从这个目标出发,从逻辑上选择工具:如果没有兼容性要求,协程可能会扮演主要角色。 由于在进行回调编程时不可能保持连续的调用堆栈,因此必须在它们与协程之间的边界上创建一个附加系统,以支持基于该调用的语言功能。
现在是主要问题。 为什么简单的库用户应该了解所有这些知识,这些知识遵循文档中的建议并且仅使用协程和高级API?
这是StreamWriter类文档的一部分

它的实例由asyncio.open_connection函数返回,并且是回调API之上的async / await API。 而且这些回调仍然存在。 write和writelines函数是同步的,它们尝试向套接字写入,如果失败,则将数据转储到基础缓冲区中,并将回调添加到记录中。 需要Corutin 消耗以提供机会,直到缓冲区中的数据量下降到指定值为止。
如果您忘记在两次 写调用之间调用流失 ,则内部缓冲区可能会增大到不合适的大小。 但是,如果您牢记这一点,会留下一些不愉快的时刻。 首先:如果记录上的回调“中断”,则使用此API的协程将不会以任何方式知道它,因此将无法对其进行处理。 第二:如果协程“中断”,则记录的回调将不会以任何方式知道该记录,并将继续从缓冲区写入数据。
因此,即使仅使用协程,也要为回调会提醒自己的事实做好准备。
您可以在公司博客Antida软件的 这篇文章中阅读有关如何通过异步代码使用数据库的信息。
PS感谢您为eirnym , kurb , rasswet用户提供有关错别字和错误信息的信息