临时队列

Celery 创建的队列默认是持久化的。这意味着即使消息中间会将消息写到硬盘使得即使重启任务也会被执行。
但是,一些情况下,消息丢失也没关系,所以并非所有的任务都需要持久化。你可以为这类任务消息创建一个临时队列来提高性能:

  1. from kombu import Exchange, Queue
  2. task_queues = (
  3. Queue('celery', routing_key='celery'),
  4. Queue('transient', Exchange('transient', delivery_mode=1),
  5. routing_key='transient', durable=False),

或者可以配置 task_routes:

  1. task_routes = {
  2. 'proj.tasks.add': {'queue': 'celery', 'delivery_mode': 'transient'}
  3. }

delivery_mode 修改发送到队列的消息的递送方式。one 值代表消息不会写到硬盘,而two值(默认)代表消息可被写到硬盘。
将你的任务导向新的临时队列,你可以通过声明queue参数(或者使用task_routes设置):

  1. task.apply_async(args, queue='transient')

任务限速

Celery 也可以针对任务进行限速,以下为每分钟内允许执行的10个任务的配置:

  1. Celery 也可以针对任务进行限速,以下为每分钟内允许执行的10个任务的配置:
  2. task_annotations = {
  3. 'tasks.add': {'rate_limit': '10/m'}
  4. }

常用配置参数

  1. # 注意,celery4版本后,CELERY_BROKER_URL改为BROKER_URL
  2. BROKER_URL = 'amqp://username:passwd@host:port/虚拟主机名'
  3. # 指定结果的存储地址
  4. CELERY_RESULT_BACKEND = 'redis://username:passwd@host:port/db'
  5. # 指定任务序列化方式
  6. CELERY_TASK_SERIALIZER = 'msgpack'
  7. # 指定结果序列化方式
  8. CELERY_RESULT_SERIALIZER = 'msgpack'
  9. # 任务过期时间,celery任务执行结果的超时时间
  10. CELERY_TASK_RESULT_EXPIRES = 60 * 20
  11. # 指定任务接受的内容序列化类型(序列化),一个列表
  12. CELERY_ACCEPT_CONTENT = ["msgpack"]
  13. # 任务发送完成是否需要确认,这一项对性能有一点影响
  14. CELERY_ACKS_LATE = True
  15. # 压缩方案选择,可以是zlib, bzip2,默认是发送没有压缩的数据
  16. CELERY_MESSAGE_COMPRESSION = 'zlib'
  17. # 规定完成任务的时间
  18. CELERYD_TASK_TIME_LIMIT = 5 # 在5s内完成任务,否则执行该任务的worker将被杀死,任务移交给父进程
  19. # 设置worker的并发数,默认是服务器的内核数目,也是命令行-c参数指定的数目
  20. CELERYD_CONCURRENCY = 4
  21. # celery worker 每次去redis预取任务的数量
  22. CELERYD_PREFETCH_MULTIPLIER = 4
  23. # 每个worker最多执行100个任务被销毁,可以防止内存泄漏
  24. CELERYD_MAX_TASKS_PER_CHILD = 100
  25. # 设置默认的队列名称,如果一个消息不符合其他的队列就会放在默认队列里面,如果什么都不设置的话,数据都会发送到默认的队列中
  26. CELERY_DEFAULT_QUEUE = "default"
  27. # 设置详细的队列
  28. CELERY_QUEUES = {
  29. "default": { # 这是上面指定的默认队列
  30. "exchange": "default",
  31. "exchange_type": "direct",
  32. "routing_key": "default"
  33. },
  34. "topicqueue": { # 这是一个topic队列 凡是topictest开头的routing key都会被放到这个队列
  35. "routing_key": "topic.#",
  36. "exchange": "topic_exchange",
  37. "exchange_type": "topic",
  38. },
  39. "task_eeg": { # 设置扇形交换机
  40. "exchange": "tasks",
  41. "exchange_type": "fanout",
  42. "binding_key": "tasks",
  43. },
  44. }
  45. # 注:引用-> https://www.yuque.com/keep_running/python/bq005a

监控工具

Flower: https://github.com/mher/flower.git

问题

woker数量过多,连接redis失败

  1. BROKER_POOL_LIMIT = 50
  2. CELERY_REDIS_MAX_CONNECTIONS = 60
  3. BROKER_TRANSPORT_OPTIONS = {
  4. "max_connections": 400,
  5. }

Django Celery 4.x.x 本地时区问题

  1. # setting.py
  2. USE_TZ = False
  3. TIME_ZONE = 'Asia/Shanghai'
  4. USE_I18N = True
  5. USE_L10N = True
  6. DJANGO_CELERY_BEAT_TZ_AWARE = False
  7. CELERY_TIMEZONE = 'Asia/Shanghai'
  8. CELERY_ENABLE_UTC = False
  9. # celery.py
  10. celery_app.conf.timezone = 'Asia/Shanghai'
  11. celery_app.conf.enable_utc = False
  12. # 创建cron任务需要指定timezone (因为CrontabSchedule中timezone字段默认为UTC时间)
  13. # crontab.py
  14. task, created = celery_models.PeriodicTask.objects.get_or_create(name=name, task=task)
  15. if crontab_time:
  16. # 获取 crontab
  17. crontab_time['timezone'] = 'Asia/Shanghai'
  18. crontab = celery_models.CrontabSchedule.objects.filter(**crontab_time).first()
  19. if crontab is None:
  20. # 如果没有就创建,有的话就继续复用之前的crontab
  21. crontab = celery_models.CrontabSchedule.objects.create(**crontab_time)
  22. task.crontab = crontab

这样配置后celery日志显示的时间还是UTC显示

类似模块