引言

  • Celery是进行分布式任务处理的工具,它封装了很多常用的针对任务队列的操作。
  • 我们可以使用它快速进行分布式任务队列的使用和管理。
  • 个人觉得,有些像Python concurrent提供的线程池工具。
  • 单靠Celery还不能够完成分布式的任务调度,还需要brokerbackend

工作流程

Celery分布式任务调度 - 图1

这里有一份 Slide,清晰直观地介绍了Celery

相关组件

Broker

broker可以理解为“中间人”,我们有了任务不是直接丢给Celery,而是要通过这个broker来自动调度管理,通常我们选用 RabbitMQ(本质上就是一个队列)。

Backend

Celery workers在完成相关的任务后,它没有义务将结果返回给用户,因为它得专注于当下的任务,才能让复杂任务处理地更加高效。backend是用来存放任务执行结果的,Celery workers将结果储存后就继续忙了。通常要由用户来进行结果的查询和获取,我们可以选用 Redis 作为存储介质。

使用案例

分布式环境

想要实现分布式,就需要多机环境。将一个业务模块拆分成多个子业务,然后部署在不同的服务器上,以解决高并发的问题。

这里准备使用虚拟机创建出两台Ubuntu主机作为服务器(ubuntu_0和ubuntu_1),其中一台部署RabbitMQ,另外一台部署Redis。本地的Windows则作为Celery workers和主业务逻辑的运行平台。

Celery分布式任务调度 - 图2

Celery + RabbitMQ

先看一个简单的例子,这里不使用Redis做后端存储,提交的任务在Celery执行完成后就直接结束,不进行结果存储等其他操作。但是我们可以采用其他方式看到执行的结果。

安装Celery

Celery就是一个Python的模块,直接使用pip进行安装,相关知识可以参考前面的Python基础——包和模块部分。

安装RabbitMQ

ubuntu_0上进行部署。
1.安装
我已经安装过了,第一次安装要等待一会

  1. sudo apt-get install rabbitmq-server

Celery分布式任务调度 - 图3

2.启动

  1. sudo rabbitmq-server // 前台启动,占用 shell 资源, shell 关了也没事
  2. sudo rabbitmq-server -detached // 后台启动,不弹出界面

Celery分布式任务调度 - 图4

3.停止

  1. sudo rabbitmqctl stop

Celery分布式任务调度 - 图5

使用RabbitMQ时,是用户制的,一般直接使用是以guest身份进行访问,这个跟Linux访客模式差不多。也可以创建角色,赋予权限,但为了简单,这里就使用默认的guest用户。

但是,guest是不支持外网访问的,只能够接收本机服务的访问,结合前面的分布式部署,这种限制是必须要取消的。

配置

修改/etc/rabbitmq目录下的rabbitmq.config配置文件,没有就新建一个(我安装的时候是没有的),创建和修改是需要 管理员权限 的,记得命令前加上 sudo 。配置如下信息:

  1. [{rabbit, [{loopback_users, []}]}].
  2. # 最后面的那个 '.' 别掉了

这样就可以接受Windows(外部网络)的访问了。

使用

然后找一个合适的位置编写一个Python脚本tasks.py

  1. import celery
  2. """
  3. ampq是啥?可能你只熟悉 http,它是通信的一种协议
  4. 这里我们要和 rabbitmq 打交道,所以用 ampq 这种协议
  5. """
  6. broker = 'amqp://192.168.160.128//' # 指明 broker
  7. app = celery.Celery('test', broker=broker) # 'test' 是名字
  8. """
  9. 用装饰器来装饰我们需要执行的任务函数
  10. """
  11. @app.task
  12. def reverse(msg):
  13. return msg[::-1]

然后再命令提示符执行celery -A tasks worker --loglevel=infotasks就是脚本文件的名称,最好保证执命令时的路径和脚本文件所在路径是一致的。这样就根据我们所写的脚本文件,开启了一个Celery服务,会创建出和本机cpu数量相同的worker进程,每个进程会使用装饰器修饰过的函数来完成任务逻辑。它监听broker中的任务,进行自动调度。

Celery分布式任务调度 - 图6

随后,我们在同一个目录下开启一个提示符窗口,将脚本文件作为资源进行导入,通过xxx.delay()向队列中提交任务参数,交给Celery服务调度。delay()方法是装饰器(@app.task)赋予的。

