Celery Taskcls:新装饰器,新功能

哈Ha! 我会告诉你我职业倦怠的故事。


碰巧我讨厌常规的跑步机。 我有几个使用Celery的项目。 每当任务变得比2 + 2 = 5复杂时,解决方案模板就会简化为创建一个执行任务的类,以及Celery能够使用的启动程序功能 -样板。 在本文中,我将告诉您如何为样板而苦恼以及产生了什么。


商标


起点


考虑芹菜的普通任务。 有一个执行任务的类,以及一个启动程序函数,该函数实例化该类并启动其方法之一,其中实现了任务的所有逻辑并继承了错误处理:


 class MyTask( FirstMixin, SecondMixin, ThirdMixin, ): def main(self): data = self.do_something() response = self.remote_call(data) parsed = self.parser(response) return self.process(parsed) @app.task(bind=True) def my_task(self, arg1, arg2): instance = MyTask( celery_task=self, arg1=arg1, arg2=arg2, ) return instance.full_task() 

同时, full_task方法包括对main的调用,但是,它也处理错误处理,日志记录以及与主任务没有直接关系的其他废话。


任务类的想法


task类的根源在于一个简单的想法:在基类中,您可以定义task类的方法,在其中实现启动程序函数的行为,然后继承:


 class BaseTask: def __init__(self, **kwargs): for key, value in kwargs.items(): setattr(self, key, value) def full_task(self): try: return self.main() except: self.celery_task.retry(countdown=30) @classmethod def task(cls, task, **kwargs): self = cls( celery_task=celery_task, **kwargs, ) return self.full_task() 

所有辅助无聊都收集在基类中。 我们不会再回到她身边。 我们意识到任务的逻辑:


 @app.taskcls(bind=True) class MyTask( BaseTask, FirstMixin, SecondMixin, ThirdMixin, ): def main(self): data = self.do_something() response = self.remote_call(data) parsed = self.parser(response) return self.process(parsed) 

没有更多的外壳,更好。 但是,入口点呢?


 MyTask.task.delay(...) 

MyTask.task具有所有常见的任务方法: delayapply_async ,并且通常来说是。


现在是装饰器的参数。 将bind = True拖到每个任务中特别有趣。 是否可以通过基类传递默认参数?


 class BaseTask: class MetaTask: bind = True def __init__(self, **kwargs): for key, value in kwargs.items(): setattr(self, key, value) def full_task(self): try: return self.main() except: self.celery_task.retry(countdown=30) @classmethod def task(cls, task, **kwargs): self = cls( celery_task=celery_task, **kwargs, ) return self.full_task() 

嵌套的MetaTask类包含默认参数,并且可用于所有子类。 有趣的是,它也可以被继承:


 class BaseHasTimeout(BaseTask): class MetaTask(BaseTask.MetaTask): timeout = 42 

传递给@app.taskcls装饰器的参数具有最高优先级:


 @app.taskcls(timeout=20) class MyTask( BaseHasTimeout, FirstMixin, SecondMixin, ThirdMixin, ): def main(self): ... 

结果,该任务的超时将为20。


超越


在Web应用程序中,通常需要从视图开始任务。 在高附着力的情况下,视图和任务可以结合使用:


 class BaseViewTask: @classmethod def task(cls, **kwargs): # Somehow init View class manually self = cls(...) return self.celery() @app.taskcls class MyView( BaseViewTask, FirstMixin, SecondMixin, ThirdMixin, APIView, ): queryset = MyModel.objects.all() def get_some_data(self, *args, **kwargs): # common methed return self.queryset.filtert(...) def get(self, request): data = self.get_some_data(request.field) # used in request handling return Response(json.dumps(data)) def post(self, request): self.task.delay(...) return Response(status=201) def celery(self): data = self.get_some_data(...) # also used in background task return self.do_something(data) 

顺便说一下,为了避免名称冲突,嵌套类称为MetaTask ,而不是django中的Meta


结论


Celery 4.5中应具有此功能。 但是,我还准备了一个程序包 ,可让您今天尝试使用taskcls装饰器。 该软件包的想法是,当您将Celery升级到4.5版时,可以删除其导入,而无需更改任何代码行。

Source: https://habr.com/ru/post/zh-CN470547/


All Articles