Envoyer des e-mails en utilisant asyncio et aiohttp Ă  partir d'une application Django

Bonjour Ă  tous!

Je dĂ©veloppe et supporte un service de notification sur Ostrovok.ru . Le service est Ă©crit en Python3 et Django. En plus des lettres, push et messages transactionnels, le service entreprend Ă©galement des tĂąches d'envoi massif d'offres commerciales (pas de spam! Faites-moi confiance, les dĂ©sabonnements fonctionnent mieux que les abonnements) aux utilisateurs qui ont donnĂ© leur accord. Au fil du temps, la base de destinataires actifs est passĂ©e Ă  plus d'un million d'adresses, pour lesquelles le service de messagerie n'Ă©tait pas prĂȘt. Je veux parler de la façon dont les nouvelles fonctionnalitĂ©s de Python ont permis d'accĂ©lĂ©rer les envois de masse et d'Ă©conomiser des ressources et des problĂšmes que nous avons dĂ» traiter lors de leur utilisation.



Implémentation de la source


Initialement, les publipostages de masse ont Ă©tĂ© mis en Ɠuvre de la maniĂšre la plus simple: pour chaque destinataire, une tĂąche a Ă©tĂ© placĂ©e dans une file d'attente qui a Ă©tĂ© prise par l'un des 60 travailleurs de masse (une caractĂ©ristique de nos files d'attente est que chaque travailleur travaille dans un processus sĂ©parĂ©), a prĂ©parĂ© le contexte pour cela, rendu le modĂšle, envoyĂ© Une requĂȘte HTTP Ă  Mailgun pour envoyer une lettre et crĂ©Ă© un enregistrement dans la base de donnĂ©es que la lettre a Ă©tĂ© envoyĂ©e. L'envoi complet a durĂ© jusqu'Ă  12 heures, envoyant environ 0,3 lettre par seconde de chaque travailleur et bloquant l'envoi de petites campagnes.



Solution asynchrone


Le profilage rapide a montré que la plupart du temps que les travailleurs passent à établir des connexions avec Mailgun, nous avons donc commencé à regrouper les tùches en blocs, par bloc pour chaque travailleur. Les travailleurs ont commencé à utiliser une connexion avec Mailgun, ce qui a permis de réduire le temps d'envoi à 9 heures, envoyant à chaque travailleur une moyenne de 0,5 lettres par seconde. Le profilage ultérieur a de nouveau montré que travailler avec le réseau prend toujours la plupart du temps, ce qui nous a incités à utiliser asyncio.

Avant de mettre tout le traitement dans le cycle asyncio, nous avons dû réfléchir à une solution à un certain nombre de problÚmes:

  1. Django ORM n'est pas encore capable de fonctionner avec asyncio, cependant, il libĂšre le GIL pendant l'exĂ©cution de la requĂȘte. Cela signifie que les requĂȘtes de base de donnĂ©es peuvent ĂȘtre exĂ©cutĂ©es dans un thread sĂ©parĂ© et ne pas bloquer la boucle principale.
  2. Les versions actuelles d'aiohttp nécessitent Python version 3.6 et supérieure, ce qui, au moment de l'implémentation, nécessitait la mise à jour de l'image docker. Les expériences sur les anciennes versions d'aiohttp et Python 3.5 ont montré que la vitesse d'envoi sur ces versions est beaucoup plus faible que sur les versions plus récentes, et est comparable à l'envoi séquentiel.
  3. Stocker une grande quantité de corutine asyncio conduit rapidement à la dépense de toute la mémoire. Cela signifie qu'il est impossible de pré-préparer toutes les coroutines pour les lettres et de provoquer un cycle pour les traiter, il est nécessaire de préparer les données lorsque vous envoyez des lettres déjà formées.

Compte tenu de toutes les fonctionnalités, nous allons créer à l'intérieur de chacun de nos travailleurs notre cycle asyncio avec la similitude du modÚle ThreadPool, composé de:

  • Un ou plusieurs producteurs travaillant avec une base de donnĂ©es via Django ORM dans un thread sĂ©parĂ© via asyncio.ThreadPoolExecutor. Le fabricant essaie d'agrĂ©ger les demandes d'acquisition de donnĂ©es en petits lots, rend les modĂšles des donnĂ©es reçues via Jinja2 et ajoute les donnĂ©es Ă  envoyer Ă  la file d'attente des tĂąches.

def get_campaign_send_data(ids: Iterable[int]) -> Iterable[Mapping[str, Any]]: """    ,     Django ORM   .""" return [{'id': id} for id in ids] async def mail_campaign_producer(ids: Iterable[int], task_queue: asyncio.Queue) -> None: """           ,    .      ,     ThreadPoolExecutor. """ loop = asyncio.get_event_loop() total = len(ids) for subchunk_start in range(0, total, PRODUCER_SUBCHUNK_SIZE): subchunk_ids = ids[subchunk_start : min(subchunk_start + PRODUCER_SUBCHUNK_SIZE, total)] send_tasks = await loop.run_in_executor(None, get_campaign_send_data, subchunk_ids) for task in send_tasks: await task_queue.put(task) 

  • Plusieurs centaines d'expĂ©diteurs de lettres - des coroutines asyncio, qui, dans un cycle sans fin, lisent les donnĂ©es de la file d'attente des tĂąches, envoient des demandes rĂ©seau pour chacune d'entre elles et mettent le rĂ©sultat (rĂ©ponse ou exception) dans la file d'attente des rapports.

 async def send_mail(data: Mapping[str, Any], session: aiohttp.ClientSession) -> Union[Mapping[str, Any], Exception]: """    .""" async with session.post(REQUEST_URL, data=data) as response: if response.status_code != 200: raise Exception return data async def mail_campaign_sender( task_queue: asyncio.Queue, result_queue: asyncio.Queue, session: aiohttp.ClientSession ) -> None: """        .     task_done,    ,   . """ while True: try: task_data = await task_queue.get() result = await send_mail(task_data, session) await result_queue.put(result) except asyncio.CancelledError: #     raise except Exception as exception: #     await result_queue.put(exception) finally: task_queue.task_done() 

  • Un ou plusieurs travailleurs qui regroupent les donnĂ©es de la file d'attente de rapports et mettent des informations sur le rĂ©sultat de l'envoi d'une lettre Ă  la base de donnĂ©es en masse avec une demande.

 def process_campaign_results(results: Iterable[Union[Mapping[str, Any], Exception]]) -> None: """  :         """ pass async def mail_campaign_reporter(task_queue: asyncio.Queue, result_queue: asyncio.Queue) -> None: """          ThreadPoolExecutor,        . """ loop = asyncio.get_event_loop() results_chunk = [] while True: try: results_chunk.append(await result_queue.get()) if len(results_chunk) >= REPORTER_BATCH_SIZE: await loop.run_in_executor(None, process_campaign_results, results_chunk) results_chunk.clear() except asyncio.CancelledError: await loop.run_in_executor(None, process_campaign_results, results_chunk) results_chunk.clear() raise finally: result_queue.task_done() 

  • La file d'attente des tĂąches, qui est une instance de asyncio.Queue, limitĂ©e par le nombre maximal d'Ă©lĂ©ments afin que le fabricant ne la remplisse pas trop, dĂ©pensant toute la mĂ©moire.
  • Reportez les files d'attente, Ă©galement une instance de asyncio.Queue avec une limite sur le nombre maximal d'Ă©lĂ©ments.
  • Une mĂ©thode asynchrone qui crĂ©e des files d'attente, des travailleurs et termine la distribution en les arrĂȘtant.

 async def send_mail_campaign( recipient_ids: Iterable[int], session: aiohttp.ClientSession, loop: asyncio.AbstractEventLoop = None ) -> None: """       .    ,       . """ executor = ThreadPoolExecutor(max_workers=PRODUCERS_COUNT + 1) loop = loop or asyncio.get_event_loop() loop.set_default_executor(executor) task_queue = asyncio.Queue(maxsize=2 * SENDERS_COUNT, loop=loop) result_queue = asyncio.Queue(maxsize=2 * SENDERS_COUNT, loop=loop) producers = [ asyncio.ensure_future(mail_campaign_producer(recipient_ids, task_queue)) for _ in range(PRODUCERS_COUNT) ] consumers = [ asyncio.ensure_future(mail_campaign_sender(task_queue, result_queue, session)) for _ in range(SENDERS_COUNT) ] reporter = asyncio.ensure_future(mail_campaign_reporter(task_queue, result_queue)) # ,      done, _ = await asyncio.wait(producers) #    ,   await task_queue.join() while consumers: consumers.pop().cancel() #    ,     await result_queue.join() reporter.cancel() 

  • Le code synchrone qui crĂ©e la boucle et dĂ©marre la distribution.

 async def close_session(future: asyncio.Future, session: aiohttp.ClientSession) -> None: """  ,    .  aiohttp      . """ await asyncio.wait([future]) await asyncio.sleep(0.250) await session.close() def mail_campaign_send_chunk(recipient_ids: Iterable[int]) -> None: """     .   ,  asyncio     . """ loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) # Session connector = aiohttp.TCPConnector(limit_per_host=0, limit=0) session = aiohttp.ClientSession( connector=connector, auth=aiohttp.BasicAuth('api', API_KEY), loop=loop, read_timeout=60 ) send_future = asyncio.ensure_future(send_mail_campaign(recipient_ids, session, loop=loop)) cleanup_future = asyncio.ensure_future(close_session(send_future, session)) loop.run_until_complete(asyncio.wait([send_future, cleanup_future])) loop.close() 