Celery分布式任务调度 - 图7

然后前面的一个窗口会出现错误信息(Windows的特色,搞开发就没省心过,Linux下是可以正常运行的):

  1. [2019-02-17 01:07:56,505: ERROR/MainProcess] Task handler raised error: ValueError('not enough values to unpack (expected 3, got 0)',)

github 上看看开发者自己怎么说的吧,对Windows无爱了,不过可以换一种方式执行:

  1. celery -A tasks worker --pool=solo -l info
  2. // -l info == --loglevel=info 两者是等价的

然后,再次刚才的任务提交:
Celery分布式任务调度 - 图8

这说明刚才提交的任务已经被调度执行了,结果是7654321,这个结果是通过--loglevel=info来控制展示在终端上的,本来这些信息是作为日志记录的。

由于没有连接backend,这个结果是不会长时间存在的,也没有返回给调用方。

Celery采用了简单灵活的构架,主进程并不关心有多少个可用的worker被创建出来了,也不需要知道worker到底运行在哪个服务器上,只需要知道队列在哪,通过什么样的方式提交任务即可。

Celery + RabbitMQ + Redis

安装

ubuntu_1上部署redis
1.安装

  1. sudo apt-get install redis-server

Celery分布式任务调度 - 图9

2.运行

  1. sudo service redis-server start //启动
  2. sudo service redis-server stop //停止
  3. sudo service redis-server restart //重启

配置

redisrabbitmq一样,默认是不支持外网的访问的,需要对它进行配置。在/etc/redis目录下,有redis.conf配置文件。

  1. sudo vim redis.conf

Celery分布式任务调度 - 图10

将这一行注释掉,默认是开启的,表示只接受环路网络的访问。完成后重新启动,可以查看运行状态,最后一行的*表示接受所有网段的访问,没有刚才的配置的话,是127.0.0.1

Celery分布式任务调度 - 图11

使用

在使用CeleryWindow端,因为绑定了redis作为backend,所以还需要安装redis模块,才能正常使用。

Celery分布式任务调度 - 图12

然后,我们在Windows端来启动服务。

Celery分布式任务调度 - 图13

这时,刚才使用过的脚本文件要进行修改,将我们的backend信息添加进去。

  1. import celery
  2. broker = 'amqp://192.168.160.128/'
  3. backend = 'redis://192.168.160.129'
  4. app = celery.Celery('test', broker=broker, backend=backend)
  5. @app.task
  6. def reverse(msg):
  7. return msg[::-1]

现在,我们开启另外一个提示符窗口,用来执行我们的业务逻辑。任务提交之后,它会返回给我们一个AsynResult对象,利用这个对象,我们就可以知道任务执行的情况如何了。

Celery分布式任务调度 - 图14

  • ready():用来查询任务是否完成
  • get():阻塞式调用,获取任务完成后返回的结果,若未完成,进行等待

更详细的方法参阅 这里

Celery + RabbitMQ + Redis(升级版)

背景

通过刚才的使用案例,我们基本熟悉了分布式计算结构中需要的不同模块,以及每个模块的部署和使用方式。但是,结合前面系统结构图,我们可以看到,这并不是严格意义上的分布式。

Celery分布式任务调度 - 图15

因为我自己的机器资源有限,逻辑拆分的不彻底,Celery workers主业务的逻辑部署在一台机器上了,使用起来总觉得有些别扭。现在我们将这些逻辑拆开,实现一台机器上发布任务到队列,另一台机器从里面取出然后执行

Celery分布式任务调度 - 图16

现在,从ubuntu_0这台机器上提交任务到RabbitMQ(虽然这个又部署在了同一台机器,但问题不大,因为他们耦合度很低),Windows上的Celery workerRabbitMQ中获取任务进行执行,将结果保存到Redis

具体组件的部署和运行前面已将做了介绍了,现在主要关注下Celery逻辑的拆分问题。

Windows端部署

这里是任务的执行端,只需要一个Python脚本文件tasks.py,就可以实现对队列的任务监听。

  1. # tasks.py
  2. import celery
  3. import time
  4. broker = 'amqp://192.168.160.128/'
  5. backend = 'redis://192.168.160.129'
  6. app = celery.Celery('Windows', broker=broker, backend=backend)
  7. def sleep():
  8. time.sleep(1)
  9. @app.task
  10. def echo(msg):
  11. # 这里可以调用其他函数
  12. sleep()
  13. return msg

