轻量级分布式任务调度框架(Light Task Schedule)

LTS简介

LTS(light-task-scheduler)主要用于解决分布式任务调度问题,支持实时任务定时任务Cron任务有较好的伸缩性,扩展性,健壮稳定性而被多家公司使用,同时也希望开源爱好者一起贡献。

LTS框架概况

LTS 四种节点:

  • JobClient:主要负责提交任务, 并接收任务执行反馈结果。
  • JobTracker:负责接收并分配任务,任务调度。
  • TaskTracker:负责执行任务,执行完反馈给JobTracker。
  • LTS-Admin:主要负责节点管理,任务队列管理,监控管理等。

详解:
其中JobClient,JobTracker,TaskTracker节点都是无状态的。 可以部署多个动态的进行删减,来实现负载均衡,实现更大的负载量, 并且框架采用FailStore策略使LTS具有很好的容错能力
LTS注册中心提供多种实现(Zookeeper,redis等),注册中心进行节点信息暴露,master选举。(Mongo or Mysql)存储任务队列和任务执行日志, netty or mina做底层通信, 并提供多种序列化方式fastjson, hessian2, java等。

LTS支持任务类型

  • 实时任务:提交了之后立即就要执行的任务。
  • 定时任务:在指定时间点执行的任务,譬如 今天3点执行(单次)。
  • Cron任务:周期性任务,CronExpression,和quartz类似(但是不是使用quartz实现的)譬如 0 0/1 * ?

    支持动态修改任务参数,任务执行时间等设置,支持后台动态添加任务,支持Cron任务暂停,支持手动停止正在执行的任务(有条件),支持任务的监控统计,支持各个节点的任务执行监控,JVM监控等等.

LTS架构图

LTS - 图1
LTS - 图2

组件说明:

  1. 节点组

    NodeGroup:一个节点组等同于一个小的集群,同一个节点组中的各个节点是对等的,等效的,对外提供相同的服务。

每个节点组中都有一个master节点,这个master节点是由LTS动态选出来的,当一个master节点挂掉之后,LTS会立马选出另外一个master节点,框架提供API监听接口给用户。

  1. FailStore

    顾名思义,这个主要是用于失败了存储的,主要用于节点容错,当远程数据交互失败之后,存储在本地,等待远程通信恢复的时候,再将数据提交。

FailStore主要用户JobClient的任务提交,TaskTracker的任务反馈,TaskTracker的业务日志传输的场景下。

FailStore目前提供几种实现:leveldb,rocksdb,berkeleydb,mapdb,ltsdb,用于可以自由选择使用哪种,用户也可以采用SPI扩展使用自己的实现。

LTS-Admin新版界面预览

目前后台带有由ztajy提供的一个简易的认证功能. 用户名密码在auth.cfg中,用户自行修改.
LTS - 图3

LTS特性

LTS对Spring的支持

LTS可以完全不用Spring框架,但是考虑到很用用户项目中都是用了Spring框架,所以LTS也提供了对Spring的支持,包括Xml和注解,引入lts-spring.jar即可。

业务日志记录器

在TaskTracker端提供了业务日志记录器,供应用程序使用,通过这个业务日志器,可以将业务日志提交到JobTracker,这些业务日志可以通过任务ID串联起来,可以在LTS-Admin中实时查看任务的执行进度。


SPI扩展支持

SPI扩展可以达到零侵入,只需要实现相应的接口,并实现即可被LTS使用,目前开放出来的扩展接口有对任务队列的扩展,用户可以不选择使用mysql或者mongo作为队列存储,也可以自己实现。对业务日志记录器的扩展,目前主要支持console,mysql,mongo,用户也可以通过扩展选择往其他地方输送日志。


故障转移

当正在执行任务的TaskTracker宕机之后,JobTracker会立马将分配在宕机的TaskTracker的所有任务再分配给其他正常的TaskTracker节点执行。

节点监控

可以对JobTracker,TaskTracker节点进行资源监控,任务监控等,可以实时的在LTS-Admin管理后台查看,进而进行合理的资源调配。

多样化任务执行结果支持

