50种芹菜色

如果您想知道如何处理在Python开发人员圈子中广为人知的称为Celery的框架,就在这里。 即使Celery自信地在您的项目中执行基本命令,金融科技的经验也可能给您带来未知的一面。 因为fintech始终是大数据,因此需要后台任务,批处理,异步API等。


Oleg Churkin在Moscow Python Conf ++中关于芹菜的故事的美丽除了有关如何在负载下配置芹菜以及如何对其进行监控的详细说明之外,您还可以借鉴一些有用的想法。


关于演讲者和项目: Oleg Churkin( Bahusss )从事各种复杂度的Python项目开发已有8年,曾在许多知名公司工作:Yandex,Rambler,RBC,Kaspersky Lab。 现在,techlide进入fintech-StatusPoney创业公司。

该项目可处理大量用户(1.5 TB)的财务数据:帐户,交易,商人等。 每天执行多达一百万个任务。 也许对于某人来说,这个数字似乎并不大,但是对于容量有限的小型初创公司来说,这是大量数据,因此开发人员在稳定过程中必须面对各种问题。

奥列格谈到了工作的重点:

  • 您想使用该框架解决哪些任务,为什么选择Celery。
  • 芹菜如何提供帮助。
  • 如何在负载下配置Celery。
  • 如何监控芹菜状态。

他分享了一些设计实用程序,这些实用程序实现了Celery中缺少的功能。 事实证明,这可能是在2018年。 以下是第一人称报告的文本版本。

发行


需要解决以下任务:

  • 运行单独的后台任务
  • 批量处理任务 ,即一次运行多个任务。
  • 嵌入过程Extract,Transform,Load
  • 实现异步API 。 事实证明,异步API不仅可以使用异步框架实现,而且可以完全同步。
  • 执行定期任务 。 没有一个项目不能没有定期任务;对于某些项目,可以省去Cron,但也有更方便的工具。
  • 建立触发器体系结构 :要触发触发器,请运行任务以更新数据。 这样做是为了通过在后台预先计算数据来弥补运行时功能的不足。

后台任务包括任何类型的通知:电子邮件,推送,桌面-所有这些都是通过触发器在后台任务中发送的。 以相同的方式,开始定期更新财务数据。

在后台,执行各种特定检查,例如,检查用户的欺诈行为。 在金融初创公司中,由于我们允许用户将他们的银行帐户添加到我们的系统中,并且可以看到他们的所有交易,因此专门针对数据安全性进行大量的工作和关注 。 欺诈者可以尝试使用我们的服务来解决某些问题,例如,检查被盗帐户的余额。

后台任务的最后一类是维护任务 :调整,查看,修复,监视等。

对于批量通知,使用批处理 。 我们必须以某种方式计算和处理我们从用户那里收到的大量数据,包括 在批处理模式下。

相同的概念包括经典的Extract,Transform,Load

  • 从外部源(外部API)加载数据;
  • 保持未经处理;
  • 运行读取和处理数据的任务;
  • 我们将处理后的数据以正确的格式保存在正确的位置,以便以后例如在UI中方便使用。

异步API可以使用简单的轮询请求完成就已经不是什么秘密了:前端在后端启动该过程,后端启动一个任务,该任务定期启动,“倒出”结果并更新数据库中的状态。 前端向用户显示此交互状态正在改变。 这使您可以:

  • 从其他任务运行轮询任务;
  • 根据条件运行不同的任务。

在我们的服务中,这现在已经足够,但是将来我们可能不得不重写其他内容。

工具要求


为了实现这些任务,我们对工具具有以下要求:

  • 实现我们的雄心壮志所必需的功能。
  • 可扩展性,无需拐杖。
  • 监视系统以了解其工作方式。 我们使用错误报告,因此与Sentry的集成也不会与Django失去协调。
  • 性能 ,因为我们有很多任务。
  • 成熟,可靠和积极发展是显而易见的事情。 我们正在寻找一种将得到支持和开发的工具。
  • 文件是否足够- 任何地方都没有文件

选择哪个工具?


2018年市场上有哪些解决这些问题的选项?

曾几何时,我完成了一个不太方便的任务,便编写了一个方便的 ,该仍在某些项目中使用。 它易于操作并在后台执行任务。 但是同时,不需要代理(Celery和其他代理都不需要),只有具有假脱机程序的uwsgi应用程序服务器才是作为单独的工作程序启动的。 这是一个非常简单的解决方案-所有任务有条件地存储在文件中。 对于简单的项目,这已经足够了,但是对于我们的项目而言,这还不够。

