踩了两天的坑终于总结出来了

前提

本文写于2021年1月17日
需要安装的依赖有:
redis,python3.8
pip list:
Django 3.1.x
django-celery-beat 2.1.0
django-celery-results 2.0.0
—————————————-
不需要安装django-celery 不需要不需要不需要!!!

目录:
image.png

__init__.py

  # Import the Celery app whenever this package is imported (i.e. when Django
  # starts), so that tasks declared with @shared_task use this app instance.
  from __future__ import absolute_import, unicode_literals
  from .celery_ import app as celery_app
  __all__ = ['celery_app']

celery_.py

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# Point Django at the project's settings module; setdefault leaves any value
# already present in the environment untouched.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'DjangoCelery.settings')  # set up the Django environment

app = Celery('DjangoCelery')

app.config_from_object('django.conf:settings', namespace='CELERY')  # read Celery options from Django settings; they are written there with the CELERY_ prefix

app.autodiscover_tasks()  # discover the tasks.py module in each app


@app.task(bind=True)
def debug_task(self):
    """Print the repr of this bound task's request; a debugging aid."""
    print(f'Request: {self.request!r}')

tasks.py

users/tasks.py

from __future__ import absolute_import, unicode_literals
from celery import shared_task
import datetime


@shared_task
def add(x, y):
    """Return ``x + y``.

    Before returning, prints a line carrying the current local time
    (``YYYY-MM-DD HH:MM:SS``) followed by a line with the sum.
    """
    total = x + y
    stamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print(f'当前时间为:{stamp} ,两个数相加的结果为:')
    print(total)
    return total


@shared_task
def mul(x, y):
    """Return ``x * y``.

    Before returning, prints a line carrying the current local time
    (``YYYY-MM-DD HH:MM:SS``) followed by a line with the product.
    """
    product = x * y
    stamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print(f'当前时间为:{stamp} ,两个数相乘的结果为:')
    print(product)
    return product

settings.py

from celery.schedules import crontab
from datetime import timedelta

# Redis is used as the message broker.
# NOTE(review): the password is hard-coded for this demo; load it from the
# environment or a secrets store in a real deployment.
CELERY_BROKER_URL = 'redis://:123456@127.0.0.1:6379/0'
# Redis transport option, in seconds (43200 s = 12 h).
CELERY_BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 43200}
# Store task results with django-celery-results. This backend writes through
# the Django ORM, so it uses the database configured in Django's DATABASES
# setting (MySQL in this project); no connection URL or credentials belong here.
CELERY_RESULT_BACKEND = 'django-db'
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'  # serialization format for task results
CELERY_ENABLE_UTC = False  # author's note: must be False when TIME_ZONE = 'Asia/Shanghai' is configured
DJANGO_CELERY_BEAT_TZ_AWARE = False
CELERY_BEAT_SCHEDULE = {
    # Run add() every ten seconds.
    'add-every-10-seconds': {
        'task': 'users.tasks.add',

        # Interval between runs.
        'schedule': 10.0,  # a plain number is taken as a number of seconds
        # 'schedule': timedelta(seconds=10),  # a timedelta object works as well

        # Required arguments; here these are the arguments of add().
        'args': (16, 26),
    },
    # Run mul() every Monday at 20:57.
    'mul-every-monday-evening': {
        'task': 'users.tasks.mul',
        'schedule': crontab(hour=20, minute=57, day_of_week=1),
        'args': (16, 16),
    },
}

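配置完了之后,先确认已经把 django_celery_beat 和 django_celery_results 加入 settings.py 的 INSTALLED_APPS,然后千万不要忘记执行 python3 manage.py migrate(用来创建这两个应用需要的数据表)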

测试

在命令行内输入以下两条命令(需要开启两个终端)
celery -A DjangoCelery worker -l debug  # 这里是开启worker;-l 后面的debug为输出的日志级别,一般设置为info
celery -A DjangoCelery beat -l debug  # 这里是开启定时任务调度(beat),按django settings里设置的定时任务(CELERY_BEAT_SCHEDULE)向worker派发任务

终端2

包含beat语句
image.png

终端1

image.png

Admin设置定时任务

进入django自带的admin
1610865020881.jpg
具体怎么配置就不写了,翻译成中文就都看得懂了

ORM

from django_celery_results.backends.database import TaskResult
# 以下是官方源码
class TaskResult(models.Model):
    """Task result/status."""

    # Maximum length is read from the optional Django setting
    # DJANGO_CELERY_RESULTS_TASK_ID_MAX_LENGTH and falls back to 255.
    task_id = models.CharField(
        max_length=getattr(
            settings,
            'DJANGO_CELERY_RESULTS_TASK_ID_MAX_LENGTH',
            255
        ),
        unique=True, db_index=True,
        verbose_name=_('Task ID'),
        help_text=_('Celery ID for the Task that was run'))
    task_name = models.CharField(
        null=True, max_length=255, db_index=True,
        verbose_name=_('Task Name'),
        help_text=_('Name of the Task which was run'))
    task_args = models.TextField(
        null=True,
        verbose_name=_('Task Positional Arguments'),
        help_text=_('JSON representation of the positional arguments '
                    'used with the task'))
    task_kwargs = models.TextField(
        null=True,
        verbose_name=_('Task Named Arguments'),
        help_text=_('JSON representation of the named arguments '
                    'used with the task'))
    # New rows start in the PENDING state.
    status = models.CharField(
        max_length=50, default=states.PENDING, db_index=True,
        choices=TASK_STATE_CHOICES,
        verbose_name=_('Task State'),
        help_text=_('Current state of the task being run'))
    worker = models.CharField(
        max_length=100, db_index=True, default=None, null=True,
        verbose_name=_('Worker'), help_text=_('Worker that executes the task')
    )
    content_type = models.CharField(
        max_length=128,
        verbose_name=_('Result Content Type'),
        help_text=_('Content type of the result data'))
    content_encoding = models.CharField(
        max_length=64,
        verbose_name=_('Result Encoding'),
        help_text=_('The encoding used to save the task result data'))
    # Not editable through model forms / the admin (editable=False).
    result = models.TextField(
        null=True, default=None, editable=False,
        verbose_name=_('Result Data'),
        help_text=_('The data returned by the task.  '
                    'Use content_encoding and content_type fields to read.'))
    # auto_now_add: set once, when the row is first created.
    date_created = models.DateTimeField(
        auto_now_add=True, db_index=True,
        verbose_name=_('Created DateTime'),
        help_text=_('Datetime field when the task result was created in UTC'))
    # auto_now: refreshed every time the row is saved.
    date_done = models.DateTimeField(
        auto_now=True, db_index=True,
        verbose_name=_('Completed DateTime'),
        help_text=_('Datetime field when the task was completed in UTC'))
    traceback = models.TextField(
        blank=True, null=True,
        verbose_name=_('Traceback'),
        help_text=_('Text of the traceback if the task generated one'))
    meta = models.TextField(
        null=True, default=None, editable=False,
        verbose_name=_('Task Meta Information'),
        help_text=_('JSON meta information about the task, '
                    'such as information on child tasks'))

    # Custom manager; query through TaskResult.objects.
    objects = managers.TaskResultManager()

    class Meta:
        """Table information."""

        # Default ordering: descending date_done (latest first).
        ordering = ['-date_done']

        verbose_name = _('task result')
        verbose_name_plural = _('task results')

    def as_dict(self):
        """Return a plain dict of selected fields of this row.

        Included keys: task_id, task_name, task_args, task_kwargs, status,
        result, date_done, traceback, meta and worker. content_type,
        content_encoding and date_created are not included.
        """
        return {
            'task_id': self.task_id,
            'task_name': self.task_name,
            'task_args': self.task_args,
            'task_kwargs': self.task_kwargs,
            'status': self.status,
            'result': self.result,
            'date_done': self.date_done,
            'traceback': self.traceback,
            'meta': self.meta,
            'worker': self.worker
        }

    def __str__(self):
        """Return ``<Task: {task_id} ({status})>`` for this row."""
        return '<Task: {0.task_id} ({0.status})>'.format(self)

orm中的序列化什么的我就不写了,记住如果改了模型要执行manage.py makemigrations 和 migrate