首先quartz本身是支持分布式的,通过表来管理各节点之间的关系。
1、去quartz官网下载最新的包 http://www.quartz-scheduler.org/
2、下载之后解压,进入如下目录,创建数据库表
quartz-2.2.3-distribution\quartz-2.2.3\docs\dbTables并选择对应的数据库SQL(笔者使用的是MySQL数据库)
在pom文件中引入依赖
<!--quartz依赖-->
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
</dependency>
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz-jobs</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
</dependency>
、创建 quartz.proiperties 配置文件
org.quartz.scheduler.instanceId=AUTO
org.quartz.scheduler.makeSchedulerThreadDaemon=true
org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.makeThreadsDaemons=true
#线程数量
org.quartz.threadPool.threadCount:20
#线程优先级
org.quartz.threadPool.threadPriority:5
org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate
org.quartz.jobStore.tablePrefix=QRTZ_
#特别注意:此处是quartz的数据源,报错就debug跟踪一下查看dbName
org.quartz.jobStore.dataSource = springTxDataSource.schedulerFactoryBean
#加入集群
org.quartz.jobStore.isClustered=true
#容许的最大作业延
org.quartz.jobStore.misfireThreshold=25000
#调度实例失效的检查时间间隔
org.quartz.jobStore.clusterCheckinInterval: 5000
quartz的初始化配置,读取配置文件
package com.wh.timerdemo.config;
import org.quartz.spi.JobFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.PropertiesFactoryBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.ClassPathResource;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import javax.sql.DataSource;
import java.io.IOException;
import java.util.Properties;
@Configuration
public class QuartzConfig {
// 配置文件路径
private static final String QUARTZ_CONFIG = "/quartz.properties";
// 按照自己注入的数据源自行修改
@Qualifier("writeDataSource")
@Autowired
private DataSource dataSource;
@Autowired
private AutoWiredSpringBeanToJobFactory autoWiredSpringBeanToJobFactory;
/**
* 从quartz.properties文件中读取Quartz配置属性
* @return
* @throws IOException
*/
@Bean
public Properties quartzProperties() throws IOException {
PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean();
propertiesFactoryBean.setLocation(new ClassPathResource(QUARTZ_CONFIG));
propertiesFactoryBean.afterPropertiesSet();
return propertiesFactoryBean.getObject();
}
/**
* JobFactory与schedulerFactoryBean中的JobFactory相互依赖,注意bean的名称
* 在这里为JobFactory注入了Spring上下文
*
* @param applicationContext
* @return
*/
@Bean
public JobFactory buttonJobFactory(ApplicationContext applicationContext) {
AutoWiredSpringBeanToJobFactory jobFactory = new AutoWiredSpringBeanToJobFactory();
jobFactory.setApplicationContext(applicationContext);
return jobFactory;
}
@Bean
public SchedulerFactoryBean schedulerFactoryBean() throws IOException {
SchedulerFactoryBean factory = new SchedulerFactoryBean();
factory.setJobFactory(autoWiredSpringBeanToJobFactory);
factory.setOverwriteExistingJobs(true);
factory.setAutoStartup(true); // 设置自行启动
// 延时启动,应用启动1秒后
factory.setStartupDelay(1);
factory.setQuartzProperties(quartzProperties());
factory.setDataSource(dataSource);// 使用应用的dataSource替换quartz的dataSource
return factory;
}
}
将任务工厂注入到Spring
package com.wh.timerdemo.config;
import org.quartz.spi.TriggerFiredBundle;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.scheduling.quartz.SpringBeanJobFactory;
import org.springframework.stereotype.Component;
/**
* 为JobFactory注入SpringBean,否则Job无法使用Spring创建的bean
*/
@Component
public class AutoWiredSpringBeanToJobFactory extends SpringBeanJobFactory implements ApplicationContextAware {
private transient AutowireCapableBeanFactory beanFactory;
@Override
public void setApplicationContext(final ApplicationContext context) {
beanFactory = context.getAutowireCapableBeanFactory();
}
@Override
protected Object createJobInstance(final TriggerFiredBundle bundle) throws Exception {
final Object job = super.createJobInstance(bundle);
beanFactory.autowireBean(job);
return job;
}
}
7、创建任务调度管理,任务的增删改查,起动停止等。
public class QuartzManager {
private static SchedulerFactory schedulerFactory = new StdSchedulerFactory();
private Scheduler scheduler = null;
/**
* @Description: 添加一个定时任务
*
* @param jobName 任务名
* @param jobGroupName 任务组名
* @param triggerName 触发器名
* @param triggerGroupName 触发器组名
* @param jobClass 任务
* @param cron 时间设置,参考quartz说明文档
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
public static void addJob(String jobName, String jobGroupName, String triggerName, String triggerGroupName, Class jobClass, String cron) {
try {
// 任务名,任务组,任务执行类
Scheduler scheduler = schedulerFactory.getScheduler();
JobDetail jobDetail= JobBuilder.newJob(jobClass).withIdentity(jobName, jobGroupName).build();
// 触发器
TriggerBuilder<Trigger> triggerBuilder = TriggerBuilder.newTrigger();
// 触发器名,触发器组
triggerBuilder.withIdentity(triggerName, triggerGroupName);
triggerBuilder.startNow();
// 触发器时间设定
triggerBuilder.withSchedule(CronScheduleBuilder.cronSchedule(cron));
// 创建Trigger对象
CronTrigger trigger = (CronTrigger) triggerBuilder.build();
// 调度容器设置JobDetail和Trigger
scheduler.scheduleJob(jobDetail, trigger);
// 启动
if (!scheduler.isShutdown()) {
scheduler.start();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* @Description:启动所有定时任务
*/
public static void startJobs() {
try {
Scheduler scheduler = schedulerFactory.getScheduler();
scheduler.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* @Description:关闭所有定时任务
*/
public static void shutdownJobs() {
try {
Scheduler scheduler = schedulerFactory.getScheduler();
if (!scheduler.isShutdown()) {
scheduler.shutdown();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 获取当前正在执行的任务
* @return
*/
public static boolean getCurrentJobs(String name){
try {
Scheduler scheduler = schedulerFactory.getScheduler();
List<JobExecutionContext> jobContexts = scheduler.getCurrentlyExecutingJobs();
for (JobExecutionContext context : jobContexts) {
if (name.equals(context.getTrigger().getJobKey().getName())) {
return true;
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
return false;
}
public Scheduler getScheduler() {
return scheduler;
}
public void setScheduler(Scheduler scheduler) {
this.scheduler = scheduler;
}
}
创建一个执行的Job,这里包含定时任务执行的逻辑
package com.wh.timerdemo.task;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @DisallowConcurrentExecution : 此标记用在实现Job的类上面,意思是不允许并发执行.
* 注org.quartz.threadPool.threadCount的数量有多个的情况,@DisallowConcurrentExecution才生效
*/
@DisallowConcurrentExecution
public class ButtonTimerJob implements Job {
private static final Logger logger = LoggerFactory.getLogger(ButtonTimerJob.class);
/**
* 核心方法,Quartz Job真正的执行逻辑。
* @throws JobExecutionException execute()方法只允许抛出JobExecutionException异常
*/
@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
logger.info("--------------定时任务执行逻辑---------------------");
}
}
创建启动Job类:负责任务的创建启动和配置cron等
package com.wh.timerdemo.task;
import com.wh.timerdemo.config.QuartzManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.ContextRefreshedEvent;
/**
* 定时任务的启动类
*/
@Configuration
public class StartJob implements ApplicationListener<ContextRefreshedEvent> {
private Logger logger = LoggerFactory.getLogger(this.getClass());
public void run() {
logger.info(">> 启动定时任务...");
// QuartzManager.startJobs();
QuartzManager.addJob(
"SpecialPeriodJob",
"SpecialPeriodJobGroup",
"SpecialPeriodTrigger",
"SpecialPeriodTriggerGroup",
ButtonTimerJob.class,
"0/30 * * * * ?");
}
@Override
public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
System.out.println("启动定时任务......");
run();
}
}
重点:11张表
qrtz_blob_triggers : 以Blob 类型存储的触发器。
qrtz_calendars:存放日历信息, quartz可配置一个日历来指定一个时间范围。
qrtz_cron_triggers:存放cron类型的触发器。
qrtz_fired_triggers:存放已触发的触发器。
qrtz_job_details:存放一个jobDetail信息。
qrtz_locks: 存储程序的悲观锁的信息(假如使用了悲观锁)。
qrtz_paused_trigger_graps:存放暂停掉的触发器。
qrtz_scheduler_state:调度器状态。
qrtz_simple_triggers:简单触发器的信息。
qrtz_trigger_listeners:触发器监听器。
qrtz_triggers:触发器的基本信息。
cron方式需要用到的4张数据表:
qrtz_triggers,qrtz_cron_triggers,qrtz_fired_triggers,qrtz_job_details
Quartz核心是调度器,还采用多线程管理。quartz框架是原生就支持分布式定时任务的。
1.持久化任务(把调度信息存储到数据):当应用程序停止运行时,所有调度信息不被丢失,当你重新启动时,调度信息还存在,这就是持久化任务。
2.集群和分布式处理:当在集群环境下,当有配置Quartz的多个客户端时(节点),
采用Quartz的集群和分布式处理时,我们要了解几点好处
1) 一个节点无法完成的任务,会被集群中拥有相同的任务的节点取代执行。
2) Quartz调度是通过触发器的类别来识别不同的任务,在不同的节点定义相同的触发器的类别,这样在集群下能稳定的运行,一个节点无法完成的任务,会被集群中拥有相同的任务的节点取代执行。
3)分布式 体现在 当相同的任务定时在一个时间点,在那个时间点,不会被两个节点同时执行。
4、Quartz 在集群如何工作:
**
上图三个节点在数据库中都拥有同一份Job定义,如果某一个节点失效,那么Job会在其他节点上执行。由于三个节点上的Job执行代码是一样的,那么怎么保证只有在一台机器上触发呢?答案是使用了数据库锁。在quartz的集群解决方案里有张表scheduler_locks,quartz采用了悲观锁的方式对triggers表进行行加锁,以保证任务同步的正确性。一旦某一个节点上面的线程获取了该锁,那么这个Job就会在这台机器上被执行,同时这个锁就会被这台机器占用。同时另外一台机器也会想要触发这个任务,但是锁已经被占用了,就只能等待,直到这个锁被释放。之后会看trigger状态,如果已经被执行了,则不会执行了。
简单地说,quartz的分布式调度策略是以数据库为边界资源的一种异步策略。各个调度器都遵守一个基于数据库锁的操作规则从而保证了操作的唯一性。同时多个节点的异步运行保证了服务的可靠。但这种策略有自己的局限性:集群特性对于高CPU使用率的任务效果很好,但是对于大量的短任务,各个节点都会抢占数据库锁,这样就出现大量的线程等待资源。这种情况随着节点的增加会越来越严重。
另外,quartz的分布式只是解决了高可用的问题,并没有解决任务分片的问题,还是会有单机处理的极限。