Kirim email menggunakan asyncio dan aiohttp dari aplikasi Django

Halo semuanya!

Saya mengembangkan dan mendukung layanan notifikasi di Ostrovok.ru . Layanan ini ditulis dalam Python3 dan Django. Selain surat transaksional, push, dan pesan, layanan ini juga melakukan tugas pengiriman massal penawaran komersial (bukan spam! Percayalah, berhenti berlangganan berfungsi lebih baik daripada berlangganan) untuk pengguna yang telah memberikan izin untuk ini. Seiring waktu, basis penerima yang aktif tumbuh menjadi lebih dari satu juta alamat, yang layanan suratnya belum siap. Saya ingin berbicara tentang bagaimana fitur Python baru memungkinkan untuk mempercepat pengiriman massal dan menghemat sumber daya dan masalah apa yang harus kami tangani saat bekerja dengan mereka.



Implementasi sumber


Awalnya, surat massal dilaksanakan dengan cara paling sederhana: untuk setiap penerima, tugas ditempatkan dalam antrian yang diambil oleh salah satu dari 60 pekerja massa (fitur dari antrian kami adalah bahwa setiap pekerja bekerja dalam proses yang terpisah), menyiapkan konteks untuk itu, membuat templat, mengirim Permintaan HTTP ke Mailgun untuk mengirim surat dan membuat catatan dalam database bahwa surat itu dikirim. Keseluruhan pengiriman memakan waktu hingga 12 jam, mengirimkan sekitar 0,3 surat per detik dari setiap pekerja dan memblokir pengiriman kampanye kecil.



Solusi asinkron


Pembuatan profil cepat menunjukkan bahwa sebagian besar waktu yang dihabiskan pekerja untuk membangun koneksi dengan Mailgun, jadi kami mulai mengelompokkan tugas menjadi potongan-potongan, dengan potongan untuk setiap pekerja. Pekerja mulai menggunakan satu koneksi dengan Mailgun, yang memungkinkan untuk mengurangi waktu pengiriman hingga 9 jam, mengirimkan setiap pekerja rata-rata 0,5 huruf per detik. Profiling berikutnya lagi menunjukkan bahwa bekerja dengan jaringan masih memakan sebagian besar waktu, yang mendorong kami untuk menggunakan asyncio.

Sebelum memasukkan semua pemrosesan ke dalam siklus asyncio, kami harus memikirkan solusi untuk sejumlah masalah:

  1. Django ORM belum dapat bekerja dengan asyncio, namun, ia melepaskan GIL selama eksekusi permintaan. Ini berarti bahwa kueri basis data dapat dieksekusi di utas terpisah dan tidak memblokir loop utama.
  2. Versi aiohttp saat ini membutuhkan Python versi 3.6 dan lebih tinggi, yang pada saat implementasi diperlukan memperbarui gambar buruh pelabuhan. Eksperimen pada versi aiohttp dan Python 3.5 yang lebih lama menunjukkan bahwa kecepatan pengiriman pada versi ini jauh lebih rendah daripada versi yang lebih baru, dan dapat dibandingkan dengan pengiriman berurutan.
  3. Menyimpan sejumlah besar asyncio corutin dengan cepat menyebabkan pengeluaran semua memori. Ini berarti bahwa tidak mungkin untuk melakukan pra-persiapan semua coroutine untuk surat-surat dan menyebabkan siklus untuk memprosesnya, perlu untuk mempersiapkan data saat Anda mengirim surat yang sudah terbentuk.

Mempertimbangkan semua fitur, kami akan membuat di dalam masing-masing pekerja kami siklus asyncio kami dengan kesamaan pola ThreadPool, yang terdiri dari:

  • Satu atau lebih produsen bekerja dengan database melalui Django ORM di utas terpisah melalui asyncio.ThreadPoolExecutor. Pabrikan mencoba untuk menggabungkan permintaan akuisisi data ke dalam kelompok-kelompok kecil, menjadikan template untuk data yang diterima melalui Jinja2 dan menambahkan data untuk dikirim ke antrian tugas.

