أنماط Asyncio coroutine: خارج انتظار

مقدمة المترجم:
مرة أخرى أخطو على أشعل النار أثناء العمل مع python asyncio ، ذهبت إلى الإنترنت للعثور على شيء أكثر متعة من الوثائق الجافة. لقد قرأت مقالًا بقلم Yeray Diaz بعنوان "أنماط Asyncio Coroutine: Beyond انتظار" ، حيث ينظر المؤلف بشكل مثير للاهتمام في استخدام Asyncio ويشارك بعض الحيل. نظرًا لأنني لم أجد أي شيء مكتمل باللغة الروسية ، فقد قررت ترجمته.


Asyncio هو حلم تنافسي لمبرمج بايثون: تكتب رمزًا متزامنًا على نحو متزامن وتترك Python تقوم بالباقي. هذا استيراد آخر لمكتبة مكافحة الجاذبية: استيراد مكافحة الجاذبية


في الواقع ، هذا ليس صحيحًا على الإطلاق ، فالبرمجة المتزامنة هي عمل شاق ، وبينما تسمح لنا coroutines بتجنب رد الاتصال الجحيم ، والذي يمكن أن يأخذك بعيدًا بما فيه الكفاية ، ما زلت بحاجة إلى التفكير في إنشاء المهام ، والحصول على النتائج ، والتقاط الاستثناءات بأناقة. إنه أمر محزن.


الخبر السار هو أن كل هذا ممكن في asyncio. الأخبار السيئة هي أنه ليس من الواضح دائمًا ما هو الخطأ وكيفية إصلاحه. فيما يلي بعض الأنماط التي اكتشفتها أثناء العمل مع Asyncio.



قبل أن نبدأ:


لقد استخدمت مكتبة aiohttp المفضلة لدي لتنفيذ طلبات HTTP غير المتزامنة وواجهة برمجة تطبيقات Hacker News لأنها موقع بسيط ومعروف يتبع سيناريو الاستخدام المألوف. بعد الرد على مقالتي السابقة ، استخدمت أيضًا بناء جملة async / ينتظر في Python 3.5. افترضت أن القارئ كان على دراية بالأفكار الموضحة هنا. وفي النهاية ، تتوفر جميع الأمثلة في مستودع GitHub من هذه المقالة .


حسنًا ، فلنبدأ!


Coroutines العودية


يعد إنشاء المهام وتشغيلها أمرًا تافهًا في حالة عدم التزامن. لمثل هذه المهام ، يتضمن API عدة طرق في فئة AbstractEventLoop ، بالإضافة إلى وظائف في المكتبة. ولكن عادة ما ترغب في الجمع بين نتائج هذه المهام ومعالجتها بطريقة ما ، ويعد التكرار مثالًا رائعًا على هذا المخطط ، ويوضح أيضًا بساطة coroutine بالمقارنة مع الأدوات التنافسية الأخرى.


من الحالات الشائعة لاستخدام asyncio إنشاء نوع من زاحف الويب. تخيل أننا مشغولون للغاية بحيث لا يمكننا التحقق من HackerNews ، أو ربما تريد فقط holivar جيد ، لذلك تريد تنفيذ نظام يسترد عدد التعليقات لمنشور HN معين ، وإذا كان أعلى من الحد الأدنى ، يخطرك. لقد غوغل قليلاً ووجدت وثائق واجهة برمجة تطبيقات HN ، ما تحتاجه تمامًا ، لكنك لاحظت ما يلي في الوثائق:


هل تريد معرفة العدد الإجمالي لتعليقات المقالات؟ إذهب حول الشجرة وعدهم.

تم قبول المكالمة!


