定期任务:Periodic Tasks
    首先安装beat
    pip install django-celery-beat
    然后在settings里面加上

    1. INSTALLED_APPS = (
    2. ...,
    3. 'django_celery_beat',
    4. )

    然后同步数据库

    python manage.py makemigrations
    python manage.py migrate

    然后启动beat服务
    celery -A myyuque beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
    接着去我们项目的 Django 后台(admin)看看,会多出下面几项:
    image.png
    intervals:管理时间间隔的地方
    clocked:指定一个具体的日期时间点去执行(只执行一次,对应的任务需设置为 one-off)
    crontabs:按 crontab 表达式(分/时/日/月/星期)设置定时规则,例如每天、每周、每月的某个时刻执行
    periodic tasks:管理任务
    solar events:按天文事件(如某经纬度处的日出、日落等)触发执行
    附上好像没效果的代码

    1. from __future__ import absolute_import, unicode_literals
    2. import os
    3. from celery import Celery
    4. from celery.schedules import crontab
    5. from celery1.tasks import add
    6. # set the default Django settings module for the 'celery' program.
    7. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'settings.base')
    8. app = Celery('myyuque')
    9. # Using a string here means the worker doesn't have to serialize
    10. # the configuration object to child processes.
    11. # - namespace='CELERY' means all celery-related configuration keys
    12. # should have a `CELERY_` prefix.
    13. app.config_from_object('settings.base', namespace='CELERY')
    14. # Load task modules from all registered Django app configs.
    15. app.autodiscover_tasks()
    16. @app.task(bind=True)
    17. def debug_task(self):
    18. print('Request: {0!r}'.format(self.request))
    19. # 手动设置周期性任务
    20. app.conf.beat_schedule = {
    21. 'add-every-30-seconds': {
    22. 'task': 'tasks.add',
    23. 'schedule': 30.0,
    24. 'args': (16, 16)
    25. },
    26. }
    27. # 这里代表项目启动时开启的定时任务
    28. @app.on_after_configure.connect
    29. def setup_periodic_tasks(sender, **kwargs):
    30. # Calls test('hello') every 10 seconds.
    31. sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')
    32. # Calls test('world') every 30 seconds
    33. sender.add_periodic_task(30.0, test.s('world'), expires=10)
    34. # Executes every Monday morning at 7:30 a.m.
    35. sender.add_periodic_task(
    36. crontab(hour=7, minute=30, day_of_week=1),
    37. test.s('Happy Mondays!'),
    38. )
    39. @app.task
    40. def test(arg):
    41. print(arg)