简介
何为消息队列
任务队列是一种在**线程或机器间**分发任务的机制。消息队列的输入是工作的一个单元,称为任务,独立的职程(Worker)进程持续监视队列中是否有需要处理的新任务。1. 用消息通信,通常使用**中间人(Broker)在客户端和职程间斡旋**。这个过程从客户端向队列添加消息开始,之后中间人把消息派送给职程。2. Celery 系统可包含多个职程和中间人,以此获得高可用性和横向扩展能力。[即并发]3. Celery 是用 Python 编写的,但协议可以用任何语言实现。1. 迄今,已有 Ruby 实现的 RCelery 、node.js 实现的 node-celery2. 以及一个 PHP 客户端 ,语言互通也可以通过 using webhooks 实现。
定义
简单1. 易于使用和维护,并且它 不需要配置文件 。2. 有一个活跃、友好的社区来让你寻求帮助,包括一个 邮件列表 和一个 IRC 频道 。3. 高可用性倘若连接丢失或失败,职程和客户端会自动重试,并且一些中间人通过 主/主 或 主/从 方式复制来提高可用性。4. 快速单个 Celery 进程每分钟可处理数以百万计的任务,而保持往返延迟在亚毫秒级(使用 RabbitMQ、py-librabbitmq 和优化过的设置)。5. 灵活Celery 几乎所有部分都可以扩展或单独使用。可以自制连接池、 序列化、压缩模式、日志、调度器、消费者、生产者、自动扩展、 中间人传输或更多。
图示


