Celery任务队列高级用法

2019-06-24

对于普通的任务来说可能无法满足我们的任务需求,所以还需要了解一些进阶用法,Celery 提供了诸多调度方式,例如任务编排、根据任务状态执行不同的操作、重试机制等。

Celery任务队列高级用法 _ Escape - 图1

任务队列Celery高级用法


1. 相关组件介绍

了解 Celery 相关的生态环境

Celery 通过消息机制进行通信,通常使用中间人 Broker 为客户端和 Worker 调节。启动一个任务,客户端向消息队列发送一条消息,然后中间人 Broker 将消息传递给一个 Worker,最后由 Worker 进行执行分配的任务。可以有多个 BrokerWorker,用来提高 Celery 的高可用性以及横向扩展能力。Celery 是用 Python 编写的,但协议可以用任何语言实现,如 Node.js 语言的 node-celery 项目。

  • 支持的中间人

    • RabbitMQ
    • Redis
    • Amazon SQS

Celery任务队列高级用法 _ Escape - 图2

任务队列Celery高级用法

  • 支持的结果存储

    • AMQP、Redis
    • Memcached
    • SQLAlchemy、Django ORM
    • Apache Cassandra、Elasticsearch
  • 支持的并发

    • prefork (multiprocessing)
    • Eventlet、gevent
    • solo (single threaded)
  • 支持的序列化

    • pickle、json、yaml、msgpack
    • zlib、bzip2 compression
    • Cryptographic message signing

Celery任务队列高级用法 _ Escape - 图3

任务队列Celery高级用法

  • 支持的框架集成

    • Flask - 不需要
    • Tornado - tornado-celery
    • Pyramid - pyramid_celery
    • web2py - web2py-celery

2. 简单使用方式

主要的用途就是,帮助你快速上手和使用!

  • [简单使用] 我们将所有的内容都保存到一个文件中,而针对于大型项目而已,可能就需要创建独立的模块和配置文件了。

    • 安装环境依赖和启动基础服务
    • 编写 app 程序
    • 启动 Worker 服务并调用任务

bash

  1. # 使用docker运行MQ服务
  2. $ docker run -d -p 5462:5462 rabbitmq
  3. # 使用docker运行Redis服务
  4. $ docker run -d -p 6379:6379 redis
  5. # 使用pip安装依赖
  6. $ pip install celery

python

  1. # 最简单的Demo示例(tasks.py)
  2. from celery import Celery
  3. app = Celery('hello', broker='amqp://guest@localhost//')
  4. @app.task
  5. def add(x, y):
  6. return x + y

bash

  1. # 启动celery进程
  2. $ celery worker -A hello --loglevel=info
  3. # 调用任务
  4. # 需要我们创建的实例任务并通过delay()进行调用
  5. $ python
  6. >>> from hello.tasks import add
  7. >>> add.delay(4, 4)
  • [后端存储] 如果需要保存任务状态,Celery 需要在某处存储任务的状态信息,其中内置了一些后端存储方案。通过配置文件中的 backend 参数,来指定后端存储方案。在已经配置后端存储的情况下,重新执行任务调用,可以保存对应的消息状态。

    • ORM(SQLAlchemy/Django)
    • Memcached
    • Redis
    • RPC(RabbitMQ/AMQP)
    • 自定义的后端结果存储中间件

bash

  1. # RPC
  2. app = Celery('hello', backend='rpc://', broker='pyamqp://')
  3. # Redis
  4. app = Celery('hello', backend='redis://localhost', broker='pyamqp://')

bash

  1. # 调用任务
  2. # 需要我们创建的实例任务并通过delay()进行调用
  3. $ python
  4. >>> from hello.tasks import add
  5. >>> result = add.delay(4, 4)
  6. >>> result.ready() # 检测是否已经处理完毕
  7. False
  8. >>> result.get(timeout=1) # 设置超时时间
  9. 8
  10. >>> result.get(propagate=False) # 是否再次引发异常
  11. >>> result.traceback # 任务出现异常进行回溯
  • [相关配置] 大多数情况下,使用默认的配置就可以满足,当然也可以根据我们的实际需求按需配置。针对大型的项目,建议使用专用配置模块进行配置,将所有的配置项集中化配置。

