引言 Celery是一个处理大量消息的分布式系统,用于实时处理异步任务队列,支持任务调度。Celery不提供消息队列的处理,因此可结合中间件(RabbitMQ、Redis)进行使用。处理的结果可以存储在mysql、redis中。

1.概述

1.Celery的优点:

优点 说明
简单 celery使用和维护简单,可以用简单的配置文件
高可用 当worker和client发生【网络连接丢失或失败】——>可以自动进行重试
快速,处理数量级高 单个celery进程:
- 每分钟可以处理百万级任务
- (当配置rabbitMQ、librabbitMQ和优化设置时)只需要毫秒级的往返延迟
灵活 celery的每个部分度可扩展使用【自定义池使用、序列化、压缩方案、日志记录、调度器、生产者、消费者、broker传输等】

2.使用场景:

  • 异步任务:将耗时操作任务提交给Celery去异步执行,比如:发送短信、邮件、消息推送、音视频处理等等。
  • 定时任务:类似于crontab,比如:每日数据统计。

    2.Celery工作架构

    2.1 Celery的三大构成

    1.【概述】celery的使用 - 图1

    2.2 Celery的工作流

    celery架构.png

    3.Celery的安装

    1. pipenv install celery

    1.扩展安装

    1. pipenv install redis pymysql sqlalchemy eventlet flask

    【说明】 eventlet :只用于在windows下运行celery

4.【代码实践】Celery最简结构

image.png

4.1 【生产者】Celery实例化、创建任务

【前提】请确保已安装”celery redis pymysql sqlalchemy eventlet”

  1. #【celery最简结构组成】
  2. from celery import Celery
  3. #【消息队列】rabbitMQ方式
  4. mq_broker="amqp://user:james加密@124.隐藏.184:5672/"
  5. #【消息队列】rabbitMQ方式
  6. redis_broker="redis://:james加密@42.隐藏.45:6379/5"
  7. #【结果存储】mysql方式
  8. mysql_backend="db+mysql+pymysql://admin:James加密@124.隐藏.77.184:3306/mock"
  9. #【结果存储】redis方式
  10. redis_backend="redis://:james加密@42.隐藏.53.45:6379/6"
  11. #【step1】创建celery实例对象
  12. celeryApp=Celery("tasks",broker=mq_broker,backend=mysql_backend)
  13. # 【step2】创建“异步任务”[words的反向输出]
  14. # [name="reverseWords",非必填【作用:别名】
  15. @celeryApp.task(name="reverseWords")
  16. def resWords(words):
  17. print("开始执行words的反向输出")
  18. return words[::-1]
  19. #【windows】在tasks.py所在的目录执行---> celery -A task workers -l info -P eventlet
分类 方式 语法 示例 说明
创建Celery示例 celeryApp=Celery(‘taskName’,broker,backend) celeryApp=Celery(“tasks”,broker=mq_broker,backend=mysql_backend) 必填 broker,backend
broker
【消息队列】
rabbitMQ mq_broker=”amqp://用户名:密码@ip:port/ mq_broker=”amqp://user:james加密@124.隐藏.184:5672/“ 配置broker【消息队列】
Rdies redis_broker=”redis://:密码@ip:port/redis分区 redis_broker=”redis://:james加密@42.隐藏.45:6379/5”
backend
【任务结果存储】
mysql mysql_backendr=”db+mysql+pymysql://用户名:密码@ip:port/database mysql_backendr=”db+mysql+pymysql://admin:James加密@124.隐藏.77.184:3306/mock” 配置backend【任务结果存储】
Rdies redis_backend=”redis://:密码@ip:port/redis分区 redis_backend=”redis://:james加密@42.隐藏.53.45:6379/6”
异步任务 @clelery实例装饰器
def 异步任务函数():
函数体
# 【step2】创建“异步任务”[words的反向输出]
@celeryApp.task(name=“reverseWords”)
def resWords(words):
print(“开始执行words的反向输出”)
return words[::-1]
注意 必须使用clelery实例装饰器
定时任务 @clelery实例装饰器
def 定时任务函数():
函数体
暂无,后续补充
启动celery windows celery -A task workers -l info** -P eventlet** 注释【windows】在tasks.py所在的目录执行
celery -A task workers -l info -P eventlet
注意 windows必须安装使用eventlet【即加参数 -P eventlet】
linux/maxOS celery -A task workers -l info** ** 注释【_linux/maxOS】在tasks.py所在的目录执行
celery -A task workers -l info _
不需要eventlet
  1. # -*- coding: utf-8 -*-
  2. """
  3. ====================================
  4. @File Name :tasks.py
  5. @Time : 2022/10/30 22:50
  6. @Program IDE :PyCharm
  7. @Create by Author : 一一Cooling
  8. ====================================
  9. @描述
  10. 【celery最简结构组成】
  11. ====================================
  12. """
  13. from celery import Celery
  14. #【消息队列】rabbitMQ方式
  15. mq_broker="amqp://user:隐藏@124.隐藏.77.184:5672/"
  16. #【消息队列】rabbitMQ方式
  17. redis_broker="redis://:隐藏@42.隐藏.53.45:6379/5"
  18. #【结果存储】mysql方式
  19. mysql_backend="db+mysql+pymysql://admin:隐藏@124.隐藏.77.184:3306/mock"
  20. #【结果存储】redis方式
  21. redis_backend="redis://:隐藏@42.隐藏.53.45:6379/6"
  22. #【step1】创建celery实例对象
  23. celeryApp=Celery("tasks",broker=mq_broker,backend=mysql_backend)
  24. # 【step2】使用clelery实例装饰器--》创建“异步任务”[words的反向输出]
  25. @celeryApp.task(name="reverseWords")
  26. def resWords(words):
  27. print("开始执行words的反向输出")
  28. return words[::-1]
  29. #【windows】在tasks.py所在的目录执行---> celery -A task workers -l info -P eventlet

