简单例子

  1. import time
  2. from apscheduler.schedulers.blocking import BlockingScheduler
  3. def func():
  4. now = datetime.datetime.now()
  5. ts = now.strftime('%Y-%m-%d %H:%M:%S')
  6. print('do func time :',ts)
  7. def func2():
  8. #耗时2S
  9. now = datetime.datetime.now()
  10. ts = now.strftime('%Y-%m-%d %H:%M:%S')
  11. print('do func2 time:',ts)
  12. time.sleep(2)
  13. def dojob():
  14. #创建调度器:BlockingScheduler
  15. scheduler = BlockingScheduler()
  16. #添加任务,时间间隔2S
  17. scheduler.add_job(func, 'interval', seconds=2, id='test_job1')
  18. #添加任务,时间间隔5S
  19. scheduler.add_job(func2, 'interval', seconds=3, id='test_job2')
  20. scheduler.start()
  21. dojob()

应用定时发送消息的例子

这里以一个整点报时的功能为例,来介绍定时任务的使用。
新建文件 awesome/plugins/scheduler.py,编写代码如下:

  1. from datetime import datetime
  2. import nonebot
  3. import pytz
  4. from aiocqhttp.exceptions import Error as CQHttpError
  5. @nonebot.scheduler.scheduled_job('cron', hour='*')
  6. async def _():
  7. bot = nonebot.get_bot()
  8. now = datetime.now(pytz.timezone('Asia/Shanghai'))
  9. try:
  10. await bot.send_group_msg(group_id=672076603,
  11. message=f'现在{now.hour}点整啦!')
  12. except CQHttpError:
  13. pass

这里最主要的就是第 8 行,nonebot.scheduler.scheduled_job() 是一个装饰器,第一个参数是触发器类型(这里是 cron,表示使用 Cron 类型的触发参数)。这里 hour='*' 表示每小时都执行,minutesecond 不填时默认为 0,也就是说装饰器所装饰的这个函数会在每小时的第一秒被执行。
除了 cron,还有两种触发器类型 intervaldate。例如,你可以使用 nonebot.scheduler.scheduled_job('interval', minutes=10) 来每十分钟执行一次任务。
限于篇幅,这里无法给出太详细的接口介绍,nonebot.scheduler 是一个 APScheduler 的 AsyncIOScheduler 对象,因此关于它的更多使用方法,可以参考 APScheduler 的官方文档

1 简介

APScheduler的全称是Advanced Python Scheduler。它是一个轻量级的 Python 定时任务调度框架。APScheduler 支持三种调度任务:固定时间间隔,固定时间点(日期),Linux 下的 Crontab 命令。同时,它还支持异步执行、后台执行调度任务。

2 安装

使用 pip 包管理工具安装 APScheduler 是最方便快捷的。

  1. pip install APScheduler
  2. # 如果出现因下载失败导致安装不上的情况,建议使用代理
  3. pip --proxy http://代理ip:端口 install APScheduler

3 使用步骤

APScheduler 使用起来还算是比较简单。运行一个调度任务只需要以下三部曲。

新建一个 schedulers (调度器) 。 添加一个调度任务(job stores)。 运行调度任务。

下面是执行每 2 秒报时的简单示例代码:

  1. import datetime
  2. import time
  3. from apscheduler.schedulers.background import BackgroundScheduler
  4. def timedTask():
  5. print(datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3])
  6. if __name__ == '__main__':
  7. # 创建后台执行的 schedulers
  8. scheduler = BackgroundScheduler()
  9. # 添加调度任务
  10. # 调度方法为 timedTask,触发器选择 interval(间隔性),间隔时长为 2 秒
  11. scheduler.add_job(timedTask, 'interval', seconds=2)
  12. # 启动调度任务
  13. scheduler.start()
  14. while True:
  15. print(time.time())
  16. time.sleep(5)

4 基础组件

