Modèles de coroutine Asyncio: l'extérieur attend

Avant-propos du traducteur:
Une fois de plus sur un râteau tout en travaillant avec python asyncio, je suis allé sur Internet pour trouver quelque chose de plus agréable qu'une documentation sèche. Je suis tombé sur un article de Yeray Diaz "Asyncio Coroutine Patterns: Beyond wait" , dans lequel l'auteur envisage de manière très intéressante l'utilisation de l'asyncio et partage quelques astuces. Comme je n'ai rien trouvé d'aussi complet en russe, j'ai décidé de le traduire.


Asyncio est le rêve concurrentiel d'un programmeur python: vous écrivez du code proche de synchrone et laissez Python faire le reste. Ceci est une autre importation de la bibliothèque antigravity: import antigravity


En fait, ce n'est pas vrai du tout, la programmation simultanée est un travail difficile, et bien que les coroutines nous permettent d'éviter l'enfer de rappel, ce qui peut vous emmener assez loin, vous devez toujours penser à créer des tâches, obtenir des résultats et capturer avec élégance les exceptions. C'est triste.


La bonne nouvelle est que tout cela est possible en asyncio. La mauvaise nouvelle est qu'il n'est pas toujours immédiatement évident de savoir ce qui ne va pas et comment y remédier. Voici quelques modèles que j'ai découverts en travaillant avec asyncio.



Avant de commencer:


J'ai utilisé ma bibliothèque aiohttp préférée pour exécuter des requêtes HTTP asynchrones et l'API Hacker News car il s'agit d'un site simple et bien connu qui suit un scénario d'utilisation familier. Suite à la réponse à mon article précédent , j'ai également utilisé la syntaxe async / wait introduite dans Python 3.5. J'ai supposé que le lecteur connaissait les idées qui sont décrites ici. Et finalement, tous les exemples sont disponibles dans le référentiel GitHub de cet article .


Ok, commençons!


Coroutines récursives


La création et l'exécution de tâches sont triviales dans asyncio. Pour de telles tâches, l'API inclut plusieurs méthodes dans la classe AbstractEventLoop, ainsi que des fonctions dans la bibliothèque. Mais généralement, vous voulez combiner les résultats de ces tâches et les traiter d'une manière ou d'une autre, et la récursivité est un excellent exemple de ce schéma, et démontre également la simplicité de la coroutine par rapport à d'autres outils concurrents.


Un cas courant pour utiliser asyncio est de créer une sorte de robot d'indexation Web. Imaginez que nous sommes juste trop occupés pour vérifier HackerNews, ou peut-être que vous aimez juste un bon holivar, donc vous voulez implémenter un système qui récupère le nombre de commentaires pour un post HN spécifique et, s'il est supérieur au seuil, vous en informe. Vous avez recherché un peu sur Google et trouvé la documentation de l'API HN, exactement ce dont vous avez besoin, mais vous avez remarqué ce qui suit dans la documentation:


Vous voulez connaître le nombre total de commentaires d'articles? Faites le tour de l'arbre et comptez-les.

Appel accepté!