LTS框架提供四种执行结果支持EXECUTE_SUCCESS,EXECUTE_FAILED,EXECUTE_LATER,EXECUTE_EXCEPTION,并对每种结果采取相应的处理机制,譬如重试。
EXECUTE_SUCCESS: 执行成功,这种情况,直接反馈客户端(如果任务被设置了要反馈给客户端)。
EXECUTE_FAILED执行失败,这种情况,直接反馈给客户端,不进行重试。
EXECUTE_LATER稍后执行(需要重试),这种情况,不反馈客户端,重试策略采用1min,2min,3min的策略,默认最大重试次数为10次,用户可以通过参数设置修改这个重试次数。
EXECUTE_EXCEPTION执行异常, 这中情况也会重试(重试策略,同上)


FailStore容错

采用FailStore机制来进行节点容错,Fail And Store,不会因为远程通信的不稳定性而影响当前应用的运行。具体FailStore说明,请参考概念说明中的FailStore说明。

LTS工作流程

下图是一个标准的实时任务执行流程。
LTS - 图4

解析:

  1. JobClient 提交一个 任务 给JobTracker, 这里我提供了两种客户端API, 一种是如果JobTracker 不存在或者提交失败,直接返回提交失败。另一种客户端是重试客户端, 如果提交失败,先存储到本地FailStore(可以使用NFS来达到同个节点组共享leveldb文件的目的,多线程访问,已经做了文件锁处理),返回 给客户端提交成功的信息,待JobTracker可用的时候,再将任务提交。
  2. JobTracker收到JobClient提交来的任务,将任务存入任务队列。JobTracker等待TaskTracker的Pull请求,然后将任务Push给TaskTracker去执行。
  3. TaskTracker收到JobTracker分发来的任务之后,然后从线程池中拿到一个线程去执行。执行完毕之后,再反馈任务执行结果给 JobTracker(成功or 失败[失败有失败错误信息]),如果发现JobTacker不可用,那么存储本地FailStore,等待TaskTracker可用的时候再反馈。反馈 结果的同时,询问JobTacker有没有新的任务要执行。
  4. JobTacker收到TaskTracker节点的任务结果信息。根据任务信息决定要不要反馈给客户端。不需要反馈的直接删除,需要反馈的,直接反馈,反馈失败进入FeedbackQueue, 等待重新反馈。
  5. JobClient收到任务执行结果,进行自己想要的逻辑处理。

    项目编译打包

    项目主要采用maven进行构建,目前提供shell脚本的打包。环境依赖:Java(jdk1.6+) Maven
    使用一般分为两种:

    (1) Maven构建

    可以通过maven命令将lts的jar包上传到本地仓库中。在父pom.xml中添加相应的repository,并用deploy命令上传即可。具体引用方式可以参考lts中的例子即可。

    (2) 直接Jar引用

    需要将lts的各个模块打包成单独的jar包,并且将所有lts依赖包引入。具体引用哪些jar包可以参考lts中的例子即可。

    JobTracker和LTS-Admin部署

    提供(cmd)windows和(shell)linux两种版本脚本来进行编译和部署:

    (1) 运行根目录下的shbuild.sh或build.cmd脚本,会在dist目录下生成lts-{version}-bin文件夹

(2) 下面是其目录结构,其中bin目录主要是JobTracker和LTS-Admin的启动脚本。jobtracker 中是 JobTracker的配置文件和需要使用到的jar包,lts-admin是LTS-Admin相关的war包和配置文件。

lts-{version}-bin的文件结构

  1. -- lts-${version}-bin
  2. |-- bin
  3. | |-- jobtracker.cmd
  4. | |-- jobtracker.sh
  5. | |-- lts-admin.cmd
  6. | |-- lts-admin.sh
  7. | |-- lts-monitor.cmd
  8. | |-- lts-monitor.sh
  9. | |-- tasktracker.sh
  10. |-- conf
  11. | |-- log4j.properties
  12. | |-- lts-admin.cfg
  13. | |-- lts-monitor.cfg
  14. | |-- readme.txt
  15. | |-- tasktracker.cfg
  16. | |-- zoo
  17. | |-- jobtracker.cfg
  18. | |-- log4j.properties
  19. | |-- lts-monitor.cfg
  20. |-- lib
  21. | |-- *.jar
  22. |-- war
  23. |-- jetty
  24. | |-- lib
  25. | |-- *.jar
  26. |-- lts-admin.war

