Pola asyncio coroutine: menunggu di luar

Kata pengantar penerjemah:
Sekali lagi menginjak menyapu sambil bekerja dengan python asyncio, saya pergi ke Internet untuk menemukan sesuatu yang lebih menyenangkan daripada dokumentasi kering. Saya menemukan sebuah artikel oleh Yeray Diaz "Asyncio Coroutine Patterns: Beyond waiting" , di mana penulis dengan sangat menarik mempertimbangkan penggunaan asyncio dan berbagi beberapa trik. Karena saya tidak menemukan sesuatu yang lengkap dalam bahasa Rusia, saya memutuskan untuk menerjemahkannya.


Asyncio adalah mimpi kompetitif programmer python: Anda menulis kode yang berbatasan dengan sinkron dan membiarkan Python melakukan sisanya. Ini adalah impor lain dari perpustakaan anti-gravitasi: impor anti-gravitasi


Sebenarnya, ini sama sekali tidak benar, pemrograman bersamaan adalah kerja keras, dan sementara coroutine memungkinkan kita untuk menghindari panggilan balik neraka, yang dapat membawa Anda cukup jauh, Anda masih perlu berpikir tentang membuat tugas, mendapatkan hasil, dan menangkap pengecualian secara elegan. Menyedihkan.


Berita baiknya adalah semua ini dimungkinkan di asyncio. Berita buruknya adalah tidak selalu jelas dengan segera apa yang salah dan bagaimana cara memperbaikinya. Berikut adalah beberapa pola yang saya temukan saat bekerja dengan asyncio.



Sebelum kita mulai:


Saya menggunakan pustaka aiohttp favorit saya untuk mengeksekusi permintaan HTTP asinkron dan Hacker News API karena itu adalah situs sederhana dan terkenal yang mengikuti skenario penggunaan yang sudah umum. Mengikuti respons terhadap artikel saya sebelumnya , saya juga menggunakan sintaks async / waiting yang diperkenalkan dengan Python 3.5. Saya berasumsi bahwa pembaca sudah terbiasa dengan ide-ide yang dijelaskan di sini. Dan pada akhirnya, semua contoh tersedia di repositori GitHub dari artikel ini .


Ok, mari kita mulai!


Coroutine rekursif


Membuat dan menjalankan tugas sepele di asyncio. Untuk tugas-tugas tersebut, API mencakup beberapa metode di kelas AbstractEventLoop, serta fungsi di pustaka. Tetapi biasanya Anda ingin menggabungkan hasil dari tugas-tugas ini dan memprosesnya dengan beberapa cara, dan rekursi adalah contoh yang bagus dari skema ini, dan juga menunjukkan kesederhanaan coroutine dibandingkan dengan alat kompetitif lainnya.


Kasus umum untuk menggunakan asyncio adalah membuat semacam perayap web. Bayangkan kita terlalu sibuk untuk memeriksa HackerNews, atau mungkin Anda hanya menyukai holivar yang baik, jadi Anda ingin menerapkan sistem yang mengambil jumlah komentar untuk pos HN tertentu dan, jika di atas ambang batas, memberi tahu Anda. Anda mencari di Google sedikit dan menemukan dokumentasi untuk HN API, hanya apa yang Anda butuhkan, tetapi Anda perhatikan yang berikut dalam dokumentasi:


Ingin tahu jumlah total komentar artikel? Berkeliling pohon dan menghitungnya.

Panggilan diterima!


