在Django应用程序中使用Celery的3种情况

图片

我正在Django上创建Web应用程序。 基本上,这些是企业的SaaS服务。 所有这些应用程序都需要异步任务。 对于它们的实现,我使用Celery。 在本文中,我将通过代码示例介绍使用Celery的情况。

Celery是用于管理任务队列的系统。 从根本上讲有两件事:从队列中执行任务并按计划执行任务。 队列代理通常是RabbitMQ或Redis。 任务被放入队列中,然后Celery工作人员从那里带走并执行它们。

对于Celery,您几乎可以在任何应用程序中想到一个应用程序,但随后我将仅描述我自己使用该应用程序的情况。

1.计划任务


通常,有一些任务需要在特定的日期和时间完成:向用户发送提醒,结束帐户的试用期,在社交网络上发布帖子。

在Celery中,可以在调用任务时指定ETA参数-必须启动任务的时间。 但是,如果您以这种方式计划任务,那么结果将是非常不可靠的:它们可能无法启动,并且取消它们可能会很不舒服。

一种更可靠的方法是使用celerybeat计划。 也就是说,创建一个计划,其中将有以特定频率或特定时间开始的任务。 例如,如果您需要按计划在社交网络上发布帖子,则该任务每分钟启动一次。 如果您需要完成帐户的试用期,则可以每天运行一次任务。

# schedule.py from datetime import timedelta from celery.schedules import crontab CELERYBEAT_SCHEDULE = { 'publish_post_starter': { 'task': 'publish_post_starter', 'schedule': timedelta(minutes=1), }, 'end_trial_starter': { 'task': 'end_trial_starter', 'schedule': crontab(hour=10, minute=21), }, } 

在任务启动器中,我们获取计划时间已经到来的所有实例。 我们遍历实例,并为每个实例调用主任务。 作为参数,我们仅传递实例ID,以免不必要的数据阻塞队列。 我们可以立即遍历所有实例并执行操作,但是通常最好为每个实例调用一个单独的任务。 因此,我们将加快执行速度,并且如果发生错误,它将仅影响其中一项任务。

 # tasks.py @app.task(name='publish_post') def publish_post(post_id): ... @app.task(name='publish_post_starter') def publish_post_starter(): post_ids = list( Post.objects.filter( publish_dt__lte=timezone.now(), is_published=False ).values_list('id', flat=True) ) for post_id in post_ids: publish_post.delay(post_id) 

2.来自WSGI的长计算和API调用


WSGI指的是在其中处理用户请求的上下文(请求-响应周期)。 与异步任务的上下文相反-Celery。

要创建响应界面,所有按钮都必须立即响应,并且不应阻塞其余界面。 为此,在锁定按钮后,将微调器放置在其上,并将ajax请求发送到服务器。 如果处理该请求的时间超过几秒钟,则可以将计算移至Celery任务。

在WSGI中,我们调用task并返回响应。 在前面,解锁按钮并卸下微调器。 我们向用户显示该操作正在运行的消息。 并行执行Celery任务,完成该任务后,将在Web套接字上返回响应。 在前面收到结果后,我们将其显示给用户。

 # rest_views.py from rest_framework import status from rest_framework.views import APIView from rest_framework.response import Response from tasks import send_emails class SendEmailView(APIView): def post(self, request): # this id will be used to send response with websocket request_uuid = request.data.get('request_uuid') if not request_uuid: return Response(status=status.HTTP_400_BAD_REQUEST) send_emails.delay(request.user.id, request_uuid) return Response(status=status.HTTP_200_OK) 

另外,您可以区分外部API调用和WSGI。 在这种情况下,所有调用(无论执行时间长短)均通过Celery任务启动。 这是对傻瓜的保护。 由于某些外部API的不可访问性,不应出现用户界面冻结的情况。

3.龙卷风带来的挑战


与社交网络,Telegram或支付服务集成时,您需要一个Webhook网址,通知将发送到该网址。 不能总是预先计算请求的数量,但是很可能它们的数量将超过用户的请求。 这些请求将一直被接收,直到它们收到代码200的响应为止。

对于处理此类请求,Tornado异步框架是合适的。 为了避免在Tornado中将处理转变为同步,应该没有阻塞操作。 这是需要芹菜的地方。 龙卷风处理程序接收到该请求,验证数据,调用Celery任务并返回成功的响应。

 # tornado_handlers.py from tornado import gen, escape from tornado.web import RequestHandler from tasks import handle_vk_callback class VkCallbackHandler(RequestHandler): @gen.coroutine def post(self, *args, **kwargs): try: data = escape.json_decode(self.request.body) except ValueError: self.set_status(status_code=400, reason='Invalid data') return handle_vk_callback.delay(data) self.write('ok') return 

Source: https://habr.com/ru/post/zh-CN461775/


All Articles