Python实用模块(二十)Apscheduler

2020-07-17

文章目录

软硬件环境

视频看这里

此处是youtube的播放链接,需要科学上网。喜欢我的视频,请记得订阅我的频道,打开旁边的小铃铛,点赞并分享,感谢您的支持。

前言

说起定时任务,第一反应应该是 windows 自带的计划任务或者 linux 自带的 crontab,关于 ubuntu 操作系统下如何使用 crontab 可以参考下面这支视频。 [apscheduler](https://xugaoxiang.com/tag/apscheduler/) 是一款使用 [python](https://xugaoxiang.com/tag/python/) 语言开发的定时任务工具,提供了非常丰富而且简单易用的定时任务接口

安装

安装非常简单, 使用 pip

  1. pip install apscheduler

apscheduler的四大组件

  • triggers 触发器 可以按照日期、时间间隔或者 contab 表达式三种方式触发
  • job stores 作业存储器 指定作业存放的位置,默认保存在内存,也可以保存在各种数据库中
  • executors 执行器 将指定的作业提交到线程池或者进程池中运行
  • schedulers 作业调度器 常用的有 BackgroundScheduler(后台运行)和 BlockingScheduler (阻塞式)

代码实践

下面通过几个示例来看看如何来使用 apscheduler

  1. import time
  2. from apscheduler.schedulers.background import BlockingScheduler
  3. from apscheduler.triggers.interval import IntervalTrigger
  4. def my_job():
  5. print('my_job, {}'.format(time.ctime()))
  6. if __name__ == "__main__":
  7. scheduler = BlockingScheduler()
  8. # 间隔设置为1秒,还可以使用minutes、hours、days、weeks等
  9. intervalTrigger=IntervalTrigger(seconds=1)
  10. # 给作业设个id,方便作业的后续操作,暂停、取消等
  11. scheduler.add_job(my_job, intervalTrigger, id='my_job_id')
  12. scheduler.start()
  13. print('=== end. ===')

执行代码,输出是这样的

Python实用模块(二十)Apscheduler - 迷途小书童的Note迷途小书童的Note - 图1

因为我们使用了 BlockingScheduler,它是阻塞式的,所以只看到了 my_job 方法中的输出,而语句 print('=== end. ===') 并没有被执行。BackgroundScheduler 它可以在后台运行,不会阻塞主线程的执行,来看下面的代码

  1. import time
  2. from apscheduler.schedulers.background import BackgroundScheduler
  3. from apscheduler.triggers.interval import IntervalTrigger
  4. def my_job():
  5. print('my_job, {}'.format(time.ctime()))
  6. if __name__ == "__main__":
  7. scheduler = BackgroundScheduler()
  8. intervalTrigger=IntervalTrigger(seconds=1)
  9. scheduler.add_job(my_job, intervalTrigger, id='my_job_id')
  10. scheduler.start()
  11. print('=== end. ===')
  12. while True:
  13. time.sleep(1)

代码执行的结果是这样的

Python实用模块(二十)Apscheduler - 迷途小书童的Note迷途小书童的Note - 图2

如果把 triggers 设置成 DateTrigger,就变成作业在某一个时间点执行,示例如下

  1. import time
  2. import datetime
  3. from apscheduler.schedulers.background import BlockingScheduler
  4. from apscheduler.triggers.date import DateTrigger
  5. def my_job():
  6. print('my_job, {}'.format(time.ctime()))
  7. if __name__ == "__main__":
  8. scheduler = BlockingScheduler()
  9. intervalTrigger=DateTrigger(run_date='2020-07-17 16:18:55')
  10. scheduler.add_job(my_job, intervalTrigger, id='my_job_id')
  11. scheduler.start()

等到设定的时间到了,作业就会被自行一次

Python实用模块(二十)Apscheduler - 迷途小书童的Note迷途小书童的Note - 图3

如果想按照指定的周期去执行的话,就需要使用 CronTrigger 了,工作原理跟 UNIXcrontab 定时任务非常相似,它可以指定非常详细且复杂的规则。同样的来看示例代码

  1. import time
  2. from apscheduler.schedulers.background import BlockingScheduler
  3. from apscheduler.triggers.cron import CronTrigger
  4. def my_job():
  5. print('my_job, {}'.format(time.ctime()))
  6. if __name__ == "__main__":
  7. scheduler = BlockingScheduler()
  8. # 第一秒执行作业
  9. intervalTrigger=CronTrigger(second=1)
  10. # 每天的19:30:01执行作业
  11. # intervalTrigger=CronTrigger(hour=19, minute=30, second=1)
  12. # 每年的10月1日19点执行作业
  13. # intervalTrigger=CronTrigger(month=10, day=1, hour=19)
  14. scheduler.add_job(my_job, intervalTrigger, id='my_job_id')
  15. scheduler.start()

代码执行的效果是这样的

Python实用模块(二十)Apscheduler - 迷途小书童的Note迷途小书童的Note - 图4

上面的代码中并没有使用 executors,因为只有一个作业,但是从调试中可以发现,默认情况下,apscheduler 也是使用了 ThreadPoolExecutor,且线程池的大小是10

Python实用模块(二十)Apscheduler - 迷途小书童的Note迷途小书童的Note - 图5

下面我们看看 executors 的使用

  1. import time
  2. from apscheduler.schedulers.background import BlockingScheduler
  3. from apscheduler.triggers.interval import IntervalTrigger
  4. from apscheduler.executors.pool import ThreadPoolExecutor
  5. def my_job():
  6. print('my_job, {}'.format(time.ctime()))
  7. if __name__ == "__main__":
  8. executors = {
  9. 'default': ThreadPoolExecutor(20)
  10. }
  11. scheduler = BlockingScheduler(executors=executors)
  12. intervalTrigger=IntervalTrigger(seconds=1)
  13. scheduler.add_job(my_job, intervalTrigger, id='my_job_id')
  14. scheduler.start()

可以看到,我们将线程池的大小改为了20,在初始化 scheduler 的时候将 executors 传递进去,后面的操作就跟之前的完全一样了

Python实用模块(二十)Apscheduler - 迷途小书童的Note迷途小书童的Note - 图6

关于 ThreadPoolExecutorProcessPoolExecutor 的选择问题,这里有一个原则,如果是 cpu 密集型的作业,使用 ProcessPoolExecutor,其它的使用 ThreadPoolExecutor,当然 ThreadPoolExecutorProcessPoolExecutor 也是可以混用的

最后我们来看看作业存储器,我们把它改成存储到 sqlite

  1. import time
  2. from apscheduler.schedulers.background import BlockingScheduler
  3. from apscheduler.triggers.interval import IntervalTrigger
  4. from apscheduler.executors.pool import ThreadPoolExecutor
  5. from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
  6. def my_job():
  7. print('my_job, {}'.format(time.ctime()))
  8. jobstores = {
  9. 'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
  10. }
  11. if __name__ == "__main__":
  12. executors = {
  13. 'default': ThreadPoolExecutor(20)
  14. }
  15. scheduler = BlockingScheduler(jobstores=jobstores, executors=executors)
  16. intervalTrigger=IntervalTrigger(seconds=1)
  17. scheduler.add_job(my_job, intervalTrigger, id='my_job_id')
  18. scheduler.start()

代码执行后,会在源码目录下生成 sqlite 数据库文件 jobs.sqlite,我们使用图形化工具打开查看,可以看到数据库中存放的作业的 id 和作业下次执行的时间

Python实用模块(二十)Apscheduler - 迷途小书童的Note迷途小书童的Note - 图7

参考资料