"""          ,       ,      . ,         Hacker News      """ import asyncio import argparse import logging from urllib.parse import urlparse, parse_qs from datetime import datetime import aiohttp import async_timeout LOGGER_FORMAT = '%(asctime)s %(message)s' URL_TEMPLATE = "https://hacker-news.firebaseio.com/v0/item/{}.json" FETCH_TIMEOUT = 10 parser = argparse.ArgumentParser(description='Calculate the comments of a Hacker News post.') parser.add_argument('--id', type=int, default=8863, help='ID of the post in HN, defaults to 8863') parser.add_argument('--url', type=str, help='URL of a post in HN') parser.add_argument('--verbose', action='store_true', help='Detailed output') logging.basicConfig(format=LOGGER_FORMAT, datefmt='[%H:%M:%S]') log = logging.getLogger() log.setLevel(logging.INFO) fetch_counter = 0 async def fetch(session, url): """  URL   aiohttp,    JSON .     aiohttp    . """ global fetch_counter with async_timeout.timeout(FETCH_TIMEOUT): fetch_counter += 1 async with session.get(url) as response: return await response.json() async def post_number_of_comments(loop, session, post_id): """          . """ url = URL_TEMPLATE.format(post_id) now = datetime.now() response = await fetch(session, url) log.debug('{:^6} > Fetching of {} took {} seconds'.format( post_id, url, (datetime.now() - now).total_seconds())) if 'kids' not in response: #  .   return 0 #     ,    (  ) number_of_comments = len(response['kids']) #       log.debug('{:^6} > Fetching {} child posts'.format( post_id, number_of_comments)) tasks = [post_number_of_comments(loop, session, kid_id) for kid_id in response['kids']] #      results = await asyncio.gather(*tasks) #            number_of_comments += sum(results) log.debug('{:^6} > {} comments'.format(post_id, number_of_comments)) return number_of_comments def id_from_HN_url(url): """   `id` URL       None. """ parse_result = urlparse(url) try: return parse_qs(parse_result.query)['id'][0] except (KeyError, IndexError): return None if __name__ == '__main__': args = parser.parse_args() if args.verbose: log.setLevel(logging.DEBUG) post_id = id_from_HN_url(args.url) if args.url else args.id loop = asyncio.get_event_loop() with aiohttp.ClientSession(loop=loop) as session: now = datetime.now() comments = loop.run_until_complete( post_number_of_comments(loop, session, post_id)) log.info( '> Calculating comments took {:.2f} seconds and {} fetches'.format( (datetime.now() - now).total_seconds(), fetch_counter)) log.info("-- Post {} has {} comments".format(post_id, comments)) loop.close() 

Jangan ragu untuk menjalankan skrip dengan flag "-verbose" untuk hasil yang lebih detail.


  [14:47:32] > Calculating comments took 2.23 seconds and 73 fetches [14:47:32] -- Post 8863 has 72 comments 

Mari kita lewati kode boilerplate dan langsung ke coroutine rekursif. Perhatikan bahwa kode ini dibaca hampir sepenuhnya seperti halnya dengan kode sinkron.


 async def post_number_of_comments(loop, session, post_id): """          . """ url = URL_TEMPLATE.format(post_id) now = datetime.now() response = await fetch(session, url) log.debug('{:^6} > Fetching of {} took {} seconds'.format( post_id, url, (datetime.now() - now).total_seconds())) if 'kids' not in response: # base case, there are no comments return 0 #     ,    (  ) number_of_comments = len(response['kids']) #       log.debug('{:^6} > Fetching {} child posts'.format( post_id, number_of_comments)) tasks = [post_number_of_comments( loop, session, kid_id) for kid_id in response['kids']] #      results = await asyncio.gather(*tasks) #          number_of_comments += sum(results) log.debug('{:^6} > {} comments'.format(post_id, number_of_comments)) return number_of_comments 

  1. Pertama kita dapatkan JSON dengan data posting.
  2. Secara rekursif berkeliling setiap ahli waris.
  3. Pada akhirnya, kita akan mencapai kasus dasar dan mengembalikan nol,
    ketika pos tidak memiliki umpan balik.
  4. Ketika kembali dari kasing, tambahkan jawaban ke pos saat ini
    ke jumlah ahli waris dan kembali

Ini adalah contoh yang bagus dari apa yang digambarkan Brett Slatkin sebagai penggemar dan penggemar , kami penggemar untuk menerima data dari ahli waris dan fan-in merangkum data untuk menghitung jumlah komentar


Ada beberapa cara di API asyncio untuk melakukan operasi penggeledahan ini. Di sini saya menggunakan fungsi kumpulkan , yang secara efektif mengharapkan semua coroutine untuk melengkapi dan mengembalikan daftar hasil mereka.


Mari kita perhatikan bagaimana penggunaan coroutine juga sesuai dengan rekursi dengan satu titik di mana sejumlah coroutine hadir, menunggu jawaban atas permintaan mereka selama panggilan ke fungsi kumpul dan melanjutkan eksekusi setelah operasi I / O selesai. Ini memungkinkan kita untuk mengekspresikan perilaku yang agak rumit dalam satu corutin yang elegan dan mudah dibaca.


"Sangat sederhana," katamu? Oke, mari kita naik satu tingkat.


