Tareas de apio: nuevo decorador, nuevas características

Hola Habr! Te contaré la historia de mi agotamiento profesional.


Sucedió que odio las cintas de correr de rutina. Tengo varios proyectos con apio . Cada vez que una tarea se vuelve más complicada que 2 + 2 = 5 , la plantilla de solución se reduce a la creación de una clase que realiza la tarea y una función de inicio con la que Celery puede trabajar: una placa repetitiva. En este artículo te diré cómo luché con una placa repetitiva y qué salió de ella.


Logotipo


Punto de partida


Considere la tarea ordinaria del apio. Hay una clase que realiza la tarea y una función de inicio que crea una instancia de la clase e inicia uno de sus métodos, en el que se implementa toda la lógica de la tarea y se hereda el manejo de errores:


 class MyTask( FirstMixin, SecondMixin, ThirdMixin, ): def main(self): data = self.do_something() response = self.remote_call(data) parsed = self.parser(response) return self.process(parsed) @app.task(bind=True) def my_task(self, arg1, arg2): instance = MyTask( celery_task=self, arg1=arg1, arg2=arg2, ) return instance.full_task() 

Al mismo tiempo, el método full_task incluye una llamada a main , sin embargo, también se ocupa del manejo de errores, el registro y otras tonterías que no están directamente relacionadas con la tarea principal.


Idea de clase de tarea


En la raíz de la clase de tarea se encuentra una idea simple: en la clase base, puede definir un método de la clase de task , implementar el comportamiento de la función de inicio en ella y luego heredar:


 class BaseTask: def __init__(self, **kwargs): for key, value in kwargs.items(): setattr(self, key, value) def full_task(self): try: return self.main() except: self.celery_task.retry(countdown=30) @classmethod def task(cls, task, **kwargs): self = cls( celery_task=celery_task, **kwargs, ) return self.full_task() 

Todo el aburrimiento auxiliar recogido en la clase base. No volveremos a ella otra vez. Nos damos cuenta de la lógica de la tarea:


 @app.taskcls(bind=True) class MyTask( BaseTask, FirstMixin, SecondMixin, ThirdMixin, ): def main(self): data = self.do_something() response = self.remote_call(data) parsed = self.parser(response) return self.process(parsed) 

No más cáscara, mucho mejor. Sin embargo, ¿qué pasa con el punto de entrada?


 MyTask.task.delay(...) 

MyTask.task tiene todos los métodos de tareas habituales: delay , apply_async y, en general, lo es.


Ahora los argumentos del decorador. Es especialmente divertido arrastrar bind = True a cada tarea. ¿Es posible pasar argumentos predeterminados a través de una clase base?


 class BaseTask: class MetaTask: bind = True def __init__(self, **kwargs): for key, value in kwargs.items(): setattr(self, key, value) def full_task(self): try: return self.main() except: self.celery_task.retry(countdown=30) @classmethod def task(cls, task, **kwargs): self = cls( celery_task=celery_task, **kwargs, ) return self.full_task() 

La clase MetaTask anidada contiene argumentos predeterminados y estará disponible para todas las clases secundarias. Curiosamente, también se puede heredar:


 class BaseHasTimeout(BaseTask): class MetaTask(BaseTask.MetaTask): timeout = 42 

Los argumentos pasados ​​al decorador @app.taskcls tienen la máxima prioridad:


 @app.taskcls(timeout=20) class MyTask( BaseHasTimeout, FirstMixin, SecondMixin, ThirdMixin, ): def main(self): ... 

Como resultado, el tiempo de espera para la tarea será de 20.


Yendo más allá


En las aplicaciones web, a menudo es necesario iniciar una tarea desde la vista. En el caso de una alta adhesión, la vista y la tarea se pueden combinar:


 class BaseViewTask: @classmethod def task(cls, **kwargs): # Somehow init View class manually self = cls(...) return self.celery() @app.taskcls class MyView( BaseViewTask, FirstMixin, SecondMixin, ThirdMixin, APIView, ): queryset = MyModel.objects.all() def get_some_data(self, *args, **kwargs): # common methed return self.queryset.filtert(...) def get(self, request): data = self.get_some_data(request.field) # used in request handling return Response(json.dumps(data)) def post(self, request): self.task.delay(...) return Response(status=201) def celery(self): data = self.get_some_data(...) # also used in background task return self.do_something(data) 

Por cierto, para evitar la colisión de nombres, la clase anidada se llama MetaTask , no Meta , como en django.


Conclusión


Esta funcionalidad se espera en Celery 4.5 . Sin embargo, también preparé un paquete que le permite probar el decorador taskcls hoy. La idea del paquete es que cuando actualiza Celery a la versión 4.5, puede eliminar su importación sin cambiar una sola línea de código.

Source: https://habr.com/ru/post/470547/


All Articles