4.2 启动celery服务

分类 方式 语法 示例 说明
启动celery windows celery -A task workers -l info** -P eventlet** 注释【windows】在tasks.py所在的目录执行
celery -A task workers -l info -P eventlet
注意 windows必须安装使用eventlet【即加参数 -P eventlet】
linux/maxOS celery -A task workers -l info** ** 注释【_linux/maxOS】在tasks.py所在的目录执行
celery -A task workers -l info _
不需要eventlet
  1. # -*- coding: utf-8 -*-
  2. """
  3. ====================================
  4. @File Name :runTasks.py
  5. @Time : 2022/10/30 23:12
  6. @Program IDE :PyCharm
  7. @Create by Author : 一一Cooling
  8. ====================================
  9. @描述【启动celery服务】
  10. ====================================
  11. """
  12. import os,sys
  13. # 判断当前的OS信息
  14. def getOS():
  15. myOS = sys.platform
  16. # print(myOS)
  17. return myOS
  18. # 启动--celery-task
  19. def runTasks():
  20. myOS=getOS()
  21. win_shell="celery -A tasks worker -l info -P eventlet"
  22. linux_shell = "celery -A tasks worker -l info"
  23. # print(sy)
  24. if (myOS == "win32"):
  25. print("[windows]开始celery启动")
  26. os.system(win_shell)
  27. else:
  28. print("[maxOS/Linux]开始celery启动")
  29. os.system(linux_shell)
  30. if __name__ == '__main__':
  31. runTasks()

4.3 【消费者】worker执行task

在celery中,对“task函数(即本文4.1中的 resWords(words))添加delay()”就等同于“task函数交给celery进行调用”。

  1. # -*- coding: utf-8 -*-
  2. """
  3. ====================================
  4. @File Name :callTask.py
  5. @Time : 2022/10/30 23:10
  6. @Program IDE :PyCharm
  7. @Create by Author : 一一Cooling
  8. ====================================
  9. @描述
  10. ====================================
  11. """
  12. #导入celery生产者的“任务”
  13. from tasks import resWords
  14. #执行:异步任务【resWords()】
  15. def call_resWords():
  16. myWords = "hello celery-windows"
  17. # 异步任务函数【resWords】使用delay()--->转化为celery可以调用
  18. result = resWords.delay(myWords)
  19. # 输出:task结果id
  20. print("[taskID]:\t", result.id)
  21. if __name__ == '__main__':
  22. call_resWords()

4.3.1 任务结果—常见属性

前提 res=taskFunction.delay()

属性 说明 示例
id 获取task的id resID=res.id
ready() 检测是否已经处理完毕
返回值 True、False
res_ready=res.ready()
get() res_get=resget()

4.4 执行结果

结果 异步任务执行成功,且任务结果成功写入mysql
image.png