Reference URL: http://docs.jinkan.org/docs/celery/getting-started/first-steps-with-celery.html
    First install celery via pip, then the optional dependency bundles:
    pip install celery
    pip install "celery[amqp,redis,auth,msgpack]"
    Then create a root folder named celery, with a tasks.py file inside it.
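    The layout at this point is simply (a sketch):

    celery/
        tasks.py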

    from celery import Celery

    # The first argument is the name of the current module;
    # broker and backend point at the redis server.
    app = Celery('tasks', broker='redis://127.0.0.1:16379', backend='redis://127.0.0.1:16379')

    @app.task
    def add(x, y):
        return x + y

    # Start celery with: celery --app tasks worker --loglevel=info -P eventlet
    # (remember to run this from inside the celery directory)

    Create run_tasks.py (this is the script that triggers the task):

    from tasks import add

    result = add.delay(4, 4)                   # enqueue the task; returns an AsyncResult
    print(result.ready())                      # usually still False right after enqueueing
    run_result = result.get(propagate=False)   # wait for the result; don't re-raise task exceptions
    print(run_result)
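    A slightly fuller sketch of what the returned AsyncResult exposes (standard Celery result API; the timeout value here is arbitrary):

    from tasks import add

    result = add.delay(4, 4)
    print(result.id)                # task id under which the result is stored in redis
    print(result.state)             # e.g. PENDING -> SUCCESS
    print(result.get(timeout=10))   # block up to 10 s; re-raises the task's exception by default
    print(result.successful())      # True once the task finished without error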

    One pitfall here: the worker crashed with ValueError: not enough values to unpack (expected 3, got 0)
    (a known symptom of the default prefork pool on Windows). Fix:

    1. pip install eventlet
    2. Restart celery: celery --app tasks worker --loglevel=info -P eventlet

    To observe task state more directly, see the monitoring guide at http://docs.jinkan.org/docs/celery/userguide/monitoring.html
    Using flower:
    pip install flower
    celery --broker=redis://127.0.0.1:16379/0 flower (do not pass the app here; big pitfall!)
    celery -A myyuque worker --pool=solo --loglevel=INFO (new command for starting the worker)
    By default flower serves its dashboard on http://localhost:5555.

    From here on, integrating celery into Django.
    Reference: https://www.celerycn.io/fu-lu/django
    Create celery.py under myyuque:

    from __future__ import absolute_import, unicode_literals
    import os
    from celery import Celery

    # set the default Django settings module for the 'celery' program.
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'settings.base')

    app = Celery('myyuque')

    # Using a string here means the worker doesn't have to serialize
    # the configuration object to child processes.
    # - namespace='CELERY' means all celery-related configuration keys
    #   should have a `CELERY_` prefix.
    app.config_from_object('settings.base', namespace='CELERY')

    # Load task modules from all registered Django app configs.
    app.autodiscover_tasks()

    @app.task(bind=True)
    def debug_task(self):
        print('Request: {0!r}'.format(self.request))
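    To check the wiring later on (once the broker configuration below is in place), the debug_task defined above can be fired from a Django shell (a quick sanity check, assuming redis and a worker are running):

    # inside `python manage.py shell`
    from myyuque.celery import debug_task

    r = debug_task.delay()   # the worker should log "Request: <Context ...>"
    r.get(timeout=10)        # returns None; completing confirms the round trip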

    Add the configuration to base.py (one pitfall: the namespace='CELERY' prefix did not take effect here, so the keys below are unprefixed):

    # celery
    # configure celery to use redis as the broker
    broker_url = r'redis://127.0.0.1:16379/0'
    # result storage
    result_backend = r'redis://127.0.0.1:16379/0'
    result_serializer = 'json'
    # timezone
    timezone = 'Asia/Shanghai'
    # expiry time
    # event_queue_ttl = 5
    # whether to discard task results (False = keep them in the result backend)
    task_ignore_result = False
    # to guard against memory leaks, recycle each worker process after N tasks (100 is the common suggestion)
    worker_max_tasks_per_child = 10
    # works around: DatabaseWrapper objects created in a thread can only be used in that same thread
    task_always_eager = True
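    For contrast, had the namespace='CELERY' prefix taken effect, config_from_object would only pick up upper-case keys carrying that prefix, i.e. the same settings would read:

    CELERY_BROKER_URL = 'redis://127.0.0.1:16379/0'
    CELERY_RESULT_BACKEND = 'redis://127.0.0.1:16379/0'
    CELERY_RESULT_SERIALIZER = 'json'
    CELERY_TIMEZONE = 'Asia/Shanghai'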

    Add to __init__.py (next to celery.py):

    from __future__ import absolute_import, unicode_literals

    # This will make sure the app is always imported when
    # Django starts so that shared_task will use this app.
    from .celery import app as celery_app

    __all__ = ('celery_app',)
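    At this point the files touched by the integration look roughly like this (paths inferred from the steps above; the exact settings package layout is an assumption):

    myyuque/
        __init__.py      # imports celery_app (above)
        celery.py        # the Celery app definition
    settings/
        base.py          # celery configuration keys
    interview/
        dingtalk.py      # existing send() helper
        tasks.py         # shared tasks (next step)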

    Now add a feature that should run asynchronously. Taking DingTalk notifications as the example, create tasks.py under the interview app:

    from __future__ import absolute_import, unicode_literals
    from celery import shared_task
    from interview.dingtalk import send

    @shared_task
    def send_message(message, phone):
        # forward to the existing DingTalk helper
        send(message, phone)

    Also update the notify_interview action in admin to hand off to the task:

    from django.contrib import admin, messages
    from interview.tasks import send_message

    @admin.action(description=u'通知')
    def notify_interview(modeladmin, request, queryset):
        candidates = ''
        interviewers = ''
        phone = []
        for obj in queryset:
            candidates = obj.username + ';' + candidates
            interviewers = obj.first_interviewer_user.username + ';' + interviewers
            phone.append(int(obj.first_interviewer_user.account_phone))
        # enqueue the notification instead of sending it synchronously
        send_message.delay('候选人 %s 准备开始喂大便了,现在有请采屎官 %s 进来喂屎' % (candidates, interviewers), phone)
        messages.add_message(request, messages.INFO, '已经通知面试官%s !' % interviewers)
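    One detail the snippet leaves implicit: the action only shows up in the change list once it is attached to a ModelAdmin. A sketch, with Candidate and CandidateAdmin as assumed names not shown in this section:

    from django.contrib import admin
    from interview.models import Candidate  # assumed model name

    class CandidateAdmin(admin.ModelAdmin):
        actions = [notify_interview]  # expose the bulk action in the admin

    admin.site.register(Candidate, CandidateAdmin)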