我们考虑过:

  • Celery(GitHub上有1万颗星);
  • RQ(在GitHub上为5K星);
  • Huey(在GitHub上为2K星);
  • Dramatiq(在GitHub上为1K星);
  • Tasktiger(在GitHub上为0.5K星);
  • 气流? 路易吉

有前途的候选人2018


现在,我想请您注意Dramatiq 。 这是一位熟练的Celery提供的库,他知道Celery的所有缺点,因此决定重写所有内容,只是非常精美。 Dramatiq的好处:

  • 一组所有必要的功能。
  • 提高生产力。
  • 支持Prometheus的哨兵和指标支持
  • 一个小而清晰的代码库,代码自动重载。

前一段时间,Dramatiq在许可证方面遇到了问题:首先是AGPL,然后由LGPL取代。 但是现在您可以尝试。

但是在2016年,除了芹菜之外,别无其他。 我们喜欢它的丰富功能,然后非常适合我们的任务,因为即使那样它仍然成熟并且可以起作用:

  • 开箱即用的定期任务;
  • 支持几个经纪人;
  • 与Django和Sentry集成。

项目特色


我会告诉您我们的情况,以便进一步讲清楚。

我们使用Redis作为消息代理 。 我听说过很多故事和谣言,说Redis正在丢失消息,它不适合用作消息代理。 关于生产经验,这还没有得到证实,但是事实证明,Redis现在比RabbitMQ更有效地工作(与Celery一起使用,至少,显然,问题出在与经纪人的集成代码中)。 在版本4中,Redis代理已修复,它确实在重启期间停止了丢失任务,并且运行稳定。 在2016年,Celery打算放弃Redis并专注于与RabbitMQ集成,但是幸运的是,这没有发生。

在Redis出现问题的情况下,如果我们需要很高的可用性,则由于我们使用Amazon的功能,我们将切换到Amazon SQS或Amazon MQ。

我们不使用结果后端来存储结果 ,因为我们更喜欢将结果自己存储在所需的位置,然后按所需的方式进行检查。 我们不希望芹菜为我们这样做。

我们使用pefork池 ,即流程工作者,他们创建单独的流程分支以实现更多的并发性。

工作单位


我们将讨论基本元素,以便使尚未尝试使用Celery但只打算使用Celery的人保持最新状态。 Celery的工作单元是一个挑战 。 我将举一个发送电子邮件的简单任务的示例。

简单的功能和装饰器:

@current_app.task def send_email(email: str): print(f'Sending email to email={email}') 

任务启动很简单:我们调用函数,然后任务将在运行时(send_email(email =“ python@example.com”))或在工作程序中执行,即,任务在后台的作用:

 send_email.delay(email="python@example.com") send_email.apply_async( kwargs={email: "python@example.com"} ) 

在高负荷下与Celery一起工作的两年中,我们提出了良好形式的规则。 有很多耙子,我们学会了如何解决它们,我将分享如何。

代码设计


该任务可能包含不同的逻辑。 通常,Celery可帮助您将任务保留在文件或打包任务中,或从某个位置导入。 有时,您会在一个模块中得到一堆业务逻辑。 我们认为,从应用程序模块化的角度出发,正确的方法是使任务中的逻辑最少 。 我们仅将难题用作代码的“触发”。 也就是说,任务本身并不包含逻辑,而是在后台触发代码的启动。

 @celery_app.task(queue='...') def run_regular_update(provider_account_id, *args, **kwargs): """...""" flow = flows.RegularSyncProviderAccountFlow(provider_account_id) return flow.run(*args, **kwargs) 

我们将所有代码放入使用其他类的外部类中。 所有任务基本上由两行组成。

参数中的简单对象


在上面的示例中,某个ID传递给任务。 在我们使用的所有任务中,我们仅传输小的标量数据 id。 我们不序列化Django模型来传输它们。 即使在ETL中,当来自外部服务的大数据blob时,我们也要先保存它,然后运行一个任务,该任务通过id读取所有这些blob并对其进行处理。

如果您不这样做,那么我们会在Redis中看到大量消耗的内存。 该消息开始占用更多内存,网络负载沉重,已处理任务(性能)数量下降。 只要对象完成,任务就无关紧要,该对象已被删除。 需要序列化的数据-并非所有内容都可以在Python中的JSON中很好地序列化。 当重试任务时,我们需要机会以某种方式快速决定如何处理此数据,再次获取它,并对它们进行一些检查。

如果您通过参数传输大数据,请三思! 最好在问题中传递带有少量信息的小标量,然后从任务中的此信息中获取所需的一切。