"""          ,       ,      . ,         Hacker News      """ import asyncio import argparse import logging from urllib.parse import urlparse, parse_qs from datetime import datetime import aiohttp import async_timeout LOGGER_FORMAT = '%(asctime)s %(message)s' URL_TEMPLATE = "https://hacker-news.firebaseio.com/v0/item/{}.json" FETCH_TIMEOUT = 10 parser = argparse.ArgumentParser(description='Calculate the comments of a Hacker News post.') parser.add_argument('--id', type=int, default=8863, help='ID of the post in HN, defaults to 8863') parser.add_argument('--url', type=str, help='URL of a post in HN') parser.add_argument('--verbose', action='store_true', help='Detailed output') logging.basicConfig(format=LOGGER_FORMAT, datefmt='[%H:%M:%S]') log = logging.getLogger() log.setLevel(logging.INFO) fetch_counter = 0 async def fetch(session, url): """  URL   aiohttp,    JSON .     aiohttp    . """ global fetch_counter with async_timeout.timeout(FETCH_TIMEOUT): fetch_counter += 1 async with session.get(url) as response: return await response.json() async def post_number_of_comments(loop, session, post_id): """          . """ url = URL_TEMPLATE.format(post_id) now = datetime.now() response = await fetch(session, url) log.debug('{:^6} > Fetching of {} took {} seconds'.format( post_id, url, (datetime.now() - now).total_seconds())) if 'kids' not in response: #  .   return 0 #     ,    (  ) number_of_comments = len(response['kids']) #       log.debug('{:^6} > Fetching {} child posts'.format( post_id, number_of_comments)) tasks = [post_number_of_comments(loop, session, kid_id) for kid_id in response['kids']] #      results = await asyncio.gather(*tasks) #            number_of_comments += sum(results) log.debug('{:^6} > {} comments'.format(post_id, number_of_comments)) return number_of_comments def id_from_HN_url(url): """   `id` URL       None. """ parse_result = urlparse(url) try: return parse_qs(parse_result.query)['id'][0] except (KeyError, IndexError): return None if __name__ == '__main__': args = parser.parse_args() if args.verbose: log.setLevel(logging.DEBUG) post_id = id_from_HN_url(args.url) if args.url else args.id loop = asyncio.get_event_loop() with aiohttp.ClientSession(loop=loop) as session: now = datetime.now() comments = loop.run_until_complete( post_number_of_comments(loop, session, post_id)) log.info( '> Calculating comments took {:.2f} seconds and {} fetches'.format( (datetime.now() - now).total_seconds(), fetch_counter)) log.info("-- Post {} has {} comments".format(post_id, comments)) loop.close() 

لا تتردد في محاولة تشغيل البرنامج النصي بعلامة "-verbose" للحصول على إخراج أكثر تفصيلاً.


  [14:47:32] > Calculating comments took 2.23 seconds and 73 fetches [14:47:32] -- Post 8863 has 72 comments 

دعنا نتخطى كود الصفيحة ونذهب مباشرة إلى الروتين العودي. لاحظ أن هذا الرمز يتم قراءته بالكامل تقريبًا كما هو الحال مع التعليمات البرمجية المتزامنة.


 async def post_number_of_comments(loop, session, post_id): """          . """ url = URL_TEMPLATE.format(post_id) now = datetime.now() response = await fetch(session, url) log.debug('{:^6} > Fetching of {} took {} seconds'.format( post_id, url, (datetime.now() - now).total_seconds())) if 'kids' not in response: # base case, there are no comments return 0 #     ,    (  ) number_of_comments = len(response['kids']) #       log.debug('{:^6} > Fetching {} child posts'.format( post_id, number_of_comments)) tasks = [post_number_of_comments( loop, session, kid_id) for kid_id in response['kids']] #      results = await asyncio.gather(*tasks) #          number_of_comments += sum(results) log.debug('{:^6} > {} comments'.format(post_id, number_of_comments)) return number_of_comments 

  1. أولاً نحصل على JSON مع بيانات النشر.
  2. كرر كل واحد من الورثة.
  3. في النهاية ، سنصل إلى الحالة الأساسية ونعود صفر ،
    عندما لا تحتوي المشاركة على تعليقات.
  4. عند العودة من الحالة الأساسية ، أضف الإجابات على المنشور الحالي
    بعدد الورثة والعودة

هذا مثال رائع على ما يصفه بريت سلاتكين بأنه " معجب في الداخل وخارجه " ، نحن متفرغون لاستقبال البيانات من الورثة ، ويلخص المعجبون البيانات لحساب عدد التعليقات


هناك طريقتان في واجهة برمجة تطبيقات asyncio لإجراء عمليات التوزيع هذه. هنا أستخدم وظيفة التجميع ، والتي تتوقع بشكل فعال أن تكمل جميع الكائنات الحية قائمة نتائجها وتعيدها.


دعونا ننتبه إلى كيفية توافق استخدام coroutine أيضًا بشكل جيد مع العودية مع أي نقطة واحدة يوجد فيها أي عدد من coroutine ، في انتظار الإجابات على طلباتهم أثناء استدعاء وظيفة التجميع واستئناف التنفيذ بعد اكتمال عملية الإدخال / الإخراج. هذا يسمح لنا بالتعبير عن سلوك معقد إلى حد ما في كوروتين واحد أنيق و (سهل) للقراءة.