(3) JobTracker启动。如果你想启动一个节点,直接修改下conf/zoo下的配置文件,然后运行 sh jobtracker.sh zoo start即可,如果你想启动两个JobTracker节点,那么你需要拷贝一份zoo,譬如命名为zoo2,修改下zoo2下的配置文件,然后运行sh jobtracker.sh zoo2 start即可。logs文件夹下生成jobtracker-zoo.out日志。
(4)LTS-Admin启动,修改conf/lts-monitor.cfg和conf/lts-admin.cfg下的配置,然后运行bin下的sh lts-admin.sh或lts-admin.cmd脚本即可。logs文件夹下会生成lts-admin.out日志,启动成功在日志中会打印出访问地址,用户可以通过这个访问地址访问了。

JobClient(部署)使用

需要引入lts的jar包有lts-jobclient-{version}.jar,lts-core-{version}.jar及其它第三方依赖jar。

(1) API方式启动

  1. JobClient jobClient = new RetryJobClient();
  2. jobClient.setNodeGroup("test_jobClient");
  3. jobClient.setClusterName("test_cluster");
  4. jobClient.setRegistryAddress("zookeeper://127.0.0.1:2181");
  5. jobClient.start();
  6. // 提交任务
  7. Job job = new Job();
  8. job.setTaskId("3213213123");
  9. job.setParam("shopId", "11111");
  10. job.setTaskTrackerNodeGroup("test_trade_TaskTracker");
  11. // job.setCronExpression("0 0/1 * * * ?"); // 支持 cronExpression表达式
  12. // job.setTriggerTime(new Date()); // 支持指定时间执行
  13. Response response = jobClient.submitJob(job);

(2) Spring XML方式启动

  1. <bean id="jobClient" class="com.github.ltsopensource.spring.JobClientFactoryBean">
  2. <property name="clusterName" value="test_cluster"/>
  3. <property name="registryAddress" value="zookeeper://127.0.0.1:2181"/>
  4. <property name="nodeGroup" value="test_jobClient"/>
  5. <property name="masterChangeListeners">
  6. <list>
  7. <bean class="com.github.ltsopensource.example.support.MasterChangeListenerImpl"/>
  8. </list>
  9. </property>
  10. <property name="jobFinishedHandler">
  11. <bean class="com.github.ltsopensource.example.support.JobFinishedHandlerImpl"/>
  12. </property>
  13. <property name="configs">
  14. <props>
  15. <!-- 参数 -->
  16. <prop key="job.fail.store">leveldb</prop>
  17. </props>
  18. </property>
  19. </bean>

(3) Spring 全注解方式

  1. @Configuration
  2. public class LTSSpringConfig {
  3. @Bean(name = "jobClient")
  4. public JobClient getJobClient() throws Exception {
  5. JobClientFactoryBean factoryBean = new JobClientFactoryBean();
  6. factoryBean.setClusterName("test_cluster");
  7. factoryBean.setRegistryAddress("zookeeper://127.0.0.1:2181");
  8. factoryBean.setNodeGroup("test_jobClient");
  9. factoryBean.setMasterChangeListeners(new MasterChangeListener[]{
  10. new MasterChangeListenerImpl()
  11. });
  12. Properties configs = new Properties();
  13. configs.setProperty("job.fail.store", "leveldb");
  14. factoryBean.setConfigs(configs);
  15. factoryBean.afterPropertiesSet();
  16. return factoryBean.getObject();
  17. }
  18. }

TaskTracker(部署使用)

需要引入lts的jar包有lts-tasktracker-{version}.jar,lts-core-{version}.jar及其它第三方依赖jar。

