创建时间: 2019/8/4 15:38
作者: sunpengwei1992@aliyun.com

开篇词

elastic-job-lite在项目中使用也有两个多月的时间了,从一开始搜索网上教程,参考别人使用方法,到后面阅读源码,理解其架构,实现。也写了几篇关于ejl的架构,流程处理,数据结构的文章,中间也经历了很多坑,也有了一些最佳实践,这篇文章写一下总结,但不是ejl的结尾,后续还会有ejl的文章,下面附上前几篇的链接:

  1. elastic-job-lite入门以及架构原理分析
  2. elastic-job-lite 既然去中心化,为何要选举主节点
  3. elastic-job-lite 数据结构分析

    使用心得

    EJL中有三种job作业类型,simple, dataflow, script,这三种任务类型都支持cron表达式定时调用,也支持页面单次触发。

simple是简单类型,提供接口如下,入参为当前分片的上下文,接口如下,这种简单类型接口适合处理一些业务逻辑不是很复杂的情况,且在短时间内能完成,ejl中还好,是本地调用,如果夸服务调用,那么时间一长,是很容易超时的,调度中心会认为处理失败,总的来说适合一些高频,低时的任务,且没有相对复杂的数据分离处理。

  1. void execute(ShardingContext var1);

dataflow是一种流失数据处理,fetchData入参为当前分片的上下文,processData入参为需要处理的数据,接口如下,这种流式处理接口一般适合大数据量计算,fetchData负责需要处理的数据,processData负责对抓取的数据计算生成结果存储,也适合一些大量任务的执行,将数据的准备和数据的处理进行了分离,从而在代码组织上比较清晰,这种类型的作业一般也会耗时比较长少则几分钟,多则几小时,但一般低频,一天一次,其任务触发后在本地机器while(list.size>0)执行,直到fetchData返回的数据集合大小为0,此处一般建议processData中可以开起多线程处理数据,加快执行处理时间。如果你需要处理的数据是断断续续的生成,那么这种是不适合的,因为fetchData的时候,可能你的数据还在生成中,导致任务已经完毕,这是可以考虑第一种。

  1. List<T> fetchData(ShardingContext var1);
  2. void processData(ShardingContext var1, List<T> var2);

scritp是一种脚本类型,其接口为一个接口,它适合一些脚本作业触发,也可以说是实现语言不是Java的一些脚本任务,比如shell写了一段业务逻辑,python写个一个汇总逻辑,需要每隔一小时执行一次,这是script就派上用场了,通过命令进行触发,在script的配置中有一个scriptCommandLine参数,这个参数就是配置执行这个脚本的命令,执行器会把当前上下文信息转化为json参数拼在命令行后面,ejl会定时通过命令触发你写的脚本,script脚本执行器如下:

  1. public final class ScriptJobExecutor extends AbstractElasticJobExecutor{
  2. protected void process(final ShardingContext shardingContext) {
  3. //scriptCommandLine从上下文中获取到,声明为final
  4. final String scriptCommandLine = shardingContext.get...
  5. this.executeScript(shardingContext, scriptCommandLine);
  6. }
  7. private void executeScript(final ShardingContext shardingContext, final String scriptCommandLine){
  8. CommandLine commandLine = CommandLine.parse(scriptCommandLine);
  9. commandLine.addArgument("上下文json串" false);
  10. try {
  11. //org.apache.commons.exec apache的包
  12. new DefaultExecutor().execute(commandLine);
  13. } catch (final IOException ex) {
  14. throw new JobConfigurationException("Execute script failure.", ex);
  15. }
  16. }
  17. }

上面介绍了三种作业类型的使用方法和场景,不管是以上那种作业类型,ejl都支持为其添加监听器,能够实现在作业开始执行前和执行完成后做一些准备和清理工作,并且是在分布式环境下进行,也就是保证所有节点还未开始执行时,就执行监听器中的befor方法,在所有节点都执行完成后,在执行after方法,这是非常有帮助的,通常我们都会用到,比如我们在任务开始前需要准备一些基础数据,在结束需要告知下游系统完成,其接口如下:

  1. public interface ElasticJobListener {
  2. void beforeJobExecuted(ShardingContexts var1);
  3. void afterJobExecuted(ShardingContexts var1);
  4. }