تقول "بسيط جدا"؟ حسنًا ، لنذهب لأعلى درجة.


قتل ونسي


تخيل أنك تريد أن ترسل لنفسك رسالة بريد إلكتروني تحتوي على مشاركات تحتوي على تعليقات أكثر من قيمة معينة ، وتريد القيام بذلك بنفس الطريقة التي قمنا بها في شجرة المنشورات. يمكننا ببساطة إضافة تعليمة if في نهاية الدالة العودية لتحقيق ذلك:


 async def post_number_of_comments(loop, session, post_id): """           . """ url = URL_TEMPLATE.format(post_id) response = await fetch(session, url) if 'kids' not in response: #  .   return 0 #       . number_of_comments = len(response['kids']) #       tasks = [post_number_of_comments(loop, session, kid_id) for kid_id in response['kids']] #      results = await asyncio.gather(*tasks) #            number_of_comments += sum(results) log.debug('{:^6} > {} comments'.format(post_id, number_of_comments)) #        if number_of_comments > MIN_COMMENTS: await log_post(response) return number_of_comments async def log_post(post): """   . """ await asyncio.sleep(random() * 3) log.info("Post logged") 

نعم ، لقد استخدمت asyncio.sleep. هذه هي المرة الأخيرة. أعدك.


 [09:41:02] Post logged [09:41:04] Post logged [09:41:06] Post logged [09:41:06] > Calculating comments took 6.35 seconds and 73 fetches [09:41:06] -- Post 8863 has 72 comments 

هذا أبطأ بكثير من ذي قبل!
والسبب هو ، كما ناقشنا سابقًا ، في انتظار إيقاف تنفيذ coroutine مؤقتًا حتى يكتمل المستقبل ، ولكن نظرًا لأننا لا نحتاج إلى نتيجة التسجيل ، فلا يوجد سبب حقيقي للقيام بذلك.


نحن بحاجة إلى "إطلاق النار والنسيان" مع coroutine لدينا ، وبما أننا لا نستطيع الانتظار حتى ينتهي باستخدام الانتظار ، فنحن بحاجة إلى طريقة أخرى لبدء coroutine دون انتظار. نظرة سريعة على واجهة برمجة تطبيقات asyncio ستعثر على وظيفة include_future التي ستقوم بجدولة Coroutine للتشغيل ، ولفها في كائن Task ، وإعادتها. تذكر أنه قبل التخطيط لـ coroutine ، ستتحكم دورة من الأحداث في نتيجة coroutine في مرحلة ما في المستقبل ، عندما يكون coroutine آخر في حالة توقع.


رائع ، دعنا نستبدل log_post على النحو التالي:


 async def post_number_of_comments(loop, session, post_id): """           . """ url = URL_TEMPLATE.format(post_id) response = await fetch(session, url) if 'kids' not in response: # base case, there are no comments return 0 #       . number_of_comments = len(response['kids']) #       tasks = [post_number_of_comments( loop, session, kid_id) for kid_id in response['kids']] #      results = await asyncio.gather(*tasks) #            number_of_comments += sum(results) log.debug('{:^6} > {} comments'.format(post_id, number_of_comments)) #        if number_of_comments > MIN_COMMENTS: asyncio.ensure_future(log_post(response)) return number_of_comments 

 [09:42:57] > Calculating comments took 1.69 seconds and 73 fetches [09:42:57] -- Post 8863 has 72 comments [09:42:57] Task was destroyed but it is pending! task: <Task pending coro=<log_post() done, defined at 02_fire_and_forget.py:82> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x1109197f8>()]>> [09:42:57] Task was destroyed but it is pending! task: <Task pending coro=<log_post() done, defined at 02_fire_and_forget.py:82> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x110919948>()]>> [09:42:57] Task was destroyed but it is pending! task: <Task pending coro=<log_post() done, defined at 02_fire_and_forget.py:82> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x110919978>()]>> 

مهم ، تم تدمير المهمة الرائعة ولكنها معلقة! يطارد المستخدمين المتزامنين حول العالم. الخبر السار هو أننا عدنا إلى الوقت الذي تلقيناه في وقت سابق (1.69 ص) ، والخبر السيئ هو أن asyncio لا يحب تجاوز نطاق اللقطة والنسيان.


تكمن المشكلة في أننا أغلقنا حلقة الحدث بقوة بعد أن نحصل على نتيجة اللائحة post_number_of_comments ، دون ترك وقت log_post الخاص بنا لإكماله .