"""          ,       ,      . ,         Hacker News      """ import asyncio import argparse import logging from urllib.parse import urlparse, parse_qs from datetime import datetime import aiohttp import async_timeout LOGGER_FORMAT = '%(asctime)s %(message)s' URL_TEMPLATE = "https://hacker-news.firebaseio.com/v0/item/{}.json" FETCH_TIMEOUT = 10 parser = argparse.ArgumentParser(description='Calculate the comments of a Hacker News post.') parser.add_argument('--id', type=int, default=8863, help='ID of the post in HN, defaults to 8863') parser.add_argument('--url', type=str, help='URL of a post in HN') parser.add_argument('--verbose', action='store_true', help='Detailed output') logging.basicConfig(format=LOGGER_FORMAT, datefmt='[%H:%M:%S]') log = logging.getLogger() log.setLevel(logging.INFO) fetch_counter = 0 async def fetch(session, url): """  URL   aiohttp,    JSON .     aiohttp    . """ global fetch_counter with async_timeout.timeout(FETCH_TIMEOUT): fetch_counter += 1 async with session.get(url) as response: return await response.json() async def post_number_of_comments(loop, session, post_id): """          . """ url = URL_TEMPLATE.format(post_id) now = datetime.now() response = await fetch(session, url) log.debug('{:^6} > Fetching of {} took {} seconds'.format( post_id, url, (datetime.now() - now).total_seconds())) if 'kids' not in response: #  .   return 0 #     ,    (  ) number_of_comments = len(response['kids']) #       log.debug('{:^6} > Fetching {} child posts'.format( post_id, number_of_comments)) tasks = [post_number_of_comments(loop, session, kid_id) for kid_id in response['kids']] #      results = await asyncio.gather(*tasks) #            number_of_comments += sum(results) log.debug('{:^6} > {} comments'.format(post_id, number_of_comments)) return number_of_comments def id_from_HN_url(url): """   `id` URL       None. """ parse_result = urlparse(url) try: return parse_qs(parse_result.query)['id'][0] except (KeyError, IndexError): return None if __name__ == '__main__': args = parser.parse_args() if args.verbose: log.setLevel(logging.DEBUG) post_id = id_from_HN_url(args.url) if args.url else args.id loop = asyncio.get_event_loop() with aiohttp.ClientSession(loop=loop) as session: now = datetime.now() comments = loop.run_until_complete( post_number_of_comments(loop, session, post_id)) log.info( '> Calculating comments took {:.2f} seconds and {} fetches'.format( (datetime.now() - now).total_seconds(), fetch_counter)) log.info("-- Post {} has {} comments".format(post_id, comments)) loop.close() 

N'hésitez pas à essayer d'exécuter le script avec le drapeau «-verbose» pour une sortie plus détaillée.


  [14:47:32] > Calculating comments took 2.23 seconds and 73 fetches [14:47:32] -- Post 8863 has 72 comments 

Ignorons le code passe-partout et allons directement à la coroutine récursive. Notez que ce code est lu presque complètement comme il le serait avec le code synchrone.


 async def post_number_of_comments(loop, session, post_id): """          . """ url = URL_TEMPLATE.format(post_id) now = datetime.now() response = await fetch(session, url) log.debug('{:^6} > Fetching of {} took {} seconds'.format( post_id, url, (datetime.now() - now).total_seconds())) if 'kids' not in response: # base case, there are no comments return 0 #     ,    (  ) number_of_comments = len(response['kids']) #       log.debug('{:^6} > Fetching {} child posts'.format( post_id, number_of_comments)) tasks = [post_number_of_comments( loop, session, kid_id) for kid_id in response['kids']] #      results = await asyncio.gather(*tasks) #          number_of_comments += sum(results) log.debug('{:^6} > {} comments'.format(post_id, number_of_comments)) return number_of_comments 

  1. D'abord, nous obtenons JSON avec les données de publication.
  2. Faites le tour récursivement de chacun des héritiers.
  3. En fin de compte, nous atteindrons le cas de base et retournerons zéro,
    lorsque le message n'a pas de commentaires.
  4. En revenant du cas de base, ajoutez les réponses au message actuel
    au nombre d'héritiers et retour

Ceci est un excellent exemple de ce que Brett Slatkin décrit comme fan-in et fan-out , nous sommes fan-out pour recevoir des données des héritiers et fan-in résume les données pour calculer le nombre de commentaires


Il existe plusieurs façons dans l'API asyncio d'effectuer ces opérations de fan-out. Ici, j'utilise la fonction de collecte , qui attend effectivement de toutes les coroutines qu'elles complètent et renvoient une liste de leurs résultats.


Faisons attention à la façon dont l'utilisation de la coroutine correspond également bien à la récursivité avec n'importe quel point auquel un nombre quelconque de coroutines est présent, en attendant les réponses à leurs demandes lors de l'appel à la fonction de collecte et en reprenant l'exécution une fois l'opération d'E / S terminée. Cela nous permet d'exprimer un comportement plutôt complexe dans une corutine élégante et (facilement) lisible.


«Très simple», dites-vous? D'accord, montons d'un cran.


Tiré et oublié


