Basics
The celeryconfig.py configuration file
# celeryconfig.py
broker_url = 'pyamqp://'        # message broker (task queue)
result_backend = 'rpc://'       # task result store
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True
# check the config file for syntax errors
python -m celeryconfig
task_routes = {
'tasks.add': 'low-priority',
}
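The settings module above is attached to an application with config_from_object. A minimal sketch (the app name 'tasks' is an assumption; celeryconfig.py must be on the Python path):

```python
from celery import Celery

app = Celery('tasks')
# load broker_url, result_backend, task_routes, etc. from celeryconfig.py
app.config_from_object('celeryconfig')
```
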
Task configuration
# allow at most 10 executions of tasks.add per minute
task_annotations = { 'tasks.add': {'rate_limit': '10/m'}
}
# change a task's rate limit from the command line
$ celery -A tasks control rate_limit tasks.add 10/m
worker@example.com: OK
new rate limit set successfully
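The same broadcast can be issued from Python: app.control.rate_limit is the programmatic counterpart of the CLI command above. A sketch, assuming a running worker and that `app` is your configured Celery instance:

```python
from celery import Celery

app = Celery('tasks', broker='pyamqp://')
# broadcast a new rate limit for tasks.add to all connected workers
app.control.rate_limit('tasks.add', '10/m')
```
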
proj/celery.py
# proj/celery.py
from __future__ import absolute_import, unicode_literals
from celery import Celery
# The include argument lists the modules to import when the worker starts;
# add your task modules here so the worker can register the corresponding tasks.
app = Celery('proj',
broker='amqp://',
backend='amqp://',
include=['proj.tasks'])
# Optional configuration, see the application user guide.
app.conf.update(
result_expires=3600,
)
if __name__ == '__main__':
app.start()
The @task(ignore_result=True) option disables result storage for a single task.
proj/tasks.py  # the module path must match the name listed in include
from __future__ import absolute_import, unicode_literals
from .celery import app
@app.task
def add(x, y):
return x + y
@app.task
def mul(x, y):
return x * y
@app.task
def xsum(numbers):
return sum(numbers)
worker
celery -A proj worker -l info  # start a worker
celery worker --help           # show help
-------------- celery@halcyon.local v4.0 (latentcall)
---- **** -----
--- * *** * -- [Configuration]
-- * - **** --- . broker: amqp://guest@localhost:5672//
- ** ---------- . app: __main__:0x1012d8590
- ** ---------- . concurrency: 8 (processes)
- ** ---------- . events: OFF (enable -E to monitor this worker)
- ** ----------
- *** --- * --- [Queues]
-- ******* ---- . celery: exchange:celery(direct) binding:celery
--- ***** -----
[2012-06-08 16:23:51,078: WARNING/MainProcess] celery@halcyon.local has started.
broker is the connection URL of the broker specified in the Celery application;
it can also be overridden on the command line with the -b option.
concurrency is the number of worker processes handling tasks concurrently; when all of
them are busy, a new task has to wait for one of them to finish before it can run.
The default concurrency is the number of CPUs on the machine, and it can be customized
with celery worker -c. There is no universally recommended value, because the optimum
depends on many factors: if the tasks are mostly I/O-bound the number can be increased,
but testing shows that going beyond twice the CPU count rarely helps and may even
degrade performance.
Besides the default prefork pool, Celery also supports running tasks in a single thread
with Eventlet or Gevent (see Concurrency).
When the events option is enabled, Celery emits monitoring events that can be used to
watch the worker. This is typically consumed by monitoring programs such as Flower and
real-time Celery monitors; see the Monitoring and Management Guide.
Queues lists the task queues this worker consumes from; a worker can consume from
several queues at once. Routing task messages to specific workers is a common way to
improve quality of service, separate concerns, and implement prioritization;
see Routing Tasks.
celery multi
$ celery multi start w1 -A proj -l info
celery multi v4.0.0 (latentcall)
> Starting nodes...
> w1.halcyon.local: OK
$ celery multi restart w1 -A proj -l info
celery multi v4.0.0 (latentcall)
> Stopping nodes...
> w1.halcyon.local: TERM -> 64024
> Waiting for 1 node.....
> w1.halcyon.local: OK
> Restarting node w1.halcyon.local: OK
celery multi v4.0.0 (latentcall)
> Stopping nodes...
> w1.halcyon.local: TERM -> 64052
$ celery multi stop w1 -A proj -l info
$ celery multi stopwait w1 -A proj -l info
$ mkdir -p /var/run/celery
$ mkdir -p /var/log/celery
$ celery multi start w1 -A proj -l info --pidfile=/var/run/celery/%n.pid \
--logfile=/var/log/celery/%n%I.log
$ celery multi start 10 -A proj -l info -Q:1-3 images,video -Q:4,5 data \
-Q default -L:4,5 debug
Calling tasks and getting results (delay)
https://www.celerycn.io/ru-men/celery-jin-jie-shi-yong#cheng-xu-tiao-yong
Running tasks in parallel with celery group
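The two stubs above can be fleshed out. A hedged sketch of delay()/get() and group, assuming the proj app from earlier, a running broker, and a running worker:

```python
from celery import group
from proj.tasks import add, xsum

# delay() is a shortcut for apply_async(); it returns an AsyncResult
result = add.delay(4, 4)
print(result.get(timeout=10))  # 8, once a worker has processed the task

# a group runs a list of task signatures in parallel across workers
job = group(add.s(i, i) for i in range(10))
print(job.apply_async().get(timeout=10))  # results in submission order
```

Note that get() blocks the caller; inside another task you would normally chain signatures instead of calling get().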
Routing
app.conf.update(
task_routes = {
'proj.tasks.add': {'queue': 'hipri'},
},
)
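With the route above in place, calls to proj.tasks.add are published to the hipri queue; the queue can also be chosen per call. A sketch (a worker must consume that queue, e.g. celery -A proj worker -Q hipri):

```python
from proj.tasks import add

# override the configured route for a single call
add.apply_async((2, 2), queue='hipri')
```
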
Remote control
https://www.celerycn.io/ru-men/celery-jin-jie-shi-yong#yuan-cheng-kong-zhi
$ celery -A proj inspect active  # tasks the workers are currently processing
$ celery -A proj inspect active --destination=celery@example.com
$ celery -A proj status  # the status command remotely lists the workers in the cluster
Periodic tasks
timezone = 'Asia/Shanghai'
The timezone must be configured either directly (app.conf.timezone = 'Asia/Shanghai')
or, if the app is configured with app.config_from_object, by adding the setting to
your configuration module (i.e. the usual celeryconfig.py).
app.conf.beat_schedule = {
'add-every-30-seconds': {
'task': 'tasks.add',
'schedule': 30.0,
'args': (16, 16)
},
}
app.conf.timezone = 'UTC'
from celery.schedules import crontab
app.conf.beat_schedule = {
# Executes every Monday morning at 7:30 a.m.
'add-every-monday-morning': {
'task': 'tasks.add',
'schedule': crontab(hour=7, minute=30, day_of_week=1),
'args': (16, 16),
},
}
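crontab(hour=7, minute=30, day_of_week=1) fires every Monday at 07:30. The "next run" arithmetic behind such a schedule can be sketched with the standard library (an illustration only, not Celery's implementation):

```python
from datetime import datetime, timedelta

def next_monday_0730(now):
    """Return the next Monday 07:30 strictly after `now`."""
    target = now.replace(hour=7, minute=30, second=0, microsecond=0)
    days_ahead = (0 - now.weekday()) % 7  # Monday is weekday 0
    candidate = target + timedelta(days=days_ahead)
    if candidate <= now:
        candidate += timedelta(days=7)  # already past this week's slot
    return candidate

# 2024-01-01 was a Monday; at 08:00 the 07:30 slot has passed
print(next_monday_0730(datetime(2024, 1, 1, 8, 0)))  # 2024-01-08 07:30:00
```
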
Start
celery -A proj beat
Beat stores the last run time of each task in a local database file (named
celerybeat-schedule by default), so it needs write access to the current directory;
alternatively, a custom location can be specified for this file.
Locks
https://docs.celeryproject.org/en/stable/tutorials/task-cookbook.html#cookbook-task-serial
https://www.cnblogs.com/pyspark/articles/8819803.html
This document describes the current stable version of Celery (5.1).
Task Cookbook
Ensuring a task is only executed one at a time
You can accomplish this by using a lock.
In this example we’ll be using the cache framework to set a lock that’s accessible for all workers.
It’s part of an imaginary RSS feed importer called djangofeeds. The task takes a feed URL as a single argument, and imports that feed into a Django model called Feed. We ensure that it’s not possible for two or more workers to import the same feed at the same time by setting a cache key consisting of the MD5 checksum of the feed URL.
The cache key expires after some time in case something unexpected happens, and something always will…
For this reason your task's run time shouldn't exceed the timeout.
Note
In order for this to work correctly you need to be using a cache backend where the .add operation is atomic. memcached is known to work well for this purpose.
import time
from contextlib import contextmanager
from hashlib import md5

from celery import task
from celery.utils.log import get_task_logger
from django.core.cache import cache

from djangofeeds.models import Feed

logger = get_task_logger(__name__)

LOCK_EXPIRE = 60 * 10  # Lock expires in 10 minutes

@contextmanager
def memcache_lock(lock_id, oid):
    timeout_at = time.monotonic() + LOCK_EXPIRE - 3
    # cache.add fails if the key already exists
    status = cache.add(lock_id, oid, LOCK_EXPIRE)
    try:
        yield status
    finally:
        # memcache delete is very slow, but we have to use it to take
        # advantage of using add() for atomic locking
        if time.monotonic() < timeout_at and status:
            # don't release the lock if we exceeded the timeout
            # to lessen the chance of releasing an expired lock
            # owned by someone else
            # also don't release the lock if we didn't acquire it
            cache.delete(lock_id)

@task(bind=True)
def import_feed(self, feed_url):
    # The cache key consists of the task name and the MD5 digest
    # of the feed URL (md5 needs bytes, so encode the URL first).
    feed_url_hexdigest = md5(feed_url.encode()).hexdigest()
    lock_id = '{0}-lock-{1}'.format(self.name, feed_url_hexdigest)
    logger.debug('Importing feed: %s', feed_url)
    with memcache_lock(lock_id, self.app.oid) as acquired:
        if acquired:
            return Feed.objects.import_feed(feed_url).url
    logger.debug(
        'Feed %s is already being imported by another worker', feed_url)