لدينا خياران:
إما أن ندع حلقة الحدث تعمل إلى ما لا نهاية باستخدام run_forever وننهي النص البرمجي يدويًا ، أو نستخدم طريقة all_tasks لفئة المهام للعثور على جميع مهام العمل والانتظار حتى ينتهي حساب عدد التعليقات.


دعنا نحاول الخروج من هذا الموقف عن طريق إجراء تغييرات بسرعة بعد استدعائنا إلى post_number_of_comments :


 if __name__ == '__main__': args = parser.parse_args() if args.verbose: log.setLevel(logging.DEBUG) post_id = id_from_HN_url(args.url) if args.url else args.id loop = asyncio.get_event_loop() with aiohttp.ClientSession(loop=loop) as session: now = datetime.now() comments = loop.run_until_complete( post_number_of_comments(loop, session, post_id)) log.info( '> Calculating comments took {:.2f} seconds and {} fetches'.format( (datetime.now() - now).total_seconds(), fetch_counter)) log.info("-- Post {} has {} comments".format(post_id, comments)) pending_tasks = [ task for task in asyncio.Task.all_tasks() if not task.done()] loop.run_until_complete(asyncio.gather(*pending_tasks)) loop.close() 

 [09:47:29] > Calculating comments took 1.72 seconds and 73 fetches [09:47:29]Post 8863 has 72 comments [09:47:30] Post logged [09:47:31] Post logged [09:47:32] Post logged 

نحن الآن على يقين من أن مهام التسجيل قد اكتملت.
يُعد الافتراض بأن طريقة all_tasks تعمل بشكل جيد في الحالات التي نتعامل معها فكرة رائعة عندما يتم تنفيذ المهام بشكل مناسب في حلقة الأحداث لدينا ، ولكن في الحالات الأكثر تعقيدًا يمكن أن يكون هناك أي عدد من المهام التي يمكن تنفيذها ، والتي يمكن أن يكون مصدرها خارج التعليمات البرمجية الخاصة بنا .


نهج آخر هو استعادة النظام بعد أن نسجل بشكل مستقل جميع الكائنات الحية التي خططنا لإطلاقها والسماح بتنفيذها ، والتي تم تأجيلها في وقت سابق ،
بمجرد الانتهاء من عد التعليقات. كما تعلمون ، تُرجع الدالة include_future كائن مهمة . يمكننا استخدام هذا لتسجيل مهامنا بأولوية منخفضة. دعنا فقط نحدد قائمة المهام_السجلية ونخزن العقود الآجلة فيها:


 async def post_number_of_comments(loop, session, post_id): """Retrieve data for current post and recursively for all comments. """ url = URL_TEMPLATE.format(post_id) response = await fetch(session, url) if 'kids' not in response: # base case, there are no comments return 0 # calculate this post's comments as number of comments number_of_comments = len(response['kids']) # create recursive tasks for all comments tasks = [post_number_of_comments( loop, session, kid_id) for kid_id in response['kids']] # schedule the tasks and retrieve results results = await asyncio.gather(*tasks) # reduce the descendents comments and add it to this post's number_of_comments += sum(results) log.debug('{:^6} > {} comments'.format(post_id, number_of_comments)) # Log if number of comments is over a threshold if number_of_comments > MIN_COMMENTS: # Add the future to the registry task_registry.append(asyncio.ensure_future(log_post(response))) return number_of_comments # (... ommitted code ...) # if __name__ == '__main__': args = parser.parse_args() if args.verbose: log.setLevel(logging.DEBUG) post_id = id_from_HN_url(args.url) if args.url else args.id loop = asyncio.get_event_loop() task_registry = [] # define our task registry with aiohttp.ClientSession(loop=loop) as session: now = datetime.now() comments = loop.run_until_complete( post_number_of_comments(loop, session, post_id)) log.info( '> Calculating comments took {:.2f} seconds and {} fetches'.format( (datetime.now() - now).total_seconds(), fetch_counter)) log.info("-- Post {} has {} comments".format(post_id, comments)) pending_tasks = [task for task in task_registry if not task.done()] loop.run_until_complete(asyncio.gather(*pending_tasks)) loop.close() 

 [09:53:46] > Calculating comments took 1.68 seconds and 73 fetches [09:53:46]Post 8863 has 72 comments [09:53:46] Post logged [09:53:48] Post logged [09:53:49] Post logged 

