Patrones de rutina de Asyncio: afuera aguardan

Prefacio del traductor:
Una vez más pisando un rastrillo mientras trabajaba con Python Asyncio, fui a Internet para encontrar algo más agradable que la documentación seca. Me encontré con un artículo de Yeray Díaz "Asyncio Coroutine Patterns: Beyond wait" , en el que el autor considera muy interesante el uso de asyncio y comparte algunos trucos. Como no encontré nada tan completo en ruso, decidí traducirlo.


Asyncio es el sueño competitivo de un programador de Python: usted escribe código que bordea la sincronización y deja que Python haga el resto. Esta es otra importación de la biblioteca antigravedad: importar antigravedad


En realidad, esto no es del todo cierto, la programación concurrente es un trabajo arduo, y si bien las rutinas nos permiten evitar el infierno de devolución de llamadas, que puede llevarlo lo suficientemente lejos, aún debe pensar en crear tareas, obtener resultados y capturar excepciones con elegancia. Es triste


La buena noticia es que todo esto es posible en asyncio. La mala noticia es que no siempre es obvio de inmediato qué está mal y cómo solucionarlo. A continuación hay algunos patrones que descubrí mientras trabajaba con asyncio.



Antes de comenzar:


Utilicé mi biblioteca aiohttp favorita para ejecutar solicitudes HTTP asincrónicas y la API de Hacker News porque es un sitio simple y conocido que sigue un escenario de uso familiar. Siguiendo la respuesta a mi artículo anterior , también utilicé la sintaxis async / await introducida en Python 3.5. Supuse que el lector estaba familiarizado con las ideas que se describen aquí. Y, en última instancia, todos los ejemplos están disponibles en el repositorio de GitHub de este artículo .


Ok, empecemos!


Corutinas recursivas


Crear y ejecutar tareas es trivial en asyncio. Para tales tareas, la API incluye varios métodos en la clase AbstractEventLoop, así como funciones en la biblioteca. Pero generalmente desea combinar los resultados de estas tareas y procesarlos de alguna manera, y la recursividad es un gran ejemplo de este esquema, y ​​también demuestra la simplicidad de la rutina en comparación con otras herramientas competitivas.


Un caso común para usar asyncio es crear algún tipo de rastreador web. Imagine que estamos demasiado ocupados para verificar HackerNews, o tal vez le guste un buen holívar, por lo que desea implementar un sistema que recupere el número de comentarios para una publicación HN específica y, si está por encima del umbral, le notifique. Buscaste un poco en Google y encontraste la documentación para la API de HN, justo lo que necesitas, pero notaste lo siguiente en la documentación:


¿Quieres saber el número total de comentarios de artículos? Ve alrededor del árbol y cuéntalos.

¡Llamada aceptada!


"""          ,       ,      . ,         Hacker News      """ import asyncio import argparse import logging from urllib.parse import urlparse, parse_qs from datetime import datetime import aiohttp import async_timeout LOGGER_FORMAT = '%(asctime)s %(message)s' URL_TEMPLATE = "https://hacker-news.firebaseio.com/v0/item/{}.json" FETCH_TIMEOUT = 10 parser = argparse.ArgumentParser(description='Calculate the comments of a Hacker News post.') parser.add_argument('--id', type=int, default=8863, help='ID of the post in HN, defaults to 8863') parser.add_argument('--url', type=str, help='URL of a post in HN') parser.add_argument('--verbose', action='store_true', help='Detailed output') logging.basicConfig(format=LOGGER_FORMAT, datefmt='[%H:%M:%S]') log = logging.getLogger() log.setLevel(logging.INFO) fetch_counter = 0 async def fetch(session, url): """  URL   aiohttp,    JSON .     aiohttp    . """ global fetch_counter with async_timeout.timeout(FETCH_TIMEOUT): fetch_counter += 1 async with session.get(url) as response: return await response.json() async def post_number_of_comments(loop, session, post_id): """          . """ url = URL_TEMPLATE.format(post_id) now = datetime.now() response = await fetch(session, url) log.debug('{:^6} > Fetching of {} took {} seconds'.format( post_id, url, (datetime.now() - now).total_seconds())) if 'kids' not in response: #  .   return 0 #     ,    (  ) number_of_comments = len(response['kids']) #       log.debug('{:^6} > Fetching {} child posts'.format( post_id, number_of_comments)) tasks = [post_number_of_comments(loop, session, kid_id) for kid_id in response['kids']] #      results = await asyncio.gather(*tasks) #            number_of_comments += sum(results) log.debug('{:^6} > {} comments'.format(post_id, number_of_comments)) return number_of_comments def id_from_HN_url(url): """   `id` URL       None. """ parse_result = urlparse(url) try: return parse_qs(parse_result.query)['id'][0] except (KeyError, IndexError): return None if __name__ == '__main__': args = parser.parse_args() if args.verbose: log.setLevel(logging.DEBUG) post_id = id_from_HN_url(args.url) if args.url else args.id loop = asyncio.get_event_loop() with aiohttp.ClientSession(loop=loop) as session: now = datetime.now() comments = loop.run_until_complete( post_number_of_comments(loop, session, post_id)) log.info( '> Calculating comments took {:.2f} seconds and {} fetches'.format( (datetime.now() - now).total_seconds(), fetch_counter)) log.info("-- Post {} has {} comments".format(post_id, comments)) loop.close() 

