3 casos para usar o aipo em um aplicativo Django

imagem

Estou criando aplicativos da web no Django. Basicamente, esses são serviços SaaS para empresas. Todos esses aplicativos requerem tarefas assíncronas. Para sua implementação, eu uso o aipo. No artigo, falarei sobre situações em que uso o Aipo, com exemplos de código.

O aipo é um sistema para gerenciar filas de tarefas. Fundamentalmente capaz 2 coisas: executar tarefas da fila e executar tarefas em um agendamento. O intermediário de filas geralmente é RabbitMQ ou Redis. As tarefas são colocadas na fila e, em seguida, os funcionários do aipo as retiram e as executam.

Para o Celery, você pode pensar em um aplicativo em quase qualquer aplicativo, mas descreverei apenas os casos em que eu mesmo o uso.

1. Tarefas agendadas


Muitas vezes, há tarefas que precisam ser concluídas em uma data e hora específicas: envie um lembrete ao usuário, encerre o período de avaliação da conta, publique uma postagem nas redes sociais.

No Celery, é possível especificar o parâmetro ETA ao chamar a tarefa - o horário em que a tarefa deve ser iniciada. Mas se você planeja tarefas dessa maneira, acaba sendo muito confiável: elas podem não iniciar e não são confortáveis ​​para cancelar.

Uma maneira mais confiável é usar a programação do aipo. Ou seja, crie uma agenda em que haverá tarefas que iniciem em uma determinada frequência ou em um horário específico. Por exemplo, se você precisar publicar uma postagem nas redes sociais em uma agenda, a tarefa para isso será iniciada uma vez por minuto. Se você precisar finalizar o período de avaliação da sua conta, poderá executar a tarefa uma vez por dia.

# schedule.py from datetime import timedelta from celery.schedules import crontab CELERYBEAT_SCHEDULE = { 'publish_post_starter': { 'task': 'publish_post_starter', 'schedule': timedelta(minutes=1), }, 'end_trial_starter': { 'task': 'end_trial_starter', 'schedule': crontab(hour=10, minute=21), }, } 

No iniciador de tarefas, obtemos todas as instâncias para as quais o tempo planejado já chegou. Analisamos as instâncias e, para cada uma, chamamos a tarefa principal. Como argumentos, passamos apenas o ID da instância para não entupir a fila com dados desnecessários. Podemos passar imediatamente por todas as instâncias e executar ações, mas na maioria das vezes é melhor chamar uma tarefa separada para cada instância. Portanto, aceleraremos a execução e, se ocorrer um erro, afetará apenas uma das tarefas.

 # tasks.py @app.task(name='publish_post') def publish_post(post_id): ... @app.task(name='publish_post_starter') def publish_post_starter(): post_ids = list( Post.objects.filter( publish_dt__lte=timezone.now(), is_published=False ).values_list('id', flat=True) ) for post_id in post_ids: publish_post.delay(post_id) 

2. Longas chamadas de computação e API da WSGI


WSGI refere-se ao contexto em que as solicitações dos usuários são processadas (Ciclo de Solicitação-Resposta). Em contraste com o contexto de tarefas assíncronas - aipo.

Para criar uma interface responsiva, todos os botões devem responder instantaneamente e não devem bloquear o restante da interface. Para fazer isso, depois de pressionar o botão é bloqueado, um botão giratório é colocado nele e uma solicitação de ajax é enviada ao servidor. Se o processamento da solicitação demorar mais de alguns segundos, você poderá mover o cálculo para a tarefa Aipo.

No WSGI, chamamos tarefa e retornamos uma resposta. Na frente, destrave o botão e remova o botão rotativo. Mostramos ao usuário uma mensagem de que a ação está em execução. Paralelamente, é executada uma tarefa do Aipo, que, após a conclusão, retorna uma resposta no soquete da web. Tendo recebido o resultado na frente, mostramos ao usuário.

 # rest_views.py from rest_framework import status from rest_framework.views import APIView from rest_framework.response import Response from tasks import send_emails class SendEmailView(APIView): def post(self, request): # this id will be used to send response with websocket request_uuid = request.data.get('request_uuid') if not request_uuid: return Response(status=status.HTTP_400_BAD_REQUEST) send_emails.delay(request.user.id, request_uuid) return Response(status=status.HTTP_200_OK) 

Separadamente, você pode distinguir chamadas de API externas do WSGI. Nesse caso, todas as chamadas, independentemente da duração de sua execução, são iniciadas por meio da tarefa Aipo. Isso é proteção contra o tolo. Não deve haver uma situação em que, devido à inacessibilidade de alguma API externa, a interface do usuário congele.

3. Desafios do Tornado


Ao integrar-se a uma rede social, telegrama ou serviço de pagamento, você precisa de um URL de webhook para o qual as notificações chegarão. O número de solicitações nem sempre pode ser calculado com antecedência, mas provavelmente o número excederá as solicitações dos usuários. Esses pedidos serão recebidos até receberem uma resposta com o código 200.

Para processar essas solicitações, a estrutura assíncrona do Tornado é adequada. Para não transformar o processamento em síncrono no Tornado, não deve haver operações de bloqueio. É aqui que o aipo é necessário. O manipulador Tornado recebe a solicitação, valida os dados, chama a tarefa Aipo e retorna uma resposta bem-sucedida.

 # tornado_handlers.py from tornado import gen, escape from tornado.web import RequestHandler from tasks import handle_vk_callback class VkCallbackHandler(RequestHandler): @gen.coroutine def post(self, *args, **kwargs): try: data = escape.json_decode(self.request.body) except ValueError: self.set_status(status_code=400, reason='Invalid data') return handle_vk_callback.delay(data) self.write('ok') return 

Source: https://habr.com/ru/post/pt461775/


All Articles