定期任务:Periodic Tasks
首先安装beat
pip install django-celery-beat
然后在settings里面加上
INSTALLED_APPS = (
...,
'django_celery_beat',
)
然后同步数据库
python manage.py makemigrations
python manage.py migrate
然后启动beat服务
celery -A myyuque beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
接着去我们的项目看看
intervals:管理时间间隔的地方(每隔 N 秒/分钟/小时/天执行一次)
clocked:在指定的某个具体时间点执行一次(一次性任务)
crontabs:按 crontab 表达式执行(例如每天/每周/每月的某个时刻)
periodic tasks:管理任务
solar events:按日出、日落等太阳事件(结合经纬度)执行
附上好像没效果的代码
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from celery.schedules import crontab
from celery1.tasks import add
# Set the default Django settings module for the 'celery' program.
# setdefault() only applies when DJANGO_SETTINGS_MODULE is not already set
# in the environment.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'settings.base')
# The Celery application instance used by every task and schedule below.
app = Celery('myyuque')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('settings.base', namespace='CELERY')
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
    """Print the repr of the bound task's current request (debugging aid)."""
    message = 'Request: {0!r}'.format(self.request)
    print(message)
# Manually configured periodic task schedule.
app.conf.beat_schedule = {
    'add-every-30-seconds': {
        # The value must equal the name the task is registered under. Celery
        # derives default task names from the import path of the defining
        # module, and this file imports the task from ``celery1.tasks``; a
        # bare 'tasks.add' does not match that path and the worker would
        # treat the scheduled message as an unregistered task.
        # NOTE(review): if ``add`` declares an explicit ``name=``, use that
        # name here instead -- confirm against celery1/tasks.py.
        'task': 'celery1.tasks.add',
        'schedule': 30.0,  # interval in seconds
        'args': (16, 16),
    },
}
# Periodic tasks that are registered when the project starts up.
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    """Register three periodic invocations of ``test`` on ``sender``.

    Connected to ``app.on_after_configure``; ``sender`` is the object the
    signal passes in, on which ``add_periodic_task`` is called. Extra keyword
    arguments supplied by the signal are accepted and ignored.
    """
    hello_sig = test.s('hello')
    world_sig = test.s('world')
    monday_sig = test.s('Happy Mondays!')
    monday_0730 = crontab(hour=7, minute=30, day_of_week=1)

    # test('hello') every 10 seconds, under an explicit entry name.
    sender.add_periodic_task(10.0, hello_sig, name='add every 10')
    # test('world') every 30 seconds, passing expires=10.
    sender.add_periodic_task(30.0, world_sig, expires=10)
    # test('Happy Mondays!') every Monday morning at 7:30 a.m.
    sender.add_periodic_task(monday_0730, monday_sig)
@app.task
def test(arg):
    """Print ``arg`` to standard output; target of the periodic tasks above."""
    print(arg)