celery
# Start the celery worker
$ celery -A celery_tasks worker --loglevel=info
# Start the worker and beat in the background
nohup celery -A celery_tasks worker > /var/log/celery_worker.log 2>&1 &
nohup celery -A celery_tasks beat > /var/log/celery_beat.log 2>&1 &
# Start flower, the celery task dashboard
$ celery -A celery_tasks flower --address=0.0.0.0 --port=5555
In a test environment, the worker and beat can share a single process:
celery -A celery_tasks worker -B
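The beat process (or the -B flag) only sends out tasks that appear in the app's periodic schedule. A minimal sketch of such a schedule follows. The task name is taken from the [tasks] list in the startup output below; the entry name and the one-minute interval are assumptions, since the real configuration is not shown in these notes.

```python
from datetime import timedelta

# Hypothetical schedule entry; celery accepts a number of seconds,
# a timedelta, or a crontab object as the "schedule" value.
beat_schedule = {
    "monitor-every-minute": {  # entry name is an assumption
        "task": "celery_tasks.cronbjob.counter.Monitor",
        "schedule": timedelta(minutes=1),  # interval is an assumption
    },
}

# In the real project this dict would be assigned to the app config,
# e.g. app.conf.beat_schedule = beat_schedule (app not shown in these notes).
print(beat_schedule["monitor-every-minute"]["schedule"].total_seconds())
```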
Startup output
$ celery -A celery_tasks worker --loglevel=info
/home/hwmg/beancluster.cfg
/home/hwmg
/home/hwmg
['hwfirewalld', 'sh/hw_firewalld.sh', '172.28.78.201']
['h3cRouter', 'sh/h3cRouter.sh', '172.28.78.202']
['shanshi', 'sh/shanshi.sh', '172.28.78.203']
['CiscoFirewalld', 'sh/cisco_firewalld.sh', '172.28.78.206']
['CiscoRouter', 'sh/CiscoTranfer.sh', '172.28.78.205']
['iKuaiOS', 'sh/ikuai.sh', '172.28.78.204']
['JuniperFirewalld', 'sh/Juniper.sh', '172.28.78.207']
['MikrotikRouter', 'sh/Mikrotik.sh', '172.28.78.208']
/home/hwmg
/usr/local/lib/python3.6/site-packages/celery/platforms.py:835: SecurityWarning: You're running the worker with superuser privileges: this is
absolutely not recommended!
Please specify a different user using the --uid option.
User information: uid=0 euid=0 gid=0 egid=0
uid=uid, euid=euid, gid=gid, egid=egid,
-------------- celery@antiy246 v5.1.2 (sun-harmonics)
--- ***** -----
-- ******* ---- Linux-3.10.0-862.el7.x86_64-x86_64-with-centos-7.5.1804-Core 2022-01-14 14:49:35
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: celery:0x7fa51d466a90
- ** ---------- .> transport: redis://10.42.0.107:6379/9
- ** ---------- .> results: redis://10.42.0.107:6379/9
- *** --- * --- .> concurrency: 16 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. celery_tasks.cronbjob.counter.Monitor
. celery_tasks.cronbjob.detect.detectAliveIP
. celery_tasks.cronbjob.detect.portDetect
. celery_tasks.cronbjob.replay.replayMP4
. celery_tasks.go_tasks.CreateQemu
. celery_tasks.go_tasks.DeleteQemu
. celery_tasks.go_tasks.RestoreQemu
. celery_tasks.go_tasks.StartQemu
. celery_tasks.go_tasks.StopQemu
. celery_tasks.go_tasks.add
. celery_tasks.go_tasks.sum
. celery_tasks.py_tasks.add
. celery_tasks.py_tasks.assets.batchCreateAssets
. celery_tasks.py_tasks.assets.createAssets
. celery_tasks.py_tasks.assets.releaseAssets
. celery_tasks.py_tasks.iptables.createIPtables
. celery_tasks.py_tasks.restore.restoreManual
. celery_tasks.py_tasks.servicesCURD.createInstance
. celery_tasks.py_tasks.servicesCURD.deleteInstances
. celery_tasks.py_tasks.servicesCURD.restartInstances
. celery_tasks.py_tasks.servicesCURD.startStopInstances
. celery_tasks.py_tasks.sftp.addSFTPConf
. celery_tasks.py_tasks.sftp.delSFTPConf
. celery_tasks.py_tasks.sum
[2022-01-14 14:49:37,779: INFO/MainProcess] Connected to redis://10.42.0.107:6379/9
[2022-01-14 14:49:37,802: INFO/MainProcess] mingle: searching for neighbors
[2022-01-14 14:49:38,842: INFO/MainProcess] mingle: all alone
[2022-01-14 14:49:38,905: INFO/MainProcess] celery@antiy246 ready.
APScheduler
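APScheduler (Advanced Python Scheduler) is a Python job-scheduling framework modeled on Quartz. It supports date-based, fixed-interval, and crontab-style jobs, and can persist jobs.
Features
- Cron-style scheduling, similar to Linux cron (optional start/end time)
- Interval-based scheduling (periodic runs, optional start/end time)
- One-off execution (run a job once at a set date/time)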
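To make "interval with optional start/end time" concrete, here is a stdlib-only toy that plans fire times inside a window. It is not APScheduler's implementation; ToyIntervalTrigger and fire_times are hypothetical names used for illustration only.

```python
from datetime import datetime, timedelta


class ToyIntervalTrigger:
    """Toy trigger: fires every `seconds`, starting at `start`, until `end`."""

    def __init__(self, seconds, start, end=None):
        self.interval = timedelta(seconds=seconds)
        self.start = start
        self.end = end

    def next_fire_time(self, previous):
        # The first run is at `start`; later runs are one interval after the last.
        candidate = self.start if previous is None else previous + self.interval
        if self.end is not None and candidate > self.end:
            return None  # window is over, the trigger is exhausted
        return candidate


def fire_times(trigger, limit):
    """Collect up to `limit` planned fire times from a trigger."""
    times, previous = [], None
    while len(times) < limit:
        previous = trigger.next_fire_time(previous)
        if previous is None:
            break
        times.append(previous)
    return times


start = datetime(2022, 1, 14, 14, 0, 0)
trigger = ToyIntervalTrigger(seconds=5, start=start, end=start + timedelta(seconds=12))
planned = [t.strftime("%H:%M:%S") for t in fire_times(trigger, limit=10)]
print(planned)  # ['14:00:00', '14:00:05', '14:00:10']
```

A one-off job is the degenerate case: a trigger that returns its date once and then None.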
Basic concepts and execution logic
Example
from datetime import datetime

from apscheduler.schedulers.blocking import BlockingScheduler


# Print the current time
def job():
    print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))


# BlockingScheduler runs in the foreground and blocks the calling thread
sched = BlockingScheduler()
# Run job() every 5 seconds
sched.add_job(job, 'interval', seconds=5, id='my_job_id')
sched.start()
flask
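When integrating a scheduled-task queue into flask, one serious problem is multi-process workers under uwsgi: every worker process loads the app and would start its own scheduler, so a job can run once per process instead of once overall.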
https://blog.miguelgrinberg.com/post/run-your-flask-regularly-scheduled-jobs-with-cron
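Besides moving the jobs to cron as the linked post suggests, one common workaround is to let only the process that wins an exclusive file lock start the scheduler. A minimal sketch, assuming a Unix host; try_acquire_scheduler_lock and the lock file path are hypothetical, and the actual scheduler start is left as a comment.

```python
import fcntl
import os
import tempfile


def try_acquire_scheduler_lock(lock_path):
    """Return an open file holding an exclusive lock, or None if it is taken."""
    lock_file = open(lock_path, "w")
    try:
        # Non-blocking exclusive lock; fails at once if another holder exists.
        fcntl.flock(lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except OSError:
        lock_file.close()
        return None
    return lock_file  # keep this object alive for as long as the lock is needed


# Demo: two attempts on the same lock file, as two worker processes would make.
lock_path = os.path.join(tempfile.mkdtemp(), "scheduler.lock")
first = try_acquire_scheduler_lock(lock_path)
second = try_acquire_scheduler_lock(lock_path)

if first is not None:
    print("this process would start the scheduler")  # e.g. sched.start()
if second is None:
    print("a second process would skip starting it")
```

The lock is released when the holding process exits, so another worker can take over after a restart.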