Basics

celeryconfig.py configuration file

  # celeryconfig.py
  broker_url = 'pyamqp://'      # message broker (task queue)
  result_backend = 'rpc://'     # where task results are stored
  task_serializer = 'json'
  result_serializer = 'json'
  accept_content = ['json']
  timezone = 'Europe/Oslo'
  enable_utc = True

  # Check the configuration file for syntax errors (run in a shell):
  # $ python -m celeryconfig

  task_routes = {
      'tasks.add': 'low-priority',
  }
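
For the worker to pick up this file, the app has to load it. A minimal sketch, assuming the app object lives in tasks.py and celeryconfig.py is on the Python path:

  from celery import Celery

  app = Celery('tasks')
  # Load all settings from the celeryconfig module shown above.
  app.config_from_object('celeryconfig')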

Task configuration

  # Allow at most 10 executions of tasks.add per minute
  task_annotations = {
      'tasks.add': {'rate_limit': '10/m'}
  }

  # Change the rate limit from the command line
  $ celery -A tasks control rate_limit tasks.add 10/m
  worker@example.com: OK
      new rate limit set successfully

proj/celery.py

  # proj/celery.py
  from __future__ import absolute_import, unicode_literals
  from celery import Celery

  # The include argument is a list of modules to import when the worker starts;
  # add your task modules here so the worker can find the corresponding tasks.
  app = Celery('proj',
               broker='amqp://',
               backend='amqp://',
               include=['proj.tasks'])

  # Optional configuration, see the application user guide.
  app.conf.update(
      result_expires=3600,
  )

  if __name__ == '__main__':
      app.start()

The @task(ignore_result=True) argument disables result storage for a single task.
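
A minimal sketch of that option on an app-based task (the task name here is only illustrative):

  @app.task(ignore_result=True)
  def log_visit(url):
      # No result will be stored for this task.
      print(url)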

tasks.py  # the module path must match the entries listed in include

  from __future__ import absolute_import, unicode_literals
  from .celery import app

  @app.task
  def add(x, y):
      return x + y

  @app.task
  def mul(x, y):
      return x * y

  @app.task
  def xsum(numbers):
      return sum(numbers)

worker

celery -A proj worker -l info   # start a worker
celery worker --help            # show help

   -------------- celery@halcyon.local v4.0 (latentcall)
  ---- **** -----
  --- * ***  * -- [Configuration]
  -- * - **** --- . broker:      amqp://guest@localhost:5672//
  - ** ---------- . app:         __main__:0x1012d8590
  - ** ---------- . concurrency: 8 (processes)
  - ** ---------- . events:      OFF (enable -E to monitor this worker)
  - ** ----------
  - *** --- * --- [Queues]
  -- ******* ---- . celery:      exchange:celery(direct) binding:celery
  --- ***** -----

  [2012-06-08 16:23:51,078: WARNING/MainProcess] celery@halcyon.local has started.
broker is the connection URL of the broker specified in the Celery application; a different broker can also be set on the command line with the -b option.

concurrency is the number of worker processes handling tasks at the same time. When all of them are busy, new tasks have to wait for one of the processes to finish. The default concurrency is the number of CPUs on the machine, and it can be customized with the celery worker -c option. There is no universally recommended value, because the best concurrency depends on many factors: if the tasks are mostly I/O bound, increasing it can help, but tests show that going beyond twice the number of CPUs rarely helps and may even hurt performance. Besides the default prefork pool, Celery also supports Eventlet and Gevent for running tasks in a single thread (see Concurrency).

When the Events option is enabled, Celery emits monitoring events that can be used to watch the worker. This is usually consumed by monitoring programs such as Flower, a real-time Celery monitor (see the Monitoring and Management Guide).

Queues is the set of task queues the worker consumes from; a worker can consume from several queues at once. This is commonly used to route task messages to specific workers, improve quality of service, separate concerns, and prioritize work (see Routing Tasks).
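
For example, a worker can be started with an explicit concurrency and with events enabled (the values here are only illustrative):

  $ celery -A proj worker -l info -c 4 -E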

celery multi

  $ celery multi start w1 -A proj -l info
  celery multi v4.0.0 (latentcall)
  > Starting nodes...
      > w1.halcyon.local: OK

  $ celery multi restart w1 -A proj -l info
  celery multi v4.0.0 (latentcall)
  > Stopping nodes...
      > w1.halcyon.local: TERM -> 64024
  > Waiting for 1 node.....
      > w1.halcyon.local: OK
  > Restarting node w1.halcyon.local: OK
  celery multi v4.0.0 (latentcall)
  > Stopping nodes...
      > w1.halcyon.local: TERM -> 64052

  $ celery multi stop w1 -A proj -l info
  $ celery multi stopwait w1 -A proj -l info

  $ mkdir -p /var/run/celery
  $ mkdir -p /var/log/celery
  $ celery multi start w1 -A proj -l info --pidfile=/var/run/celery/%n.pid \
                                          --logfile=/var/log/celery/%n%I.log

  $ celery multi start 10 -A proj -l info -Q:1-3 images,video -Q:4,5 data \
      -Q default -L:4,5 debug

Calling tasks and getting results (delay)

https://www.celerycn.io/ru-men/celery-jin-jie-shi-yong#cheng-xu-tiao-yong
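
The link above covers calling tasks in detail; a minimal sketch of the delay/get flow, assuming a result backend is configured as in proj/celery.py:

  from proj.tasks import add

  result = add.delay(4, 4)     # send the task message to the broker
  result.ready()               # True once the task has finished
  result.get(timeout=10)       # wait for and return the result: 8
  result.id                    # the task's unique id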

celery group: running tasks in parallel
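
A minimal sketch of running the add task in parallel with a group (again assuming a result backend is configured; the numbers are only illustrative):

  from celery import group
  from proj.tasks import add

  # Call add(i, i) for i in 0..9 in parallel and collect all results.
  job = group(add.s(i, i) for i in range(10))
  result = job()
  result.get(timeout=10)   # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]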

Routing

  app.conf.update(
      task_routes={
          'proj.tasks.add': {'queue': 'hipri'},
      },
  )
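
With this route in place, add is sent to the hipri queue. A task can also be sent to a specific queue explicitly, and a worker told which queues to consume from; a sketch using the standard apply_async and -Q options:

  from proj.tasks import add

  # Send this particular call to the 'hipri' queue explicitly.
  add.apply_async((2, 2), queue='hipri')

  # Start a worker that consumes from the 'hipri' queue:
  # $ celery -A proj worker -Q hipri -l info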

Remote control

https://www.celerycn.io/ru-men/celery-jin-jie-shi-yong#yuan-cheng-kong-zhi

  $ celery -A proj inspect active    # tasks the worker is currently processing
  $ celery -A proj inspect active --destination=celery@example.com
  $ celery -A proj status            # remotely lists the workers in the cluster
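
The same inspection is available from Python through the app's control interface; a minimal sketch, assuming the proj app defined above:

  from proj.celery import app

  i = app.control.inspect()   # inspect all workers (a list of names can be passed)
  i.active()                  # tasks currently being executed
  i.scheduled()               # tasks with an ETA or countdown
  i.registered()              # tasks registered on each worker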

Periodic tasks

The timezone (for example timezone = 'Asia/Shanghai') must be configured, either directly with app.conf.timezone = 'Asia/Shanghai', or, if you configure the app with app.config_from_object, by adding the setting to your configuration module (usually celeryconfig.py).
  app.conf.beat_schedule = {
      'add-every-30-seconds': {
          'task': 'tasks.add',
          'schedule': 30.0,
          'args': (16, 16)
      },
  }
  app.conf.timezone = 'UTC'

  from celery.schedules import crontab

  app.conf.beat_schedule = {
      # Executes every Monday morning at 7:30 a.m.
      'add-every-monday-morning': {
          'task': 'tasks.add',
          'schedule': crontab(hour=7, minute=30, day_of_week=1),
          'args': (16, 16),
      },
  }
Start the beat scheduler:

  celery -A proj beat

Beat stores the last run times of the tasks in a local database file (named celerybeat-schedule by default), so it needs write access to the current directory, or you can specify a custom location for this file (see below).
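
A sketch of pointing beat at a custom schedule file with the -s / --schedule option (the path is only an example):

  $ celery -A proj beat -s /home/celery/var/run/celerybeat-schedule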

https://docs.celeryproject.org/en/stable/tutorials/task-cookbook.html#cookbook-task-serial
https://www.cnblogs.com/pyspark/articles/8819803.html

Task Cookbook

Ensuring a task is only executed one at a time
You can accomplish this by using a lock.

In this example we'll be using the cache framework to set a lock that's accessible to all workers. It's part of an imaginary RSS feed importer called djangofeeds. The task takes a feed URL as its single argument and imports that feed into a Django model called Feed. We ensure that it's not possible for two or more workers to import the same feed at the same time by setting a cache key consisting of the MD5 checksum of the feed URL.

The cache key expires after some time, in case something unexpected happens (and something always will). For this reason your task's run time shouldn't exceed the timeout.

Note: for this to work correctly you need a cache backend where the .add operation is atomic. memcached is known to work well for this purpose.
  import time
  from celery import task
  from celery.utils.log import get_task_logger
  from contextlib import contextmanager
  from django.core.cache import cache
  from hashlib import md5
  from djangofeeds.models import Feed

  logger = get_task_logger(__name__)

  LOCK_EXPIRE = 60 * 10  # Lock expires in 10 minutes

  @contextmanager
  def memcache_lock(lock_id, oid):
      timeout_at = time.monotonic() + LOCK_EXPIRE - 3
      # cache.add fails if the key already exists
      status = cache.add(lock_id, oid, LOCK_EXPIRE)
      try:
          yield status
      finally:
          # memcache delete is very slow, but we have to use it to take
          # advantage of using add() for atomic locking
          if time.monotonic() < timeout_at and status:
              # don't release the lock if we exceeded the timeout
              # to lessen the chance of releasing an expired lock
              # owned by someone else
              # also don't release the lock if we didn't acquire it
              cache.delete(lock_id)

  @task(bind=True)
  def import_feed(self, feed_url):
      # The cache key consists of the task name and the MD5 digest
      # of the feed URL.
      feed_url_hexdigest = md5(feed_url).hexdigest()
      lock_id = '{0}-lock-{1}'.format(self.name, feed_url_hexdigest)
      logger.debug('Importing feed: %s', feed_url)
      with memcache_lock(lock_id, self.app.oid) as acquired:
          if acquired:
              return Feed.objects.import_feed(feed_url).url
      logger.debug(
          'Feed %s is already being imported by another worker', feed_url)