Siéntase libre de intentar ejecutar el script con el indicador "-verbose" para obtener una salida más detallada.


  [14:47:32] > Calculating comments took 2.23 seconds and 73 fetches [14:47:32] -- Post 8863 has 72 comments 

Salteamos el código repetitivo y vayamos directamente a la rutina recursiva. Tenga en cuenta que este código se lee casi por completo como lo sería con el código síncrono.


 async def post_number_of_comments(loop, session, post_id): """          . """ url = URL_TEMPLATE.format(post_id) now = datetime.now() response = await fetch(session, url) log.debug('{:^6} > Fetching of {} took {} seconds'.format( post_id, url, (datetime.now() - now).total_seconds())) if 'kids' not in response: # base case, there are no comments return 0 #     ,    (  ) number_of_comments = len(response['kids']) #       log.debug('{:^6} > Fetching {} child posts'.format( post_id, number_of_comments)) tasks = [post_number_of_comments( loop, session, kid_id) for kid_id in response['kids']] #      results = await asyncio.gather(*tasks) #          number_of_comments += sum(results) log.debug('{:^6} > {} comments'.format(post_id, number_of_comments)) return number_of_comments 

  1. Primero obtenemos JSON con los datos de publicación.
  2. Recursivamente rodear a cada uno de los herederos.
  3. Al final, llegaremos al caso base y devolveremos cero,
    cuando la publicación no tiene comentarios.
  4. Al regresar del caso base, agregue las respuestas a la publicación actual
    al número de herederos y regreso

Este es un gran ejemplo de lo que Brett Slatkin describe como fan-in y fan-out , estamos fan-out para recibir datos de los herederos y fan-in resume los datos para calcular el número de comentarios


Hay un par de formas en la API asyncio para realizar estas operaciones de despliegue. Aquí uso la función de recopilación , que efectivamente espera que todas las corutinas se completen y devuelvan una lista de sus resultados.


Prestemos atención a cómo el uso de la rutina también se corresponde con la recursividad con cualquier punto en el que esté presente cualquier cantidad de corutinas, esperando respuestas a sus solicitudes durante la llamada a la función de recopilación y reanudando la ejecución después de que se complete la operación de E / S. Esto nos permite expresar un comportamiento bastante complejo en una corutina elegante y (fácilmente) legible.


"Muy simple", dices? Bien, subamos un nivel.


Disparó y olvidó