Ditembak dan lupa


Bayangkan Anda ingin mengirim sendiri pesan email dengan postingan yang memiliki lebih banyak komentar daripada nilai tertentu, dan Anda ingin melakukan ini dengan cara yang sama seperti ketika kita memutari pohon postingan. Kami cukup menambahkan pernyataan if di akhir fungsi rekursif untuk mencapai ini:


 async def post_number_of_comments(loop, session, post_id): """           . """ url = URL_TEMPLATE.format(post_id) response = await fetch(session, url) if 'kids' not in response: #  .   return 0 #       . number_of_comments = len(response['kids']) #       tasks = [post_number_of_comments(loop, session, kid_id) for kid_id in response['kids']] #      results = await asyncio.gather(*tasks) #            number_of_comments += sum(results) log.debug('{:^6} > {} comments'.format(post_id, number_of_comments)) #        if number_of_comments > MIN_COMMENTS: await log_post(response) return number_of_comments async def log_post(post): """   . """ await asyncio.sleep(random() * 3) log.info("Post logged") 

Ya, saya menggunakan asyncio.sleep. Ini yang terakhir. Aku janji.


 [09:41:02] Post logged [09:41:04] Post logged [09:41:06] Post logged [09:41:06] > Calculating comments took 6.35 seconds and 73 fetches [09:41:06] -- Post 8863 has 72 comments 

Ini jauh lebih lambat dari sebelumnya!
Alasannya adalah, seperti yang telah kita bahas sebelumnya, menunggu jeda eksekusi coroutine sampai masa depan selesai, tetapi karena kita tidak memerlukan hasil logging, tidak ada alasan nyata untuk melakukan ini.


Kita perlu "menembak dan melupakan" dengan coroutine kita, dan karena kita tidak bisa menunggu sampai selesai menggunakan menunggu, kita perlu cara lain untuk memulai coroutine tanpa menunggu. Melihat sekilas pada API asyncio akan menemukan fungsi sure_future yang akan menjadwalkan coroutine untuk dijalankan, membungkusnya dalam objek Task, dan mengembalikannya. Mengingat bahwa sebelum coroutine direncanakan, siklus peristiwa akan mengendalikan hasil coroutine kita di beberapa titik di masa depan, ketika coroutine lain akan berada dalam kondisi yang diharapkan.


Luar biasa, mari kita ganti menunggu log_post sebagai berikut:


 async def post_number_of_comments(loop, session, post_id): """           . """ url = URL_TEMPLATE.format(post_id) response = await fetch(session, url) if 'kids' not in response: # base case, there are no comments return 0 #       . number_of_comments = len(response['kids']) #       tasks = [post_number_of_comments( loop, session, kid_id) for kid_id in response['kids']] #      results = await asyncio.gather(*tasks) #            number_of_comments += sum(results) log.debug('{:^6} > {} comments'.format(post_id, number_of_comments)) #        if number_of_comments > MIN_COMMENTS: asyncio.ensure_future(log_post(response)) return number_of_comments 

 [09:42:57] > Calculating comments took 1.69 seconds and 73 fetches [09:42:57] -- Post 8863 has 72 comments [09:42:57] Task was destroyed but it is pending! task: <Task pending coro=<log_post() done, defined at 02_fire_and_forget.py:82> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x1109197f8>()]>> [09:42:57] Task was destroyed but it is pending! task: <Task pending coro=<log_post() done, defined at 02_fire_and_forget.py:82> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x110919948>()]>> [09:42:57] Task was destroyed but it is pending! task: <Task pending coro=<log_post() done, defined at 02_fire_and_forget.py:82> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x110919978>()]>> 

Ahem, Tugas yang luar biasa telah dihancurkan tetapi sedang menunggu! menghantui pengguna asyncio di seluruh dunia. Kabar baiknya adalah bahwa kita kembali ke waktu yang kita terima sebelumnya (1,69 hal.), Kabar buruknya adalah bahwa asyncio tidak suka melampaui rentang tembakan-dan-lupakan.


Masalahnya adalah kita dengan paksa menutup loop acara setelah kita mendapatkan hasil dari postoutnumber_of_comments coroutine , tanpa meninggalkan tugas kita log_post waktu untuk menyelesaikan.


