前提
本文写于2021年1月17日
需要安装的依赖有:
redis,python3.8
pip list:
Django 3.1.x
django-celery-beat 2.1.0
django-celery-results 2.0.0
—————————————-
不需要安装django-celery 不需要不需要不需要!!!
init.py
from __future__ import absolute_import, unicode_literals
from .celery_ import app as celery_app
__all__ = ['celery_app']
celery_.py
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'DjangoCelery.settings') # 设置django环境
app = Celery('DjangoCelery')
app.config_from_object('django.conf:settings', namespace='CELERY') # 使用CELERY_ 作为前缀,在settings中写配置
app.autodiscover_tasks() # 发现任务文件每个app下的task.py
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
tasks.py
users/tasks.py
from __future__ import absolute_import, unicode_literals
from celery import shared_task
import datetime
@shared_task
def add(x, y):
res = x + y
time_format = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print('当前时间为:' + time_format + ' ,两个数相加的结果为:')
print(res)
return res
@shared_task
def mul(x, y):
res = x * y
time_format = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print('当前时间为:' + time_format + ' ,两个数相乘的结果为:')
print(res)
return res
settings.py
from celery.schedules import crontab
from datetime import timedelta
CELERY_BROKER_URL = 'redis://:123456@127.0.0.1:6379/0' # 设置redis为消息队列
CELERY_BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 43200}
CELERY_RESULT_BACKEND = 'django-db://root:Admin123!@81.69.46.120:3306/HiMoProject' # BACKEND配置,这里使用MySQl
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json' # 结果序列化方案
CELERY_ENABLE_UTC = False # 如果配置了时间为TIME_ZONE = 'Asia/Shanghai'这里必须要设置false
DJANGO_CELERY_BEAT_TZ_AWARE = False
CELERY_BEAT_SCHEDULE = {
# 每十秒执行一次add方法
'add-every-10-seconds': {
'task': 'users.tasks.add',
# 多长时间执行一次
'schedule': 1.0, # 支持直接用数字表示秒数
# 'schedule': timedelta(seconds=10), # 可以用timedelta对象
# 必要的参数,这里指add()的参数
'args': (16, 26)
},
# 每个周一的20:57分执行一次mul方法
'add-every-monday-morning': {
'task': 'users.tasks.mul',
'schedule': crontab(hour=20, minute=57, day_of_week=1),
'args': (16, 16),
},
}
配置完了之后千万不要忘记执行python3 manage.py migrate
测试
在命令行内输入以下两段代码(需要开启两个终端)celery -A DjangoCelery worker -l debug
debug为输出的日志级别。一般设置为info,这里是开启workercelery -A DjangoCelery beat -l debug
这里是开启定时任务,django settings 里设置的定时任务(CELERY_BEAT_SCHEDULE)
终端2
包含beat语句
终端1
Admin设置定时任务
进入django自带的admin
具体怎么配置就不写了,翻译成中文就都看得懂了
ORM
from django_celery_results.backends.database import TaskResult
# 以下是官方源码
class TaskResult(models.Model):
"""Task result/status."""
task_id = models.CharField(
max_length=getattr(
settings,
'DJANGO_CELERY_RESULTS_TASK_ID_MAX_LENGTH',
255
),
unique=True, db_index=True,
verbose_name=_('Task ID'),
help_text=_('Celery ID for the Task that was run'))
task_name = models.CharField(
null=True, max_length=255, db_index=True,
verbose_name=_('Task Name'),
help_text=_('Name of the Task which was run'))
task_args = models.TextField(
null=True,
verbose_name=_('Task Positional Arguments'),
help_text=_('JSON representation of the positional arguments '
'used with the task'))
task_kwargs = models.TextField(
null=True,
verbose_name=_('Task Named Arguments'),
help_text=_('JSON representation of the named arguments '
'used with the task'))
status = models.CharField(
max_length=50, default=states.PENDING, db_index=True,
choices=TASK_STATE_CHOICES,
verbose_name=_('Task State'),
help_text=_('Current state of the task being run'))
worker = models.CharField(
max_length=100, db_index=True, default=None, null=True,
verbose_name=_('Worker'), help_text=_('Worker that executes the task')
)
content_type = models.CharField(
max_length=128,
verbose_name=_('Result Content Type'),
help_text=_('Content type of the result data'))
content_encoding = models.CharField(
max_length=64,
verbose_name=_('Result Encoding'),
help_text=_('The encoding used to save the task result data'))
result = models.TextField(
null=True, default=None, editable=False,
verbose_name=_('Result Data'),
help_text=_('The data returned by the task. '
'Use content_encoding and content_type fields to read.'))
date_created = models.DateTimeField(
auto_now_add=True, db_index=True,
verbose_name=_('Created DateTime'),
help_text=_('Datetime field when the task result was created in UTC'))
date_done = models.DateTimeField(
auto_now=True, db_index=True,
verbose_name=_('Completed DateTime'),
help_text=_('Datetime field when the task was completed in UTC'))
traceback = models.TextField(
blank=True, null=True,
verbose_name=_('Traceback'),
help_text=_('Text of the traceback if the task generated one'))
meta = models.TextField(
null=True, default=None, editable=False,
verbose_name=_('Task Meta Information'),
help_text=_('JSON meta information about the task, '
'such as information on child tasks'))
objects = managers.TaskResultManager()
class Meta:
"""Table information."""
ordering = ['-date_done']
verbose_name = _('task result')
verbose_name_plural = _('task results')
def as_dict(self):
return {
'task_id': self.task_id,
'task_name': self.task_name,
'task_args': self.task_args,
'task_kwargs': self.task_kwargs,
'status': self.status,
'result': self.result,
'date_done': self.date_done,
'traceback': self.traceback,
'meta': self.meta,
'worker': self.worker
}
def __str__(self):
return '<Task: {0.task_id} ({0.status})>'.format(self)
orm中的序列化什么的我就不写了,记住如果改了模型要执行manage.py makemigtations 和 migrate