Imagine que desea enviarse un mensaje de correo electrónico con publicaciones que tengan más comentarios que un cierto valor, y desea hacerlo de la misma manera que recorrimos el árbol de publicaciones. Simplemente podemos agregar la declaración if al final de la función recursiva para lograr esto:


 async def post_number_of_comments(loop, session, post_id): """           . """ url = URL_TEMPLATE.format(post_id) response = await fetch(session, url) if 'kids' not in response: #  .   return 0 #       . number_of_comments = len(response['kids']) #       tasks = [post_number_of_comments(loop, session, kid_id) for kid_id in response['kids']] #      results = await asyncio.gather(*tasks) #            number_of_comments += sum(results) log.debug('{:^6} > {} comments'.format(post_id, number_of_comments)) #        if number_of_comments > MIN_COMMENTS: await log_post(response) return number_of_comments async def log_post(post): """   . """ await asyncio.sleep(random() * 3) log.info("Post logged") 

Sí, solía asyncio.sleep. Esta es la ultima vez. Lo prometo


 [09:41:02] Post logged [09:41:04] Post logged [09:41:06] Post logged [09:41:06] > Calculating comments took 6.35 seconds and 73 fetches [09:41:06] -- Post 8863 has 72 comments 

¡Esto es significativamente más lento que antes!
La razón es que, como discutimos anteriormente, esperar pausas en la ejecución de la rutina hasta que se complete el futuro, pero dado que no necesitamos el resultado del registro, no hay una razón real para hacerlo.


Necesitamos "disparar y olvidar" con nuestra rutina, y como no podemos esperar a que termine de usar aguardar, necesitamos otra forma de comenzar la rutina sin esperar. Un vistazo rápido a la API de asyncio encontrará la función sure_future que programará la ejecución de la rutina, la envolverá en un objeto Task y la devolverá. Recordando que antes de que se planificara la rutina, un ciclo de eventos controlará el resultado de nuestra rutina en algún momento en el futuro, cuando otra corutina estará en un estado de expectativa.


Genial, reemplace await log_post de la siguiente manera:


 async def post_number_of_comments(loop, session, post_id): """           . """ url = URL_TEMPLATE.format(post_id) response = await fetch(session, url) if 'kids' not in response: # base case, there are no comments return 0 #       . number_of_comments = len(response['kids']) #       tasks = [post_number_of_comments( loop, session, kid_id) for kid_id in response['kids']] #      results = await asyncio.gather(*tasks) #            number_of_comments += sum(results) log.debug('{:^6} > {} comments'.format(post_id, number_of_comments)) #        if number_of_comments > MIN_COMMENTS: asyncio.ensure_future(log_post(response)) return number_of_comments 

 [09:42:57] > Calculating comments took 1.69 seconds and 73 fetches [09:42:57] -- Post 8863 has 72 comments [09:42:57] Task was destroyed but it is pending! task: <Task pending coro=<log_post() done, defined at 02_fire_and_forget.py:82> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x1109197f8>()]>> [09:42:57] Task was destroyed but it is pending! task: <Task pending coro=<log_post() done, defined at 02_fire_and_forget.py:82> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x110919948>()]>> [09:42:57] Task was destroyed but it is pending! task: <Task pending coro=<log_post() done, defined at 02_fire_and_forget.py:82> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x110919978>()]>> 

¡Ejem, la increíble tarea fue destruida pero está pendiente! atormentando a los usuarios de asyncio en todo el mundo La buena noticia es que hemos vuelto al tiempo que recibimos anteriormente (1.69 p.). La mala noticia es que a asyncio no le gusta ir más allá del rango de disparar y olvidar.


El problema es que cerramos forzosamente el ciclo de eventos después de obtener el resultado de la rutina post_number_of_comments , sin dejar que finalice nuestra tarea log_post time.


Tenemos dos opciones:
Dejamos que el bucle de eventos funcione interminablemente usando run_forever y finalicemos manualmente el script, o usamos el método all_tasks de la clase Task para encontrar todas las tareas de trabajo y esperar a que termine el cálculo del número de comentarios.


