安装 APScheduler
$ pip install apscheduler
代码示例
源代码的examples
目录下可以找到 APScheduler
的工作示例。这些示例也可以在线浏览。
基本概念
APScheduler 有四个组件:
- 触发器(triggers)
- 作业存储(job stores)
- 执行器(executors)
- 调度器(schedulers)
触发器(triggers) 包含调度逻辑,每一个作业有它自己的触发器,用于决定接下来哪一个作业会运行。除了他们自己初始配置意外,触发器完全是无状态的。
作业存储(job stores) 存储被调度的作业,默认的作业存储是简单地把作业保存在内存(MemoryJobStore
)中,当然作业存储也可以将作业保存在各种数据库中。一个作业的数据讲在保存在持久化作业存储时被序列化,并在加载时被反序列化。调度器不能分享同一个作业存储。
执行器(executor) 处理作业的运行,他们通常通过在作业中提交制定的可调用对象到一个线程或者进程池来进行。作业完成后,执行器将会通知调度器,然后发出适当的事件。
调度器(scheduler) 通常在应用中只有一个调度器,应用的开发者通常不会直接处理作业存储、调度器和触发器,相反,调度器提供了处理这些的合适的接口。配置作业存储和执行器可以在调度器中完成,例如添加、修改和移除作业。
选择正确的调度器、作业存储、执行器、触发器
您对 scheduler
的选择主要取决于您的编程环境以及您将使用APScheduler进行的操作。以下是选择调度程序的快速指南:
BlockingScheduler
: 当调度程序是您的进程中唯一运行的时候使用BackgroundScheduler
: 在不使用下面的任何框架时使用,并希望调度程序在应用程序中的后台运行AsyncIOScheduler
: 如果您的应用程序使用asyncio模块,请使用它GeventScheduler
: 如果您的应用程序使用gevent,请使用它TornadoScheduler
: 如果你正在构建一个Tornado应用程序,请使用它TwistedScheduler
: 用于构建Twisted应用程序QtScheduler
: 如果您正在构建Qt应用程序,请使用它
很简单,是吗?
要选择适当的作业存储,您需要确定是否需要作业持久性。如果您始终在应用程序开始时重新创建作业,则可以使用默认值(MemoryJobStore
)。但是如果您需要自己的作业能经受调度程序重新启动或应用程序崩溃,那么你的选择通常取决于在您的编程环境使用的工具。此时,如果你可以自由选择的话,那么以 PostgreSQL 作为后端的SQLAlchemyJobStore
是我们推荐的选择,因为它有强大的数据完整性保护。
同样,如果您使用上述框架之一,通常会为您选择执行者。否则,对于大多数目的而言,默认值 ThreadPoolExecutor
应该足够好。如果您的工作负载涉及CPU密集型操作,则应考虑使用ProcessPoolExecutor
替代,以便充分利用多个CPU内核。您甚至可以同时使用这两种方法,将进程池执行程序添加为辅助执行程序。
当您安排工作时,您需要为其选择一个触发器。触发器确定作业运行时计算日期/时间的逻辑。APScheduler带有三种内置触发器类型:
也可以将多个触发器组合成一个触发器,或者在所有参与触发器同意的时间触发,或者触发器中的任何触发器触发。有关更多信息,请参阅文档。combining triggers
您可以在各自的API文档页面上找到每个作业存储库,执行程序和触发器类型的插件名称。
配置调度器
APScheduler提供了许多不同的方式来配置调度程序。您可以使用配置字典,也可以传递选项作为关键字参数。您也可以先实例化调度程序,然后添加作业并配置调度程序。这样你可以在任何环境下获得最大的灵活性。
调度程序级别配置选项的完整列表可以在BaseScheduler
该类的API参考中找到 。调度程序子类也可能有其他选项,这些选项记录在其各自的API参考中。各个作业存储和执行程序的配置选项同样可以在其API参考页面上找到。
比方说,您希望在应用程序中使用默认作业存储和默认执行程序运行BackgroundScheduler:
from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler()
# 在这里初始化应用程序的其余部分,或在调度程序初始化之前初始化
# Initialize the rest of the application here, or before the scheduler initialization
这将为您带来一个BackgroundScheduler,其中一个名为“default”的MemoryJobStore和一个名为“default”的ThreadPoolExecutor的默认最大线程数为10。
现在,假设你想要更多。您希望有两个使用两个执行者的作业商店,并且您还想调整新作业的默认值并设置不同的时区。以下三个例子完全相同,并会为您提供:
- 一个名为“mongo”的MongoDBJobStore
- 一个名为“default”的SQLAlchemyJobStore(使用SQLite)
- 一个名为“default”的ThreadPoolExecutor,线程数为20
- 一个名为“processpool”的ProcessPoolExecutor,线程数为5
- UTC作为调度程序的时区
- 默认情况下,合并关闭以用于新作业
- 新作业的默认最大实例限制为3
方法 1:
from pytz import utc
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
jobstores = {
'mongo': MongoDBJobStore(),
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
'default': ThreadPoolExecutor(20),
'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
'coalesce': False,
'max_instances': 3
}
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
方法 2:
from apscheduler.schedulers.background import BackgroundScheduler
# The "apscheduler." prefix is hard coded
scheduler = BackgroundScheduler({
'apscheduler.jobstores.mongo': {
'type': 'mongodb'
},
'apscheduler.jobstores.default': {
'type': 'sqlalchemy',
'url': 'sqlite:///jobs.sqlite'
},
'apscheduler.executors.default': {
'class': 'apscheduler.executors.pool:ThreadPoolExecutor',
'max_workers': '20'
},
'apscheduler.executors.processpool': {
'type': 'processpool',
'max_workers': '5'
},
'apscheduler.job_defaults.coalesce': 'false',
'apscheduler.job_defaults.max_instances': '3',
'apscheduler.timezone': 'UTC',
})
方法 3:
from pytz import utc
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutor
jobstores = {
'mongo': {'type': 'mongodb'},
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
'default': {'type': 'threadpool', 'max_workers': 20},
'processpool': ProcessPoolExecutor(max_workers=5)
}
job_defaults = {
'coalesce': False,
'max_instances': 3
}
scheduler = BackgroundScheduler()
# .. do something else here, maybe add jobs etc.
scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
启动调度器
通过调用 start()
方法来启动调度器。除了BlockingScheduler
以外,对于方法将立即返回,您可以继续应用程序的初始化过程,可能会将作业添加到调度程序。
对于BlockingScheduler
,您只需start()
在完成任何初始化步骤后再打电话 。
注意:调度程序启动后,您不能再更改其设置。
添加作业
有两种方法可以将作业添加到调度程序中:
- 调用函数
add_job()
- 使用装饰器
scheduled_job()
第一种方法是最常用的方法,返回一个apscheduler.job.Job
实例,您可以稍后使用该实例修改或删除该作业。。第二种方式主要是方便地声明在应用程序运行时不会更改的作业(job)。
在调度器中你能在任何时间调度任务。如果在添加任务时调度程序尚未运行,则会暂时安排任务,并且仅在调度程序启动时计算其第一个运行时间。
需要注意的是,如果您使用连续执行作业的执行程序或作业存储,它会在你的作业中添加一些要求:
- 可调用的目标必须是全局可访问的
- 可调用的任何参数都必须是可序列化的
内建作业存储中,只有MemoryJobStore
不会序列化作业。内置执行程序中,只有ProcessPoolExecutor
会序列化作业。
注意:如果您在应用程序初始化过程中将作业安排在持久作业存储区中,则必须为该作业定义一个明确的ID并使用,replace_existing=True
否则每次重新启动应用程序时你都会得到新任务副本!
提示: 如果要立即运行任务,请在添加任务时省略trigger
参数。
删除任务
当您从调度程序中删除作业时,该作业将从相关的作业存储中删除,并且不会再执行。有两种方法可以做到这一点:
- 通过
remove_job()
与作业的ID和作业存储别名调用 - 通过调用
remove()
你从中获得的Job实例add_job()
如果作业调度结束(即其触发器不会产生任何进一步的运行时间),它会自动删除。
例如:
job = scheduler.add_job(myfunc, 'interval', minutes=2)
job.remove()
同样,适用明确的作业ID:
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.remove_job('my_job_id')
暂停和恢复任务
您可以通过Job
实例或调度程序本身轻松暂停和恢复作业。作业暂停时,下一次运行时间将被清除,并且不会再计算运行时间,直到作业恢复。要暂停作业,请使用以下任一方法:
要恢复作业请使用:
获取已调度的作业列表
要获得计划作业的机器可处理列表,您可以使用该 get_jobs()
方法。它会返回一个Job
实例列表 。如果您只对特定工作商店中包含的工作感兴趣,那么请提供作业别名作为第二个参数。
为了方便起见,您可以使用print_jobs()
打印格式化的作业列表、触发器和下次运行时间的方法。
修改作业
您可以通过调用apscheduler.job.Job.modify()
或 modify_job()
来修改任何作业的属性。除了作业的 id
,您可以修改作业的任何属性。
例如:
job.modify(max_instances=6, name='Alternate name')
如果您想重新计划作业 - 即更改其触发器,则可以使用 apscheduler.job.Job.reschedule()
或reschedule_job()
。这些方法为作业构建新的触发器,并基于新的触发器重新计算其下一次运行时间。
例如:
scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')
关闭调度器
关闭调度器:
scheduler.shutdown()
默认情况下,调度器关闭其作业存储和执行程序,并等待所有当前正在执行的作业完成。如果你不想等,你可以这样做:
scheduler.shutdown(wait=False)
这仍将关闭作业存储和执行程序,但不会等待任何正在运行的任务完成。
暂停和回复作业进程
可以暂停预定作业的处理:
scheduler.pause()
这将导致调度程序在处理恢复之前不会唤醒:
scheduler.resume()
也可以在暂停状态下启动调度程序,也就是说,不需要第一次唤醒调用:
scheduler.start(paused=True)
当您需要在有机会运行之前修剪不需要的作业时,这非常有用。
限制同时执行的作业示例的数量
默认情况下,每个作业只允许同时运行一个实例。这意味着如果工作即将开始,但之前的运行尚未完成,那么最新的运行被认为是 misfire。可以通过max_instances
在添加作业时使用关键字参数来设置调度程序允许并发运行的特定作业的最大实例数。
错过执行和合并的作业
有时调度程序可能无法在计划运行时执行预定作业。最常见的情况是在持久作业存储中安排作业,并且计划程序在作业执行后关闭并重新启动。发生这种情况时,该工作被认为是“失误”。然后,调度程序将根据作业的misfire_grace_time
选项(可以在每个作业上设置或在调度程序中全局设置)检查每个错过的执行时间,以查看是否仍应触发执行。这可能导致连续几次执行的工作。
如果这种行为不适合您的特定用例,则可以使用合并将所有这些错过的执行合并为一个。换句话说,如果为作业启用了合并,并且计划程序看到该作业的一个或多个排队执行,则只会触发一次。“绕过”运行不会发生失火事件。
注意:
如果在线程或者进程池中没有线程或进程可用,则会导致作业执行延迟,执行程序可能会由于运行太迟(与其原始指定的运行时间相比)而跳过它。如果这可能发生在您的应用程序中,您可能希望增加执行程序中的线程/进程数,或者将misfire_grace_time 设置调整为更高的值。
调度器事件
可以将事件监听器附加到调度器。调度器事件在某些情况下被触发,并且可能在其中携带关于该特定事件的详细信息的附加信息。通过给出适当的mask
参数add_listener()
,可以只听特定类型的事件 ,或者将不同的常量放在一起。可调用的监听器使用一个参数即事件对象进行调用。
有关events
可用事件及其属性的详细信息,请参阅模块的文档。
例如:
def my_listener(event):
if event.exception:
print('The job crashed :(')
else:
print('The job worked :)')
scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
故障排除
如果调度程序未按预期工作,则将记录apscheduler
器的日志记录级别提高到该DEBUG
级别将会有所帮助 。
如果您还没有首先启用日志记录,则可以这样做:
import logging
logging.basicConfig()
logging.getLogger('apscheduler').setLevel(logging.DEBUG)
这应该提供很多关于调度程序内部正在进行的有用信息。
报告错误
Github提供了一个错误跟踪器。