def get_campaign_send_data(ids: Iterable[int]) -> Iterable[Mapping[str, Any]]: """    ,     Django ORM   .""" return [{'id': id} for id in ids] async def mail_campaign_producer(ids: Iterable[int], task_queue: asyncio.Queue) -> None: """           ,    .      ,     ThreadPoolExecutor. """ loop = asyncio.get_event_loop() total = len(ids) for subchunk_start in range(0, total, PRODUCER_SUBCHUNK_SIZE): subchunk_ids = ids[subchunk_start : min(subchunk_start + PRODUCER_SUBCHUNK_SIZE, total)] send_tasks = await loop.run_in_executor(None, get_campaign_send_data, subchunk_ids) for task in send_tasks: await task_queue.put(task) 

  • Beberapa ratus pengirim surat - asyncio coroutine, yang dalam siklus tanpa akhir membaca data dari antrian tugas, mengirim permintaan jaringan untuk masing-masing dan memasukkan hasilnya (respons, atau pengecualian) dalam antrian laporan.

 async def send_mail(data: Mapping[str, Any], session: aiohttp.ClientSession) -> Union[Mapping[str, Any], Exception]: """    .""" async with session.post(REQUEST_URL, data=data) as response: if response.status_code != 200: raise Exception return data async def mail_campaign_sender( task_queue: asyncio.Queue, result_queue: asyncio.Queue, session: aiohttp.ClientSession ) -> None: """        .     task_done,    ,   . """ while True: try: task_data = await task_queue.get() result = await send_mail(task_data, session) await result_queue.put(result) except asyncio.CancelledError: #     raise except Exception as exception: #     await result_queue.put(exception) finally: task_queue.task_done() 

  • Satu atau lebih pekerja yang mengelompokkan data dari antrian laporan dan memberikan informasi tentang hasil pengiriman surat ke database massal dengan permintaan.

 def process_campaign_results(results: Iterable[Union[Mapping[str, Any], Exception]]) -> None: """  :         """ pass async def mail_campaign_reporter(task_queue: asyncio.Queue, result_queue: asyncio.Queue) -> None: """          ThreadPoolExecutor,        . """ loop = asyncio.get_event_loop() results_chunk = [] while True: try: results_chunk.append(await result_queue.get()) if len(results_chunk) >= REPORTER_BATCH_SIZE: await loop.run_in_executor(None, process_campaign_results, results_chunk) results_chunk.clear() except asyncio.CancelledError: await loop.run_in_executor(None, process_campaign_results, results_chunk) results_chunk.clear() raise finally: result_queue.task_done() 

  • Antrian tugas, yang merupakan turunan dari asyncio.Queue, dibatasi oleh jumlah maksimum elemen sehingga pabrikan tidak memenuhi sampai melimpahi, menghabiskan semua memori.
  • Laporkan antrian, juga merupakan instance dari asyncio.Queue dengan batas jumlah item maksimum.
  • Metode asinkron yang membuat antrian, pekerja, dan menyelesaikan distribusi dengan menghentikannya.

 async def send_mail_campaign( recipient_ids: Iterable[int], session: aiohttp.ClientSession, loop: asyncio.AbstractEventLoop = None ) -> None: """       .    ,       . """ executor = ThreadPoolExecutor(max_workers=PRODUCERS_COUNT + 1) loop = loop or asyncio.get_event_loop() loop.set_default_executor(executor) task_queue = asyncio.Queue(maxsize=2 * SENDERS_COUNT, loop=loop) result_queue = asyncio.Queue(maxsize=2 * SENDERS_COUNT, loop=loop) producers = [ asyncio.ensure_future(mail_campaign_producer(recipient_ids, task_queue)) for _ in range(PRODUCERS_COUNT) ] consumers = [ asyncio.ensure_future(mail_campaign_sender(task_queue, result_queue, session)) for _ in range(SENDERS_COUNT) ] reporter = asyncio.ensure_future(mail_campaign_reporter(task_queue, result_queue)) # ,      done, _ = await asyncio.wait(producers) #    ,   await task_queue.join() while consumers: consumers.pop().cancel() #    ,     await result_queue.join() reporter.cancel() 

  • Kode sinkron yang menciptakan loop dan memulai distribusi.

 async def close_session(future: asyncio.Future, session: aiohttp.ClientSession) -> None: """  ,    .  aiohttp      . """ await asyncio.wait([future]) await asyncio.sleep(0.250) await session.close() def mail_campaign_send_chunk(recipient_ids: Iterable[int]) -> None: """     .   ,  asyncio     . """ loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) # Session connector = aiohttp.TCPConnector(limit_per_host=0, limit=0) session = aiohttp.ClientSession( connector=connector, auth=aiohttp.BasicAuth('api', API_KEY), loop=loop, read_timeout=60 ) send_future = asyncio.ensure_future(send_mail_campaign(recipient_ids, session, loop=loop)) cleanup_future = asyncio.ensure_future(close_session(send_future, session)) loop.run_until_complete(asyncio.wait([send_future, cleanup_future])) loop.close() 

