Basics
The celeryconfig.py configuration file
# celeryconfig.py
broker_url = 'pyamqp://'        # message broker (task queue)
result_backend = 'rpc://'       # task result store
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True

task_routes = {
    'tasks.add': 'low-priority',
}

# Verify that the configuration file contains no syntax errors:
#   python -m celeryconfig
Task configuration

# Allow at most 10 executions of tasks.add per minute
task_annotations = {
    'tasks.add': {'rate_limit': '10/m'}
}

# Change the rate limit at runtime from the command line:
$ celery -A tasks control rate_limit tasks.add 10/m
worker@example.com: OK
    new rate limit set successfully
proj/celery.py
# proj/celery.py
from __future__ import absolute_import, unicode_literals

from celery import Celery

# The `include` argument is a list of modules imported when the worker
# starts; add your task modules here so the worker can find the tasks.
app = Celery('proj',
             broker='amqp://',
             backend='amqp://',
             include=['proj.tasks'])

# Optional configuration, see the application user guide.
app.conf.update(
    result_expires=3600,
)

if __name__ == '__main__':
    app.start()

The @task(ignore_result=True) argument disables result storage for an individual task.
tasks.py  # the module path must match the name listed in `include`
from __future__ import absolute_import, unicode_literals

from .celery import app


@app.task
def add(x, y):
    return x + y


@app.task
def mul(x, y):
    return x * y


@app.task
def xsum(numbers):
    return sum(numbers)
worker
celery -A proj worker -l info   # start a worker
celery worker --help            # show help
 -------------- celery@halcyon.local v4.0 (latentcall)
---- **** -----
--- * ***  * -- [Configuration]
-- * - **** ---
- ** ---------- . broker:      amqp://guest@localhost:5672//
- ** ---------- . app:         __main__:0x1012d8590
- ** ---------- . concurrency: 8 (processes)
- ** ---------- . events:      OFF (enable -E to monitor this worker)
- ** ----------
- *** --- * --- [Queues]
-- ******* ---- . celery:      exchange:celery(direct) binding:celery
--- ***** -----

[2012-06-08 16:23:51,078: WARNING/MainProcess] celery@halcyon.local has started.

broker is the connection URL of the broker specified in the Celery app; a different broker can be set on the command line with the -b option.

concurrency is the number of worker processes handling tasks at the same time. When all processes are busy, new tasks wait until one of them finishes. The default concurrency is the number of CPUs on the machine and can be customized with the `celery worker -c` option. There is no universally recommended value, because the optimal concurrency depends on many factors. If the tasks are mostly I/O-bound, concurrency can be increased, but testing shows that going beyond twice the number of CPUs rarely helps and can even degrade performance. Besides the default prefork pool, Celery also supports Eventlet and Gevent running in a single thread (see: Concurrency).

When the events option is enabled, Celery emits monitoring events so the worker can be observed. This is typically used by monitoring programs such as Flower and real-time Celery monitors (see the Monitoring and Management Guide).

Queues lists the task queues this worker consumes from; a worker can consume from several queues at once. Queues are the usual mechanism for routing task messages to specific workers, improving quality of service, separating concerns, and prioritizing (see: Routing Tasks).
celery multi
$ celery multi start w1 -A proj -l info
celery multi v4.0.0 (latentcall)
> Starting nodes...
    > w1.halcyon.local: OK

$ celery multi restart w1 -A proj -l info
celery multi v4.0.0 (latentcall)
> Stopping nodes...
    > w1.halcyon.local: TERM -> 64024
> Waiting for 1 node.....
    > w1.halcyon.local: OK
> Restarting node w1.halcyon.local: OK
celery multi v4.0.0 (latentcall)
> Stopping nodes...
    > w1.halcyon.local: TERM -> 64052

$ celery multi stop w1 -A proj -l info
$ celery multi stopwait w1 -A proj -l info

$ mkdir -p /var/run/celery
$ mkdir -p /var/log/celery
$ celery multi start w1 -A proj -l info --pidfile=/var/run/celery/%n.pid \
                                        --logfile=/var/log/celery/%n%I.log