Intentemos salir de esta situación haciendo cambios rápidamente después de nuestra llamada a post_number_of_comments :


 if __name__ == '__main__': args = parser.parse_args() if args.verbose: log.setLevel(logging.DEBUG) post_id = id_from_HN_url(args.url) if args.url else args.id loop = asyncio.get_event_loop() with aiohttp.ClientSession(loop=loop) as session: now = datetime.now() comments = loop.run_until_complete( post_number_of_comments(loop, session, post_id)) log.info( '> Calculating comments took {:.2f} seconds and {} fetches'.format( (datetime.now() - now).total_seconds(), fetch_counter)) log.info("-- Post {} has {} comments".format(post_id, comments)) pending_tasks = [ task for task in asyncio.Task.all_tasks() if not task.done()] loop.run_until_complete(asyncio.gather(*pending_tasks)) loop.close() 

 [09:47:29] > Calculating comments took 1.72 seconds and 73 fetches [09:47:29]Post 8863 has 72 comments [09:47:30] Post logged [09:47:31] Post logged [09:47:32] Post logged 

Ahora estamos seguros de que las tareas de registro se han completado.
La suposición de que el método all_tasks funciona bien en los casos que estamos tratando es una gran idea cuando las tareas se realizan adecuadamente en nuestro bucle de eventos, pero en casos más complejos puede haber cualquier cantidad de tareas que se puedan realizar, cuya fuente se puede ubicar fuera de nuestro código .


Otro enfoque es restaurar el orden después de que registremos de manera independiente todas las corutinas que planeamos lanzar y permitir que se ejecuten, lo que se pospuso anteriormente
tan pronto como se complete el recuento de comentarios. Como sabe, la función sure_future devuelve un objeto Task . Podemos usarlo para registrar nuestras tareas con baja prioridad. Solo definamos una lista task_registry y almacenemos futuros en ella:


 async def post_number_of_comments(loop, session, post_id): """Retrieve data for current post and recursively for all comments. """ url = URL_TEMPLATE.format(post_id) response = await fetch(session, url) if 'kids' not in response: # base case, there are no comments return 0 # calculate this post's comments as number of comments number_of_comments = len(response['kids']) # create recursive tasks for all comments tasks = [post_number_of_comments( loop, session, kid_id) for kid_id in response['kids']] # schedule the tasks and retrieve results results = await asyncio.gather(*tasks) # reduce the descendents comments and add it to this post's number_of_comments += sum(results) log.debug('{:^6} > {} comments'.format(post_id, number_of_comments)) # Log if number of comments is over a threshold if number_of_comments > MIN_COMMENTS: # Add the future to the registry task_registry.append(asyncio.ensure_future(log_post(response))) return number_of_comments # (... ommitted code ...) # if __name__ == '__main__': args = parser.parse_args() if args.verbose: log.setLevel(logging.DEBUG) post_id = id_from_HN_url(args.url) if args.url else args.id loop = asyncio.get_event_loop() task_registry = [] # define our task registry with aiohttp.ClientSession(loop=loop) as session: now = datetime.now() comments = loop.run_until_complete( post_number_of_comments(loop, session, post_id)) log.info( '> Calculating comments took {:.2f} seconds and {} fetches'.format( (datetime.now() - now).total_seconds(), fetch_counter)) log.info("-- Post {} has {} comments".format(post_id, comments)) pending_tasks = [task for task in task_registry if not task.done()] loop.run_until_complete(asyncio.gather(*pending_tasks)) loop.close() 

 [09:53:46] > Calculating comments took 1.68 seconds and 73 fetches [09:53:46]Post 8863 has 72 comments [09:53:46] Post logged [09:53:48] Post logged [09:53:49] Post logged 

Aprendemos la siguiente lección: asyncio no debe verse como una cola de tareas distribuidas como Celery . Todas las tareas se inician en un hilo y el ciclo de eventos debe administrarse en consecuencia, lo que le permite asignar tiempo para completar las tareas.


Lo que lleva a otro patrón generalmente aceptado:


Corutinas activadas periódicamente


Continuando con nuestro ejemplo sobre HN (e hicimos un gran trabajo antes), decidimos
lo cual es decisivamente importante para calcular el número de comentarios en la publicación de HN tan pronto como estén disponibles y mientras estén en la lista de 5 entradas recientes.


Un vistazo rápido a la API de HN muestra un punto final que devuelve los últimos 500 registros. Genial, así que simplemente podemos sondear este punto final para recibir nuevas publicaciones y calcular la cantidad de comentarios sobre ellas, digamos cada cinco segundos.