Imaginez que vous souhaitiez vous envoyer un e-mail avec des articles contenant plus de commentaires qu'une certaine valeur, et que vous souhaitiez procéder de la même manière que nous avons parcouru l'arborescence des articles. Nous pouvons simplement ajouter l'instruction if à la fin de la fonction récursive pour y parvenir:


 async def post_number_of_comments(loop, session, post_id): """           . """ url = URL_TEMPLATE.format(post_id) response = await fetch(session, url) if 'kids' not in response: #  .   return 0 #       . number_of_comments = len(response['kids']) #       tasks = [post_number_of_comments(loop, session, kid_id) for kid_id in response['kids']] #      results = await asyncio.gather(*tasks) #            number_of_comments += sum(results) log.debug('{:^6} > {} comments'.format(post_id, number_of_comments)) #        if number_of_comments > MIN_COMMENTS: await log_post(response) return number_of_comments async def log_post(post): """   . """ await asyncio.sleep(random() * 3) log.info("Post logged") 

Oui, j'ai utilisé asyncio.sleep. C'est la dernière fois. Je te le promets.


 [09:41:02] Post logged [09:41:04] Post logged [09:41:06] Post logged [09:41:06] > Calculating comments took 6.35 seconds and 73 fetches [09:41:06] -- Post 8863 has 72 comments 

C'est beaucoup plus lent qu'avant!
La raison en est que, comme nous l'avons vu plus haut, attendre les pauses d'exécution de la coroutine jusqu'à ce que l'avenir soit terminé, mais comme nous n'avons pas besoin du résultat de la journalisation, il n'y a aucune raison réelle de le faire.


Nous devons «tirer et oublier» avec notre coroutine, et comme nous ne pouvons pas attendre qu'il finisse d'utiliser l'attente, nous avons besoin d'une autre façon de démarrer la coroutine sans attendre. Un rapide coup d'œil à l'API asyncio trouvera la fonction assure_future qui planifiera l'exécution de la coroutine, l'enveloppera dans un objet Task et la renverra. Rappelant qu'avant la planification de la coroutine, un cycle d'événements contrôlera le résultat de notre coroutine à un moment donné dans le futur, quand une autre coroutine sera dans un état d'attente.


Super, remplaçons attendez log_post comme suit:


 async def post_number_of_comments(loop, session, post_id): """           . """ url = URL_TEMPLATE.format(post_id) response = await fetch(session, url) if 'kids' not in response: # base case, there are no comments return 0 #       . number_of_comments = len(response['kids']) #       tasks = [post_number_of_comments( loop, session, kid_id) for kid_id in response['kids']] #      results = await asyncio.gather(*tasks) #            number_of_comments += sum(results) log.debug('{:^6} > {} comments'.format(post_id, number_of_comments)) #        if number_of_comments > MIN_COMMENTS: asyncio.ensure_future(log_post(response)) return number_of_comments 

 [09:42:57] > Calculating comments took 1.69 seconds and 73 fetches [09:42:57] -- Post 8863 has 72 comments [09:42:57] Task was destroyed but it is pending! task: <Task pending coro=<log_post() done, defined at 02_fire_and_forget.py:82> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x1109197f8>()]>> [09:42:57] Task was destroyed but it is pending! task: <Task pending coro=<log_post() done, defined at 02_fire_and_forget.py:82> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x110919948>()]>> [09:42:57] Task was destroyed but it is pending! task: <Task pending coro=<log_post() done, defined at 02_fire_and_forget.py:82> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x110919978>()]>> 

Ahem, la tâche impressionnante a été détruite mais elle est en attente! hanter les utilisateurs asyncio dans le monde. La bonne nouvelle est que nous revenons à l'époque que nous avions reçue plus tôt (1,69 p.). La mauvaise nouvelle est que asyncio n'aime pas aller au-delà de la plage de tir et d'oubli.


Le problème est que nous fermons de force la boucle d'événements après avoir obtenu le résultat de la coroutine post_number_of_comments, sans laisser à notre tâche log_post le temps de se terminer.


Nous avons deux options:
soit nous laissons la boucle d'événements fonctionner sans fin en utilisant run_forever et terminons manuellement le script, soit nous utilisons la méthode all_tasks de la classe Task pour trouver toutes les tâches de travail et attendre la fin du calcul du nombre de commentaires.


