特别说明:本示例的主要目的仅仅是告诉大家如何使用LTS,所以偷了个懒,将所有节点都揉合到了一个工程,实际项目是分开部署的,因需而定。

整个工程其实很简单:(一定要先搞明白这个项目结构)
LTS简单集成springboot项目 - 图1

(1) 准备工作

  • 新建SpringBoot工程
  • 导入相应的依赖,完成项目pom文件

    (1.1) 项目依赖

    1. <dependencies>
    2. <dependency>
    3. <groupId>org.springframework.boot</groupId>
    4. <artifactId>spring-boot-starter-web</artifactId>
    5. </dependency>
    6. <!--lts-->
    7. <dependency>
    8. <groupId>com.github.ltsopensource</groupId>
    9. <artifactId>lts</artifactId>
    10. <version>1.7.0</version>
    11. </dependency>
    12. <dependency>
    13. <groupId>io.netty</groupId>
    14. <artifactId>netty-all</artifactId>
    15. <version>4.1.25.Final</version>
    16. </dependency>
    17. <dependency>
    18. <groupId>com.alibaba</groupId>
    19. <artifactId>fastjson</artifactId>
    20. <version>1.2.58</version>
    21. </dependency>
    22. <dependency>
    23. <groupId>org.mapdb</groupId>
    24. <artifactId>mapdb</artifactId>
    25. <version>2.0-beta10</version>
    26. </dependency>
    27. <dependency>
    28. <groupId>com.101tec</groupId>
    29. <artifactId>zkclient</artifactId>
    30. <version>0.3</version>
    31. </dependency>
    32. <dependency>
    33. <groupId>com.alibaba</groupId>
    34. <artifactId>druid</artifactId>
    35. <version>1.0.14</version>
    36. </dependency>
    37. <dependency>
    38. <groupId>mysql</groupId>
    39. <artifactId>mysql-connector-java</artifactId>
    40. <version>5.1.26</version>
    41. </dependency>
    42. <dependency>
    43. <groupId>org.javassist</groupId>
    44. <artifactId>javassist</artifactId>
    45. <version>3.20.0-GA</version>
    46. </dependency>
    47. <dependency>
    48. <groupId>org.projectlombok</groupId>
    49. <artifactId>lombok</artifactId>
    50. </dependency>
    51. </dependencies>

注意:

如果你用的是Redis作为注册中心,mongodb作为任务队列,那么请引入相应的依赖,我这里用的是Zookeeper和mysql。

(1.2) 项目配置文件

