Hola a todos!
Estoy desarrollando y apoyando un servicio de notificación en
Ostrovok.ru . El servicio está escrito en Python3 y Django. Además de cartas transaccionales, notificaciones y mensajes, el servicio también realiza las tareas de envío masivo de ofertas comerciales (¡no spam! Confía en mí, las cancelaciones funcionan mejor que las suscripciones) a los usuarios que han dado su consentimiento. Con el tiempo, la base de destinatarios activos creció a más de un millón de direcciones, para las cuales el servicio de correo no estaba listo. Quiero hablar sobre cómo las nuevas características de Python hicieron posible acelerar los correos masivos y ahorrar recursos y qué problemas tuvimos que enfrentar al trabajar con ellos.

Implementación de origen
Inicialmente, los correos masivos se implementaron de la manera más simple: para cada destinatario, una tarea se colocó en una cola que fue tomada por uno de los 60 trabajadores masivos (una característica de nuestras colas es que cada trabajador trabaja en un proceso separado), preparó el contexto para ello, entregó la plantilla, envió Una solicitud HTTP a Mailgun para enviar una carta y creó un registro en la base de datos que la carta fue enviada. El envío completo tardó hasta 12 horas, enviando aproximadamente 0.3 cartas por segundo de cada trabajador y bloqueando el envío de pequeñas campañas.

Solución asincrónica
El perfil rápido mostró que la mayoría del tiempo que los trabajadores dedican a establecer conexiones con Mailgun, por lo que comenzamos a agrupar las tareas en trozos, por trozos, para cada trabajador. Los trabajadores comenzaron a usar una conexión con Mailgun, lo que permitió reducir el tiempo de envío a 9 horas, enviando a cada trabajador un promedio de 0.5 cartas por segundo. Los perfiles posteriores mostraron nuevamente que trabajar con la red todavía ocupa la mayor parte del tiempo, lo que nos llevó a usar asyncio.
Antes de poner todo el procesamiento en el ciclo de asincio, tuvimos que pensar en una solución a una serie de problemas:
- Django ORM aún no puede trabajar con asyncio, sin embargo, libera el GIL durante la ejecución de la consulta. Esto significa que las consultas de la base de datos pueden ejecutarse en un hilo separado y no bloquear el bucle principal.
- Las versiones actuales de aiohttp requieren Python versión 3.6 y superior, que en el momento de la implementación requería actualizar la imagen del acoplador. Los experimentos en versiones anteriores de aiohttp y Python 3.5 mostraron que la velocidad de envío en estas versiones es mucho más baja que en las versiones más nuevas, y es comparable al envío secuencial.
- Almacenar una gran cantidad de asyncio corutin rápidamente lleva al gasto de toda la memoria. Esto significa que es imposible preparar previamente todas las corutinas para las letras y provocar un ciclo para procesarlas, es necesario preparar los datos a medida que envía las letras ya formadas.
Considerando todas las características, crearemos dentro de cada uno de nuestros trabajadores nuestro ciclo de asincio con la similitud del patrón ThreadPool, que consiste en:
- Uno o más productores que trabajan con una base de datos a través de Django ORM en un hilo separado a través de asyncio.ThreadPoolExecutor. El fabricante intenta agrupar las solicitudes de adquisición de datos en pequeños lotes, genera plantillas para los datos recibidos a través de Jinja2 y agrega datos para enviarlos a la cola de tareas.
def get_campaign_send_data(ids: Iterable[int]) -> Iterable[Mapping[str, Any]]: """ , Django ORM .""" return [{'id': id} for id in ids] async def mail_campaign_producer(ids: Iterable[int], task_queue: asyncio.Queue) -> None: """ , . , ThreadPoolExecutor. """ loop = asyncio.get_event_loop() total = len(ids) for subchunk_start in range(0, total, PRODUCER_SUBCHUNK_SIZE): subchunk_ids = ids[subchunk_start : min(subchunk_start + PRODUCER_SUBCHUNK_SIZE, total)] send_tasks = await loop.run_in_executor(None, get_campaign_send_data, subchunk_ids) for task in send_tasks: await task_queue.put(task)
- Varios cientos de remitentes de cartas: asyncio coroutines, que en un ciclo interminable leen datos de la cola de tareas, envían solicitudes de red para cada una de ellas y colocan el resultado (respuesta o excepción) en la cola de informes.
async def send_mail(data: Mapping[str, Any], session: aiohttp.ClientSession) -> Union[Mapping[str, Any], Exception]: """ .""" async with session.post(REQUEST_URL, data=data) as response: if response.status_code != 200: raise Exception return data async def mail_campaign_sender( task_queue: asyncio.Queue, result_queue: asyncio.Queue, session: aiohttp.ClientSession ) -> None: """ . task_done, , . """ while True: try: task_data = await task_queue.get() result = await send_mail(task_data, session) await result_queue.put(result) except asyncio.CancelledError:
- Uno o más trabajadores que agrupan datos de la cola de informes y ponen información sobre el resultado de enviar una carta a la base de datos masiva con una solicitud.
def process_campaign_results(results: Iterable[Union[Mapping[str, Any], Exception]]) -> None: """ : """ pass async def mail_campaign_reporter(task_queue: asyncio.Queue, result_queue: asyncio.Queue) -> None: """ ThreadPoolExecutor, . """ loop = asyncio.get_event_loop() results_chunk = [] while True: try: results_chunk.append(await result_queue.get()) if len(results_chunk) >= REPORTER_BATCH_SIZE: await loop.run_in_executor(None, process_campaign_results, results_chunk) results_chunk.clear() except asyncio.CancelledError: await loop.run_in_executor(None, process_campaign_results, results_chunk) results_chunk.clear() raise finally: result_queue.task_done()
- La cola de tareas, que es una instancia de asyncio.Queue, limitada por el número máximo de elementos para que el fabricante no la sobrellene, gastando toda la memoria.
- Informe colas, también una instancia de asyncio.Queue con un límite en el número máximo de elementos.
- Un método asincrónico que crea colas, trabajadores y completa la distribución deteniéndolos.
async def send_mail_campaign( recipient_ids: Iterable[int], session: aiohttp.ClientSession, loop: asyncio.AbstractEventLoop = None ) -> None: """ . , . """ executor = ThreadPoolExecutor(max_workers=PRODUCERS_COUNT + 1) loop = loop or asyncio.get_event_loop() loop.set_default_executor(executor) task_queue = asyncio.Queue(maxsize=2 * SENDERS_COUNT, loop=loop) result_queue = asyncio.Queue(maxsize=2 * SENDERS_COUNT, loop=loop) producers = [ asyncio.ensure_future(mail_campaign_producer(recipient_ids, task_queue)) for _ in range(PRODUCERS_COUNT) ] consumers = [ asyncio.ensure_future(mail_campaign_sender(task_queue, result_queue, session)) for _ in range(SENDERS_COUNT) ] reporter = asyncio.ensure_future(mail_campaign_reporter(task_queue, result_queue))
- El código síncrono que crea el bucle e inicia la distribución.
async def close_session(future: asyncio.Future, session: aiohttp.ClientSession) -> None: """ , . aiohttp . """ await asyncio.wait([future]) await asyncio.sleep(0.250) await session.close() def mail_campaign_send_chunk(recipient_ids: Iterable[int]) -> None: """ . , asyncio . """ loop = asyncio.new_event_loop() asyncio.set_event_loop(loop)
Después de implementar esta solución, el tiempo para enviar correos masivos se redujo a una hora con los mismos volúmenes de correo y 12 trabajadores involucrados. Es decir, cada trabajador envía 20-25 cartas por segundo, que es 50-80 veces más productivo que la solución original. El consumo de memoria de los trabajadores se mantuvo en el nivel inicial, la carga del procesador aumentó ligeramente, la utilización de la red aumentó muchas veces, que es el efecto esperado. El número de conexiones a la base de datos también ha aumentado, ya que cada uno de los flujos de trabajadores-productores y trabajadores que guardan informes trabajan activamente con la base de datos. Al mismo tiempo, los trabajadores gratuitos pueden enviar pequeños correos mientras se envía una campaña masiva.