Setelah menerapkan solusi ini, waktu untuk mengirim surat massal dikurangi menjadi satu jam dengan volume pengiriman yang sama dan 12 pekerja yang terlibat. Artinya, setiap pekerja mengirim 20-25 surat per detik, yang 50-80 kali lebih produktif daripada solusi asli. Konsumsi memori pekerja dipertahankan pada tingkat awal, beban prosesor meningkat sedikit, pemanfaatan jaringan meningkat berkali-kali, yang merupakan efek yang diharapkan. Jumlah koneksi ke database juga meningkat, karena masing-masing aliran pekerja-produsen dan pekerja yang menyimpan laporan secara aktif bekerja dengan database. Pada saat yang sama, pekerja gratis dapat mengirimkan surat kecil saat kampanye massal dikirim.



Terlepas dari semua keuntungannya, implementasi ini memiliki sejumlah kesulitan yang harus dipertimbangkan:

  1. Harus diperhatikan saat menangani kesalahan. Pengecualian yang tidak ditangani dapat mengakhiri pekerja, menyebabkan kampanye membeku.
  2. Ketika pengiriman selesai, perlu untuk tidak kehilangan laporan tentang penerima yang belum menyelesaikan potongan sampai akhir, dan menyimpannya ke database.
  3. Logika untuk menghentikan dimulainya kembali kampanye secara paksa menjadi semakin rumit, karena setelah menghentikan pekerja pengirim, perlu untuk membandingkan penerima mana yang dikirim surat dan mana yang tidak.
  4. Setelah beberapa waktu, staf dukungan Mailgun menghubungi kami dan meminta kami untuk memperlambat kecepatan pengiriman, karena layanan email mulai untuk sementara waktu menolak email jika frekuensi pengiriman mereka melebihi nilai ambang batas. Ini mudah dilakukan dengan mengurangi jumlah pekerja.
  5. Tidak mungkin menggunakan asyncio jika beberapa tahap pengiriman surat akan melakukan operasi yang menuntut prosesor. Templat rendering menggunakan jinja2 ternyata bukan operasi yang sangat intensif sumber daya dan praktis tidak berpengaruh pada kecepatan pengiriman.
  6. Menggunakan asyncio untuk milis memerlukan penangan antrian distribusi untuk memulai dengan proses terpisah.

Semoga pengalaman kami bermanfaat bagi Anda! Jika Anda memiliki pertanyaan atau ide, tulis di komentar!

Source: https://habr.com/ru/post/id482114/


All Articles