通过Django应用程序使用asyncio和aiohttp发送电子邮件

大家好!

我正在Ostrovok.ru中开发和支持通知服务。 该服务使用Python3和Django编写。 除了交易信件,推送和消息外,该服务还承担向同意的用户批量发送商业报价(不是垃圾邮件!相信我,退订比订阅更好的工作)的任务。 随着时间的流逝,活动收件人的数量增加到超过一百万,而邮件服务尚未准备好。 我想谈谈新的Python功能如何使加速大量邮件发送和节省资源成为可能,以及在使用它们时我们必须解决哪些问题。



源实现


最初,以最简单的方式实现群发邮件:对于每个收件人,将一个任务放入一个队列,该队列由60个群发工作者之一承担(我们的队列的功能是每个工作者在单独的进程中工作),为其准备上下文,呈现模板,然后发送向Mailgun发送HTTP请求以发送一封信,并在数据库中创建了发送该信的记录。 整个邮件过程最多需要12个小时,每个工人每秒发送大约0.3封信,并阻止了小型活动的邮寄。



异步解决方案


快速剖析显示,工作人员大部分时间都花在与Mailgun建立连接上,因此我们开始将任务按工作人员分组。 工人开始与Mailgun使用一种连接,这使邮件发送时间减少到9小时,平均每位工人每秒发送0.5个字母。 随后的分析再次表明,使用网络仍然需要花费大部分时间,这促使我们使用asyncio。

在将所有处理放入异步周期之前,我们必须仔细考虑以下问题的解决方案:

  1. Django ORM尚不能使用asyncio,但是,它在查询执行期间释放了GIL。 这意味着数据库查询可以在单独的线程中执行,而不会阻塞主循环。
  2. 当前版本的aiohttp需要Python 3.6及更高版本,在实现时需要更新docker映像。 在较旧版本的aiohttp和Python 3.5上进行的实验表明,这些版本的发送速度远低于较新版本的发送速度,与顺序发送相当。
  3. 快速存储大量异步corutin会导致所有内存消耗。 这意味着不可能预先准备好所有协程中的字母并引起一个处理它们的周期,有必要在发送已经形成的字母时准备数据。

考虑到所有功能,我们将在每个工人内部创建与ThreadPool模式相似的异步周期,包括:

  • 一个或多个生产者通过Django ORM在asyncio.ThreadPoolExecutor的单独线程中使用数据库。 制造商尝试将数据获取请求聚合为小批量,通过Jinja2渲染接收数据的模板,并将要发送的数据添加到任务队列。

