3 casos para usar Celery en una aplicación Django

imagen

Estoy creando aplicaciones web en Django. Básicamente, estos son servicios SaaS para empresas. Todas estas aplicaciones requieren tareas asincrónicas. Para su implementación utilizo Celery. En el artículo contaré sobre situaciones en las que uso Celery, con ejemplos de código.

El apio es un sistema para gestionar colas de tareas. 2 cosas fundamentales: tomar tareas de la cola y realizar tareas en un horario. El intermediario de colas suele ser RabbitMQ o Redis. Las tareas se ponen en la cola, y luego los trabajadores del apio las toman de allí y las realizan.

Para Celery, puede pensar en una aplicación en casi cualquier aplicación, pero luego describiré solo aquellos casos en los que la uso yo mismo.

1. Tareas programadas


A menudo hay tareas que deben completarse en una fecha y hora específicas: enviar un recordatorio al usuario, finalizar el período de prueba de la cuenta, publicar una publicación en las redes sociales.

En Celery, es posible especificar el parámetro ETA cuando se llama a la tarea, la hora a la que debe iniciarse la tarea. Pero si planifica las tareas de esta manera, resulta que no es muy confiable: puede que no se inicien y puede ser incómodo cancelarlas.

Una forma más confiable es usar el programa de apio. Es decir, cree un cronograma donde habrá tareas que comiencen con cierta frecuencia o en un momento específico. Por ejemplo, si necesita publicar una publicación en las redes sociales en un horario, la tarea para esto se inicia una vez por minuto. Si necesita finalizar el período de prueba de su cuenta, puede ejecutar la tarea una vez al día.

# schedule.py from datetime import timedelta from celery.schedules import crontab CELERYBEAT_SCHEDULE = { 'publish_post_starter': { 'task': 'publish_post_starter', 'schedule': timedelta(minutes=1), }, 'end_trial_starter': { 'task': 'end_trial_starter', 'schedule': crontab(hour=10, minute=21), }, } 

En el inicio de la tarea, obtenemos todas las instancias para las cuales ya ha llegado el tiempo planificado. Pasamos por las instancias y para cada una llamamos la tarea principal. Como argumentos, pasamos solo la identificación de la instancia para no obstruir la cola con datos innecesarios. Podemos pasar inmediatamente por todas las instancias y realizar acciones, pero la mayoría de las veces es mejor llamar una tarea separada para cada instancia. Por lo tanto, aceleraremos la ejecución y, si ocurre un error, afectará solo una de las tareas.

 # tasks.py @app.task(name='publish_post') def publish_post(post_id): ... @app.task(name='publish_post_starter') def publish_post_starter(): post_ids = list( Post.objects.filter( publish_dt__lte=timezone.now(), is_published=False ).values_list('id', flat=True) ) for post_id in post_ids: publish_post.delay(post_id) 

2. Llamadas informáticas largas y llamadas API desde WSGI


WSGI se refiere al contexto en el que se procesan las solicitudes de los usuarios (Ciclo de solicitud-respuesta). En contraste con el contexto de tareas asincrónicas - Apio.

Para crear una interfaz receptiva, todos los botones deben responder instantáneamente y no deben bloquear el resto de la interfaz. Para hacer esto, después de presionar el botón está bloqueado, se coloca una rueda giratoria y se envía una solicitud ajax al servidor. Si procesar la solicitud lleva más de un par de segundos, puede mover el cálculo a la tarea de Apio.

En WSGI, llamamos tarea y devolvemos una respuesta. En la parte delantera, desbloquee el botón y retire la rueda giratoria. Le mostramos al usuario un mensaje de que la acción se está ejecutando. Paralelamente, se ejecuta una tarea de Apio que, al finalizar, devuelve una respuesta en el socket web. Habiendo recibido el resultado en el frente, se lo mostramos al usuario.

 # rest_views.py from rest_framework import status from rest_framework.views import APIView from rest_framework.response import Response from tasks import send_emails class SendEmailView(APIView): def post(self, request): # this id will be used to send response with websocket request_uuid = request.data.get('request_uuid') if not request_uuid: return Response(status=status.HTTP_400_BAD_REQUEST) send_emails.delay(request.user.id, request_uuid) return Response(status=status.HTTP_200_OK) 

Por separado, puede distinguir las llamadas API externas de WSGI. En este caso, todas las llamadas, independientemente de la duración de su ejecución, se inician a través de la tarea Celery. Esta es la protección del tonto. No debería haber una situación en la que, debido a la inaccesibilidad de alguna API externa, la interfaz de usuario se congele.

3. Desafíos del tornado


Al integrarse con una red social, Telegram o un servicio de pago, necesita una URL de webhook a la que llegarán las notificaciones. El número de solicitudes no siempre se puede calcular por adelantado, pero lo más probable es que su número supere las solicitudes de los usuarios. Estas solicitudes se recibirán hasta que reciban una respuesta con el código 200.

Para procesar tales solicitudes, el marco asincrónico Tornado es adecuado. Para no convertir el procesamiento en sincrónico en Tornado, no debe haber operaciones de bloqueo. Aquí es donde se necesita el apio. El controlador Tornado recibe la solicitud, valida los datos, llama a la tarea de Apio y devuelve una respuesta exitosa.

 # tornado_handlers.py from tornado import gen, escape from tornado.web import RequestHandler from tasks import handle_vk_callback class VkCallbackHandler(RequestHandler): @gen.coroutine def post(self, *args, **kwargs): try: data = escape.json_decode(self.request.body) except ValueError: self.set_status(status_code=400, reason='Invalid data') return handle_vk_callback.delay(data) self.write('ok') return 

Source: https://habr.com/ru/post/461775/


All Articles