3 cas pour utiliser Celery dans une application Django

image

Je crĂ©e des applications web sur Django. Fondamentalement, ce sont des services SaaS pour les entreprises. Toutes ces applications nĂ©cessitent des tĂąches asynchrones. Pour leur mise en Ɠuvre, j'utilise du cĂ©leri. Dans l'article, je parlerai des situations dans lesquelles j'utilise Celery, avec des exemples de code.

Le céleri est un systÚme de gestion des files d'attente de tùches. Fondamentalement capable 2 choses: prendre des tùches de la file d'attente et effectuer des tùches selon un calendrier. Le courtier de file d'attente est généralement RabbitMQ ou Redis. Les tùches sont placées dans la file d'attente, puis les employés de Celery les prennent à partir de là et les exécutent.

Pour le cĂ©leri, vous pouvez penser Ă  une application dans presque toutes les applications, mais je dĂ©crirai ensuite uniquement les cas dans lesquels je l'utilise moi-mĂȘme.

1. Tùches planifiées


Il y a souvent des tĂąches qui doivent ĂȘtre accomplies Ă  une date et une heure spĂ©cifiques: envoyer un rappel Ă  l'utilisateur, mettre fin Ă  la pĂ©riode d'essai du compte, publier une publication sur les rĂ©seaux sociaux.

Dans Celery, il est possible de spĂ©cifier le paramĂštre ETA lors de l'appel de la tĂąche - l'heure Ă  laquelle la tĂąche doit ĂȘtre lancĂ©e. Mais si vous planifiez des tĂąches de cette façon, cela se rĂ©vĂšle trĂšs peu fiable: elles peuvent ne pas dĂ©marrer et sont mal Ă  l'aise pour annuler.

Un moyen plus fiable consiste Ă  utiliser le calendrier du cĂ©leri. C'est-Ă -dire, crĂ©ez un calendrier oĂč il y aura des tĂąches qui commencent Ă  une certaine frĂ©quence ou Ă  une heure spĂ©cifique. Par exemple, si vous devez publier un article sur les rĂ©seaux sociaux selon un calendrier, la tĂąche est lancĂ©e une fois par minute. Si vous devez terminer la pĂ©riode d'essai pour votre compte, vous pouvez exĂ©cuter la tĂąche une fois par jour.

# schedule.py from datetime import timedelta from celery.schedules import crontab CELERYBEAT_SCHEDULE = { 'publish_post_starter': { 'task': 'publish_post_starter', 'schedule': timedelta(minutes=1), }, 'end_trial_starter': { 'task': 'end_trial_starter', 'schedule': crontab(hour=10, minute=21), }, } 

Dans le démarreur de tùches, nous obtenons toutes les instances pour lesquelles l'heure prévue est déjà arrivée. Nous passons par les instances et pour chacune, nous appelons la tùche principale. Comme arguments, nous transmettons uniquement l'ID d'instance afin de ne pas obstruer la file d'attente avec des données inutiles. Nous pouvons immédiatement parcourir toutes les instances et effectuer des actions, mais le plus souvent, il est préférable d'appeler une tùche distincte pour chaque instance. Nous allons donc accélérer l'exécution, et si une erreur se produit, cela n'affectera qu'une seule des tùches.

 # tasks.py @app.task(name='publish_post') def publish_post(post_id): ... @app.task(name='publish_post_starter') def publish_post_starter(): post_ids = list( Post.objects.filter( publish_dt__lte=timezone.now(), is_published=False ).values_list('id', flat=True) ) for post_id in post_ids: publish_post.delay(post_id) 

2. Appels informatiques et API longs de WSGI


WSGI fait référence au contexte dans lequel les demandes des utilisateurs sont traitées (cycle de demande-réponse). Contrairement au contexte des tùches asynchrones - Céleri.

Pour crĂ©er une interface rĂ©active, tous les boutons doivent rĂ©pondre instantanĂ©ment et ne doivent pas bloquer le reste de l'interface. Pour ce faire, aprĂšs avoir appuyĂ© sur le bouton est bloquĂ©, un spinner est placĂ© dessus et une requĂȘte ajax est envoyĂ©e au serveur. Si le traitement de la demande prend plus de quelques secondes, vous pouvez dĂ©placer le calcul vers la tĂąche CĂ©leri.

Dans WSGI, nous appelons task et renvoyons une rĂ©ponse. À l'avant, dĂ©verrouillez le bouton et retirez le cĂŽne. Nous montrons Ă  l'utilisateur un message indiquant que l'action est en cours d'exĂ©cution. En parallĂšle, une tĂąche Celery est exĂ©cutĂ©e, qui, une fois terminĂ©e, renvoie une rĂ©ponse sur le socket Web. AprĂšs avoir reçu le rĂ©sultat Ă  l'avant, nous le montrons Ă  l'utilisateur.

 # rest_views.py from rest_framework import status from rest_framework.views import APIView from rest_framework.response import Response from tasks import send_emails class SendEmailView(APIView): def post(self, request): # this id will be used to send response with websocket request_uuid = request.data.get('request_uuid') if not request_uuid: return Response(status=status.HTTP_400_BAD_REQUEST) send_emails.delay(request.user.id, request_uuid) return Response(status=status.HTTP_200_OK) 

SĂ©parĂ©ment, vous pouvez distinguer les appels d'API externes de WSGI. Dans ce cas, tous les appels, quelle que soit la durĂ©e de leur exĂ©cution, sont lancĂ©s via la tĂąche CĂ©leri. Ceci est une protection contre le fou. Il ne devrait pas y avoir de situation oĂč, en raison de l'inaccessibilitĂ© de certaines API externes, l'interface utilisateur se bloque.

3. Défis de la tornade


Lors de l'intĂ©gration avec un rĂ©seau social, Telegram ou un service de paiement, vous avez besoin d'une URL de webhook Ă  laquelle les notifications seront envoyĂ©es. Le nombre de demandes ne peut pas toujours ĂȘtre calculĂ© Ă  l'avance, mais leur nombre dĂ©passera probablement les demandes des utilisateurs. Ces demandes seront reçues jusqu'Ă  ce qu'elles reçoivent une rĂ©ponse avec le code 200.

Pour le traitement de telles demandes, le cadre asynchrone Tornado convient. Afin de ne pas transformer le traitement en synchrone dans Tornado, il ne devrait pas y avoir d'opérations de blocage. C'est là que le céleri est nécessaire. Le gestionnaire Tornado reçoit la demande, valide les données, appelle la tùche Celery et renvoie une réponse réussie.

 # tornado_handlers.py from tornado import gen, escape from tornado.web import RequestHandler from tasks import handle_vk_callback class VkCallbackHandler(RequestHandler): @gen.coroutine def post(self, *args, **kwargs): try: data = escape.json_decode(self.request.body) except ValueError: self.set_status(status_code=400, reason='Invalid data') return handle_vk_callback.delay(data) self.write('ok') return 

Source: https://habr.com/ru/post/fr461775/


All Articles