How K8s HA Works
Overview
K8s HA (leader election) can be implemented with two lock types: ConfigMapLock and LeaseLock. Flink uses ConfigMapLock, and the analysis below is based on it.
K8s HA rests on two design guarantees:
- K8s API operations on the same object are serialized. For example, if two clients create the same ConfigMap, the first create succeeds and the second fails.
- Object updates use optimistic locking based on resourceVersion:
  1. get the resource to obtain its current resourceVersion;
  2. send the update request carrying that resourceVersion;
  3. on update, the server compares the resourceVersion in the request with the stored one: if they differ, the update fails; if they match, the update succeeds and a new resourceVersion is generated.
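The compare-and-swap behavior behind the optimistic-locking steps above can be illustrated with a minimal in-memory sketch. This is not the real API server, just a hypothetical store that mimics its resourceVersion semantics:

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical in-memory sketch (NOT the real API server) of how a
// resourceVersion-based compare-and-swap update behaves.
public class ResourceVersionStore {
    private String data;
    private long resourceVersion = 1;
    private final AtomicLong versionCounter = new AtomicLong(1);

    public synchronized long getResourceVersion() { return resourceVersion; }
    public synchronized String getData() { return data; }

    // Mimics the server-side update: succeeds only if the caller's
    // resourceVersion matches the stored one, then bumps the version.
    public synchronized boolean update(long expectedVersion, String newData) {
        if (expectedVersion != resourceVersion) {
            return false; // conflict: the object was updated in between
        }
        this.data = newData;
        this.resourceVersion = versionCounter.incrementAndGet();
        return true;
    }

    public static void main(String[] args) {
        ResourceVersionStore store = new ResourceVersionStore();
        long v = store.getResourceVersion();          // both clients read version 1
        boolean first = store.update(v, "leader-A");  // first writer wins
        boolean second = store.update(v, "leader-B"); // stale version -> rejected
        System.out.println(first + " " + second);     // prints "true false"
    }
}
```

Two contenders that both read version 1 race on the same update; only the first succeeds, which is exactly the property leader election relies on.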
Structure of the K8s ConfigMapLock

```yaml
kind: ConfigMap
apiVersion: v1
metadata:
  name: >-
    dsp-kafka-to-mysql-session-cluster-c43e57b3cd2a15abefa9cd75b1f739c8-jobmanager-leader
  namespace: flink-cluster
  labels:
    app: dsp-kafka-to-mysql-session-cluster
    configmap-type: high-availability
    type: flink-native-kubernetes
  annotations:
    control-plane.alpha.kubernetes.io/leader: >-
      {"holderIdentity":"cfdd3cf5-7860-493e-9f31-46739d8b4020","leaseDuration":15.000000000,"acquireTime":"2022-06-01T06:19:55.197000Z","renewTime":"2022-06-13T09:40:10.261000Z","leaderTransitions":209661}
data:
  address: 'akka.tcp://flink@10.70.2.114:6123/user/rpc/jobmanager_12'
  checkpointID-0000000000000016997: >-
    rO0ABXNyADtvcmcuYXBhY2hlLmZsaW5rLnJ1bnRpbWUuc3RhdGUuUmV0cmlldmFibGVTdHJlYW1TdGF0ZUhhbmRsZQABHhjxVZcrAgABTAAYd3JhcHBlZFN0cmVhbVN0YXRlSGFuZGxldAAyTG9yZy9hcGFjaGUvZmxpbmsvcnVudGltZS9zdGF0ZS9TdHJlYW1TdGF0ZUhhbmRsZTt4cHNyADlvcmcuYXBhY2hlLmZsaW5rLnJ1bnRpbWUuc3RhdGUuZmlsZXN5c3RlbS5GaWxlU3RhdGVIYW5kbGUE3HXYYr0bswIAAkoACXN0YXRlU2l6ZUwACGZpbGVQYXRodAAfTG9yZy9hcGFjaGUvZmxpbmsvY29yZS9mcy9QYXRoO3hwAAAAAAAAFCZzcgAdb3JnLmFwYWNoZS5mbGluay5jb3JlLmZzLlBhdGgAAAAAAAAAAQIAAUwAA3VyaXQADkxqYXZhL25ldC9VUkk7eHBzcgAMamF2YS5uZXQuVVJJrAF4LkOeSasDAAFMAAZzdHJpbmd0ABJMamF2YS9sYW5nL1N0cmluZzt4cHQAeGhkZnM6L3RtcC9mbGluay9oYS9kc3Ata2Fma2EtdG8tbXlzcWwtc2Vzc2lvbi1jbHVzdGVyL2RzcC1rYWZrYS10by1teXNxbC1zZXNzaW9uLWNsdXN0ZXIvY29tcGxldGVkQ2hlY2twb2ludGQ0NDRkMmFiMmIyZHg=
  counter: '16998'
  sessionId: 28f94c66-cba3-4267-b089-ced5879862c1
```
A ConfigMapLock consists of two parts:
- annotations: used for leader election and renewTime refreshing.
  - holderIdentity: identity of the current leader
  - leaseDuration: lease duration
  - acquireTime: time the leader was first elected
  - renewTime: time of the last leader renewal
  - leaderTransitions: number of leader renewals
- data: information stored by the leader, written after the leader has started successfully.
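The annotations above drive a standby's decision on whether it may take over: the lock is considered expired once renewTime + leaseDuration is in the past. A minimal sketch of that check (the method name mirrors `canBecomeLeader` in the fabric8 source, but this standalone class is illustrative):

```java
import java.time.Duration;
import java.time.ZonedDateTime;

// Illustrative sketch of the lease-expiry check a standby performs.
public class LeaseCheck {
    // The lock is free to grab once renewTime + leaseDuration has passed.
    public static boolean canBecomeLeader(ZonedDateTime renewTime,
                                          Duration leaseDuration,
                                          ZonedDateTime now) {
        return renewTime.plus(leaseDuration).isBefore(now);
    }

    public static void main(String[] args) {
        // Values taken from the ConfigMap annotation shown above.
        ZonedDateTime renewTime = ZonedDateTime.parse("2022-06-13T09:40:10.261Z");
        Duration lease = Duration.ofSeconds(15);
        // 5s after the last renewal: lease still valid, standby must wait.
        System.out.println(canBecomeLeader(renewTime, lease, renewTime.plusSeconds(5)));  // false
        // 20s after the last renewal: lease expired, standby may take over.
        System.out.println(canBecomeLeader(renewTime, lease, renewTime.plusSeconds(20))); // true
    }
}
```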
Leader Election Process
The core logic lives in io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector; see the inline comments in the source below.
```java
// Election logic
public void run() {
  LOGGER.debug("Leader election started");
  // 1. Try to acquire leadership; blocks until it succeeds
  if (!acquire()) {
    return;
  }
  // 2. Acquisition succeeded: invoke the leader's onStartLeading callback
  leaderElectionConfig.getLeaderCallbacks().onStartLeading();
  // 3. Periodically refresh renewTime to keep holding the lock
  renewWithTimeout();
  // 4. Renewal failed: invoke the onStopLeading callback
  leaderElectionConfig.getLeaderCallbacks().onStopLeading();
}

// When tryAcquireOrRenew() succeeds, countDownLatch.countDown() exits the loop
private boolean acquire() {
  final String lockDescription = leaderElectionConfig.getLock().describe();
  LOGGER.debug("Attempting to acquire leader lease '{}'...", lockDescription);
  final AtomicBoolean succeeded = new AtomicBoolean(false);
  return loop(countDownLatch -> {
    try {
      if (!succeeded.get()) {
        succeeded.set(tryAcquireOrRenew());
        reportTransitionIfLeaderChanged();
      }
      if (succeeded.get()) {
        LOGGER.debug("Successfully Acquired leader lease '{}'", lockDescription);
        countDownLatch.countDown();
      } else {
        LOGGER.debug("Failed to acquire lease '{}' retrying...", lockDescription);
      }
    } catch (Exception exception) {
      LOGGER.error("Exception occurred while acquiring lock '{}'", lockDescription, exception);
    }
  }, jitter(leaderElectionConfig.getRetryPeriod(), JITTER_FACTOR).toMillis());
}

// When tryAcquireOrRenew() fails, abortLatch.countDown() aborts renewal
private void renew(CountDownLatch abortLatch, CountDownLatch renewSignal) {
  try {
    final boolean success = tryAcquireOrRenew();
    reportTransitionIfLeaderChanged();
    if (!success) {
      abortLatch.countDown();
    }
  } catch (LockException exception) {
    LOGGER.debug("Exception occurred while renewing lock: {}", exception.getMessage(), exception);
  }
  renewSignal.countDown();
}

// The actual create/update logic lives in the Lock implementation (ConfigMapLock/LeaseLock)
private boolean tryAcquireOrRenew() throws LockException {
  final Lock lock = leaderElectionConfig.getLock();
  final ZonedDateTime now = now();
  final LeaderElectionRecord oldLeaderElectionRecord = lock.get(kubernetesClient);
  // No existing lock record: try to grab the lock
  if (oldLeaderElectionRecord == null) {
    final LeaderElectionRecord newLeaderElectionRecord = new LeaderElectionRecord(
        lock.identity(), leaderElectionConfig.getLeaseDuration(), now, now, 0);
    // If another contender creates the record first, create throws;
    // otherwise leadership has been acquired
    lock.create(kubernetesClient, newLeaderElectionRecord);
    updateObserved(newLeaderElectionRecord);
    return true;
  }
  updateObserved(oldLeaderElectionRecord);
  final boolean isLeader = isLeader(oldLeaderElectionRecord);
  // This node is not the leader and cannot become leader yet
  // (i.e., the path every standby takes while the lease is valid)
  if (!isLeader && !canBecomeLeader(oldLeaderElectionRecord)) {
    LOGGER.debug("Lock is held by {} and has not yet expired", oldLeaderElectionRecord.getHolderIdentity());
    return false;
  }
  // Refresh renewTime; if the previous leader's renewal timed out, a standby takes over
  final LeaderElectionRecord newLeaderElectionRecord = new LeaderElectionRecord(
      lock.identity(),
      leaderElectionConfig.getLeaseDuration(),
      isLeader ? oldLeaderElectionRecord.getAcquireTime() : now,
      now,
      isLeader ? (oldLeaderElectionRecord.getLeaderTransitions() + 1) : 0);
  newLeaderElectionRecord.setVersion(oldLeaderElectionRecord.getVersion());
  // Optimistic-locking update: among racing contenders only one succeeds
  leaderElectionConfig.getLock().update(kubernetesClient, newLeaderElectionRecord);
  updateObserved(newLeaderElectionRecord);
  return true;
}

// Periodic loop built from schedule + CountDownLatch; after countDownLatch.await()
// returns, the ScheduledExecutorService is shut down via shutdownNow()
protected static boolean loop(Consumer<CountDownLatch> consumer, long periodInMillis) {
  final ScheduledExecutorService singleThreadExecutorService = Executors.newSingleThreadScheduledExecutor();
  final CountDownLatch countDownLatch = new CountDownLatch(1);
  final Future<?> future = singleThreadExecutorService.scheduleAtFixedRate(
      () -> consumer.accept(countDownLatch), 0L, periodInMillis, TimeUnit.MILLISECONDS);
  try {
    countDownLatch.await();
    return true;
  } catch (InterruptedException e) {
    LOGGER.debug("Loop thread interrupted: {}", e.getMessage());
    Thread.currentThread().interrupt();
    return false;
  } finally {
    future.cancel(true);
    singleThreadExecutorService.shutdownNow();
  }
}
```
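The loop() helper at the end is a generic "retry until signalled" pattern: a scheduled task runs every period, and counting the latch down both unblocks the caller and triggers scheduler shutdown. A self-contained sketch of the same pattern (class and variable names are mine, not from fabric8):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class RetryLoop {
    // Runs `attempt` every `periodMillis` until it counts the latch down,
    // then cancels the task and shuts the scheduler down.
    static boolean loop(Consumer<CountDownLatch> attempt, long periodMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch done = new CountDownLatch(1);
        Future<?> task = scheduler.scheduleAtFixedRate(
                () -> attempt.accept(done), 0L, periodMillis, TimeUnit.MILLISECONDS);
        try {
            done.await(); // blocks until some attempt signals success
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            task.cancel(true);
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // Simulate an acquire that fails twice before succeeding,
        // like tryAcquireOrRenew() while another leader still holds the lock.
        AtomicInteger attempts = new AtomicInteger();
        boolean acquired = loop(latch -> {
            if (attempts.incrementAndGet() >= 3) {
                latch.countDown(); // "tryAcquireOrRenew succeeded"
            }
        }, 10L);
        System.out.println(acquired + " after " + attempts.get() + " attempts");
    }
}
```

The real acquire() adds jitter to the retry period so that multiple standbys do not hammer the API server in lockstep.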