A pesar de todas las ventajas, dicha implementación tiene una serie de dificultades que deben tenerse en cuenta:
- Se debe tener cuidado al manejar errores. Una excepción no controlada puede terminar con el trabajador, haciendo que la campaña se congele.
- Cuando se completa el envío, es necesario no perder informes sobre los destinatarios que no han completado el fragmento hasta el final, y guardarlos en la base de datos.
- La lógica de detener por la fuerza la reanudación de las campañas se está volviendo más complicada, porque después de detener a los trabajadores que envían, es necesario comparar qué destinatarios recibieron cartas y cuáles no.
- Después de un tiempo, el personal de soporte de Mailgun nos contactó y nos pidió que reduzcamos la velocidad de envío, porque los servicios de correo comenzaron a rechazar temporalmente los correos electrónicos si la frecuencia de su envío supera el valor umbral. Esto es fácil de hacer al reducir el número de trabajadores.
- No sería posible usar asyncio si algunas de las etapas de envío de cartas realizaran operaciones que requieran un procesador. La presentación de plantillas con jinja2 resultó ser una operación que no requiere muchos recursos y prácticamente no tiene ningún efecto en la velocidad de envío.
- El uso de asyncio para listas de correo requiere que los manejadores de colas de distribución se inicien mediante procesos separados.
¡Espero que nuestra experiencia te sea útil! Si tiene alguna pregunta o idea, ¡escriba los comentarios!