1. 摘要
常见的业务场景有以下几种:
- 某拼购电商平台在每天上午 9 点,下午 3 点和晚上 9 点发放优惠券。
- 某银行系统需要在信用卡到期还款日的前三天进行短信提醒。
- 12306 会在春运期间设置定时进行分批放票。
-
2. 简介
任务调度是指系统为了按预定的时间和频次自动完成特定任务的过程。任务调度JDK的几种实现方式如下:
多线程。
通过开启一个线程,while循环执行业务逻辑,让线程sleep休眠,达到任务间隔执行。代码清单如下图所示:
- Timer类。
Timer的优点在于简单易用,每个Timer对应一个线程,因此可以同时启动多个Timer并行执行多个任务,同一个 Timer 中的任务是串行执行。
- ScheduledExecutorService接口。
Java 5推出了基于线程池设计的ScheduledExecutorService,其设计思想是,每一个被调度的任务都会由线程池中一个线程去执行,因此任务是并发执行的,相互之间不会受到干扰。
从以上可以看出,无论是Thread,还是Timer,或是ScheduledExecutorService都只能提供基于开始时间与重复间隔的任务调度,而不能胜任更加复杂的任务调度需求。比如,设置每日凌晨零点执行任务、复杂调度任务的管理、任务间传递数据等等。
3. 分布式
当前软件的架构正在逐步转变为分布式系统架构,将传统的单体架构转为微服务架构,服务之间通过网络交互来完成业务处理,如下图所示,电商系统为分布式架构,由用户服务、商品服务等组成:
4. 分布式任务调度
通常任务调度的程序是集成在Spring Boot应用中的,比如:优惠卷服务中包括了定时发放优惠卷的的任务调度程序,通知推送服务中包括了定时向用户发通知的任务调度程序等,由于采用分布式系统架构,一个服务往往会部署多个冗余实例来进行业务处理, 像这种分布式系统环境下运行的任务调度,我们称之为分布式任务调度,如下图:
5. 分布式调度要实现目标
不管是任务调度程序集成在应用程序中,还是单独构建的任务调度系统,如果采用分布式调度任务的方式就相当于 将任务调度程序分布式构建,这样就可以具有分布式系统的特点,并且提高任务的调度处理能力:
- 并行任务调度
并行任务调度实现靠多线程,如果有大量任务需要调度,此时光靠多线程就会有瓶颈了,因为一台计算机CPU的处理能力是有限的。
如果将任务调度程序分布式部署,每个结点还可以部署为集群,这样就可以让多台计算机共同去完成任务调度,我 们可以将任务分割为若干个分片,由不同的实例并行执行,来提高任务调度的处理效率。
- 高可用
若某一个实例宕机,不影响其他实例来执行任务。
- 弹性扩容
当集群中增加实例就可以提高并执行任务的处理效率。
- 任务管理与监测
对系统中存在的所有定时任务进行统一的管理及监测。让开发人员及运维人员能够时刻了解任务执行情况,从而做出快速的应急处理响应。
- 避免任务重复执行
当任务调度以集群方式部署,同一个任务调度可能会执行多次,比如在上面提到的电商系统中定时发放优惠券的服务,就会对同一用户发放多次优惠券,对公司造成很多损失,所以我们需要控制相同的任务在多个运行实例上只执行一次,考虑采用下边的方法:
- 分布式锁
多个实例在任务执行前首先需要获取锁,如果获取失败那么久证明有其他服务已经再运行,如果获取成功那么证明没有服务在运行定时任务,那么就可以执行。
- ZooKeeper选举
利用ZooKeeper对Leader实例执行定时任务,有其他业务已经使用了ZooKeeper,那么执行定时任务的时候判断自己是否是Leader,如果不是则不执行,如果是则执行业务逻辑,这样也能达到我们的目的。
Elastic-Job 框架实现了以上的分布式任务调度目标,功能列表如下图所示:
备注:具体前参考 Elastic-Job 官网(http://elasticjob.io)
6. 动态任务发布实现
- 搭建 SpringBoot 应用,可参考【小白都能看得懂的服务调用链路追踪设计与实现】这篇的第三节的开发环境准备和工程初始化步骤等。
在 pom.xml 文件加入 elastic-job 任务调度的依赖:
<dependency><groupId>com.dangdang</groupId><artifactId>elastic-job-lite-spring</artifactId><version>2.1.5</version></dependency>
整体的pom文件内容如下图所示:

工程的结构如下图:

- 编码实现如下:
elastic-job的Zookeeper注册中心配置类ElasticJobRegistryCenterConfig,代码如下:
测试自定义调度任务发布的控制层DynamicController,代码如下:
自定义的调度任务CustomJob,实现了SimpleJob接口,即elastic-job框架定义的job,代码如下:
动态任务配置与启动DynamicJobService,这个类很重要,它主要负责动态任务的详细参数配置及动态任务的启动,代码如下:
其他的类比较简单,不是本文的重点,MyService类 的代码如下:
package cn.smart4j.giserway.dynamic.task.service;import cn.smart4j.giserway.dynamic.task.dao.MyDao;import cn.smart4j.giserway.dynamic.task.vo.BookVO;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;import java.util.List;/*** @Description: MyService* @Param:* @return:* @Author:* @Date: 2020/4/28*/@Servicepublic class MyService {@AutowiredMyDao myDao;public List<BookVO> queryBooks() {return myDao.queryBooks();// return SpringContextUtils.getBean(MyDao.class).queryBooks();}}
MyDao类代码如下:
package cn.smart4j.giserway.dynamic.task.dao;
import cn.smart4j.giserway.dynamic.task.vo.BookVO;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
import java.util.List;
/**
* @Description: 数据访问层
* @Param:
* @return:
* @Author:
* @Date: 2020/4/28
*/
@Repository
public class MyDao {
@Autowired
JdbcTemplate jdbcTemplate;
public List<BookVO> queryBooks() {
String sql = "select * from t_book";
return jdbcTemplate.query(sql, new BeanPropertyRowMapper<>(BookVO.class));
// return SpringContextUtils.getBean(JdbcTemplate.class).query(sql, new BeanPropertyRowMapper<>(BookVO.class));
}
}
BookVO类代码如下:
package cn.smart4j.giserway.dynamic.task.vo;
/**
* @Description: BookVO
* @Param:
* @return:
* @Author:
* @Date: 2020/4/28
*/
public class BookVO {
/**
* 书籍id
*/
private int id;
/**
* 书籍名称
*/
private String bookName;
/**
* 价格
*/
private double money;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getBookName() {
return bookName;
}
public void setBookName(String bookName) {
this.bookName = bookName;
}
public double getMoney() {
return money;
}
public void setMoney(double money) {
this.money = money;
}
@Override
public String toString() {
return "BookVO{" +
"id=" + id +
", bookName='" + bookName + '\'' +
", money=" + money +
'}';
}
}
application.properties配置文件如下。
server.port=18108 spring.application.name = dynamic-task #logging.level.root = info # 数据源 spring.datasource.driver‐class‐name = com.mysql.cj.jdbc.Driver spring.datasource.url = jdbc:mysql://localhost:3306/elastic_job_demo?useUnicode=true spring.datasource.username = root spring.datasource.password = root # zk地址 registry.serverList = localhost:2181 # 命名空间 registry.namespace = dynamic-job启动本地的 Zookeeper 服务,如下:
zkServer start下面可以开始测试动态任务的发布了,运行 SpringBoot 启动类,如下所示:

- 通过 Postman 请求,结果如下:


程序运行报空指针异常,出现在自定义的调度任务实现类CustomJob的28行,通过Debug打断点可知,这里的mySevice没有使用容器中注入的,即@Autowired 注入不起作用,那直接换成new MyService().queryBooks(); 试试,再启动服务后,同样Postman请求,结果如下: 
在MyService中调dao层时报myDao空指针异常,再修改为new MyDao().queryBooks(); 试试,再启动服务后,同样Postman请求,结果如下:
同样还是无法发布调度任务成功,其实仔细观察发现,这里的CustomJob是我们通过调接口,Class.forName 装载cn.smart4j.giserway.dynamic.task.job.CustomJob类后,通过newInstance()获取SimpleJob实例的,这个实例没有注入容器,只存在于JVM中,Spring容器没有注入这个Bean,所以这里的CustomJob中注入的MyService无法使用,其他依赖注入也都失效了。为了解决通过@Autowired注入后获取不到容器中的Bean,我们可以使用 Spring上下文获取容器中已经注入的Bean,即实现ApplicationContextAware接口,提取获取容器中的Bean对象工具类SpringContextUtils,代码如下: 
- 把有关使用自动注入的地方修改为通过SpringContextUtils.getBean()获取bean对象即可,细心的小伙伴可能注意到报空指针的类中代码注释掉的部分,即是最终的代码实现,修改如下:



- 再次启动服务,测试结果如下:

说明自定义调度任务 CustomJob 动态发布成功。
- 可以通过 ZooInspector 工具查看,任务详情,如下截图所示:


连接Zookeeper服务,查看调度任务相关信息,包括命名空间、任务名称、任务选举的leader、config任务配置信息、任务实例、任务分片等。如下图所示:
