Prefacio del traductor:
Una vez más pisando un rastrillo mientras trabajaba con Python Asyncio, fui a Internet para encontrar algo más agradable que la documentación seca. Me encontré con un artículo de Yeray Díaz "Asyncio Coroutine Patterns: Beyond wait" , en el que el autor considera muy interesante el uso de asyncio y comparte algunos trucos. Como no encontré nada tan completo en ruso, decidí traducirlo.
Asyncio es el sueño competitivo de un programador de Python: usted escribe código que bordea la sincronización y deja que Python haga el resto. Esta es otra importación de la biblioteca antigravedad: importar antigravedad
En realidad, esto no es del todo cierto, la programación concurrente es un trabajo arduo, y si bien las rutinas nos permiten evitar el infierno de devolución de llamadas, que puede llevarlo lo suficientemente lejos, aún debe pensar en crear tareas, obtener resultados y capturar excepciones con elegancia. Es triste
La buena noticia es que todo esto es posible en asyncio. La mala noticia es que no siempre es obvio de inmediato qué está mal y cómo solucionarlo. A continuación hay algunos patrones que descubrí mientras trabajaba con asyncio.
Antes de comenzar:
Utilicé mi biblioteca aiohttp favorita para ejecutar solicitudes HTTP asincrónicas y la API de Hacker News porque es un sitio simple y conocido que sigue un escenario de uso familiar. Siguiendo la respuesta a mi artículo anterior , también utilicé la sintaxis async / await introducida en Python 3.5. Supuse que el lector estaba familiarizado con las ideas que se describen aquí. Y, en última instancia, todos los ejemplos están disponibles en el repositorio de GitHub de este artículo .
Ok, empecemos!
Corutinas recursivas
Crear y ejecutar tareas es trivial en asyncio. Para tales tareas, la API incluye varios métodos en la clase AbstractEventLoop, así como funciones en la biblioteca. Pero generalmente desea combinar los resultados de estas tareas y procesarlos de alguna manera, y la recursividad es un gran ejemplo de este esquema, y también demuestra la simplicidad de la rutina en comparación con otras herramientas competitivas.
Un caso común para usar asyncio es crear algún tipo de rastreador web. Imagine que estamos demasiado ocupados para verificar HackerNews, o tal vez le guste un buen holívar, por lo que desea implementar un sistema que recupere el número de comentarios para una publicación HN específica y, si está por encima del umbral, le notifique. Buscaste un poco en Google y encontraste la documentación para la API de HN, justo lo que necesitas, pero notaste lo siguiente en la documentación:
¿Quieres saber el número total de comentarios de artículos? Ve alrededor del árbol y cuéntalos.
¡Llamada aceptada!
""" , , . , Hacker News """ import asyncio import argparse import logging from urllib.parse import urlparse, parse_qs from datetime import datetime import aiohttp import async_timeout LOGGER_FORMAT = '%(asctime)s %(message)s' URL_TEMPLATE = "https://hacker-news.firebaseio.com/v0/item/{}.json" FETCH_TIMEOUT = 10 parser = argparse.ArgumentParser(description='Calculate the comments of a Hacker News post.') parser.add_argument('--id', type=int, default=8863, help='ID of the post in HN, defaults to 8863') parser.add_argument('--url', type=str, help='URL of a post in HN') parser.add_argument('--verbose', action='store_true', help='Detailed output') logging.basicConfig(format=LOGGER_FORMAT, datefmt='[%H:%M:%S]') log = logging.getLogger() log.setLevel(logging.INFO) fetch_counter = 0 async def fetch(session, url): """ URL aiohttp, JSON . aiohttp . """ global fetch_counter with async_timeout.timeout(FETCH_TIMEOUT): fetch_counter += 1 async with session.get(url) as response: return await response.json() async def post_number_of_comments(loop, session, post_id): """ . """ url = URL_TEMPLATE.format(post_id) now = datetime.now() response = await fetch(session, url) log.debug('{:^6} > Fetching of {} took {} seconds'.format( post_id, url, (datetime.now() - now).total_seconds())) if 'kids' not in response:
Siéntase libre de intentar ejecutar el script con el indicador "-verbose" para obtener una salida más detallada.
[14:47:32] > Calculating comments took 2.23 seconds and 73 fetches [14:47:32] -- Post 8863 has 72 comments
Salteamos el código repetitivo y vayamos directamente a la rutina recursiva. Tenga en cuenta que este código se lee casi por completo como lo sería con el código síncrono.
async def post_number_of_comments(loop, session, post_id): """ . """ url = URL_TEMPLATE.format(post_id) now = datetime.now() response = await fetch(session, url) log.debug('{:^6} > Fetching of {} took {} seconds'.format( post_id, url, (datetime.now() - now).total_seconds())) if 'kids' not in response:
- Primero obtenemos JSON con los datos de publicación.
- Recursivamente rodear a cada uno de los herederos.
- Al final, llegaremos al caso base y devolveremos cero,
cuando la publicación no tiene comentarios. - Al regresar del caso base, agregue las respuestas a la publicación actual
al número de herederos y regreso
Este es un gran ejemplo de lo que Brett Slatkin describe como fan-in y fan-out , estamos fan-out para recibir datos de los herederos y fan-in resume los datos para calcular el número de comentarios
Hay un par de formas en la API asyncio para realizar estas operaciones de despliegue. Aquí uso la función de recopilación , que efectivamente espera que todas las corutinas se completen y devuelvan una lista de sus resultados.
Prestemos atención a cómo el uso de la rutina también se corresponde con la recursividad con cualquier punto en el que esté presente cualquier cantidad de corutinas, esperando respuestas a sus solicitudes durante la llamada a la función de recopilación y reanudando la ejecución después de que se complete la operación de E / S. Esto nos permite expresar un comportamiento bastante complejo en una corutina elegante y (fácilmente) legible.
"Muy simple", dices? Bien, subamos un nivel.
Disparó y olvidó
Imagine que desea enviarse un mensaje de correo electrónico con publicaciones que tengan más comentarios que un cierto valor, y desea hacerlo de la misma manera que recorrimos el árbol de publicaciones. Simplemente podemos agregar la declaración if al final de la función recursiva para lograr esto:
async def post_number_of_comments(loop, session, post_id): """ . """ url = URL_TEMPLATE.format(post_id) response = await fetch(session, url) if 'kids' not in response:
Sí, solía asyncio.sleep. Esta es la ultima vez. Lo prometo
[09:41:02] Post logged [09:41:04] Post logged [09:41:06] Post logged [09:41:06] > Calculating comments took 6.35 seconds and 73 fetches [09:41:06] -- Post 8863 has 72 comments
¡Esto es significativamente más lento que antes!
La razón es que, como discutimos anteriormente, esperar pausas en la ejecución de la rutina hasta que se complete el futuro, pero dado que no necesitamos el resultado del registro, no hay una razón real para hacerlo.
Necesitamos "disparar y olvidar" con nuestra rutina, y como no podemos esperar a que termine de usar aguardar, necesitamos otra forma de comenzar la rutina sin esperar. Un vistazo rápido a la API de asyncio encontrará la función sure_future que programará la ejecución de la rutina, la envolverá en un objeto Task y la devolverá. Recordando que antes de que se planificara la rutina, un ciclo de eventos controlará el resultado de nuestra rutina en algún momento en el futuro, cuando otra corutina estará en un estado de expectativa.
Genial, reemplace await log_post de la siguiente manera:
async def post_number_of_comments(loop, session, post_id): """ . """ url = URL_TEMPLATE.format(post_id) response = await fetch(session, url) if 'kids' not in response:
[09:42:57] > Calculating comments took 1.69 seconds and 73 fetches [09:42:57] -- Post 8863 has 72 comments [09:42:57] Task was destroyed but it is pending! task: <Task pending coro=<log_post() done, defined at 02_fire_and_forget.py:82> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x1109197f8>()]>> [09:42:57] Task was destroyed but it is pending! task: <Task pending coro=<log_post() done, defined at 02_fire_and_forget.py:82> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x110919948>()]>> [09:42:57] Task was destroyed but it is pending! task: <Task pending coro=<log_post() done, defined at 02_fire_and_forget.py:82> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x110919978>()]>>
¡Ejem, la increíble tarea fue destruida pero está pendiente! atormentando a los usuarios de asyncio en todo el mundo La buena noticia es que hemos vuelto al tiempo que recibimos anteriormente (1.69 p.). La mala noticia es que a asyncio no le gusta ir más allá del rango de disparar y olvidar.
El problema es que cerramos forzosamente el ciclo de eventos después de obtener el resultado de la rutina post_number_of_comments , sin dejar que finalice nuestra tarea log_post time.
Tenemos dos opciones:
Dejamos que el bucle de eventos funcione interminablemente usando run_forever y finalicemos manualmente el script, o usamos el método all_tasks de la clase Task para encontrar todas las tareas de trabajo y esperar a que termine el cálculo del número de comentarios.
Intentemos salir de esta situación haciendo cambios rápidamente después de nuestra llamada a post_number_of_comments :
if __name__ == '__main__': args = parser.parse_args() if args.verbose: log.setLevel(logging.DEBUG) post_id = id_from_HN_url(args.url) if args.url else args.id loop = asyncio.get_event_loop() with aiohttp.ClientSession(loop=loop) as session: now = datetime.now() comments = loop.run_until_complete( post_number_of_comments(loop, session, post_id)) log.info( '> Calculating comments took {:.2f} seconds and {} fetches'.format( (datetime.now() - now).total_seconds(), fetch_counter)) log.info("-- Post {} has {} comments".format(post_id, comments)) pending_tasks = [ task for task in asyncio.Task.all_tasks() if not task.done()] loop.run_until_complete(asyncio.gather(*pending_tasks)) loop.close()
[09:47:29] > Calculating comments took 1.72 seconds and 73 fetches [09:47:29] — Post 8863 has 72 comments [09:47:30] Post logged [09:47:31] Post logged [09:47:32] Post logged
Ahora estamos seguros de que las tareas de registro se han completado.
La suposición de que el método all_tasks funciona bien en los casos que estamos tratando es una gran idea cuando las tareas se realizan adecuadamente en nuestro bucle de eventos, pero en casos más complejos puede haber cualquier cantidad de tareas que se puedan realizar, cuya fuente se puede ubicar fuera de nuestro código .
Otro enfoque es restaurar el orden después de que registremos de manera independiente todas las corutinas que planeamos lanzar y permitir que se ejecuten, lo que se pospuso anteriormente
tan pronto como se complete el recuento de comentarios. Como sabe, la función sure_future devuelve un objeto Task . Podemos usarlo para registrar nuestras tareas con baja prioridad. Solo definamos una lista task_registry y almacenemos futuros en ella:
async def post_number_of_comments(loop, session, post_id): """Retrieve data for current post and recursively for all comments. """ url = URL_TEMPLATE.format(post_id) response = await fetch(session, url) if 'kids' not in response:
[09:53:46] > Calculating comments took 1.68 seconds and 73 fetches [09:53:46] — Post 8863 has 72 comments [09:53:46] Post logged [09:53:48] Post logged [09:53:49] Post logged
Aprendemos la siguiente lección: asyncio no debe verse como una cola de tareas distribuidas como Celery . Todas las tareas se inician en un hilo y el ciclo de eventos debe administrarse en consecuencia, lo que le permite asignar tiempo para completar las tareas.
Lo que lleva a otro patrón generalmente aceptado:
Corutinas activadas periódicamente
Continuando con nuestro ejemplo sobre HN (e hicimos un gran trabajo antes), decidimos
lo cual es decisivamente importante para calcular el número de comentarios en la publicación de HN tan pronto como estén disponibles y mientras estén en la lista de 5 entradas recientes.
Un vistazo rápido a la API de HN muestra un punto final que devuelve los últimos 500 registros. Genial, así que simplemente podemos sondear este punto final para recibir nuevas publicaciones y calcular la cantidad de comentarios sobre ellas, digamos cada cinco segundos.
Bueno, dado que ahora nos estamos moviendo al sondeo periódico, simplemente podemos usar el ciclo while infinito, esperar a que se complete la tarea de sondeo (llamada en espera ) y quedarnos dormidos ( dormir en llamada) durante el tiempo requerido. Hice algunos cambios menores para obtener entradas principales en lugar de usar la URL de publicación directa.
""" An example of periodically scheduling coroutines using an infinite loop of awaiting and sleeping. """ import asyncio import argparse import logging from datetime import datetime import aiohttp import async_timeout LOGGER_FORMAT = '%(asctime)s %(message)s' URL_TEMPLATE = "https://hacker-news.firebaseio.com/v0/item/{}.json" TOP_STORIES_URL = "https://hacker-news.firebaseio.com/v0/topstories.json" FETCH_TIMEOUT = 10 parser = argparse.ArgumentParser(description='Calculate the number of comments of the top stories in HN.') parser.add_argument('--period', type=int, default=5, help='Number of seconds between poll') parser.add_argument('--limit', type=int, default=5,help='Number of new stories to calculate comments for') parser.add_argument('--verbose', action='store_true', help='Detailed output') logging.basicConfig(format=LOGGER_FORMAT, datefmt='[%H:%M:%S]') log = logging.getLogger() log.setLevel(logging.INFO) fetch_counter = 0 async def fetch(session, url): """ URL aiohttp, JSON . aiohttp . """ global fetch_counter with async_timeout.timeout(FETCH_TIMEOUT): fetch_counter += 1 async with session.get(url) as response: return await response.json() async def post_number_of_comments(loop, session, post_id): """ . """ url = URL_TEMPLATE.format(post_id) response = await fetch(session, url) if 'kids' not in response:
[10:14:03] Calculating comments for top 5 stories. (1) [10:14:06] Post 13848196 has 31 comments (1) [10:14:06] Post 13849430 has 37 comments (1) [10:14:06] Post 13849037 has 15 comments (1) [10:14:06] Post 13845337 has 128 comments (1) [10:14:06] Post 13847465 has 27 comments (1) [10:14:06] > Calculating comments took 2.96 seconds and 244 fetches [10:14:06] Waiting for 5 seconds… [10:14:11] Calculating comments for top 5 stories. (2) [10:14:14] Post 13848196 has 31 comments (2) [10:14:14] Post 13849430 has 37 comments (2) [10:14:14] Post 13849037 has 15 comments (2) [10:14:14] Post 13845337 has 128 comments (2) [10:14:14] Post 13847465 has 27 comments (2) [10:14:14] > Calculating comments took 3.04 seconds and 244 fetches [10:14:14] Waiting for 5 seconds…
Genial, pero hay un problema menor: si prestaste atención a la marca de tiempo,
entonces la tarea no comienza estrictamente cada 5 segundos, comienza 5 segundos después de que se completen las get_comments_of_top_stories . Nuevamente, las consecuencias de usar aguardan y bloquean hasta que recuperemos nuestros resultados.
Estas características no representan un problema cuando la tarea lleva más de cinco segundos. Además, parece erróneo usar _run_until complete cuando la rutina está diseñada para ser infinita.
La buena noticia es que ahora somos expertos en sure_future , y podemos simplemente insertarlo en el código en lugar de usar wait ...
async def poll_top_stories_for_comments(loop, session, period, limit): """ . """ global fetch_counter iteration = 1 while True: now = datetime.now() log.info("Calculating comments for top {} stories. ({})".format( limit, iteration)) asyncio.ensure_future( get_comments_of_top_stories(loop, session, limit, iteration)) log.info( '> Calculating comments took {:.2f} seconds and {} fetches'.format( (datetime.now() - now).total_seconds(), fetch_counter)) log.info("Waiting for {} seconds...".format(period)) iteration += 1 fetch_counter = 0 await asyncio.sleep(period)
[10:55:40] Calculating comments for top 5 stories. (1) [10:55:40] > Calculating comments took 0.00 seconds and 0 fetches [10:55:40] Waiting for 5 seconds… [10:55:43] Post 13848196 has 32 comments (1) [10:55:43] Post 13849430 has 48 comments (1) [10:55:43] Post 13849037 has 16 comments (1) [10:55:43] Post 13845337 has 129 comments (1) [10:55:43] Post 13847465 has 29 comments (1) [10:55:45] Calculating comments for top 5 stories. (2) [10:55:45] > Calculating comments took 0.00 seconds and 260 fetches [10:55:45] Waiting for 5 seconds… [10:55:48] Post 13848196 has 32 comments (2) [10:55:48] Post 13849430 has 48 comments (2) [10:55:48] Post 13849037 has 16 comments (2) [10:55:48] Post 13845337 has 129 comments (2) [10:55:48] Post 13847465 has 29 comments (2)
Ejem ... Bien, la buena noticia es que la marca de tiempo se encuentra exactamente cinco segundos después, pero ¿qué hay en 0.00 segundos y sin muestras? Y luego la siguiente iteración toma cero segundos y 260 muestras.
Esta es una de las consecuencias de evitar la espera, ahora ya no bloqueamos la rutina y simplemente pasamos a la siguiente línea, que imprime cero segundos y, por primera vez, cero mensajes recuperados. Estas son tareas bastante pequeñas, ya que podemos vivir sin mensajes, pero ¿qué pasa si necesitamos resultados de las tareas?
Entonces, mi amigo, tenemos que recurrir a ... devoluciones de llamada (encogimiento ((()
Lo sé, lo sé, el objetivo de la rutina es evitar devoluciones de llamada, pero eso se debe a que el subtítulo dramático del artículo es "Fuera de esperar". Ya no estamos en territorio de espera , tenemos aventuras con el lanzamiento manual de tareas, lo que lleva a nuestro caso de uso. ¿Qué te da esto? spoiler
Como discutimos anteriormente, sure_future devuelve un objeto Future al que podemos agregar una devolución de llamada usando _add_done callback .
Antes de hacer esto, y para tener las recuperaciones correctas, llegamos al punto de que necesitamos encapsular nuestra rutina de extracción en la clase URLFetcher . En este caso, creamos una instancia para cada tarea para que tengamos el recuento correcto de muestras. También eliminamos la variable global que introdujo el error de todos modos:
""" ensure_future sleep. future, ensure_future, , URLFetcher . """ import asyncio import argparse import logging from datetime import datetime import aiohttp import async_timeout LOGGER_FORMAT = '%(asctime)s %(message)s' URL_TEMPLATE = "https://hacker-news.firebaseio.com/v0/item/{}.json" TOP_STORIES_URL = "https://hacker-news.firebaseio.com/v0/topstories.json" FETCH_TIMEOUT = 10 parser = argparse.ArgumentParser(description='Calculate the number of comments of the top stories in HN.') parser.add_argument('--period', type=int, default=5, help='Number of seconds between poll') parser.add_argument('--limit', type=int, default=5, help='Number of new stories to calculate comments for') parser.add_argument('--verbose', action='store_true', help='Detailed output') logging.basicConfig(format=LOGGER_FORMAT, datefmt='[%H:%M:%S]') log = logging.getLogger() log.setLevel(logging.INFO) class URLFetcher(): """ URL """ def __init__(self): self.fetch_counter = 0 async def fetch(self, session, url): """ URL aiohttp, JSON . aiohttp . """ with async_timeout.timeout(FETCH_TIMEOUT): self.fetch_counter += 1 async with session.get(url) as response: return await response.json() async def post_number_of_comments(loop, session, fetcher, post_id): """ . """ url = URL_TEMPLATE.format(post_id) response = await fetcher.fetch(session, url)
[12:23:40] Calculating comments for top 5 stories. (1) [12:23:40] Waiting for 5 seconds... [12:23:43] Post 13848196 has 38 comments (1) [12:23:43] Post 13849430 has 72 comments (1) [12:23:43] Post 13849037 has 19 comments (1) [12:23:43] Post 13848283 has 64 comments (1) [12:23:43] Post 13847465 has 34 comments (1) [12:23:43] > Calculating comments took 3.17 seconds and 233 fetches [12:23:45] Calculating comments for top 5 stories. (2) [12:23:45] Waiting for 5 seconds... [12:23:47] Post 13848196 has 38 comments (2) [12:23:47] Post 13849430 has 72 comments (2) [12:23:47] Post 13849037 has 19 comments (2) [12:23:47] Post 13848283 has 64 comments (2) [12:23:47] Post 13847465 has 34 comments (2) [12:23:47] > Calculating comments took 2.47 seconds and 233 fetches [12:23:50] Calculating comments for top 5 stories. (3) [12:23:50] Waiting for 5 seconds...
Ok, mejor ya, pero centrémonos en la sección de devolución de llamada:
async def poll_top_stories_for_comments(loop, session, period, limit): """ . """ iteration = 1 while True: log.info("Calculating comments for top {} stories. ({})".format( limit, iteration)) future = asyncio.ensure_future( get_comments_of_top_stories(loop, session, limit, iteration)) now = datetime.now() def callback(fut): fetch_count = fut.result() log.info( '> Calculating comments took {:.2f} seconds and {} fetches'.format( (datetime.now() - now).total_seconds(), fetch_count)) future.add_done_callback(callback) log.info("Waiting for {} seconds...".format(period)) iteration += 1 await asyncio.sleep(period)
Tenga en cuenta que la función de devolución de llamada debe tomar un argumento, en el que se pasa el propio objeto futuro. También devolvemos el número de recuperación de la instancia de URLFetcher como resultado de _get_comments_of_top stories y obtenemos estos datos como resultado del futuro.
¿Ves?Te dije que sería bueno, pero definitivamente no hay que esperar aquí .
callback-, API asyncio AbstractBaseLoop _call later _call at ,
- . , , poll_top_stories_for_comments :
def poll_top_stories_for_comments(loop, session, period, limit, iteration=0): """ get_comments_of_top_stories. """ log.info("Calculating comments for top {} stories ({})".format( limit, iteration)) future = asyncio.ensure_future( get_comments_of_top_stories(loop, session, limit, iteration)) now = datetime.now() def callback(fut): fetch_count = fut.result() log.info( '> Calculating comments took {:.2f} seconds and {} fetches'.format( (datetime.now() - now).total_seconds(), fetch_count)) future.add_done_callback(callback) log.info("Waiting for {} seconds...".format(period)) iteration += 1 loop.call_later( period, partial(
. :
- , ensure_future .
( : ) - _poll_top_stories_for comments , , _run forever , .
, , , — ? ? URL:
MAXIMUM_FETCHES = 5 class URLFetcher(): """ URL """ def __init__(self): self.fetch_counter = 0 async def fetch(self, session, url): """ URL aiohttp, JSON . aiohttp . """ with async_timeout.timeout(FETCH_TIMEOUT): self.fetch_counter += 1 if self.fetch_counter > MAXIMUM_FETCHES: raise Exception('BOOM!') async with session.get(url) as response: return await response.json()
[12:51:00] Calculating comments for top 5 stories. (1) [12:51:00] Waiting for 5 seconds… [12:51:01] Exception in callback poll_top_stories_for_comments.<locals>.callback(<Task finishe…ion('BOOM!',)>) at 05_periodic_coroutines.py:121 handle: <Handle poll_top_stories_for_comments.<locals>.callback(<Task finishe…ion('BOOM!',)>) at 05_periodic_coroutines.py:121> Traceback (most recent call last): File “/Users/yeray/.pyenv/versions/3.6.0/lib/python3.6/asyncio/events.py”, line 126, in _run self._callback(*self._args) File “05_periodic_coroutines.py”, line 122, in callback fetch_count = fut.result() File “05_periodic_coroutines.py”, line 100, in get_comments_of_top_stories results = await asyncio.gather(*tasks) File “05_periodic_coroutines.py”, line 69, in post_number_of_comments response = await fetcher.fetch(session, url) File “05_periodic_coroutines.py”, line 58, in fetch raise Exception('BOOM!') Exception: BOOM! [12:51:05] Calculating comments for top 5 stories. (2) [12:51:05] Waiting for 5 seconds…
, ?
Que hacer , , : asyncio :