إرسال رسائل البريد الإلكتروني باستخدام asyncio و aiohttp من تطبيق Django

مرحبا بالجميع!

أقوم بتطوير ودعم خدمة إعلام في Ostrovok.ru . الخدمة مكتوبة في Python3 و Django. بالإضافة إلى خطابات المعاملات والدفعات والرسائل ، تتولى الخدمة أيضًا مهام المراسلات الجماعية للعروض التجارية (وليس البريد العشوائي! ثق بي ، تعمل إلغاء الاشتراكات بشكل أفضل من الاشتراكات) للمستخدمين الذين منحوا الموافقة على ذلك. بمرور الوقت ، نمت قاعدة المستلمين النشطين إلى أكثر من مليون عنوان ، لم تكن خدمة البريد جاهزة لها. أريد أن أتحدث عن كيف مكّنت ميزات Python الجديدة من تسريع المراسلات الجماعية وتوفير الموارد والمشاكل التي كان علينا التعامل معها عند العمل معهم.



تنفيذ المصدر


في البداية ، تم تنفيذ المراسلات الجماعية بأبسط الطرق: لكل مستلم ، وضعت مهمة في طابور اتخذه واحد من 60 عاملاً من العمال (ميزة من قوائم الانتظار لدينا هي أن كل عامل يعمل في عملية منفصلة) ، أعد السياق له ، وقدم القالب ، وأرسل طلب HTTP إلى Mailgun لإرسال خطاب وإنشاء سجل في قاعدة البيانات التي تم إرسال الرسالة. استغرق البريد بأكمله ما يصل إلى 12 ساعة ، وإرسال حوالي 0.3 رسائل في الثانية الواحدة من كل عامل ومنع البريد من الحملات الصغيرة.



حل غير متزامن


أظهر التوصيف السريع أن معظم الوقت الذي يقضيه العمال في إنشاء اتصالات مع Mailgun ، لذلك بدأنا في تجميع المهام في مجموعات ، بواسطة قطعة لكل عامل. بدأ العمال في استخدام اتصال واحد مع Mailgun ، مما سمح بتقليص وقت إرسال الرسائل إلى 9 ساعات ، حيث أرسل لكل عامل 0.5 حرفًا في المتوسط ​​في الثانية. أظهر التوصيف التالي مرة أخرى أن العمل مع الشبكة لا يزال يستغرق معظم الوقت ، مما دفعنا إلى استخدام المزامنة.

قبل وضع كل المعالجة في دورة التزامن ، كان علينا التفكير في حل لعدد من المشاكل:

  1. Django ORM غير قادر حتى الآن على العمل مع asyncio ، ومع ذلك ، فإنه يطلق GIL أثناء تنفيذ الاستعلام. هذا يعني أنه يمكن تنفيذ استعلامات قاعدة البيانات في سلسلة رسائل منفصلة وليس حظر الحلقة الرئيسية.
  2. تتطلب الإصدارات الحالية من aiohttp إصدار Python 3.6 والإصدارات الأحدث ، والتي كانت وقت التنفيذ تتطلب تحديث صورة عامل النقل. أظهرت التجارب على الإصدارات الأقدم من aiohttp و Python 3.5 أن سرعة الإرسال على هذه الإصدارات أقل بكثير من الإصدارات الأحدث ، ويمكن مقارنتها بالإرسال المتسلسل.
  3. تخزين كمية كبيرة من asyncio corutin بسرعة يؤدي إلى إنفاق كل الذاكرة. هذا يعني أنه من المستحيل التحضير المسبق لجميع coroutines للحروف ويتسبب في دورة لدراستها ، فمن الضروري إعداد البيانات كما كنت ترسل رسائل شكلت بالفعل.

بالنظر إلى جميع الميزات ، سننشئ داخل كل من العاملين لدينا دورة التزامن لدينا مع تشابه نمط ThreadPool ، الذي يتكون من:

  • منتج واحد أو أكثر يعمل مع قاعدة بيانات من خلال Django ORM في مؤشر ترابط منفصل من خلال asyncio.ThreadPoolExecutor. تحاول الشركة المصنعة تجميع طلبات الحصول على البيانات في مجموعات صغيرة ، وتقديم قوالب للبيانات المستلمة من خلال Jinja2 وإضافة بيانات لإرسالها إلى قائمة انتظار المهام.