AprĂšs la mise en Ɠuvre de cette solution, le dĂ©lai d'envoi de mailings en masse a Ă©tĂ© rĂ©duit Ă  une heure avec les mĂȘmes volumes d'envoi et 12 collaborateurs impliquĂ©s. Autrement dit, chaque travailleur envoie 20 Ă  25 lettres par seconde, ce qui est 50 Ă  80 fois plus productif que la solution d'origine. La consommation de mĂ©moire des travailleurs a Ă©tĂ© maintenue au niveau initial, la charge du processeur a lĂ©gĂšrement augmentĂ©, l'utilisation du rĂ©seau a augmentĂ© plusieurs fois, ce qui est l'effet attendu. Le nombre de connexions Ă  la base de donnĂ©es a Ă©galement augmentĂ©, car chacun des flux de travailleurs-producteurs et de travailleurs qui enregistrent des rapports travaille activement avec la base de donnĂ©es. Dans le mĂȘme temps, les travailleurs libres peuvent envoyer de petits mailings pendant qu'une campagne de masse est envoyĂ©e.



MalgrĂ© tous les avantages, une telle mise en Ɠuvre prĂ©sente un certain nombre de difficultĂ©s Ă  prendre en compte:

  1. Des prĂ©cautions doivent ĂȘtre prises lors de la manipulation des erreurs. Une exception non gĂ©rĂ©e peut mettre fin au travailleur, provoquant le gel de la campagne.
  2. Lorsque l'envoi est terminé, il est nécessaire de ne pas perdre les rapports sur les destinataires qui n'ont pas terminé le bloc à la fin et de les enregistrer dans la base de données.
  3. La logique de l'arrĂȘt forcĂ© de la reprise des campagnes devient de plus en plus compliquĂ©e, car aprĂšs l'arrĂȘt des travailleurs expĂ©diteurs, il est nĂ©cessaire de comparer quels destinataires ont Ă©tĂ© envoyĂ©s et lesquels ne l'ont pas Ă©tĂ©.
  4. AprÚs un certain temps, le personnel d'assistance de Mailgun nous a contactés et nous a demandé de ralentir la vitesse d'envoi, car les services de messagerie ont commencé à rejeter temporairement les e-mails si la fréquence de leur envoi dépasse la valeur seuil. Ceci est facile à faire en réduisant le nombre de travailleurs.
  5. Il ne serait pas possible d'utiliser asyncio si certaines des Ă©tapes de l'envoi de lettres effectuaient des opĂ©rations exigeantes en termes de processeur. Le rendu des modĂšles utilisant jinja2 s'est avĂ©rĂ© ĂȘtre une opĂ©ration peu gourmande en ressources et n'a pratiquement aucun effet sur la vitesse d'envoi.
  6. L'utilisation d'asyncio pour les listes de diffusion nécessite que les gestionnaires de files d'attente de distribution soient démarrés par des processus distincts.

J'espÚre que notre expérience vous sera utile! Si vous avez des questions ou des idées, écrivez dans les commentaires!

Source: https://habr.com/ru/post/fr482114/


All Articles