Bueno, dado que ahora nos estamos moviendo al sondeo periódico, simplemente podemos usar el ciclo while infinito, esperar a que se complete la tarea de sondeo (llamada en espera ) y quedarnos dormidos ( dormir en llamada) durante el tiempo requerido. Hice algunos cambios menores para obtener entradas principales en lugar de usar la URL de publicación directa.


 """ An example of periodically scheduling coroutines using an infinite loop of awaiting and sleeping. """ import asyncio import argparse import logging from datetime import datetime import aiohttp import async_timeout LOGGER_FORMAT = '%(asctime)s %(message)s' URL_TEMPLATE = "https://hacker-news.firebaseio.com/v0/item/{}.json" TOP_STORIES_URL = "https://hacker-news.firebaseio.com/v0/topstories.json" FETCH_TIMEOUT = 10 parser = argparse.ArgumentParser(description='Calculate the number of comments of the top stories in HN.') parser.add_argument('--period', type=int, default=5, help='Number of seconds between poll') parser.add_argument('--limit', type=int, default=5,help='Number of new stories to calculate comments for') parser.add_argument('--verbose', action='store_true', help='Detailed output') logging.basicConfig(format=LOGGER_FORMAT, datefmt='[%H:%M:%S]') log = logging.getLogger() log.setLevel(logging.INFO) fetch_counter = 0 async def fetch(session, url): """  URL   aiohttp,    JSON .     aiohttp    . """ global fetch_counter with async_timeout.timeout(FETCH_TIMEOUT): fetch_counter += 1 async with session.get(url) as response: return await response.json() async def post_number_of_comments(loop, session, post_id): """          . """ url = URL_TEMPLATE.format(post_id) response = await fetch(session, url) if 'kids' not in response: # base case, there are no comments return 0 #     ,    (  ) number_of_comments = len(response['kids']) #       tasks = [post_number_of_comments( loop, session, kid_id) for kid_id in response['kids']] #      results = await asyncio.gather(*tasks) #            number_of_comments += sum(results) log.debug('{:^6} > {} comments'.format(post_id, number_of_comments)) return number_of_comments async def get_comments_of_top_stories(loop, session, limit, iteration): """     HN. """ response = await fetch(session, TOP_STORIES_URL) tasks = [post_number_of_comments(loop, session, post_id) for post_id in response[:limit]] results = await asyncio.gather(*tasks) for post_id, num_comments in zip(response[:limit], results): log.info("Post {} has {} comments ({})".format(post_id, num_comments, iteration)) async def poll_top_stories_for_comments(loop, session, period, limit): """         . """ global fetch_counter iteration = 1 while True: now = datetime.now() log.info("Calculating comments for top {} stories. ({})".format(limit, iteration)) await get_comments_of_top_stories(loop, session, limit, iteration) log.info('> Calculating comments took {:.2f} seconds and {} fetches'.format( (datetime.now() - now).total_seconds(), fetch_counter)) log.info("Waiting for {} seconds...".format(period)) iteration += 1 fetch_counter = 0 await asyncio.sleep(period) if __name__ == '__main__': args = parser.parse_args() if args.verbose: log.setLevel(logging.DEBUG) loop = asyncio.get_event_loop() with aiohttp.ClientSession(loop=loop) as session: loop.run_until_complete( poll_top_stories_for_comments( loop, session, args.period, args.limit)) loop.close() 

 [10:14:03] Calculating comments for top 5 stories. (1) [10:14:06] Post 13848196 has 31 comments (1) [10:14:06] Post 13849430 has 37 comments (1) [10:14:06] Post 13849037 has 15 comments (1) [10:14:06] Post 13845337 has 128 comments (1) [10:14:06] Post 13847465 has 27 comments (1) [10:14:06] > Calculating comments took 2.96 seconds and 244 fetches [10:14:06] Waiting for 5 seconds… [10:14:11] Calculating comments for top 5 stories. (2) [10:14:14] Post 13848196 has 31 comments (2) [10:14:14] Post 13849430 has 37 comments (2) [10:14:14] Post 13849037 has 15 comments (2) [10:14:14] Post 13845337 has 128 comments (2) [10:14:14] Post 13847465 has 27 comments (2) [10:14:14] > Calculating comments took 3.04 seconds and 244 fetches [10:14:14] Waiting for 5 seconds… 