配置文件:

  1. # 应用名称
  2. spring.application.name=lts
  3. server.port=8081
  4. ##########################################
  5. # jobclient->负责提交任务以及接收任务执行结果 #
  6. ##########################################
  7. #集群名称
  8. lts.jobclient.cluster-name=test_cluster
  9. #注册中心
  10. lts.jobclient.registry-address=zookeeper://127.0.0.1:2181
  11. #JobClient节点组名称
  12. lts.jobclient.node-group=test_jobClient
  13. #是否使用RetryClient
  14. lts.jobclient.use-retry-client=true
  15. #失败存储,用于服务正常后再次执行(容错处理)
  16. lts.jobclient.configs.job.fail.store=mapdb
  17. #######################################
  18. # jobtracker->负责调度任务 接收并分配任务 #
  19. #######################################
  20. lts.jobtracker.cluster-name=test_cluster
  21. lts.jobtracker.listen-port=35001
  22. lts.jobtracker.registry-address=zookeeper://127.0.0.1:2181
  23. lts.jobtracker.configs.job.logger=mysql
  24. lts.jobtracker.configs.job.queue=mysql
  25. lts.jobtracker.configs.jdbc.url=jdbc:mysql://rm-2ze29e1gr6iu0p31oko.mysql.rds.aliyuncs.com:3306/lts?useUnicode=true&characterEncoding=utf8&useSSL=true&serverTimezone=UTC
  26. lts.jobtracker.configs.jdbc.username=root
  27. lts.jobtracker.configs.jdbc.password=自己mysql的密码
  28. ###########################################################
  29. # tasktracker->负责执行任务 执行完任务将执行结果反馈给JobTracker #
  30. ###########################################################
  31. #lts.tasktracker.cluster-name=test_cluster
  32. lts.tasktracker.registry-address=zookeeper://127.0.0.1:2181
  33. #TaskTracker节点组默认是64个线程用于执行任务
  34. #lts.tasktracker.work-threads=64
  35. lts.tasktracker.node-group=test_trade_TaskTracker
  36. #lts.tasktracker.dispatch-runner.enable=true
  37. #lts.tasktracker.dispatch-runner.shard-value=taskId
  38. lts.tasktracker.configs.job.fail.store=mapdb
  39. ################################################################
  40. # jmonitor->负责收集各个节点的监控信息,包括任务监控信息,节点JVM监控信息 #
  41. ################################################################
  42. lts.monitor.cluster-name=test_cluster
  43. lts.monitor.registry-address=zookeeper://127.0.0.1:2181
  44. lts.monitor.configs.job.logger=mysql
  45. lts.monitor.configs.job.queue=mysql
  46. lts.monitor.configs.jdbc.url=jdbc:mysql://rm-2ze29e1gr6iu0p31oko.mysql.rds.aliyuncs.com:3306/lts?useUnicode=true&characterEncoding=utf8&useSSL=true&serverTimezone=UTC
  47. lts.monitor.configs.jdbc.username=root
  48. lts.monitor.configs.jdbc.password=自己mysql的密码
  49. ################################################################
  50. ################ log4j.properties日志配置文件 ####################
  51. ################################################################
  52. log4j.rootLogger=INFO,stdout
  53. log4j.appender.stdout.Threshold=INFO
  54. log4j.appender.stdout=org.apache.log4j.ConsoleAppender
  55. log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
  56. log4j.appender.stdout.layout.ConversionPattern=%d [%t] (%F:%L) %-5p %c %x - %m%n

(1.3) 数据库操作

在mysql数据库中创建一个新的数据库lts即可,项目初始化会自动创建相应表格
完成以上准备工作之后,接着便是实现任务提交以及任务执行了。
另外说明:以上配置信息,均可以在官方示例lts-example找到。

(2) 项目启动类设置

开启相应注解即可:

  1. @SpringBootApplication
  2. @EnableJobClient //JobClient
  3. @EnableTaskTracker //TaskTracker
  4. @EnableJobTracker //JobTracker
  5. @EnableMonitor //Monitor
  6. public class LtstestApplication {
  7. public static void main(String[] args) {
  8. SpringApplication.run(LtstestApplication.class, args);
  9. }
  10. }

(3)JobClient提交任务

关于**Jobclient使用的官方建议**

一般在一个JVM中只需要一个JobClient实例即可,不要为每种任务都新建一个JobClient实例,这样会大大的浪费资源,因为一个JobClient可以提交多种任务。

