celery快速入门

简介

Celery 是 Python 开发的分布式任务调度模块,Celery本身不含消息服务,它使用第三方消息服务来传递任务,目前,Celery支持的消息服务有RabbitMQ、Redis甚至是数据库,当然Redis应该是最佳选择(主要因为它太简单了)

Celery 的架构由以下几部分组成:

  • message broker:消息中间件
  • worker:任务执行单元
  • task result store:任务执行结果存储
  • backend:任务结果存储

消息中间件

Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis, MongoDB (experimental), Amazon SQS (experimental), CouchDB (experimental), SQLAlchemy (experimental), Django ORM (experimental), IronMQ

任务执行单元

Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

任务结果存储

Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, Redis,memcached, MongoDB,SQLAlchemy, Django ORM,Apache Cassandra, IronCache

另外, Celery还支持不同的并发和序列化的手段

并发

Prefork, Eventlet, gevent, threads/single threaded

序列化

pickle, json, yaml, msgpack. zlib, bzip2 compression, Cryptographic message signing 等等

安装

  1. $ sudo pip install celery
  2. $ sudo pip install celery-with-redis
  3. $ celery -A tasks worker --loglevel=info

异步任务

使用celery实现异步任务主要包含三个步骤:

  1. 创建一个celery实例
  2. 启动celery worker
  3. 应用程序调用异步任务

源码分享

  1. #!/home/xingming/pyvirt/bin/python
  2. #-*- coding:utf-8 -*-
  3. #############################################
  4. # File Name: main.py
  5. # Author: xiaoh
  6. # Mail: p.mars@163.com·
  7. # Created Time: 2015-12-15 20:50:46
  8. #############################################
  9. from flask import Flask
  10. from tasks import *
  11. import uuid, redis
  12. app = Flask(__name__)
  13. r = redis.Redis()
  14. @app.route('/<msg>')
  15. def message(msg):
  16. hello.delay(msg)
  17. return msg
  18. @app.route('/task/')
  19. def doTask():
  20. id = str(uuid.uuid1()).replace('-', '')[:8]
  21. changeValue.delay(id)
  22. return id
  23. @app.route('/result/<id>')
  24. def result(id):
  25. re = r.get(id)
  26. return re
  27. if __name__ == "__main__":
  28. app.debug = True
  29. app.run()

一个 Flask 的 Web 服务,调用了 Celery 去执行相关的异步操作。

  1. #!/home/xingming/pyvirt/bin/python
  2. #-*- coding:utf-8 -*-
  3. #############################################
  4. # File Name: tasks.py
  5. # Author: xiaoh
  6. # Mail: p.mars@163.com·
  7. # Created Time: 2015-12-16 00:09:44
  8. #############################################
  9. from celery import Celery
  10. import time, redis
  11. celery = Celery('tasks', broker='redis://localhost:6379')
  12. r = redis.Redis(host='127.0.0.1')
  13. @celery.task
  14. def hello(name):
  15. for i in range(3):
  16. print 'hello,%s' % name
  17. time.sleep(1)
  18. @celery.task
  19. def changeValue(id):
  20. for i in range(100):
  21. r.set(id, i)
  22. print 'id:%s' % i
  23. time.sleep(1)

代码可以分为两层:

第一种为 Flask 的 Message 函数,调用的 Celery 的 Hello 函数,这个函数没有返回,这种模式适合做批处理之类的工作,比如发个邮件,做个统计什么的

第二种则是 changeValue 这个函数,他执行了一些列的工作,之后将结果写入到了数据库中,这个数据库仍然可以称作中间件,因为他记录了中间的结果,在 Flask 那边需要再启动一个接口来获取任务执行的结果,这样就可以实现了异步接口的调用。