ejl是不支持作业连贯的,比如,我们的job依赖上游系统作业的完成,怎么做呢?

  1. 第一种方式在before中我们轮询监听上游作业状态,就不准备数据,这样fetchData就抓不到数据,作业就不执行,事实上我觉监听器中两个方法应该有个bool返回值更好
  2. 我们可以利用web提供的单词触发的api调用,我们不配置cron表达式,当上游系统作业完成时可以通过http请求调用我们的api启动我们的job

    坑和解决方案

    这里记录一下使用过程中踩过的坑

第一坑,在spring boot中假设我们开启两个任务,配置如下,这一看没什么问题,而且一般情况下程序员都习惯cv大法,然后改一改,但总会有一些忽略的地方,下面的配置两个job都有共同的jobConfig方法,这就是copy的结果,类名改了,Bean的Name改了,job名称也改了,就是方法名没改,并且编译阶段和启动都不会报错,但导致的问题就是只有一个job作业会注册成功,当时我们排查了好久才发现这个问题,因为以前也确实没遇到不同类中方法重名导致的问题,这是因为spting 会把方法名,返回值作为构造一个对象的key,此处方法返回类型和方法名一致,导致此对象只会被创建一次。

  1. public class JobConfig1 {
  2. @Resource
  3. private ZookeeperRegistryCenter regCenter;
  4. @Bean(name = "job1")
  5. public DataflowJob jobConifg() {
  6. return new Job1()();
  7. }
  8. }
  9. public class JobConfig2 {
  10. @Resource
  11. private ZookeeperRegistryCenter regCenter;
  12. @Bean(name = "job2")
  13. public DataflowJob jobConifg() {
  14. return new Job1();
  15. }
  16. }

第二坑,就是上面提到的监听器,问题是分布式环境下会出现多节点都调用befor和after的问题,其实也就是并发的问题,我们分析如下代码,通过分布式一致性协调服务判断是否所有节点开始,如果是,则执行before方法,如果不是,则锁住进入睡眠状态,最终当其中某一个节点执行完before方法清楚标记,唤醒睡眠的节点,问题出在哪里了?我们看isAllStarted()方法,就是判断节点总数是不是和zk上写入的节点数一样,也就是说假设1,2,3三个节点此时同一时刻都进入isAllStarted()方法,他们都往zk上写入了,都是有可能判断成功的,则会都进入before方法。

  1. 正确的做法是我们应该在doBefore…方法加一个分布式锁,即便isAllStarted都判断成功了,也有分布式锁保障,只有获得锁的方法才能执行doBefore…方法。
  2. 网上也有一种做法,就是在i’sAllStated()再加一个条件,只有指定的分片才能执行doBefore…方法,但这有一种问题,就是这个分片被禁用后,就会导致任务无法执行,因为所有分片都在睡眠中,都无法被唤醒。 ``` if (guaranteeService.isAllStarted()) {
    1. doBeforeJobExecutedAtLastStarted(shardingContexts);
    2. guaranteeService.clearAllStartedInfo();
    3. return;
    } long before = timeService.getCurrentMillis(); try { synchronized (startedWait) {
    1. startedWait.wait(startedTimeoutMilliseconds);
    } } catch (final InterruptedException ex) { Thread.interrupted(); }

public boolean isAllStarted() {
return this.jobNodeStorage.isJobNodeExisted(“guarantee/started”) && this.configService.load(false).getTypeConfig() .getCoreConfig().getShardingTotalCount() == this.jobNodeStorage.getJobNodeChildrenKeys(“guarantee/started”).size();
}

  1. > 第一种解决方案代码:

leaderLatch = new LeaderLatch(client, path, id); LeaderLatchListener leaderLatchListener = new LeaderLatchListener() { @Override public void isLeader() { doBefore…. clearAllStartedInfo //释放锁 } @Override public void notLeader() { //休眠,等待唤醒
} }; leaderLatch.addListener(leaderLatchListener); leaderLatch.start();

  1. > 第二种解决方案代码:

while(shardingContexts.getShardingItemParameters().containsKey(0)) { if (guaranteeService.isAllStarted()) { doBeforeJobExecutedAtLastStarted(shardingContexts); guaranteeService.clearAllStartedInfo(); return; } Thread.sleep(10) } //否则直接休眠 try { synchronized (startedWait) { startedWait.wait(startedTimeoutMilliseconds); } } catch (final InterruptedException ex) { Thread.interrupted(); } ```

欢迎大家关注微信公众号:“golang那点事”,更多精彩期待你的到来
GoLang公众号.jpg