什么是Celery

Celery是一个分布式任务的框架,具备如下特点:

  1. 简单:几行代码就可以创建一个简单的Celery任务。
  2. 高可用:任务可以自动重试。
  3. 快速:可以在一分钟内执行上百万个任务。
  4. 灵活:每一部分都可以轻松进行扩展。

Celery使用场景

Celery非常适合用于去做需要异步执行的任务,例如:

  1. 发送电子邮件,发送IM消息通知
  2. 爬取网页,数据分析
  3. 图片和视频处理
  4. 生成报告等

Celery架构

Celery架构如下图所示:

DjangoCelery快速入门 - 图1

  1. 首先,任务的来源可以是WebServer下发,也是可以是定时任务器下发。
  2. 接下来,下发的任务首先会存放到一个Broker的队列中等待处理。
  3. 然后Worker会从Broker消息队列中读取消息并处理。
  4. 最后将处理后得到的结果再次写入一个数据库进行存储。

Celery环境搭建

下面,我们来看如何搭建Celery的环境。

第一步:安装Celery第三方库

  1. pip3 install celery

第二步:安装Celery依赖库

  1. pip3 install "celery[librabbitmq,redis,auth,msgpack]"

第三步:做为Broker示例,我们需要安装一个Redis,此处,我们用docker了部署一个Redis实例

  1. docker run -d -p 6379:6379 redis

到此为止,Celery的基本环境我们就已经准备完成了,下面我们可以用一个Celery的demo来验证我们的环境是否OK。

Celery Demo

创建tasks.py文件如下:

  1. from celery import Celery
  2. app = Celery('tasks', broker='redis://127.0.0.1', backend='redis://127.0.0.1')
  3. @app.task
  4. def add(x, y):
  5. return x + y

启动celery worker:

  1. celery -A tasks worker --loglevel=info

DjangoCelery快速入门 - 图2

接下来,我们创建一个运行任务的脚本run_task.py:

  1. from tasks import add
  2. result = add.delay(4, 4)
  3. print('Is Task ready: %s' % result.ready())
  4. run_result = result.get(timeout=1)
  5. print("task result: %s" % run_result)

运行脚本,观察输出如下:

  1. python3 ./run_task.py
  2. # Is Task ready: False
  3. # task result: 8

可以看到,该任务的确是异步执行的,首先执行后,任务的状态并未完成,然后等待任务执行完成后,获取到了计算的结果。

Celery任务的监控

因为任务异步化对于我们项目的运维和问题定位无疑是增加了一定的成本,为了能够让我们的系统更加容易监控和观察,Celery提供了一套监控方案:Flower。

下面,我们来体验一下Flower。

Step1: 安装flower

  1. pip3 install flower==0.9.7

Step2: 启动flower

  1. celery -A tasks flower --broker=redis://localhost:6379/0

DjangoCelery快速入门 - 图3

访问localhost:5555可以看到如下页面:

DjangoCelery快速入门 - 图4

在该页面中,我们可以查询Celery相关的节点,任务等一系列详细信息。

Django集成Celery

在上面的例子中,我们主要在讲解Celery自身的功能和用法,接下来,我们将会结合Django来讲解如何在Django中使用Celery。

第一步: 在Django主应用的目录下,创建一个celery.py文件:

  1. from __future__ import absolute_import, unicode_literals
  2. import os
  3. from celery import Celery
  4. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'project_name.settings')
  5. app = Celery('project_name')
  6. # Using a string here means the worker doesn't have to serialize
  7. # the configuration object to child processes.
  8. # - namespace='CELERY' means all celery-related configuration keys
  9. # should have a `CELERY_` prefix.
  10. app.config_from_object('django.conf:settings', namespace='CELERY')
  11. # Load task modules from all registered Django app configs.
  12. app.autodiscover_tasks() # 自动收集各个应用下的tasks.py文件
  13. @app.task(bind=True)
  14. def debug_task(self):
  15. print('Request: {0!r}'.format(self.request))

第二步: 在Django主应用目录下修改__init__.py文件如下:

  1. from __future__ import absolute_import, unicode_literals
  2. # This will make sure the app is always imported when
  3. # Django starts so that shared_task will use this app.
  4. from .celery import app as celery_app
  5. __all__ = ('celery_app',)

它的作用是在项目启动的时候初始化Celery APP对象。