Kami memiliki dua opsi:
kami membiarkan loop acara bekerja tanpa henti menggunakan run_forever dan menghentikan skrip secara manual, atau kami menggunakan metode all_tasks dari kelas Tugas untuk menemukan semua tugas yang berjalan dan menunggu perhitungan jumlah komentar selesai.


Mari kita coba keluar dari situasi ini dengan cepat membuat perubahan setelah panggilan kita ke post_number_of_comments :


 if __name__ == '__main__': args = parser.parse_args() if args.verbose: log.setLevel(logging.DEBUG) post_id = id_from_HN_url(args.url) if args.url else args.id loop = asyncio.get_event_loop() with aiohttp.ClientSession(loop=loop) as session: now = datetime.now() comments = loop.run_until_complete( post_number_of_comments(loop, session, post_id)) log.info( '> Calculating comments took {:.2f} seconds and {} fetches'.format( (datetime.now() - now).total_seconds(), fetch_counter)) log.info("-- Post {} has {} comments".format(post_id, comments)) pending_tasks = [ task for task in asyncio.Task.all_tasks() if not task.done()] loop.run_until_complete(asyncio.gather(*pending_tasks)) loop.close() 

 [09:47:29] > Calculating comments took 1.72 seconds and 73 fetches [09:47:29]Post 8863 has 72 comments [09:47:30] Post logged [09:47:31] Post logged [09:47:32] Post logged 

Sekarang kami yakin bahwa tugas logging sudah selesai.
Asumsi bahwa metode all_tasks berfungsi dengan baik dalam kasus-kasus yang kita hadapi adalah ide bagus ketika tugas dilakukan dengan tepat di loop acara kami, tetapi dalam kasus yang lebih kompleks ada sejumlah tugas yang dapat dilakukan, sumber yang dapat ditemukan di luar kode kita .


Pendekatan lain adalah memulihkan ketertiban setelah kami secara mandiri mendaftarkan sepenuhnya semua coroutine yang kami rencanakan untuk diluncurkan dan memungkinkan untuk dieksekusi, yang ditunda sebelumnya,
segera setelah penghitungan komentar selesai. Seperti yang Anda ketahui, fungsi sure_future mengembalikan objek Tugas . Kita dapat menggunakan ini untuk mendaftarkan tugas kami dengan prioritas rendah. Mari kita mendefinisikan daftar task_registry dan menyimpan futures di dalamnya:


 async def post_number_of_comments(loop, session, post_id): """Retrieve data for current post and recursively for all comments. """ url = URL_TEMPLATE.format(post_id) response = await fetch(session, url) if 'kids' not in response: # base case, there are no comments return 0 # calculate this post's comments as number of comments number_of_comments = len(response['kids']) # create recursive tasks for all comments tasks = [post_number_of_comments( loop, session, kid_id) for kid_id in response['kids']] # schedule the tasks and retrieve results results = await asyncio.gather(*tasks) # reduce the descendents comments and add it to this post's number_of_comments += sum(results) log.debug('{:^6} > {} comments'.format(post_id, number_of_comments)) # Log if number of comments is over a threshold if number_of_comments > MIN_COMMENTS: # Add the future to the registry task_registry.append(asyncio.ensure_future(log_post(response))) return number_of_comments # (... ommitted code ...) # if __name__ == '__main__': args = parser.parse_args() if args.verbose: log.setLevel(logging.DEBUG) post_id = id_from_HN_url(args.url) if args.url else args.id loop = asyncio.get_event_loop() task_registry = [] # define our task registry with aiohttp.ClientSession(loop=loop) as session: now = datetime.now() comments = loop.run_until_complete( post_number_of_comments(loop, session, post_id)) log.info( '> Calculating comments took {:.2f} seconds and {} fetches'.format( (datetime.now() - now).total_seconds(), fetch_counter)) log.info("-- Post {} has {} comments".format(post_id, comments)) pending_tasks = [task for task in task_registry if not task.done()] loop.run_until_complete(asyncio.gather(*pending_tasks)) loop.close() 

 [09:53:46] > Calculating comments took 1.68 seconds and 73 fetches [09:53:46]Post 8863 has 72 comments [09:53:46] Post logged [09:53:48] Post logged [09:53:49] Post logged 

Kita belajar pelajaran berikut - asyncio tidak boleh dilihat sebagai antrian tugas terdistribusi seperti Celery . Semua tugas diluncurkan dalam satu utas dan siklus acara harus dikelola sesuai, memungkinkan Anda untuk mengalokasikan waktu untuk menyelesaikan tugas.