python

  1. # [配置项] 通过upate进行配置
  2. # 程序启动时导入的模块列表,便于Worker执行相应的任务
  3. from celery import Celery
  4. app = Celery('hello',
  5. backend='redis://localhost',
  6. broker='pyamqp://',
  7. include=['hello.tasks'])
  8. app.conf.update(
  9. task_serializer='json', # 设置任务输出格式
  10. accept_content=['json'], # 忽略其他内容
  11. result_serializer='json', # 设置结果输出格式
  12. timezone='Europe/Oslo', # 时区
  13. enable_utc=True, # 启用UTC时钟
  14. task_routes={ # 设置任务专用队列
  15. 'hello.tasks.add': 'low-priority',
  16. },
  17. task_annotations = { # 任务进行限速
  18. 'hello.tasks.add': {'rate_limit': '10/m'}
  19. }
  20. )
  21. if __name__ == '__main__':
  22. app.start()
  23. # $ celery worker -A hello -Q hello
  24. # >>> from hello.tasks import add
  25. # >>> add.apply_async((2, 2), queue='hello')

python

  1. # [配置模块] 加载配置模块
  2. app.config_from_object('celery_config')
  3. # celery_config.py
  4. broker_url = 'pyamqp://'
  5. result_backend = 'rpc://'
  6. task_serializer = 'json'
  7. result_serializer = 'json'
  8. accept_content = ['json']
  9. timezone = 'Europe/Oslo'
  10. enable_utc = True
  11. task_routes = {
  12. 'hello.tasks.add': 'low-priority',
  13. }
  14. task_annotations = {
  15. 'hello.tasks.add': {'rate_limit': '10/m'}
  16. }

bash

  1. # inspect
  2. $ celery -A proj inspect --help
  3. # control
  4. $ celery -A proj control --help
  5. # event
  6. $ celery -A proj events --help
  7. # status
  8. $ celery -A proj status --help

bash


3. 进阶 - 定时任务

bash

  1. # Celery的提供的定时任务主要靠schedules来完成
  2. # 通过beat组件周期性的将任务发送给woker进行执行

python

  1. # period_task.py
  2. # 新建period_task.py文件并添加任务到配置文件中
  3. from learn_celery import app
  4. from celery.schedules import crontab
  5. @app.task
  6. def add(x, y):
  7. print(x+y)
  8. return x+y
  9. @app.task
  10. def say_hello(name):
  11. return f'hello {name}'
  12. @app.on_after_configure.connect
  13. def setup_periodic_tasks(sender, **kwargs):
  14. # 每10秒执行add任务
  15. sender.add_periodic_task(10.0, add.s(1,3), name='1+3=')
  16. sender.add_periodic_task(
  17. # 每周一下午四点五十六执行say_hello任务
  18. crontab(hour=16, minute=56, day_of_week=1),
  19. say_hello.s('escape'), name='say_hello'
  20. )

python

  1. # config.py
  2. BROKER_URL = 'redis://:escape@127.0.0.1:6379/0' # Broker配置
  3. CELERY_RESULT_BACKEND = 'redis://:escape@127.0.0.1:6379/0' # BACKEND配置
  4. CELERY_RESULT_SERIALIZER = 'json' # 结果序列化方案
  5. CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间
  6. CELERY_TIMEZONE='Asia/Shanghai' # 时区配置
  7. CELERY_IMPORTS = ( # 指定导入的任务模块
  8. 'learn_celery.tasks', # 普通任务
  9. 'learn_celery.period_task', # 定时任务
  10. )

bash

  1. # 启动worker和beat之后观察worker日志
  2. # 会发现定时任务会自动往队列中添加,之后被对应worker消费掉
  3. # 启动worker
  4. $ celery worker -A learn_celery.tasks -l debug
  5. # 启动beat
  6. $ celery beat -A learn_celery.period_task -l debug

