What is Celery

Celery is an asynchronous task queue based on distributed message passing. The definition is simple, but it carries an implicit condition: Celery does not stand alone. It must be built on top of a distributed message-passing mechanism, which is what the Celery documentation calls the broker.

The producer/consumer pattern

  • broker
    • RabbitMQ
    • Redis
  • producer (publisher)
  • consumer
  • backend
    • RabbitMQ: no longer recommended as a result backend since Celery 4.0
    • Redis
    • MySQL
    • Mongo
    • …

Question to ponder: how do you choose a message broker? Kafka, RabbitMQ, or something else?

[Figure 1: Producer/consumer schematic]
Celery hides the implementation details of the RabbitMQ interface: it plays both the publisher (client) role and the consumer (worker) role.
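To make these roles concrete, here is a minimal, hedged sketch: RabbitMQ as the broker, Redis as the result backend, and a toy add task (the URLs and names are illustrative, not from the original setup):

    from celery import Celery

    # broker: RabbitMQ; backend: Redis (illustrative local URLs)
    app = Celery('demo',
                 broker='amqp://guest:guest@127.0.0.1:5672//',
                 backend='redis://127.0.0.1:6379/0')

    @app.task
    def add(x, y):
        # Executed by the consumer (a worker process)
        return x + y

    # Calling add.delay(2, 3) from any process makes that process the
    # producer (publisher); the result is later read from the backend.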
[Figure 2: The five distributed deployment models]

[Figure 3: Celery + RabbitMQ workflow]

Connections and channels

AMQP

  • Overview

The Advanced Message Queuing Protocol (AMQP) is an open application-layer protocol for message-oriented middleware. Its design goals are message ordering, routing (both point-to-point and publish-subscribe), reliability, and security. AMQP specifies the behavior of both the sending and the receiving side, so that messages interoperate across vendors' implementations, much as SMTP, HTTP, and FTP make interoperable systems possible. Unlike earlier middleware standards such as the Java Message Service (JMS), which standardizes a particular API and its implementation behavior, AMQP defines how messages are transmitted as a byte stream. As a result, any applications built on conforming implementations can create and exchange messages with one another.

AMQP is a binary application-layer protocol, designed to support a wide range of message-oriented applications. It provides message flow control and defined delivery guarantees such as at-most-once, at-least-once, and exactly-once, along with SASL- and TLS-based authentication and encryption.

AMQP places the following requirements on an implementation:

  • A type system
  • Symmetric, asynchronous message passing
  • A standard, extensible message format
  • A standard, extensible message store
  • Typical implementations:
    1. RabbitMQ
    2. ActiveMQ

Connection

A Connection is the socket connection to RabbitMQ; it encapsulates the socket-protocol-related logic.

Whether producer or consumer, each must establish a connection to the RabbitMQ broker. That connection is a single TCP connection: the Connection.

Channel

Once the TCP connection is established, the client can create AMQP channels; each channel is assigned a unique ID.

The Channel is the most important interface for working with RabbitMQ: most business operations happen on it, including declaring queues, declaring exchanges, binding queues to exchanges, and publishing messages. If every interaction with RabbitMQ established its own Connection, the cost of setting up TCP connections under heavy message traffic would be enormous and inefficient.

A Channel is a logical connection established inside a Connection. In a multi-threaded application, each thread usually creates its own channel. Every AMQP method carries a channel ID so that both the client and the message broker can identify the channel, which keeps channels completely isolated from one another. As lightweight stand-ins for Connections, channels greatly reduce the operating system's cost of establishing TCP connections.

AMQP 0-9-1 provides a way to multiplex over a single TCP connection: an application can open multiple "lightweight connections", called channels, on one connection. After connecting, an AMQP 0-9-1 client opens one or more channels and performs its protocol operations (managing topology, publishing, consuming) on them.
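To see Connection and Channel in code, here is a minimal pika sketch (assuming a local RabbitMQ with the default guest account; the queue name is ours):

    import pika

    # One TCP Connection to the broker...
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='127.0.0.1'))
    # ...and a lightweight Channel multiplexed on top of it.
    channel = connection.channel()

    channel.queue_declare(queue='demo')    # topology is managed on the channel
    channel.basic_publish(exchange='',     # default exchange routes by queue name
                          routing_key='demo',
                          body=b'hello')
    connection.close()                     # closing the Connection closes its channels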

Recommended reading: RabbitMQ Connections guide

Recommended reading: RabbitMQ Channels guide

Implementation in code

See cloudify-plugins-common/cloudify.amqp_client:AMQPClient for a concrete example.

Celery app lifecycle

The most important concept in Celery: the app (a Celery instance).
The Celery app is thread-safe.

  • create

        app = Celery()

  • configure

        app.conf.update()

  • register task

        @app.task
        def dispatch():
            pass

  • start
  • close
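Putting the stages together, a minimal sketch (the broker URL is an assumption; worker_main starts an embedded worker and blocks until shutdown):

    from celery import Celery

    # create
    app = Celery('demo', broker='amqp://guest:guest@127.0.0.1:5672//')

    # configure
    app.conf.update(task_serializer='json')

    # register task
    @app.task
    def dispatch():
        pass

    if __name__ == '__main__':
        # start; stopping the worker (Ctrl+C) ends the app's life cycle
        app.worker_main(['worker', '--loglevel=info'])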

Anatomy of the Celery worker startup command

    /opt/mgmtworker/env/bin/python \
    /opt/mgmtworker/env/bin/celery worker \
        -Ofair \
        # Apply the 'fair' optimization profile (see -O in the help text below)
        --include=cloudify.dispatch \
        # Comma separated list of additional modules to import
        --hostname cloudify.management \
        # Set custom hostname
        --config=cloudify.broker_config \
        # Name of the configuration module
        --app=cloudify_agent.app.app \
        # app instance to use (e.g. module.attr_name)
        --events \
        # Send events that can be captured by monitors like celery events, celerymon, and others
        --loglevel=debug \
        # Logging level, choose between DEBUG, INFO, WARNING, ERROR, CRITICAL, or FATAL
        --queues=cloudify.management \
        # List of queues to enable for this worker, separated by comma. By default all configured queues are enabled. Example: -Q video,image
        --logfile=/var/log/mgmtworker/cloudify.management_worker.log \
        # Path to log file. If no logfile is specified, stderr is used
        --autoscale=100,2 \
        # Enable autoscaling by providing max_concurrency,min_concurrency: always keep 2 processes, but grow to 100 if necessary
        --maxtasksperchild=10 \
        # Maximum number of tasks a pool worker can execute before it is terminated and replaced by a new worker.
        # Intended for user tasks that leak resources such as memory or file descriptors.
        --without-gossip \
        # Do not subscribe to other workers' events
        --without-mingle \
        # Do not synchronize with other workers at startup
        --with-gate-keeper \
        # Enable the gate keeper (implemented in Cloudify)
        --gate-keeper-bucket-size=5 \
        # The gate keeper bucket size (implemented in Cloudify)
        --with-logging-server \
        # Enable the logging server (implemented in Cloudify)
        --logging-server-logdir=/var/log/mgmtworker/logs \
        # logdir location (implemented in Cloudify)
        --logging-server-handler-cache-size=100
        # Maximum number of file handlers that can be open at any given time (implemented in Cloudify)
For reference, the relevant excerpt of the celery worker help text:

    Start worker instance.

    Examples::

        celery worker --app=proj -l info
        celery worker -A proj -l info -Q hipri,lopri
        celery worker -A proj --concurrency=4
        celery worker -A proj --concurrency=1000 -P eventlet
        celery worker --autoscale=10,0

    Options:
      -A APP, --app=APP     app instance to use (e.g. module.attr_name)
      -b BROKER, --broker=BROKER
                            url to broker. default is 'amqp://guest@localhost//'
      --loader=LOADER       name of custom loader class to use.
      --config=CONFIG       Name of the configuration module
      --workdir=WORKING_DIRECTORY
                            Optional directory to change to after detaching.
      -C, --no-color
      -q, --quiet
      -c CONCURRENCY, --concurrency=CONCURRENCY
                            Number of child processes processing the queue. The
                            default is the number of CPUs available on your
                            system.
      -P POOL_CLS, --pool=POOL_CLS
                            Pool implementation: prefork (default), eventlet,
                            gevent, solo or threads.
      --purge, --discard    Purges all waiting tasks before the daemon is started.
                            **WARNING**: This is unrecoverable, and the tasks will
                            be deleted from the messaging server.
      -l LOGLEVEL, --loglevel=LOGLEVEL
                            Logging level, choose between DEBUG, INFO, WARNING,
                            ERROR, CRITICAL, or FATAL.
      -n HOSTNAME, --hostname=HOSTNAME
                            Set custom hostname, e.g. 'w1.%h'. Expands: %h
                            (hostname), %n (name) and %d, (domain).
      -B, --beat            Also run the celery beat periodic task scheduler.
                            Please note that there must only be one instance of
                            this service.
      -s SCHEDULE_FILENAME, --schedule=SCHEDULE_FILENAME
                            Path to the schedule database if running with the -B
                            option. Defaults to celerybeat-schedule. The extension
                            ".db" may be appended to the filename.
      --scheduler=SCHEDULER_CLS
                            Scheduler class to use. Default is
                            celery.beat.PersistentScheduler
      -S STATE_DB, --statedb=STATE_DB
                            Path to the state database. The extension '.db' may be
                            appended to the filename. Default: None
      -E, --events          Send events that can be captured by monitors like
                            celery events, celerymon, and others.
      --time-limit=TASK_TIME_LIMIT
                            Enables a hard time limit (in seconds int/float) for
                            tasks.
      --soft-time-limit=TASK_SOFT_TIME_LIMIT
                            Enables a soft time limit (in seconds int/float) for
                            tasks.
      --maxtasksperchild=MAX_TASKS_PER_CHILD
                            Maximum number of tasks a pool worker can execute
                            before it's terminated and replaced by a new worker.
      -Q QUEUES, --queues=QUEUES
                            List of queues to enable for this worker, separated by
                            comma. By default all configured queues are enabled.
                            Example: -Q video,image
      -X EXCLUDE_QUEUES, --exclude-queues=EXCLUDE_QUEUES
      -I INCLUDE, --include=INCLUDE
                            Comma separated list of additional modules to import.
                            Example: -I foo.tasks,bar.tasks
      --autoscale=AUTOSCALE
                            Enable autoscaling by providing max_concurrency,
                            min_concurrency. Example:: --autoscale=10,3 (always
                            keep 3 processes, but grow to 10 if necessary)
      --autoreload          Enable autoreloading.
      --no-execv            Don't do execv after multiprocessing child fork.
      --without-gossip      Do not subscribe to other workers events.
      --without-mingle      Do not synchronize with other workers at startup.
      --without-heartbeat   Do not send event heartbeats.
      --heartbeat-interval=HEARTBEAT_INTERVAL
                            Interval in seconds at which to send worker heartbeat
      -O OPTIMIZATION       Apply optimization profile. Supported: default, fair
      -D, --detach
      -f LOGFILE, --logfile=LOGFILE
                            Path to log file. If no logfile is specified, stderr
                            is used.
      --pidfile=PIDFILE     Optional file used to store the process pid. The
                            program will not start if this file already exists and
                            the pid is still alive.
      --uid=UID             User id, or user name of the user to run as after
                            detaching.
      --gid=GID             Group id, or group name of the main group to change to
                            after detaching.
      --umask=UMASK         Effective umask (in octal) of the process after
                            detaching. Inherits the umask of the parent process
                            by default.
      --executable=EXECUTABLE
                            Executable to use for the detached process.
      --version             show program's version number and exit
      -h, --help            show this help message and exit

gate_keeper.py
Concurrency of tasks is limited through three parameters:

  • max_concurrencies, default 100
  • gate_keeper_bucket_size, default 5
  • gate_keeper_max_top_flows, default 20

        if autoscale:
            self.max_concurrencies = int(autoscale.split(',')[0])
        else:
            self.max_concurrencies = 100
        self.max_top_flows = gate_keeper_max_top_flows
        worker.gate_keeper = self
        self.enabled = with_gate_keeper
        self.bucket_size = gate_keeper_bucket_size
        self._current = collections.defaultdict(
            lambda: Queue.Queue(self.bucket_size))
        self._on_hold = collections.defaultdict(Queue.Queue)
        self._pending_workflows = Queue.Queue()
bucketkey: //''
task_type: workflow / operation
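The mechanism can be pictured as a bounded queue per bucket. The following is our simplified illustration (all names are ours), not the Cloudify code itself:

    import collections
    try:
        import Queue as queue   # Python 2, as in the snippet above
    except ImportError:
        import queue            # Python 3

    bucket_size = 5             # mirrors gate_keeper_bucket_size
    current = collections.defaultdict(lambda: queue.Queue(bucket_size))

    def try_admit(bucket_key, task_id):
        """Admit a task if its bucket has a free slot, else hold it."""
        try:
            current[bucket_key].put_nowait(task_id)
            return True
        except queue.Full:
            return False        # bucket full: the task goes on hold

    def release(bucket_key):
        """Free a slot when a task in this bucket finishes."""
        current[bucket_key].get_nowait()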

Celery components

  • worker
    • Timer
    • Hub
    • Beat
    • StateDB
    • Consumer

Celery Consumer blueprint

    default_steps = [
        'celery.worker.consumer.connection:Connection',
        'celery.worker.consumer.mingle:Mingle',
        'celery.worker.consumer.events:Events',
        'celery.worker.consumer.gossip:Gossip',
        'celery.worker.consumer.heart:Heart',
        'celery.worker.consumer.control:Control',
        'celery.worker.consumer.tasks:Tasks',
        'celery.worker.consumer.consumer:Evloop',
        'celery.worker.consumer.agent:Agent',
    ]
  • Mingle

    Bootstep syncing state with neighbor workers. At startup, or upon consumer restart, this will:
      • Sync logical clocks.
      • Sync revoked tasks.

  • Events

    Service used for sending monitoring events.

  • Heart

    Bootstep sending event heartbeats. This service sends a worker-heartbeat message every n seconds.
    Note: not to be confused with AMQP protocol-level heartbeats.

  • Control

    Worker remote-control bootstep. Control -> celery.worker.pidbox -> kombu.pidbox; the actual commands are implemented in celery.worker.control.

  • Tasks

    Worker task consumer.

  • Evloop

    Event-loop service.
    Note: this is always started last.

  • Agent

    Agent starts cell actors.
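The blueprint is user-extensible. As a minimal sketch, a custom consumer bootstep can be registered on the app (the step class and its print statement are ours):

    from celery import Celery, bootsteps

    app = Celery('demo', broker='amqp://guest:guest@127.0.0.1:5672//')

    class AnnounceStep(bootsteps.StartStopStep):
        # Start only after the Tasks step from default_steps above.
        requires = {'celery.worker.consumer.tasks:Tasks'}

        def start(self, consumer):
            print('consumer blueprint is up on %s' % consumer.hostname)

    app.steps['consumer'].add(AnnounceStep)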

Celery task lifecycle

  • start
  • stop (cancel)
  • shutdown (close channel)
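From a client's point of view, the stages map onto API calls roughly as follows (a hedged sketch; the broker URL and task name are illustrative):

    from celery import Celery

    app = Celery('demo', broker='amqp://guest:guest@127.0.0.1:5672//')

    # start: publish a task by name
    result = app.send_task('cloudify.dispatch.dispatch')

    # stop (cancel): revoke it; terminate=True also kills a running task
    result.revoke(terminate=True)

    # shutdown: release the connections and channels held by this app
    app.close()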

Common Celery configuration settings

    # Broker URL
    broker_url = 'amqp://{username}:{password}@{hostname}:{port}//{options}'
    # Result backend URL
    result_backend = broker_url
    # Result expiry time (seconds)
    result_expires = 600
    # How many messages to prefetch at a time, multiplied by the number of
    # concurrent processes. The default is 4 (four messages for each process).
    # The default setting is usually a good choice; however, if you have very
    # long-running tasks waiting in the queue and you have to start the
    # workers, note that the first worker to start will receive four times
    # the number of messages initially, so tasks may not be fairly
    # distributed. To disable prefetching, set worker_prefetch_multiplier
    # to 1. Setting it to 0 lets the worker keep consuming as many messages
    # as it wants.
    worker_prefetch_multiplier = 0
    # Late ack means task messages are acknowledged after the task has been
    # executed, not just before (the default behavior).
    task_acks_late = True
    # Event queue expiry (seconds)
    event_queue_expires = 60
    # Event queue TTL (seconds)
    event_queue_ttl = 30
    # Broker heartbeat (seconds)
    broker_heartbeat = 60
    # Broker heartbeat check rate
    broker_heartbeat_checkrate = 2
    # Task serialization format
    task_serializer = 'json'
    # Result serialization format
    result_serializer = 'json'
    # Content-type whitelist
    accept_content = ['json', 'pickle']
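Such settings usually live in a module and are loaded the way the worker command above does with --config=cloudify.broker_config; a sketch (the module name here is assumed):

    from celery import Celery

    app = Celery('demo')
    # The module must be importable and contain lowercase settings
    # like the block above.
    app.config_from_object('broker_config')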

Reference: Celery configuration documentation

What are Celery signals for

1. The signal mechanism is the oldest inter-process communication mechanism on Linux. It deals with a normally running process being interrupted, which changes the process's flow of control.

2. A signal is a software interrupt.

3. Signals are asynchronous events.

Celery defines the signals listed below. Many kinds of events can trigger these signals, and you can connect to them so that custom actions are executed when they fire.

    __all__ = (
        'before_task_publish', 'after_task_publish',
        'task_prerun', 'task_postrun', 'task_success',
        'task_retry', 'task_failure', 'task_revoked', 'celeryd_init',
        'celeryd_after_setup', 'worker_init', 'worker_process_init',
        'worker_process_shutdown', 'worker_ready', 'worker_shutdown',
        'worker_shutting_down', 'setup_logging', 'after_setup_logger',
        'after_setup_task_logger', 'beat_init', 'beat_embedded_init',
        'heartbeat_sent', 'eventlet_pool_started', 'eventlet_pool_preshutdown',
        'eventlet_pool_postshutdown', 'eventlet_pool_apply',
    )
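Connecting a handler is a decorator away; a minimal sketch (the handler body is ours):

    from celery.signals import task_prerun

    @task_prerun.connect
    def log_task_start(task_id=None, task=None, **kwargs):
        # Fires in the worker just before each task executes.
        print('about to run %s[%s]' % (task.name, task_id))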

The Cloudify Celery app connects the following two signals:

    @signals.worker_process_init.connect
    def declare_fork(**kwargs):
        try:
            import Crypto.Random
            Crypto.Random.atfork()
        except ImportError:
            pass


    # This is an ugly hack to restart the hub event loop
    # after the Celery MainProcess has started...
    @signals.worker_ready.connect
    def reset_worker_tasks_state(sender, *args, **kwargs):
        if sender.loop is not asynloop:
            return
        inspector = app.control.inspect(destination=[sender.hostname])

        def callback(*args, **kwargs):
            try:
                inspector.stats()
            except BaseException:
                pass

        sender.hub.call_soon(callback=callback)
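For context: Crypto.Random.atfork() re-seeds PyCrypto's random number generator in each forked pool process, so children do not inherit identical RNG state from the parent; the second handler, per its own comment, schedules a harmless self-inspection call on the hub to kick the event loop back into action once the worker reports ready.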

Further reading: Linux signals

Further reading: Celery signals

How Celery failover works

TBC

How are Celery, Kombu, and Pika related

  • Celery: a distributed task execution system.
  • Kombu: wraps messaging transport protocols and provides a connection client. Supported transports include AMQP, Redis, QPID, Zookeeper, Consul, Etcd, SLMQ, SQS, and more.
    Kombu is a sub-project of Celery.
  • Pika: wraps the AMQP 0-9-1 protocol and provides a connection client.

Further reading: StackOverflow: Kombu vs. Pika
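For comparison, a minimal Kombu sketch against the same broker, one layer below Celery (the queue name is illustrative):

    from kombu import Connection

    with Connection('amqp://guest:guest@127.0.0.1:5672//') as conn:
        queue = conn.SimpleQueue('demo')
        queue.put({'hello': 'world'})               # producer side
        message = queue.get(block=True, timeout=1)  # consumer side
        print(message.payload)                      # {'hello': 'world'}
        message.ack()
        queue.close()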

What does Pidbox do in Celery

Worker mailbox (remote control)

Pidbox is the broadcast messaging system used by celery to support remote control of workers.

The following is a pidbox message pile-up issue we once debugged:

  • Symptom: the queue celery@Server_yncx0p.celery.pidbox held 600+ messages like this one:

        {"arguments": {}, "matcher": null, "reply_to": {"routing_key": "db0f1641-5317-3efb-a0e0-decc122912fa", "exchange": "reply.celery.pidbox"}, "pattern": null, "ticket": "82f2655d-0714-4a97-b878-31d33c8db1ec", "destination": ["celery@cloudify.management"], "method": "ping"}

  • Investigation: all ping messages are sent to the celery.pidbox exchange, whose type is 'fanout' (broadcast to every bound queue).

  • Conclusion: ping messages are broadcasts. When a bound queue has no consumer, pidbox messages pile up in it.

Test code

    from celery import Celery

    # Adjust this if your RabbitMQ requires different credentials
    broker_url = 'amqp://guest:guest@127.0.0.1:5672//'


    def get_celery_client(broker_url=broker_url,
                          broker_ssl_enabled=False,
                          broker_ssl_cert_path=None,
                          max_retries=None):
        from celery import Celery
        celery_client = Celery(broker=broker_url,
                               backend=broker_url)
        celery_client.conf.update(
            CELERY_TASK_RESULT_EXPIRES=600)
        if broker_ssl_enabled:
            import ssl
            celery_client.conf.BROKER_USE_SSL = {
                'ca_certs': broker_ssl_cert_path,
                'cert_reqs': ssl.CERT_REQUIRED,
            }
        celery_client.pool.connection.ensure_connection(max_retries=max_retries)
        return celery_client


    def ping(worker=None):
        app = get_celery_client(broker_url, max_retries=3)
        try:
            # None means inspect all workers
            destination = [worker] if worker else None
            inspect = app.control.inspect(destination=destination,
                                          timeout=10)
            resp = inspect.ping()
        except BaseException:
            return None
        finally:
            if app:
                app.close()
        return resp


    resp = ping('celery@test_queue')
    print(resp)

How Celery supports concurrency

  1. Multiprocessing: prefork
     Mgmtworker works in this mode.
  2. Coroutines: Eventlet or Gevent

         celery -A proj worker -P eventlet -c 10
         celery -A proj worker -P gevent -c 10

  3. Horizontal scaling
     • autoscale
     • multiple servers / container instances

Question to ponder: could Mgmtworker use coroutines for concurrency? Why or why not? How would it be done?

Monitoring Celery

  • Celery Flower (a Python project)

Celery versions

Latest release: 5.0.2; Python 2 is no longer supported.
The Cloudify community code uses: 3.1.17
Our Mgmtworker uses: 4.4.2
The agent uses: 3.1.26

Celery's stubborn mysteries

  • Task execution hangs
    • Heavy task concurrency leads to deadlock
    • Few tasks in flight, but the AMQP connection has died
  • Lost Celery heartbeats cause abnormal restarts
    • RabbitMQ periodically logs missed heartbeats and closes the connection; the Celery worker is passively restarted
    • The Celery worker restarts itself when it stops receiving heartbeats

What are the alternatives to Celery

The sad state of the Python ecosystem: there is no strong replacement.

RQ? Huey?

Further reading: StackOverflow: Pros and cons to use Celery vs. RQ

References

  1. What's hard to understand about Celery
  2. Celery best practices
  3. Celery source code walkthrough
  4. Celery official documentation
  5. RabbitMQ Connection and Channel explained
  6. Advanced Message Queuing Protocol
  7. High-concurrency architecture series: Kafka vs. RocketMQ vs. RabbitMQ
  8. RabbitMQ Channels guide