本文是我之前的PPT
https://docs.google.com/presentation/d/1QIEgZwApk7m6ukEmm_qM2AbKfdw0j_lRn82pW7rNwfU/edit?usp=sharing

1. Celery是什么

Celery 是一个由 Python 编写的简单、灵活、可靠的用来处理大量信息的分布式系统,它同时提供操作和维护分布式系统所需的工具(它本身不是一个任务队列, 它是 任务队列管理的工具)。官网地址
Celery 的使用 - 图2
Celery的使用场景

  1. 爬虫
  2. 对实时性要求不高的任务(如推送和邮件)

Celery中五个角色

  1. Task 任务
  2. Broker 消息中间件(消息队列)
  3. Worker 消费者(任务执行者)
  4. Beat 定时任务调度器
  5. Backend 任务结果保存者

    2. Celery配置

    Celery 的使用 - 图3

    1. 从其他配置文件导入(最常用)

    定义worker消费者
    Celery 的使用 - 图4
    定义conf文件
    Celery 的使用 - 图5

    2. upload 方法导入

    Celery 的使用 - 图6

    3. 直接导入

    Celery 的使用 - 图7

    3. 单机下异步任务调用

    任务-官方地址

    异步任务编写

    任务在使用之前需要被导入,参见配置直接导入。
    通过task()装饰器可以轻松从任何可调用对象创建任务
    task支持附加的参数:https://docs.celeryproject.org/en/latest/userguide/tasks.html#task-request
    Celery 的使用 - 图8
    当多个装饰器和任务装饰器结合使用的时候,必须保证最后使用任务装饰器
    Celery 的使用 - 图9
    Celery 的使用 - 图10

    异步任务调用

    在 tasks/example/task1.py 中,写入__all__ == [‘task1_a’, ‘task1_b’](规定import可导内容)
    manage.py 中 from tasks.examples.task1 import *(import * 会减低代码可读性一般不适用,在此仅方便讲解)
    任务的调用方法有三种:
  • apply_async(args [, kwargs [, …] ] ) 【常用方法】发送任务消息
  • delay ( args, *kwargs )【 最常用方法】发送任务的快捷方式
  • calling ( call )

官方文档地址: https://docs.celeryproject.org/en/latest/userguide/calling.html
Celery 的使用 - 图11
Celery 的使用 - 图12
Celery 的使用 - 图13

常用的Celery启动命令和各个参数的含义

常用的Celery 启动命令

celery -A tasks.celery_worker.learn worker -l info -c 3 -n a_team -Q q1

参数解释:

celery -A 固定格式
tasks.celery_worker.learn python以module导包的方式,指定worker
-l info 日志级别
-c 3 3个并发
-n a_team worker消费者命名为 a_team
-Q q1 指定worker消费队列q1内的任务
-f notice.log 日志文件的名称

获取异步任务的值

  1. 在Celery任务中调用Celery任务结果

Celery 的使用 - 图14

  1. 在非Celery任务中调用Celery任务结果

Celery 的使用 - 图15

4. 单机下定时任务调用

官网文档地址:https://docs.celeryproject.org/en/latest/reference/celery.schedules.html?highlight=crontab

Celery 的使用 - 图16

定时任务启动命令(添加参数-B)

celery -A tasks.celery_worker.learn worker -B -l info -c 3 -n a_team -Q q1

5. 单机下设计任务流

任务流一共有三种 (group, chain, chord)。
接下来以以下的异步任务为例子进行讲解
Celery 的使用 - 图17

工作流-group

创建一组异步任务来并行执行
Celery 的使用 - 图18

工作流-chain

执行第一个任务并将其返回值传递给链中的下一个任务,以此类推。
Celery 的使用 - 图19

工作流-chord

等待组里中的所有任务完成后执行需要调用的回调。
Celery 的使用 - 图20

6. Celery对接Sentry与日志序列化

Celery 中日志打印