Yang mengarah ke pola lain yang diterima secara umum:


Coroutine yang dipicu secara berkala


Melanjutkan dengan contoh kami tentang HN (dan kami melakukan pekerjaan yang baik sebelumnya), kami memutuskan
yang sangat penting untuk menghitung jumlah komentar pada publikasi HN segera setelah tersedia dan saat mereka berada dalam daftar 5 entri terbaru.


Pandangan cepat pada HN API menunjukkan titik akhir yang mengembalikan 500 catatan terakhir. Hebat, jadi kita bisa polling titik akhir ini untuk menerima publikasi baru dan menghitung jumlah komentar pada mereka, katakan setiap lima detik.


Nah, karena kita sekarang pindah ke polling periodik, kita bisa menggunakan loop infinite while , menunggu tugas polling selesai (panggilan tunggu ), dan tertidur (call sleep ) untuk jumlah waktu yang diperlukan. Saya membuat beberapa perubahan kecil untuk mendapatkan entri teratas daripada menggunakan URL posting langsung.


 """ An example of periodically scheduling coroutines using an infinite loop of awaiting and sleeping. """ import asyncio import argparse import logging from datetime import datetime import aiohttp import async_timeout LOGGER_FORMAT = '%(asctime)s %(message)s' URL_TEMPLATE = "https://hacker-news.firebaseio.com/v0/item/{}.json" TOP_STORIES_URL = "https://hacker-news.firebaseio.com/v0/topstories.json" FETCH_TIMEOUT = 10 parser = argparse.ArgumentParser(description='Calculate the number of comments of the top stories in HN.') parser.add_argument('--period', type=int, default=5, help='Number of seconds between poll') parser.add_argument('--limit', type=int, default=5,help='Number of new stories to calculate comments for') parser.add_argument('--verbose', action='store_true', help='Detailed output') logging.basicConfig(format=LOGGER_FORMAT, datefmt='[%H:%M:%S]') log = logging.getLogger() log.setLevel(logging.INFO) fetch_counter = 0 async def fetch(session, url): """  URL   aiohttp,    JSON .     aiohttp    . """ global fetch_counter with async_timeout.timeout(FETCH_TIMEOUT): fetch_counter += 1 async with session.get(url) as response: return await response.json() async def post_number_of_comments(loop, session, post_id): """          . """ url = URL_TEMPLATE.format(post_id) response = await fetch(session, url) if 'kids' not in response: # base case, there are no comments return 0 #     ,    (  ) number_of_comments = len(response['kids']) #       tasks = [post_number_of_comments( loop, session, kid_id) for kid_id in response['kids']] #      results = await asyncio.gather(*tasks) #            number_of_comments += sum(results) log.debug('{:^6} > {} comments'.format(post_id, number_of_comments)) return number_of_comments async def get_comments_of_top_stories(loop, session, limit, iteration): """     HN. """ response = await fetch(session, TOP_STORIES_URL) tasks = [post_number_of_comments(loop, session, post_id) for post_id in response[:limit]] results = await asyncio.gather(*tasks) for post_id, num_comments in zip(response[:limit], results): log.info("Post {} has {} comments ({})".format(post_id, num_comments, iteration)) async def poll_top_stories_for_comments(loop, session, period, limit): """         . """ global fetch_counter iteration = 1 while True: now = datetime.now() log.info("Calculating comments for top {} stories. ({})".format(limit, iteration)) await get_comments_of_top_stories(loop, session, limit, iteration) log.info('> Calculating comments took {:.2f} seconds and {} fetches'.format( (datetime.now() - now).total_seconds(), fetch_counter)) log.info("Waiting for {} seconds...".format(period)) iteration += 1 fetch_counter = 0 await asyncio.sleep(period) if __name__ == '__main__': args = parser.parse_args() if args.verbose: log.setLevel(logging.DEBUG) loop = asyncio.get_event_loop() with aiohttp.ClientSession(loop=loop) as session: loop.run_until_complete( poll_top_stories_for_comments( loop, session, args.period, args.limit)) loop.close() 

 [10:14:03] Calculating comments for top 5 stories. (1) [10:14:06] Post 13848196 has 31 comments (1) [10:14:06] Post 13849430 has 37 comments (1) [10:14:06] Post 13849037 has 15 comments (1) [10:14:06] Post 13845337 has 128 comments (1) [10:14:06] Post 13847465 has 27 comments (1) [10:14:06] > Calculating comments took 2.96 seconds and 244 fetches [10:14:06] Waiting for 5 seconds… [10:14:11] Calculating comments for top 5 stories. (2) [10:14:14] Post 13848196 has 31 comments (2) [10:14:14] Post 13849430 has 37 comments (2) [10:14:14] Post 13849037 has 15 comments (2) [10:14:14] Post 13845337 has 128 comments (2) [10:14:14] Post 13847465 has 27 comments (2) [10:14:14] > Calculating comments took 3.04 seconds and 244 fetches [10:14:14] Waiting for 5 seconds… 