本示例中,我直接写在了TestController中,模拟了提交两个不同的任务:

  1. @Autowired
  2. private JobClient jobClient;
  3. @GetMapping("test01")
  4. public Map<String, Object> test01() {
  5. //模拟提交一个任务
  6. Job job = new Job();
  7. job.setTaskId("task-AAAAAAAAAAAAAAA");
  8. job.setCronExpression("0/3 * * * * ?");
  9. //设置任务类型 区分不同的任务 执行不同的业务逻辑
  10. job.setParam("type", "aType");
  11. job.setNeedFeedback(true);
  12. //任务触发时间 如果设置了 cron 则该设置无效
  13. // job.setTriggerTime(DateUtils.addDay(new Date(), 1).getTime());
  14. //任务执行节点组
  15. job.setTaskTrackerNodeGroup("test_trade_TaskTracker");
  16. //当任务队列中存在这个任务的时候,是否替换更新
  17. job.setReplaceOnExist(false);
  18. Map<String, Object> submitResult = new HashMap<String, Object>(4);
  19. try {
  20. //任务提交返回值 response
  21. Response response = jobClient.submitJob(job);
  22. submitResult.put("success", response.isSuccess());
  23. submitResult.put("msg", response.getMsg());
  24. submitResult.put("code", response.getCode());
  25. } catch (Exception e) {
  26. log.error("提交任务失败", e);
  27. throw new RuntimeException("提交任务失败");
  28. }
  29. return submitResult;
  30. }
  31. @GetMapping("test02")
  32. public Map<String, Object> test02() {
  33. //模拟提交一个任务
  34. Job job = new Job();
  35. job.setTaskId("task-BBBBBBBBBBBBBBB");
  36. job.setCronExpression("0/6 * * * * ?");
  37. //设置任务类型 区分不同的任务 执行不同的业务逻辑
  38. job.setParam("type", "bType");
  39. job.setNeedFeedback(true);
  40. //任务触发时间 如果设置了 cron 则该设置无效
  41. // job.setTriggerTime(DateUtils.addDay(new Date(), 1).getTime());
  42. //任务执行节点组
  43. job.setTaskTrackerNodeGroup("test_trade_TaskTracker");
  44. //当任务队列中存在这个任务的时候,是否替换更新
  45. job.setReplaceOnExist(false);
  46. Map<String, Object> submitResult = new HashMap<String, Object>(4);
  47. try {
  48. Response response = jobClient.submitJob(job);
  49. submitResult.put("success", response.isSuccess());
  50. submitResult.put("msg", response.getMsg());
  51. submitResult.put("code", response.getCode());
  52. } catch (Exception e) {
  53. log.error("提交任务失败", e);
  54. throw new RuntimeException("提交任务失败");
  55. }
  56. return submitResult;
  57. }

注意:JobClient我们可以直接引入,然后构建一个Job,通过JobClient进行提交。任务提交之后,JobTracker会对任务进行分发,分发方式有如下两种:

TaskTracker会定时发送pull请求给JobTracker, 默认1s一次, 在发送pull请求之前,会检查当前TaskTracker是否有可用的空闲线程,如果没有则不会发送pull请求,同时也会检查本节点机器资源是否足够,主要是检查cpu和内存使用率,默认超过90%就不会发送pull请求,当JobTracker收到TaskTracker节点的pull请求之后,再从任务队列中取出相应的已经到了执行时间点的任务 push给TaskTracker,这里push的个数等于TaskTracker的空余线程数。

还有一种途径是,每个TaskTracker线程处理完当前任务之后,在反馈给JobTracker的时候,同时也会询问JobTracker是否有新的任务需要执行,如果有JobTracker会同时返回给TaskTracker一个新的任务执行。所以在任务量足够大的情况下,每个TaskTracker基本上是满负荷的执行的。

(4) TaskTracker执行任务

关于TaskTracker使用的官方建议

一个JVM一般也尽量保持只有一个TaskTracker实例即可,多了就可能造成资源浪费。
当遇到一个TaskTracker要运行多种任务的时候,在一个JVM中,最好使用一个TaskTracker去运行多种任务,因为一个JVM中使用多个TaskTracker实例比较浪费资源(当然当你某种任务量比较多的时候,可以将这个任务单独使用一个TaskTracker节点来执行)。

上面提交了两个任务,分别是任务A任务B,所以这里演示的是一个TaskTracker执行多种不同的任务
任务的执行必须实现JobRunner接口,如下任务A:

  1. public class JobRunnerA implements JobRunner {
  2. @Override
  3. public Result run(JobContext jobContext) throws Throwable {
  4. // TODO A类型Job的逻辑
  5. System.out.println("我是Runner A");
  6. return null;
  7. }
  8. }

