Envie e-mails usando asyncio e aiohttp de um aplicativo Django

Olá pessoal!

Estou desenvolvendo e dando suporte a um serviço de notificação no Ostrovok.ru . O serviço é escrito em Python3 e Django. Além de cartas transacionais, pushes e mensagens, o serviço também realiza as tarefas de envio em massa de ofertas comerciais (não spam! Confie em mim, a remoção da inscrição funciona melhor do que a inscrição) para os usuários que deram o seu consentimento. Com o tempo, a base de destinatários ativos cresceu para mais de um milhão de endereços, para os quais o serviço de correio não estava pronto. Quero falar sobre como os novos recursos do Python tornaram possível acelerar as correspondências em massa, economizar recursos e quais problemas tivemos que lidar ao trabalhar com eles.



Implementação da fonte


Inicialmente, as correspondências em massa foram implementadas da maneira mais simples: para cada destinatário, uma tarefa foi colocada em uma fila que foi realizada por um dos 60 trabalhadores em massa (um recurso de nossas filas é que cada trabalhador trabalha em um processo separado), preparou o contexto para ela, processou o modelo, enviou Uma solicitação HTTP ao Mailgun para enviar uma carta e criou um registro no banco de dados de que a carta foi enviada. A correspondência inteira levou até 12 horas, enviando cerca de 0,3 cartas por segundo de cada trabalhador e bloqueando a correspondência de pequenas campanhas.



Solução assíncrona


A criação de perfil rápido mostrou que na maioria das vezes os trabalhadores gastam no estabelecimento de conexões com o Mailgun, então começamos a agrupar tarefas em partes, por partes para cada trabalhador. Os trabalhadores começaram a usar uma conexão com o Mailgun, o que permitiu reduzir o tempo de envio para 9 horas, enviando para cada trabalhador uma média de 0,5 cartas por segundo. A criação de perfil subsequente mostrou novamente que o trabalho com a rede ainda ocupa a maior parte do tempo, o que nos levou a usar o assíncio.

Antes de colocar todo o processamento no ciclo assíncrono, tivemos que pensar em uma solução para vários problemas:

  1. O Django ORM ainda não pode trabalhar com asyncio, no entanto, libera o GIL durante a execução da consulta. Isso significa que as consultas ao banco de dados podem ser executadas em um thread separado e não bloquear o loop principal.
  2. As versões atuais do aiohttp requerem o Python versão 3.6 e superior, que no momento da implementação exigia a atualização da imagem do docker. Experimentos em versões mais antigas do aiohttp e Python 3.5 mostraram que a velocidade de envio nessas versões é muito menor do que nas versões mais recentes e é comparável ao envio seqüencial.
  3. Armazenar uma grande quantidade de assíncio corutin rapidamente leva ao gasto de toda a memória. Isso significa que é impossível pré-preparar todas as corotinas para cartas e causar um ciclo para processá-las; é necessário preparar os dados à medida que você envia cartas já formadas.

Considerando todos os recursos, criaremos dentro de cada um de nossos trabalhadores nosso ciclo assíncrono com a semelhança do padrão ThreadPool, consistindo em:

  • Um ou mais produtores trabalhando com um banco de dados através do Django ORM em um encadeamento separado por meio de asyncio.ThreadPoolExecutor. O fabricante tenta agregar solicitações de aquisição de dados em pequenos lotes, renderiza modelos para os dados recebidos através do Jinja2 e adiciona dados para envio à fila de tarefas.