等幂问题


Celery开发人员自己推荐这种方法。 当重复代码部分时,不会出现任何副作用,结果应相同。 这并非总是容易实现的,尤其是在与许多服务交互或两阶段提交的情况下。

但是,当您在本地进行所有操作时,您始终可以检查传入的数据是否存在并且相关,您可以真正对其进行处理并使用事务。 如果对一个任务的数据库查询很多,并且在运行时可能出错,请使用事务回滚不必要的更改。

向后兼容


部署应用程序时,我们产生了一些有趣的副作用。 无论您使用哪种类型的部署(蓝色+绿色或滚动更新),总会有旧服务代码为新工作程序代码创建消息的情况,反之亦然,因为旧工作代码会“首先”发布,所以旧工作人员会从新服务代码接收消息。交通就到了。

我们发现了错误并丢失了任务,直到我们学会了如何保持发行版之间的向后兼容性 。 向后兼容性是,无论在此任务中使用什么参数,版本之间的任务均应安全运行。 因此,在所有任务中,我们现在都在制作“橡胶”签名(** kwargs)。 当您需要在下一发行版中添加新参数时,可以从新发行版的** kwargs中获取它,而不会在旧版本中获取它-不会有任何损坏。 一旦签名更改,并且Celery不知道它,它将崩溃并给出错误消息,表明任务中没有这样的参数。

避免此类问题的更严格方法是在发行版之间对任务队列进行版本控制,但实现起来相当困难,我们现在将其留在了积压中。

超时时间


由于数量不足或错误的超时可能会出现问题。

不为任务设置超时是邪恶的。 这意味着您不了解任务中正在发生的事情,业务逻辑应如何工作。

因此,我们所有的任务都挂有超时,包括所有任务的全局超时,并且还为每个特定任务设置了超时。

必须粘贴:soft_limit_timeout过期。

过期是指任务可以满足的任务量。 在出现问题的情况下,任务必须不在队列中堆积。 例如,如果我们现在想向用户报告某事,但是某事发生了,并且该任务只能在明天完成-这没有意义,明天该消息将不再有意义。 因此,对于通知,我们有一个相当小的到期时间。

注意eta(countdown)+可见性 _timeout的使用 。 FAQ描述了Redis的这种问题-Redis代理的所谓可见性超时。 默认情况下,它的值为一小时:如果一个小时后工人发现没有人执行任务,则将其重新添加到队列中。 因此,如果倒计时是两个小时,则一个小时后,经纪人将发现此任务尚未完成,并将创建另一个任务。 在两个小时内,将完成两个相同的任务。

如果估计时间或倒计时超过1小时,那么很可能使用Redis将导致任务重复,除非您当然更改了用于连接到代理的设置中的visible_timeout值。

重试政策


对于那些可以重复执行或可能失败的任务,我们使用重试策略。 但是我们会谨慎使用它,以免淹没外部服务。 如果您在没有指定指数补偿的情况下快速重复执行任务,那么外部服务或内部服务可能根本无法承受。

参数retry_backoffretry_jittermax_retries可以很好地明确指定,尤其是max_retries。 retry_jitter-一个允许您带来一点混乱的参数,这样任务就不会同时开始重复。

内存泄漏


不幸的是,内存泄漏非常容易,并且很难找到并修复它们。

通常,在Python中使用内存存在很大争议。 您将花费大量时间和精力来理解泄漏发生的原因,然后事实证明泄漏甚至不在您的代码中。 因此,总是在启动项目时对worker设置内存限制 :worker_max_memory_per_child。

这样可以确保OOM Killer不会一天出现,不会杀死所有工作人员,并且您不会丢失所有任务。 芹菜将在需要时重新启动工作人员。

优先任务


总是有必须比其他人更快完成的任务,要比其他任何人都快-它们必须立即完成! 有些任务不是那么重要-让它们在一天之内完成。 为此,任务具有优先级参数 在Redis中,它的工作非常有趣-创建了一个新队列,并在其中添加了优先级。

我们使用不同的方法- 优先考虑独立工作人员 ,即 以老式的方式,我们创建了具有不同“重要性”的芹菜工人:

 celery multi start high_priority low_priority -c:high_priority 2 -c:low_priority 6 -Q:high_priority urgent_notifications -Q:low_priority emails,urgent_notifications 

Celery multi start是一个帮助程序,可以帮助您在一台计算机上从同一命令行运行整个Celery配置。 在此示例中,我们创建节点(或工作程序):high_priority和low_priority,2和6是并发。

