一、项目结构
1、app.py文件
from __future__ import absolute_import, unicode_literals
from celery import Celery
capp = Celery('proj', include=['proj.tasks']) #创建Celery实例,include是程序启动时导入的模块,可以在该处添加任务模块,便于职程能够对应相应的任务。
capp.config_from_object('proj.celeryconfig') #加载配置项
if __name__ == '__main__':
capp.start()
2、celeryconfig.py配置文件
BROKER_URL = 'redis://:密码@服务器ip:6379/0' # 使用redis中0数据库作为消息代理
CELERY_RESULT_BACKEND = 'redis://:密码@服务器ip:6379/1' # 把任务结果存在了Redis 1数据库中
CELERY_TASK_SERIALIZER = 'msgpack' # 任务序列化和反序列化使用msgpack方案
CELERY_RESULT_SERIALIZER = 'json' # 读取任务结果一般性能要求不高,所以使用了可读性更好的JSON
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间,不建议直接写86400,应该让这样的magic数字表述更明显
CELERY_ACCEPT_CONTENT = ['json', 'msgpack'] # 指定接受的内容类型
3、tasks.py文件
from __future__ import absolute_import
from proj.app import capp
@capp.task
def add(x, y):
return x + y
二、启动运行消费者(worker)
终端:celery -A proj.app worker -l info
启动完的效果:Control + c 就可以停止<br />![image.png](https://cdn.nlark.com/yuque/0/2021/png/22661793/1638969000619-afa8e999-d488-418d-aade-c0f5011d5aa4.png#clientId=ue9651524-dd32-4&from=paste&height=123&id=u49435f71&margin=%5Bobject%20Object%5D&name=image.png&originHeight=246&originWidth=1488&originalType=binary&ratio=1&size=47431&status=done&style=none&taskId=u480f4f4c-f05a-451e-830b-af6df5f9f6c&width=744)
三、发布任务
调用任务.py文件
from proj.tasks import add
for i in range(1, 5):
add.delay(i, 5) # 方法必须.delay才可以发布任务,如果只是add(),这就跟调用普通方法,没区别,任务不会发布出去
备注:delay() 实际上为 apply_async() 的快捷使用,apply_async() 可以指定调用时执行的参数,例如运行的时间,使用的任务队列等,add.apply_async((2, 2), queue='lopri', countdown=10),任务被下发到 lopri 队列中,任务下发之后会在最早10秒内执行。 直接调用任务函数进行执行任务,不会发送任何任务消息
redis服务运行后的效果:
1、调用任务执行任务的时候,会通过下面的broker发布出去,redis 0数据库
2、消费者消费后数据存储在,redis 1数据库
3、你可以在终端将worker消费者关闭掉再执行一遍调度任务看看效果:
所有任务都积压在broker通道中了,因为没有消费者去消费,当你再次启动worker,会发现积压的任务会自动被消费掉