任务B同理,就不重复贴出代码了
需要指出的是,在SpringBoot中,任务的执行需要添加**@JobRunner4TaskTracker注解**,但是有且**只能有一个@JobRunner4TaskTracker注解**。所以,对于同一个TaskTracker执行不同的任务,需要进行调度执行,如下:

  1. /**
  2. * 总入口,在 taskTracker.setJobRunnerClass(JobRunnerDispatcher.class)
  3. * JobClient 提交 任务时指定 Job 类型 job.setParam("type", "aType")
  4. */
  5. @JobRunner4TaskTracker
  6. public class JobRunnerDispatcher implements JobRunner {
  7. private static final Logger log = LoggerFactory.getLogger(JobRunnerDispatcher.class);
  8. private static final ConcurrentHashMap<String/*type*/, JobRunner>
  9. JOB_RUNNER_MAP = new ConcurrentHashMap<String, JobRunner>();
  10. static {
  11. JOB_RUNNER_MAP.put("aType", new JobRunnerA()); // 也可以从Spring中拿
  12. JOB_RUNNER_MAP.put("bType", new JobRunnerB());
  13. }
  14. @Override
  15. public Result run(JobContext jobContext) throws Throwable {
  16. Job job = jobContext.getJob();
  17. String type = job.getParam("type");
  18. return JOB_RUNNER_MAP.get(type).run(jobContext);
  19. }
  20. }

说明:
JobRunnerDispatcher 类同样实现了JobRunner接口,并且添加了@JobRunner4TaskTracker注解,表示该类才是真正会执行任务的地方。通过该类,实现不同的任务执行。
实际上,到这里基本整个LTS任务从提交到执行就已经完成了,也就是简单的集成完成了。
可以直接启动项目了

(5) master节点监听以及任务完成处理类

这个不必多说,直接看代码好了(来自lts-example):

  1. /**
  2. * 主节点监听
  3. */
  4. @MasterNodeListener
  5. public class MasterNodeChangeListener implements MasterChangeListener {
  6. private static final Logger log = LoggerFactory.getLogger(MasterNodeChangeListener.class);
  7. /**
  8. * @param master master节点
  9. * @param isMaster 表示当前节点是不是master节点
  10. */
  11. @Override
  12. public void change(Node master, boolean isMaster) {
  13. // 一个节点组master节点变化后的处理 , 譬如我多个JobClient, 但是有些事情只想只有一个节点能做。
  14. if (isMaster) {
  15. log.info("我变成了节点组中的master节点了, 恭喜, 我要放大招了");
  16. } else {
  17. log.info(StringUtils.format("master节点变成了{},不是我,我不能放大招,要猥琐", master));
  18. }
  19. }
  20. }
  1. /**
  2. * 任务完成处理类
  3. */
  4. @Component
  5. public class JobCompletedHandlerImpl implements JobCompletedHandler {
  6. private static final Logger log = LoggerFactory.getLogger(JobCompletedHandlerImpl.class);
  7. @Override
  8. public void onComplete(List<JobResult> jobResults) {
  9. //对任务执行结果进行处理 打印相应的日志信息
  10. if (CollectionUtils.isNotEmpty(jobResults)) {
  11. for (JobResult jobResult : jobResults) {
  12. String time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
  13. log.info("任务执行完成taskId={}, 执行完成时间={}, job={}",
  14. jobResult.getJob().getTaskId(), time, jobResult.getJob().toString());
  15. }
  16. }
  17. }
  18. }

(6) Admin后台管理

(6.1) 下载源码

下载源码

(6.2) 修改配置文件