Essayons de sortir de cette situation en apportant rapidement des modifications après notre appel à post_number_of_comments :


 if __name__ == '__main__': args = parser.parse_args() if args.verbose: log.setLevel(logging.DEBUG) post_id = id_from_HN_url(args.url) if args.url else args.id loop = asyncio.get_event_loop() with aiohttp.ClientSession(loop=loop) as session: now = datetime.now() comments = loop.run_until_complete( post_number_of_comments(loop, session, post_id)) log.info( '> Calculating comments took {:.2f} seconds and {} fetches'.format( (datetime.now() - now).total_seconds(), fetch_counter)) log.info("-- Post {} has {} comments".format(post_id, comments)) pending_tasks = [ task for task in asyncio.Task.all_tasks() if not task.done()] loop.run_until_complete(asyncio.gather(*pending_tasks)) loop.close() 

 [09:47:29] > Calculating comments took 1.72 seconds and 73 fetches [09:47:29] — Post 8863 has 72 comments [09:47:30] Post logged [09:47:31] Post logged [09:47:32] Post logged 

Maintenant, nous sommes sûrs que les tâches de journalisation sont terminées.
L'hypothèse selon laquelle la méthode all_tasks fonctionne correctement dans les cas auxquels nous sommes confrontés est une excellente idée lorsque les tâches sont effectuées de manière appropriée dans notre boucle d'événements, mais dans des cas plus complexes, il peut y avoir un certain nombre de tâches pouvant être effectuées, dont la source peut être située en dehors de notre code. .


Une autre approche consiste à rétablir l'ordre après avoir enregistré de manière indépendante toutes les coroutines que nous avions prévu de lancer et permettre leur exécution, ce qui a été reporté plus tôt,
dès que le décompte des commentaires est terminé. Comme vous le savez, la fonction Ensure_future renvoie un objet Task . Nous pouvons l'utiliser pour enregistrer nos tâches avec une faible priorité. Définissons simplement une liste task_registry et stockons les futurs dans celle-ci:


 async def post_number_of_comments(loop, session, post_id): """Retrieve data for current post and recursively for all comments. """ url = URL_TEMPLATE.format(post_id) response = await fetch(session, url) if 'kids' not in response: # base case, there are no comments return 0 # calculate this post's comments as number of comments number_of_comments = len(response['kids']) # create recursive tasks for all comments tasks = [post_number_of_comments( loop, session, kid_id) for kid_id in response['kids']] # schedule the tasks and retrieve results results = await asyncio.gather(*tasks) # reduce the descendents comments and add it to this post's number_of_comments += sum(results) log.debug('{:^6} > {} comments'.format(post_id, number_of_comments)) # Log if number of comments is over a threshold if number_of_comments > MIN_COMMENTS: # Add the future to the registry task_registry.append(asyncio.ensure_future(log_post(response))) return number_of_comments # (... ommitted code ...) # if __name__ == '__main__': args = parser.parse_args() if args.verbose: log.setLevel(logging.DEBUG) post_id = id_from_HN_url(args.url) if args.url else args.id loop = asyncio.get_event_loop() task_registry = [] # define our task registry with aiohttp.ClientSession(loop=loop) as session: now = datetime.now() comments = loop.run_until_complete( post_number_of_comments(loop, session, post_id)) log.info( '> Calculating comments took {:.2f} seconds and {} fetches'.format( (datetime.now() - now).total_seconds(), fetch_counter)) log.info("-- Post {} has {} comments".format(post_id, comments)) pending_tasks = [task for task in task_registry if not task.done()] loop.run_until_complete(asyncio.gather(*pending_tasks)) loop.close() 

 [09:53:46] > Calculating comments took 1.68 seconds and 73 fetches [09:53:46] — Post 8863 has 72 comments [09:53:46] Post logged [09:53:48] Post logged [09:53:49] Post logged 

Nous apprenons la leçon suivante - asyncio ne doit pas être considéré comme une file d'attente de tâches distribuée comme Celery . Toutes les tâches sont lancées dans un seul thread et le cycle des événements doit être géré en conséquence, vous permettant d'allouer du temps pour terminer les tâches.


Ce qui conduit à un autre schéma généralement accepté:


Coroutines déclenchées périodiquement


