Senden Sie E-Mails mit asyncio und aiohttp aus einer Django-Anwendung

Hallo allerseits!

Ich entwickle und unterstütze einen Benachrichtigungsdienst in Ostrovok.ru . Der Service ist in Python3 und Django geschrieben. Neben Transaktionsbriefen, Pushs und Nachrichten übernimmt der Dienst auch die Aufgabe des Massenversands von kommerziellen Angeboten (kein Spam! Vertrauen Sie mir, Abmeldungen funktionieren besser als Abonnements) an Benutzer, die dem zugestimmt haben. Im Laufe der Zeit wuchs die Zahl der aktiven Empfänger auf über eine Million Adressen, für die der Mail-Service noch nicht bereit war. Ich möchte darüber sprechen, wie die neuen Python-Funktionen es ermöglichten, Massenmailings zu beschleunigen und Ressourcen zu sparen, und mit welchen Problemen wir bei der Arbeit zu kämpfen hatten.



Source-Implementierung


Am Anfang wurden Massenmailings auf einfachste Weise implementiert: Für jeden Empfänger wurde eine Aufgabe in eine Warteschlange gestellt, die von einem von 60 Massenarbeitern ausgeführt wurde (ein Merkmal unserer Warteschlangen ist, dass jeder Mitarbeiter in einem separaten Prozess arbeitet), der Kontext dafür vorbereitet, die Vorlage gerendert und gesendet Eine HTTP-Anforderung an Mailgun zum Senden eines Briefs und zum Erstellen eines Datensatzes in der Datenbank, in der der Brief gesendet wurde. Das gesamte Mailing dauerte bis zu 12 Stunden. Von jedem Mitarbeiter wurden ca. 0,3 Briefe pro Sekunde verschickt, und das Versenden kleiner Kampagnen wurde blockiert.



Asynchrone Lösung


Schnelle Profilerstellung ergab, dass die meisten Mitarbeiter Zeit damit verbringen, Verbindungen zu Mailgun herzustellen, und so begannen wir, Aufgaben für jeden Mitarbeiter in Gruppen zu gruppieren. Die Arbeiter begannen eine Verbindung mit Mailgun zu nutzen, wodurch sich die Zeit für Mailings auf 9 Stunden reduzierte und jeder Arbeiter durchschnittlich 0,5 Briefe pro Sekunde verschickte. Die anschließende Profilerstellung hat erneut gezeigt, dass die Arbeit mit dem Netzwerk immer noch die meiste Zeit in Anspruch nimmt, was uns dazu veranlasste, Asyncio zu verwenden.

Bevor wir die gesamte Verarbeitung in den Asyncio-Zyklus aufnehmen konnten, mussten wir eine Lösung für eine Reihe von Problemen finden:

  1. Django ORM kann noch nicht mit Asyncio arbeiten, gibt jedoch die GIL während der Abfrageausführung frei. Dies bedeutet, dass Datenbankabfragen in einem separaten Thread ausgeführt werden können und nicht die Hauptschleife blockieren.
  2. Für aktuelle Versionen von aiohttp ist Python Version 3.6 und höher erforderlich. Zum Zeitpunkt der Implementierung musste das Docker-Image aktualisiert werden. Experimente mit älteren Versionen von aiohttp und Python 3.5 haben gezeigt, dass die Sendegeschwindigkeit bei diesen Versionen viel niedriger ist als bei neueren Versionen und mit dem sequentiellen Senden vergleichbar ist.
  3. Das Speichern einer großen Menge von Asyncio corutin führt schnell zur Ausgabe des gesamten Speichers. Dies bedeutet, dass es unmöglich ist, alle Koroutinen für Briefe vorzubereiten und einen Zyklus zu veranlassen, sie zu verarbeiten. Es ist notwendig, die Daten vorzubereiten, während Sie bereits geformte Briefe senden.

Unter Berücksichtigung aller Funktionen erstellen wir in jedem unserer Worker unseren Asynchronisationszyklus mit der Ähnlichkeit des ThreadPool-Musters, bestehend aus:

  • Ein oder mehrere Produzenten, die mit einer Datenbank über Django ORM in einem separaten Thread über asyncio.ThreadPoolExecutor arbeiten. Der Hersteller versucht, Datenerfassungsanforderungen in kleinen Stapeln zusammenzufassen, Vorlagen für über Jinja2 empfangene Daten zu rendern und Daten zum Senden an die Taskwarteschlange hinzuzufügen.