Hebat, tapi ada masalah kecil: jika Anda memperhatikan cap waktu,
maka tugas tidak dimulai secara ketat setiap 5 detik, itu dimulai 5 detik setelah get_comments_of_top_stories selesai . Lagi-lagi konsekuensi dari penggunaan menunggu dan memblokir sampai kami mendapatkan hasil kami kembali.
Fitur-fitur ini tidak menimbulkan masalah ketika tugas membutuhkan waktu lebih dari lima detik. Juga, tampaknya keliru menggunakan _run_until lengkap ketika coroutine dirancang untuk menjadi tak terbatas.


Kabar baiknya adalah bahwa kita sekarang adalah pakar di sure_future , dan kita bisa memasukkannya ke dalam kode alih-alih menggunakan menunggu ...


 async def poll_top_stories_for_comments(loop, session, period, limit): """         . """ global fetch_counter iteration = 1 while True: now = datetime.now() log.info("Calculating comments for top {} stories. ({})".format( limit, iteration)) asyncio.ensure_future( get_comments_of_top_stories(loop, session, limit, iteration)) log.info( '> Calculating comments took {:.2f} seconds and {} fetches'.format( (datetime.now() - now).total_seconds(), fetch_counter)) log.info("Waiting for {} seconds...".format(period)) iteration += 1 fetch_counter = 0 await asyncio.sleep(period) 

  [10:55:40] Calculating comments for top 5 stories. (1) [10:55:40] > Calculating comments took 0.00 seconds and 0 fetches [10:55:40] Waiting for 5 seconds… [10:55:43] Post 13848196 has 32 comments (1) [10:55:43] Post 13849430 has 48 comments (1) [10:55:43] Post 13849037 has 16 comments (1) [10:55:43] Post 13845337 has 129 comments (1) [10:55:43] Post 13847465 has 29 comments (1) [10:55:45] Calculating comments for top 5 stories. (2) [10:55:45] > Calculating comments took 0.00 seconds and 260 fetches [10:55:45] Waiting for 5 seconds… [10:55:48] Post 13848196 has 32 comments (2) [10:55:48] Post 13849430 has 48 comments (2) [10:55:48] Post 13849037 has 16 comments (2) [10:55:48] Post 13845337 has 129 comments (2) [10:55:48] Post 13847465 has 29 comments (2) 

Ahem ... Oke, kabar baiknya adalah bahwa stempel waktu terletak tepat lima detik kemudian, tapi apa yang ada dalam 0,00 detik dan tidak ada sampel. Kemudian iterasi berikutnya membutuhkan nol detik dan 260 sampel?


Ini adalah salah satu konsekuensi dari menghindari menunggu, sekarang kita tidak lagi memblokir coroutine dan hanya pergi ke baris berikutnya, yang mencetak nol detik dan, untuk pertama kalinya, nol pesan diambil. Ini adalah tugas yang cukup kecil, karena kita dapat hidup tanpa pesan, tetapi bagaimana jika kita membutuhkan hasil tugas?