def get_campaign_send_data(ids: Iterable[int]) -> Iterable[Mapping[str, Any]]: """    ,     Django ORM   .""" return [{'id': id} for id in ids] async def mail_campaign_producer(ids: Iterable[int], task_queue: asyncio.Queue) -> None: """           ,    .      ,     ThreadPoolExecutor. """ loop = asyncio.get_event_loop() total = len(ids) for subchunk_start in range(0, total, PRODUCER_SUBCHUNK_SIZE): subchunk_ids = ids[subchunk_start : min(subchunk_start + PRODUCER_SUBCHUNK_SIZE, total)] send_tasks = await loop.run_in_executor(None, get_campaign_send_data, subchunk_ids) for task in send_tasks: await task_queue.put(task) 

  • 数百个信件发送者-异步协程,它们无休止地从任务队列中读取数据,为每个任务发送网络请求,并将结果(响应或异常)放入报告队列。

 async def send_mail(data: Mapping[str, Any], session: aiohttp.ClientSession) -> Union[Mapping[str, Any], Exception]: """    .""" async with session.post(REQUEST_URL, data=data) as response: if response.status_code != 200: raise Exception return data async def mail_campaign_sender( task_queue: asyncio.Queue, result_queue: asyncio.Queue, session: aiohttp.ClientSession ) -> None: """        .     task_done,    ,   . """ while True: try: task_data = await task_queue.get() result = await send_mail(task_data, session) await result_queue.put(result) except asyncio.CancelledError: #     raise except Exception as exception: #     await result_queue.put(exception) finally: task_queue.task_done() 

  • 一个或多个工作人员,负责对报告队列中的数据进行分组,并根据请求向大容量数据库发送信件的结果放入信息。

 def process_campaign_results(results: Iterable[Union[Mapping[str, Any], Exception]]) -> None: """  :         """ pass async def mail_campaign_reporter(task_queue: asyncio.Queue, result_queue: asyncio.Queue) -> None: """          ThreadPoolExecutor,        . """ loop = asyncio.get_event_loop() results_chunk = [] while True: try: results_chunk.append(await result_queue.get()) if len(results_chunk) >= REPORTER_BATCH_SIZE: await loop.run_in_executor(None, process_campaign_results, results_chunk) results_chunk.clear() except asyncio.CancelledError: await loop.run_in_executor(None, process_campaign_results, results_chunk) results_chunk.clear() raise finally: result_queue.task_done() 

  • 任务队列是asyncio.Queue的一个实例,受最大元素数量限制,以便制造商不会过度填充它,而浪费所有内存。
  • 报告队列,也是asyncio.Queue的实例,具有最大项目数限制。
  • 一种异步方法,它创建队列,工作器并通过停止它们来完成分配。

 async def send_mail_campaign( recipient_ids: Iterable[int], session: aiohttp.ClientSession, loop: asyncio.AbstractEventLoop = None ) -> None: """       .    ,       . """ executor = ThreadPoolExecutor(max_workers=PRODUCERS_COUNT + 1) loop = loop or asyncio.get_event_loop() loop.set_default_executor(executor) task_queue = asyncio.Queue(maxsize=2 * SENDERS_COUNT, loop=loop) result_queue = asyncio.Queue(maxsize=2 * SENDERS_COUNT, loop=loop) producers = [ asyncio.ensure_future(mail_campaign_producer(recipient_ids, task_queue)) for _ in range(PRODUCERS_COUNT) ] consumers = [ asyncio.ensure_future(mail_campaign_sender(task_queue, result_queue, session)) for _ in range(SENDERS_COUNT) ] reporter = asyncio.ensure_future(mail_campaign_reporter(task_queue, result_queue)) # ,      done, _ = await asyncio.wait(producers) #    ,   await task_queue.join() while consumers: consumers.pop().cancel() #    ,     await result_queue.join() reporter.cancel() 

  • 创建循环并开始分发的同步代码。

 async def close_session(future: asyncio.Future, session: aiohttp.ClientSession) -> None: """  ,    .  aiohttp      . """ await asyncio.wait([future]) await asyncio.sleep(0.250) await session.close() def mail_campaign_send_chunk(recipient_ids: Iterable[int]) -> None: """     .   ,  asyncio     . """ loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) # Session connector = aiohttp.TCPConnector(limit_per_host=0, limit=0) session = aiohttp.ClientSession( connector=connector, auth=aiohttp.BasicAuth('api', API_KEY), loop=loop, read_timeout=60 ) send_future = asyncio.ensure_future(send_mail_campaign(recipient_ids, session, loop=loop)) cleanup_future = asyncio.ensure_future(close_session(send_future, session)) loop.run_until_complete(asyncio.wait([send_future, cleanup_future])) loop.close() 

实施此解决方案后,在发送相同数量的邮件和12名工作人员的情况下,发送大量邮件的时间减少到一个小时。 也就是说,每个工人每秒发送20-25个字母,这比原始解决方案的生产率高50-80倍。 工作人员的内存消耗保持在初始水平,处理器负载略有增加,网络利用率增加了很多倍,这是预期的效果。 由于工人生产者和保存报告的工人的每个流程都积极地与数据库一起工作,因此与数据库的连接数量也增加了。 同时,在开展大规模运动的同时,自由工作者可以发送小邮件。



尽管具有所有优点,但这样的实现仍存在许多必须考虑的困难:

  1. 处理错误时必须小心。 未处理的异常可能会终止工作人员,从而导致活动冻结。
  2. 发送完成后,没有必要丢失尚未完成块的收件人的报告,并将其保存到数据库中。
  3. 强制停止活动恢复的逻辑变得越来越复杂,因为停止发送工作人员之后,有必要比较哪些收件人收到了信件,哪些收件人没有收到。
  4. 一段时间后,Mailgun支持人员与我们联系并要求我们降低发送速度,因为如果邮件服务的发送频率超过阈值,则它们会开始临时拒绝邮件。 减少工人数量很容易做到这一点。
  5. 如果发送信件的某些阶段将执行处理器要求的操作,则将无法使用asyncio。 事实证明,使用jinja2渲染模板不是非常耗费资源的操作,实际上对发送速度没有影响。
  6. 使用asyncio发送邮件列表要求分发队列处理程序由单独的进程启动。

希望我们的经验对您有所帮助! 如果您有任何疑问或想法,请在评论中写下!

Source: https://habr.com/ru/post/zh-CN482114/


All Articles