def get_campaign_send_data(ids: Iterable[int]) -> Iterable[Mapping[str, Any]]: """    ,     Django ORM   .""" return [{'id': id} for id in ids] async def mail_campaign_producer(ids: Iterable[int], task_queue: asyncio.Queue) -> None: """           ,    .      ,     ThreadPoolExecutor. """ loop = asyncio.get_event_loop() total = len(ids) for subchunk_start in range(0, total, PRODUCER_SUBCHUNK_SIZE): subchunk_ids = ids[subchunk_start : min(subchunk_start + PRODUCER_SUBCHUNK_SIZE, total)] send_tasks = await loop.run_in_executor(None, get_campaign_send_data, subchunk_ids) for task in send_tasks: await task_queue.put(task) 

  • Várias centenas de remetentes de cartas - asyncio coroutines, que em um ciclo interminável leem dados da fila de tarefas, enviam solicitações de rede para cada uma delas e colocam o resultado (resposta ou exceção) na fila de relatórios.

 async def send_mail(data: Mapping[str, Any], session: aiohttp.ClientSession) -> Union[Mapping[str, Any], Exception]: """    .""" async with session.post(REQUEST_URL, data=data) as response: if response.status_code != 200: raise Exception return data async def mail_campaign_sender( task_queue: asyncio.Queue, result_queue: asyncio.Queue, session: aiohttp.ClientSession ) -> None: """        .     task_done,    ,   . """ while True: try: task_data = await task_queue.get() result = await send_mail(task_data, session) await result_queue.put(result) except asyncio.CancelledError: #     raise except Exception as exception: #     await result_queue.put(exception) finally: task_queue.task_done() 

  • Um ou mais trabalhadores que agrupam dados da fila de relatórios e colocam informações sobre o resultado do envio de uma carta ao banco de dados em massa com uma solicitação.

 def process_campaign_results(results: Iterable[Union[Mapping[str, Any], Exception]]) -> None: """  :         """ pass async def mail_campaign_reporter(task_queue: asyncio.Queue, result_queue: asyncio.Queue) -> None: """          ThreadPoolExecutor,        . """ loop = asyncio.get_event_loop() results_chunk = [] while True: try: results_chunk.append(await result_queue.get()) if len(results_chunk) >= REPORTER_BATCH_SIZE: await loop.run_in_executor(None, process_campaign_results, results_chunk) results_chunk.clear() except asyncio.CancelledError: await loop.run_in_executor(None, process_campaign_results, results_chunk) results_chunk.clear() raise finally: result_queue.task_done() 

  • A fila de tarefas, que é uma instância de asyncio.Queue, limitada pelo número máximo de elementos para que o fabricante não a sobrecarregue, gastando toda a memória.
  • Filas de relatórios, também uma instância de asyncio.Queue com um limite no número máximo de itens.
  • Um método assíncrono que cria filas, trabalhadores e conclui a distribuição parando-os.

 async def send_mail_campaign( recipient_ids: Iterable[int], session: aiohttp.ClientSession, loop: asyncio.AbstractEventLoop = None ) -> None: """       .    ,       . """ executor = ThreadPoolExecutor(max_workers=PRODUCERS_COUNT + 1) loop = loop or asyncio.get_event_loop() loop.set_default_executor(executor) task_queue = asyncio.Queue(maxsize=2 * SENDERS_COUNT, loop=loop) result_queue = asyncio.Queue(maxsize=2 * SENDERS_COUNT, loop=loop) producers = [ asyncio.ensure_future(mail_campaign_producer(recipient_ids, task_queue)) for _ in range(PRODUCERS_COUNT) ] consumers = [ asyncio.ensure_future(mail_campaign_sender(task_queue, result_queue, session)) for _ in range(SENDERS_COUNT) ] reporter = asyncio.ensure_future(mail_campaign_reporter(task_queue, result_queue)) # ,      done, _ = await asyncio.wait(producers) #    ,   await task_queue.join() while consumers: consumers.pop().cancel() #    ,     await result_queue.join() reporter.cancel() 

  • O código síncrono que cria o loop e inicia a distribuição.

 async def close_session(future: asyncio.Future, session: aiohttp.ClientSession) -> None: """  ,    .  aiohttp      . """ await asyncio.wait([future]) await asyncio.sleep(0.250) await session.close() def mail_campaign_send_chunk(recipient_ids: Iterable[int]) -> None: """     .   ,  asyncio     . """ loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) # Session connector = aiohttp.TCPConnector(limit_per_host=0, limit=0) session = aiohttp.ClientSession( connector=connector, auth=aiohttp.BasicAuth('api', API_KEY), loop=loop, read_timeout=60 ) send_future = asyncio.ensure_future(send_mail_campaign(recipient_ids, session, loop=loop)) cleanup_future = asyncio.ensure_future(close_session(send_future, session)) loop.run_until_complete(asyncio.wait([send_future, cleanup_future])) loop.close() 

Após a implementação desta solução, o tempo para o envio de correspondências em massa foi reduzido para uma hora com os mesmos volumes de correspondência e 12 funcionários envolvidos. Ou seja, cada trabalhador envia de 20 a 25 cartas por segundo, o que é 50 a 80 vezes mais produtivo que a solução original. O consumo de memória dos trabalhadores foi mantido no nível inicial, a carga do processador aumentou levemente, a utilização da rede aumentou muitas vezes, que é o efeito esperado. O número de conexões com o banco de dados também aumentou, pois cada um dos fluxos de trabalhadores-produtores e trabalhadores que salvam relatórios trabalham ativamente com o banco de dados. Ao mesmo tempo, trabalhadores livres podem enviar correspondências pequenas enquanto uma campanha em massa é enviada.



Apesar de todas as vantagens, essa implementação apresenta várias dificuldades que devem ser levadas em consideração:

  1. Cuidados devem ser tomados ao lidar com erros. Uma exceção não tratada pode encerrar o trabalhador, causando o congelamento da campanha.
  2. Quando o envio é concluído, é necessário não perder relatórios sobre os destinatários que não concluíram o bloco até o final e salvá-los no banco de dados.
  3. A lógica de interromper à força a retomada de campanhas está ficando mais complicada, porque depois de interromper os trabalhadores que enviam, é necessário comparar quais destinatários receberam cartas e quais não foram.
  4. Depois de algum tempo, a equipe de suporte da Mailgun entrou em contato conosco e pediu que diminuíssemos a velocidade de envio, porque os serviços de correio começaram a rejeitar temporariamente os e-mails se a frequência de envio exceder o valor limite. Isso é fácil, reduzindo o número de trabalhadores.
  5. Não seria possível usar o asyncio se alguns dos estágios do envio de cartas executassem operações que exigem processador. A renderização de modelos usando o jinja2 acabou por ser uma operação que não consome muitos recursos e praticamente não tem efeito na velocidade de envio.
  6. O uso de asyncio para listas de discussão requer que os manipuladores da fila de distribuição sejam iniciados por processos separados.

Espero que nossa experiência seja útil para você! Se você tiver alguma dúvida ou idéia, escreva nos comentários!

Source: https://habr.com/ru/post/pt482114/


All Articles