Kemudian, teman saya, kita perlu menggunakan ... panggilan balik (ngeri (())


Saya tahu, saya tahu, inti dari coroutine adalah untuk menghindari panggilan balik, tetapi itu karena subtitle dramatis dari artikel ini adalah "Di luar menunggu". Kami tidak lagi berada di wilayah menunggu , kami memiliki petualangan dengan peluncuran tugas secara manual, yang mengarah ke kasus penggunaan kami. Apa yang ini berikan padamu? spoiler


Seperti yang telah kita bahas sebelumnya, sure_future mengembalikan objek Masa Depan yang dapat kita tambahkan callback menggunakan _add_done callback .


Sebelum kita melakukan ini, dan untuk mendapatkan penghitungan pengambilan yang benar, kita sampai pada titik bahwa kita perlu merangkum ekstraksi coroutine kita ke dalam kelas URLFetcher . Dalam hal ini, kami membuat contoh untuk setiap tugas sehingga kami memiliki jumlah sampel yang benar. Kami juga menghapus variabel global yang memperkenalkan bug:


 """               ensure_future   sleep.      future,    ensure_future,         ,    URLFetcher   . """ import asyncio import argparse import logging from datetime import datetime import aiohttp import async_timeout LOGGER_FORMAT = '%(asctime)s %(message)s' URL_TEMPLATE = "https://hacker-news.firebaseio.com/v0/item/{}.json" TOP_STORIES_URL = "https://hacker-news.firebaseio.com/v0/topstories.json" FETCH_TIMEOUT = 10 parser = argparse.ArgumentParser(description='Calculate the number of comments of the top stories in HN.') parser.add_argument('--period', type=int, default=5, help='Number of seconds between poll') parser.add_argument('--limit', type=int, default=5, help='Number of new stories to calculate comments for') parser.add_argument('--verbose', action='store_true', help='Detailed output') logging.basicConfig(format=LOGGER_FORMAT, datefmt='[%H:%M:%S]') log = logging.getLogger() log.setLevel(logging.INFO) class URLFetcher(): """   URL     """ def __init__(self): self.fetch_counter = 0 async def fetch(self, session, url): """  URL   aiohttp,    JSON .     aiohttp    . """ with async_timeout.timeout(FETCH_TIMEOUT): self.fetch_counter += 1 async with session.get(url) as response: return await response.json() async def post_number_of_comments(loop, session, fetcher, post_id): """          . """ url = URL_TEMPLATE.format(post_id) response = await fetcher.fetch(session, url) #  .  . if response is None or 'kids' not in response: return 0 #     ,    (  ) number_of_comments = len(response['kids']) #       tasks = [post_number_of_comments( loop, session, fetcher, kid_id) for kid_id in response['kids']] # s     results = await asyncio.gather(*tasks) #            number_of_comments += sum(results) log.debug('{:^6} > {} comments'.format(post_id, number_of_comments)) return number_of_comments async def get_comments_of_top_stories(loop, session, limit, iteration): """    HN. """ fetcher = URLFetcher() # create a new fetcher for this task response = await fetcher.fetch(session, TOP_STORIES_URL) tasks = [post_number_of_comments( loop, session, fetcher, post_id) for post_id in response[:limit]] results = await asyncio.gather(*tasks) for post_id, num_comments in zip(response[:limit], results): log.info("Post {} has {} comments ({})".format( post_id, num_comments, iteration)) return fetcher.fetch_counter # return the fetch count async def poll_top_stories_for_comments(loop, session, period, limit): """         . """ iteration = 1 while True: log.info("Calculating comments for top {} stories. ({})".format(limit, iteration)) future = asyncio.ensure_future(get_comments_of_top_stories(loop, session, limit, iteration)) now = datetime.now() def callback(fut): fetch_count = fut.result() log.info('> Calculating comments took {:.2f} seconds and {} fetches'.format( (datetime.now() - now).total_seconds(), fetch_count)) future.add_done_callback(callback) log.info("Waiting for {} seconds...".format(period)) iteration += 1 await asyncio.sleep(period) if __name__ == '__main__': args = parser.parse_args() if args.verbose: log.setLevel(logging.DEBUG) loop = asyncio.get_event_loop() with aiohttp.ClientSession(loop=loop) as session: loop.run_until_complete( poll_top_stories_for_comments( loop, session, args.period, args.limit)) loop.close() 

  [12:23:40] Calculating comments for top 5 stories. (1) [12:23:40] Waiting for 5 seconds... [12:23:43] Post 13848196 has 38 comments (1) [12:23:43] Post 13849430 has 72 comments (1) [12:23:43] Post 13849037 has 19 comments (1) [12:23:43] Post 13848283 has 64 comments (1) [12:23:43] Post 13847465 has 34 comments (1) [12:23:43] > Calculating comments took 3.17 seconds and 233 fetches [12:23:45] Calculating comments for top 5 stories. (2) [12:23:45] Waiting for 5 seconds... [12:23:47] Post 13848196 has 38 comments (2) [12:23:47] Post 13849430 has 72 comments (2) [12:23:47] Post 13849037 has 19 comments (2) [12:23:47] Post 13848283 has 64 comments (2) [12:23:47] Post 13847465 has 34 comments (2) [12:23:47] > Calculating comments took 2.47 seconds and 233 fetches [12:23:50] Calculating comments for top 5 stories. (3) [12:23:50] Waiting for 5 seconds... 

, , callback:


 async def poll_top_stories_for_comments(loop, session, period, limit): """         . """ iteration = 1 while True: log.info("Calculating comments for top {} stories. ({})".format( limit, iteration)) future = asyncio.ensure_future( get_comments_of_top_stories(loop, session, limit, iteration)) now = datetime.now() def callback(fut): fetch_count = fut.result() log.info( '> Calculating comments took {:.2f} seconds and {} fetches'.format( (datetime.now() - now).total_seconds(), fetch_count)) future.add_done_callback(callback) log.info("Waiting for {} seconds...".format(period)) iteration += 1 await asyncio.sleep(period) 

, callback , future. (fetch) URLFetcher _get_comments_of_top stories future.


Lihat? , , await .


callback-, API asyncio AbstractBaseLoop _call later _call at ,
- . , , poll_top_stories_for_comments :


 def poll_top_stories_for_comments(loop, session, period, limit, iteration=0): """     get_comments_of_top_stories. """ log.info("Calculating comments for top {} stories ({})".format( limit, iteration)) future = asyncio.ensure_future( get_comments_of_top_stories(loop, session, limit, iteration)) now = datetime.now() def callback(fut): fetch_count = fut.result() log.info( '> Calculating comments took {:.2f} seconds and {} fetches'.format( (datetime.now() - now).total_seconds(), fetch_count)) future.add_done_callback(callback) log.info("Waiting for {} seconds...".format(period)) iteration += 1 loop.call_later( period, partial( # or call_at(loop.time() + period) poll_top_stories_for_comments, loop, session, period, limit, iteration ) ) if __name__ == '__main__': args = parser.parse_args() if args.verbose: log.setLevel(logging.DEBUG) loop = asyncio.get_event_loop() with aiohttp.ClientSession(loop=loop) as session: poll_top_stories_for_comments( loop, session, args.period, args.limit) loop.run_forever() loop.close() 

. :


  • , ensure_future .
    ( : )
  • _poll_top_stories_for comments , , _run forever , .

, , , — ? ? URL:


 MAXIMUM_FETCHES = 5 class URLFetcher(): """   URL     """ def __init__(self): self.fetch_counter = 0 async def fetch(self, session, url): """  URL   aiohttp,    JSON .     aiohttp    . """ with async_timeout.timeout(FETCH_TIMEOUT): self.fetch_counter += 1 if self.fetch_counter > MAXIMUM_FETCHES: raise Exception('BOOM!') async with session.get(url) as response: return await response.json() 

  [12:51:00] Calculating comments for top 5 stories. (1) [12:51:00] Waiting for 5 seconds… [12:51:01] Exception in callback poll_top_stories_for_comments.<locals>.callback(<Task finishe…ion('BOOM!',)>) at 05_periodic_coroutines.py:121 handle: <Handle poll_top_stories_for_comments.<locals>.callback(<Task finishe…ion('BOOM!',)>) at 05_periodic_coroutines.py:121> Traceback (most recent call last): File “/Users/yeray/.pyenv/versions/3.6.0/lib/python3.6/asyncio/events.py”, line 126, in _run self._callback(*self._args) File “05_periodic_coroutines.py”, line 122, in callback fetch_count = fut.result() File “05_periodic_coroutines.py”, line 100, in get_comments_of_top_stories results = await asyncio.gather(*tasks) File “05_periodic_coroutines.py”, line 69, in post_number_of_comments response = await fetcher.fetch(session, url) File “05_periodic_coroutines.py”, line 58, in fetch raise Exception('BOOM!') Exception: BOOM! [12:51:05] Calculating comments for top 5 stories. (2) [12:51:05] Waiting for 5 seconds… 

, ?


Apa yang harus dilakukan , , : asyncio :

Source: https://habr.com/ru/post/id420129/


All Articles