第三步: 修改settings.py文件,增加Celery的相关配置。

  1. CELERY_BROKER_URL = 'redis://redis:6379/0'
  2. CELERY_RESULT_BACKEND = 'redis://redis:6379/1'
  3. CELERY_ACCEPT_CONTENT = ['application/json']
  4. CELERY_RESULT_SERIALIZER = 'json'
  5. CELERY_TASK_SERIALIZER = 'json'
  6. CELERY_TIMEZONE = 'Asia/Shanghai'
  7. CELERYD_MAX_TASKS_PER_CHILD = 10
  8. CELERYD_LOG_FILE = os.path.join(BASE_DIR, "logs", "celery_work.log")
  9. CELERYBEAT_LOG_FILE = os.path.join(BASE_DIR, "logs", "celery_beat.log")

第四步:在对应的APP目录下,创建tasks.py文件:

  1. from __future__ import absolute_import, unicode_literals
  2. from celery import shared_task
  3. from .dingtalk import send
  4. @shared_task
  5. def send_dingtalk_message(message):
  6. send(message)

此时,只需要在将之前调用send的方法改成send_dingtalk_message.delay的方式调用,即可实现异步任务执行的逻辑。

第五步: 启动相关服务

  1. 启动Celery
  1. celery --app ${project_name} worker -l info
  1. 启动Django
  1. python3 ./manage.py runserver 0.0.0.0:8000
  1. 启动flower监控
  1. celery -A ${project_name} flower

Django支持定时任务

上面的内容中,我们使用Django + Celery结合实现了异步任务的执行,接下来,我们将会利用Celery + Django实现定时任务。

准备工作

Step1: 安装第三方依赖

  1. pip3 install django-celery-beat

Step2: 在项目配置的INSTALL_APPS中添加django_celery_beat

Step3: 同步数据库表结构。

  1. python3 ./manage.py migrate

Step4: 启动Beat进程

  1. celery -A ${project_name} beat --scheduler django_celery_beat.schedulers:DatabaseScheduler

管理定时任务的方法

在django-celery-beat中,有如下几种管理定时任务的方式:

  1. 在Admin后台添加管理定时任务。
  2. 系统启动时自动注册定时任务。
  3. 直接设置应用的beat_scheduler。
  4. 运行时添加定时任务。

django-celery-beat管理后台管理定时任务

打开django-celery-beat管理后台,你会看到Periodic Task的项目下存在Intervals 、 Crontabs 、 Periodic task。

其中:

  1. Intervals 定义了定时任务的间隔时间。
  2. Crontabs 支持通过 Linux Crontabs 的格式来定义任务的运行时机的。
  3. Periodic task 中定义了具体有哪些定时任务。

最终,我们可以通过在Admin管理后台创建 Periodic task 来实现定时任务的管理。

系统启动时自动注册定时任务

除了Admin管理后台外,我们还可以在系统启动时启动注册定时任务,注册的方式如下,修改project_name/celery.py文件,增加如下内容:

  1. from celery.schedules import crontab
  2. @app.task
  3. def test(arg):
  4. print(arg)
  5. @app.on_after_configure.connect
  6. def setup_periodic_tasks(sender, **kwargs):
  7. # Calls test('hello') every 10 seconds.
  8. sender.add_periodic_task(10.0, test.s('hello'), name='hello every 10')
  9. # Calls test('world') every 30 seconds
  10. sender.add_periodic_task(30.0, test.s('world'), expires=10)
  11. # Executes every Monday morning at 7:30 a.m.
  12. sender.add_periodic_task(
  13. crontab(hour=7, minute=30, day_of_week=1),
  14. test.s('Happy Mondays!'),
  15. )

直接设置应用的beat_scheduler

  1. app.conf.beat_schedule = {
  2. 'add-every-10-seconds': {
  3. 'task': 'app_name.tasks.function_name',
  4. 'schedule': 10.0,
  5. 'args': (16, 4, )
  6. },
  7. }

直接对Celery对象设置 conf.beat_schedule 即可。

运行时添加定时任务

  1. from django_celery_beat.models import PeriodicTask, IntervalSchedule
  2. # 创建定时策略
  3. scheduler, created = IntervalSchedule.object.get_or_create(every=10, period=IntervalSchedule.SECONDS)
  4. # 创建任务
  5. task = PeriodicTask.objects.create(
  6. interval=scheduler, name="say welcome", task='project_name.celery.function', args=json.dumps(data)
  7. )

参考资源

  1. Celery官方文档