定义自己的任务执行类

  1. public class MyJobRunner implements JobRunner {
  2. @Override
  3. public Result run(JobContext jobContext) throws Throwable {
  4. try {
  5. // TODO 业务逻辑
  6. // 会发送到 LTS (JobTracker上)
  7. jobContext.getBizLogger().info("测试,业务日志啊啊啊啊啊");
  8. } catch (Exception e) {
  9. return new Result(Action.EXECUTE_FAILED, e.getMessage());
  10. }
  11. return new Result(Action.EXECUTE_SUCCESS, "执行成功了,哈哈");
  12. }
  13. }

(1) API方式启动

  1. TaskTracker taskTracker = new TaskTracker();
  2. taskTracker.setJobRunnerClass(MyJobRunner.class);
  3. taskTracker.setRegistryAddress("zookeeper://127.0.0.1:2181");
  4. taskTracker.setNodeGroup("test_trade_TaskTracker");
  5. taskTracker.setClusterName("test_cluster");
  6. taskTracker.setWorkThreads(20);
  7. taskTracker.start();

(2) Spring XML方式启动

  1. <bean id="taskTracker" class="com.github.ltsopensource.spring.TaskTrackerAnnotationFactoryBean" init-method="start">
  2. <property name="jobRunnerClass" value="com.github.ltsopensource.example.support.MyJobRunner"/>
  3. <property name="bizLoggerLevel" value="INFO"/>
  4. <property name="clusterName" value="test_cluster"/>
  5. <property name="registryAddress" value="zookeeper://127.0.0.1:2181"/>
  6. <property name="nodeGroup" value="test_trade_TaskTracker"/>
  7. <property name="workThreads" value="20"/>
  8. <property name="masterChangeListeners">
  9. <list>
  10. <bean class="com.github.ltsopensource.example.support.MasterChangeListenerImpl"/>
  11. </list>
  12. </property>
  13. <property name="configs">
  14. <props>
  15. <prop key="job.fail.store">leveldb</prop>
  16. </props>
  17. </property>
  18. </bean>

(3) Spring注解方式启动

  1. @Configuration
  2. public class LTSSpringConfig implements ApplicationContextAware {
  3. private ApplicationContext applicationContext;
  4. @Override
  5. public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
  6. this.applicationContext = applicationContext;
  7. }
  8. @Bean(name = "taskTracker")
  9. public TaskTracker getTaskTracker() throws Exception {
  10. TaskTrackerAnnotationFactoryBean factoryBean = new TaskTrackerAnnotationFactoryBean();
  11. factoryBean.setApplicationContext(applicationContext);
  12. factoryBean.setClusterName("test_cluster");
  13. factoryBean.setJobRunnerClass(MyJobRunner.class);
  14. factoryBean.setNodeGroup("test_trade_TaskTracker");
  15. factoryBean.setBizLoggerLevel("INFO");
  16. factoryBean.setRegistryAddress("zookeeper://127.0.0.1:2181");
  17. factoryBean.setMasterChangeListeners(new MasterChangeListener[]{
  18. new MasterChangeListenerImpl()
  19. });
  20. factoryBean.setWorkThreads(20);
  21. Properties configs = new Properties();
  22. configs.setProperty("job.fail.store", "leveldb");
  23. factoryBean.setConfigs(configs);
  24. factoryBean.afterPropertiesSet();
  25. // factoryBean.start();
  26. return factoryBean.getObject();
  27. }
  28. }

参数说明

参数说明

使用建议

一般在一个JVM中只需要一个JobClient实例即可,不要为每种任务都新建一个JobClient实例,这样会大大的浪费资源,因为一个JobClient可以提交多种任务。相同的一个JVM一般也尽量保持只有一个TaskTracker实例即可,多了就可能造成资源浪费。当遇到一个TaskTracker要运行多种任务的时候,请参考下面的 “一个TaskTracker执行多种任务”。

(1) 一个TaskTracker执行多种任务