نتعلم الدرس التالي - لا ينبغي اعتبار Asyncio كقائمة انتظار مهمة موزعة مثل الكرفس . يتم تشغيل جميع المهام في سلسلة واحدة ويجب إدارة دورة الأحداث وفقًا لذلك ، مما يتيح لك تخصيص الوقت لإكمال المهام.


مما يؤدي إلى نمط آخر مقبول بشكل عام:


أثار corوتينات بشكل دوري


استمرارًا بمثالنا حول HN (وقمنا بعمل رائع في وقت سابق) ، قررنا
وهو أمر حاسم لحساب عدد التعليقات على منشور HN بمجرد أن تصبح متاحة وأثناء وجودهم في قائمة 5 إدخالات حديثة.


نظرة سريعة على HN API تُظهر نقطة نهاية تُرجع آخر 500 سجل. رائع ، لذلك يمكننا ببساطة استطلاع نقطة النهاية هذه لتلقي منشورات جديدة وحساب عدد التعليقات عليها ، كل خمس ثوانٍ على سبيل المثال.


حسنًا ، نظرًا لأننا ننتقل الآن إلى الاقتراع الدوري ، يمكننا ببساطة استخدام اللامتناهي أثناء التكرار ، والانتظار حتى تكتمل مهمة الاستطلاع ( ننتظر المكالمة) ، ونغفو ( نوم المكالمة) للمدة المطلوبة من الوقت. لقد أجريت بعض التغييرات الطفيفة للحصول على أفضل الإدخالات بدلاً من استخدام عنوان URL المباشر للمشاركة.


 """ An example of periodically scheduling coroutines using an infinite loop of awaiting and sleeping. """ import asyncio import argparse import logging from datetime import datetime import aiohttp import async_timeout LOGGER_FORMAT = '%(asctime)s %(message)s' URL_TEMPLATE = "https://hacker-news.firebaseio.com/v0/item/{}.json" TOP_STORIES_URL = "https://hacker-news.firebaseio.com/v0/topstories.json" FETCH_TIMEOUT = 10 parser = argparse.ArgumentParser(description='Calculate the number of comments of the top stories in HN.') parser.add_argument('--period', type=int, default=5, help='Number of seconds between poll') parser.add_argument('--limit', type=int, default=5,help='Number of new stories to calculate comments for') parser.add_argument('--verbose', action='store_true', help='Detailed output') logging.basicConfig(format=LOGGER_FORMAT, datefmt='[%H:%M:%S]') log = logging.getLogger() log.setLevel(logging.INFO) fetch_counter = 0 async def fetch(session, url): """  URL   aiohttp,    JSON .     aiohttp    . """ global fetch_counter with async_timeout.timeout(FETCH_TIMEOUT): fetch_counter += 1 async with session.get(url) as response: return await response.json() async def post_number_of_comments(loop, session, post_id): """          . """ url = URL_TEMPLATE.format(post_id) response = await fetch(session, url) if 'kids' not in response: # base case, there are no comments return 0 #     ,    (  ) number_of_comments = len(response['kids']) #       tasks = [post_number_of_comments( loop, session, kid_id) for kid_id in response['kids']] #      results = await asyncio.gather(*tasks) #            number_of_comments += sum(results) log.debug('{:^6} > {} comments'.format(post_id, number_of_comments)) return number_of_comments async def get_comments_of_top_stories(loop, session, limit, iteration): """     HN. """ response = await fetch(session, TOP_STORIES_URL) tasks = [post_number_of_comments(loop, session, post_id) for post_id in response[:limit]] results = await asyncio.gather(*tasks) for post_id, num_comments in zip(response[:limit], results): log.info("Post {} has {} comments ({})".format(post_id, num_comments, iteration)) async def poll_top_stories_for_comments(loop, session, period, limit): """         . """ global fetch_counter iteration = 1 while True: now = datetime.now() log.info("Calculating comments for top {} stories. ({})".format(limit, iteration)) await get_comments_of_top_stories(loop, session, limit, iteration) log.info('> Calculating comments took {:.2f} seconds and {} fetches'.format( (datetime.now() - now).total_seconds(), fetch_counter)) log.info("Waiting for {} seconds...".format(period)) iteration += 1 fetch_counter = 0 await asyncio.sleep(period) if __name__ == '__main__': args = parser.parse_args() if args.verbose: log.setLevel(logging.DEBUG) loop = asyncio.get_event_loop() with aiohttp.ClientSession(loop=loop) as session: loop.run_until_complete( poll_top_stories_for_comments( loop, session, args.period, args.limit)) loop.close() 

 [10:14:03] Calculating comments for top 5 stories. (1) [10:14:06] Post 13848196 has 31 comments (1) [10:14:06] Post 13849430 has 37 comments (1) [10:14:06] Post 13849037 has 15 comments (1) [10:14:06] Post 13845337 has 128 comments (1) [10:14:06] Post 13847465 has 27 comments (1) [10:14:06] > Calculating comments took 2.96 seconds and 244 fetches [10:14:06] Waiting for 5 seconds… [10:14:11] Calculating comments for top 5 stories. (2) [10:14:14] Post 13848196 has 31 comments (2) [10:14:14] Post 13849430 has 37 comments (2) [10:14:14] Post 13849037 has 15 comments (2) [10:14:14] Post 13845337 has 128 comments (2) [10:14:14] Post 13847465 has 27 comments (2) [10:14:14] > Calculating comments took 3.04 seconds and 244 fetches [10:14:14] Waiting for 5 seconds… 

