| 创建时间: | 2019/7/20 14:24 |
|---|---|
| 作者: | sunpengwei1992@aliyun.com |
开篇语
上一篇文章介绍了elastic-job-lite的入门,架构。使用和一些流程,里面提到elastic-job-lite是一个去中心化,轻量级的任务调度框架,那为什么elastic-jib-lite在启动时要选取主节点呢?难道我看错了,哈哈,不可能的,后文 elastic-job-lite简称ejl。
leader选举
ejl定位为轻量级,去中心化,其任务调度由各自的机器驱动,各台机器之间通过zk去协调,ejl为每个任务都创建一个JobScheduler,而在JobScheduler的初始化中回为每个job选举一个主节点,记住不是全局一个主节点,而是每个任务一个主节。如下图,每个节点上都运行两个任务job1,job2,那么在启动时每个节点就会创建两个JobScheduler对象,为每一个任务在集群中选举一个leader。
这个leader是怎么选举出来的呢?什么时候开始选举?一、在整个集群启动时为每个任务选举leader; 二、当有些任务的leader下线时,会重新选举。
- 集群启动时选举leader
在JobScheduler中public void init(){schedulerFacder.registerStartUpInfo();}public void registerStartUpInfo(){leaderService.electLeader();}/** * 选举主节点. */public void electLeader() {log.debug("Elect a new leader now.");jobNodeStorage.executeInLeader(LeaderNode.LATCH,new LeaderElectionExecutionCallBack());log.debug("Leader election completed.");}public void executeInLeader(final String latchNode,final LeaderExecutionCallback callback) {//通过LeaderLatch进行选举//这是curator(zk的客户端)中类LeaderLatch latch =new LeaderLatch(getClient()jobNodePath.getFullPath(latchNode));//开始选举latch.start();//阻塞,直到选举成功latch.await();//在回调方法中写入主节点标记callback.execute();}
在callback.execute()中执行如下,再次判断没有主节点,将当前机器示例id写入
if (!hasLeader()) {jobNodeStorage.fillEphemeralJobNode(LeaderNode.INSTANCE,instanceId));}
- leader重新选举,主要是通过
LeaderElectionJobListener这个监听器来实现leader重新选举,当一个job还在运行,但leader节点下线了,就要重新选举leaderclass LeaderElectionJobListener extends AbstractJobListener{protected void dataChanged(){//具体选举看上面的代码leaderService.electLeader()}}
主节点的选举的本质就是大伙竞争一个zk的分布式锁。谁先得到锁,谁就是主节点。
何时使用leader?有什么作用?
分布式系统中,在一个任务执行过程中,有多个机器,多个分片,那么如何去分配呢?哪些机器执行哪些分片呢,如果大家都参与岂不是乱了,这个时候就需要一个领导者来拍板。在ejl中有两处需要leader节点来参与:
- 机器启动后,任务开始第一次执行时,需要leader来分片
- 当集群中有新的节点增加时,分片的数量有变化时或者有一些节点下线时都会触发重新分片
主要代码如下,大家阅读源码时可从
AbstractElasticJobExecute类中execute方法开始看起。
AbstractElasticJobExecute类中public final void execute(){//获取分片,这个方法中主节点leader会分片ShardingContexts shardingContexts =jobFacade.getShardingContexts();}在 getShardingContexts()中,有如下方法shardingService.shardingIfNecessary();/**如果需要分片且当前节点为主节点, 则作业分片.如果当前无可用节点则不分片.*/public void shardingIfNecessary() {//不是主节点直接返回,不允许分片if (!leaderService.isLeaderUntilBlock()) {blockUntilShardingCompleted();return;}....}
leader节点删除的时机
leader节点删除的时机有三处,一,在leader节点所在机器进程CRASHED时,jvm通过钩子方法删除自己;二,作业被禁用时删除leader节点,三,主节点进程远程关闭
leader机器进程关闭
JobShutdownHookPlugin类中public void shutdown() {if (leaderService.isLeader()) {leaderService.removeLeader();}}
作业被禁用时
LeaderAbdicationJobListener类中protected void dataChanged(//判断是leader,并且作业被禁用if (leaderService.isLeader()&& isLocalServerDisabled(path, data)) {leaderService.removeLeader();}}
作业终止调度时
InstanceShutdownStatusJobListener类中protected void dataChanged(//当job未暂停,//并且调度控制器未暂停,//并且事件是移除这个实例,//并且运行实例未被移除if (!JobRegistry.getInstance().isShutdown(jobName)&&!JobRegistry.getInstance().getJobScheduleController(jobName).isPaused()&&isRemoveInstance(path, eventType)&&!isReconnectedRegistryCenter()) {//在这个方法中removeLeaderschedulerFacade.shutdownInstance();}}
EJL的leader在zk中的数据结构
代码在
LeaderNode类中leader在zk中的根路径
String ROOT = "leader";
这是leader进行选举的父路径 /leader/election
String ELECTION_ROOT = ROOT + "/election";
保存主节点的地址 /leader/election/instance 这是一个临时节点,leader所在的机器下线后,这个路径就会消失,对于重新选举有作用
String INSTANCE = ELECTION_ROOT + "/instance";
leader选举的分布式锁 /leader/election/latch
String LATCH = ELECTION_ROOT + "/latch";
欢迎大家关注微信公众号:“技术人技术事”,更多精彩期待你的到来

这个leader是怎么选举出来的呢?什么时候开始选举?一、在整个集群启动时为每个任务选举leader; 二、当有些任务的leader下线时,会重新选举。