芹菜+Django——使用Django消息框架轮询任务状态和报告成功或失败

Celery+Django -- Poll task for state and report success or failure using Django messages framework

本文关键字:状态 报告 失败 任务 成功 使用 +Django Django 消息 芹菜 框架      更新时间:2023-09-26

在我的Django项目中使用了芹菜(以及其他一些东西),我有一个芹菜任务,它将在后台将文件上传到数据库。我使用轮询来跟踪上传进度,并显示上传进度条。下面是一些详细介绍上传过程的代码片段:

views.py:

from .tasks import upload_task
...
upload_task.delay(datapoints, user, description) # datapoints is a list of dictionaries, user and description are simple strings

tasks.py:

from taskman.celery import app, DBTask # taskman is the name of the Django app that has celery.py
from celery import task, current_task
@task(base=DBTask)
def upload_task(datapoints, user, description):
    from utils.db.databaseinserter import insertIntoDatabase
    for count in insertIntoDatabase(datapoints, user, description):
        percent_completion = int(100 * (float(count) / float(len(datapoints))))
        current_task.update_state(state='PROGRESS', meta={'percent':percent_completion})

databaseinserter.py:

def insertIntoDatabase(datapoints, user, description):
    # iterate through the datapoints and upload them one by one
    # at the end of an iteration, yield the number of datapoints completed so far

上传代码运行正常,进度条也运行正常。然而,我不确定如何发送Django消息告诉用户上传完成(或者,在发生错误时,发送Django消息通知用户错误)。当开始上传时,我在views.py:

中执行此操作。
from django.contrib import messages
...
messages.info(request, "Upload is in progress")

当上传成功时,我想这样做:

messages.info(request, "Upload successful!")

我不能在views.py中这样做,因为Celery任务是fire and forget。在celery.py中有办法做到这一点吗?在我的DBTask类在celery.py我有on_successon_failure定义,所以我能从那里发送Django消息吗?

另外,虽然我的轮询在技术上是有效的,但它目前并不理想。当前轮询的工作方式是,它将无休止地检查任务,而不管任务是否正在进行中。它会迅速淹没服务器控制台日志,我可以想象对整体性能有负面影响。我是编写轮询代码的新手,所以我不完全确定最佳实践,比如如何只在需要时进行轮询。处理持续轮询和服务器日志阻塞的最佳方法是什么?下面是我的轮询代码。

views.py:

def poll_state(request):
    data = 'Failure'
    if request.is_ajax():
        if 'task_id' in request.POST.keys() and request.POST['task_id']:
            task_id = request.POST['task_id']
            task = AsyncResult(task_id)
            data = task.result or task.state
            if data == 'SUCCESS' or data == 'FAILURE': # not sure what to do here; what I want is to exit the function early if the current task is already completed
                return HttpResponse({}, content_type='application/json')
        else:
            data ='No task_id in the request'
            logger.info('No task_id in the request')
    else:
        data = 'Not an ajax request'
        logger.info('Not an ajax request')
    json_data = json.dumps(data)
    return HttpResponse(json_data, content_type='application/json')
和相应的jQuery代码:
{% if task_id %}
    jQuery(document).ready(function() {
        var PollState = function(task_id) {
            jQuery.ajax({
                url: "poll_state",
                type: "POST",
                data: "task_id=" + task_id,
            }).done(function(task) {
                if (task.percent) {
                    jQuery('.bar').css({'width': task.percent + '%'});
                    jQuery('.bar').html(task.percent + '%');
                }
                else {
                    jQuery('.status').html(task);
                };
                PollState(task_id);
            });
        }
        PollState('{{ task_id }}');
    })
{% endif %}

(这最后两个片段主要来自之前关于Django+芹菜进度条的StackOverflow问题)

减少日志记录和开销的最简单答案是在下次PollState调用时设置超时。你现在写的函数会立即再次轮询。比如:

setTimeout(function () { PollState(task_id); }, 5000);

这将大大减少日志记录问题和开销。


关于你的Django消息传递问题,你需要通过某种处理将那些完成的任务拉出来。一种方法是使用Notification模型或类似模型,然后您可以添加一个中间件来获取未读通知并将其注入消息框架。

感谢Josh K关于使用setTimeout的提示。不幸的是,我永远无法弄清楚中间件的方法,所以我要用一个更简单的方法在poll_state中发送一个HttpResponse,如下所示:

if data == "SUCCESS":
    return HttpResponse(json.dumps({"message":"Upload successful!", "state":"SUCCESS"}, content_type='application/json'))
elif data == "FAILURE":
    return HttpResponse(json.dumps({"message":"Error in upload", "state":"FAILURE"}, content_type='application/json'))

目的是根据收到的JSON简单地呈现成功或错误消息。现在出现了新的问题,但那是针对另一个问题。