前提
本文写于2021年1月17日
需要安装的依赖有:
redis,python3.8
pip list:
Django 3.1.x
django-celery-beat            2.1.0
django-celery-results         2.0.0
—————————————-
不需要安装django-celery 不需要不需要不需要!!!
init.py
from __future__ import absolute_import, unicode_literalsfrom .celery_ import app as celery_app__all__ = ['celery_app']
celery_.py
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'DjangoCelery.settings')  # 设置django环境
app = Celery('DjangoCelery')
app.config_from_object('django.conf:settings', namespace='CELERY')  # 使用CELERY_ 作为前缀,在settings中写配置
app.autodiscover_tasks()  # 发现任务文件每个app下的task.py
@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))
tasks.py
users/tasks.py
from __future__ import absolute_import, unicode_literals
from celery import shared_task
import datetime
@shared_task
def add(x, y):
    res = x + y
    time_format = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print('当前时间为:' + time_format + ' ,两个数相加的结果为:')
    print(res)
    return res
@shared_task
def mul(x, y):
    res = x * y
    time_format = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print('当前时间为:' + time_format + ' ,两个数相乘的结果为:')
    print(res)
    return res
settings.py
from celery.schedules import crontab
from datetime import timedelta
CELERY_BROKER_URL = 'redis://:123456@127.0.0.1:6379/0'  # 设置redis为消息队列
CELERY_BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 43200} 
CELERY_RESULT_BACKEND = 'django-db://root:Admin123!@81.69.46.120:3306/HiMoProject'  # BACKEND配置,这里使用MySQl
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'  # 结果序列化方案
CELERY_ENABLE_UTC = False  # 如果配置了时间为TIME_ZONE = 'Asia/Shanghai'这里必须要设置false
DJANGO_CELERY_BEAT_TZ_AWARE = False
CELERY_BEAT_SCHEDULE = {
    # 每十秒执行一次add方法
    'add-every-10-seconds': {
        'task': 'users.tasks.add',
        # 多长时间执行一次
        'schedule': 1.0,  # 支持直接用数字表示秒数
        # 'schedule': timedelta(seconds=10), # 可以用timedelta对象
        # 必要的参数,这里指add()的参数
        'args': (16, 26)
    },
    # 每个周一的20:57分执行一次mul方法
    'add-every-monday-morning': {
        'task': 'users.tasks.mul',
        'schedule': crontab(hour=20, minute=57, day_of_week=1),
        'args': (16, 16),
    },
}
配置完了之后千万不要忘记执行python3 manage.py migrate
测试
在命令行内输入以下两段代码(需要开启两个终端)celery -A DjangoCelery worker -l debug  debug为输出的日志级别。一般设置为info,这里是开启workercelery -A DjangoCelery beat -l debug  这里是开启定时任务,django settings 里设置的定时任务(CELERY_BEAT_SCHEDULE)
终端2
包含beat语句
终端1

Admin设置定时任务
进入django自带的admin
具体怎么配置就不写了,翻译成中文就都看得懂了
ORM
from django_celery_results.backends.database import TaskResult
# 以下是官方源码
class TaskResult(models.Model):
    """Task result/status."""
    task_id = models.CharField(
        max_length=getattr(
            settings,
            'DJANGO_CELERY_RESULTS_TASK_ID_MAX_LENGTH',
            255
        ),
        unique=True, db_index=True,
        verbose_name=_('Task ID'),
        help_text=_('Celery ID for the Task that was run'))
    task_name = models.CharField(
        null=True, max_length=255, db_index=True,
        verbose_name=_('Task Name'),
        help_text=_('Name of the Task which was run'))
    task_args = models.TextField(
        null=True,
        verbose_name=_('Task Positional Arguments'),
        help_text=_('JSON representation of the positional arguments '
                    'used with the task'))
    task_kwargs = models.TextField(
        null=True,
        verbose_name=_('Task Named Arguments'),
        help_text=_('JSON representation of the named arguments '
                    'used with the task'))
    status = models.CharField(
        max_length=50, default=states.PENDING, db_index=True,
        choices=TASK_STATE_CHOICES,
        verbose_name=_('Task State'),
        help_text=_('Current state of the task being run'))
    worker = models.CharField(
        max_length=100, db_index=True, default=None, null=True,
        verbose_name=_('Worker'), help_text=_('Worker that executes the task')
    )
    content_type = models.CharField(
        max_length=128,
        verbose_name=_('Result Content Type'),
        help_text=_('Content type of the result data'))
    content_encoding = models.CharField(
        max_length=64,
        verbose_name=_('Result Encoding'),
        help_text=_('The encoding used to save the task result data'))
    result = models.TextField(
        null=True, default=None, editable=False,
        verbose_name=_('Result Data'),
        help_text=_('The data returned by the task.  '
                    'Use content_encoding and content_type fields to read.'))
    date_created = models.DateTimeField(
        auto_now_add=True, db_index=True,
        verbose_name=_('Created DateTime'),
        help_text=_('Datetime field when the task result was created in UTC'))
    date_done = models.DateTimeField(
        auto_now=True, db_index=True,
        verbose_name=_('Completed DateTime'),
        help_text=_('Datetime field when the task was completed in UTC'))
    traceback = models.TextField(
        blank=True, null=True,
        verbose_name=_('Traceback'),
        help_text=_('Text of the traceback if the task generated one'))
    meta = models.TextField(
        null=True, default=None, editable=False,
        verbose_name=_('Task Meta Information'),
        help_text=_('JSON meta information about the task, '
                    'such as information on child tasks'))
    objects = managers.TaskResultManager()
    class Meta:
        """Table information."""
        ordering = ['-date_done']
        verbose_name = _('task result')
        verbose_name_plural = _('task results')
    def as_dict(self):
        return {
            'task_id': self.task_id,
            'task_name': self.task_name,
            'task_args': self.task_args,
            'task_kwargs': self.task_kwargs,
            'status': self.status,
            'result': self.result,
            'date_done': self.date_done,
            'traceback': self.traceback,
            'meta': self.meta,
            'worker': self.worker
        }
    def __str__(self):
        return '<Task: {0.task_id} ({0.status})>'.format(self)
orm中的序列化什么的我就不写了,记住如果改了模型要执行manage.py makemigtations 和 migrate
