Vorwort des Übersetzers:
Während ich mit Python Asyncio arbeitete, trat ich erneut auf einen Rechen und ging ins Internet, um etwas Angenehmeres als trockene Dokumentation zu finden. Ich bin auf einen Artikel von Yeray Diaz "Asyncio Coroutine Patterns: Beyond await" gestoßen , in dem der Autor die Verwendung von Asyncio sehr interessant betrachtet und einige Tricks teilt. Da ich auf Russisch nichts so vollständiges fand, beschloss ich, es zu übersetzen.
Asyncio ist der Wettbewerbstraum eines Python-Programmierers: Sie schreiben Code, der an Synchronität grenzt, und lassen Python den Rest erledigen. Dies ist ein weiterer Import der Antigravitationsbibliothek: Import der Antigravitation
Tatsächlich ist dies überhaupt nicht der Fall, die gleichzeitige Programmierung ist harte Arbeit, und während Coroutinen es uns ermöglichen, die Rückrufhölle zu vermeiden, die Sie weit genug bringen kann, müssen Sie immer noch darüber nachdenken, Aufgaben zu erstellen, Ergebnisse zu erzielen und Ausnahmen elegant abzufangen. Es ist traurig.
Die gute Nachricht ist, dass all dies in Asyncio möglich ist. Die schlechte Nachricht ist, dass nicht immer sofort klar ist, was falsch ist und wie es behoben werden kann. Im Folgenden sind einige Muster aufgeführt, die ich bei der Arbeit mit Asyncio entdeckt habe.
Bevor wir beginnen:
Ich habe meine bevorzugte aiohttp-Bibliothek verwendet, um asynchrone HTTP-Anforderungen und die Hacker News-API auszuführen, da es sich um eine einfache und bekannte Site handelt, die einem bekannten Verwendungsszenario folgt. Nach der Antwort auf meinen vorherigen Artikel habe ich auch die in Python 3.5 eingeführte Async / Await-Syntax verwendet. Ich nahm an, dass der Leser mit den hier beschriebenen Ideen vertraut war. Und letztendlich sind alle Beispiele im GitHub-Repository dieses Artikels verfügbar.
Ok, lass uns anfangen!
Rekursive Coroutinen
Das Erstellen und Ausführen von Aufgaben ist in asyncio trivial. Für solche Aufgaben enthält die API mehrere Methoden in der AbstractEventLoop-Klasse sowie Funktionen in der Bibliothek. Normalerweise möchten Sie die Ergebnisse dieser Aufgaben kombinieren und auf irgendeine Weise verarbeiten. Die Rekursion ist ein hervorragendes Beispiel für dieses Schema und zeigt auch die Einfachheit der Coroutine im Vergleich zu anderen wettbewerbsfähigen Tools.
Ein häufiger Fall für die Verwendung von Asyncio ist das Erstellen einer Art Webcrawler. Stellen Sie sich vor, wir sind einfach zu beschäftigt, um nach HackerNews zu suchen, oder Sie mögen einfach nur einen guten Holivar. Sie möchten also ein System implementieren, das die Anzahl der Kommentare für einen bestimmten HN-Beitrag abruft und Sie benachrichtigt, wenn dieser über dem Schwellenwert liegt. Sie haben ein bisschen gegoogelt und die Dokumentation für die HN-API gefunden, genau das, was Sie brauchen, aber Sie haben Folgendes in der Dokumentation bemerkt:
Möchten Sie die Gesamtzahl der Artikelkommentare erfahren? Gehen Sie um den Baum herum und zählen Sie sie.
Herausforderung angenommen!
""" , , . , Hacker News """ import asyncio import argparse import logging from urllib.parse import urlparse, parse_qs from datetime import datetime import aiohttp import async_timeout LOGGER_FORMAT = '%(asctime)s %(message)s' URL_TEMPLATE = "https://hacker-news.firebaseio.com/v0/item/{}.json" FETCH_TIMEOUT = 10 parser = argparse.ArgumentParser(description='Calculate the comments of a Hacker News post.') parser.add_argument('--id', type=int, default=8863, help='ID of the post in HN, defaults to 8863') parser.add_argument('--url', type=str, help='URL of a post in HN') parser.add_argument('--verbose', action='store_true', help='Detailed output') logging.basicConfig(format=LOGGER_FORMAT, datefmt='[%H:%M:%S]') log = logging.getLogger() log.setLevel(logging.INFO) fetch_counter = 0 async def fetch(session, url): """ URL aiohttp, JSON . aiohttp . """ global fetch_counter with async_timeout.timeout(FETCH_TIMEOUT): fetch_counter += 1 async with session.get(url) as response: return await response.json() async def post_number_of_comments(loop, session, post_id): """ . """ url = URL_TEMPLATE.format(post_id) now = datetime.now() response = await fetch(session, url) log.debug('{:^6} > Fetching of {} took {} seconds'.format( post_id, url, (datetime.now() - now).total_seconds())) if 'kids' not in response:
Versuchen Sie, das Skript mit dem Flag "-verbose" auszuführen, um eine detailliertere Ausgabe zu erhalten.
[14:47:32] > Calculating comments took 2.23 seconds and 73 fetches [14:47:32] -- Post 8863 has 72 comments
Lassen Sie uns den Boilerplate-Code überspringen und direkt zur rekursiven Coroutine gehen. Beachten Sie, dass dieser Code fast vollständig gelesen wird, wie dies beim synchronen Code der Fall wäre.
async def post_number_of_comments(loop, session, post_id): """ . """ url = URL_TEMPLATE.format(post_id) now = datetime.now() response = await fetch(session, url) log.debug('{:^6} > Fetching of {} took {} seconds'.format( post_id, url, (datetime.now() - now).total_seconds())) if 'kids' not in response:
- Zuerst erhalten wir JSON mit den Postdaten.
- Gehen Sie rekursiv um jeden der Erben herum.
- Am Ende werden wir den Basisfall erreichen und Null zurückgeben,
wenn der Beitrag kein Feedback hat. - Wenn Sie vom Basisfall zurückkehren, fügen Sie die Antworten zum aktuellen Beitrag hinzu
auf die Anzahl der Erben und zurück
Dies ist ein großartiges Beispiel für das, was Brett Slatkin als Fan-In und Fan-Out beschreibt . Wir sind Fan-Out, um Daten von den Erben zu erhalten, und Fan-In fasst die Daten zusammen, um die Anzahl der Kommentare zu berechnen
Die Asyncio-API bietet verschiedene Möglichkeiten, um diese Fan-Out-Vorgänge auszuführen. Hier verwende ich die Sammelfunktion , die effektiv erwartet, dass alle Coroutinen eine Liste ihrer Ergebnisse vervollständigen und zurückgeben.
Lassen Sie uns darauf achten, wie die Verwendung von Coroutine auch der Rekursion mit einem beliebigen Punkt entspricht, an dem eine beliebige Anzahl von Coroutinen vorhanden ist. Warten Sie auf Antworten auf ihre Anforderungen während des Aufrufs der Erfassungsfunktion und setzen Sie die Ausführung fort, nachdem der E / A-Vorgang abgeschlossen ist. Dies ermöglicht es uns, ziemlich komplexes Verhalten in einem eleganten und (leicht) lesbaren Corutin auszudrücken.
"Sehr einfach", sagst du? Okay, lass uns eine Stufe höher gehen.
Geschossen und vergessen
Stellen Sie sich vor, Sie möchten sich eine E-Mail-Nachricht mit Posts senden, die mehr Kommentare als einen bestimmten Wert enthalten, und Sie möchten dies auf die gleiche Weise tun, wie wir den Posts-Baum umrundet haben. Wir können einfach die if-Anweisung am Ende der rekursiven Funktion hinzufügen, um dies zu erreichen:
async def post_number_of_comments(loop, session, post_id): """ . """ url = URL_TEMPLATE.format(post_id) response = await fetch(session, url) if 'kids' not in response:
Ja, ich habe asyncio.sleep verwendet. Dies ist das letzte Mal. Ich verspreche es.
[09:41:02] Post logged [09:41:04] Post logged [09:41:06] Post logged [09:41:06] > Calculating comments took 6.35 seconds and 73 fetches [09:41:06] -- Post 8863 has 72 comments
Dies ist deutlich langsamer als zuvor!
Der Grund dafür ist, dass, wie bereits erwähnt, die Coroutine-Ausführung angehalten wird, bis die Zukunft abgeschlossen ist. Da wir jedoch das Ergebnis der Protokollierung nicht benötigen, gibt es keinen wirklichen Grund, dies zu tun.
Wir müssen mit unserer Coroutine „schießen und vergessen“, und da wir nicht warten können, bis die Verwendung von await beendet ist, brauchen wir einen anderen Weg, um Coroutine zu starten, ohne zu warten. Bei einem kurzen Blick auf die Asyncio-API wird die Funktion sure_future gefunden , mit der die Ausführung der Coroutine geplant, in ein Task-Objekt eingeschlossen und zurückgegeben wird. Denken Sie daran, dass vor der Planung von Coroutine ein Zyklus von Ereignissen das Ergebnis unserer Coroutine zu einem späteren Zeitpunkt steuern wird, wenn sich eine andere Coroutine in einem Erwartungszustand befindet.
Großartig, lassen Sie uns warte log_post wie folgt ersetzen:
async def post_number_of_comments(loop, session, post_id): """ . """ url = URL_TEMPLATE.format(post_id) response = await fetch(session, url) if 'kids' not in response:
[09:42:57] > Calculating comments took 1.69 seconds and 73 fetches [09:42:57] -- Post 8863 has 72 comments [09:42:57] Task was destroyed but it is pending! task: <Task pending coro=<log_post() done, defined at 02_fire_and_forget.py:82> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x1109197f8>()]>> [09:42:57] Task was destroyed but it is pending! task: <Task pending coro=<log_post() done, defined at 02_fire_and_forget.py:82> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x110919948>()]>> [09:42:57] Task was destroyed but it is pending! task: <Task pending coro=<log_post() done, defined at 02_fire_and_forget.py:82> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x110919978>()]>>
Ähm, die großartige Aufgabe wurde zerstört, aber sie steht noch aus! Asyncio-Benutzer auf der ganzen Welt verfolgen. Die gute Nachricht ist, dass wir zu der Zeit zurückkehren, die wir zuvor erhalten haben (1,69 S.). Die schlechte Nachricht ist, dass Asyncio nicht gerne über den Schuss-und-Vergessen-Bereich hinausgeht.
Das Problem ist, dass wir die Ereignisschleife zwangsweise schließen, nachdem wir das Ergebnis der Coroutine post_number_of_comments erhalten haben, ohne dass unsere Task log_post Zeit zum Abschließen lässt .
Wir haben zwei Möglichkeiten:
Entweder lassen wir die Ereignisschleife endlos mit run_forever arbeiten und beenden das Skript manuell, oder wir verwenden die all_tasks- Methode der Task-Klasse, um alle Arbeitsaufgaben zu finden und warten, bis die Berechnung der Anzahl der Kommentare abgeschlossen ist.
Versuchen wir, aus dieser Situation herauszukommen, indem wir nach unserem Aufruf von post_number_of_comments schnell Änderungen vornehmen :
if __name__ == '__main__': args = parser.parse_args() if args.verbose: log.setLevel(logging.DEBUG) post_id = id_from_HN_url(args.url) if args.url else args.id loop = asyncio.get_event_loop() with aiohttp.ClientSession(loop=loop) as session: now = datetime.now() comments = loop.run_until_complete( post_number_of_comments(loop, session, post_id)) log.info( '> Calculating comments took {:.2f} seconds and {} fetches'.format( (datetime.now() - now).total_seconds(), fetch_counter)) log.info("-- Post {} has {} comments".format(post_id, comments)) pending_tasks = [ task for task in asyncio.Task.all_tasks() if not task.done()] loop.run_until_complete(asyncio.gather(*pending_tasks)) loop.close()
[09:47:29] > Calculating comments took 1.72 seconds and 73 fetches [09:47:29] — Post 8863 has 72 comments [09:47:30] Post logged [09:47:31] Post logged [09:47:32] Post logged
Jetzt sind wir sicher, dass die Protokollierungsaufgaben abgeschlossen sind.
Die Annahme, dass die all_tasks- Methode in den Fällen, mit denen wir uns befassen, einwandfrei funktioniert , ist eine gute Idee, wenn Aufgaben in unserer Ereignisschleife ordnungsgemäß ausgeführt werden. In komplexeren Fällen kann jedoch eine beliebige Anzahl von Aufgaben ausgeführt werden, deren Quelle sich außerhalb unseres Codes befindet .
Ein anderer Ansatz besteht darin, die Ordnung wiederherzustellen, nachdem wir absolut alle Coroutinen, die wir starten wollten, unabhängig registriert und ausführen lassen, was früher verschoben wurde.
sobald die Auszählung der Kommentare abgeschlossen ist. Wie Sie wissen, gibt die Funktion sure_future ein Task- Objekt zurück. Damit können wir unsere Tasks mit niedriger Priorität registrieren. Definieren wir einfach eine task_registry- Liste und speichern darin Futures:
async def post_number_of_comments(loop, session, post_id): """Retrieve data for current post and recursively for all comments. """ url = URL_TEMPLATE.format(post_id) response = await fetch(session, url) if 'kids' not in response:
[09:53:46] > Calculating comments took 1.68 seconds and 73 fetches [09:53:46] — Post 8863 has 72 comments [09:53:46] Post logged [09:53:48] Post logged [09:53:49] Post logged
Wir lernen die folgende Lektion: Asyncio sollte nicht als verteilte Aufgabenwarteschlange wie Sellerie angesehen werden . Alle Aufgaben werden in einem Thread gestartet, und der Ereigniszyklus muss entsprechend verwaltet werden, damit Sie Zeit für die Erledigung von Aufgaben zuweisen können.
Was zu einem anderen allgemein akzeptierten Muster führt:
Periodisch ausgelöste Coroutinen
Wir haben unser Beispiel zu HN fortgesetzt (und wir haben früher großartige Arbeit geleistet) und beschlossen
Dies ist von entscheidender Bedeutung, um die Anzahl der Kommentare zur HN-Veröffentlichung zu berechnen, sobald sie verfügbar sind und sich in der Liste der 5 letzten Einträge befinden.
Ein kurzer Blick auf die HN-API zeigt einen Endpunkt, der die letzten 500 Datensätze zurückgibt. Großartig, also können wir diesen Endpunkt einfach abfragen, um neue Veröffentlichungen zu erhalten und die Anzahl der Kommentare zu ihnen zu berechnen, beispielsweise alle fünf Sekunden.
Nun, da wir jetzt zur periodischen Abfrage übergehen, können wir einfach die Endlos- while- Schleife verwenden, warten, bis die Abrufaufgabe abgeschlossen ist (Anruf warten ) und für die erforderliche Zeit einschlafen (Anruf schlafen ). Ich habe ein paar kleinere Änderungen vorgenommen, um Top-Einträge zu erhalten, anstatt die direkte Post-URL zu verwenden.
""" An example of periodically scheduling coroutines using an infinite loop of awaiting and sleeping. """ import asyncio import argparse import logging from datetime import datetime import aiohttp import async_timeout LOGGER_FORMAT = '%(asctime)s %(message)s' URL_TEMPLATE = "https://hacker-news.firebaseio.com/v0/item/{}.json" TOP_STORIES_URL = "https://hacker-news.firebaseio.com/v0/topstories.json" FETCH_TIMEOUT = 10 parser = argparse.ArgumentParser(description='Calculate the number of comments of the top stories in HN.') parser.add_argument('--period', type=int, default=5, help='Number of seconds between poll') parser.add_argument('--limit', type=int, default=5,help='Number of new stories to calculate comments for') parser.add_argument('--verbose', action='store_true', help='Detailed output') logging.basicConfig(format=LOGGER_FORMAT, datefmt='[%H:%M:%S]') log = logging.getLogger() log.setLevel(logging.INFO) fetch_counter = 0 async def fetch(session, url): """ URL aiohttp, JSON . aiohttp . """ global fetch_counter with async_timeout.timeout(FETCH_TIMEOUT): fetch_counter += 1 async with session.get(url) as response: return await response.json() async def post_number_of_comments(loop, session, post_id): """ . """ url = URL_TEMPLATE.format(post_id) response = await fetch(session, url) if 'kids' not in response:
[10:14:03] Calculating comments for top 5 stories. (1) [10:14:06] Post 13848196 has 31 comments (1) [10:14:06] Post 13849430 has 37 comments (1) [10:14:06] Post 13849037 has 15 comments (1) [10:14:06] Post 13845337 has 128 comments (1) [10:14:06] Post 13847465 has 27 comments (1) [10:14:06] > Calculating comments took 2.96 seconds and 244 fetches [10:14:06] Waiting for 5 seconds… [10:14:11] Calculating comments for top 5 stories. (2) [10:14:14] Post 13848196 has 31 comments (2) [10:14:14] Post 13849430 has 37 comments (2) [10:14:14] Post 13849037 has 15 comments (2) [10:14:14] Post 13845337 has 128 comments (2) [10:14:14] Post 13847465 has 27 comments (2) [10:14:14] > Calculating comments took 3.04 seconds and 244 fetches [10:14:14] Waiting for 5 seconds…
Großartig, aber es gibt ein kleines Problem: Wenn Sie auf den Zeitstempel geachtet haben,
Dann wird die Aufgabe nicht alle 5 Sekunden gestartet, sondern 5 Sekunden nach Abschluss der get_comments_of_top_stories . Wieder die Konsequenzen der Verwendung warten und blockieren, bis wir unsere Ergebnisse zurückbekommen.
Diese Funktionen stellen kein Problem dar, wenn die Aufgabe länger als fünf Sekunden dauert. Außerdem scheint es falsch zu sein, _run_until complete zu verwenden, wenn die Coroutine so ausgelegt ist, dass sie unendlich ist.
Die gute Nachricht ist, dass wir jetzt Experten für sure_future sind und es einfach in den Code verschieben können, anstatt wait zu verwenden ...
async def poll_top_stories_for_comments(loop, session, period, limit): """ . """ global fetch_counter iteration = 1 while True: now = datetime.now() log.info("Calculating comments for top {} stories. ({})".format( limit, iteration)) asyncio.ensure_future( get_comments_of_top_stories(loop, session, limit, iteration)) log.info( '> Calculating comments took {:.2f} seconds and {} fetches'.format( (datetime.now() - now).total_seconds(), fetch_counter)) log.info("Waiting for {} seconds...".format(period)) iteration += 1 fetch_counter = 0 await asyncio.sleep(period)
[10:55:40] Calculating comments for top 5 stories. (1) [10:55:40] > Calculating comments took 0.00 seconds and 0 fetches [10:55:40] Waiting for 5 seconds… [10:55:43] Post 13848196 has 32 comments (1) [10:55:43] Post 13849430 has 48 comments (1) [10:55:43] Post 13849037 has 16 comments (1) [10:55:43] Post 13845337 has 129 comments (1) [10:55:43] Post 13847465 has 29 comments (1) [10:55:45] Calculating comments for top 5 stories. (2) [10:55:45] > Calculating comments took 0.00 seconds and 260 fetches [10:55:45] Waiting for 5 seconds… [10:55:48] Post 13848196 has 32 comments (2) [10:55:48] Post 13849430 has 48 comments (2) [10:55:48] Post 13849037 has 16 comments (2) [10:55:48] Post 13845337 has 129 comments (2) [10:55:48] Post 13847465 has 29 comments (2)
Ähm ... Okay, die gute Nachricht ist, dass der Zeitstempel genau fünf Sekunden später liegt, aber was gibt es in 0,00 Sekunden und ohne Samples. Und dann dauert die nächste Iteration null Sekunden und 260 Samples?
Dies ist eine der Konsequenzen der Vermeidung von Wartezeiten. Jetzt blockieren wir Coroutine nicht mehr und gehen einfach zur nächsten Zeile, in der null Sekunden gedruckt und zum ersten Mal keine Nachrichten abgerufen werden. Dies sind ziemlich kleine Aufgaben, da wir ohne Nachrichten leben können, aber was ist, wenn wir Ergebnisse von Aufgaben benötigen?
Dann, mein Freund, müssen wir auf ... Rückrufe zurückgreifen (cringing ((())
Ich weiß, ich weiß, der springende Punkt bei Coroutine ist, Rückrufe zu vermeiden, aber das liegt daran, dass der dramatische Untertitel des Artikels "Außerhalb des Wartens" lautet. Wir warten nicht mehr auf Territorium, wir haben Abenteuer mit dem manuellen Starten von Aufgaben, was zu unserem Anwendungsfall führt. Was gibt dir das? Spoiler
Wie bereits erwähnt, gibt sure_future ein Future- Objekt zurück, zu dem wir mithilfe des Rückrufs _add_done einen Rückruf hinzufügen können.
Bevor wir dies tun und um die richtigen Abrufe zu erhalten, kommen wir zu dem Punkt, an dem wir unsere Extraktionskoroutine in die URLFetcher- Klasse einkapseln müssen . In diesem Fall erstellen wir für jede Aufgabe eine Instanz, damit wir die richtige Anzahl von Stichproben haben. Wir entfernen auch die globale Variable, die den Fehler trotzdem verursacht hat:
""" ensure_future sleep. future, ensure_future, , URLFetcher . """ import asyncio import argparse import logging from datetime import datetime import aiohttp import async_timeout LOGGER_FORMAT = '%(asctime)s %(message)s' URL_TEMPLATE = "https://hacker-news.firebaseio.com/v0/item/{}.json" TOP_STORIES_URL = "https://hacker-news.firebaseio.com/v0/topstories.json" FETCH_TIMEOUT = 10 parser = argparse.ArgumentParser(description='Calculate the number of comments of the top stories in HN.') parser.add_argument('--period', type=int, default=5, help='Number of seconds between poll') parser.add_argument('--limit', type=int, default=5, help='Number of new stories to calculate comments for') parser.add_argument('--verbose', action='store_true', help='Detailed output') logging.basicConfig(format=LOGGER_FORMAT, datefmt='[%H:%M:%S]') log = logging.getLogger() log.setLevel(logging.INFO) class URLFetcher(): """ URL """ def __init__(self): self.fetch_counter = 0 async def fetch(self, session, url): """ URL aiohttp, JSON . aiohttp . """ with async_timeout.timeout(FETCH_TIMEOUT): self.fetch_counter += 1 async with session.get(url) as response: return await response.json() async def post_number_of_comments(loop, session, fetcher, post_id): """ . """ url = URL_TEMPLATE.format(post_id) response = await fetcher.fetch(session, url)
[12:23:40] Calculating comments for top 5 stories. (1) [12:23:40] Waiting for 5 seconds... [12:23:43] Post 13848196 has 38 comments (1) [12:23:43] Post 13849430 has 72 comments (1) [12:23:43] Post 13849037 has 19 comments (1) [12:23:43] Post 13848283 has 64 comments (1) [12:23:43] Post 13847465 has 34 comments (1) [12:23:43] > Calculating comments took 3.17 seconds and 233 fetches [12:23:45] Calculating comments for top 5 stories. (2) [12:23:45] Waiting for 5 seconds... [12:23:47] Post 13848196 has 38 comments (2) [12:23:47] Post 13849430 has 72 comments (2) [12:23:47] Post 13849037 has 19 comments (2) [12:23:47] Post 13848283 has 64 comments (2) [12:23:47] Post 13847465 has 34 comments (2) [12:23:47] > Calculating comments took 2.47 seconds and 233 fetches [12:23:50] Calculating comments for top 5 stories. (3) [12:23:50] Waiting for 5 seconds...
, , callback:
async def poll_top_stories_for_comments(loop, session, period, limit): """ . """ iteration = 1 while True: log.info("Calculating comments for top {} stories. ({})".format( limit, iteration)) future = asyncio.ensure_future( get_comments_of_top_stories(loop, session, limit, iteration)) now = datetime.now() def callback(fut): fetch_count = fut.result() log.info( '> Calculating comments took {:.2f} seconds and {} fetches'.format( (datetime.now() - now).total_seconds(), fetch_count)) future.add_done_callback(callback) log.info("Waiting for {} seconds...".format(period)) iteration += 1 await asyncio.sleep(period)
, callback , future. (fetch) URLFetcher _get_comments_of_top stories future.
Sehen Sie? , , await .
callback-, API asyncio AbstractBaseLoop _call later _call at ,
- . , , poll_top_stories_for_comments :
def poll_top_stories_for_comments(loop, session, period, limit, iteration=0): """ get_comments_of_top_stories. """ log.info("Calculating comments for top {} stories ({})".format( limit, iteration)) future = asyncio.ensure_future( get_comments_of_top_stories(loop, session, limit, iteration)) now = datetime.now() def callback(fut): fetch_count = fut.result() log.info( '> Calculating comments took {:.2f} seconds and {} fetches'.format( (datetime.now() - now).total_seconds(), fetch_count)) future.add_done_callback(callback) log.info("Waiting for {} seconds...".format(period)) iteration += 1 loop.call_later( period, partial(
. :
- , ensure_future .
( : ) - _poll_top_stories_for comments , , _run forever , .
, , , — ? ? URL:
MAXIMUM_FETCHES = 5 class URLFetcher(): """ URL """ def __init__(self): self.fetch_counter = 0 async def fetch(self, session, url): """ URL aiohttp, JSON . aiohttp . """ with async_timeout.timeout(FETCH_TIMEOUT): self.fetch_counter += 1 if self.fetch_counter > MAXIMUM_FETCHES: raise Exception('BOOM!') async with session.get(url) as response: return await response.json()
[12:51:00] Calculating comments for top 5 stories. (1) [12:51:00] Waiting for 5 seconds… [12:51:01] Exception in callback poll_top_stories_for_comments.<locals>.callback(<Task finishe…ion('BOOM!',)>) at 05_periodic_coroutines.py:121 handle: <Handle poll_top_stories_for_comments.<locals>.callback(<Task finishe…ion('BOOM!',)>) at 05_periodic_coroutines.py:121> Traceback (most recent call last): File “/Users/yeray/.pyenv/versions/3.6.0/lib/python3.6/asyncio/events.py”, line 126, in _run self._callback(*self._args) File “05_periodic_coroutines.py”, line 122, in callback fetch_count = fut.result() File “05_periodic_coroutines.py”, line 100, in get_comments_of_top_stories results = await asyncio.gather(*tasks) File “05_periodic_coroutines.py”, line 69, in post_number_of_comments response = await fetcher.fetch(session, url) File “05_periodic_coroutines.py”, line 58, in fetch raise Exception('BOOM!') Exception: BOOM! [12:51:05] Calculating comments for top 5 stories. (2) [12:51:05] Waiting for 5 seconds…
, ?
? , , : asyncio :