lts-admin项目下\lightTaskScheduler\lts-admin\src\main\resources
lts-admin.cfg配置文件:(主要修改mysql和zookeeper地址)

  1. // 后台的用户名密码
  2. console.username=admin
  3. console.password=admin
  4. # 注册中心地址,可以是zk,也可以是redis
  5. registryAddress=zookeeper://127.0.0.1:2181
  6. # registryAddress=redis://127.0.0.1:6379
  7. # 集群名称
  8. clusterName=test_cluster
  9. # zk客户端,可选值 zkclient, curator
  10. configs.zk.client=zkclient
  11. # ------ 这个是Admin存储数据的地方,也可以和JobQueue的地址一样 ------
  12. configs.jdbc.url=jdbc:mysql://127.0.0.1:3306/lts
  13. configs.jdbc.username=fiora
  14. configs.jdbc.password=fiora
  15. # admin 数据使用mysql 默认 mysql, 可以自行扩展
  16. jdbc.datasource.provider=mysql
  17. # 使用 可选值 fastjson, jackson
  18. # configs.lts.json=fastjson
  19. # 是否在admin启动monitor服务, monitor服务也可以单独启动
  20. lts.monitorAgent.enable=true
  21. #======================以下相关配置是JobTrackerJobQueueJobLogger的相关配置 要保持和JobTracker一样==========================
  22. ## (可选配置)jobT. 开头的, 因为JobTrackerAdmin可能使用的数据库不是同一个
  23. # LTS业务日志, 可选值 mysql, mongo
  24. jobT.job.logger=mysql
  25. # ---------以下是任务队列配置-----------
  26. # 任务队列,可选值 mysql, mongo
  27. jobT.job.queue=mysql
  28. # ------ 1. 如果是mysql作为任务队列 (如果不配置,表示和Admin的在一个数据库)------
  29. # jobT.jdbc.url=jdbc:mysql://127.0.0.1:3306/lts
  30. # jobT.jdbc.username=root
  31. # jobT.jdbc.password=root
  32. # ------ 2. 如果是mongo作为任务队列 ------
  33. # jobT.mongo.addresses=127.0.0.1:27017
  34. # jobT.mongo.database=lts
  35. # jobT.mongo.username=xxx #如果有的话
  36. # jobT.mongo.password=xxx #如果有的话
  37. # admin 数据使用mysql 默认 mysql, 可以自行扩展
  38. # jobT.jdbc.datasource.provider=mysql

lts-monitor.cfg,自行修改zookeeper和mysql配置

  1. # 注册中心地址,可以是zk,也可以是redis
  2. registryAddress=zookeeper://127.0.0.1:2181
  3. # registryAddress=redis://127.0.0.1:6379
  4. # 集群名称
  5. clusterName=test_cluster
  6. # LTS业务日志, 可选值 mysql, mongo
  7. configs.job.logger=mysql
  8. # zk客户端,可选值 zkclient, curator
  9. configs.zk.client=zkclient
  10. # ---------以下是任务队列配置-----------
  11. # 任务队列,可选值 mysql, mongo
  12. configs.job.queue=mysql
  13. # ------ 1. 如果是mysql作为任务队列 ------
  14. configs.jdbc.url=jdbc:mysql://127.0.0.1:3306/lts
  15. configs.jdbc.username=fiora
  16. configs.jdbc.password=fiora
  17. # ------ 2. 如果是mongo作为任务队列 ------
  18. configs.mongo.addresses=127.0.0.1:27017
  19. configs.mongo.database=lts
  20. # configs.mongo.username=xxx #如果有的话
  21. # configs.mongo.password=xxx #如果有的话
  22. # admin 数据使用mysql, h2 默认 h2 embedded
  23. jdbc.datasource.provider=mysql
  24. # 使用 可选值 fastjson, jackson
  25. # configs.lts.json=fastjson

(6.2) 编译打包

下载完的源码中,总目录下,有build.cmd脚本,执行该脚本
LTS简单集成springboot项目 - 图2
编译,获得lts-admin管理项目war

(6.3) 启动访问

war包扔给tomcat执行即可
访问路径:

http://localhost:8080/项目名/

LTS简单集成springboot项目 - 图3
注意:默认账户密码为admin,admin