Celery设计的挺好,但很多情况下在运行不正常的时候不输出错误信息,就尼玛离谱,我debug至少要能找到bug是什么吧?锤子信息都没有,de你妈。

模块结构

文件组织

在项目中添加如下目录结构作为Celery子模块。
CELERY_TASK

  • celery.py
  • tasks.py
  • init.py

    初始化

    ```python

    celery.py

    from celery import Celery

celery = Celery(‘tasks’, broker=’redis://:pswd@localhost:6379/0’, backend=’redis://:pswd@localhost:6379/1’, include=[‘celery_task.tasks’])

celery.conf.update(result_expires=3600)

  1. tasks中一个模拟的长时任务如下:
  2. ```python
  3. # tasks.py
  4. from .celery import celery
  5. @celery.task
  6. def test(x, y):
  7. return x + y

__init__.py 可为空。

使用

启动工人

Windows下需要引入 eventlet 完成工人的创建,Linux下则不需要。

  1. # -c 进程数
  2. celery -A celery_task worker -l info -P eventlet -c 10
  3. celery -A celery_task worker -l info

执行任务

执行任务时直接引入即可。

  1. from celery_task.tasks import test
  2. task = test.apply_async(args=(x, y))
  3. taskid = task.id # 保存以在未来查找

获取任务进度

异步任务的封装需要进行修改。self 用以传出信息元组。

@celery.task(bind=True)
def async_function(self, arg, ...):

    ...
    self.update_state(state='PROGRESS', meta={'progress': progress_percent})
    ...

    return {'progress': 100}

在外部使用taskid查找到任务,即可读取其进度。

task = async_trainModel.AsyncResult(taskid)

print(task.state, task.info)    # task.info 即 上文 meta 字典