Genial, pero hay un problema menor: si prestaste atención a la marca de tiempo,
entonces la tarea no comienza estrictamente cada 5 segundos, comienza 5 segundos después de que se completen las get_comments_of_top_stories . Nuevamente, las consecuencias de usar aguardan y bloquean hasta que recuperemos nuestros resultados.
Estas características no representan un problema cuando la tarea lleva más de cinco segundos. Además, parece erróneo usar _run_until complete cuando la rutina está diseñada para ser infinita.


La buena noticia es que ahora somos expertos en sure_future , y podemos simplemente insertarlo en el código en lugar de usar wait ...


 async def poll_top_stories_for_comments(loop, session, period, limit): """         . """ global fetch_counter iteration = 1 while True: now = datetime.now() log.info("Calculating comments for top {} stories. ({})".format( limit, iteration)) asyncio.ensure_future( get_comments_of_top_stories(loop, session, limit, iteration)) log.info( '> Calculating comments took {:.2f} seconds and {} fetches'.format( (datetime.now() - now).total_seconds(), fetch_counter)) log.info("Waiting for {} seconds...".format(period)) iteration += 1 fetch_counter = 0 await asyncio.sleep(period) 

  [10:55:40] Calculating comments for top 5 stories. (1) [10:55:40] > Calculating comments took 0.00 seconds and 0 fetches [10:55:40] Waiting for 5 seconds… [10:55:43] Post 13848196 has 32 comments (1) [10:55:43] Post 13849430 has 48 comments (1) [10:55:43] Post 13849037 has 16 comments (1) [10:55:43] Post 13845337 has 129 comments (1) [10:55:43] Post 13847465 has 29 comments (1) [10:55:45] Calculating comments for top 5 stories. (2) [10:55:45] > Calculating comments took 0.00 seconds and 260 fetches [10:55:45] Waiting for 5 seconds… [10:55:48] Post 13848196 has 32 comments (2) [10:55:48] Post 13849430 has 48 comments (2) [10:55:48] Post 13849037 has 16 comments (2) [10:55:48] Post 13845337 has 129 comments (2) [10:55:48] Post 13847465 has 29 comments (2) 

Ejem ... Bien, la buena noticia es que la marca de tiempo se encuentra exactamente cinco segundos después, pero ¿qué hay en 0.00 segundos y sin muestras? Y luego la siguiente iteración toma cero segundos y 260 muestras.


Esta es una de las consecuencias de evitar la espera, ahora ya no bloqueamos la rutina y simplemente pasamos a la siguiente línea, que imprime cero segundos y, por primera vez, cero mensajes recuperados. Estas son tareas bastante pequeñas, ya que podemos vivir sin mensajes, pero ¿qué pasa si necesitamos resultados de las tareas?


