Reference: http://docs.jinkan.org/docs/celery/getting-started/first-steps-with-celery.html
First install with pip, then add the dependencies:
pip install celery
pip install "celery[amqp,redis,auth,msgpack]"
Next, create a celery folder at the project root, and a tasks.py file inside it:
from celery import Celery

# The first argument is the name of the current module; broker and backend
# point at the Redis server.
app = Celery('tasks', broker='redis://127.0.0.1:16379', backend='redis://127.0.0.1:16379')

@app.task
def add(x, y):
    return x + y

# Start celery with: celery --app tasks worker --loglevel=info -P eventlet
# (remember to run this from the celery directory!)
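To see what `@app.task` buys us, here is a toy sketch (plain Python, not Celery's implementation) of the pattern: the decorated function stays callable as-is, but gains a `delay()` that enqueues the call for a worker instead of running it:

```python
import queue

# Stand-in for the broker; Celery would use Redis/RabbitMQ here.
task_queue = queue.Queue()

def task(func):
    """Toy task decorator: attach a delay() that enqueues the call."""
    def delay(*args, **kwargs):
        task_queue.put((func, args, kwargs))
    func.delay = delay
    return func

@task
def add(x, y):
    return x + y

print(add(4, 4))        # direct call still works: 8
add.delay(4, 4)         # enqueue instead of running
func, args, kwargs = task_queue.get()
print(func(*args, **kwargs))  # a worker process would execute it: 8
```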
Create run_tasks.py (this is the script that actually runs the task):
from tasks import add

A = add.delay(4, 4)
print(A.ready())
run_result = A.get(propagate=False)
print(run_result)
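The `propagate=False` flag means `get()` hands back a failed task's exception as a value instead of re-raising it. A toy sketch of that contract (assumption-level illustration, not Celery's AsyncResult):

```python
class ToyResult:
    """Mimics the get(propagate=...) behavior of an async result."""
    def __init__(self, value=None, error=None):
        self._value, self._error = value, error

    def get(self, propagate=True):
        if self._error is not None:
            if propagate:
                raise self._error       # default: re-raise in the caller
            return self._error          # propagate=False: return the exception
        return self._value

ok = ToyResult(value=8)
print(ok.get())                         # 8
bad = ToyResult(error=ValueError('boom'))
print(bad.get(propagate=False))         # returns the exception instead of raising
```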
I hit a pitfall here: running it fails with ValueError: not enough values to unpack (expected 3, got 0)
Fix:
- pip install eventlet
- restart celery: celery --app tasks worker --loglevel=info -P eventlet
Next, if you want a more direct view of task state, see the monitoring docs: http://docs.jinkan.org/docs/celery/userguide/monitoring.html
Using flower:
pip install flower
celery --broker=redis://127.0.0.1:16379/0 flower (do not specify the app here; that's a big pitfall!)
celery -A myyuque worker --pool=solo --loglevel=INFO (the new command to start celery)
Now let's integrate celery into Django.
Reference: https://www.celerycn.io/fu-lu/django
Create celery.py under myyuque:
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'settings.base')

app = Celery('myyuque')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('settings.base', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))
Add the configuration to base.py (one pitfall: the namespace='CELERY' prefix never got used here, since the keys below are lowercase and unprefixed):
# celery configuration, backed by redis
broker_url = r'redis://127.0.0.1:16379/0'
# result storage
result_backend = r'redis://127.0.0.1:16379/0'
result_serializer = 'json'
# timezone
timezone = 'Asia/Shanghai'
# expiry
# event_queue_ttl = 5
# whether celery skips storing results
CELERY_task_ignore_result = False
# guard against memory leaks: kill a worker process after it has run N tasks (100 is the usual suggestion)
worker_max_tasks_per_child = 10
# fixes: DatabaseWrapper objects created in a thread can only be used in that same thread
TASK_ALWAYS_EAGER = True
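The namespace pitfall above comes down to the prefix rule: with namespace='CELERY', only keys starting with `CELERY_` are picked up. A toy sketch of that rule (plain Python, not Celery's actual loader):

```python
settings = {
    'CELERY_BROKER_URL': 'redis://127.0.0.1:16379/0',
    'broker_url': 'redis://127.0.0.1:16379/0',  # unprefixed: ignored when a namespace is set
}

def load_namespaced(settings, namespace='CELERY'):
    """Keep only keys with the namespace prefix, strip it, lowercase the rest."""
    prefix = namespace + '_'
    return {k[len(prefix):].lower(): v
            for k, v in settings.items() if k.startswith(prefix)}

print(load_namespaced(settings))  # {'broker_url': 'redis://127.0.0.1:16379/0'}
```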
Add to __init__.py:
from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ('celery_app',)
Next, add the feature that should run asynchronously. Taking DingTalk notifications as the example, create tasks.py under interview:
from __future__ import absolute_import, unicode_literals
from celery import shared_task
from interview.dingtalk import send

@shared_task
def send_message(message, phone):
    send(message, phone)
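With TASK_ALWAYS_EAGER set in the config above, `.delay()` runs the task inline in the current process instead of enqueueing it. A toy sketch of that behavior (the `make_task`/`notify` names are hypothetical, not Celery's code):

```python
def make_task(func, eager=True):
    """Toy eager-mode wrapper: delay() executes immediately when eager."""
    def delay(*args, **kwargs):
        if eager:
            return func(*args, **kwargs)  # run inline, as task_always_eager does
        raise RuntimeError('would enqueue to the broker here')
    func.delay = delay
    return func

def notify(message, phone):
    return 'sent %s to %s' % (message, phone)

notify = make_task(notify)
print(notify.delay('hello', [123]))  # runs inline: sent hello to [123]
```

Eager mode is handy for debugging and for the thread-bound DatabaseWrapper error mentioned above, but it defeats the point of async execution in production.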
At the same time, update the notify_interview method in admin accordingly:
@admin.action(description=u'Notify')
def notify_interview(modeladmin, request, queryset):
    candidates = ''
    interviewers = ''
    phone = []
    for obj in queryset:
        candidates = obj.username + ';' + candidates
        interviewers = obj.first_interviewer_user.username + ';' + interviewers
        phone.append(int(obj.first_interviewer_user.account_phone))
    send_message.delay('Candidate(s) %s: interview starting, interviewer(s) %s please join' % (candidates, interviewers), phone)
    messages.add_message(request, messages.INFO, 'Interviewer(s) %s notified!' % interviewers)
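The string-accumulation loop in notify_interview can be written more idiomatically with str.join, which also avoids the trailing separator. A sketch with hypothetical stand-ins for the queryset rows:

```python
# Hypothetical rows standing in for the queryset objects above.
class Row:
    def __init__(self, username, interviewer, phone):
        self.username = username
        self.interviewer = interviewer
        self.phone = phone

queryset = [Row('alice', 'bob', 111), Row('carol', 'dave', 222)]

# str.join builds the semicolon-separated strings in one pass,
# without the trailing ';' that the loop version leaves behind.
candidates = ';'.join(row.username for row in queryset)
interviewers = ';'.join(row.interviewer for row in queryset)
phones = [row.phone for row in queryset]
print(candidates)    # alice;carol
print(interviewers)  # bob;dave
```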