def get_campaign_send_data(ids: Iterable[int]) -> Iterable[Mapping[str, Any]]: """    ,     Django ORM   .""" return [{'id': id} for id in ids] async def mail_campaign_producer(ids: Iterable[int], task_queue: asyncio.Queue) -> None: """           ,    .      ,     ThreadPoolExecutor. """ loop = asyncio.get_event_loop() total = len(ids) for subchunk_start in range(0, total, PRODUCER_SUBCHUNK_SIZE): subchunk_ids = ids[subchunk_start : min(subchunk_start + PRODUCER_SUBCHUNK_SIZE, total)] send_tasks = await loop.run_in_executor(None, get_campaign_send_data, subchunk_ids) for task in send_tasks: await task_queue.put(task) 

  • Mehrere hundert Absender von Briefen - asyncio coroutines, die in einem endlosen Zyklus Daten aus der Task-Warteschlange lesen, für jeden von ihnen Netzwerkanforderungen senden und das Ergebnis (Antwort oder Ausnahme) in die Berichtswarteschlange stellen.

 async def send_mail(data: Mapping[str, Any], session: aiohttp.ClientSession) -> Union[Mapping[str, Any], Exception]: """    .""" async with session.post(REQUEST_URL, data=data) as response: if response.status_code != 200: raise Exception return data async def mail_campaign_sender( task_queue: asyncio.Queue, result_queue: asyncio.Queue, session: aiohttp.ClientSession ) -> None: """        .     task_done,    ,   . """ while True: try: task_data = await task_queue.get() result = await send_mail(task_data, session) await result_queue.put(result) except asyncio.CancelledError: #     raise except Exception as exception: #     await result_queue.put(exception) finally: task_queue.task_done() 

  • Ein oder mehrere Mitarbeiter, die Daten aus der Berichtwarteschlange gruppieren und Informationen zum Ergebnis des Sendens eines Briefs an die Massendatenbank mit einer Anforderung bereitstellen.

 def process_campaign_results(results: Iterable[Union[Mapping[str, Any], Exception]]) -> None: """  :         """ pass async def mail_campaign_reporter(task_queue: asyncio.Queue, result_queue: asyncio.Queue) -> None: """          ThreadPoolExecutor,        . """ loop = asyncio.get_event_loop() results_chunk = [] while True: try: results_chunk.append(await result_queue.get()) if len(results_chunk) >= REPORTER_BATCH_SIZE: await loop.run_in_executor(None, process_campaign_results, results_chunk) results_chunk.clear() except asyncio.CancelledError: await loop.run_in_executor(None, process_campaign_results, results_chunk) results_chunk.clear() raise finally: result_queue.task_done() 

  • Die Task-Warteschlange, bei der es sich um eine Instanz von asyncio.Queue handelt, wird durch die maximale Anzahl von Elementen begrenzt, damit der Hersteller sie nicht überfüllt und den gesamten Speicher belegt.
  • Melden Sie Warteschlangen, auch eine Instanz von asyncio.Queue, mit einer Begrenzung der maximalen Anzahl von Elementen.
  • Eine asynchrone Methode, die Warteschlangen und Worker erstellt und die Verteilung durch Anhalten beendet.

 async def send_mail_campaign( recipient_ids: Iterable[int], session: aiohttp.ClientSession, loop: asyncio.AbstractEventLoop = None ) -> None: """       .    ,       . """ executor = ThreadPoolExecutor(max_workers=PRODUCERS_COUNT + 1) loop = loop or asyncio.get_event_loop() loop.set_default_executor(executor) task_queue = asyncio.Queue(maxsize=2 * SENDERS_COUNT, loop=loop) result_queue = asyncio.Queue(maxsize=2 * SENDERS_COUNT, loop=loop) producers = [ asyncio.ensure_future(mail_campaign_producer(recipient_ids, task_queue)) for _ in range(PRODUCERS_COUNT) ] consumers = [ asyncio.ensure_future(mail_campaign_sender(task_queue, result_queue, session)) for _ in range(SENDERS_COUNT) ] reporter = asyncio.ensure_future(mail_campaign_reporter(task_queue, result_queue)) # ,      done, _ = await asyncio.wait(producers) #    ,   await task_queue.join() while consumers: consumers.pop().cancel() #    ,     await result_queue.join() reporter.cancel() 

  • Der Synchroncode, der die Schleife erstellt und die Verteilung startet.

 async def close_session(future: asyncio.Future, session: aiohttp.ClientSession) -> None: """  ,    .  aiohttp      . """ await asyncio.wait([future]) await asyncio.sleep(0.250) await session.close() def mail_campaign_send_chunk(recipient_ids: Iterable[int]) -> None: """     .   ,  asyncio     . """ loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) # Session connector = aiohttp.TCPConnector(limit_per_host=0, limit=0) session = aiohttp.ClientSession( connector=connector, auth=aiohttp.BasicAuth('api', API_KEY), loop=loop, read_timeout=60 ) send_future = asyncio.ensure_future(send_mail_campaign(recipient_ids, session, loop=loop)) cleanup_future = asyncio.ensure_future(close_session(send_future, session)) loop.run_until_complete(asyncio.wait([send_future, cleanup_future])) loop.close() 