Poursuivant notre exemple sur HN (et nous avons fait un excellent travail plus tôt), nous avons décidé
ce qui est décisivement important pour calculer le nombre de commentaires sur la publication HN dès qu'ils sont disponibles et tant qu'ils sont dans la liste des 5 entrées récentes.


Un rapide coup d'œil à l'API HN montre un point de terminaison qui renvoie les 500 derniers enregistrements. Génial, nous pouvons donc simplement interroger ce point de terminaison pour recevoir de nouvelles publications et calculer le nombre de commentaires à leur sujet, disons toutes les cinq secondes.


Eh bien, puisque nous passons maintenant à l'interrogation périodique, nous pouvons simplement utiliser la boucle infinie while, attendre la fin de la tâche d'interrogation (appel en attente ) et nous endormir (appel en veille ) pendant la durée requise. J'ai apporté quelques modifications mineures pour obtenir les meilleures entrées au lieu d'utiliser l'URL de publication directe.


 """ An example of periodically scheduling coroutines using an infinite loop of awaiting and sleeping. """ import asyncio import argparse import logging from datetime import datetime import aiohttp import async_timeout LOGGER_FORMAT = '%(asctime)s %(message)s' URL_TEMPLATE = "https://hacker-news.firebaseio.com/v0/item/{}.json" TOP_STORIES_URL = "https://hacker-news.firebaseio.com/v0/topstories.json" FETCH_TIMEOUT = 10 parser = argparse.ArgumentParser(description='Calculate the number of comments of the top stories in HN.') parser.add_argument('--period', type=int, default=5, help='Number of seconds between poll') parser.add_argument('--limit', type=int, default=5,help='Number of new stories to calculate comments for') parser.add_argument('--verbose', action='store_true', help='Detailed output') logging.basicConfig(format=LOGGER_FORMAT, datefmt='[%H:%M:%S]') log = logging.getLogger() log.setLevel(logging.INFO) fetch_counter = 0 async def fetch(session, url): """  URL   aiohttp,    JSON .     aiohttp    . """ global fetch_counter with async_timeout.timeout(FETCH_TIMEOUT): fetch_counter += 1 async with session.get(url) as response: return await response.json() async def post_number_of_comments(loop, session, post_id): """          . """ url = URL_TEMPLATE.format(post_id) response = await fetch(session, url) if 'kids' not in response: # base case, there are no comments return 0 #     ,    (  ) number_of_comments = len(response['kids']) #       tasks = [post_number_of_comments( loop, session, kid_id) for kid_id in response['kids']] #      results = await asyncio.gather(*tasks) #            number_of_comments += sum(results) log.debug('{:^6} > {} comments'.format(post_id, number_of_comments)) return number_of_comments async def get_comments_of_top_stories(loop, session, limit, iteration): """     HN. """ response = await fetch(session, TOP_STORIES_URL) tasks = [post_number_of_comments(loop, session, post_id) for post_id in response[:limit]] results = await asyncio.gather(*tasks) for post_id, num_comments in zip(response[:limit], results): log.info("Post {} has {} comments ({})".format(post_id, num_comments, iteration)) async def poll_top_stories_for_comments(loop, session, period, limit): """         . """ global fetch_counter iteration = 1 while True: now = datetime.now() log.info("Calculating comments for top {} stories. ({})".format(limit, iteration)) await get_comments_of_top_stories(loop, session, limit, iteration) log.info('> Calculating comments took {:.2f} seconds and {} fetches'.format( (datetime.now() - now).total_seconds(), fetch_counter)) log.info("Waiting for {} seconds...".format(period)) iteration += 1 fetch_counter = 0 await asyncio.sleep(period) if __name__ == '__main__': args = parser.parse_args() if args.verbose: log.setLevel(logging.DEBUG) loop = asyncio.get_event_loop() with aiohttp.ClientSession(loop=loop) as session: loop.run_until_complete( poll_top_stories_for_comments( loop, session, args.period, args.limit)) loop.close() 

 [10:14:03] Calculating comments for top 5 stories. (1) [10:14:06] Post 13848196 has 31 comments (1) [10:14:06] Post 13849430 has 37 comments (1) [10:14:06] Post 13849037 has 15 comments (1) [10:14:06] Post 13845337 has 128 comments (1) [10:14:06] Post 13847465 has 27 comments (1) [10:14:06] > Calculating comments took 2.96 seconds and 244 fetches [10:14:06] Waiting for 5 seconds… [10:14:11] Calculating comments for top 5 stories. (2) [10:14:14] Post 13848196 has 31 comments (2) [10:14:14] Post 13849430 has 37 comments (2) [10:14:14] Post 13849037 has 15 comments (2) [10:14:14] Post 13845337 has 128 comments (2) [10:14:14] Post 13847465 has 27 comments (2) [10:14:14] > Calculating comments took 3.04 seconds and 244 fetches [10:14:14] Waiting for 5 seconds… 