bash

  1. # config.py
  2. # 还可以通过配置文件方式指定定时和计划任务
  3. from learn_celery import app
  4. from celery.schedules import crontab
  5. BROKER_URL = 'redis://:escape@127.0.0.1:6379/0' # Broker配置
  6. CELERY_RESULT_BACKEND = 'redis://:escape@127.0.0.1:6379/0' # BACKEND配置
  7. CELERY_RESULT_SERIALIZER = 'json' # 结果序列化方案
  8. CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间
  9. CELERY_TIMEZONE='Asia/Shanghai' # 时区配置
  10. CELERY_IMPORTS = ( # 指定导入的任务模块
  11. 'learn_celery.tasks', # 普通任务
  12. 'learn_celery.period_task', # 定时任务
  13. )
  14. app.conf.beat_schedule = {
  15. 'period_add_task': { # 添加计划任务
  16. 'task': 'learn_celery.period_task.add', # 任务路径
  17. 'schedule': crontab(hour=18, minute=16, day_of_week=1), # 执行周期
  18. 'args': (3, 4), # 传入参数
  19. },
  20. 'say_hello': { # 添加计划任务
  21. 'task': 'learn_celery.period_task.say_hello', # 任务路径
  22. 'schedule': 10.0, # 执行周期
  23. 'args': ('escape',) # 传入参数
  24. },
  25. }

bash

  1. # period_task.py
  2. # 此时的period_task.py只需要注册到woker中就行了
  3. from project import app
  4. @app.task
  5. def add(x,y):
  6. print(x+y)
  7. return x+y
  8. @app.task
  9. def say_hello(name):
  10. return f'hello {name}'

4. 进阶 - 任务绑定

bash

  1. # Celery可通过任务绑定到实例获取到任务的上下文
  2. # 这样我们可以在任务运行时候获取到任务的状态,记录相关日志等

python

  1. # period_task.py
  2. # 通过bind参数将任务绑定,self指任务的上下文
  3. # 通过self获取任务状态,同时在任务出错时进行任务重试
  4. from learn_celery import app
  5. from celery.utils.log import get_task_logger
  6. logger = get_task_logger(__name__)
  7. @app.task(bind=True) # 绑定任务
  8. def add(self, x, y):
  9. logger.info(self.request.__dict__) # 打印日志
  10. try:
  11. a=[]
  12. a[10] == 1
  13. except Exception as e:
  14. # 出错每5秒尝试一次,总共尝试3次
  15. raise self.retry(exc=e, countdown=5, max_retries=3)
  16. return x+y

Celery任务队列高级用法 _ Escape - 图4

任务队列Celery快速入门


5. 进阶 - 内置钩子函数

bash

  1. # Celery在执行任务时候,提供了钩子方法用于在任务执行完成时候进行对应的操作
  2. # 在Task源码中提供了很多状态钩子函数如:
  3. # on_success(成功后执行)
  4. # on_failure(失败时候执行)
  5. # on_retry(任务重试时候执行)
  6. # after_return(任务返回时候执行)
  7. # 在进程中使用是我们只需要重写这些方法,完成相应的操作即可

python

  1. # period_task.py
  2. # 在以下示例中,我们继续修改period_task.py文件
  3. # 分别定义三个任务来演示任务失败、重试、任务成功后执行的操作
  4. from learn_celery import app
  5. from celery.utils.log import get_task_logger
  6. from celery import Task
  7. logger = get_task_logger(__name__)
  8. class demotask(Task):
  9. # 任务成功执行
  10. def on_success(self, retval, task_id, *args, **kwargs):
  11. logger.info(f'task id:{task_id}, arg:{args}, successful!')
  12. # 任务失败执行
  13. def on_failure(self, exc, task_id, *args, **kwargs, einfo):
  14. logger.info(f'task id:{task_id}, arg:{args}, failed! erros:{exc}')
  15. # 任务重试执行
  16. def on_retry(self, exc, task_id, *args, **kwargs, einfo):
  17. logger.info(f'task id:{task_id}, arg:{args}, retry! einfo:{exc}')
  18. @app.task(base=demotask, bind=True)
  19. def add(self, x, y):
  20. try:
  21. a=[]
  22. a[10] == 1
  23. except Exception as e:
  24. # 出错每5秒尝试一次,总共尝试1次
  25. raise self.retry(exc=e, countdown=5, max_retries=1)
  26. return x+y
  27. @app.task(base=demotask)
  28. def say_hello(name):
  29. a=[]
  30. a[10] == 1
  31. return f'hello {name}'
  32. @app.task(base=demotask)
  33. def sum(a, b):
  34. return f'a+b={a+b}'