两名高优先级工作人员不断处理紧急通知队列。 没有其他人会雇用这些工作人员,他们只会从Emergency_notifications队列中读取重要任务。

对于不重要的任务,有一个low_priority队列。 有6位工人从所有其他队列接收消息。 我们还向低优先级工人订阅紧急通知,以便在高优先级工人无法应对时提供帮助。

我们使用这种经典方案来确定任务的优先级。

提取,转换,加载


最常见的是,ETL看起来像一连串的任务,每个任务都接收前一个任务的输入。

 @task def download_account_data(account_id) … return account_id @task def process_account_data(account_id, processing_type) … return account_data @task def store_account_data(account_data) … 

该示例包含三个任务。 Celery有一种分布式处理方法,还有一些有用的实用程序,包括功能,该函数使以下三个任务中的一个流水线:

 chain( download_account_data.s(account_id), process_account_data.s(processing_type='fast'), store_account_data.s() ).delay() 

Celery将拆开管道,依次执行第一个任务,然后将接收到的数据传输到第二个,并将第二个任务返回的数据传输到第三个。 这就是我们实现简单ETL管道的方式。

对于更复杂的链,您必须连接其他逻辑。 但是,请务必记住,如果一项任务在此链中出现问题,则整个链将崩溃 。 如果您不希望出现这种情况,请处理异常并继续执行,或者通过异常停止整个链。

实际上,此链内部看起来像一个大任务,其中包含具有所有参数的所有任务。 因此,如果您滥用了链中的任务数量,将导致很高的内存消耗并减慢整个过程。 创建成千上万的任务链是一个坏主意。

批处理任务


现在最有趣的事情是:当您需要向200万用户发送电子邮件时会发生什么。

您编写这样的函数来绕过所有用户:

 @task def send_report_emails_to_users(): for user_id in User.get_active_ids(): send_report_email.delay(user_id=user_id) 

但是,大多数情况下,该函数不仅会接收用户ID,而且通常还会洗掉整个用户表。 每个用户将有自己的任务。

此任务有几个问题:

  • 任务是按顺序启动的,也就是说,最后一个任务(第二百万个用户)将在20分钟内启动,并且可能在此超时之前已经可以工作。
  • 所有用户ID首先都加载到应用程序内存中,然后再加载到队列中-delay()将执行2百万个任务。

我称它为Task Flood,图表看起来像这样。

大量任务逐渐被工人开始处理。 如果任务使用主副本,则会发生以下情况:整个项目开始破解,无济于事。 下面是我们的实践示例,说实话,DB CPU的使用率为100%几个小时,我们设法感到害怕。

问题在于,随着用户数量的增加,系统大大降级。 处理计划的任务:

  • 需要越来越多的内存;
  • 运行时间更长,可以被超时“杀死”。

发生任务泛滥:任务堆积在队列中,不仅对内部服务而且对外部服务都造成很大的负担。

我们试图降低工人的竞争力,从某种意义上说,这有所帮助-减轻了服务的负担。 或者,您可以扩展内部服务 。 但这不能解决发电机问题的问题,发电机问题仍然很多。 而且绝不会影响对外部服务性能的依赖。

任务生成


我们决定走另一条路。 大多数情况下,我们现在不需要立即运行所有200万个任务。 如果这些字母不太重要,通常向所有用户发送通知会花费例如4个小时。

首先,我们尝试使用Celery.chunks

 send_report_email.chunks( ({'user_id': user.id} for user in User.objects.active()), n=100 ).apply_async() 

这并没有改变这种情况,因为尽管有迭代器,但所有user_id都将被加载到内存中。 所有的工人都有一连串的任务,尽管他们会放松一些,但最终我们对这一决定并不满意。

我们尝试将rate_limit设置为worker,以便它们每秒仅处理一定数量的任务,并且我们发现实际上为该任务指定的rate_limit是该worker的rate_limit。 也就是说,如果为任务指定了rate_limit,则并不意味着任务将每秒执行70次。 这意味着工作人员每秒将执行70次,并且根据您对工作人员的了解,此限制可以动态更改,即 实际限制rate_limit * len(工人)。

如果工作人员启动或停止,则合计rate_limit会更改。 而且,如果您的任务很慢,那么填充这些工作线程的队列中的所有预取将被这些慢速任务所阻塞。 工人看起来:“哦,我在rate_limit中有此任务,我不能再执行它。 而且队列中的以下所有任务完全相同-让它们挂起!” -等待。

块化器