def get_campaign_send_data(ids: Iterable[int]) -> Iterable[Mapping[str, Any]]: """    ,     Django ORM   .""" return [{'id': id} for id in ids] async def mail_campaign_producer(ids: Iterable[int], task_queue: asyncio.Queue) -> None: """           ,    .      ,     ThreadPoolExecutor. """ loop = asyncio.get_event_loop() total = len(ids) for subchunk_start in range(0, total, PRODUCER_SUBCHUNK_SIZE): subchunk_ids = ids[subchunk_start : min(subchunk_start + PRODUCER_SUBCHUNK_SIZE, total)] send_tasks = await loop.run_in_executor(None, get_campaign_send_data, subchunk_ids) for task in send_tasks: await task_queue.put(task) 

  • عدة مئات من مرسلي الرسائل - coroutines asyncio ، الذين يقرؤون في دورة لا نهاية لها البيانات من قائمة انتظار المهام ، يرسلون طلبات الشبكة لكل منهم ووضع النتيجة (الاستجابة أو الاستثناء) في قائمة انتظار التقرير.

 async def send_mail(data: Mapping[str, Any], session: aiohttp.ClientSession) -> Union[Mapping[str, Any], Exception]: """    .""" async with session.post(REQUEST_URL, data=data) as response: if response.status_code != 200: raise Exception return data async def mail_campaign_sender( task_queue: asyncio.Queue, result_queue: asyncio.Queue, session: aiohttp.ClientSession ) -> None: """        .     task_done,    ,   . """ while True: try: task_data = await task_queue.get() result = await send_mail(task_data, session) await result_queue.put(result) except asyncio.CancelledError: #     raise except Exception as exception: #     await result_queue.put(exception) finally: task_queue.task_done() 

  • عامل واحد أو أكثر يجمعون بيانات من قائمة انتظار التقارير ويضعون معلومات عن نتيجة إرسال خطاب إلى قاعدة البيانات المجمعة مع طلب.

 def process_campaign_results(results: Iterable[Union[Mapping[str, Any], Exception]]) -> None: """  :         """ pass async def mail_campaign_reporter(task_queue: asyncio.Queue, result_queue: asyncio.Queue) -> None: """          ThreadPoolExecutor,        . """ loop = asyncio.get_event_loop() results_chunk = [] while True: try: results_chunk.append(await result_queue.get()) if len(results_chunk) >= REPORTER_BATCH_SIZE: await loop.run_in_executor(None, process_campaign_results, results_chunk) results_chunk.clear() except asyncio.CancelledError: await loop.run_in_executor(None, process_campaign_results, results_chunk) results_chunk.clear() raise finally: result_queue.task_done() 

  • قائمة انتظار المهام ، والتي هي مثيل asyncio.Queue ، محدودة بعدد أكبر من العناصر بحيث لا تقوم الشركة المصنعة بملئها ، وتنفق كل الذاكرة.
  • قوائم انتظار التقرير ، وكذلك مثيل asyncio.Queue مع حد أقصى لعدد العناصر.
  • طريقة غير متزامنة تنشئ قوائم انتظار ، عمال ، وتكمل التوزيع بإيقافها.

 async def send_mail_campaign( recipient_ids: Iterable[int], session: aiohttp.ClientSession, loop: asyncio.AbstractEventLoop = None ) -> None: """       .    ,       . """ executor = ThreadPoolExecutor(max_workers=PRODUCERS_COUNT + 1) loop = loop or asyncio.get_event_loop() loop.set_default_executor(executor) task_queue = asyncio.Queue(maxsize=2 * SENDERS_COUNT, loop=loop) result_queue = asyncio.Queue(maxsize=2 * SENDERS_COUNT, loop=loop) producers = [ asyncio.ensure_future(mail_campaign_producer(recipient_ids, task_queue)) for _ in range(PRODUCERS_COUNT) ] consumers = [ asyncio.ensure_future(mail_campaign_sender(task_queue, result_queue, session)) for _ in range(SENDERS_COUNT) ] reporter = asyncio.ensure_future(mail_campaign_reporter(task_queue, result_queue)) # ,      done, _ = await asyncio.wait(producers) #    ,   await task_queue.join() while consumers: consumers.pop().cancel() #    ,     await result_queue.join() reporter.cancel() 

  • الرمز المتزامن الذي ينشئ الحلقة ويبدأ التوزيع.

 async def close_session(future: asyncio.Future, session: aiohttp.ClientSession) -> None: """  ,    .  aiohttp      . """ await asyncio.wait([future]) await asyncio.sleep(0.250) await session.close() def mail_campaign_send_chunk(recipient_ids: Iterable[int]) -> None: """     .   ,  asyncio     . """ loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) # Session connector = aiohttp.TCPConnector(limit_per_host=0, limit=0) session = aiohttp.ClientSession( connector=connector, auth=aiohttp.BasicAuth('api', API_KEY), loop=loop, read_timeout=60 ) send_future = asyncio.ensure_future(send_mail_campaign(recipient_ids, session, loop=loop)) cleanup_future = asyncio.ensure_future(close_session(send_future, session)) loop.run_until_complete(asyncio.wait([send_future, cleanup_future])) loop.close() 