Entonces, mi amigo, tenemos que recurrir a ... devoluciones de llamada (encogimiento ((()


Lo sé, lo sé, el objetivo de la rutina es evitar devoluciones de llamada, pero eso se debe a que el subtítulo dramático del artículo es "Fuera de esperar". Ya no estamos en territorio de espera , tenemos aventuras con el lanzamiento manual de tareas, lo que lleva a nuestro caso de uso. ¿Qué te da esto? spoiler


Como discutimos anteriormente, sure_future devuelve un objeto Future al que podemos agregar una devolución de llamada usando _add_done callback .


Antes de hacer esto, y para tener las recuperaciones correctas, llegamos al punto de que necesitamos encapsular nuestra rutina de extracción en la clase URLFetcher . En este caso, creamos una instancia para cada tarea para que tengamos el recuento correcto de muestras. También eliminamos la variable global que introdujo el error de todos modos:


 """               ensure_future   sleep.      future,    ensure_future,         ,    URLFetcher   . """ import asyncio import argparse import logging from datetime import datetime import aiohttp import async_timeout LOGGER_FORMAT = '%(asctime)s %(message)s' URL_TEMPLATE = "https://hacker-news.firebaseio.com/v0/item/{}.json" TOP_STORIES_URL = "https://hacker-news.firebaseio.com/v0/topstories.json" FETCH_TIMEOUT = 10 parser = argparse.ArgumentParser(description='Calculate the number of comments of the top stories in HN.') parser.add_argument('--period', type=int, default=5, help='Number of seconds between poll') parser.add_argument('--limit', type=int, default=5, help='Number of new stories to calculate comments for') parser.add_argument('--verbose', action='store_true', help='Detailed output') logging.basicConfig(format=LOGGER_FORMAT, datefmt='[%H:%M:%S]') log = logging.getLogger() log.setLevel(logging.INFO) class URLFetcher(): """   URL     """ def __init__(self): self.fetch_counter = 0 async def fetch(self, session, url): """  URL   aiohttp,    JSON .     aiohttp    . """ with async_timeout.timeout(FETCH_TIMEOUT): self.fetch_counter += 1 async with session.get(url) as response: return await response.json() async def post_number_of_comments(loop, session, fetcher, post_id): """          . """ url = URL_TEMPLATE.format(post_id) response = await fetcher.fetch(session, url) #  .  . if response is None or 'kids' not in response: return 0 #     ,    (  ) number_of_comments = len(response['kids']) #       tasks = [post_number_of_comments( loop, session, fetcher, kid_id) for kid_id in response['kids']] # s     results = await asyncio.gather(*tasks) #            number_of_comments += sum(results) log.debug('{:^6} > {} comments'.format(post_id, number_of_comments)) return number_of_comments async def get_comments_of_top_stories(loop, session, limit, iteration): """    HN. """ fetcher = URLFetcher() # create a new fetcher for this task response = await fetcher.fetch(session, TOP_STORIES_URL) tasks = [post_number_of_comments( loop, session, fetcher, post_id) for post_id in response[:limit]] results = await asyncio.gather(*tasks) for post_id, num_comments in zip(response[:limit], results): log.info("Post {} has {} comments ({})".format( post_id, num_comments, iteration)) return fetcher.fetch_counter # return the fetch count async def poll_top_stories_for_comments(loop, session, period, limit): """         . """ iteration = 1 while True: log.info("Calculating comments for top {} stories. ({})".format(limit, iteration)) future = asyncio.ensure_future(get_comments_of_top_stories(loop, session, limit, iteration)) now = datetime.now() def callback(fut): fetch_count = fut.result() log.info('> Calculating comments took {:.2f} seconds and {} fetches'.format( (datetime.now() - now).total_seconds(), fetch_count)) future.add_done_callback(callback) log.info("Waiting for {} seconds...".format(period)) iteration += 1 await asyncio.sleep(period) if __name__ == '__main__': args = parser.parse_args() if args.verbose: log.setLevel(logging.DEBUG) loop = asyncio.get_event_loop() with aiohttp.ClientSession(loop=loop) as session: loop.run_until_complete( poll_top_stories_for_comments( loop, session, args.period, args.limit)) loop.close() 

  [12:23:40] Calculating comments for top 5 stories. (1) [12:23:40] Waiting for 5 seconds... [12:23:43] Post 13848196 has 38 comments (1) [12:23:43] Post 13849430 has 72 comments (1) [12:23:43] Post 13849037 has 19 comments (1) [12:23:43] Post 13848283 has 64 comments (1) [12:23:43] Post 13847465 has 34 comments (1) [12:23:43] > Calculating comments took 3.17 seconds and 233 fetches [12:23:45] Calculating comments for top 5 stories. (2) [12:23:45] Waiting for 5 seconds... [12:23:47] Post 13848196 has 38 comments (2) [12:23:47] Post 13849430 has 72 comments (2) [12:23:47] Post 13849037 has 19 comments (2) [12:23:47] Post 13848283 has 64 comments (2) [12:23:47] Post 13847465 has 34 comments (2) [12:23:47] > Calculating comments took 2.47 seconds and 233 fetches [12:23:50] Calculating comments for top 5 stories. (3) [12:23:50] Waiting for 5 seconds... 

Ok, mejor ya, pero centrémonos en la sección de devolución de llamada:


 async def poll_top_stories_for_comments(loop, session, period, limit): """         . """ iteration = 1 while True: log.info("Calculating comments for top {} stories. ({})".format( limit, iteration)) future = asyncio.ensure_future( get_comments_of_top_stories(loop, session, limit, iteration)) now = datetime.now() def callback(fut): fetch_count = fut.result() log.info( '> Calculating comments took {:.2f} seconds and {} fetches'.format( (datetime.now() - now).total_seconds(), fetch_count)) future.add_done_callback(callback) log.info("Waiting for {} seconds...".format(period)) iteration += 1 await asyncio.sleep(period) 

Tenga en cuenta que la función de devolución de llamada debe tomar un argumento, en el que se pasa el propio objeto futuro. También devolvemos el número de recuperación de la instancia de URLFetcher como resultado de _get_comments_of_top stories y obtenemos estos datos como resultado del futuro.


¿Ves?Te dije que sería bueno, pero definitivamente no hay que esperar aquí .


callback-, API asyncio AbstractBaseLoop _call later _call at ,
- . , , poll_top_stories_for_comments :


 def poll_top_stories_for_comments(loop, session, period, limit, iteration=0): """     get_comments_of_top_stories. """ log.info("Calculating comments for top {} stories ({})".format( limit, iteration)) future = asyncio.ensure_future( get_comments_of_top_stories(loop, session, limit, iteration)) now = datetime.now() def callback(fut): fetch_count = fut.result() log.info( '> Calculating comments took {:.2f} seconds and {} fetches'.format( (datetime.now() - now).total_seconds(), fetch_count)) future.add_done_callback(callback) log.info("Waiting for {} seconds...".format(period)) iteration += 1 loop.call_later( period, partial( # or call_at(loop.time() + period) poll_top_stories_for_comments, loop, session, period, limit, iteration ) ) if __name__ == '__main__': args = parser.parse_args() if args.verbose: log.setLevel(logging.DEBUG) loop = asyncio.get_event_loop() with aiohttp.ClientSession(loop=loop) as session: poll_top_stories_for_comments( loop, session, args.period, args.limit) loop.run_forever() loop.close() 

. :


  • , ensure_future .
    ( : )
  • _poll_top_stories_for comments , , _run forever , .

, , , — ? ? URL:


 MAXIMUM_FETCHES = 5 class URLFetcher(): """   URL     """ def __init__(self): self.fetch_counter = 0 async def fetch(self, session, url): """  URL   aiohttp,    JSON .     aiohttp    . """ with async_timeout.timeout(FETCH_TIMEOUT): self.fetch_counter += 1 if self.fetch_counter > MAXIMUM_FETCHES: raise Exception('BOOM!') async with session.get(url) as response: return await response.json() 

  [12:51:00] Calculating comments for top 5 stories. (1) [12:51:00] Waiting for 5 seconds… [12:51:01] Exception in callback poll_top_stories_for_comments.<locals>.callback(<Task finishe…ion('BOOM!',)>) at 05_periodic_coroutines.py:121 handle: <Handle poll_top_stories_for_comments.<locals>.callback(<Task finishe…ion('BOOM!',)>) at 05_periodic_coroutines.py:121> Traceback (most recent call last): File “/Users/yeray/.pyenv/versions/3.6.0/lib/python3.6/asyncio/events.py”, line 126, in _run self._callback(*self._args) File “05_periodic_coroutines.py”, line 122, in callback fetch_count = fut.result() File “05_periodic_coroutines.py”, line 100, in get_comments_of_top_stories results = await asyncio.gather(*tasks) File “05_periodic_coroutines.py”, line 69, in post_number_of_comments response = await fetcher.fetch(session, url) File “05_periodic_coroutines.py”, line 58, in fetch raise Exception('BOOM!') Exception: BOOM! [12:51:05] Calculating comments for top 5 stories. (2) [12:51:05] Waiting for 5 seconds… 

, ?


Que hacer , , : asyncio :

Source: https://habr.com/ru/post/es420129/


All Articles