有的时候,业务场景需要执行多种任务,有些人会问,是不是要每种任务类型都要一个TaskTracker去执行。我的答案是否定的,如果在一个JVM中,最好使用一个TaskTracker去运行多种任务,因为一个JVM中使用多个TaskTracker实例比较浪费资源(当然当你某种任务量比较多的时候,可以将这个任务单独使用一个TaskTracker节点来执行)。那么怎么才能实现一个TaskTracker执行多种任务呢。下面是我给出来的参考例子。

  1. /**
  2. * 总入口,在 taskTracker.setJobRunnerClass(JobRunnerDispatcher.class)
  3. * JobClient 提交 任务时指定 Job 类型 job.setParam("type", "aType")
  4. */
  5. public class JobRunnerDispatcher implements JobRunner {
  6. private static final ConcurrentHashMap<String/*type*/, JobRunner>
  7. JOB_RUNNER_MAP = new ConcurrentHashMap<String, JobRunner>();
  8. static {
  9. JOB_RUNNER_MAP.put("aType", new JobRunnerA()); // 也可以从Spring中拿
  10. JOB_RUNNER_MAP.put("bType", new JobRunnerB());
  11. }
  12. @Override
  13. public Result run(JobContext jobContext) throws Throwable {
  14. Job job = jobContext.getJob();
  15. String type = job.getParam("type");
  16. return JOB_RUNNER_MAP.get(type).run(job);
  17. }
  18. }
  19. class JobRunnerA implements JobRunner {
  20. @Override
  21. public Result run(JobContext jobContext) throws Throwable {
  22. // TODO A类型Job的逻辑
  23. return null;
  24. }
  25. }
  26. class JobRunnerB implements JobRunner {
  27. @Override
  28. public Result run(JobContext jobContext) throws Throwable {
  29. // TODO B类型Job的逻辑
  30. return null;
  31. }
  32. }

(2) TaskTracker的JobRunner测试

一般在编写TaskTracker的时候,只需要测试JobRunner的实现逻辑是否正确,又不想启动LTS进行远程测试。为了方便测试,LTS提供了JobRunner的快捷测试方法。自己的测试类集成com.github.ltsopensource.tasktracker.runner.JobRunnerTester即可,并实现initContextnewJobRunner方法即可。如lts-examples中的例子:

  1. public class TestJobRunnerTester extends JobRunnerTester {
  2. public static void main(String[] args) throws Throwable {
  3. // Mock Job 数据
  4. Job job = new Job();
  5. job.setTaskId("2313213");
  6. JobContext jobContext = new JobContext();
  7. jobContext.setJob(job);
  8. JobExtInfo jobExtInfo = new JobExtInfo();
  9. jobExtInfo.setRetry(false);
  10. jobContext.setJobExtInfo(jobExtInfo);
  11. // 运行测试
  12. TestJobRunnerTester tester = new TestJobRunnerTester();
  13. Result result = tester.run(jobContext);
  14. System.out.println(JSON.toJSONString(result));
  15. }
  16. @Override
  17. protected void initContext() {
  18. // TODO 初始化Spring容器
  19. }
  20. @Override
  21. protected JobRunner newJobRunner() {
  22. return new TestJobRunner();
  23. }
  24. }

(3) Spring Quartz Cron任务无缝接入

对于Quartz的Cron任务只需要在Spring配置中增加一下代码就可以接入LTS平台

  1. <bean class="com.github.ltsopensource.spring.quartz.QuartzLTSProxyBean">
  2. <property name="clusterName" value="test_cluster"/>
  3. <property name="registryAddress" value="zookeeper://127.0.0.1:2181"/>
  4. <property name="nodeGroup" value="quartz_test_group"/>
  5. </bean>

(4) Spring Boot 支持

  1. @SpringBootApplication
  2. @EnableJobTracker // 启动JobTracker
  3. @EnableJobClient // 启动JobClient
  4. @EnableTaskTracker // 启动TaskTracker
  5. @EnableMonitor // 启动Monitor
  6. public class Application {
  7. public static void main(String[] args) {
  8. SpringApplication.run(Application.class, args);
  9. }
  10. }

剩下的就只是在application.properties中添加相应的配置就行了, 具体见lts-example中的 com.github.ltsopensource.examples.springboot包下的例子