临时队列
Celery 创建的队列默认是持久化的。这意味着即使消息中间会将消息写到硬盘使得即使重启任务也会被执行。
但是,一些情况下,消息丢失也没关系,所以并非所有的任务都需要持久化。你可以为这类任务消息创建一个临时队列来提高性能:
from kombu import Exchange, Queuetask_queues = (Queue('celery', routing_key='celery'),Queue('transient', Exchange('transient', delivery_mode=1),routing_key='transient', durable=False),
或者可以配置 task_routes:
task_routes = {'proj.tasks.add': {'queue': 'celery', 'delivery_mode': 'transient'}}
delivery_mode 修改发送到队列的消息的递送方式。one 值代表消息不会写到硬盘,而two值(默认)代表消息可被写到硬盘。
将你的任务导向新的临时队列,你可以通过声明queue参数(或者使用task_routes设置):
task.apply_async(args, queue='transient')
任务限速
Celery 也可以针对任务进行限速,以下为每分钟内允许执行的10个任务的配置:
Celery 也可以针对任务进行限速,以下为每分钟内允许执行的10个任务的配置:task_annotations = {'tasks.add': {'rate_limit': '10/m'}}
常用配置参数
# 注意,celery4版本后,CELERY_BROKER_URL改为BROKER_URLBROKER_URL = 'amqp://username:passwd@host:port/虚拟主机名'# 指定结果的存储地址CELERY_RESULT_BACKEND = 'redis://username:passwd@host:port/db'# 指定任务序列化方式CELERY_TASK_SERIALIZER = 'msgpack'# 指定结果序列化方式CELERY_RESULT_SERIALIZER = 'msgpack'# 任务过期时间,celery任务执行结果的超时时间CELERY_TASK_RESULT_EXPIRES = 60 * 20# 指定任务接受的内容序列化类型(序列化),一个列表CELERY_ACCEPT_CONTENT = ["msgpack"]# 任务发送完成是否需要确认,这一项对性能有一点影响CELERY_ACKS_LATE = True# 压缩方案选择,可以是zlib, bzip2,默认是发送没有压缩的数据CELERY_MESSAGE_COMPRESSION = 'zlib'# 规定完成任务的时间CELERYD_TASK_TIME_LIMIT = 5 # 在5s内完成任务,否则执行该任务的worker将被杀死,任务移交给父进程# 设置worker的并发数,默认是服务器的内核数目,也是命令行-c参数指定的数目CELERYD_CONCURRENCY = 4# celery worker 每次去redis预取任务的数量CELERYD_PREFETCH_MULTIPLIER = 4# 每个worker最多执行100个任务被销毁,可以防止内存泄漏CELERYD_MAX_TASKS_PER_CHILD = 100# 设置默认的队列名称,如果一个消息不符合其他的队列就会放在默认队列里面,如果什么都不设置的话,数据都会发送到默认的队列中CELERY_DEFAULT_QUEUE = "default"# 设置详细的队列CELERY_QUEUES = {"default": { # 这是上面指定的默认队列"exchange": "default","exchange_type": "direct","routing_key": "default"},"topicqueue": { # 这是一个topic队列 凡是topictest开头的routing key都会被放到这个队列"routing_key": "topic.#","exchange": "topic_exchange","exchange_type": "topic",},"task_eeg": { # 设置扇形交换机"exchange": "tasks","exchange_type": "fanout","binding_key": "tasks",},}# 注:引用-> https://www.yuque.com/keep_running/python/bq005a
监控工具
问题
woker数量过多,连接redis失败
BROKER_POOL_LIMIT = 50CELERY_REDIS_MAX_CONNECTIONS = 60BROKER_TRANSPORT_OPTIONS = {"max_connections": 400,}
Django Celery 4.x.x 本地时区问题
# setting.pyUSE_TZ = FalseTIME_ZONE = 'Asia/Shanghai'USE_I18N = TrueUSE_L10N = TrueDJANGO_CELERY_BEAT_TZ_AWARE = FalseCELERY_TIMEZONE = 'Asia/Shanghai'CELERY_ENABLE_UTC = False# celery.pycelery_app.conf.timezone = 'Asia/Shanghai'celery_app.conf.enable_utc = False# 创建cron任务需要指定timezone (因为CrontabSchedule中timezone字段默认为UTC时间)# crontab.pytask, created = celery_models.PeriodicTask.objects.get_or_create(name=name, task=task)if crontab_time:# 获取 crontabcrontab_time['timezone'] = 'Asia/Shanghai'crontab = celery_models.CrontabSchedule.objects.filter(**crontab_time).first()if crontab is None:# 如果没有就创建,有的话就继续复用之前的crontabcrontab = celery_models.CrontabSchedule.objects.create(**crontab_time)task.crontab = crontab
这样配置后celery日志显示的时间还是UTC显示
类似模块
- huey ->Mini-Huey(a little task queue for python )
学习资料
https://www.celerycn.io/