python

  1. # config.py
  2. from learn_celery import app
  3. from celery.schedules import crontab
  4. BROKER_URL = 'redis://:escape@127.0.0.1:6379/0' # Broker配置
  5. CELERY_RESULT_BACKEND = 'redis://:escape@127.0.0.1:6379/0' # BACKEND配置
  6. CELERY_RESULT_SERIALIZER = 'json' # 结果序列化方案
  7. CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间
  8. CELERY_TIMEZONE='Asia/Shanghai' # 时区配置
  9. CELERY_IMPORTS = ( # 指定导入的任务模块
  10. 'learn_celery.tasks', # 普通任务
  11. 'learn_celery.period_task', # 定时任务
  12. )
  13. app.conf.beat_schedule = {
  14. 'add': { # 每10秒执行
  15. 'task': 'learn_celery.period_task.add',
  16. 'schedule': 10.0,
  17. 'args': (10, 12),
  18. },
  19. 'say_hello': { # 每10秒执行
  20. 'task': 'learn_celery.period_task.say_hello',
  21. 'schedule': 10.0,
  22. 'args': ('escape',),
  23. },
  24. 'sum': { # 每10秒执行
  25. 'task': 'learn_celery.period_task.sum',
  26. 'schedule': 10.0,
  27. 'args': (1, 3),
  28. },
  29. }

Celery任务队列高级用法 _ Escape - 图5

任务队列Celery快速入门


6. 进阶 - 任务编排

python

  1. # 在很多情况下,一个任务需要由多个子任务或者一个任务需要很多步骤才能完成
  2. # Celery同样也能实现这样的任务,完成这类型的任务通过以下模块完成
  3. # group: 并行调度任务
  4. # chain: 链式任务调度
  5. # chord: 类似group但分header和body2个部分;header可以是一个group任务,执行完成后调用body的任务
  6. # map: 映射调度,通过输入多个入参来多次调度同一个任务
  7. # starmap: 类似map,入参类似*args
  8. # chunks: 将任务按照一定数量进行分组

python

  1. # tasks.py
  2. from learn_celery import app
  3. @app.task
  4. def add(x, y):
  5. return x+y
  6. @app.task
  7. def mul(x, y):
  8. return x*y
  9. @app.task
  10. def sum(data_list):
  11. res=0
  12. for i in data_list:
  13. res+=i
  14. return res

python

  1. # consumer.py
  2. # group: 组任务
  3. # 组内每个任务并行执行
  4. from celery import group
  5. from learn_celery.tasks import add
  6. # 任务 [1+2, 1+2]
  7. res = group(add.s(1,2), add.s(1,2))()
  8. while True:
  9. if res.ready():
  10. print(f'res:{res.get()}')
  11. break

Celery任务队列高级用法 _ Escape - 图6

任务队列Celery快速入门

python

  1. # consumer.py
  2. # chain: 链式任务
  3. # 链式任务中,默认上一个任务的返回结果作为参数传递给子任务
  4. from celery import group
  5. from learn_celery.tasks import add, mul, sum
  6. # 任务((1+2)+3)*3
  7. res = chain(add.s(1,2), add.s(3), mul.s(3))()
  8. while True:
  9. if res.ready():
  10. print(f'res:{res.get()}')
  11. break

python

  1. # consumer.py
  2. # chord:任务分割
  3. # 分为header和body两部分
  4. # hearder任务执行完在执行body,其中hearder返回结果作为参数传递给body
  5. from celery import group
  6. from learn_celery.tasks import add, mul, sum
  7. # 任务(1+2)+(3*4)
  8. res = chord(header=[add.s(1,2), mul.s(3,4)], body=sum.s())()
  9. while True:
  10. if res.ready():
  11. print('res:{}'.format(res.get()))
  12. break

python

  1. # consumer.py
  2. # chunks:任务分组
  3. # 按照任务的个数分组
  4. from learn_celery.tasks import add, mul, sum
  5. # 4代表每组的任务的个数
  6. res = add.chunks(zip(range(5), range(5)), 4)()
  7. while True:
  8. if res.ready():
  9. print('res:{}'.format(res.get()))
  10. break