$ celery multi start 10 -A proj -l info -Q:1-3 images,video -Q:4,5 data \
    -Q default -L:4,5 debug
celery results: delay()
https://www.celerycn.io/ru-men/celery-jin-jie-shi-yong#cheng-xu-tiao-yong
celery group: parallel tasks
Routing
app.conf.update(
    task_routes = {
        'proj.tasks.add': {'queue': 'hipri'},
    },
)
Remote control
https://www.celerycn.io/ru-men/celery-jin-jie-shi-yong#yuan-cheng-kong-zhi
$ celery -A proj inspect active  # tasks the worker is currently processing
$ celery -A proj inspect active --destination=celery@example.com

# The `celery status` command remotely lists the workers in the cluster:
$ celery -A proj status
Periodic tasks
The timezone (e.g. 'Asia/Shanghai') must be configured either directly (app.conf.timezone = 'Asia/Shanghai') or, if you configure the app with app.config_from_object, by adding the setting to your configuration module (typically celeryconfig.py).

app.conf.beat_schedule = {
    'add-every-30-seconds': {
        'task': 'tasks.add',
        'schedule': 30.0,
        'args': (16, 16)
    },
}
app.conf.timezone = 'UTC'

from celery.schedules import crontab

app.conf.beat_schedule = {
    # Executes every Monday morning at 7:30 a.m.
    'add-every-monday-morning': {
        'task': 'tasks.add',
        'schedule': crontab(hour=7, minute=30, day_of_week=1),
        'args': (16, 16),
    },
}

Start the scheduler:

celery -A proj beat

Beat stores each task's last run time in a local database file (named celerybeat-schedule by default), so it needs write access to the current directory; alternatively you can specify a custom location for this file.
https://docs.celeryproject.org/en/stable/tutorials/task-cookbook.html#cookbook-task-serial
https://www.cnblogs.com/pyspark/articles/8819803.html
Lock
https://docs.celeryproject.org/en/stable/tutorials/task-cookbook.html#cookbook-task-serial

Task Cookbook: ensuring a task is only executed one at a time

You can accomplish this by using a lock. In this example we'll be using the cache framework to set a lock that's accessible to all workers. It's part of an imaginary RSS feed importer called djangofeeds. The task takes a feed URL as its single argument and imports that feed into a Django model called Feed. We ensure that two or more workers cannot import the same feed at the same time by setting a cache key consisting of the MD5 checksum of the feed URL. The cache key expires after some time in case something unexpected happens, and something always will... For this reason your task's run-time shouldn't exceed the timeout.

Note: for this to work correctly you need to be using a cache backend where the .add operation is atomic.
memcached is known to work well for this purpose.

import time
from contextlib import contextmanager
from hashlib import md5

from celery import task
from celery.utils.log import get_task_logger
from django.core.cache import cache

from djangofeeds.models import Feed

logger = get_task_logger(__name__)

LOCK_EXPIRE = 60 * 10  # Lock expires in 10 minutes


@contextmanager
def memcache_lock(lock_id, oid):
    timeout_at = time.monotonic() + LOCK_EXPIRE - 3
    # cache.add fails if the key already exists
    status = cache.add(lock_id, oid, LOCK_EXPIRE)
    try:
        yield status
    finally:
        # memcache delete is very slow, but we have to use it to take
        # advantage of using add() for atomic locking
        if time.monotonic() < timeout_at and status:
            # don't release the lock if we exceeded the timeout
            # to lessen the chance of releasing an expired lock
            # owned by someone else
            # also don't release the lock if we didn't acquire it
            cache.delete(lock_id)


@task(bind=True)
def import_feed(self, feed_url):
    # The cache key consists of the task name and the MD5 digest
    # of the feed URL.
    feed_url_hexdigest = md5(feed_url).hexdigest()
    lock_id = '{0}-lock-{1}'.format(self.name, feed_url_hexdigest)
    logger.debug('Importing feed: %s', feed_url)
    with memcache_lock(lock_id, self.app.oid) as acquired:
        if acquired:
            return Feed.objects.import_feed(feed_url).url
    logger.debug(
        'Feed %s is already being imported by another worker', feed_url)