Nach der Implementierung dieser Lösung konnte die Zeit für den Versand von Massenmailings auf eine Stunde bei gleichem Mailingvolumen und 12 beteiligten Mitarbeitern reduziert werden. Das heißt, jeder Mitarbeiter sendet 20 bis 25 Briefe pro Sekunde, was 50 bis 80 Mal produktiver ist als die ursprüngliche Lösung. Der Speicherverbrauch der Worker wurde auf dem anfänglichen Niveau gehalten, die Prozessorauslastung leicht erhöht, die Netzwerkauslastung um ein Vielfaches erhöht, was den erwarteten Effekt darstellt. Die Anzahl der Verbindungen zur Datenbank hat ebenfalls zugenommen, da jeder Fluss von Arbeiterproduzenten und Arbeitern, die Berichte speichern, aktiv mit der Datenbank arbeitet. Gleichzeitig können freie Mitarbeiter kleine Mailings versenden, während eine Massenkampagne verschickt wird.



Trotz aller Vorteile weist eine solche Implementierung eine Reihe von Schwierigkeiten auf, die berücksichtigt werden müssen:

  1. Beim Umgang mit Fehlern ist Vorsicht geboten. Eine nicht behandelte Ausnahme kann den Worker beenden und dazu führen, dass die Kampagne einfriert.
  2. Wenn das Senden abgeschlossen ist, müssen keine Berichte zu Empfängern verloren gehen, die den Block bis zum Ende nicht abgeschlossen haben, und sie müssen in der Datenbank gespeichert werden.
  3. Die Logik, die Wiederaufnahme von Kampagnen gewaltsam zu stoppen, wird immer komplizierter, da nach dem Stoppen der sendenden Mitarbeiter verglichen werden muss, welche Empfänger Briefe erhalten haben und welche nicht.
  4. Nach einiger Zeit setzten sich die Mitarbeiter des Mailgun-Supports mit uns in Verbindung und baten uns, die Sendegeschwindigkeit zu verlangsamen, da E-Mails vorübergehend abgelehnt wurden, wenn die Häufigkeit des Sendens den Schwellenwert überschritt. Dies ist einfach zu tun, indem die Anzahl der Arbeitnehmer verringert wird.
  5. Es wäre nicht möglich, Asyncio zu verwenden, wenn einige der Phasen des Briefversands prozessorintensive Vorgänge ausführen würden. Das Rendern von Vorlagen mit jinja2 erwies sich als wenig ressourcenintensiv und hat praktisch keine Auswirkungen auf die Sendegeschwindigkeit.
  6. Für die Verwendung von asyncio für Mailinglisten müssen die Verteilungswarteschlangen-Handler von separaten Prozessen gestartet werden.

Ich hoffe, unsere Erfahrung wird für Sie von Nutzen sein! Wenn Sie Fragen oder Anregungen haben, schreiben Sie in die Kommentare!

Source: https://habr.com/ru/post/de482114/


All Articles