踩了两天的坑终于总结出来了

前提

本文写于2021年1月17日
需要安装的依赖有:
redis,python3.8
pip list:
Django 3.1.x
django-celery-beat 2.1.0
django-celery-results 2.0.0
—————————————-
不需要安装django-celery 不需要不需要不需要!!!

目录:
image.png

init.py

  1. from __future__ import absolute_import, unicode_literals
  2. from .celery_ import app as celery_app
  3. __all__ = ['celery_app']

celery_.py

  1. from __future__ import absolute_import, unicode_literals
  2. import os
  3. from celery import Celery
  4. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'DjangoCelery.settings') # 设置django环境
  5. app = Celery('DjangoCelery')
  6. app.config_from_object('django.conf:settings', namespace='CELERY') # 使用CELERY_ 作为前缀,在settings中写配置
  7. app.autodiscover_tasks() # 发现任务文件每个app下的task.py
  8. @app.task(bind=True)
  9. def debug_task(self):
  10. print('Request: {0!r}'.format(self.request))

tasks.py

users/tasks.py

  1. from __future__ import absolute_import, unicode_literals
  2. from celery import shared_task
  3. import datetime
  4. @shared_task
  5. def add(x, y):
  6. res = x + y
  7. time_format = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
  8. print('当前时间为:' + time_format + ' ,两个数相加的结果为:')
  9. print(res)
  10. return res
  11. @shared_task
  12. def mul(x, y):
  13. res = x * y
  14. time_format = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
  15. print('当前时间为:' + time_format + ' ,两个数相乘的结果为:')
  16. print(res)
  17. return res

settings.py

  1. from celery.schedules import crontab
  2. from datetime import timedelta
  3. CELERY_BROKER_URL = 'redis://:123456@127.0.0.1:6379/0' # 设置redis为消息队列
  4. CELERY_BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 43200}
  5. CELERY_RESULT_BACKEND = 'django-db://root:Admin123!@81.69.46.120:3306/HiMoProject' # BACKEND配置,这里使用MySQl
  6. CELERY_TASK_SERIALIZER = 'json'
  7. CELERY_RESULT_SERIALIZER = 'json' # 结果序列化方案
  8. CELERY_ENABLE_UTC = False # 如果配置了时间为TIME_ZONE = 'Asia/Shanghai'这里必须要设置false
  9. DJANGO_CELERY_BEAT_TZ_AWARE = False
  10. CELERY_BEAT_SCHEDULE = {
  11. # 每十秒执行一次add方法
  12. 'add-every-10-seconds': {
  13. 'task': 'users.tasks.add',
  14. # 多长时间执行一次
  15. 'schedule': 1.0, # 支持直接用数字表示秒数
  16. # 'schedule': timedelta(seconds=10), # 可以用timedelta对象
  17. # 必要的参数,这里指add()的参数
  18. 'args': (16, 26)
  19. },
  20. # 每个周一的20:57分执行一次mul方法
  21. 'add-every-monday-morning': {
  22. 'task': 'users.tasks.mul',
  23. 'schedule': crontab(hour=20, minute=57, day_of_week=1),
  24. 'args': (16, 16),
  25. },
  26. }

配置完了之后千万不要忘记执行python3 manage.py migrate

测试

在命令行内输入以下两段代码(需要开启两个终端)
celery -A DjangoCelery worker -l debug debug为输出的日志级别。一般设置为info,这里是开启worker
celery -A DjangoCelery beat -l debug 这里是开启定时任务,django settings 里设置的定时任务(CELERY_BEAT_SCHEDULE)

终端2

包含beat语句
image.png

终端1

image.png

Admin设置定时任务

进入django自带的admin
1610865020881.jpg
具体怎么配置就不写了,翻译成中文就都看得懂了

ORM

  1. from django_celery_results.backends.database import TaskResult
  2. # 以下是官方源码
  3. class TaskResult(models.Model):
  4. """Task result/status."""
  5. task_id = models.CharField(
  6. max_length=getattr(
  7. settings,
  8. 'DJANGO_CELERY_RESULTS_TASK_ID_MAX_LENGTH',
  9. 255
  10. ),
  11. unique=True, db_index=True,
  12. verbose_name=_('Task ID'),
  13. help_text=_('Celery ID for the Task that was run'))
  14. task_name = models.CharField(
  15. null=True, max_length=255, db_index=True,
  16. verbose_name=_('Task Name'),
  17. help_text=_('Name of the Task which was run'))
  18. task_args = models.TextField(
  19. null=True,
  20. verbose_name=_('Task Positional Arguments'),
  21. help_text=_('JSON representation of the positional arguments '
  22. 'used with the task'))
  23. task_kwargs = models.TextField(
  24. null=True,
  25. verbose_name=_('Task Named Arguments'),
  26. help_text=_('JSON representation of the named arguments '
  27. 'used with the task'))
  28. status = models.CharField(
  29. max_length=50, default=states.PENDING, db_index=True,
  30. choices=TASK_STATE_CHOICES,
  31. verbose_name=_('Task State'),
  32. help_text=_('Current state of the task being run'))
  33. worker = models.CharField(
  34. max_length=100, db_index=True, default=None, null=True,
  35. verbose_name=_('Worker'), help_text=_('Worker that executes the task')
  36. )
  37. content_type = models.CharField(
  38. max_length=128,
  39. verbose_name=_('Result Content Type'),
  40. help_text=_('Content type of the result data'))
  41. content_encoding = models.CharField(
  42. max_length=64,
  43. verbose_name=_('Result Encoding'),
  44. help_text=_('The encoding used to save the task result data'))
  45. result = models.TextField(
  46. null=True, default=None, editable=False,
  47. verbose_name=_('Result Data'),
  48. help_text=_('The data returned by the task. '
  49. 'Use content_encoding and content_type fields to read.'))
  50. date_created = models.DateTimeField(
  51. auto_now_add=True, db_index=True,
  52. verbose_name=_('Created DateTime'),
  53. help_text=_('Datetime field when the task result was created in UTC'))
  54. date_done = models.DateTimeField(
  55. auto_now=True, db_index=True,
  56. verbose_name=_('Completed DateTime'),
  57. help_text=_('Datetime field when the task was completed in UTC'))
  58. traceback = models.TextField(
  59. blank=True, null=True,
  60. verbose_name=_('Traceback'),
  61. help_text=_('Text of the traceback if the task generated one'))
  62. meta = models.TextField(
  63. null=True, default=None, editable=False,
  64. verbose_name=_('Task Meta Information'),
  65. help_text=_('JSON meta information about the task, '
  66. 'such as information on child tasks'))
  67. objects = managers.TaskResultManager()
  68. class Meta:
  69. """Table information."""
  70. ordering = ['-date_done']
  71. verbose_name = _('task result')
  72. verbose_name_plural = _('task results')
  73. def as_dict(self):
  74. return {
  75. 'task_id': self.task_id,
  76. 'task_name': self.task_name,
  77. 'task_args': self.task_args,
  78. 'task_kwargs': self.task_kwargs,
  79. 'status': self.status,
  80. 'result': self.result,
  81. 'date_done': self.date_done,
  82. 'traceback': self.traceback,
  83. 'meta': self.meta,
  84. 'worker': self.worker
  85. }
  86. def __str__(self):
  87. return '<Task: {0.task_id} ({0.status})>'.format(self)

orm中的序列化什么的我就不写了,记住如果改了模型要执行manage.py makemigtations 和 migrate