Très bien, mais il y a un problème mineur: si vous avez fait attention à l'horodatage,
alors la tâche ne démarre pas strictement toutes les 5 secondes, elle démarre 5 secondes après la fin de get_comments_of_top_stories . Encore une fois, les conséquences de l'utilisation de l' attente et du blocage jusqu'à ce que nous obtenions nos résultats.
Ces fonctionnalités ne posent pas de problème lorsque la tâche prend plus de cinq secondes. En outre, il semble erroné d'utiliser _run_until complete lorsque la coroutine est conçue pour être infinie.


La bonne nouvelle est que nous sommes maintenant des experts sur Ensure_future , et nous pouvons simplement le pousser dans le code au lieu d'utiliser l' attente ...


 async def poll_top_stories_for_comments(loop, session, period, limit): """         . """ global fetch_counter iteration = 1 while True: now = datetime.now() log.info("Calculating comments for top {} stories. ({})".format( limit, iteration)) asyncio.ensure_future( get_comments_of_top_stories(loop, session, limit, iteration)) log.info( '> Calculating comments took {:.2f} seconds and {} fetches'.format( (datetime.now() - now).total_seconds(), fetch_counter)) log.info("Waiting for {} seconds...".format(period)) iteration += 1 fetch_counter = 0 await asyncio.sleep(period) 

  [10:55:40] Calculating comments for top 5 stories. (1) [10:55:40] > Calculating comments took 0.00 seconds and 0 fetches [10:55:40] Waiting for 5 seconds… [10:55:43] Post 13848196 has 32 comments (1) [10:55:43] Post 13849430 has 48 comments (1) [10:55:43] Post 13849037 has 16 comments (1) [10:55:43] Post 13845337 has 129 comments (1) [10:55:43] Post 13847465 has 29 comments (1) [10:55:45] Calculating comments for top 5 stories. (2) [10:55:45] > Calculating comments took 0.00 seconds and 260 fetches [10:55:45] Waiting for 5 seconds… [10:55:48] Post 13848196 has 32 comments (2) [10:55:48] Post 13849430 has 48 comments (2) [10:55:48] Post 13849037 has 16 comments (2) [10:55:48] Post 13845337 has 129 comments (2) [10:55:48] Post 13847465 has 29 comments (2) 

Ahem ... D'accord, la bonne nouvelle est que l'horodatage est localisé exactement cinq secondes plus tard, mais qu'y a-t-il en 0,00 secondes et pas d'échantillons Et puis la prochaine itération prend zéro seconde et 260 échantillons?


C'est l'une des conséquences de l'évitement de l'attente, maintenant nous ne bloquons plus la coroutine et passons simplement à la ligne suivante, qui affiche zéro seconde et, pour la première fois, aucun message récupéré. Ce sont des tâches assez petites, car nous pouvons vivre sans messages, mais que faire si nous avons besoin des résultats des tâches?


