Translator's foreword:
Having once again stepped on a rake while using Python asyncio, I went looking on the internet for something more pleasant than the dry documentation. I came across the article "Asyncio Coroutine Patterns: Beyond await" by Yeray Diaz, in which the author takes an entertaining look at working with asyncio and shares a few tricks. Since I couldn't find a complete Russian version, I decided to translate it.
Asyncio is the Python programmer's concurrency dream: you write code that borders on synchronous and let Python do the rest. It's like another import of the antigravity library: import antigravity
In reality, it's not quite like that: concurrent programming is hard work, and while coroutines save us from callback hell, which can get you pretty far, you still have to think about creating tasks, retrieving results, and catching exceptions gracefully. Sad.
The good news is that all of this is possible in asyncio. The bad news is that it's not always immediately obvious what's wrong and how to fix it. Here are a few patterns I've noticed while working with asyncio.
Before we start:
I used my favorite aiohttp library for the asynchronous HTTP requests, and the Hacker News API, since it's a simple, well-known site that follows a familiar usage scenario. Following the response to my previous article, I also used the async/await syntax introduced in Python 3.5, and I assume the reader is familiar with the ideas described there. Finally, all the examples are available in the article's GitHub repository.
OK, let's get started!
Recursive coroutines
Creating and running tasks in asyncio is straightforward: the API includes several methods on the AbstractEventLoop class for it, as well as module-level functions in the library. But usually you want to combine the results of those tasks and process them somehow, and recursion is an excellent example of such a scheme, one that also shows how simple coroutines are compared to other concurrency tools.
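For instance, scheduling a handful of tasks and combining their results takes only a few lines. A minimal, self-contained sketch (the coroutine and its names here are illustrative, not from the article's repository):

import asyncio


async def double(x):
    await asyncio.sleep(0.1)  # stand-in for real I/O
    return x * 2


loop = asyncio.get_event_loop()
# schedule three tasks on the loop and aggregate their results
tasks = [asyncio.ensure_future(double(i)) for i in range(3)]
print(loop.run_until_complete(asyncio.gather(*tasks)))  # [0, 2, 4]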
A common use case for asyncio is building a web crawler of some kind. Imagine we're just too busy to check Hacker News ourselves, or maybe you simply enjoy a good flame war, so you want to implement a system that retrieves the number of comments for a particular HN post and notifies you when it exceeds a threshold. You google a bit and find the HN API docs, which are just what we need, but you notice the following in the documentation:
Want to know the total number of comments on an article? Walk the tree and count them.
Challenge accepted!
""" , , . , Hacker News """ import asyncio import argparse import logging from urllib.parse import urlparse, parse_qs from datetime import datetime import aiohttp import async_timeout LOGGER_FORMAT = '%(asctime)s %(message)s' URL_TEMPLATE = "https://hacker-news.firebaseio.com/v0/item/{}.json" FETCH_TIMEOUT = 10 parser = argparse.ArgumentParser(description='Calculate the comments of a Hacker News post.') parser.add_argument('--id', type=int, default=8863, help='ID of the post in HN, defaults to 8863') parser.add_argument('--url', type=str, help='URL of a post in HN') parser.add_argument('--verbose', action='store_true', help='Detailed output') logging.basicConfig(format=LOGGER_FORMAT, datefmt='[%H:%M:%S]') log = logging.getLogger() log.setLevel(logging.INFO) fetch_counter = 0 async def fetch(session, url): """ URL aiohttp, JSON . aiohttp . """ global fetch_counter with async_timeout.timeout(FETCH_TIMEOUT): fetch_counter += 1 async with session.get(url) as response: return await response.json() async def post_number_of_comments(loop, session, post_id): """ . """ url = URL_TEMPLATE.format(post_id) now = datetime.now() response = await fetch(session, url) log.debug('{:^6} > Fetching of {} took {} seconds'.format( post_id, url, (datetime.now() - now).total_seconds())) if 'kids' not in response:
Feel free to run the script with the --verbose flag for more detailed output.
[14:47:32] > Calculating comments took 2.23 seconds and 73 fetches
[14:47:32] -- Post 8863 has 72 comments
Let's skip the boilerplate and go straight to the recursive coroutine. Note that it reads almost exactly like synchronous code would.
async def post_number_of_comments(loop, session, post_id):
    """Retrieve data for current post and recursively for all comments.
    """
    url = URL_TEMPLATE.format(post_id)
    now = datetime.now()
    response = await fetch(session, url)
    log.debug('{:^6} > Fetching of {} took {} seconds'.format(
        post_id, url, (datetime.now() - now).total_seconds()))

    if 'kids' not in response:  # base case, there are no comments
        return 0

    # start with the number of direct replies to this post
    number_of_comments = len(response['kids'])

    # fan out: create recursive tasks for all of this post's children
    tasks = [post_number_of_comments(
        loop, session, kid_id) for kid_id in response['kids']]

    # fan in: schedule the tasks and aggregate their results
    results = await asyncio.gather(*tasks)
    number_of_comments += sum(results)
    log.debug('{:^6} > {} comments'.format(post_id, number_of_comments))

    return number_of_comments
- First, we fetch the JSON with the post's data.
- Then we recurse over each of its children ("kids").
- Eventually we reach the base case and return zero when a post has no replies.
- On returning from the base case, we add the number of the current post's direct replies to the counts returned for its children, and return the total.
This is a great example of fan-out and fan-in as described by Brett Slatkin: we fan out to retrieve the data for each of the children, and fan in to aggregate that data and compute the total comment count.
The asyncio API offers several ways to perform this fan-out. Here I used the gather function, which effectively waits until all the coroutines are done and returns the list of their results.
Note also how well coroutines go with recursion: at any point there's an arbitrary number of coroutines in flight, awaiting the answers to their requests inside the gather call and resuming execution once their I/O operation completes. This lets us express fairly complex behavior in a single elegant and (easily) readable coroutine.
"Pretty simple," you say? Fine, let's kick it up a notch.
Fire and forget
Imagine you want to send yourself an email about any posts whose number of comments exceeds a certain value, and you'd like to do it the same way we traverse the tree of posts. We could simply add an if statement at the end of the recursive function:
MIN_COMMENTS = 5  # notification threshold (value illustrative)


async def post_number_of_comments(loop, session, post_id):
    """Retrieve data for current post and recursively for all comments.
    """
    url = URL_TEMPLATE.format(post_id)
    response = await fetch(session, url)

    if 'kids' not in response:  # base case, there are no comments
        return 0

    number_of_comments = len(response['kids'])
    tasks = [post_number_of_comments(
        loop, session, kid_id) for kid_id in response['kids']]
    results = await asyncio.gather(*tasks)
    number_of_comments += sum(results)
    log.debug('{:^6} > {} comments'.format(post_id, number_of_comments))

    # log the post if its number of comments is over the threshold
    if number_of_comments > MIN_COMMENTS:
        await log_post(response)

    return number_of_comments


async def log_post(post):
    """Simulate sending the notification by sleeping for a random time.
    """
    await asyncio.sleep(random() * 3)  # requires: from random import random
    log.info("Post logged")
Yes, I used asyncio.sleep. That's the last time. I promise.
[09:41:02] Post logged
[09:41:04] Post logged
[09:41:06] Post logged
[09:41:06] > Calculating comments took 6.35 seconds and 73 fetches
[09:41:06] -- Post 8863 has 72 comments
That's significantly slower than before!
The reason is that, as we discussed earlier, await suspends the coroutine until the future completes, and since we don't need the result of the logging, there's no real reason to wait for it.
We want to "fire and forget" the coroutine, and since we can't await it, we need another way to start it without waiting on it. A quick look at the asyncio API turns up the ensure_future function, which schedules a coroutine to run, wraps it in a Task object, and returns it. Keep in mind that, once scheduled, the event loop will hand control to our coroutine at some point in the future, when some other coroutine is in a waiting state.
Nice; let's replace await log_post with the following:
async def post_number_of_comments(loop, session, post_id):
    """Retrieve data for current post and recursively for all comments.
    """
    url = URL_TEMPLATE.format(post_id)
    response = await fetch(session, url)

    if 'kids' not in response:  # base case, there are no comments
        return 0

    number_of_comments = len(response['kids'])
    tasks = [post_number_of_comments(
        loop, session, kid_id) for kid_id in response['kids']]
    results = await asyncio.gather(*tasks)
    number_of_comments += sum(results)
    log.debug('{:^6} > {} comments'.format(post_id, number_of_comments))

    if number_of_comments > MIN_COMMENTS:
        # fire and forget: schedule the coroutine without awaiting it
        asyncio.ensure_future(log_post(response))

    return number_of_comments
[09:42:57] > Calculating comments took 1.69 seconds and 73 fetches
[09:42:57] -- Post 8863 has 72 comments
[09:42:57] Task was destroyed but it is pending!
task: <Task pending coro=<log_post() done, defined at 02_fire_and_forget.py:82> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x1109197f8>()]>>
[09:42:57] Task was destroyed but it is pending!
task: <Task pending coro=<log_post() done, defined at 02_fire_and_forget.py:82> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x110919948>()]>>
[09:42:57] Task was destroyed but it is pending!
task: <Task pending coro=<log_post() done, defined at 02_fire_and_forget.py:82> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x110919978>()]>>
Ouch, the dreaded "Task was destroyed but it is pending!", which haunts asyncio users all over the world. The good news is that we're back to the timing we had before (1.69 s). The bad news is that asyncio doesn't approve of going beyond fire-and-forget.
The problem is that we force the event loop closed right after getting the result of the post_number_of_comments coroutine, leaving our log_post tasks no time to complete.
We have two options:
either we let the event loop run forever with run_forever and terminate the script manually, or we use the all_tasks class method of Task to find any pending tasks and await them once we're done counting comments.
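The first option would look roughly like this. A minimal, self-contained sketch (the coroutines here are illustrative stand-ins, not the article's script):

import asyncio


async def log_something():
    # a fire-and-forget task that needs time to finish
    await asyncio.sleep(2)
    print("Post logged")


async def main():
    asyncio.ensure_future(log_something())  # fire and forget
    print("main result ready")


loop = asyncio.get_event_loop()
asyncio.ensure_future(main())
loop.run_forever()  # never returns on its own: the pending task gets
                    # time to complete, and we kill the script manually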
Let's try the second route, with a quick change right after our call to post_number_of_comments:
if __name__ == '__main__':
    args = parser.parse_args()
    if args.verbose:
        log.setLevel(logging.DEBUG)

    post_id = id_from_HN_url(args.url) if args.url else args.id

    loop = asyncio.get_event_loop()
    with aiohttp.ClientSession(loop=loop) as session:
        now = datetime.now()
        comments = loop.run_until_complete(
            post_number_of_comments(loop, session, post_id))
        log.info(
            '> Calculating comments took {:.2f} seconds and {} fetches'.format(
                (datetime.now() - now).total_seconds(), fetch_counter))
        log.info("-- Post {} has {} comments".format(post_id, comments))

    pending_tasks = [
        task for task in asyncio.Task.all_tasks() if not task.done()]
    loop.run_until_complete(asyncio.gather(*pending_tasks))

    loop.close()
[09:47:29] > Calculating comments took 1.72 seconds and 73 fetches
[09:47:29] -- Post 8863 has 72 comments
[09:47:30] Post logged
[09:47:31] Post logged
[09:47:32] Post logged
Now we're sure the logging tasks complete.
Relying on all_tasks works fine in cases where we have a good idea of everything that's being executed in the event loop, but in more complex scenarios there can be any number of pending tasks, some of which may originate outside our own code.
Another approach is to clean up after ourselves: register every coroutine we schedule ourselves, and allow the pending ones to run once the comment count is done. As we know, ensure_future returns a Task object, which we can use to register our low-priority tasks. Let's simply define a task_registry list and store the futures in it:
task_registry = []  # a registry for our fire-and-forget tasks


async def post_number_of_comments(loop, session, post_id):
    """Retrieve data for current post and recursively for all comments.
    """
    url = URL_TEMPLATE.format(post_id)
    response = await fetch(session, url)

    if 'kids' not in response:  # base case, there are no comments
        return 0

    number_of_comments = len(response['kids'])
    tasks = [post_number_of_comments(
        loop, session, kid_id) for kid_id in response['kids']]
    results = await asyncio.gather(*tasks)
    number_of_comments += sum(results)
    log.debug('{:^6} > {} comments'.format(post_id, number_of_comments))

    if number_of_comments > MIN_COMMENTS:
        # register the fire-and-forget task so we can await it later
        task_registry.append(asyncio.ensure_future(log_post(response)))

    return number_of_comments


# ... and in __main__, instead of asyncio.Task.all_tasks():
#     pending_tasks = [
#         task for task in task_registry if not task.done()]
#     loop.run_until_complete(asyncio.gather(*pending_tasks))
[09:53:46] > Calculating comments took 1.68 seconds and 73 fetches
[09:53:46] -- Post 8863 has 72 comments
[09:53:46] Post logged
[09:53:48] Post logged
[09:53:49] Post logged
The lesson here: asyncio should not be treated as a distributed job queue à la Celery. Everything runs in a single thread, and the event loop needs to be managed accordingly, giving it time to complete the tasks.
Which leads us to another commonly used pattern:
Periodic coroutines
Continuing with our HN example, and since we did such a great job earlier, we've decided it's absolutely critical to compute the number of comments on HN posts as soon as they show up in the list of the 5 most recent entries.
A quick look at the HN API reveals an endpoint that returns the 500 most recent posts. Great, so we can simply poll that endpoint to retrieve new posts and calculate the number of comments on them, say, every five seconds.
Right, since we're now going to poll periodically, we can just use an infinite while loop, await the polling task, and sleep for the required period. I've made a few minor changes in order to retrieve the top posts instead of a direct post URL.
""" An example of periodically scheduling coroutines using an infinite loop of awaiting and sleeping. """ import asyncio import argparse import logging from datetime import datetime import aiohttp import async_timeout LOGGER_FORMAT = '%(asctime)s %(message)s' URL_TEMPLATE = "https://hacker-news.firebaseio.com/v0/item/{}.json" TOP_STORIES_URL = "https://hacker-news.firebaseio.com/v0/topstories.json" FETCH_TIMEOUT = 10 parser = argparse.ArgumentParser(description='Calculate the number of comments of the top stories in HN.') parser.add_argument('--period', type=int, default=5, help='Number of seconds between poll') parser.add_argument('--limit', type=int, default=5,help='Number of new stories to calculate comments for') parser.add_argument('--verbose', action='store_true', help='Detailed output') logging.basicConfig(format=LOGGER_FORMAT, datefmt='[%H:%M:%S]') log = logging.getLogger() log.setLevel(logging.INFO) fetch_counter = 0 async def fetch(session, url): """ URL aiohttp, JSON . aiohttp . """ global fetch_counter with async_timeout.timeout(FETCH_TIMEOUT): fetch_counter += 1 async with session.get(url) as response: return await response.json() async def post_number_of_comments(loop, session, post_id): """ . """ url = URL_TEMPLATE.format(post_id) response = await fetch(session, url) if 'kids' not in response:
[10:14:03] Calculating comments for top 5 stories. (1)
[10:14:06] Post 13848196 has 31 comments (1)
[10:14:06] Post 13849430 has 37 comments (1)
[10:14:06] Post 13849037 has 15 comments (1)
[10:14:06] Post 13845337 has 128 comments (1)
[10:14:06] Post 13847465 has 27 comments (1)
[10:14:06] > Calculating comments took 2.96 seconds and 244 fetches
[10:14:06] Waiting for 5 seconds...
[10:14:11] Calculating comments for top 5 stories. (2)
[10:14:14] Post 13848196 has 31 comments (2)
[10:14:14] Post 13849430 has 37 comments (2)
[10:14:14] Post 13849037 has 15 comments (2)
[10:14:14] Post 13845337 has 128 comments (2)
[10:14:14] Post 13847465 has 27 comments (2)
[10:14:14] > Calculating comments took 3.04 seconds and 244 fetches
[10:14:14] Waiting for 5 seconds...
Nice, but there's a small problem: if you look at the timestamps, the task is not strictly running every 5 seconds; it starts 5 seconds after get_comments_of_top_stories finishes. Once again, this is a consequence of using await and blocking until we get our results back. That might not matter, unless the task happens to take longer than five seconds. Also, using run_until_complete feels somewhat wrong when the coroutine is designed to be infinite.
The good news is that by now we're experts on ensure_future, and we can just throw it into the code instead of using await...
async def poll_top_stories_for_comments(loop, session, period, limit):
    """Periodically poll for new stories and retrieve number of comments.
    """
    global fetch_counter
    iteration = 1
    while True:
        now = datetime.now()
        log.info("Calculating comments for top {} stories. ({})".format(
            limit, iteration))

        asyncio.ensure_future(
            get_comments_of_top_stories(loop, session, limit, iteration))

        log.info(
            '> Calculating comments took {:.2f} seconds and {} fetches'.format(
                (datetime.now() - now).total_seconds(), fetch_counter))
        log.info("Waiting for {} seconds...".format(period))

        iteration += 1
        fetch_counter = 0
        await asyncio.sleep(period)
[10:55:40] Calculating comments for top 5 stories. (1)
[10:55:40] > Calculating comments took 0.00 seconds and 0 fetches
[10:55:40] Waiting for 5 seconds...
[10:55:43] Post 13848196 has 32 comments (1)
[10:55:43] Post 13849430 has 48 comments (1)
[10:55:43] Post 13849037 has 16 comments (1)
[10:55:43] Post 13845337 has 129 comments (1)
[10:55:43] Post 13847465 has 29 comments (1)
[10:55:45] Calculating comments for top 5 stories. (2)
[10:55:45] > Calculating comments took 0.00 seconds and 260 fetches
[10:55:45] Waiting for 5 seconds...
[10:55:48] Post 13848196 has 32 comments (2)
[10:55:48] Post 13849430 has 48 comments (2)
[10:55:48] Post 13849037 has 16 comments (2)
[10:55:48] Post 13845337 has 129 comments (2)
[10:55:48] Post 13847465 has 29 comments (2)
Well, the good news is that the timestamps are now exactly five seconds apart, but what's with the 0.00 seconds and zero fetches? And then the next iteration takes zero seconds but 260 fetches?
This is one of the consequences of abandoning await: since we're no longer blocking the coroutine, we move straight on to the next line, which reports zero seconds and, the first time around, zero fetches. These are fairly minor issues, since we could live without the messages, but what if we actually need the results of our tasks?
Then, my friend, we'll have to resort to... callbacks (shudder).
I know, I know, the whole point of coroutines is to avoid callbacks, but the article's dramatic subtitle is "Beyond await" for a reason. We're not in await-land anymore; we're off adventuring with manually scheduled tasks, which is exactly where our use case has taken us. What does that get you? Spoilers ahead.
As we discussed earlier, ensure_future returns a Task object (a Future subclass), and we can attach a callback to it using add_done_callback.
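In isolation, the mechanism looks like this (a minimal sketch with illustrative names, not the article's code):

import asyncio


async def compute():
    await asyncio.sleep(1)  # stand-in for real work
    return 42


def callback(fut):
    # invoked by the event loop once the task completes;
    # the return value is read off the future
    print('Result:', fut.result())


loop = asyncio.get_event_loop()
future = asyncio.ensure_future(compute())
future.add_done_callback(callback)
loop.run_until_complete(future)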
Before doing that, though, in order to count fetches correctly we have to encapsulate our fetch coroutine in a URLFetcher class, creating one instance per task so that each task carries its own fetch count. While we're at it, we also get rid of the global variable that introduced the bug we just saw:
""" ensure_future sleep. future, ensure_future, , URLFetcher . """ import asyncio import argparse import logging from datetime import datetime import aiohttp import async_timeout LOGGER_FORMAT = '%(asctime)s %(message)s' URL_TEMPLATE = "https://hacker-news.firebaseio.com/v0/item/{}.json" TOP_STORIES_URL = "https://hacker-news.firebaseio.com/v0/topstories.json" FETCH_TIMEOUT = 10 parser = argparse.ArgumentParser(description='Calculate the number of comments of the top stories in HN.') parser.add_argument('--period', type=int, default=5, help='Number of seconds between poll') parser.add_argument('--limit', type=int, default=5, help='Number of new stories to calculate comments for') parser.add_argument('--verbose', action='store_true', help='Detailed output') logging.basicConfig(format=LOGGER_FORMAT, datefmt='[%H:%M:%S]') log = logging.getLogger() log.setLevel(logging.INFO) class URLFetcher(): """ URL """ def __init__(self): self.fetch_counter = 0 async def fetch(self, session, url): """ URL aiohttp, JSON . aiohttp . """ with async_timeout.timeout(FETCH_TIMEOUT): self.fetch_counter += 1 async with session.get(url) as response: return await response.json() async def post_number_of_comments(loop, session, fetcher, post_id): """ . """ url = URL_TEMPLATE.format(post_id) response = await fetcher.fetch(session, url)
[12:23:40] Calculating comments for top 5 stories. (1)
[12:23:40] Waiting for 5 seconds...
[12:23:43] Post 13848196 has 38 comments (1)
[12:23:43] Post 13849430 has 72 comments (1)
[12:23:43] Post 13849037 has 19 comments (1)
[12:23:43] Post 13848283 has 64 comments (1)
[12:23:43] Post 13847465 has 34 comments (1)
[12:23:43] > Calculating comments took 3.17 seconds and 233 fetches
[12:23:45] Calculating comments for top 5 stories. (2)
[12:23:45] Waiting for 5 seconds...
[12:23:47] Post 13848196 has 38 comments (2)
[12:23:47] Post 13849430 has 72 comments (2)
[12:23:47] Post 13849037 has 19 comments (2)
[12:23:47] Post 13848283 has 64 comments (2)
[12:23:47] Post 13847465 has 34 comments (2)
[12:23:47] > Calculating comments took 2.47 seconds and 233 fetches
[12:23:50] Calculating comments for top 5 stories. (3)
[12:23:50] Waiting for 5 seconds...
With that out of the way, let's add our callback:
async def poll_top_stories_for_comments(loop, session, period, limit):
    """Periodically poll for new stories and retrieve number of comments.
    """
    iteration = 1
    while True:
        log.info("Calculating comments for top {} stories. ({})".format(
            limit, iteration))

        future = asyncio.ensure_future(
            get_comments_of_top_stories(loop, session, limit, iteration))

        now = datetime.now()

        def callback(fut):
            fetch_count = fut.result()
            log.info(
                '> Calculating comments took {:.2f} seconds and {} fetches'.format(
                    (datetime.now() - now).total_seconds(), fetch_count))

        future.add_done_callback(callback)

        log.info("Waiting for {} seconds...".format(period))
        iteration += 1
        await asyncio.sleep(period)
Note that the callback expects a single argument, the completed future, and that we return the fetch count from the URLFetcher instance as the result of get_comments_of_top_stories, that is, as the result of the future.
See? We got the result back without having to use await.
If you're comfortable with callbacks, the asyncio API also offers the call_later and call_at methods on AbstractEventLoop, which schedule a plain function to be called at some point in the future. Using them, we can get rid of the infinite loop and rewrite poll_top_stories_for_comments so that it reschedules itself:
def poll_top_stories_for_comments(loop, session, period, limit, iteration=0):
    """Periodic function that schedules get_comments_of_top_stories.
    """
    log.info("Calculating comments for top {} stories ({})".format(
        limit, iteration))

    future = asyncio.ensure_future(
        get_comments_of_top_stories(loop, session, limit, iteration))

    now = datetime.now()

    def callback(fut):
        fetch_count = fut.result()
        log.info(
            '> Calculating comments took {:.2f} seconds and {} fetches'.format(
                (datetime.now() - now).total_seconds(), fetch_count))

    future.add_done_callback(callback)

    log.info("Waiting for {} seconds...".format(period))

    iteration += 1
    # reschedule ourselves; partial (from functools) bakes in the arguments
    loop.call_later(
        period,
        partial(
            poll_top_stories_for_comments,
            loop, session, period, limit, iteration))
The result is about the same, but a couple of differences are worth noting:
- We're back to scheduling with ensure_future (noticed? fire and forget all over again).
- poll_top_stories_for_comments has turned from a coroutine into a plain function that schedules tasks, so we can no longer use run_until_complete: we need run_forever and have to stop the script manually (see the sketch after this list).
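Under those assumptions, the entry point would change roughly like this (a sketch mirroring the earlier __main__ blocks; details may differ from the article's repository):

if __name__ == '__main__':
    args = parser.parse_args()
    if args.verbose:
        log.setLevel(logging.DEBUG)

    loop = asyncio.get_event_loop()
    with aiohttp.ClientSession(loop=loop) as session:
        # call the plain function once; call_later keeps rescheduling it
        poll_top_stories_for_comments(
            loop, session, args.period, args.limit)
        loop.run_forever()  # runs until we stop the script manually

    loop.close()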
Great, but what if, heaven forbid, something goes wrong? Let's find out by adding a simple failure mode: raising an exception once we exceed a maximum number of URL fetches:
MAXIMUM_FETCHES = 5


class URLFetcher():
    """Counts URL fetches for a particular task.
    """

    def __init__(self):
        self.fetch_counter = 0

    async def fetch(self, session, url):
        """Fetch a URL using aiohttp, returning the parsed JSON response.

        As suggested by the aiohttp docs we reuse the session.
        """
        with async_timeout.timeout(FETCH_TIMEOUT):
            self.fetch_counter += 1
            if self.fetch_counter > MAXIMUM_FETCHES:
                raise Exception('BOOM!')

            async with session.get(url) as response:
                return await response.json()
[12:51:00] Calculating comments for top 5 stories. (1)
[12:51:00] Waiting for 5 seconds...
[12:51:01] Exception in callback poll_top_stories_for_comments.<locals>.callback(<Task finishe…ion('BOOM!',)>) at 05_periodic_coroutines.py:121
handle: <Handle poll_top_stories_for_comments.<locals>.callback(<Task finishe…ion('BOOM!',)>) at 05_periodic_coroutines.py:121>
Traceback (most recent call last):
  File "/Users/yeray/.pyenv/versions/3.6.0/lib/python3.6/asyncio/events.py", line 126, in _run
    self._callback(*self._args)
  File "05_periodic_coroutines.py", line 122, in callback
    fetch_count = fut.result()
  File "05_periodic_coroutines.py", line 100, in get_comments_of_top_stories
    results = await asyncio.gather(*tasks)
  File "05_periodic_coroutines.py", line 69, in post_number_of_comments
    response = await fetcher.fetch(session, url)
  File "05_periodic_coroutines.py", line 58, in fetch
    raise Exception('BOOM!')
Exception: BOOM!
[12:51:05] Calculating comments for top 5 stories. (2)
[12:51:05] Waiting for 5 seconds...
What happened there? The exception was raised inside the callback when we called fut.result(); asyncio's default exception handler caught it and printed the traceback, but the loop was not interrupted and the polling carried on as if nothing had happened.
So what do we do about it? As with most things in asyncio, there are several options: