创建时间: 2019/7/20 14:24
作者: sunpengwei1992@aliyun.com

开篇语

上一篇文章介绍了elastic-job-lite的入门,架构。使用和一些流程,里面提到elastic-job-lite是一个去中心化,轻量级的任务调度框架,那为什么elastic-jib-lite在启动时要选取主节点呢?难道我看错了,哈哈,不可能的,后文 elastic-job-lite简称ejl。

leader选举

ejl定位为轻量级,去中心化,其任务调度由各自的机器驱动,各台机器之间通过zk去协调,ejl为每个任务都创建一个JobScheduler,而在JobScheduler的初始化中回为每个job选举一个主节点,记住不是全局一个主节点,而是每个任务一个主节。如下图,每个节点上都运行两个任务job1,job2,那么在启动时每个节点就会创建两个JobScheduler对象,为每一个任务在集群中选举一个leader。 elastic-job-lite 既然去中心化,为何要选举主节点 - 图1 这个leader是怎么选举出来的呢?什么时候开始选举?一、在整个集群启动时为每个任务选举leader; 二、当有些任务的leader下线时,会重新选举。

  • 集群启动时选举leader
    1. JobScheduler
    2. public void init(){
    3. schedulerFacder.registerStartUpInfo();
    4. }
    5. public void registerStartUpInfo(){
    6. leaderService.electLeader();
    7. }
    8. /** * 选举主节点. */
    9. public void electLeader() {
    10. log.debug("Elect a new leader now.");
    11. jobNodeStorage.executeInLeader(LeaderNode.LATCH,
    12. new LeaderElectionExecutionCallBack());
    13. log.debug("Leader election completed.");
    14. }
    15. public void executeInLeader(final String latchNode,
    16. final LeaderExecutionCallback callback) {
    17. //通过LeaderLatch进行选举
    18. //这是curator(zk的客户端)中类
    19. LeaderLatch latch =
    20. new LeaderLatch(getClient()jobNodePath.getFullPath(latchNode));
    21. //开始选举
    22. latch.start();
    23. //阻塞,直到选举成功
    24. latch.await();
    25. //在回调方法中写入主节点标记
    26. callback.execute();
    27. }

    在callback.execute()中执行如下,再次判断没有主节点,将当前机器示例id写入

  1. if (!hasLeader()) {
  2. jobNodeStorage.fillEphemeralJobNode(
  3. LeaderNode.INSTANCE,instanceId));
  4. }
  • leader重新选举,主要是通过LeaderElectionJobListener这个监听器来实现leader重新选举,当一个job还在运行,但leader节点下线了,就要重新选举leader
    1. class LeaderElectionJobListener extends AbstractJobListener{
    2. protected void dataChanged(){
    3. //具体选举看上面的代码
    4. leaderService.electLeader()
    5. }
    6. }

    主节点的选举的本质就是大伙竞争一个zk的分布式锁。谁先得到锁,谁就是主节点。

何时使用leader?有什么作用?

分布式系统中,在一个任务执行过程中,有多个机器,多个分片,那么如何去分配呢?哪些机器执行哪些分片呢,如果大家都参与岂不是乱了,这个时候就需要一个领导者来拍板。在ejl中有两处需要leader节点来参与:

  1. 机器启动后,任务开始第一次执行时,需要leader来分片
  2. 当集群中有新的节点增加时,分片的数量有变化时或者有一些节点下线时都会触发重新分片

    主要代码如下,大家阅读源码时可从AbstractElasticJobExecute类中execute方法开始看起。

  1. AbstractElasticJobExecute类中
  2. public final void execute(){
  3. //获取分片,这个方法中主节点leader会分片
  4. ShardingContexts shardingContexts =
  5. jobFacade.getShardingContexts();
  6. }
  7. getShardingContexts()中,有如下方法
  8. shardingService.shardingIfNecessary();
  9. /**
  10. 如果需要分片且当前节点为主节点, 则作业分片.
  11. 如果当前无可用节点则不分片.
  12. */
  13. public void shardingIfNecessary() {
  14. //不是主节点直接返回,不允许分片
  15. if (!leaderService.isLeaderUntilBlock()) {
  16. blockUntilShardingCompleted();
  17. return;
  18. }
  19. ....
  20. }

leader节点删除的时机

leader节点删除的时机有三处,一,在leader节点所在机器进程CRASHED时,jvm通过钩子方法删除自己;二,作业被禁用时删除leader节点,三,主节点进程远程关闭

  • leader机器进程关闭

    1. JobShutdownHookPlugin类中
    2. public void shutdown() {
    3. if (leaderService.isLeader()) {
    4. leaderService.removeLeader();
    5. }
    6. }
  • 作业被禁用时

    1. LeaderAbdicationJobListener类中
    2. protected void dataChanged(
    3. //判断是leader,并且作业被禁用
    4. if (leaderService.isLeader()
    5. && isLocalServerDisabled(path, data)) {
    6. leaderService.removeLeader();
    7. }
    8. }
  • 作业终止调度时

    1. InstanceShutdownStatusJobListener类中
    2. protected void dataChanged(
    3. //当job未暂停,
    4. //并且调度控制器未暂停,
    5. //并且事件是移除这个实例,
    6. //并且运行实例未被移除
    7. if (!JobRegistry.getInstance().isShutdown(jobName)
    8. &&
    9. !JobRegistry.getInstance()
    10. .getJobScheduleController(jobName).isPaused()
    11. &&
    12. isRemoveInstance(path, eventType)
    13. &&
    14. !isReconnectedRegistryCenter()) {
    15. //在这个方法中removeLeader
    16. schedulerFacade.shutdownInstance();
    17. }
    18. }

    EJL的leader在zk中的数据结构

    代码在LeaderNode类中

  • leader在zk中的根路径

    1. String ROOT = "leader";
  • 这是leader进行选举的父路径 /leader/election

    1. String ELECTION_ROOT = ROOT + "/election";
  • 保存主节点的地址 /leader/election/instance 这是一个临时节点,leader所在的机器下线后,这个路径就会消失,对于重新选举有作用

    1. String INSTANCE = ELECTION_ROOT + "/instance";
  • leader选举的分布式锁 /leader/election/latch

    1. String LATCH = ELECTION_ROOT + "/latch";

    欢迎大家关注微信公众号:“技术人技术事”,更多精彩期待你的到来
    GoLang公众号.jpg