附上url:http://docs.jinkan.org/docs/celery/getting-started/first-steps-with-celery.html
先pip ,然后安装依赖
pip install celery
pip install “celery[amqp,redis,auth,msgpack]”
接着新建根目录文件夹celery,以及旗下py文件tasks
from celery import Celery
# 第一个参数是broker的服务器地址, 第二个是脚本的名称
app = Celery('tasks', broker='redis://127.0.0.1:16379', backend='redis://127.0.0.1:16379')
@app.task
def add(x, y):
return x + y
# 启动celery使用: celery --app tasks worker --loglevel=info -P eventlet (记得要在celery目录下执行哦)
新建run_tasks.py (这就是执行的脚本了)
from re import A
from async_timeout import timeout
from tasks import add
A = add.delay(4, 4)
print(A.ready())
run_result = A.get(propagate=False)
print(run_result)
这边遇到了一个坑,运行报错 ValueError: not enough values to unpack (expected 3, got 0)
解决方法:
- pip install eventlet
- 重启celery:celery —app tasks worker —loglevel=info -P eventlet
接下来想要更直观的观测到任务的运行状态的话,前往文档http://docs.jinkan.org/docs/celery/userguide/monitoring.html
使用flower:
pip install flower
celery —broker=redis://127.0.0.1:16379/0 flower (不要去指定APP了,大坑!)
celery -A myyuque worker —pool=solo —loglevel=INFO (新的启动celery命令)
这边开始在django中集成celery
附上地址:https://www.celerycn.io/fu-lu/django
在myyuque下新建celery.py
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'settings.base')
app = Celery('myyuque')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('settings.base', namespace='CELERY')
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
在base.py下添加配置(有个坑,namespace=’CELERY’ 这个没用上)
# celery
# 配置celery,使用redis
broker_url = r'redis://127.0.0.1:16379/0'
# 结果存储
result_backend = r'redis://127.0.0.1:16379/0'
result_serializer = 'json'
# 时区
timezone = 'Asia/Shanghai'
# 过期时间
# event_queue_ttl = 5
# celery不回复结果
CELERY_task_ignore_result = False
# 为防止内存泄漏,一个进程执行过N次之后杀死,建议是100次
worker_max_tasks_per_child = 10
# 错误 DatabaseWrapper objects created in a thread can only be used in that same thread
TASK_ALWAYS_EAGER = True
在init下添加
from __future__ import absolute_import, unicode_literals
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app
__all__ = ('celery_app',)
接着添加需要异步的功能,以发送钉钉通知为例,在interview下新增tasks.py
from __future__ import absolute_import, unicode_literals
from celery import shared_task
from interview.dingtalk import send
@shared_task
def send_message(self, message):
send(self,message)
同时将admin里面的notify_interview方法也改了
@admin.action(description=u'通知')
def notify_interview(message, request, queryset):
candidates = ''
interviewers = ''
phone = []
for obj in queryset:
candidates = obj.username + ';' + candidates
interviewers = obj.first_interviewer_user.username + ';' + interviewers
phone.append(int(obj.first_interviewer_user.account_phone))
send_message.delay('候选人 %s 准备开始喂大便了,现在有请采屎官 %s 进来喂屎' % (candidates, interviewers), phone)
messages.add_message(request, messages.INFO, '已经通知面试官%s !' % interviewers)