Celery任务队列高级用法 _ Escape - 图7

任务队列Celery快速入门


7. 异步调用原理

主要解释 delay 和 apply_async 的使用方法和区别

bash

  1. # 对于delay和apply_async都可以用来进行任务的调度
  2. # 本质上是delay对apply_async进行了再一次封装或者可以说是快捷方式
  3. # 两者都返回AsyncResult对象,以下是两个方法源码

python

  1. # --------------------------------------------
  2. # delay源码
  3. # --------------------------------------------
  4. def delay(self, *args, **kwargs):
  5. """Star argument version of :meth:`apply_async`.
  6. Does not support the extra options enabled by :meth:`apply_async`.
  7. Arguments:
  8. *args (Any): Positional arguments passed on to the task.
  9. **kwargs (Any): Keyword arguments passed on to the task.
  10. Returns:
  11. celery.result.AsyncResult: Future promise.
  12. """
  13. return self.apply_async(args, kwargs)

python

  1. # --------------------------------------------
  2. # apply_async源码
  3. # --------------------------------------------
  4. def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
  5. link=None, link_error=None, shadow=None, **options):
  6. if self.typing:
  7. try:
  8. check_arguments = self.__header__
  9. except AttributeError: # pragma: no cover
  10. pass
  11. else:
  12. check_arguments(*(args or ()), **(kwargs or {}))
  13. app = self._get_app()
  14. if app.conf.task_always_eager:
  15. with denied_join_result():
  16. return self.apply(args, kwargs, task_id=task_id or uuid(),
  17. link=link, link_error=link_error, **options)
  18. if self.__v2_compat__:
  19. shadow = shadow or self.shadow_name(self(), args, kwargs, options)
  20. else:
  21. shadow = shadow or self.shadow_name(args, kwargs, options)
  22. preopts = self._get_exec_options()
  23. options = dict(preopts, **options) if options else preopts
  24. options.setdefault('ignore_result', self.ignore_result)
  25. return app.send_task(
  26. self.name, args, kwargs, task_id=task_id, producer=producer,
  27. link=link, link_error=link_error, result_cls=self.AsyncResult,
  28. shadow=shadow, task_type=self,
  29. **options
  30. )

8. 使用注意事项

Celery 快速入门 > 分布式任务队列 Celery 入门与进阶

bash

  1. # -------------------------------------------------------------------
  2. # 1.在celery中worker启动时,如果是root用户则需要设置环境变量
  3. # -------------------------------------------------------------------
  4. $ export C_FORCE_ROOT='true'

bash

  1. # -------------------------------------------------------------------
  2. # 2.使用RabbitMQ或Redis作为Broker的话,生产环境永远不要使用关系型数据库
  3. # 3.Celery4.x开始不再支持Windows平台,如果需要在Windows开发请使用3.x的版本
  4. $ pip install celer[redis]==4.3.0
  5. $ pip install celery[librabbitmq]
  6. $ pip install celery[librabbitmq,redis,auth,msgpack]
  7. # -------------------------------------------------------------------

bash

  1. # 4.良好的配置操作
  2. # 禁用预取任务是为了防止Broker分配任务不均导致多次重复执行
  3. # 设置执行多少次任务之后进行销毁是为了防止卡死的出现和发生
  4. worker_prefetch_multiplier=0 # 禁用预取任务
  5. worker_max_tasks_per_child=50 # 每个worker执行50次任务即销毁

python

  1. # -------------------------------------------------------------------
  2. # 5.不要使用复杂对象作为任务函数的参数
  3. # -------------------------------------------------------------------
  4. # Good
  5. @app.task
  6. def my_task(user_id):
  7. user = User.objects.get(id=user_id)
  8. print(user.name)
  9. ...
  10. # Bad
  11. @app.task
  12. def my_task(user):
  13. print(user.name)
  14. ...

9. 相关参考链接

授人玫瑰,手有余香!


文章作者: Escape

文章链接: https://www.escapelife.site/posts/6.3417e+60.html

版权声明: 本博客所有文章除特别声明外,均采用CC BY 4.0 许可协议。转载请注明来源 Escape !

Python Celery