Introduction

What is a message queue

  1. A task queue is a mechanism for distributing work across **threads or machines**.
  2. A task queue's input is a unit of work called a task; dedicated worker processes constantly monitor the queue for new tasks to process.
  3. Communication happens via messages, usually with a **broker mediating between clients and workers**. To initiate a task, the client adds a message to the queue, and the broker then delivers that message to a worker (a minimal sketch follows this list).
  4. A Celery system can consist of multiple workers and brokers, giving way to high availability and horizontal scaling. [i.e. concurrency]
  5. Celery is written in Python, but the protocol can be implemented in any language.
  6. So far there are RCelery implemented in Ruby and node-celery for Node.js,
  7. as well as a PHP client; language interoperability can also be achieved using webhooks.
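To make the client / broker / worker roles concrete, here is a minimal hedged sketch; the module name tasks.py, the Redis URL, and the add task are illustrative and not taken from the examples later in these notes:

# tasks.py -- minimal, hypothetical example of the client/broker/worker split
import celery

# The broker (Redis DB 0 here) carries messages from clients to workers.
app = celery.Celery('demo', broker='redis://127.0.0.1:6379/0')

@app.task
def add(x, y):
    # Executed by a worker process, not by the client.
    return x + y

# Client side: this only sends a message to the broker and returns immediately.
# A worker started with `celery -A tasks worker` consumes and executes it.
if __name__ == '__main__':
    add.delay(2, 3)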

Definition

  1. Simple
     Easy to use and maintain, and it does not need configuration files.
     It has an active, friendly community you can turn to for help, including a mailing list and an IRC channel.
  2. Highly available
     Workers and clients automatically retry if a connection is lost or fails,
     and some brokers improve availability through primary/primary or primary/replica replication.
  3. Fast
     A single Celery process can handle millions of tasks per minute,
     with sub-millisecond round-trip latency (using RabbitMQ, py-librabbitmq, and optimized settings).
  4. Flexible
     Almost every part of Celery can be extended or used on its own.
     You can roll your own pool implementations, serializers, compression schemes, logging, schedulers, consumers, producers, autoscalers, broker transports, and more (see the serializer sketch after this list).
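As one example of that flexibility, a custom serializer can be registered through kombu and then selected in the configuration. This is a hedged sketch; the serializer name 'myjson', the helper functions, and the broker URL are made up for illustration:

import json
from celery import Celery
from kombu.serialization import register

# Hypothetical custom serializer: plain JSON registered under a different name.
def myjson_dumps(obj):
    return json.dumps(obj)

def myjson_loads(data):
    return json.loads(data)

register('myjson', myjson_dumps, myjson_loads,
         content_type='application/x-myjson', content_encoding='utf-8')

app = Celery('demo', broker='redis://127.0.0.1:6379/0')
app.conf.task_serializer = 'myjson'     # serialize task messages with it
app.conf.result_serializer = 'myjson'   # and task results
app.conf.accept_content = ['myjson']    # workers only accept this content type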

Diagrams

Celery - Figure 1

Celery - Figure 2

Supported brokers, concurrency, result stores, and serializers

  1. Brokers
     RabbitMQ, Redis,
     MongoDB (experimental), ZeroMQ (experimental),
     CouchDB (experimental), SQLAlchemy (experimental),
     Django ORM (experimental), Amazon SQS (experimental)
  2. Concurrency
     prefork (multiprocessing),
     Eventlet, gevent,
     threads / single-threaded
  3. Result stores
     AMQP, Redis,
     memcached, MongoDB,
     SQLAlchemy, Django ORM,
     Apache Cassandra
  4. Serialization
     pickle, json, yaml, msgpack;
     zlib, bzip2 compression;
     cryptographic message signing (a configuration sketch follows this list)
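A hedged sketch of how these choices typically appear in configuration; the URLs, serializer, and compression values are illustrative placeholders, and the concurrency pool is chosen on the worker command line with -P rather than in code:

from celery import Celery

app = Celery('demo')
app.conf.update(
    broker_url='redis://127.0.0.1:6379/0',       # broker (transport)
    result_backend='redis://127.0.0.1:6379/1',   # result store
    task_serializer='json',                      # serialization of task messages
    result_serializer='json',
    accept_content=['json'],                     # content types the worker accepts
    task_compression='gzip',                     # optional message compression
)
# Concurrency is picked when starting the worker, e.g.:
#   celery -A demo worker -P prefork   (or -P eventlet / -P gevent)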

Features

  1. Monitoring
     A stream of monitoring events is emitted by workers and is used by built-in and external tools to tell you what the cluster is doing, in real time.
  2. Workflows
     Simple and complex workflows can be composed using a set of powerful primitives called the "Canvas", including grouping, chaining, chunking, and more (see the sketch after this list).
  3. Time and rate limits
     You can control how many tasks may be executed per second/minute/hour, or how long a task is allowed to run, and this can be set as a default for a specific worker or individually for each task type.
  4. Scheduling
     You can specify the time to run a task in seconds or at a given datetime,
     or you can use periodic tasks for recurring events, based either on a simple interval or on crontab expressions supporting minute, hour, day of week, day of month, and month of year.
  5. Autoreloading
     In development, workers can be configured to reload automatically when the source changes, with inotify(7) support on Linux.
  6. Autoscaling
     The worker pool can be dynamically resized based on load or on user-specified metrics, which can be used to cap memory usage on shared hosts / in cloud environments, or to guarantee a given quality of service.
  7. Resource leak protection
     The --maxtasksperchild option guards against user tasks leaking resources that are hard to keep under control, such as memory or file descriptors.
  8. User components
     Each worker component can be customized, and additional components can be defined by the user. The worker is built up using "bootsteps", a dependency graph that enables fine-grained control of the worker's internals.
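As a hedged illustration of the Canvas and limit features above (the tasks, limit values, and URLs are invented for the sketch):

from celery import Celery, chain, group

app = Celery('demo', broker='redis://127.0.0.1:6379/0',
             backend='redis://127.0.0.1:6379/1')

# Rate and time limits can be declared per task type.
@app.task(rate_limit='10/m', time_limit=30)
def add(x, y):
    return x + y

@app.task
def total(numbers):
    return sum(numbers)

# Canvas primitives: run three adds in parallel, then feed the list of results to total().
workflow = chain(group(add.s(1, 1), add.s(2, 2), add.s(3, 3)), total.s())
# workflow.delay()  # enqueue the whole workflow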

Integration

  Django    django-celery
  Pyramid   pyramid_celery
  Pylons    celery-pylons
  Flask     not needed
  web2py    web2py-celery
  Tornado   tornado-celery

Official documentation

  1. http://docs.jinkan.org/docs/celery/getting-started/introduction.html
  2. https://docs.celeryproject.org/en/stable/getting-started/introduction.html
  3. # eventlet
  4. https://pypi.org/project/eventlet/

installation

  1. pip install celery
  2. pip install eventlet  # provides the concurrency pool, used together with celery

usage

  1. Starting the project
  2. Project layout:
     Directory: C:\Users\41999\PycharmProjects\Celery\celery_tasks

     Mode   LastWriteTime       Length  Name
     ----   -------------       ------  ----
     d----  11/7/2021 5:04 PM           __pycache__
     -a---  11/7/2021 4:53 PM      615  celery_assignment.py
     -a---  11/7/2021 4:05 PM      351  producer.py
     -a---  11/7/2021 4:56 PM      254  task01.py
     -a---  11/7/2021 4:56 PM      254  task02.py
  3. How to start
     cd into the top-level directory C:\Users\41999\PycharmProjects\Celery\, then pass the main module to -A as a package.module path,
     i.e.: celery -A celery_tasks.celery_assignment worker -l info -P eventlet
  4. Meaning of the name
     sun-harmonics is the codename of the 5.x release.

Additional notes

  1. Startup commands
     celery -A celery_task worker --loglevel=info
     celery -A celery_task worker -l info -P eventlet
  2. Environment setup
     A virtual environment needs to be created.
  3. Option reference: -P, --pool <pool> (Pool implementation)
     https://docs.celeryproject.org/en/stable/reference/cli.html#cmdoption-celery-workdir
  4. Option reference: --eventlet (Use eventlet)
     https://docs.celeryproject.org/en/stable/reference/cli.html#cmdoption-celery-shell-eventlet
  5. About eventlet
     https://pypi.org/project/eventlet/

02 Celery asynchronous execution syntax

description

  1. Components
     1. The worker (task) module: celery_task
     2. The producer: produce_task
     3. Result checking: result
  2. Usage
     1. The task module creates the Celery app and its persistent connections, specifying the message queues used to exchange data (Redis for both in this demo), and defines two print-style tasks.
     2. The first task simulates sending an email, the second simulates sending a text message.
     3. The result script checks whether the messages were processed correctly.
  3. Test
     1. Start the worker so it keeps listening over a persistent connection.
     2. Run produce_task to send the messages.
     3. Run result to check the outcome.

Code

==============================================celery_task.py======================================================
import celery
import time

backend = "redis://127.0.0.1:6379/1"
broker = "redis://127.0.0.1:6379/2"
cel = celery.Celery('test', backend=backend, broker=broker)

@cel.task
def send_mail(name):
    print("向%s发送邮件" % name)
    time.sleep(5)
    print("向%s发送邮件完成" % name)
    return "OK"

@cel.task
def send_message(name):
    print("向%s发送短信" % name)
    time.sleep(5)
    print("向%s发送短信完成" % name)
    return "OK"
=============================================produce_task.py======================================================
from celery_task import send_mail, send_message

result = send_mail.delay("yuan")
print(result.id)
result2 = send_message.delay("alex")
print(result2.id)
==============================================result.py============================================================
from celery.result import AsyncResult
from celery_task import cel

async_result = AsyncResult(id="4b08b009-14e8-4713-97d2-a51d4d0d579c", app=cel)

if async_result.successful():
    result = async_result.get()
    print(result)
    # result.forget()  # discard the stored result
elif async_result.failed():
    print('执行失败')
elif async_result.status == 'PENDING':
    print('任务等待中被执行')
elif async_result.status == 'RETRY':
    print('任务异常后正在重试')
elif async_result.status == 'STARTED':
    print('任务已经开始被执行')
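Instead of branching on the status as result.py does above, the result can also be waited on directly; a brief sketch (the 10-second timeout is arbitrary):

from celery.result import AsyncResult
from celery_task import cel

async_result = AsyncResult(id="4b08b009-14e8-4713-97d2-a51d4d0d579c", app=cel)
# Block until the task finishes (or raise a TimeoutError after 10 s).
# propagate=False returns the exception instead of re-raising it if the task failed.
value = async_result.get(timeout=10, propagate=False)
print(async_result.status, value)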

operation outcome

==============================================celery_task.py=====================================================
(venv) C:\Users\41999\PycharmProjects\Celery\01异步任务>celery -A celery_task worker -l info -P eventlet
-------------- celery@ThinkPad v5.1.2 (sun-harmonics)
--- ***** -----
-- ******* ---- Windows-10-10.0.22489-SP0 2021-10-30 21:54:40
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: test:0x1cca1c456a0
- ** ---------- .> transport: redis://127.0.0.1:6379/2
- ** ---------- .> results: redis://127.0.0.1:6379/1
- *** --- * --- .> concurrency: 8 (eventlet)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery

[tasks]
. celery_task.send_mail
. celery_task.send_message

[2021-10-30 21:54:40,665: INFO/MainProcess] Connected to redis://127.0.0.1:6379/2
[2021-10-30 21:54:40,671: INFO/MainProcess] mingle: searching for neighbors
[2021-10-30 21:54:41,695: INFO/MainProcess] mingle: all alone
[2021-10-30 21:54:41,706: INFO/MainProcess] pidbox: Connected to redis://127.0.0.1:6379/2.
[2021-10-30 21:54:41,710: INFO/MainProcess] celery@ThinkPad ready.
[2021-10-30 21:54:51,022: INFO/MainProcess] Task celery_task.send_mail[4b08b009-14e8-4713-97d2-a51d4d0d579c] received
[2021-10-30 21:54:51,023: WARNING/MainProcess] yuan发送邮件
[2021-10-30 21:54:51,023: WARNING/MainProcess]
[2021-10-30 21:54:51,024: INFO/MainProcess] Task celery_task.send_message[f8b7ed5c-dea1-4219-af59-075d7b4cceaf] received
[2021-10-30 21:54:51,024: WARNING/MainProcess] alex发送短信
[2021-10-30 21:54:51,025: WARNING/MainProcess]
[2021-10-30 21:54:56,025: WARNING/MainProcess] yuan发送邮件完成
[2021-10-30 21:54:56,025: WARNING/MainProcess]
[2021-10-30 21:54:56,026: WARNING/MainProcess] alex发送短信完成
[2021-10-30 21:54:56,026: WARNING/MainProcess]
[2021-10-30 21:54:56,028: INFO/MainProcess] Task celery_task.send_mail[4b08b009-14e8-4713-97d2-a51d4d0d579c] succeeded in 5.014999999999418s: 'OK'
[2021-10-30 21:54:56,028: INFO/MainProcess] Task celery_task.send_message[f8b7ed5c-dea1-4219-af59-075d7b4cceaf] succeeded in 5.014999999999418s: 'OK'
=============================================produce_task.py======================================================
C:\Users\41999\PycharmProjects\Celery\venv\Scripts\python.exe C:/Users/41999/PycharmProjects/Celery/01异步任务/produce_task.py
2e0d0bde-3da5-4cb7-bbfa-f60a735f5f80
0d761ef1-ec69-4ec0-b909-ff0c0922c0ac
==============================================result.py============================================================
C:\Users\41999\PycharmProjects\Celery\venv\Scripts\python.exe C:/Users/41999/PycharmProjects/Celery/01异步任务/result.py
OK

error & resolve

  1. Error
     Task handler raised error: ValueError: not enough values to unpack
  2. Fix: change the worker startup command (requires eventlet to be installed)
     celery -A celery_task worker -l info -P eventlet
  3. Reference
     https://blog.csdn.net/PY0312/article/details/105538699
  4. Error
     Usage: celery [OPTIONS] COMMAND [ARGS]...
     Error: Invalid value for '-A' / '--app':
     Unable to load celery application.
     Module 'celery_tasks' has no attribute 'celery'
  5. Fix
     Rename the module file inside the package to celery (see the sketch below).
  6. References
     1. https://blog.csdn.net/weixin_43162402/article/details/83314692
     2. https://blog.csdn.net/weixin_43162402/article/details/83314877
     3. https://www.crifan.com/python_newbie_debug_file_name_not_same_with_lib_name_to_test/
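For context on error 4 above: the module passed to -A must expose a Celery application the CLI can find, i.e. an attribute named app or celery, or a Celery instance defined in that module. A hedged sketch of the naming convention (the module path is illustrative):

# celery_tasks/celery_assignment.py (illustrative module path)
from celery import Celery

# Naming the instance `app` (or `celery`) lets
#   celery -A celery_tasks.celery_assignment worker -l info
# locate it without extra hints.
app = Celery('celery_demo',
             broker='redis://127.0.0.1:6379/1',
             backend='redis://127.0.0.1:6379/2')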

03 Asynchronous execution with a multi-directory layout

description

  • Project makeup

    • Project folder
    • Project files

      • celery_assignment.py (main module)
      • task01.py
      • task02.py
      • producer.py
    • Both task modules import the celery configuration, i.e. the main module, using the package.file path.
    • Compared with the previous example, the main module now maintains the two task modules; a timezone option can be added, but it is not necessary.
  • Value

    • The main module wires up the two tasks that receive messages, while producer generates them; the tasks run as asynchronous coroutines, which improves efficiency.
  • warning

    • eventlet must be installed

code

=======================================celery_assignment.py=====================================================
import celery

cel = celery.Celery('celery_demo',
                    broker='redis://127.0.0.1:6379/1',
                    backend='redis://127.0.0.1:6379/2',
                    # include the following two task modules, so the tasks defined
                    # in those .py files are found and grouped by module
                    include=['celery_tasks.task01',
                             'celery_tasks.task02'])

# Timezone
cel.conf.timezone = 'Asia/Shanghai'
# Whether to use UTC
cel.conf.enable_utc = False
======================================task01.py================================================================
import time
from celery_tasks.celery_assignment import cel

@cel.task
def send_email(res):
    time.sleep(5)
    return "完成向%s发送邮件任务" % res
======================================task02.py================================================================
import time
from celery_tasks.celery_assignment import cel

@cel.task
def send_msg(name):
    time.sleep(5)
    return "完成向%s发送短信任务" % name
=======================================producer.py=============================================================
from celery_tasks.task01 import send_email
from celery_tasks.task02 import send_msg

# Tell celery to execute the tasks right away, passing one argument each
result = send_email.delay('yuan')
print(result.id)
result = send_msg.delay('yuan')
print(result.id)

output

=========================================terminal=================================================================
(venv) PS C:\Users\41999\PycharmProjects\Celery> celery -A celery_tasks.celery_assignment worker -l info -P eventlet
-------------- celery@ThinkPad v5.1.2 (sun-harmonics)
--- ***** -----
-- ******* ---- Windows-10-10.0.22494-SP0 2021-11-07 19:46:55
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: celery_demo:0x2177c4e0f40
- ** ---------- .> transport: redis://127.0.0.1:6379/1
- ** ---------- .> results: redis://127.0.0.1:6379/2
- *** --- * --- .> concurrency: 8 (eventlet)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery

[tasks]
. celery_tasks.task01.send_email
. celery_tasks.task02.send_msg

[2021-11-07 19:46:55,171: INFO/MainProcess] Connected to redis://127.0.0.1:6379/1
[2021-11-07 19:46:55,171: INFO/MainProcess] mingle: searching for neighbors
[2021-11-07 19:46:56,188: INFO/MainProcess] mingle: all alone
[2021-11-07 19:46:56,188: INFO/MainProcess] pidbox: Connected to redis://127.0.0.1:6379/1.
[2021-11-07 19:46:56,204: INFO/MainProcess] celery@ThinkPad ready.
[2021-11-07 19:47:00,818: INFO/MainProcess] Task celery_tasks.task01.send_email[f9fb9f8c-7b22-4643-81c3-22ddcf66243f] received
[2021-11-07 19:47:00,818: INFO/MainProcess] Task celery_tasks.task02.send_msg[7c60cfac-1387-4c10-9d10-67dc9e385a30] received
[2021-11-07 19:47:05,836: INFO/MainProcess] Task celery_tasks.task01.send_email[f9fb9f8c-7b22-4643-81c3-22ddcf66243f] succeeded in 5.014999999999418s: '完成向yuan发送邮件任务'
[2021-11-07 19:47:05,836: INFO/MainProcess] Task celery_tasks.task02.send_msg[7c60cfac-1387-4c10-9d10-67dc9e385a30] succeeded in 5.014999999999418s: '完成向yuan发送短信任务'
=========================================console=================================================================
C:\Users\41999\PycharmProjects\Celery\venv\Scripts\python.exe C:/Users/41999/PycharmProjects/Celery/celery_tasks/producer.py
f9fb9f8c-7b22-4643-81c3-22ddcf66243f
7c60cfac-1387-4c10-9d10-67dc9e385a30
Process finished with exit code 0

04 Scheduled tasks

description

  • Project layout

    • Project name: crontab

      • celery_task.py
      • produce_task.py
      • result.py
  • Two ways to schedule a task

    • Option 1: specify an absolute time (year, month, day, hour, minute, second)
    • Option 2: specify an offset from the current time

code

=====================================================celery_task.py==============================================
import celery
import time

backend = "redis://127.0.0.1:6379/1"
broker = "redis://127.0.0.1:6379/2"
cel = celery.Celery('test', backend=backend, broker=broker)

@cel.task
def send_mail(name):
    print("向%s发送邮件" % name)
    time.sleep(5)
    print("向%s发送邮件完成" % name)
    return "OK"

@cel.task
def send_message(name):
    print("向%s发送短信" % name)
    time.sleep(5)
    print("向%s发送短信完成" % name)
    return "OK"
=====================================================produce_task.py (option 1)==================================
from datetime import datetime
from crontab.celery_task import send_mail

# Option 1: run at an absolute time
v1 = datetime(2021, 11, 7, 20, 28, 30)
print(v1)
v2 = datetime.utcfromtimestamp(v1.timestamp())  # convert local time to UTC
print(v2)
result = send_mail.apply_async(args=["egon", ], eta=v2)
print(result.id)
=====================================================produce_task.py (option 2)==================================
from datetime import datetime, timedelta
from crontab.celery_task import send_mail

# Option 2: run after an offset from now
ctime = datetime.now()
# eta is interpreted as UTC by default
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
time_delay = timedelta(seconds=10)
task_time = utc_ctime + time_delay
# use apply_async and set the eta
result = send_mail.apply_async(args=["egon"], eta=task_time)
print(result.id)
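For a relative delay like option 2, apply_async also accepts a countdown argument in seconds, which avoids the manual UTC conversion; a brief sketch:

from crontab.celery_task import send_mail

# Equivalent to computing "now (UTC) + 10 s" and passing it as eta:
result = send_mail.apply_async(args=["egon"], countdown=10)
print(result.id)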

output

PS C:\Users\41999\PycharmProjects\Celery> celery -A crontab.celery_task worker -l info -P eventlet
-------------- celery@ThinkPad v5.1.2 (sun-harmonics)
--- ***** -----
-- ******* ---- Windows-10-10.0.22494-SP0 2021-11-07 20:23:05
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: test:0x1b422d70f70
- ** ---------- .> transport: redis://127.0.0.1:6379/2
- ** ---------- .> results: redis://127.0.0.1:6379/1
- *** --- * --- .> concurrency: 8 (eventlet)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery

[tasks]
. crontab.celery_task.send_mail
. crontab.celery_task.send_message

[2021-11-07 20:23:05,802: INFO/MainProcess] Connected to redis://127.0.0.1:6379/2
[2021-11-07 20:23:05,809: INFO/MainProcess] mingle: searching for neighbors
[2021-11-07 20:23:06,828: INFO/MainProcess] mingle: all alone
[2021-11-07 20:23:06,828: INFO/MainProcess] pidbox: Connected to redis://127.0.0.1:6379/2.
[2021-11-07 20:23:06,843: INFO/MainProcess] celery@ThinkPad ready.
[2021-11-07 20:23:25,419: ERROR/MainProcess] Received unregistered task of type 'celery_task.send_mail'.
The message has been ignored and discarded.
[2021-11-07 20:28:26,309: INFO/MainProcess] Task crontab.celery_task.send_mail[375531f0-ae7b-4274-9e21-65720ec93a93] received
[2021-11-07 20:28:30,015: WARNING/MainProcess] egon发送邮件
[2021-11-07 20:28:30,015: WARNING/MainProcess]
[2021-11-07 20:28:35,034: WARNING/MainProcess] egon发送邮件完成
[2021-11-07 20:28:35,034: WARNING/MainProcess]
[2021-11-07 20:28:35,034: INFO/MainProcess] Task crontab.celery_task.send_mail[375531f0-ae7b-4274-9e21-65720ec93a93] succeeded in 5.01600000000326s: 'OK'
[2021-11-07 20:30:37,970: INFO/MainProcess] Task crontab.celery_task.send_mail[4c85cb68-c5ae-4f21-a80e-4208e349fa78] received
[2021-11-07 20:30:47,729: WARNING/MainProcess] egon发送邮件
[2021-11-07 20:30:47,729: WARNING/MainProcess]
[2021-11-07 20:30:52,733: WARNING/MainProcess] egon发送邮件完成
[2021-11-07 20:30:52,733: WARNING/MainProcess]
[2021-11-07 20:30:52,733: INFO/MainProcess] Task crontab.celery_task.send_mail[4c85cb68-c5ae-4f21-a80e-4208e349fa78] succeeded in 5.0s: 'OK'

05 Periodic tasks with a multi-directory layout

command

  1. # After activating the virtual environment, run these two commands from the project root: start the second one (beat, which sends tasks on schedule) first, then the first one (the worker, which consumes them)
  2. celery -A celery_asn.celery_assignment worker -l info -P eventlet
  3. celery -A celery_asn.celery_assignment beat -l info

project structure

  • celery_asn

    • celery_assignment [main]
    • task01
    • task02

code

====celery_assignment [main]============================================================================
# -*- coding: UTF-8 -*-
"""
@author:41999
@file:celery_assignment.py
@time:2021/11/07
"""
from datetime import timedelta
import celery

cel = celery.Celery('celery_demo',
                    broker='redis://127.0.0.1:6379/1',
                    backend='redis://127.0.0.1:6379/2',
                    # include the following two task modules, so the tasks defined
                    # in those .py files are found and grouped by module
                    include=['celery_asn.task01',
                             'celery_asn.task02'])

# Timezone
cel.conf.timezone = 'Asia/Shanghai'
# Whether to use UTC
cel.conf.enable_utc = False

cel.conf.beat_schedule = {
    # the schedule entry name is arbitrary
    'add-every-10-seconds': {
        # run send_email from task01
        'task': 'celery_asn.task01.send_email',
        # run every 6 seconds
        # 'schedule': 1.0,
        # 'schedule': crontab(minute="*/1"),
        'schedule': timedelta(seconds=6),
        # arguments passed to the task
        'args': ('张三',)
    },
    # 'add-every-12-seconds': {
    #     'task': 'celery_tasks.task01.send_email',
    #     # runs every year on April 11th at 08:42
    #     'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
    #     'args': ('张三',)
    # },
}
====task01===============================================================================================
# -*- coding: UTF-8 -*-
"""
@author:41999
@file:task01.py
@time:2021/11/07
"""
import time
from celery_asn.celery_assignment import cel

@cel.task
def send_email(res):
    time.sleep(5)
    return "完成向%s发送邮件任务" % res
====task02===============================================================================================
import time
from celery_asn.celery_assignment import cel

@cel.task
def send_msg(name):
    time.sleep(5)
    return "完成向%s发送短信任务" % name

Terminal1

-------------- celery@ThinkPad v5.1.2 (sun-harmonics)
--- ***** -----
-- ******* ---- Windows-10-10.0.22504-SP0 2021-11-18 16:24:20
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: celery_demo:0x237e4740f70
- ** ---------- .> transport: redis://127.0.0.1:6379/1
- ** ---------- .> results: redis://127.0.0.1:6379/2
- *** --- * --- .> concurrency: 8 (eventlet)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery

[tasks]
. celery_asn.task01.send_email
. celery_asn.task02.send_msg

[2021-11-18 16:24:20,197: INFO/MainProcess] Connected to redis://127.0.0.1:6379/1
[2021-11-18 16:24:20,205: INFO/MainProcess] mingle: searching for neighbors
[2021-11-18 16:24:21,234: INFO/MainProcess] mingle: all alone
[2021-11-18 16:24:21,244: INFO/MainProcess] pidbox: Connected to redis://127.0.0.1:6379/1.
[2021-11-18 16:24:21,247: INFO/MainProcess] celery@ThinkPad ready.
[2021-11-18 16:24:21,458: INFO/MainProcess] Task celery_asn.task01.send_email[a08d2686-fc65-4ae8-8480-86b46cf28c78] received
[2021-11-18 16:24:24,106: INFO/MainProcess] Task celery_asn.task01.send_email[ca08be47-6b30-49a3-bf71-72298d0e4dee] received
[2021-11-18 16:24:26,459: INFO/MainProcess] Task celery_asn.task01.send_email[a08d2686-fc65-4ae8-8480-86b46cf28c78] succeeded in 5.0s: '完成向张三发送邮件任务'
[2021-11-18 16:24:29,106: INFO/MainProcess] Task celery_asn.task01.send_email[ca08be47-6b30-49a3-bf71-72298d0e4dee] succeeded in 5.0s: '完成向张三发送邮件任务'

Terminal2

(venv) PS C:\Users\41999\PycharmProjects\Celery> celery -A celery_asn.celery_assignment beat -l info
celery beat v5.1.2 (sun-harmonics) is starting.
__ - ... __ - _
LocalTime -> 2021-11-18 16:25:12
Configuration ->
    . broker -> redis://127.0.0.1:6379/1
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> celery.beat.PersistentScheduler
    . db -> celerybeat-schedule
    . logfile -> [stderr]@%INFO
    . maxinterval -> 5.00 minutes (300s)
[2021-11-18 16:25:12,398: INFO/MainProcess] beat: Starting...
[2021-11-18 16:25:12,508: INFO/MainProcess] Scheduler: Sending due task add-every-10-seconds (celery_asn.task01.send_email)

Using Celery in Django

project structure

C:.
├─.idea
│  └─inspectionProfiles
├─app01
│  ├─migrations
│  │  └─__pycache__
│  └─__pycache__
├─celerytest
│  └─__pycache__
├─mycelery
│  ├─email
│  ├─sms
│  │  └─__pycache__
│  └─__pycache__
├─templates
└─venv
   ├─Lib
   └─Scripts

Starting celery

  1. (venv) PS C:\Users\41999\PycharmProjects\celerytest> celery -A mycelery.main worker -l info -P eventlet

Steps

  1. Write the Celery configuration on the backend first.
  2. In a terminal, start the celery worker so it listens for tasks.
  3. Visit the web page to trigger the celery task.

configure

Django logic

============================================celerytest/urls.py===========================================
from django.contrib import admin
from django.urls import path

from app01 import views

urlpatterns = [
    path('admin/', admin.site.urls),
    path('test/', views.test),
]
============================================app01/views.py===============================================
from django.shortcuts import render, HttpResponse
from mycelery.sms.tasks import send_sms, send_sms2
from datetime import datetime, timedelta

# Create your views here.
def test(request):
    # Asynchronous tasks
    # 1. The view calls the exact task function registered with celery; importing it from the tasks module takes care of this.
    # send_sms.delay("110")
    # send_sms2.delay("119")
    # send_sms.delay()  # if the task function takes no arguments, pass nothing

    # Scheduled task
    ctime = datetime.now()
    # eta is interpreted as UTC by default
    utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
    time_delay = timedelta(seconds=10)
    task_time = utc_ctime + time_delay
    result = send_sms.apply_async(["911", ], eta=task_time)
    print(result.id)
    return HttpResponse("OK")

Celery

=================================================mycelery/sms/tasks.py==================================
from mycelery.main import app
import time

@app.task  # the name parameter sets the task name; if omitted, the function name is used
def send_sms(mobile):
    """Send a text message"""
    print("向手机号%s发送短信成功!" % mobile)
    time.sleep(5)
    return "send_sms OK"

@app.task  # the name parameter sets the task name; if omitted, the function name is used
def send_sms2(mobile):
    print("向手机号%s发送短信成功!" % mobile)
    time.sleep(5)
    return "send_sms2 OK"
=================================================mycelery/config.py======================================
broker_url = 'redis://127.0.0.1:6379/15'
result_backend = 'redis://127.0.0.1:6379/14'
=================================================mycelery/main.py========================================
import os
from celery import Celery

# Create the celery application instance
app = Celery("celerytest")

# Hook celery up with django so it can find and load the django settings
# (the value must match your project's actual settings module path)
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celeryPros.settings.dev')

# Load the configuration through the app object
app.config_from_object("mycelery.config")

# Load the tasks
# The argument must be a list; each entry is the dotted path of a task package
# app.autodiscover_tasks(["task-package-1", "task-package-2"])
app.autodiscover_tasks(["mycelery.sms", ])

# Command to start the worker
# It is strongly recommended to start it from the project root directory that contains the mycelery package:
# celery -A mycelery.main worker --loglevel=info
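For reference, the layout described in the Celery documentation's "First steps with Django" guide reads Celery settings from Django's settings module instead of a standalone mycelery/config.py; a hedged sketch of that alternative (the package name proj is illustrative):

# proj/celery.py (illustrative; "proj" is the Django project package)
import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('proj')
# Read settings prefixed with CELERY_ from Django's settings.py,
# e.g. CELERY_BROKER_URL = 'redis://127.0.0.1:6379/15'
app.config_from_object('django.conf:settings', namespace='CELERY')
# Discover tasks.py modules in all INSTALLED_APPS
app.autodiscover_tasks()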

display

Celery - Figure 3