然后就可以启动这个服务进行等待了。

Celery分布式任务调度 - 图17

红色方框表示的是这里的worker正在等待的任务。稍后我们就会从Ubuntu_0中提交这样的任务。

Ubuntu_0部署

我们需要两个Python文件,一个和Celery任务有关—— tasks.py,一个主要负责我们的逻辑——main.py

tasks.py是为了告诉程序要将任务提交到到哪里去执行,提交什么样的任务,所以名字要和 Windows 端的对应。这里任务函数没有任何业务逻辑在里面,因为这个任务并不是在本机执行,而是在Windows端,那里已经存放了函数主体,这里这样写只是为了发送任务时形式上能对应上。

在我们使用C语言时,我们会定义自己的头文件,将一些逻辑分离出来,头文件里写的都是关于函数的声明,函数的主体在另外的地方,差不多的道理。

  1. # tasks.py
  2. import celery
  3. broker = 'ampq:/192.168.160.128'
  4. backend = 'redis://192.168.160.129'
  5. app = celery.Celery('???', broker=broker, backend=backend) # 第一个参数写什么无所谓了,这里只负责发送任务;省略第一个参数其实也可以
  6. @app.task
  7. def echo(msg):
  8. pass

main.py中就是我们的主要逻辑了。

  1. # main.py
  2. from tasks import app
  3. for i in range(3):
  4. tasks.echo.delay("asd") # celery 中写上 echo 函数的目的就在这里,否则这句调用会报错

ubuntu_0中运行程序提交任务

Celery分布式任务调度 - 图18

Windows端的worker收到任务开始工作

Celery分布式任务调度 - 图19

还可以采用另外一种更加直白的方式(通过 send_task() 函数)来发送任务,我们在tasks.py中甚至都不需要声明函数,文件名也不需要强制使用 tasks.py 的形式。但是任务的名字一定要写对,与Windows端服务启动界面用红框标出的名字是一致的。任务接收的参数也要放在列表或元组里面。

  1. # 采用这种方式,文件的文件名没有强制要求
  2. # whatever.py
  3. import celery
  4. broker = 'ampq:/192.168.160.128'
  5. backend = 'redis://192.168.160.129'
  6. app = celery.Celery('???', broker=broker, backend=backend)
  1. from my_celery import app
  2. for i in range(3):
  3. app.send_task('tasks.echo', ["asd"]) # 第一个参数一定要写对,与 windows 端的对应上

这样一来,就非常接近我们的最初目标了,再看下面的结构图,要好好体会一下各个模块间的关系。

Celery分布式任务调度 - 图20

补充

清空RabbitMQ

在使用 app.send_task() 远程提交任务的时候,可能会把任务名称输错;或者是其他的一些原因造成任务队列中的任务是有错误的,不能被 worker 消费掉,但因为任务队列的设计机制,这个任务如果没有被消费,将会很长一段时间留在队列中。这就比较尴尬了,因为任务出错—>不能消费—>留在队列等待消费—>任务出错… 我们可以手动将队列中的任务清除掉

  1. sudo rabbimqctl list_queues # 用来查看当前所有的任务队列

Celery分布式任务调度 - 图21

可以看到,目前只有一个发往 celery 的任务队列,里面已经积攒了 9 个任务了。

  1. sudo rabbitmqctl purge_queue <queue_name> # 将指定的队列清除

Celery分布式任务调度 - 图22

现在就将队列中因为各种原因滞留下来的任务清除掉了。

总结

这部分内容网上没有找到太多资料,特别是在多机环境中部署的情况。Celery官网教程 使用的案例也都是在一台主机环境下的,不能满足我们分机部署的需求。而 RabbitMQ 作为官方指定的队列工具,其官方教程却是用了 Pika 取代Celery来讲解 任务调度 的模型……

可能单纯使用Celery的情况并不多见,因为它已经可以与FlaskDjango这样流行Web框架相结合使用。

关于多机部署的方式也是自己不断尝试琢磨出来的,按照自己的理解发现能正常运行,就写在笔记里了,如果其中有什么错误,或者有相关的官方文档,希望可以在评论区提醒我,也可以给我发邮件:yuchentianx@gmail.com