celery快速入门
简介
Celery 是 Python 开发的分布式任务调度模块,Celery本身不含消息服务,它使用第三方消息服务来传递任务,目前,Celery支持的消息服务有RabbitMQ、Redis甚至是数据库,当然Redis应该是最佳选择(主要因为它太简单了)
Celery 的架构由以下几部分组成:
- message broker:消息中间件
- worker:任务执行单元
- task result store:任务执行结果存储
- backend:任务结果存储
消息中间件
Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis, MongoDB (experimental), Amazon SQS (experimental), CouchDB (experimental), SQLAlchemy (experimental), Django ORM (experimental), IronMQ
任务执行单元
Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。
任务结果存储
Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, Redis,memcached, MongoDB,SQLAlchemy, Django ORM,Apache Cassandra, IronCache
另外, Celery还支持不同的并发和序列化的手段
并发
Prefork, Eventlet, gevent, threads/single threaded
序列化
pickle, json, yaml, msgpack. zlib, bzip2 compression, Cryptographic message signing 等等
安装
$ sudo pip install celery$ sudo pip install celery-with-redis$ celery -A tasks worker --loglevel=info
异步任务
使用celery实现异步任务主要包含三个步骤:
- 创建一个celery实例
- 启动celery worker
- 应用程序调用异步任务
源码分享
#!/home/xingming/pyvirt/bin/python#-*- coding:utf-8 -*-############################################## File Name: main.py# Author: xiaoh# Mail: p.mars@163.com·# Created Time: 2015-12-15 20:50:46#############################################from flask import Flaskfrom tasks import *import uuid, redisapp = Flask(__name__)r = redis.Redis()@app.route('/<msg>')def message(msg):hello.delay(msg)return msg@app.route('/task/')def doTask():id = str(uuid.uuid1()).replace('-', '')[:8]changeValue.delay(id)return id@app.route('/result/<id>')def result(id):re = r.get(id)return reif __name__ == "__main__":app.debug = Trueapp.run()
一个 Flask 的 Web 服务,调用了 Celery 去执行相关的异步操作。
#!/home/xingming/pyvirt/bin/python#-*- coding:utf-8 -*-############################################## File Name: tasks.py# Author: xiaoh# Mail: p.mars@163.com·# Created Time: 2015-12-16 00:09:44#############################################from celery import Celeryimport time, rediscelery = Celery('tasks', broker='redis://localhost:6379')r = redis.Redis(host='127.0.0.1')@celery.taskdef hello(name):for i in range(3):print 'hello,%s' % nametime.sleep(1)@celery.taskdef changeValue(id):for i in range(100):r.set(id, i)print 'id:%s' % itime.sleep(1)
代码可以分为两层:
第一种为 Flask 的 Message 函数,调用的 Celery 的 Hello 函数,这个函数没有返回,这种模式适合做批处理之类的工作,比如发个邮件,做个统计什么的
第二种则是 changeValue 这个函数,他执行了一些列的工作,之后将结果写入到了数据库中,这个数据库仍然可以称作中间件,因为他记录了中间的结果,在 Flask 那边需要再启动一个接口来获取任务执行的结果,这样就可以实现了异步接口的调用。