رائع ، ولكن هناك مشكلة بسيطة: إذا انتبهت إلى الطابع الزمني ،
ثم لا تبدأ المهمة بشكل صارم كل 5 ثوانٍ ، وتبدأ بعد 5 ثوانٍ من إكمال get_comments_of_top_stories . مرة أخرى عواقب استخدام الانتظار والحظر حتى نحصل على نتائجنا مرة أخرى.
لا تطرح هذه الميزات مشكلة عندما تستغرق المهمة أكثر من خمس ثوانٍ. أيضا ، يبدو من الخطأ استخدام _run_until كاملة عندما تم تصميم coroutine ليكون لانهائي.


الخبر السار هو أننا الآن خبراء في ضمان المستقبل ، ويمكننا فقط دفعه إلى الكود بدلاً من استخدامه في انتظار ...


 async def poll_top_stories_for_comments(loop, session, period, limit): """         . """ global fetch_counter iteration = 1 while True: now = datetime.now() log.info("Calculating comments for top {} stories. ({})".format( limit, iteration)) asyncio.ensure_future( get_comments_of_top_stories(loop, session, limit, iteration)) log.info( '> Calculating comments took {:.2f} seconds and {} fetches'.format( (datetime.now() - now).total_seconds(), fetch_counter)) log.info("Waiting for {} seconds...".format(period)) iteration += 1 fetch_counter = 0 await asyncio.sleep(period) 

  [10:55:40] Calculating comments for top 5 stories. (1) [10:55:40] > Calculating comments took 0.00 seconds and 0 fetches [10:55:40] Waiting for 5 seconds… [10:55:43] Post 13848196 has 32 comments (1) [10:55:43] Post 13849430 has 48 comments (1) [10:55:43] Post 13849037 has 16 comments (1) [10:55:43] Post 13845337 has 129 comments (1) [10:55:43] Post 13847465 has 29 comments (1) [10:55:45] Calculating comments for top 5 stories. (2) [10:55:45] > Calculating comments took 0.00 seconds and 260 fetches [10:55:45] Waiting for 5 seconds… [10:55:48] Post 13848196 has 32 comments (2) [10:55:48] Post 13849430 has 48 comments (2) [10:55:48] Post 13849037 has 16 comments (2) [10:55:48] Post 13845337 has 129 comments (2) [10:55:48] Post 13847465 has 29 comments (2) 

مهم ... حسنًا ، الخبر السار هو أن الطابع الزمني يقع بالضبط بعد خمس ثوانٍ بالضبط ، ولكن ماذا يوجد في 0.00 ثانية ولا توجد عينات ، ثم يستغرق التكرار التالي صفر ثانية و 260 عينة؟


هذه إحدى عواقب تجنب الانتظار ، والآن لم نعد نحظر coroutine وننتقل ببساطة إلى السطر التالي ، الذي يطبع صفر ثانية ، ولا يتم استرجاع أية رسائل للمرة الأولى. هذه مهام صغيرة جدًا ، حيث يمكننا العيش بدون رسائل ، ولكن ماذا لو كنا بحاجة إلى نتائج المهام؟