中间人broker支持
1. 中间人RabbitMQ, Redis,MongoDB (实验性), ZeroMQ (实验性)CouchDB (实验性), SQLAlchemy (实验性)Django ORM (实验性), Amazon SQS, (实验性)2. 并发prefork(多进程),Eventlet, gevent多线程/单线程结果存储3. AMQP, Redismemcached, MongoDBSQLAlchemy, Django ORMApache Cassandra4. 序列化pickle, json, yaml, msgpackzlib, bzip2 压缩密码学消息签名
特性
1. 监视整条流水线的监视时间由职程发出,并用于内建或外部的工具告知你集群的工作状况——而且是实时的。2. 工作流一系列功能强大的称为“Canvas”的原语(Primitive)用于构建或简单、或复杂的工作流。包括分组、连锁、分割等等。3. 时间和速率限制你可以控制每秒/分钟/小时执行的任务数,或任务的最长运行时间, 并且可以为特定职程或不同类型的任务设置为默认值。4. 计划任务你可以指定任务在若干秒后或在 datetime 运行,或你可以基于单纯的时间间隔或支持分钟、小时、每周的第几天、每月的第几天以及每年的第几月的 crontab 表达式来使用周期任务来重现事件。5. 自动重载入在开发中,职程可以配置为在源码修改时自动重载入,包含对 Linux 上的 inotify(7) 支持。6. 自动扩展根据负载自动重调职程池的大小或用户指定的测量值,用于限制共享主机/云环境的内存使用,或是保证给定的服务质量。7. 资源泄露保护--maxtasksperchild 选项用于控制用户任务泄露的诸如内存或文件描述符这些易超出掌控的资源。8. 用户组件每个职程组件都可以自定义,并且额外组件可以由用户定义。职程是用 “bootsteps” 构建的——一个允许细粒度控制职程内构件的依赖图。
集成
| Django | django-celery |
|---|---|
| Pyramid | pyramid_celery |
| Pylons | celery-pylons |
| Flask | 不需要 |
| web2py | web2py-celery |
| Tornado | tornado-celery |
官方文档
http://docs.jinkan.org/docs/celery/getting-started/introduction.htmlhttps://docs.celeryproject.org/en/stable/getting-started/introduction.html# eventlethttps://pypi.org/project/eventlet/
installation
pip install celerypip install eventlet # 负责并发,搭配使用
usage
# 项目启动项目结构Directory: C:\Users\41999\PycharmProjects\Celery\celery_tasksMode LastWriteTime Length Name---- ------------- ------ ----d---- 11/7/2021 5:04 PM __pycache__-a--- 11/7/2021 4:53 PM 615 celery_assignment.py-a--- 11/7/2021 4:05 PM 351 producer.py-a--- 11/7/2021 4:56 PM 254 task01.py-a--- 11/7/2021 4:56 PM 254 task02.py启动方式先进入第一级目录,C:\Users\41999\PycharmProjects\Celery\, 然后在父目录后追加主程序即:celery -A celery_tasks.celery_assignment worker -l info -P eventlet名字含义sun-harmonics 太阳谐波 是5.n版本的名字
补充
启动方式---jcelery -A celery_task worker --loglevel=infocelery -A celery_task worker -l info -P eventlet[]环境搭建需要创建虚拟环境# 参数说明-P, --pool <pool>¶Pool implementationhttps://docs.celeryproject.org/en/stable/reference/cli.html#cmdoption-celery-workdir# 参数说明--eventlet¶Use eventlethttps://docs.celeryproject.org/en/stable/reference/cli.html#cmdoption-celery-shell-eventlet# eventlet 介绍https://pypi.org/project/eventlet/
02 Celery异步执行语法
description
1. 组成1. 监视进程 celery_task2. 生产者 produce_task3. 结果检查 result2. 使用1. 首先由监视进程创建持续连接,分别指定用于交换数据的消息队列,本次演示均使用redis;分别创建两个打印任务2. 第一个任务打印邮件发送,第二个任务打印消息发送3. 由result程序检查消息是否正确接收3. 测试1. 启动监视进程持续监听,以及持续连接2. 运行 produce_task 发送消息3. 运行 result 检查结果
Code
==============================================celery_task.py======================================================import celery,timebackend = "redis://127.0.0.1:6379/1"broker = "redis://127.0.0.1:6379/2"cel = celery.Celery('test', backend=backend, broker=broker)@cel.taskdef send_mail(name):print("向%s发送邮件" % name)time.sleep(5)print("向%s发送邮件完成" % name)return "OK"@cel.taskdef send_message(name):print("向%s发送短信" % name)time.sleep(5)print("向%s发送短信完成" % name)return "OK"=============================================produce_task.py======================================================from celery_task import send_mail, send_messageresult = send_mail.delay("yuan")print(result.id)result2 = send_message.delay("alex")print(result2.id)==============================================result.py============================================================from celery.result import AsyncResultfrom celery_task import celasync_result=AsyncResult(id="4b08b009-14e8-4713-97d2-a51d4d0d579c", app=cel)if async_result.successful():result = async_result.get()print(result)# result.forget() # 将结果删除elif async_result.failed():print('执行失败')elif async_result.status == 'PENDING':print('任务等待中被执行')elif async_result.status == 'RETRY':print('任务异常后正在重试')elif async_result.status == 'STARTED':print('任务已经开始被执行')
operation outcome
==============================================celery_task.py=====================================================(venv) C:\Users\41999\PycharmProjects\Celery\01异步任务>celery -A celery_task worker -l info -P eventlet-------------- celery@ThinkPad v5.1.2 (sun-harmonics)--- ***** ------- ******* ---- Windows-10-10.0.22489-SP0 2021-10-30 21:54:40- *** --- * ---- ** ---------- [config]- ** ---------- .> app: test:0x1cca1c456a0- ** ---------- .> transport: redis://127.0.0.1:6379/2- ** ---------- .> results: redis://127.0.0.1:6379/1- *** --- * --- .> concurrency: 8 (eventlet)-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)--- ***** ------------------- [queues].> celery exchange=celery(direct) key=celery[tasks]. celery_task.send_mail. celery_task.send_message[2021-10-30 21:54:40,665: INFO/MainProcess] Connected to redis://127.0.0.1:6379/2[2021-10-30 21:54:40,671: INFO/MainProcess] mingle: searching for neighbors[2021-10-30 21:54:41,695: INFO/MainProcess] mingle: all alone[2021-10-30 21:54:41,706: INFO/MainProcess] pidbox: Connected to redis://127.0.0.1:6379/2.[2021-10-30 21:54:41,710: INFO/MainProcess] celery@ThinkPad ready.[2021-10-30 21:54:51,022: INFO/MainProcess] Task celery_task.send_mail[4b08b009-14e8-4713-97d2-a51d4d0d579c] received[2021-10-30 21:54:51,023: WARNING/MainProcess] 向yuan发送邮件[2021-10-30 21:54:51,023: WARNING/MainProcess][2021-10-30 21:54:51,024: INFO/MainProcess] Task celery_task.send_message[f8b7ed5c-dea1-4219-af59-075d7b4cceaf] received[2021-10-30 21:54:51,024: WARNING/MainProcess] 向alex发送短信[2021-10-30 21:54:51,025: WARNING/MainProcess][2021-10-30 21:54:56,025: WARNING/MainProcess] 向yuan发送邮件完成[2021-10-30 21:54:56,025: WARNING/MainProcess][2021-10-30 21:54:56,026: WARNING/MainProcess] 向alex发送短信完成[2021-10-30 21:54:56,026: WARNING/MainProcess][2021-10-30 21:54:56,028: INFO/MainProcess] Task celery_task.send_mail[4b08b009-14e8-4713-97d2-a51d4d0d579c] succeeded in 5.014999999999418s: 'OK'[2021-10-30 21:54:56,028: INFO/MainProcess] Task celery_task.send_message[f8b7ed5c-dea1-4219-af59-075d7b4cceaf] succeeded in 5.014999999999418s: 'OK'=============================================produce_task.py======================================================C:\Users\41999\PycharmProjects\Celery\venv\Scripts\python.exe C:/Users/41999/PycharmProjects/Celery/01异步任务/produce_task.py2e0d0bde-3da5-4cb7-bbfa-f60a735f5f800d761ef1-ec69-4ec0-b909-ff0c0922c0ac==============================================result.py============================================================C:\Users\41999\PycharmProjects\Celery\venv\Scripts\python.exe C:/Users/41999/PycharmProjects/Celery/01异步任务/result.pyOK
error & resolve
1. 报错Task handler raised error: ValueError: not enough values to unpack2. 更换启动监视程序的命令 前提是安装eventletcelery -A celery_task worker -l info -P eventlet3. 链接https://blog.csdn.net/PY0312/article/details/1055386994. 报错Usage: celery [OPTIONS] COMMAND [ARGS]...Error: Invalid value for '-A' / '--app':Unable to load celery application.Module 'celery_tasks' has no attribute 'celery'5. 解决将文件夹下文件名修改为celeryreference1. https://blog.csdn.net/weixin_43162402/article/details/833146922. https://blog.csdn.net/weixin_43162402/article/details/833148773. https://www.crifan.com/python_newbie_debug_file_name_not_same_with_lib_name_to_test/
03 celery的多目录结构异步执行
description
项目构成
- 项目文件夹
项目文件
- celery_assignment.py 主程序
- task01.py
- task02.py
- producer.py
- 两个task需要分别以文件夹.文件的方式引入celery的配置文件,也就是主程序
- 主程序相对之前而言,需要分别维持两个任务,可以添加时区选项,但并无必要
价值
- 主程序控制两个任务负责接收消息,由producer负责生产,该任务模式为多任务异步协程,可以提高效率
warning
- 需要安装eventlet
code
=======================================celery_assignment.py=====================================================import celerycel = celery.Celery('celery_demo',broker='redis://127.0.0.1:6379/1',backend='redis://127.0.0.1:6379/2',# 包含以下两个任务文件,去相应的py文件中找任务,对多个任务做分类include=['celery_tasks.task01','celery_tasks.task02'])# 时区cel.conf.timezone = 'Asia/Shanghai'# 是否使用UTCcel.conf.enable_utc = False======================================task01.py================================================================import timefrom celery_tasks.celery_assignment import cel@cel.taskdef send_email(res):time.sleep(5)return "完成向%s发送邮件任务" % res======================================task02.py================================================================import timefrom celery_tasks.celery_assignment import cel@cel.taskdef send_msg(name):time.sleep(5)return "完成向%s发送短信任务" % name=======================================producer.py=============================================================from celery_tasks.task01 import send_emailfrom celery_tasks.task02 import send_msg# 立即告知celery去执行test_celery任务,并传入一个参数result = send_email.delay('yuan')print(result.id)result = send_msg.delay('yuan')print(result.id)
output
=========================================terminal=================================================================(venv) PS C:\Users\41999\PycharmProjects\Celery> celery -A celery_tasks.celery_assignment worker -l info -P eventlet-------------- celery@ThinkPad v5.1.2 (sun-harmonics)--- ***** ------- ******* ---- Windows-10-10.0.22494-SP0 2021-11-07 19:46:55- *** --- * ---- ** ---------- [config]- ** ---------- .> app: celery_demo:0x2177c4e0f40- ** ---------- .> transport: redis://127.0.0.1:6379/1- ** ---------- .> results: redis://127.0.0.1:6379/2- *** --- * --- .> concurrency: 8 (eventlet)-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)--- ***** ------------------- [queues].> celery exchange=celery(direct) key=celery[tasks]. celery_tasks.task01.send_email. celery_tasks.task02.send_msg[2021-11-07 19:46:55,171: INFO/MainProcess] Connected to redis://127.0.0.1:6379/1[2021-11-07 19:46:55,171: INFO/MainProcess] mingle: searching for neighbors[2021-11-07 19:46:56,188: INFO/MainProcess] mingle: all alone[2021-11-07 19:46:56,188: INFO/MainProcess] pidbox: Connected to redis://127.0.0.1:6379/1.[2021-11-07 19:46:56,204: INFO/MainProcess] celery@ThinkPad ready.[2021-11-07 19:47:00,818: INFO/MainProcess] Task celery_tasks.task01.send_email[f9fb9f8c-7b22-4643-81c3-22ddcf66243f] received[2021-11-07 19:47:00,818: INFO/MainProcess] Task celery_tasks.task02.send_msg[7c60cfac-1387-4c10-9d10-67dc9e385a30] received[2021-11-07 19:47:05,836: INFO/MainProcess] Task celery_tasks.task01.send_email[f9fb9f8c-7b22-4643-81c3-22ddcf66243f] succeeded in 5.014999999999418s: '完成向yuan发送邮件任务'[2021-11-07 19:47:05,836: INFO/MainProcess] Task celery_tasks.task02.send_msg[7c60cfac-1387-4c10-9d10-67dc9e385a30] succeeded in 5.014999999999418s: '完成向yuan发送短信任务'=========================================console=================================================================C:\Users\41999\PycharmProjects\Celery\venv\Scripts\python.exe C:/Users/41999/PycharmProjects/Celery/celery_tasks/producer.pyf9fb9f8c-7b22-4643-81c3-22ddcf66243f7c60cfac-1387-4c10-9d10-67dc9e385a30Process finished with exit code 0
04 定时任务
description
项目结构
项目名:crontab
- celery_task.py
- produce_task.py
- result.py
两种定时任务
- 第一种:指定时间,年月日时分秒
- 第二种:设定偏移量
code
=====================================================celery.task.py==============================================import celeryimport timebackend = "redis://127.0.0.1:6379/1"broker = "redis://127.0.0.1:6379/2"cel = celery.Celery('test', backend=backend, broker=broker)@cel.taskdef send_mail(name):print("向%s发送邮件" % name)time.sleep(5)print("向%s发送邮件完成" % name)return "OK"@cel.taskdef send_message(name):print("向%s发送短信" % name)time.sleep(5)print("向%s发送短信完成" % name)return "OK"=====================================================produce.task.py==============================================from datetime import datetimefrom crontab.celery_task import send_mail# 方式一v1 = datetime(2021, 11, 7, 20, 28, 30)print(v1)v2 = datetime.utcfromtimestamp(v1.timestamp()) # 本地时间转换为国际时间print(v2)result = send_mail.apply_async(args=["egon", ], eta=v2)print(result.id)=====================================================produce.task.py==============================================from datetime import datetimefrom crontab.celery_task import send_mail# 方式二ctime = datetime.now()# 默认用utc时间utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())from datetime import timedeltatime_delay = timedelta(seconds=10)task_time = utc_ctime + time_delay# 使用apply_async并设定时间result = send_mail.apply_async(args=["egon"], eta=task_time)print(result.id)
output
PS C:\Users\41999\PycharmProjects\Celery> celery -A crontab.celery_task worker -l info -P eventlet-------------- celery@ThinkPad v5.1.2 (sun-harmonics)--- ***** ------- ******* ---- Windows-10-10.0.22494-SP0 2021-11-07 20:23:05- *** --- * ---- ** ---------- [config]- ** ---------- .> app: test:0x1b422d70f70- ** ---------- .> transport: redis://127.0.0.1:6379/2- ** ---------- .> results: redis://127.0.0.1:6379/1- *** --- * --- .> concurrency: 8 (eventlet)-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)--- ***** ------------------- [queues].> celery exchange=celery(direct) key=celery[tasks]. crontab.celery_task.send_mail. crontab.celery_task.send_message[2021-11-07 20:23:05,802: INFO/MainProcess] Connected to redis://127.0.0.1:6379/2[2021-11-07 20:23:05,809: INFO/MainProcess] mingle: searching for neighbors[2021-11-07 20:23:06,828: INFO/MainProcess] mingle: all alone[2021-11-07 20:23:06,828: INFO/MainProcess] pidbox: Connected to redis://127.0.0.1:6379/2.[2021-11-07 20:23:06,843: INFO/MainProcess] celery@ThinkPad ready.[2021-11-07 20:23:25,419: ERROR/MainProcess] Received unregistered task of type 'celery_task.send_mail'.The message has been ignored and discarded.[2021-11-07 20:28:26,309: INFO/MainProcess] Task crontab.celery_task.send_mail[375531f0-ae7b-4274-9e21-65720ec93a93] received[2021-11-07 20:28:30,015: WARNING/MainProcess] 向egon发送邮件[2021-11-07 20:28:30,015: WARNING/MainProcess][2021-11-07 20:28:35,034: WARNING/MainProcess] 向egon发送邮件完成[2021-11-07 20:28:35,034: WARNING/MainProcess][2021-11-07 20:28:35,034: INFO/MainProcess] Task crontab.celery_task.send_mail[375531f0-ae7b-4274-9e21-65720ec93a93] succeeded in 5.01600000000326s: 'OK'[2021-11-07 20:30:37,970: INFO/MainProcess] Task crontab.celery_task.send_mail[4c85cb68-c5ae-4f21-a80e-4208e349fa78] received[2021-11-07 20:30:47,729: WARNING/MainProcess] 向egon发送邮件[2021-11-07 20:30:47,729: WARNING/MainProcess][2021-11-07 20:30:52,733: WARNING/MainProcess] 向egon发送邮件完成[2021-11-07 20:30:52,733: WARNING/MainProcess][2021-11-07 20:30:52,733: INFO/MainProcess] Task crontab.celery_task.send_mail[4c85cb68-c5ae-4f21-a80e-4208e349fa78] succeeded in 5.0s: 'OK'
05 celery多目录结构下的定时任务
command
# 启动虚拟环境后在 项目根目录使用这两条命令启动程序,先启动第二条定时发送,然后启动第一条接收celery -A celery_asn.celery_assignment worker -l info -P eventletcelery -A celery_asn.celery_assignment beat -l info
project construction
celery_asn
- celery_assignment [main]
- task01
- task02
code
====celery_assignment [main]============================================================================# -*- coding: UTF-8 -*-"""@author:41999@file:celery_assignment.py@time:2021/11/07"""from datetime import timedeltaimport celerycel = celery.Celery('celery_demo',broker='redis://127.0.0.1:6379/1',backend='redis://127.0.0.1:6379/2',# 包含以下两个任务文件,去相应的py文件中找任务,对多个任务做分类include=['celery_asn.task01','celery_asn.task02'])# 时区cel.conf.timezone = 'Asia/Shanghai'# 是否使用UTCcel.conf.enable_utc = Falsecel.conf.beat_schedule = {# 名字随意命名'add-every-10-seconds': {# 执行tasks1下的test_celery函数'task': 'celery_asn.task01.send_email',# 每隔2秒执行一次# 'schedule': 1.0,# 'schedule': crontab(minute="*/1"),'schedule': timedelta(seconds=6),# 传递参数'args': ('张三',)},# 'add-every-12-seconds': {# 'task': 'celery_tasks.task01.send_email',# 每年4月11号,8点42分执行# 'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),# 'args': ('张三',)# },}====task01===============================================================================================# -*- coding: UTF-8 -*-"""@author:41999@file:task01.py@time:2021/11/07"""import timefrom celery_asn.celery_assignment import cel@cel.taskdef send_email(res):time.sleep(5)return "完成向%s发送邮件任务" % res====task02===============================================================================================import timefrom celery_tasks.celery_assignment import cel@cel.taskdef send_msg(name):time.sleep(5)return "完成向%s发送短信任务" % name
Terminal1
-------------- celery@ThinkPad v5.1.2 (sun-harmonics)--- ***** ------- ******* ---- Windows-10-10.0.22504-SP0 2021-11-18 16:24:20- *** --- * ---- ** ---------- [config]- ** ---------- .> app: celery_demo:0x237e4740f70- ** ---------- .> transport: redis://127.0.0.1:6379/1- ** ---------- .> results: redis://127.0.0.1:6379/2- *** --- * --- .> concurrency: 8 (eventlet)-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)--- ***** ------------------- [queues].> celery exchange=celery(direct) key=celery[tasks]. celery_asn.task01.send_email. celery_asn.task02.send_msg[2021-11-18 16:24:20,197: INFO/MainProcess] Connected to redis://127.0.0.1:6379/1[2021-11-18 16:24:20,205: INFO/MainProcess] mingle: searching for neighbors[2021-11-18 16:24:21,234: INFO/MainProcess] mingle: all alone[2021-11-18 16:24:21,244: INFO/MainProcess] pidbox: Connected to redis://127.0.0.1:6379/1.[2021-11-18 16:24:21,247: INFO/MainProcess] celery@ThinkPad ready.[2021-11-18 16:24:21,458: INFO/MainProcess] Task celery_asn.task01.send_email[a08d2686-fc65-4ae8-8480-86b46cf28c78] received[2021-11-18 16:24:24,106: INFO/MainProcess] Task celery_asn.task01.send_email[ca08be47-6b30-49a3-bf71-72298d0e4dee] received[2021-11-18 16:24:26,459: INFO/MainProcess] Task celery_asn.task01.send_email[a08d2686-fc65-4ae8-8480-86b46cf28c78] succeeded in 5.0s: '完成向张三发送邮件任务'[2021-11-18 16:24:29,106: INFO/MainProcess] Task celery_asn.task01.send_email[ca08be47-6b30-49a3-bf71-72298d0e4dee] succeeded in 5.0s: '完成向张三发送邮件任务'
Terminal2
(venv) PS C:\Users\41999\PycharmProjects\Celery> celery -A celery_asn.celery_assignment beat -l infocelery beat v5.1.2 (sun-harmonics) is starting.__ - ... __ - _LocalTime -> 2021-11-18 16:25:12Configuration ->. broker -> redis://127.0.0.1:6379/1. loader -> celery.loaders.app.AppLoader. scheduler -> celery.beat.PersistentScheduler. db -> celerybeat-schedule. logfile -> [stderr]@%INFO. maxinterval -> 5.00 minutes (300s)[2021-11-18 16:25:12,398: INFO/MainProcess] beat: Starting...[2021-11-18 16:25:12,508: INFO/MainProcess] Scheduler: Sending due task add-every-10-seconds (celery_asn.task01.send_email)
Django中使用celery
project structure
C:.├─.idea│ └─inspectionProfiles├─app01│ ├─migrations│ │ └─__pycache__│ └─__pycache__├─celerytest│ └─__pycache__├─mycelery│ ├─sms│ │ └─__pycache__│ └─__pycache__├─templates└─venv├─Lib└─Scripts
celery启动
(venv) PS C:\Users\41999\PycharmProjects\celerytest> celery -A mycelery.main worker -l info -P eventlet
Steps
1. 首先后端写好配置文件2. in terminal start celery which used to listen3. visit web page to trigger celery task
configure
Django logic
============================================celerytest.urls.py===========================================from django.contrib import adminfrom django.urls import pathfrom app01 import viewsurlpatterns = [path('admin/', admin.site.urls),path('test/', views.test),]============================================celerytest.views.py==========================================from django.shortcuts import render, HttpResponsefrom mycelery.sms.tasks import send_sms, send_sms2from datetime import datetime, timedelta# Create your tests here.def test(request):# 异步任务# 1. 声明一个和celery一模一样的任务函数,但是我们可以导包来解决# send_sms.delay("110")# send_sms2.delay("119")# send_sms.delay() # 如果调用的任务函数没有参数,则不需要填写任何内容# 定时任务ctime = datetime.now()# 默认用utc时间utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())time_delay = timedelta(seconds=10)task_time = utc_ctime + time_delayresult = send_sms.apply_async(["911", ], eta=task_time)print(result.id)return HttpResponse("OK")
Celery
=================================================mycelery/sms/tasks.py==================================from mycelery.main import appimport time@app.task # name表示设置任务的名称,如果不填写,则默认使用函数名做为任务名def send_sms(mobile):"""发送短信"""print("向手机号%s发送短信成功!" % mobile)time.sleep(5)return "send_sms OK"@app.task # name表示设置任务的名称,如果不填写,则默认使用函数名做为任务名def send_sms2(mobile):print("向手机号%s发送短信成功!" % mobile)time.sleep(5)return "send_sms2 OK"=================================================mycelery/config.py======================================broker_url = 'redis://127.0.0.1:6379/15'result_backend = 'redis://127.0.0.1:6379/14'=================================================mycelery/main.py========================================import osfrom celery import Celery# 创建celery实例对象app = Celery("celerytest")# 把celery和django进行组合,识别和加载django的配置文件os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celeryPros.settings.dev')# 通过app对象加载配置app.config_from_object("mycelery.config")# 加载任务# 参数必须必须是一个列表,里面的每一个任务都是任务的路径名称# app.autodiscover_tasks(["任务1","任务2"])app.autodiscover_tasks(["mycelery.sms",])# 启动Celery的命令# 强烈建议切换目录到mycelery根目录下启动# celery -A mycelery.main worker --loglevel=info
display

