Prefácio do tradutor:
Mais uma vez pisando em um ancinho enquanto trabalhava com python assíncio, fui à Internet para encontrar algo mais agradável que a documentação seca. Me deparei com um artigo de Yeray Diaz "Padrões assíncronos de corotina: além do esperado" , no qual o autor considera muito interessante o uso do assíncio e compartilha alguns truques. Como não encontrei nada tão completo em russo, decidi traduzi-lo.
O Asyncio é o sonho competitivo de um programador em python: você escreve código parecido com o síncrono e deixa o Python fazer o resto. Esta é outra importação da biblioteca antigravidade: import antigravity
Na verdade, isso não é verdade, a programação simultânea é um trabalho árduo e, embora as corotinas nos permitam evitar avisos de retorno de chamada, o que pode levá-lo longe o suficiente, você ainda precisa pensar em criar tarefas, obter resultados e capturar exceções com elegância. Isso é triste
A boa notícia é que tudo isso é possível em assincronismo. A má notícia é que nem sempre é óbvio o que está errado e como corrigi-lo. Abaixo estão alguns padrões que eu descobri enquanto trabalhava com asyncio.
Antes de começarmos:
Usei minha biblioteca aiohttp favorita para executar solicitações HTTP assíncronas e a API Hacker News, porque é um site simples e conhecido que segue um cenário de uso familiar. Após a resposta ao meu artigo anterior , também usei a sintaxe async / waitit introduzida no Python 3.5. Supus que o leitor estivesse familiarizado com as idéias descritas aqui. E, finalmente, todos os exemplos estão disponíveis no repositório GitHub deste artigo .
Ok, vamos começar!
Corotinas recursivas
Criar e executar tarefas é trivial no assíncio. Para essas tarefas, a API inclui vários métodos na classe AbstractEventLoop, além de funções na biblioteca. Mas geralmente você deseja combinar os resultados dessas tarefas e processá-los de alguma forma, e a recursão é um ótimo exemplo desse esquema e também demonstra a simplicidade da corotina em comparação com outras ferramentas da concorrência.
Um caso comum para o uso de assíncrono é criar algum tipo de rastreador da web. Imagine que estamos ocupados demais para verificar o HackerNews, ou talvez você goste de um bom holivar, então você deseja implementar um sistema que recupere o número de comentários para uma postagem específica do HN e, se estiver acima do limite, notifica você. Você pesquisou um pouco no Google e encontrou a documentação para a API HN, exatamente o que precisa, mas observou o seguinte na documentação:
Deseja saber o número total de comentários de artigos? Contorne a árvore e conte-os.
Chamada aceita!
""" , , . , Hacker News """ import asyncio import argparse import logging from urllib.parse import urlparse, parse_qs from datetime import datetime import aiohttp import async_timeout LOGGER_FORMAT = '%(asctime)s %(message)s' URL_TEMPLATE = "https://hacker-news.firebaseio.com/v0/item/{}.json" FETCH_TIMEOUT = 10 parser = argparse.ArgumentParser(description='Calculate the comments of a Hacker News post.') parser.add_argument('--id', type=int, default=8863, help='ID of the post in HN, defaults to 8863') parser.add_argument('--url', type=str, help='URL of a post in HN') parser.add_argument('--verbose', action='store_true', help='Detailed output') logging.basicConfig(format=LOGGER_FORMAT, datefmt='[%H:%M:%S]') log = logging.getLogger() log.setLevel(logging.INFO) fetch_counter = 0 async def fetch(session, url): """ URL aiohttp, JSON . aiohttp . """ global fetch_counter with async_timeout.timeout(FETCH_TIMEOUT): fetch_counter += 1 async with session.get(url) as response: return await response.json() async def post_number_of_comments(loop, session, post_id): """ . """ url = URL_TEMPLATE.format(post_id) now = datetime.now() response = await fetch(session, url) log.debug('{:^6} > Fetching of {} took {} seconds'.format( post_id, url, (datetime.now() - now).total_seconds())) if 'kids' not in response:
Sinta-se à vontade para tentar executar o script com o sinalizador "-verbose" para obter uma saída mais detalhada.
[14:47:32] > Calculating comments took 2.23 seconds and 73 fetches [14:47:32] -- Post 8863 has 72 comments
Vamos pular o código padrão e ir direto para a rotina recursiva. Observe que esse código é lido quase completamente como seria com o código síncrono.
async def post_number_of_comments(loop, session, post_id): """ . """ url = URL_TEMPLATE.format(post_id) now = datetime.now() response = await fetch(session, url) log.debug('{:^6} > Fetching of {} took {} seconds'.format( post_id, url, (datetime.now() - now).total_seconds())) if 'kids' not in response:
- Primeiro, obtemos JSON com os dados da postagem.
- Recursivamente, contorne cada um dos herdeiros.
- No final, alcançaremos o caso base e retornaremos zero,
quando a postagem não tiver comentários. - Ao retornar do caso base, adicione as respostas à postagem atual
ao número de herdeiros e retornar
Este é um ótimo exemplo do que Brett Slatkin descreve como fan-in e fan-out ; somos fan-out para receber dados dos herdeiros e fan-in resume os dados para calcular o número de comentários
Existem algumas maneiras na API assíncrona de executar essas operações de dispersão. Aqui, uso a função de coleta , que espera efetivamente todas as corotinas para concluir e retornar uma lista de seus resultados.
Vamos prestar atenção em como o uso da corotina também corresponde bem à recursão em qualquer ponto em que esteja presente um número de corotinas, aguardando respostas para seus pedidos durante a chamada para a função de coleta e retomando a execução após a conclusão da operação de E / S. Isso nos permite expressar um comportamento bastante complexo em uma corutina elegante e (facilmente) legível.
"Muito simples", você diz? Ok, vamos subir um pouco.
Atirou e esqueceu
Imagine que você deseja enviar para si mesmo uma mensagem de email com postagens que tenham mais comentários que um determinado valor e faça isso da mesma maneira que percorremos a árvore de postagens. Podemos simplesmente adicionar a instrução if no final da função recursiva para conseguir isso:
async def post_number_of_comments(loop, session, post_id): """ . """ url = URL_TEMPLATE.format(post_id) response = await fetch(session, url) if 'kids' not in response:
Sim, eu usei asyncio.sleep. Esta é a última vez. Eu prometo.
[09:41:02] Post logged [09:41:04] Post logged [09:41:06] Post logged [09:41:06] > Calculating comments took 6.35 seconds and 73 fetches [09:41:06] -- Post 8863 has 72 comments
Isso é significativamente mais lento do que antes!
O motivo é que, como discutimos anteriormente, aguardamos uma pausa na execução da rotina até que o futuro esteja completo, mas como não precisamos do resultado do log, não há motivo real para fazer isso.
Precisamos "disparar e esquecer" com nossa corotina e, como não podemos esperar que ela termine de esperar, precisamos de outra maneira de iniciar a corotina sem esperar. Uma rápida olhada na API assíncrona encontrará a função sure_future que agendará a execução da corrotina, envolva-a em um objeto Tarefa e retorne-a. Lembrando que antes que a corotina fosse planejada, um ciclo de eventos controlará o resultado de nossa corotina em algum momento no futuro, quando outra corotina estiver em um estado de expectativa.
Ótimo, vamos substituir aguardar log_post da seguinte maneira:
async def post_number_of_comments(loop, session, post_id): """ . """ url = URL_TEMPLATE.format(post_id) response = await fetch(session, url) if 'kids' not in response:
[09:42:57] > Calculating comments took 1.69 seconds and 73 fetches [09:42:57] -- Post 8863 has 72 comments [09:42:57] Task was destroyed but it is pending! task: <Task pending coro=<log_post() done, defined at 02_fire_and_forget.py:82> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x1109197f8>()]>> [09:42:57] Task was destroyed but it is pending! task: <Task pending coro=<log_post() done, defined at 02_fire_and_forget.py:82> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x110919948>()]>> [09:42:57] Task was destroyed but it is pending! task: <Task pending coro=<log_post() done, defined at 02_fire_and_forget.py:82> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x110919978>()]>>
Ahem, a tarefa impressionante foi destruída, mas está pendente! assombrando usuários assíncronos em todo o mundo. A boa notícia é que voltamos ao tempo que recebemos anteriormente (1,69 p.). A má notícia é que o assíncio não gosta de ir além do alcance de atirar e esquecer.
O problema é que fechamos com força o loop de eventos depois de obtermos o resultado da rotina post_number_of_comments , sem deixar tempo para concluir a tarefa log_post .
Temos duas opções:
deixamos o loop de eventos funcionar infinitamente usando run_forever e finalizamos manualmente o script, ou usamos o método all_tasks da classe Task para encontrar todas as tarefas de trabalho e aguardar o cálculo do número de comentários.
Vamos tentar sair dessa situação fazendo alterações rapidamente após nossa chamada para post_number_of_comments :
if __name__ == '__main__': args = parser.parse_args() if args.verbose: log.setLevel(logging.DEBUG) post_id = id_from_HN_url(args.url) if args.url else args.id loop = asyncio.get_event_loop() with aiohttp.ClientSession(loop=loop) as session: now = datetime.now() comments = loop.run_until_complete( post_number_of_comments(loop, session, post_id)) log.info( '> Calculating comments took {:.2f} seconds and {} fetches'.format( (datetime.now() - now).total_seconds(), fetch_counter)) log.info("-- Post {} has {} comments".format(post_id, comments)) pending_tasks = [ task for task in asyncio.Task.all_tasks() if not task.done()] loop.run_until_complete(asyncio.gather(*pending_tasks)) loop.close()
[09:47:29] > Calculating comments took 1.72 seconds and 73 fetches [09:47:29] — Post 8863 has 72 comments [09:47:30] Post logged [09:47:31] Post logged [09:47:32] Post logged
Agora temos certeza de que as tarefas de log foram concluídas.
A suposição de que o método all_tasks funciona bem nos casos com os quais estamos lidando é uma ótima idéia quando as tarefas são executadas adequadamente em nosso loop de eventos, mas em casos mais complexos, pode haver várias tarefas que podem ser executadas, cuja origem pode estar localizada fora do nosso código .
Outra abordagem é restaurar a ordem depois de registrar independentemente absolutamente todas as corotinas que planejamos lançar e permitir que sejam executadas, que foram adiadas anteriormente,
assim que a contagem dos comentários for concluída. Como você sabe, a função sure_future retorna um objeto Task . Podemos usá-lo para registrar nossas tarefas com baixa prioridade. Vamos apenas definir uma lista task_registry e armazenar futuros nela:
async def post_number_of_comments(loop, session, post_id): """Retrieve data for current post and recursively for all comments. """ url = URL_TEMPLATE.format(post_id) response = await fetch(session, url) if 'kids' not in response:
[09:53:46] > Calculating comments took 1.68 seconds and 73 fetches [09:53:46] — Post 8863 has 72 comments [09:53:46] Post logged [09:53:48] Post logged [09:53:49] Post logged
Aprendemos a lição a seguir - o assíncio não deve ser visto como uma fila de tarefas distribuída como o Aipo . Todas as tarefas são iniciadas em um encadeamento e o ciclo de eventos deve ser gerenciado adequadamente, permitindo que você aloque tempo para concluir as tarefas.
O que leva a outro padrão geralmente aceito:
Corotinas acionadas periodicamente
Continuando com nosso exemplo sobre HN (e fizemos um ótimo trabalho anteriormente), decidimos
o que é decisivamente importante para calcular o número de comentários na publicação HN assim que estiverem disponíveis e enquanto estiverem na lista de 5 entradas recentes.
Uma rápida olhada na API do HN mostra um ponto de extremidade que retorna os últimos 500 registros. Ótimo, então podemos simplesmente pesquisar esse endpoint para receber novas publicações e calcular o número de comentários sobre elas, digamos a cada cinco segundos.
Bem, como agora estamos migrando para a pesquisa periódica, podemos simplesmente usar o loop while infinito, aguardar a conclusão da tarefa de pesquisa (chamada em espera ) e adormecer (chamada em espera ) pelo tempo necessário. Fiz algumas pequenas alterações para obter as principais entradas em vez de usar o URL da postagem direta.
""" An example of periodically scheduling coroutines using an infinite loop of awaiting and sleeping. """ import asyncio import argparse import logging from datetime import datetime import aiohttp import async_timeout LOGGER_FORMAT = '%(asctime)s %(message)s' URL_TEMPLATE = "https://hacker-news.firebaseio.com/v0/item/{}.json" TOP_STORIES_URL = "https://hacker-news.firebaseio.com/v0/topstories.json" FETCH_TIMEOUT = 10 parser = argparse.ArgumentParser(description='Calculate the number of comments of the top stories in HN.') parser.add_argument('--period', type=int, default=5, help='Number of seconds between poll') parser.add_argument('--limit', type=int, default=5,help='Number of new stories to calculate comments for') parser.add_argument('--verbose', action='store_true', help='Detailed output') logging.basicConfig(format=LOGGER_FORMAT, datefmt='[%H:%M:%S]') log = logging.getLogger() log.setLevel(logging.INFO) fetch_counter = 0 async def fetch(session, url): """ URL aiohttp, JSON . aiohttp . """ global fetch_counter with async_timeout.timeout(FETCH_TIMEOUT): fetch_counter += 1 async with session.get(url) as response: return await response.json() async def post_number_of_comments(loop, session, post_id): """ . """ url = URL_TEMPLATE.format(post_id) response = await fetch(session, url) if 'kids' not in response:
[10:14:03] Calculating comments for top 5 stories. (1) [10:14:06] Post 13848196 has 31 comments (1) [10:14:06] Post 13849430 has 37 comments (1) [10:14:06] Post 13849037 has 15 comments (1) [10:14:06] Post 13845337 has 128 comments (1) [10:14:06] Post 13847465 has 27 comments (1) [10:14:06] > Calculating comments took 2.96 seconds and 244 fetches [10:14:06] Waiting for 5 seconds… [10:14:11] Calculating comments for top 5 stories. (2) [10:14:14] Post 13848196 has 31 comments (2) [10:14:14] Post 13849430 has 37 comments (2) [10:14:14] Post 13849037 has 15 comments (2) [10:14:14] Post 13845337 has 128 comments (2) [10:14:14] Post 13847465 has 27 comments (2) [10:14:14] > Calculating comments took 3.04 seconds and 244 fetches [10:14:14] Waiting for 5 seconds…
Ótimo, mas há um pequeno problema: se você prestou atenção ao carimbo de hora,
a tarefa não inicia estritamente a cada 5 segundos, é iniciada 5 segundos após a conclusão de get_comments_of_top_stories . Novamente, as conseqüências do uso aguardam e bloqueiam até recuperarmos nossos resultados.
Esses recursos não representam um problema quando a tarefa leva mais de cinco segundos. Além disso, parece incorreto usar _run_until complete quando a corotina é projetada para ser infinita.
A boa notícia é que agora somos especialistas em sure_future , e podemos simplesmente inseri- lo no código em vez de esperar ...
async def poll_top_stories_for_comments(loop, session, period, limit): """ . """ global fetch_counter iteration = 1 while True: now = datetime.now() log.info("Calculating comments for top {} stories. ({})".format( limit, iteration)) asyncio.ensure_future( get_comments_of_top_stories(loop, session, limit, iteration)) log.info( '> Calculating comments took {:.2f} seconds and {} fetches'.format( (datetime.now() - now).total_seconds(), fetch_counter)) log.info("Waiting for {} seconds...".format(period)) iteration += 1 fetch_counter = 0 await asyncio.sleep(period)
[10:55:40] Calculating comments for top 5 stories. (1) [10:55:40] > Calculating comments took 0.00 seconds and 0 fetches [10:55:40] Waiting for 5 seconds… [10:55:43] Post 13848196 has 32 comments (1) [10:55:43] Post 13849430 has 48 comments (1) [10:55:43] Post 13849037 has 16 comments (1) [10:55:43] Post 13845337 has 129 comments (1) [10:55:43] Post 13847465 has 29 comments (1) [10:55:45] Calculating comments for top 5 stories. (2) [10:55:45] > Calculating comments took 0.00 seconds and 260 fetches [10:55:45] Waiting for 5 seconds… [10:55:48] Post 13848196 has 32 comments (2) [10:55:48] Post 13849430 has 48 comments (2) [10:55:48] Post 13849037 has 16 comments (2) [10:55:48] Post 13845337 has 129 comments (2) [10:55:48] Post 13847465 has 29 comments (2)
Ahem ... Ok, a boa notícia é que o registro de data e hora está localizado exatamente cinco segundos depois, mas o que há em 0,00 segundos e nenhuma amostra? E a próxima iteração leva zero segundos e 260 amostras?
Essa é uma das conseqüências de evitar a espera, agora não bloqueamos mais a rotina e simplesmente passamos para a próxima linha, que imprime zero segundos e, pela primeira vez, zero mensagens recuperadas. Essas são tarefas muito pequenas, pois podemos viver sem mensagens, mas e se precisarmos de resultados de tarefas?
Então, meu amigo, precisamos recorrer a ... retornos de chamada (encolhendo ((())
Eu sei, eu sei, o ponto principal da corotina é evitar retornos de chamada, mas isso ocorre porque o subtítulo dramático do artigo é "Fora de espera". Não estamos mais aguardando território, temos aventuras com o lançamento manual de tarefas, o que leva ao nosso caso de uso. O que isso te dá? spoiler
Como discutimos anteriormente, assegure_futura retorna um objeto Future ao qual podemos adicionar um retorno de chamada usando o retorno de chamada _add_done.
Antes de fazer isso, e para obter as buscas corretas, chegamos ao ponto em que precisamos encapsular nossa rotina de extração na classe URLFetcher . Nesse caso, criamos uma instância para cada tarefa, para que tenhamos a contagem correta de amostras. Também removemos a variável global que introduziu o bug de qualquer maneira:
""" ensure_future sleep. future, ensure_future, , URLFetcher . """ import asyncio import argparse import logging from datetime import datetime import aiohttp import async_timeout LOGGER_FORMAT = '%(asctime)s %(message)s' URL_TEMPLATE = "https://hacker-news.firebaseio.com/v0/item/{}.json" TOP_STORIES_URL = "https://hacker-news.firebaseio.com/v0/topstories.json" FETCH_TIMEOUT = 10 parser = argparse.ArgumentParser(description='Calculate the number of comments of the top stories in HN.') parser.add_argument('--period', type=int, default=5, help='Number of seconds between poll') parser.add_argument('--limit', type=int, default=5, help='Number of new stories to calculate comments for') parser.add_argument('--verbose', action='store_true', help='Detailed output') logging.basicConfig(format=LOGGER_FORMAT, datefmt='[%H:%M:%S]') log = logging.getLogger() log.setLevel(logging.INFO) class URLFetcher(): """ URL """ def __init__(self): self.fetch_counter = 0 async def fetch(self, session, url): """ URL aiohttp, JSON . aiohttp . """ with async_timeout.timeout(FETCH_TIMEOUT): self.fetch_counter += 1 async with session.get(url) as response: return await response.json() async def post_number_of_comments(loop, session, fetcher, post_id): """ . """ url = URL_TEMPLATE.format(post_id) response = await fetcher.fetch(session, url)
[12:23:40] Calculating comments for top 5 stories. (1) [12:23:40] Waiting for 5 seconds... [12:23:43] Post 13848196 has 38 comments (1) [12:23:43] Post 13849430 has 72 comments (1) [12:23:43] Post 13849037 has 19 comments (1) [12:23:43] Post 13848283 has 64 comments (1) [12:23:43] Post 13847465 has 34 comments (1) [12:23:43] > Calculating comments took 3.17 seconds and 233 fetches [12:23:45] Calculating comments for top 5 stories. (2) [12:23:45] Waiting for 5 seconds... [12:23:47] Post 13848196 has 38 comments (2) [12:23:47] Post 13849430 has 72 comments (2) [12:23:47] Post 13849037 has 19 comments (2) [12:23:47] Post 13848283 has 64 comments (2) [12:23:47] Post 13847465 has 34 comments (2) [12:23:47] > Calculating comments took 2.47 seconds and 233 fetches [12:23:50] Calculating comments for top 5 stories. (3) [12:23:50] Waiting for 5 seconds...
, , callback:
async def poll_top_stories_for_comments(loop, session, period, limit): """ . """ iteration = 1 while True: log.info("Calculating comments for top {} stories. ({})".format( limit, iteration)) future = asyncio.ensure_future( get_comments_of_top_stories(loop, session, limit, iteration)) now = datetime.now() def callback(fut): fetch_count = fut.result() log.info( '> Calculating comments took {:.2f} seconds and {} fetches'.format( (datetime.now() - now).total_seconds(), fetch_count)) future.add_done_callback(callback) log.info("Waiting for {} seconds...".format(period)) iteration += 1 await asyncio.sleep(period)
, callback , future. (fetch) URLFetcher _get_comments_of_top stories future.
Está vendo? , , await .
callback-, API asyncio AbstractBaseLoop _call later _call at ,
- . , , poll_top_stories_for_comments :
def poll_top_stories_for_comments(loop, session, period, limit, iteration=0): """ get_comments_of_top_stories. """ log.info("Calculating comments for top {} stories ({})".format( limit, iteration)) future = asyncio.ensure_future( get_comments_of_top_stories(loop, session, limit, iteration)) now = datetime.now() def callback(fut): fetch_count = fut.result() log.info( '> Calculating comments took {:.2f} seconds and {} fetches'.format( (datetime.now() - now).total_seconds(), fetch_count)) future.add_done_callback(callback) log.info("Waiting for {} seconds...".format(period)) iteration += 1 loop.call_later( period, partial(
. :
- , ensure_future .
( : ) - _poll_top_stories_for comments , , _run forever , .
, , , — ? ? URL:
MAXIMUM_FETCHES = 5 class URLFetcher(): """ URL """ def __init__(self): self.fetch_counter = 0 async def fetch(self, session, url): """ URL aiohttp, JSON . aiohttp . """ with async_timeout.timeout(FETCH_TIMEOUT): self.fetch_counter += 1 if self.fetch_counter > MAXIMUM_FETCHES: raise Exception('BOOM!') async with session.get(url) as response: return await response.json()
[12:51:00] Calculating comments for top 5 stories. (1) [12:51:00] Waiting for 5 seconds… [12:51:01] Exception in callback poll_top_stories_for_comments.<locals>.callback(<Task finishe…ion('BOOM!',)>) at 05_periodic_coroutines.py:121 handle: <Handle poll_top_stories_for_comments.<locals>.callback(<Task finishe…ion('BOOM!',)>) at 05_periodic_coroutines.py:121> Traceback (most recent call last): File “/Users/yeray/.pyenv/versions/3.6.0/lib/python3.6/asyncio/events.py”, line 126, in _run self._callback(*self._args) File “05_periodic_coroutines.py”, line 122, in callback fetch_count = fut.result() File “05_periodic_coroutines.py”, line 100, in get_comments_of_top_stories results = await asyncio.gather(*tasks) File “05_periodic_coroutines.py”, line 69, in post_number_of_comments response = await fetcher.fetch(session, url) File “05_periodic_coroutines.py”, line 58, in fetch raise Exception('BOOM!') Exception: BOOM! [12:51:05] Calculating comments for top 5 stories. (2) [12:51:05] Waiting for 5 seconds…
, ?
O que fazer , , : asyncio :