ثم يا صديقي نحتاج أن نلجأ إلى ... رد (((()


أعلم ، أعلم ، أن الهدف الأساسي من coroutine هو تجنب الاسترجاعات ، ولكن هذا لأن العنوان الفرعي الدرامي للمقال هو "خارج انتظار". لم نعد في انتظار المنطقة ، لدينا مغامرات مع التشغيل اليدوي للمهام ، مما يؤدي إلى حالة الاستخدام الخاصة بنا. ماذا يمنحك هذا؟ المفسد


كما ناقشنا سابقًا ، تضمن include_future كائنًا مستقبليًا يمكننا إضافة رد اتصال باستخدام _add_done callback .


قبل القيام بذلك ، ومن أجل الحصول على الجلب الصحيح ، نصل إلى النقطة التي نحتاجها لتغليف كوروتين الاستخراج في فئة URLFetcher . في هذه الحالة ، نقوم بإنشاء مثيل لكل مهمة بحيث يكون لدينا العدد الصحيح للعينات. نزيل أيضًا المتغير العام الذي أدخل الخطأ على أي حال:


 """               ensure_future   sleep.      future,    ensure_future,         ,    URLFetcher   . """ import asyncio import argparse import logging from datetime import datetime import aiohttp import async_timeout LOGGER_FORMAT = '%(asctime)s %(message)s' URL_TEMPLATE = "https://hacker-news.firebaseio.com/v0/item/{}.json" TOP_STORIES_URL = "https://hacker-news.firebaseio.com/v0/topstories.json" FETCH_TIMEOUT = 10 parser = argparse.ArgumentParser(description='Calculate the number of comments of the top stories in HN.') parser.add_argument('--period', type=int, default=5, help='Number of seconds between poll') parser.add_argument('--limit', type=int, default=5, help='Number of new stories to calculate comments for') parser.add_argument('--verbose', action='store_true', help='Detailed output') logging.basicConfig(format=LOGGER_FORMAT, datefmt='[%H:%M:%S]') log = logging.getLogger() log.setLevel(logging.INFO) class URLFetcher(): """   URL     """ def __init__(self): self.fetch_counter = 0 async def fetch(self, session, url): """  URL   aiohttp,    JSON .     aiohttp    . """ with async_timeout.timeout(FETCH_TIMEOUT): self.fetch_counter += 1 async with session.get(url) as response: return await response.json() async def post_number_of_comments(loop, session, fetcher, post_id): """          . """ url = URL_TEMPLATE.format(post_id) response = await fetcher.fetch(session, url) #  .  . if response is None or 'kids' not in response: return 0 #     ,    (  ) number_of_comments = len(response['kids']) #       tasks = [post_number_of_comments( loop, session, fetcher, kid_id) for kid_id in response['kids']] # s     results = await asyncio.gather(*tasks) #            number_of_comments += sum(results) log.debug('{:^6} > {} comments'.format(post_id, number_of_comments)) return number_of_comments async def get_comments_of_top_stories(loop, session, limit, iteration): """    HN. """ fetcher = URLFetcher() # create a new fetcher for this task response = await fetcher.fetch(session, TOP_STORIES_URL) tasks = [post_number_of_comments( loop, session, fetcher, post_id) for post_id in response[:limit]] results = await asyncio.gather(*tasks) for post_id, num_comments in zip(response[:limit], results): log.info("Post {} has {} comments ({})".format( post_id, num_comments, iteration)) return fetcher.fetch_counter # return the fetch count async def poll_top_stories_for_comments(loop, session, period, limit): """         . """ iteration = 1 while True: log.info("Calculating comments for top {} stories. ({})".format(limit, iteration)) future = asyncio.ensure_future(get_comments_of_top_stories(loop, session, limit, iteration)) now = datetime.now() def callback(fut): fetch_count = fut.result() log.info('> Calculating comments took {:.2f} seconds and {} fetches'.format( (datetime.now() - now).total_seconds(), fetch_count)) future.add_done_callback(callback) log.info("Waiting for {} seconds...".format(period)) iteration += 1 await asyncio.sleep(period) if __name__ == '__main__': args = parser.parse_args() if args.verbose: log.setLevel(logging.DEBUG) loop = asyncio.get_event_loop() with aiohttp.ClientSession(loop=loop) as session: loop.run_until_complete( poll_top_stories_for_comments( loop, session, args.period, args.limit)) loop.close() 

  [12:23:40] Calculating comments for top 5 stories. (1) [12:23:40] Waiting for 5 seconds... [12:23:43] Post 13848196 has 38 comments (1) [12:23:43] Post 13849430 has 72 comments (1) [12:23:43] Post 13849037 has 19 comments (1) [12:23:43] Post 13848283 has 64 comments (1) [12:23:43] Post 13847465 has 34 comments (1) [12:23:43] > Calculating comments took 3.17 seconds and 233 fetches [12:23:45] Calculating comments for top 5 stories. (2) [12:23:45] Waiting for 5 seconds... [12:23:47] Post 13848196 has 38 comments (2) [12:23:47] Post 13849430 has 72 comments (2) [12:23:47] Post 13849037 has 19 comments (2) [12:23:47] Post 13848283 has 64 comments (2) [12:23:47] Post 13847465 has 34 comments (2) [12:23:47] > Calculating comments took 2.47 seconds and 233 fetches [12:23:50] Calculating comments for top 5 stories. (3) [12:23:50] Waiting for 5 seconds... 

, , callback:


 async def poll_top_stories_for_comments(loop, session, period, limit): """         . """ iteration = 1 while True: log.info("Calculating comments for top {} stories. ({})".format( limit, iteration)) future = asyncio.ensure_future( get_comments_of_top_stories(loop, session, limit, iteration)) now = datetime.now() def callback(fut): fetch_count = fut.result() log.info( '> Calculating comments took {:.2f} seconds and {} fetches'.format( (datetime.now() - now).total_seconds(), fetch_count)) future.add_done_callback(callback) log.info("Waiting for {} seconds...".format(period)) iteration += 1 await asyncio.sleep(period) 

, callback , future. (fetch) URLFetcher _get_comments_of_top stories future.


ترى؟ , , await .


callback-, API asyncio AbstractBaseLoop _call later _call at ,
- . , , poll_top_stories_for_comments :


 def poll_top_stories_for_comments(loop, session, period, limit, iteration=0): """     get_comments_of_top_stories. """ log.info("Calculating comments for top {} stories ({})".format( limit, iteration)) future = asyncio.ensure_future( get_comments_of_top_stories(loop, session, limit, iteration)) now = datetime.now() def callback(fut): fetch_count = fut.result() log.info( '> Calculating comments took {:.2f} seconds and {} fetches'.format( (datetime.now() - now).total_seconds(), fetch_count)) future.add_done_callback(callback) log.info("Waiting for {} seconds...".format(period)) iteration += 1 loop.call_later( period, partial( # or call_at(loop.time() + period) poll_top_stories_for_comments, loop, session, period, limit, iteration ) ) if __name__ == '__main__': args = parser.parse_args() if args.verbose: log.setLevel(logging.DEBUG) loop = asyncio.get_event_loop() with aiohttp.ClientSession(loop=loop) as session: poll_top_stories_for_comments( loop, session, args.period, args.limit) loop.run_forever() loop.close() 

. :


  • , ensure_future .
    ( : )
  • _poll_top_stories_for comments , , _run forever , .

, , , — ? ? URL:


 MAXIMUM_FETCHES = 5 class URLFetcher(): """   URL     """ def __init__(self): self.fetch_counter = 0 async def fetch(self, session, url): """  URL   aiohttp,    JSON .     aiohttp    . """ with async_timeout.timeout(FETCH_TIMEOUT): self.fetch_counter += 1 if self.fetch_counter > MAXIMUM_FETCHES: raise Exception('BOOM!') async with session.get(url) as response: return await response.json() 

  [12:51:00] Calculating comments for top 5 stories. (1) [12:51:00] Waiting for 5 seconds… [12:51:01] Exception in callback poll_top_stories_for_comments.<locals>.callback(<Task finishe…ion('BOOM!',)>) at 05_periodic_coroutines.py:121 handle: <Handle poll_top_stories_for_comments.<locals>.callback(<Task finishe…ion('BOOM!',)>) at 05_periodic_coroutines.py:121> Traceback (most recent call last): File “/Users/yeray/.pyenv/versions/3.6.0/lib/python3.6/asyncio/events.py”, line 126, in _run self._callback(*self._args) File “05_periodic_coroutines.py”, line 122, in callback fetch_count = fut.result() File “05_periodic_coroutines.py”, line 100, in get_comments_of_top_stories results = await asyncio.gather(*tasks) File “05_periodic_coroutines.py”, line 69, in post_number_of_comments response = await fetcher.fetch(session, url) File “05_periodic_coroutines.py”, line 58, in fetch raise Exception('BOOM!') Exception: BOOM! [12:51:05] Calculating comments for top 5 stories. (2) [12:51:05] Waiting for 5 seconds… 

, ?


ماذا تفعل , , : asyncio :

Source: https://habr.com/ru/post/ar420129/


All Articles