Ensuite, mon ami, nous devons recourir Ă  ... rappels (grincement des dents ((()


Je sais, je sais, le but de la coroutine est d’éviter les rappels, mais c’est parce que le sous-titre dramatique de l’article est «Hors d’attente». Nous ne sommes plus en attente de territoire, nous avons des aventures avec le lancement manuel des tâches, ce qui conduit à notre cas d'utilisation. Qu'est-ce que cela vous donne? spoiler


Comme nous l'avons vu précédemment, assure_future renvoie un objet Future auquel nous pouvons ajouter un rappel en utilisant le rappel _add_done.


Avant de faire cela, et afin d'avoir le nombre correct de récupérations, nous arrivons au point que nous devons encapsuler notre coroutine d'extraction dans la classe URLFetcher . Dans ce cas, nous créons une instance pour chaque tâche afin d'avoir le nombre correct d'échantillons. Nous supprimons également la variable globale qui a introduit le bogue de toute façon:


 """               ensure_future   sleep.      future,    ensure_future,         ,    URLFetcher   . """ import asyncio import argparse import logging from datetime import datetime import aiohttp import async_timeout LOGGER_FORMAT = '%(asctime)s %(message)s' URL_TEMPLATE = "https://hacker-news.firebaseio.com/v0/item/{}.json" TOP_STORIES_URL = "https://hacker-news.firebaseio.com/v0/topstories.json" FETCH_TIMEOUT = 10 parser = argparse.ArgumentParser(description='Calculate the number of comments of the top stories in HN.') parser.add_argument('--period', type=int, default=5, help='Number of seconds between poll') parser.add_argument('--limit', type=int, default=5, help='Number of new stories to calculate comments for') parser.add_argument('--verbose', action='store_true', help='Detailed output') logging.basicConfig(format=LOGGER_FORMAT, datefmt='[%H:%M:%S]') log = logging.getLogger() log.setLevel(logging.INFO) class URLFetcher(): """   URL     """ def __init__(self): self.fetch_counter = 0 async def fetch(self, session, url): """  URL   aiohttp,    JSON .     aiohttp    . """ with async_timeout.timeout(FETCH_TIMEOUT): self.fetch_counter += 1 async with session.get(url) as response: return await response.json() async def post_number_of_comments(loop, session, fetcher, post_id): """          . """ url = URL_TEMPLATE.format(post_id) response = await fetcher.fetch(session, url) #  .  . if response is None or 'kids' not in response: return 0 #     ,    (  ) number_of_comments = len(response['kids']) #       tasks = [post_number_of_comments( loop, session, fetcher, kid_id) for kid_id in response['kids']] # s     results = await asyncio.gather(*tasks) #            number_of_comments += sum(results) log.debug('{:^6} > {} comments'.format(post_id, number_of_comments)) return number_of_comments async def get_comments_of_top_stories(loop, session, limit, iteration): """    HN. """ fetcher = URLFetcher() # create a new fetcher for this task response = await fetcher.fetch(session, TOP_STORIES_URL) tasks = [post_number_of_comments( loop, session, fetcher, post_id) for post_id in response[:limit]] results = await asyncio.gather(*tasks) for post_id, num_comments in zip(response[:limit], results): log.info("Post {} has {} comments ({})".format( post_id, num_comments, iteration)) return fetcher.fetch_counter # return the fetch count async def poll_top_stories_for_comments(loop, session, period, limit): """         . """ iteration = 1 while True: log.info("Calculating comments for top {} stories. ({})".format(limit, iteration)) future = asyncio.ensure_future(get_comments_of_top_stories(loop, session, limit, iteration)) now = datetime.now() def callback(fut): fetch_count = fut.result() log.info('> Calculating comments took {:.2f} seconds and {} fetches'.format( (datetime.now() - now).total_seconds(), fetch_count)) future.add_done_callback(callback) log.info("Waiting for {} seconds...".format(period)) iteration += 1 await asyncio.sleep(period) if __name__ == '__main__': args = parser.parse_args() if args.verbose: log.setLevel(logging.DEBUG) loop = asyncio.get_event_loop() with aiohttp.ClientSession(loop=loop) as session: loop.run_until_complete( poll_top_stories_for_comments( loop, session, args.period, args.limit)) loop.close() 

  [12:23:40] Calculating comments for top 5 stories. (1) [12:23:40] Waiting for 5 seconds... [12:23:43] Post 13848196 has 38 comments (1) [12:23:43] Post 13849430 has 72 comments (1) [12:23:43] Post 13849037 has 19 comments (1) [12:23:43] Post 13848283 has 64 comments (1) [12:23:43] Post 13847465 has 34 comments (1) [12:23:43] > Calculating comments took 3.17 seconds and 233 fetches [12:23:45] Calculating comments for top 5 stories. (2) [12:23:45] Waiting for 5 seconds... [12:23:47] Post 13848196 has 38 comments (2) [12:23:47] Post 13849430 has 72 comments (2) [12:23:47] Post 13849037 has 19 comments (2) [12:23:47] Post 13848283 has 64 comments (2) [12:23:47] Post 13847465 has 34 comments (2) [12:23:47] > Calculating comments took 2.47 seconds and 233 fetches [12:23:50] Calculating comments for top 5 stories. (3) [12:23:50] Waiting for 5 seconds... 

, , callback:


 async def poll_top_stories_for_comments(loop, session, period, limit): """         . """ iteration = 1 while True: log.info("Calculating comments for top {} stories. ({})".format( limit, iteration)) future = asyncio.ensure_future( get_comments_of_top_stories(loop, session, limit, iteration)) now = datetime.now() def callback(fut): fetch_count = fut.result() log.info( '> Calculating comments took {:.2f} seconds and {} fetches'.format( (datetime.now() - now).total_seconds(), fetch_count)) future.add_done_callback(callback) log.info("Waiting for {} seconds...".format(period)) iteration += 1 await asyncio.sleep(period) 

, callback , future. (fetch) URLFetcher _get_comments_of_top stories future.


Tu vois? , , await .


callback-, API asyncio AbstractBaseLoop _call later _call at ,
- . , , poll_top_stories_for_comments :


 def poll_top_stories_for_comments(loop, session, period, limit, iteration=0): """     get_comments_of_top_stories. """ log.info("Calculating comments for top {} stories ({})".format( limit, iteration)) future = asyncio.ensure_future( get_comments_of_top_stories(loop, session, limit, iteration)) now = datetime.now() def callback(fut): fetch_count = fut.result() log.info( '> Calculating comments took {:.2f} seconds and {} fetches'.format( (datetime.now() - now).total_seconds(), fetch_count)) future.add_done_callback(callback) log.info("Waiting for {} seconds...".format(period)) iteration += 1 loop.call_later( period, partial( # or call_at(loop.time() + period) poll_top_stories_for_comments, loop, session, period, limit, iteration ) ) if __name__ == '__main__': args = parser.parse_args() if args.verbose: log.setLevel(logging.DEBUG) loop = asyncio.get_event_loop() with aiohttp.ClientSession(loop=loop) as session: poll_top_stories_for_comments( loop, session, args.period, args.limit) loop.run_forever() loop.close() 

. :


  • , ensure_future .
    ( : )
  • _poll_top_stories_for comments , , _run forever , .

, , , — ? ? URL:


 MAXIMUM_FETCHES = 5 class URLFetcher(): """   URL     """ def __init__(self): self.fetch_counter = 0 async def fetch(self, session, url): """  URL   aiohttp,    JSON .     aiohttp    . """ with async_timeout.timeout(FETCH_TIMEOUT): self.fetch_counter += 1 if self.fetch_counter > MAXIMUM_FETCHES: raise Exception('BOOM!') async with session.get(url) as response: return await response.json() 

  [12:51:00] Calculating comments for top 5 stories. (1) [12:51:00] Waiting for 5 seconds… [12:51:01] Exception in callback poll_top_stories_for_comments.<locals>.callback(<Task finishe…ion('BOOM!',)>) at 05_periodic_coroutines.py:121 handle: <Handle poll_top_stories_for_comments.<locals>.callback(<Task finishe…ion('BOOM!',)>) at 05_periodic_coroutines.py:121> Traceback (most recent call last): File “/Users/yeray/.pyenv/versions/3.6.0/lib/python3.6/asyncio/events.py”, line 126, in _run self._callback(*self._args) File “05_periodic_coroutines.py”, line 122, in callback fetch_count = fut.result() File “05_periodic_coroutines.py”, line 100, in get_comments_of_top_stories results = await asyncio.gather(*tasks) File “05_periodic_coroutines.py”, line 69, in post_number_of_comments response = await fetcher.fetch(session, url) File “05_periodic_coroutines.py”, line 58, in fetch raise Exception('BOOM!') Exception: BOOM! [12:51:05] Calculating comments for top 5 stories. (2) [12:51:05] Waiting for 5 seconds… 

, ?


Que faire , , : asyncio :

Source: https://habr.com/ru/post/fr420129/


All Articles