Introduction
What is a task queue
A task queue is a mechanism for distributing work across **threads or machines**.
A task queue's input is a unit of work called a task; dedicated worker processes then constantly monitor the queue for new tasks to process.
1. Celery communicates via messages, usually using a **broker to mediate between clients and workers**. To initiate a task the client adds a message to the queue, which the broker then delivers to a worker (a minimal sketch follows this list).
2. A Celery system can consist of multiple workers and brokers, giving way to high availability and horizontal scaling (i.e. concurrency).
3. Celery is written in Python, but the protocol can be implemented in any language.
1. So far there are RCelery for Ruby and node-celery for Node.js,
2. as well as a PHP client; language interoperability can also be achieved by using webhooks.
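A minimal sketch of that client/broker/worker flow (the module name, broker URL and task below are illustrative assumptions, not taken from this note):

```python
# tasks.py -- hypothetical minimal Celery app; assumes Redis running locally
from celery import Celery

app = Celery('tasks', broker='redis://127.0.0.1:6379/0')

@app.task
def add(x, y):
    # executed by whichever worker picks the message off the queue
    return x + y

# client side: enqueue a message and return immediately
# >>> from tasks import add
# >>> add.delay(2, 3)
```

Running `celery -A tasks worker -l info` starts a worker that consumes the queue and executes the call there rather than in the client process.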
Characteristics
1. Simple
Celery is easy to use and maintain, and it does not need configuration files.
It has an active, friendly community you can turn to for support, including a mailing list and an IRC channel.
2. Highly available
Workers and clients automatically retry in the event of connection loss or failure,
and some brokers support high availability through primary/primary or primary/replica replication.
3. Fast
A single Celery process can handle millions of tasks a minute,
with sub-millisecond round-trip latency (using RabbitMQ, py-librabbitmq and optimized settings).
4. Flexible
Almost every part of Celery can be extended or used on its own:
custom pool implementations, serializers, compression schemes, logging, schedulers, consumers, producers, autoscalers, broker transports and much more.
Diagram
Broker and component support
1. Brokers
RabbitMQ, Redis,
MongoDB (experimental), ZeroMQ (experimental),
CouchDB (experimental), SQLAlchemy (experimental),
Django ORM (experimental), Amazon SQS (experimental)
2. Concurrency
prefork (multiprocessing),
Eventlet, gevent,
threads / single threaded
3. Result stores
AMQP, Redis,
memcached, MongoDB,
SQLAlchemy, Django ORM,
Apache Cassandra
4. Serialization
pickle, json, yaml, msgpack,
zlib, bzip2 compression,
cryptographic message signing (a configuration sketch follows this list)
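A hedged configuration sketch tying the table above together: broker, result backend and serializer are all chosen through app settings (the Redis URLs and values below are assumptions):

```python
from celery import Celery

app = Celery('demo',
             broker='redis://127.0.0.1:6379/0',   # message transport (broker)
             backend='redis://127.0.0.1:6379/1')  # result store

app.conf.update(
    task_serializer='json',    # how task messages are serialized
    result_serializer='json',
    accept_content=['json'],   # reject messages in other formats
)
```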
Features
1. Monitoring
A stream of monitoring events is emitted by the workers and used by built-in and external tools to tell you what your cluster is doing, in real time.
2. Workflows
Simple and complex workflows can be composed using a set of powerful primitives called "Canvas", including grouping, chaining, chunking and more.
3. Time and rate limits
You can control how many tasks can be executed per second/minute/hour, or how long a task is allowed to run, and this can be set as a default, for a specific worker, or individually for each task type (see the sketch after this list).
4. Scheduling
You can specify the time to run a task in seconds or as a datetime, or you can use periodic tasks for recurring events based on a simple interval or on crontab expressions supporting minute, hour, day of week, day of month and month of year.
5. Autoreloading
In development, workers can be configured to automatically reload source code as it changes, including inotify(7) support on Linux.
6. Autoscaling
The worker pool can be dynamically resized depending on load or on user-specified metrics, which can be used to cap memory usage in shared hosting/cloud environments or to enforce a given quality of service.
7. Resource leak protection
The --maxtasksperchild option protects against user tasks leaking resources, such as memory or file descriptors, that are otherwise out of your control.
8. User components
Each worker component can be customized, and additional components can be defined by the user. The worker is built up using "bootsteps", a dependency graph that enables fine-grained control of the worker's internals.
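A hedged sketch of feature 3 (time and rate limits) expressed as per-task options; the broker URL, task body and limit values are assumptions for illustration:

```python
from celery import Celery

app = Celery('limits_demo', broker='redis://127.0.0.1:6379/0')

# at most 10 executions per minute per worker; hard kill after 60 s,
# and raise SoftTimeLimitExceeded inside the task after 45 s
@app.task(rate_limit='10/m', time_limit=60, soft_time_limit=45)
def crawl(url):
    ...
```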
Integration
Framework | Integration package
---|---
Django | django-celery
Pyramid | pyramid_celery
Pylons | celery-pylons
Flask | not needed
web2py | web2py-celery
Tornado | tornado-celery
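Flask needs no bridge package because the Celery app can be used directly; a minimal hedged sketch (the module, route and task reuse the hypothetical `tasks.py` from the introduction):

```python
# flask_app.py -- hypothetical example
from flask import Flask
from tasks import add   # assumed module from the earlier sketch

flask_app = Flask(__name__)

@flask_app.route('/add')
def enqueue_add():
    # hand the work to a Celery worker and return the task id right away
    result = add.delay(2, 3)
    return {'task_id': result.id}
```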
Official documentation
http://docs.jinkan.org/docs/celery/getting-started/introduction.html
https://docs.celeryproject.org/en/stable/getting-started/introduction.html
# eventlet
https://pypi.org/project/eventlet/
installation
pip install celery
pip install eventlet # provides the concurrency pool, used together with celery
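Optionally, a quick sanity check that both packages installed (a trivial sketch, not from the original note):

```python
# confirm the installed versions
import celery
import eventlet

print(celery.__version__, eventlet.__version__)
```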
usage
# Starting the project
Project structure
Directory: C:\Users\41999\PycharmProjects\Celery\celery_tasks
Mode LastWriteTime Length Name
---- ------------- ------ ----
d---- 11/7/2021 5:04 PM __pycache__
-a--- 11/7/2021 4:53 PM 615 celery_assignment.py
-a--- 11/7/2021 4:05 PM 351 producer.py
-a--- 11/7/2021 4:56 PM 254 task01.py
-a--- 11/7/2021 4:56 PM 254 task02.py
How to start
First cd into the top-level directory, C:\Users\41999\PycharmProjects\Celery\, then reference the main module relative to that parent package,
i.e.: celery -A celery_tasks.celery_assignment worker -l info -P eventlet
About the name
sun-harmonics is the release codename of the 5.x series.
Supplement
Startup commands
celery -A celery_task worker --loglevel=info
celery -A celery_task worker -l info -P eventlet
Environment setup
A virtual environment needs to be created first.
# Option reference
-P, --pool <pool>
Pool implementation
https://docs.celeryproject.org/en/stable/reference/cli.html#cmdoption-celery-workdir
# Option reference
--eventlet
Use eventlet
https://docs.celeryproject.org/en/stable/reference/cli.html#cmdoption-celery-shell-eventlet
# About eventlet
https://pypi.org/project/eventlet/
02 Celery asynchronous execution syntax
description
1. Components
1. the listening worker process: celery_task
2. the producer: produce_task
3. the result check: result
2. Usage
1. The worker process first establishes a persistent connection; the message queues used to exchange data are specified separately, and Redis is used for both in this demo. Two printing tasks are created:
2. the first task prints an email being sent, the second prints an SMS being sent.
3. The result program checks whether the result was received correctly.
3. Test
1. Start the worker process so that it keeps listening over a persistent connection.
2. Run produce_task to send the messages.
3. Run result to check the results.
Code
==============================================celery_task.py======================================================
import celery
import time

backend = "redis://127.0.0.1:6379/1"
broker = "redis://127.0.0.1:6379/2"
cel = celery.Celery('test', backend=backend, broker=broker)

@cel.task
def send_mail(name):
    print("向%s发送邮件" % name)
    time.sleep(5)
    print("向%s发送邮件完成" % name)
    return "OK"

@cel.task
def send_message(name):
    print("向%s发送短信" % name)
    time.sleep(5)
    print("向%s发送短信完成" % name)
    return "OK"
=============================================produce_task.py======================================================
from celery_task import send_mail, send_message
result = send_mail.delay("yuan")
print(result.id)
result2 = send_message.delay("alex")
print(result2.id)
==============================================result.py============================================================
from celery.result import AsyncResult
from celery_task import cel

async_result = AsyncResult(id="4b08b009-14e8-4713-97d2-a51d4d0d579c", app=cel)

if async_result.successful():
    result = async_result.get()
    print(result)
    # async_result.forget()  # remove the result from the backend
elif async_result.failed():
    print('执行失败')
elif async_result.status == 'PENDING':
    print('任务等待中被执行')
elif async_result.status == 'RETRY':
    print('任务异常后正在重试')
elif async_result.status == 'STARTED':
    print('任务已经开始被执行')
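Instead of polling the status fields as in result.py, the producer can also block on the result directly; a small hedged sketch (the 10-second timeout is an arbitrary assumption):

```python
from celery_task import send_mail

result = send_mail.delay("yuan")
# block until the worker finishes, or raise celery.exceptions.TimeoutError after 10 s
print(result.get(timeout=10))
```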
operation outcome
==============================================celery_task.py=====================================================
(venv) C:\Users\41999\PycharmProjects\Celery\01异步任务>celery -A celery_task worker -l info -P eventlet
-------------- celery@ThinkPad v5.1.2 (sun-harmonics)
--- ***** -----
-- ******* ---- Windows-10-10.0.22489-SP0 2021-10-30 21:54:40
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: test:0x1cca1c456a0
- ** ---------- .> transport: redis://127.0.0.1:6379/2
- ** ---------- .> results: redis://127.0.0.1:6379/1
- *** --- * --- .> concurrency: 8 (eventlet)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. celery_task.send_mail
. celery_task.send_message
[2021-10-30 21:54:40,665: INFO/MainProcess] Connected to redis://127.0.0.1:6379/2
[2021-10-30 21:54:40,671: INFO/MainProcess] mingle: searching for neighbors
[2021-10-30 21:54:41,695: INFO/MainProcess] mingle: all alone
[2021-10-30 21:54:41,706: INFO/MainProcess] pidbox: Connected to redis://127.0.0.1:6379/2.
[2021-10-30 21:54:41,710: INFO/MainProcess] celery@ThinkPad ready.
[2021-10-30 21:54:51,022: INFO/MainProcess] Task celery_task.send_mail[4b08b009-14e8-4713-97d2-a51d4d0d579c] received
[2021-10-30 21:54:51,023: WARNING/MainProcess] 向yuan发送邮件
[2021-10-30 21:54:51,023: WARNING/MainProcess]
[2021-10-30 21:54:51,024: INFO/MainProcess] Task celery_task.send_message[f8b7ed5c-dea1-4219-af59-075d7b4cceaf] received
[2021-10-30 21:54:51,024: WARNING/MainProcess] 向alex发送短信
[2021-10-30 21:54:51,025: WARNING/MainProcess]
[2021-10-30 21:54:56,025: WARNING/MainProcess] 向yuan发送邮件完成
[2021-10-30 21:54:56,025: WARNING/MainProcess]
[2021-10-30 21:54:56,026: WARNING/MainProcess] 向alex发送短信完成
[2021-10-30 21:54:56,026: WARNING/MainProcess]
[2021-10-30 21:54:56,028: INFO/MainProcess] Task celery_task.send_mail[4b08b009-14e8-4713-97d2-a51d4d0d579c] succeeded in 5.014999999999418s: 'OK'
[2021-10-30 21:54:56,028: INFO/MainProcess] Task celery_task.send_message[f8b7ed5c-dea1-4219-af59-075d7b4cceaf] succeeded in 5.014999999999418s: '
OK'
=============================================produce_task.py======================================================
C:\Users\41999\PycharmProjects\Celery\venv\Scripts\python.exe C:/Users/41999/PycharmProjects/Celery/01异步任务/produce_task.py
2e0d0bde-3da5-4cb7-bbfa-f60a735f5f80
0d761ef1-ec69-4ec0-b909-ff0c0922c0ac
==============================================result.py============================================================
C:\Users\41999\PycharmProjects\Celery\venv\Scripts\python.exe C:/Users/41999/PycharmProjects/Celery/01异步任务/result.py
OK
error & resolve
1. Error
Task handler raised error: ValueError: not enough values to unpack
2. Fix: start the worker with the command below instead (eventlet must be installed first)
celery -A celery_task worker -l info -P eventlet
3. Reference
https://blog.csdn.net/PY0312/article/details/105538699
4. Error
Usage: celery [OPTIONS] COMMAND [ARGS]...
Error: Invalid value for '-A' / '--app':
Unable to load celery application.
Module 'celery_tasks' has no attribute 'celery'
5. Fix
Rename the app module inside the package to celery.py, or point -A at the module that actually defines the app (e.g. celery_tasks.celery_assignment).
reference
1. https://blog.csdn.net/weixin_43162402/article/details/83314692
2. https://blog.csdn.net/weixin_43162402/article/details/83314877
3. https://www.crifan.com/python_newbie_debug_file_name_not_same_with_lib_name_to_test/
03 Asynchronous execution with a multi-directory layout
description
Project layout
- project folder
  project files
  - celery_assignment.py (main program)
  - task01.py
  - task02.py
  - producer.py
- Both task modules import the Celery configuration (the main program) via the package.module path, i.e. folder.file.
- Compared with the previous example, the main program now maintains the two tasks separately; a timezone option can be added but is not required.
Why it matters
- The main program manages the two task modules that consume messages while producer.py produces them; this multi-task asynchronous pattern (running as coroutines under eventlet) improves throughput.
warning
- eventlet must be installed.
code
=======================================celery_assignment.py=====================================================
import celery

cel = celery.Celery('celery_demo',
                    broker='redis://127.0.0.1:6379/1',
                    backend='redis://127.0.0.1:6379/2',
                    # include the two task modules below; tasks are looked up in those files, which keeps them grouped by category
                    include=['celery_tasks.task01',
                             'celery_tasks.task02'
                             ])

# timezone
cel.conf.timezone = 'Asia/Shanghai'
# whether to use UTC
cel.conf.enable_utc = False
======================================task01.py================================================================
import time
from celery_tasks.celery_assignment import cel

@cel.task
def send_email(res):
    time.sleep(5)
    return "完成向%s发送邮件任务" % res
======================================task02.py================================================================
import time
from celery_tasks.celery_assignment import cel

@cel.task
def send_msg(name):
    time.sleep(5)
    return "完成向%s发送短信任务" % name
=======================================producer.py=============================================================
from celery_tasks.task01 import send_email
from celery_tasks.task02 import send_msg
# tell Celery to execute the task immediately, passing in one argument
result = send_email.delay('yuan')
print(result.id)
result = send_msg.delay('yuan')
print(result.id)
output
=========================================terminal=================================================================
(venv) PS C:\Users\41999\PycharmProjects\Celery> celery -A celery_tasks.celery_assignment worker -l info -P eventlet
-------------- celery@ThinkPad v5.1.2 (sun-harmonics)
--- ***** -----
-- ******* ---- Windows-10-10.0.22494-SP0 2021-11-07 19:46:55
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: celery_demo:0x2177c4e0f40
- ** ---------- .> transport: redis://127.0.0.1:6379/1
- ** ---------- .> results: redis://127.0.0.1:6379/2
- *** --- * --- .> concurrency: 8 (eventlet)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. celery_tasks.task01.send_email
. celery_tasks.task02.send_msg
[2021-11-07 19:46:55,171: INFO/MainProcess] Connected to redis://127.0.0.1:6379/1
[2021-11-07 19:46:55,171: INFO/MainProcess] mingle: searching for neighbors
[2021-11-07 19:46:56,188: INFO/MainProcess] mingle: all alone
[2021-11-07 19:46:56,188: INFO/MainProcess] pidbox: Connected to redis://127.0.0.1:6379/1.
[2021-11-07 19:46:56,204: INFO/MainProcess] celery@ThinkPad ready.
[2021-11-07 19:47:00,818: INFO/MainProcess] Task celery_tasks.task01.send_email[f9fb9f8c-7b22-4643-81c3-22ddcf66243f] received
[2021-11-07 19:47:00,818: INFO/MainProcess] Task celery_tasks.task02.send_msg[7c60cfac-1387-4c10-9d10-67dc9e385a30] received
[2021-11-07 19:47:05,836: INFO/MainProcess] Task celery_tasks.task01.send_email[f9fb9f8c-7b22-4643-81c3-22ddcf66243f] succeeded in 5.0149999999994
18s: '完成向yuan发送邮件任务'
[2021-11-07 19:47:05,836: INFO/MainProcess] Task celery_tasks.task02.send_msg[7c60cfac-1387-4c10-9d10-67dc9e385a30] succeeded in 5.014999999999418
s: '完成向yuan发送短信任务'
=========================================console=================================================================
C:\Users\41999\PycharmProjects\Celery\venv\Scripts\python.exe C:/Users/41999/PycharmProjects/Celery/celery_tasks/producer.py
f9fb9f8c-7b22-4643-81c3-22ddcf66243f
7c60cfac-1387-4c10-9d10-67dc9e385a30
Process finished with exit code 0
04 Scheduled tasks
description
Project structure
Project name: crontab
- celery_task.py
- produce_task.py
- result.py
Two ways to schedule a task
- Option 1: specify an absolute time (year, month, day, hour, minute, second).
- Option 2: specify an offset from the current time.
code
=====================================================celery_task.py==============================================
import celery
import time

backend = "redis://127.0.0.1:6379/1"
broker = "redis://127.0.0.1:6379/2"
cel = celery.Celery('test', backend=backend, broker=broker)

@cel.task
def send_mail(name):
    print("向%s发送邮件" % name)
    time.sleep(5)
    print("向%s发送邮件完成" % name)
    return "OK"

@cel.task
def send_message(name):
    print("向%s发送短信" % name)
    time.sleep(5)
    print("向%s发送短信完成" % name)
    return "OK"
=====================================================produce_task.py (method 1)==============================================
from datetime import datetime
from crontab.celery_task import send_mail

# Method 1: run at an absolute time
v1 = datetime(2021, 11, 7, 20, 28, 30)
print(v1)
v2 = datetime.utcfromtimestamp(v1.timestamp())  # convert local time to UTC
print(v2)
result = send_mail.apply_async(args=["egon", ], eta=v2)
print(result.id)
=====================================================produce_task.py (method 2)==============================================
from datetime import datetime, timedelta
from crontab.celery_task import send_mail

# Method 2: run after an offset from the current time
ctime = datetime.now()
# eta expects UTC by default
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
time_delay = timedelta(seconds=10)
task_time = utc_ctime + time_delay
# use apply_async and pass the computed eta
result = send_mail.apply_async(args=["egon"], eta=task_time)
print(result.id)
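As a side note (not in the original), the same 10-second delay can be expressed with the countdown option, which avoids the manual UTC conversion:

```python
from crontab.celery_task import send_mail

# countdown is a number of seconds relative to now
result = send_mail.apply_async(args=["egon"], countdown=10)
print(result.id)
```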
output
PS C:\Users\41999\PycharmProjects\Celery> celery -A crontab.celery_task worker -l info -P eventlet
-------------- celery@ThinkPad v5.1.2 (sun-harmonics)
--- ***** -----
-- ******* ---- Windows-10-10.0.22494-SP0 2021-11-07 20:23:05
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: test:0x1b422d70f70
- ** ---------- .> transport: redis://127.0.0.1:6379/2
- ** ---------- .> results: redis://127.0.0.1:6379/1
- *** --- * --- .> concurrency: 8 (eventlet)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. crontab.celery_task.send_mail
. crontab.celery_task.send_message
[2021-11-07 20:23:05,802: INFO/MainProcess] Connected to redis://127.0.0.1:6379/2
[2021-11-07 20:23:05,809: INFO/MainProcess] mingle: searching for neighbors
[2021-11-07 20:23:06,828: INFO/MainProcess] mingle: all alone
[2021-11-07 20:23:06,828: INFO/MainProcess] pidbox: Connected to redis://127.0.0.1:6379/2.
[2021-11-07 20:23:06,843: INFO/MainProcess] celery@ThinkPad ready.
[2021-11-07 20:23:25,419: ERROR/MainProcess] Received unregistered task of type 'celery_task.send_mail'.
The message has been ignored and discarded.
[2021-11-07 20:28:26,309: INFO/MainProcess] Task crontab.celery_task.send_mail[375531f0-ae7b-4274-9e21-65720ec93a93] received
[2021-11-07 20:28:30,015: WARNING/MainProcess] 向egon发送邮件
[2021-11-07 20:28:30,015: WARNING/MainProcess]
[2021-11-07 20:28:35,034: WARNING/MainProcess] 向egon发送邮件完成
[2021-11-07 20:28:35,034: WARNING/MainProcess]
[2021-11-07 20:28:35,034: INFO/MainProcess] Task crontab.celery_task.send_mail[375531f0-ae7b-4274-9e21-65720ec93a93] succeeded in 5.01600000000326
s: 'OK'
[2021-11-07 20:30:37,970: INFO/MainProcess] Task crontab.celery_task.send_mail[4c85cb68-c5ae-4f21-a80e-4208e349fa78] received
[2021-11-07 20:30:47,729: WARNING/MainProcess] 向egon发送邮件
[2021-11-07 20:30:47,729: WARNING/MainProcess]
[2021-11-07 20:30:52,733: WARNING/MainProcess] 向egon发送邮件完成
[2021-11-07 20:30:52,733: WARNING/MainProcess]
[2021-11-07 20:30:52,733: INFO/MainProcess] Task crontab.celery_task.send_mail[4c85cb68-c5ae-4f21-a80e-4208e349fa78] succeeded in 5.0s: 'OK'
05 Periodic tasks with a multi-directory layout
command
# After activating the virtual environment, run these two commands from the project root: start the second one first (beat, which sends on schedule), then the first one (the worker, which consumes)
celery -A celery_asn.celery_assignment worker -l info -P eventlet
celery -A celery_asn.celery_assignment beat -l info
project structure
celery_asn
- celery_assignment [main]
- task01
- task02
code
====celery_assignment [main]============================================================================
# -*- coding: UTF-8 -*-
"""
@author:41999
@file:celery_assignment.py
@time:2021/11/07
"""
from datetime import timedelta

import celery

cel = celery.Celery('celery_demo',
                    broker='redis://127.0.0.1:6379/1',
                    backend='redis://127.0.0.1:6379/2',
                    # include the two task modules below; tasks are looked up in those files, which keeps them grouped by category
                    include=['celery_asn.task01',
                             'celery_asn.task02'
                             ])

# timezone
cel.conf.timezone = 'Asia/Shanghai'
# whether to use UTC
cel.conf.enable_utc = False

cel.conf.beat_schedule = {
    # the entry name can be anything
    'add-every-10-seconds': {
        # run the send_email task from task01
        'task': 'celery_asn.task01.send_email',
        # run once every 6 seconds
        # 'schedule': 1.0,
        # 'schedule': crontab(minute="*/1"),
        'schedule': timedelta(seconds=6),
        # positional arguments passed to the task
        'args': ('张三',)
    },
    # 'add-every-12-seconds': {
    #     'task': 'celery_tasks.task01.send_email',
    #     # every year on April 11th at 08:42
    #     'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
    #     'args': ('张三',)
    # },
}
====task01===============================================================================================
# -*- coding: UTF-8 -*-
"""
@author:41999
@file:task01.py
@time:2021/11/07
"""
import time
from celery_asn.celery_assignment import cel

@cel.task
def send_email(res):
    time.sleep(5)
    return "完成向%s发送邮件任务" % res
====task02===============================================================================================
import time
from celery_asn.celery_assignment import cel

@cel.task
def send_msg(name):
    time.sleep(5)
    return "完成向%s发送短信任务" % name
Terminal1
-------------- celery@ThinkPad v5.1.2 (sun-harmonics)
--- ***** -----
-- ******* ---- Windows-10-10.0.22504-SP0 2021-11-18 16:24:20
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: celery_demo:0x237e4740f70
- ** ---------- .> transport: redis://127.0.0.1:6379/1
- ** ---------- .> results: redis://127.0.0.1:6379/2
- *** --- * --- .> concurrency: 8 (eventlet)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. celery_asn.task01.send_email
. celery_asn.task02.send_msg
[2021-11-18 16:24:20,197: INFO/MainProcess] Connected to redis://127.0.0.1:6379/1
[2021-11-18 16:24:20,205: INFO/MainProcess] mingle: searching for neighbors
[2021-11-18 16:24:21,234: INFO/MainProcess] mingle: all alone
[2021-11-18 16:24:21,244: INFO/MainProcess] pidbox: Connected to redis://127.0.0.1:6379/1.
[2021-11-18 16:24:21,247: INFO/MainProcess] celery@ThinkPad ready.
[2021-11-18 16:24:21,458: INFO/MainProcess] Task celery_asn.task01.send_email[a08d2686-fc65-4ae8-8480-86b46cf28c78] received
[2021-11-18 16:24:24,106: INFO/MainProcess] Task celery_asn.task01.send_email[ca08be47-6b30-49a3-bf71-72298d0e4dee] received
[2021-11-18 16:24:26,459: INFO/MainProcess] Task celery_asn.task01.send_email[a08d2686-fc65-4ae8-8480-86b46cf28c78] succeeded in 5.0s: '完成向张三发送邮件任务'
[2021-11-18 16:24:29,106: INFO/MainProcess] Task celery_asn.task01.send_email[ca08be47-6b30-49a3-bf71-72298d0e4dee] succeeded in 5.0s: '完成向张三发送邮件任务'
Terminal2
(venv) PS C:\Users\41999\PycharmProjects\Celery> celery -A celery_asn.celery_assignment beat -l info
celery beat v5.1.2 (sun-harmonics) is starting.
__ - ... __ - _
LocalTime -> 2021-11-18 16:25:12
Configuration ->
. broker -> redis://127.0.0.1:6379/1
. loader -> celery.loaders.app.AppLoader
. scheduler -> celery.beat.PersistentScheduler
. db -> celerybeat-schedule
. logfile -> [stderr]@%INFO
. maxinterval -> 5.00 minutes (300s)
[2021-11-18 16:25:12,398: INFO/MainProcess] beat: Starting...
[2021-11-18 16:25:12,508: INFO/MainProcess] Scheduler: Sending due task add-every-10-seconds (celery_asn.task01.send_email)
Using Celery in Django
project structure
C:.
├─.idea
│ └─inspectionProfiles
├─app01
│ ├─migrations
│ │ └─__pycache__
│ └─__pycache__
├─celerytest
│ └─__pycache__
├─mycelery
│ ├─sms
│ │ └─__pycache__
│ └─__pycache__
├─templates
└─venv
├─Lib
└─Scripts
Starting the Celery worker
(venv) PS C:\Users\41999\PycharmProjects\celerytest> celery -A mycelery.main worker -l info -P eventlet
Steps
1. First write the Celery configuration files on the backend.
2. In a terminal, start the Celery worker so it listens for tasks.
3. Visit the web page to trigger the Celery task.
configure
Django logic
============================================celerytest.urls.py===========================================
from django.contrib import admin
from django.urls import path
from app01 import views
urlpatterns = [
    path('admin/', admin.site.urls),
    path('test/', views.test),
]
============================================app01.views.py==========================================
from django.shortcuts import render, HttpResponse
from mycelery.sms.tasks import send_sms, send_sms2
from datetime import datetime, timedelta

# Create your views here.
def test(request):
    # asynchronous tasks
    # 1. declaring a task function identical to the one in the celery app is avoided by simply importing it
    # send_sms.delay("110")
    # send_sms2.delay("119")
    # send_sms.delay()  # if the task function takes no arguments, nothing needs to be passed

    # scheduled (delayed) task
    ctime = datetime.now()
    # eta expects UTC by default
    utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
    time_delay = timedelta(seconds=10)
    task_time = utc_ctime + time_delay
    result = send_sms.apply_async(["911", ], eta=task_time)
    print(result.id)
    return HttpResponse("OK")
Celery
=================================================mycelery/sms/tasks.py==================================
from mycelery.main import app
import time

@app.task  # the name argument sets the task name; if not given, the function name is used as the task name
def send_sms(mobile):
    """Send an SMS."""
    print("向手机号%s发送短信成功!" % mobile)
    time.sleep(5)
    return "send_sms OK"

@app.task  # the name argument sets the task name; if not given, the function name is used as the task name
def send_sms2(mobile):
    print("向手机号%s发送短信成功!" % mobile)
    time.sleep(5)
    return "send_sms2 OK"
=================================================mycelery/config.py======================================
broker_url = 'redis://127.0.0.1:6379/15'
result_backend = 'redis://127.0.0.1:6379/14'
=================================================mycelery/main.py========================================
import os
from celery import Celery

# create the Celery application instance
app = Celery("celerytest")

# hook Celery up with Django so it can locate and load the Django settings
# (the settings module must match your own project; the tutorial this note follows used 'celeryPros.settings.dev')
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celeryPros.settings.dev')

# load the Celery configuration from the config module
app.config_from_object("mycelery.config")

# register tasks
# the argument must be a list; each entry is the dotted path of a package that contains the task module
# app.autodiscover_tasks(["package1", "package2"])
app.autodiscover_tasks(["mycelery.sms", ])

# command to start the Celery worker
# it is strongly recommended to run it from the directory containing the mycelery package:
# celery -A mycelery.main worker --loglevel=info
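To close the loop, a hedged sketch of an extra Django view (not part of the original project) that checks a queued task by id, reusing the AsyncResult pattern from section 02; the view name and URL wiring are assumptions:

```python
# app01/views.py (hypothetical addition)
from celery.result import AsyncResult
from django.http import JsonResponse

from mycelery.main import app

def task_status(request, task_id):
    # look up the task state in the result backend configured in mycelery/config.py
    res = AsyncResult(id=task_id, app=app)
    return JsonResponse({
        'task_id': task_id,
        'status': res.status,
        'result': res.result if res.successful() else None,
    })
```

It could be wired up with e.g. `path('status/<str:task_id>/', views.task_status)` in urls.py.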