بعد تنفيذ هذا الحل ، تم تقليل وقت إرسال المراسلات الجماعية إلى ساعة واحدة بنفس وحدات التخزين البريدية و 12 عاملاً معنيًا. أي أن كل عامل يرسل 20 إلى 25 حرفًا في الثانية ، وهو أكثر إنتاجية بنسبة 50-80 مرة من الحل الأصلي. زيادة استهلاك الذاكرة من العمال في المستوى الأولي ، وزيادة الحمل المعالج قليلا ، وزيادة استخدام الشبكة عدة مرات ، وهو التأثير المتوقع. كما زاد عدد الاتصالات بقاعدة البيانات ، حيث أن كل تدفقات المنتجين العاملين والعمال الذين يقومون بحفظ التقارير يعملون بنشاط مع قاعدة البيانات. في الوقت نفسه ، يمكن للعمال الأحرار إرسال رسائل صغيرة أثناء إرسال حملة جماهيرية.



على الرغم من كل المزايا ، يحتوي هذا التطبيق على عدد من الصعوبات التي يجب مراعاتها:

  1. يجب توخي الحذر عند التعامل مع الأخطاء. استثناء غير معالج قد ينهي العامل ، مما يتسبب في تجميد الحملة.
  2. عند اكتمال الإرسال ، من الضروري عدم فقد التقارير المتعلقة بالمستلمين الذين لم يكملوا المقطع حتى النهاية ، وحفظهم في قاعدة البيانات.
  3. إن منطق الإجبار على إيقاف استئناف الحملات يزداد تعقيدًا ، لأنه بعد إيقاف العمال المرسلين ، من الضروري مقارنة المستلمين الذين أرسلوا رسائل وأيهم لم يكن.
  4. بعد مرور بعض الوقت ، اتصل بنا موظفو دعم Mailgun وطلبوا منا إبطاء سرعة الإرسال ، لأن خدمات البريد بدأت ترفض رسائل البريد الإلكتروني مؤقتًا إذا كان تواتر إرسالها يتجاوز قيمة العتبة. من السهل القيام بذلك عن طريق تقليل عدد العمال.
  5. لن يكون من الممكن استخدام التزامن إذا كانت بعض مراحل إرسال الرسائل تؤدي عمليات تتطلب المعالج. تحولت قوالب العرض باستخدام jinja2 إلى أنها ليست عملية كثيفة الاستخدام للموارد وليست لها أي تأثير عملي على سرعة الإرسال.
  6. يتطلب استخدام المزامنة للقوائم البريدية أن تبدأ معالجات قائمة انتظار التوزيع بعمليات منفصلة.

آمل أن تكون تجربتنا مفيدة لك! إذا كان لديك أي أسئلة أو أفكار ، فاكتب في التعليقات!

Source: https://habr.com/ru/post/ar482114/


All Articles