本文是我之前的PPT
https://docs.google.com/presentation/d/1QIEgZwApk7m6ukEmm_qM2AbKfdw0j_lRn82pW7rNwfU/edit?usp=sharing
1. Celery是什么
Celery 是一个由 Python 编写的简单、灵活、可靠的用来处理大量信息的分布式系统,它同时提供操作和维护分布式系统所需的工具(它本身不是一个任务队列, 它是 任务队列管理的工具)。官网地址
Celery的使用场景
- 爬虫
- 对实时性要求不高的任务(如推送和邮件)
Celery中五个角色
- Task 任务
- Broker 消息中间件(消息队列)
- Worker 消费者(任务执行者)
- Beat 定时任务调度器
- Backend 任务结果保存者
2. Celery配置
1. 从其他配置文件导入(最常用)
定义worker消费者
定义conf文件2. upload 方法导入
3. 直接导入
3. 单机下异步任务调用
任务-官方地址异步任务编写
任务在使用之前需要被导入,参见配置直接导入。
通过task()装饰器可以轻松从任何可调用对象创建任务
task支持附加的参数:https://docs.celeryproject.org/en/latest/userguide/tasks.html#task-request
当多个装饰器和任务装饰器结合使用的时候,必须保证最后使用任务装饰器异步任务调用
在 tasks/example/task1.py 中,写入__all__ == [‘task1_a’, ‘task1_b’](规定import可导内容)
manage.py 中from tasks.examples.task1 import *(import * 会减低代码可读性一般不适用,在此仅方便讲解)
任务的调用方法有三种:
- apply_async(args [, kwargs [, …] ] ) 【常用方法】发送任务消息
- delay ( args, *kwargs )【 最常用方法】发送任务的快捷方式
- calling ( call )
官方文档地址: https://docs.celeryproject.org/en/latest/userguide/calling.html
常用的Celery启动命令和各个参数的含义
常用的Celery 启动命令
celery -A tasks.celery_worker.learn worker -l info -c 3 -n a_team -Q q1
参数解释:
| celery -A | 固定格式 |
|---|---|
| tasks.celery_worker.learn | python以module导包的方式,指定worker |
| -l info | 日志级别 |
| -c 3 | 3个并发 |
| -n a_team | worker消费者命名为 a_team |
| -Q q1 | 指定worker消费队列q1内的任务 |
| -f notice.log | 日志文件的名称 |
获取异步任务的值
- 在Celery任务中调用Celery任务结果
- 在非Celery任务中调用Celery任务结果
4. 单机下定时任务调用
官网文档地址:https://docs.celeryproject.org/en/latest/reference/celery.schedules.html?highlight=crontab
定时任务启动命令(添加参数-B)
celery -A tasks.celery_worker.learn worker -B -l info -c 3 -n a_team -Q q1
5. 单机下设计任务流
任务流一共有三种 (group, chain, chord)。
接下来以以下的异步任务为例子进行讲解
工作流-group
工作流-chain
执行第一个任务并将其返回值传递给链中的下一个任务,以此类推。
工作流-chord
6. Celery对接Sentry与日志序列化
Celery 中日志打印
Celery 有个特殊的记录器 celery.task 。这个记录器由 celery worker 设立,目的是将任务相关信息添加到日志中。日志中包含两个新的参数 “task_id” “task_name”。 通过访问任务记录器celery.utils.log 可以对打印日志进行处理。
日志输出
自定义celery日志
自定义Celery日志有三种策略
- 增加Celery记录器
- 覆盖Celery根记录器
- 停用Celery记录器
因为第二种最简单,所以介绍一下:本质是通过连接setuplogging信号来组织Celery配置任何记录器

真实使用案例
序列化与sentry对接
Sentry可以在定义的日志拦截器里轻松添加,而Json序列化的包有很多,在日志格式化器(formatters)里添加就好了。
7. Celery与Flask相结合
因为Celery和Flask 各有一套上下文,一般来说是以Flask为主, Celery为辅助,所以需要将Celery的上下文注册到Flask之内。
真实使用中可以按如下方法
8. 用docker 部署服务
version: '3.7'services:ledger_pay:image: docker.zk.com/ledger-pay-server/project:v0.0.3restart: alwaysports:- 8533:9333env_file:- product.envsync_crontab:container_name: ledger_pay_crontabimage: docker.zk.com/ledger-pay-server/project:v0.0.6restart: alwaysworking_dir: /usr/src/app/ledger-pay/command: celery -A project.sync_tasks.celery_worker.celery_ledger worker -B -E -l INFO -f /var/log/celery/crontab.log -c 10 -Q crontabvolumes:- './logs:/var/log/celery/'env_file:- product.envsync_check_order:container_name: ledger_pay_check_orderimage: docker.zk.com/ledger-pay-server/project:v0.0.6restart: alwaysworking_dir: /usr/src/app/ledger-pay/command: celery -A project.sync_tasks.celery_worker.celery_ledger worker -E -l INFO -f wechat_trade.log -c 10 -Q wechat_tradevolumes:- './logs:/var/log/celery/'env_file:- product.envsync_profit_sharing:container_name: ledger_pay_profit_sharingimage: docker.zk.com/ledger-pay-server/project:v0.0.6restart: alwaysworking_dir: /usr/src/app/ledger-pay/command: celery -A project.sync_tasks.celery_worker.celery_ledger worker -E -l INFO -f wechat_separate.log -c 10 -Q wechat_separatevolumes:- './logs:/var/log/celery/'env_file:- product.envcheck_separate_status:container_name: ledger_pay_separate_checkimage: docker.zk.com/ledger-pay-server/project:v0.0.6restart: alwaysworking_dir: /usr/src/app/ledger-pay/command: celery -A project.sync_tasks.celery_worker.celery_ledger worker -E -l INFO -f query_wechat_separate.log -c 10 -Q query_wechat_separatevolumes:- './logs:/var/log/celery/'env_file:- product.env
