Celery taskcls: nouveau décorateur, nouvelles fonctionnalités

Bonjour, Habr! Je vais vous raconter l'histoire de mon épuisement professionnel.


Il se trouve que je déteste les tapis roulants de routine. J'ai plusieurs projets utilisant Celery . Chaque fois qu'une tâche devient plus compliquée que 2 + 2 = 5 , le modèle de solution est réduit à créer une classe qui exécute la tâche et une fonction de démarrage avec laquelle Celery est capable de travailler - un passe-partout. Dans cet article, je vais vous dire comment j'ai eu du mal avec un passe-partout et ce qui en est sorti.


Le logo


Point de départ


Considérez la tâche ordinaire du céleri. Il existe une classe qui exécute la tâche et une fonction de démarrage qui instancie la classe et démarre l'une de ses méthodes, dans laquelle toute la logique de la tâche est implémentée et la gestion des erreurs est héritée:


 class MyTask( FirstMixin, SecondMixin, ThirdMixin, ): def main(self): data = self.do_something() response = self.remote_call(data) parsed = self.parser(response) return self.process(parsed) @app.task(bind=True) def my_task(self, arg1, arg2): instance = MyTask( celery_task=self, arg1=arg1, arg2=arg2, ) return instance.full_task() 

Dans le même temps, la méthode full_task comprend un appel à main , mais elle traite également la gestion des erreurs, la journalisation et d'autres absurdités qui ne sont pas directement liées à la tâche principale.


Idée de classe de tâches


À la racine de la classe de tâches se trouve une idée simple: dans la classe de base, vous pouvez définir une méthode de la classe de task , y implémenter le comportement de la fonction de démarrage, puis hériter:


 class BaseTask: def __init__(self, **kwargs): for key, value in kwargs.items(): setattr(self, key, value) def full_task(self): try: return self.main() except: self.celery_task.retry(countdown=30) @classmethod def task(cls, task, **kwargs): self = cls( celery_task=celery_task, **kwargs, ) return self.full_task() 

Tout l'ennui auxiliaire collecté dans la classe de base. Nous ne lui reviendrons plus. Nous réalisons la logique de la tâche:


 @app.taskcls(bind=True) class MyTask( BaseTask, FirstMixin, SecondMixin, ThirdMixin, ): def main(self): data = self.do_something() response = self.remote_call(data) parsed = self.parser(response) return self.process(parsed) 

Plus de balle, beaucoup mieux. Mais qu'en est-il du point d'entrée?


 MyTask.task.delay(...) 

MyTask.task a toutes les méthodes de tâche habituelles: delay , apply_async et, d'une manière générale, c'est le cas.


Maintenant, les arguments du décorateur. Il est particulièrement amusant de faire glisser bind = True dans chaque tâche. Est-il possible de passer des arguments par défaut via une classe de base?


 class BaseTask: class MetaTask: bind = True def __init__(self, **kwargs): for key, value in kwargs.items(): setattr(self, key, value) def full_task(self): try: return self.main() except: self.celery_task.retry(countdown=30) @classmethod def task(cls, task, **kwargs): self = cls( celery_task=celery_task, **kwargs, ) return self.full_task() 

La classe MetaTask imbriquée contient des arguments par défaut et sera disponible pour toutes les classes enfants. Fait intéressant, il peut également être hérité:


 class BaseHasTimeout(BaseTask): class MetaTask(BaseTask.MetaTask): timeout = 42 

Les arguments transmis au décorateur @app.taskcls ont la priorité la plus élevée:


 @app.taskcls(timeout=20) class MyTask( BaseHasTimeout, FirstMixin, SecondMixin, ThirdMixin, ): def main(self): ... 

Par conséquent, le délai d'expiration de la tâche sera de 20.


Aller au-delà


Dans les applications Web, il est souvent nécessaire de démarrer une tâche à partir de la vue. En cas d'adhérence élevée, la vue et la tâche peuvent être combinées:


 class BaseViewTask: @classmethod def task(cls, **kwargs): # Somehow init View class manually self = cls(...) return self.celery() @app.taskcls class MyView( BaseViewTask, FirstMixin, SecondMixin, ThirdMixin, APIView, ): queryset = MyModel.objects.all() def get_some_data(self, *args, **kwargs): # common methed return self.queryset.filtert(...) def get(self, request): data = self.get_some_data(request.field) # used in request handling return Response(json.dumps(data)) def post(self, request): self.task.delay(...) return Response(status=201) def celery(self): data = self.get_some_data(...) # also used in background task return self.do_something(data) 

Soit dit en passant, pour éviter la collision de noms, la classe imbriquée est appelée MetaTask , pas Meta , comme dans django.


Conclusion


Cette fonctionnalité est attendue dans Celery 4.5 . Cependant, j'ai également préparé un package qui vous permet d'essayer le décorateur de tâches aujourd'hui. L'idée du package est que lorsque vous mettez à niveau Celery vers la version 4.5, vous pouvez supprimer son importation sans modifier une seule ligne de code.

Source: https://habr.com/ru/post/fr470547/


All Articles