Celery 有个特殊的记录器 celery.task 。这个记录器由 celery worker 设立,目的是将任务相关信息添加到日志中。日志中包含两个新的参数 “task_id” “task_name”。 通过访问任务记录器celery.utils.log 可以对打印日志进行处理。
Celery 的使用 - 图21
日志输出
Celery 的使用 - 图22

自定义celery日志

自定义Celery日志有三种策略

  1. 增加Celery记录器
  2. 覆盖Celery根记录器
  3. 停用Celery记录器

因为第二种最简单,所以介绍一下:本质是通过连接setuplogging信号来组织Celery配置任何记录器
![](https://lh6.googleusercontent.com/YLRDvtyjXRUxMiZ35lw80YUHCzTOxOkkcaqCZOiS3o73hnaid5PUGdDhY8HRSyf2OEfLMGfZUOM2dcYWUDjn2AbUvQOv9oSNwFcXilSmplSkUB1KK_5YgyllfHz9FFC
-oqxx6sVBx4#crop=0&crop=0&crop=1&crop=1&from=url&id=W0rUH&margin=%5Bobject%20Object%5D&originHeight=202&originWidth=450&originalType=binary&ratio=1&rotation=0&showTitle=false&status=done&style=none&title=)
真实使用案例
image.png

序列化与sentry对接

Sentry可以在定义的日志拦截器里轻松添加,而Json序列化的包有很多,在日志格式化器(formatters)里添加就好了。
Celery 的使用 - 图24

7. Celery与Flask相结合

Celery 的使用 - 图25
因为Celery和Flask 各有一套上下文,一般来说是以Flask为主, Celery为辅助,所以需要将Celery的上下文注册到Flask之内。
真实使用中可以按如下方法
image.png

8. 用docker 部署服务

  1. version: '3.7'
  2. services:
  3. ledger_pay:
  4. image: docker.zk.com/ledger-pay-server/project:v0.0.3
  5. restart: always
  6. ports:
  7. - 8533:9333
  8. env_file:
  9. - product.env
  10. sync_crontab:
  11. container_name: ledger_pay_crontab
  12. image: docker.zk.com/ledger-pay-server/project:v0.0.6
  13. restart: always
  14. working_dir: /usr/src/app/ledger-pay/
  15. command: celery -A project.sync_tasks.celery_worker.celery_ledger worker -B -E -l INFO -f /var/log/celery/crontab.log -c 10 -Q crontab
  16. volumes:
  17. - './logs:/var/log/celery/'
  18. env_file:
  19. - product.env
  20. sync_check_order:
  21. container_name: ledger_pay_check_order
  22. image: docker.zk.com/ledger-pay-server/project:v0.0.6
  23. restart: always
  24. working_dir: /usr/src/app/ledger-pay/
  25. command: celery -A project.sync_tasks.celery_worker.celery_ledger worker -E -l INFO -f wechat_trade.log -c 10 -Q wechat_trade
  26. volumes:
  27. - './logs:/var/log/celery/'
  28. env_file:
  29. - product.env
  30. sync_profit_sharing:
  31. container_name: ledger_pay_profit_sharing
  32. image: docker.zk.com/ledger-pay-server/project:v0.0.6
  33. restart: always
  34. working_dir: /usr/src/app/ledger-pay/
  35. command: celery -A project.sync_tasks.celery_worker.celery_ledger worker -E -l INFO -f wechat_separate.log -c 10 -Q wechat_separate
  36. volumes:
  37. - './logs:/var/log/celery/'
  38. env_file:
  39. - product.env
  40. check_separate_status:
  41. container_name: ledger_pay_separate_check
  42. image: docker.zk.com/ledger-pay-server/project:v0.0.6
  43. restart: always
  44. working_dir: /usr/src/app/ledger-pay/
  45. command: celery -A project.sync_tasks.celery_worker.celery_ledger worker -E -l INFO -f query_wechat_separate.log -c 10 -Q query_wechat_separate
  46. volumes:
  47. - './logs:/var/log/celery/'
  48. env_file:
  49. - product.env