最后,我们决定自己编写一个小库,称为Chunkificator。

 @task @chunkify_task(sleep_timeout=...l initial_chunk=...) def send_report_emails_to_users(chunk: Chunk): for user_id in User.get_active_ids(chunk=chunk): send_report_email.delay(user_id=user_id) 

它需要sleep_timeout和initial_chunk,并用一个新的块来调用它自己。 块是整数列表,日期或日期时间列表的抽象。 我们将块传递给仅接收带有该块的用户的函数,并仅对该块运行任务。

因此,任务生成器仅运行所需数量的任务,并且不会消耗大量内存。 画面变得像这样。

最重要的是,我们使用稀疏块,即我们在数据库中使用实例作为块ID(可能会跳过其中的一些实例,因此任务可能会更少)。 结果,负载变得更加均匀,过程变得更长,但是每个人都还活着并且身体健康,基础不再紧张。

该库是为Python 3.6+实现的,可在GitHub上使用。 我计划修复一个细微差别,但目前datetime-chunk需要一个pickle序列化器-许多人将无法做到这一点。

有几个反问-这些信息是从哪里来的? 我们如何发现自己有问题? 您怎么知道问题很快将变得至关重要,您需要已经开始解决它?

答案当然是监视。

监控方式


我真的很喜欢监视,我喜欢监视所有内容,并随时掌握脉搏。 如果您不注意脉搏,那么您将不断踩踏耙子。

标准监控问题:

  • 当前的工作程序/并发配置是否可以处理负载?
  • 任务执行时间的减少是什么?
  • 任务挂起多长时间? 突然这条线已经挤满了?

我们尝试了几种选择。 Celery具有CLI界面,功能非常丰富,并提供:

  • 检查-有关系统的信息;
  • 控制-管理系统设置;
  • 清除-清除队列(不可抗力);
  • 事件-控制台UI,用于显示有关正在执行的任务的信息。

但是很难真正监控某些东西。 它更适合于本地装饰,或者您想在运行时更改一些rate_limit。

注意:您需要访问生产代理才能使用CLI界面。

Celery Flower允许您仅通过Web界面执行与CLI相同的操作,而这还不是全部。 但是它可以构建一些简单的图形,并允许您即时更改设置。

通常,芹菜花适合仅查看小型设置中的所有功能。 此外,它还支持HTTP API,如果您正在编写自动化程序,这将非常方便。

但是我们选择了普罗米修斯。 他们选择了当前的导出器 :修复了其中的内存泄漏; 添加了异常类型的指标; 添加了队列中消息数量的度量; 与Grafana中的警报集成并欢欣鼓舞。 它也发布在GitHub上,您可以在此处查看

Grafana中的例子



以上所有异常的统计信息:哪些异常适用于哪些任务。 下面是完成任务的时间。

芹菜中缺少什么?


这是一个多叶的框架,它有很多东西,但是我们不见了! 没有足够的小功能,例如:

  • 在开发过程中自动重新加载代码 -不支持此Celery-重新启动。
  • Prometheus的度量标准是开箱即用的,但是Dramatiq可以。
  • 支持 任务锁定 -一次只运行一个任务。 您可以自己完成操作,但是Dramatiq和Tasktiger具有方便的装饰器,可确保阻止所有其他类似任务。
  • 一个任务的Rate_limit-不适用于工人。

结论


尽管Celery是许多生产中使用的框架,但它包含3个库-Celery,Kombu和Billiard。 这三个库都是由联合开发人员开发的,它们可以释放一个依赖关系并破坏程序集。

因此,我希望您已经以某种方式对其进行了分类,并使您的程序集具有确定性。

实际上,结论并不那么令人遗憾。 芹菜在负载下处理我们在金融科技项目中的任务 。 我们积累了与您分享的经验,您可以应用我们的解决方案或对其进行完善,也可以克服所有困难。

不要忘记监视应该是项目的重要组成部分 。 只有通过监视,您才能找出问题出在哪里,需要修复,添加或修复的问题。

联系发言人Oleg ChurkinBahusssfacebookgithub

下一届大型Python Python Conf ++将于4月5日在莫斯科举行。 今年,我们将尝试在一天之内以实验模式体现所有收益。 将会有更多的报告,我们将为知名图书馆和产品的外国开发商分配全部资源。 此外,星期五是举办晚会的理想之日,如您所知,这是会议交流中不可或缺的一部分。

参加我们的专业Python会议- 在这里提交您的报告, 在这里预订票。 同时,准备工作正在进行中,有关莫斯科Python Conf ++ 2018的文章将在此处显示。

Source: https://habr.com/ru/post/zh-CN433476/


All Articles