APScheduler 有四种组件,分别是:调度器(scheduler),作业存储(job store),触发器(trigger),执行器(executor)。

  • schedulers(调度器)
    它是任务调度器,属于控制器角色。它配置作业存储器和执行器可以在调度器中完成,例如添加、修改和移除作业。
  • triggers(触发器)
    描述调度任务被触发的条件。不过触发器完全是无状态的。
  • job stores(作业存储器)
    任务持久化仓库,默认保存任务在内存中,也可将任务保存都各种数据库中,任务中的数据序列化后保存到持久化数据库,从数据库加载后又反序列化。
  • executors(执行器)
    负责处理作业的运行,它们通常通过在作业中提交指定的可调用对象到一个线程或者进城池来进行。当作业完成时,执行器将会通知调度器。

    4.1 schedulers(调度器)

    我个人觉得 APScheduler 非常好用的原因。它提供 7 种调度器,能够满足我们各种场景的需要。例如:后台执行某个操作,异步执行操作等。调度器分别是:

  • BlockingScheduler : 调度器在当前进程的主线程中运行,也就是会阻塞当前线程。

  • BackgroundScheduler : 调度器在后台线程中运行,不会阻塞当前线程。
  • AsyncIOScheduler : 结合 asyncio 模块(一个异步框架)一起使用。
  • GeventScheduler : 程序中使用 gevent(高性能的Python并发框架)作为IO模型,和 GeventExecutor 配合使用。
  • TornadoScheduler : 程序中使用 Tornado(一个web框架)的IO模型,用 ioloop.add_timeout 完成定时唤醒。
  • TwistedScheduler : 配合 TwistedExecutor,用 reactor.callLater 完成定时唤醒。
  • QtScheduler : 你的应用是一个 Qt 应用,需使用QTimer完成定时唤醒。

    4.2 triggers(触发器)

    APScheduler 有三种内建的 trigger:
    1)date 触发器
    date 是最基本的一种调度,作业任务只会执行一次。它表示特定的时间点触发。它的参数如下:
参数 说明
run_date (datetime 或 str) 作业的运行日期或时间
timezone (datetime.tzinfo 或 str) 指定时区

date 触发器使用示例如下:

  1. from datetime import datetime
  2. from datetime import date
  3. from apscheduler.schedulers.background import BackgroundScheduler
  4. def job_func(text):
  5. print(text)
  6. scheduler = BackgroundScheduler()
  7. # 在 2017-12-13 时刻运行一次 job_func 方法
  8. scheduler .add_job(job_func, 'date', run_date=date(2017, 12, 13), args=['text'])
  9. # 在 2017-12-13 14:00:00 时刻运行一次 job_func 方法
  10. scheduler .add_job(job_func, 'date', run_date=datetime(2017, 12, 13, 14, 0, 0), args=['text'])
  11. # 在 2017-12-13 14:00:01 时刻运行一次 job_func 方法
  12. scheduler .add_job(job_func, 'date', run_date='2017-12-13 14:00:01', args=['text'])
  13. scheduler.start()

2)interval 触发器
固定时间间隔触发。interval 间隔调度,参数如下:

参数 说明
weeks (int) 间隔几周
days (int) 间隔几天
hours (int) 间隔几小时
minutes (int) 间隔几分钟
seconds (int) 间隔多少秒
start_date (datetime 或 str) 开始日期
end_date (datetime 或 str) 结束日期
timezone (datetime.tzinfo 或str) 时区

interval 触发器使用示例如下:

  1. import datetime
  2. from apscheduler.schedulers.background import BackgroundScheduler
  3. def job_func(text):
  4. print(datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3])
  5. scheduler = BackgroundScheduler()
  6. # 每隔两分钟执行一次 job_func 方法
  7. scheduler .add_job(job_func, 'interval', minutes=2)
  8. # 在 2017-12-13 14:00:01 ~ 2017-12-13 14:00:10 之间, 每隔两分钟执行一次 job_func 方法
  9. scheduler .add_job(job_func, 'interval', minutes=2, start_date='2017-12-13 14:00:01' , end_date='2017-12-13 14:00:10')
  10. scheduler.start()

3)cron 触发器
在特定时间周期性地触发,和Linux crontab格式兼容。它是功能最强大的触发器。
我们先了解 cron 参数:

参数 说明
year (int 或 str) 年,4位数字
month (int 或 str) 月 (范围1-12)
day (int 或 str) 日 (范围1-31
week (int 或 str) 周 (范围1-53)
day_of_week (int 或 str) 周内第几天或者星期几 (范围0-6 或者 mon,tue,wed,thu,fri,sat,sun)
hour (int 或 str) 时 (范围0-23)
minute (int 或 str) 分 (范围0-59)
second (int 或 str) 秒 (范围0-59)
start_date (datetime 或 str) 最早开始日期(包含)
end_date (datetime 或 str) 最晚结束时间(包含)
timezone (datetime.tzinfo 或str) 指定时区

参照 https://www.cnblogs.com/ohyb/p/9084011.html
https://none.rclab.tk/guide/scheduler.html