K8S HA原理介绍

原理简介

k8s ha有两种实现方式:ConfigMapLock和LeaseLock。flink使用的是ConfigMapLock,以下分析也基于ConfigMapLock。
k8s ha基于以下两个设计要点:

  1. k8s API在同一个对象上的处理是顺序的(create ConfigMap,第一次创建成功,第二次create会失败)。
  2. update对象,基于resourceVersion实现乐观锁。
    先get resource,获取当前的resourceVersion
    update请求携带当前resourceVersion
    服务端update操作会对比请求中的resourceVersion和之前是不是一致,不一致则update失败,一致则成功并生成新的resourceVersion

K8s的ConfigMapLock结构

  1. kind: ConfigMap
  2. apiVersion: v1
  3. metadata:
  4. name: >-
  5. dsp-kafka-to-mysql-session-cluster-c43e57b3cd2a15abefa9cd75b1f739c8-jobmanager-leader
  6. namespace: flink-cluster
  7. labels:
  8. app: dsp-kafka-to-mysql-session-cluster
  9. configmap-type: high-availability
  10. type: flink-native-kubernetes
  11. annotations:
  12. control-plane.alpha.kubernetes.io/leader: >-
  13. {"holderIdentity":"cfdd3cf5-7860-493e-9f31-46739d8b4020","leaseDuration":15.000000000,"acquireTime":"2022-06-01T06:19:55.197000Z","renewTime":"2022-06-13T09:40:10.261000Z","leaderTransitions":209661}
  14. data:
  15. address: 'akka.tcp://flink@10.70.2.114:6123/user/rpc/jobmanager_12'
  16. checkpointID-0000000000000016997: >-
  17. rO0ABXNyADtvcmcuYXBhY2hlLmZsaW5rLnJ1bnRpbWUuc3RhdGUuUmV0cmlldmFibGVTdHJlYW1TdGF0ZUhhbmRsZQABHhjxVZcrAgABTAAYd3JhcHBlZFN0cmVhbVN0YXRlSGFuZGxldAAyTG9yZy9hcGFjaGUvZmxpbmsvcnVudGltZS9zdGF0ZS9TdHJlYW1TdGF0ZUhhbmRsZTt4cHNyADlvcmcuYXBhY2hlLmZsaW5rLnJ1bnRpbWUuc3RhdGUuZmlsZXN5c3RlbS5GaWxlU3RhdGVIYW5kbGUE3HXYYr0bswIAAkoACXN0YXRlU2l6ZUwACGZpbGVQYXRodAAfTG9yZy9hcGFjaGUvZmxpbmsvY29yZS9mcy9QYXRoO3hwAAAAAAAAFCZzcgAdb3JnLmFwYWNoZS5mbGluay5jb3JlLmZzLlBhdGgAAAAAAAAAAQIAAUwAA3VyaXQADkxqYXZhL25ldC9VUkk7eHBzcgAMamF2YS5uZXQuVVJJrAF4LkOeSasDAAFMAAZzdHJpbmd0ABJMamF2YS9sYW5nL1N0cmluZzt4cHQAeGhkZnM6L3RtcC9mbGluay9oYS9kc3Ata2Fma2EtdG8tbXlzcWwtc2Vzc2lvbi1jbHVzdGVyL2RzcC1rYWZrYS10by1teXNxbC1zZXNzaW9uLWNsdXN0ZXIvY29tcGxldGVkQ2hlY2twb2ludGQ0NDRkMmFiMmIyZHg=
  18. counter: '16998'
  19. sessionId: 28f94c66-cba3-4267-b089-ced5879862c1

ConfigMapLock由两部分构成:

  1. annotations,主要用于leader选举和renewTime刷新。
    holderIdentity:leader标识
    leaseDuration:租约周期
    acquireTime:第一次leader选举成功时间
    renewTime:leader刷新时间
    leaderTransitions:leader刷新次数
  2. data,leader保存的信息,在leader成功启动后进行设置。

Leader选举过程

具体逻辑在io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector源码里,看注释。

  1. //选举逻辑
  2. public void run() {
  3. LOGGER.debug("Leader election started");
  4. //1.申请成为lead,阻塞知道成功
  5. if (!acquire()) {
  6. return;
  7. }
  8. //2.申请成功,lead开始回调
  9. leaderElectionConfig.getLeaderCallbacks().onStartLeading();
  10. //3.定时刷新renewTime,保持占有锁
  11. renewWithTimeout();
  12. //4.刷新失败,lead停止回调
  13. leaderElectionConfig.getLeaderCallbacks().onStopLeading();
  14. }
  15. //tryAcquireOrRenew()成功,countDownLatch.countDown
  16. private boolean acquire() {
  17. final String lockDescription = leaderElectionConfig.getLock().describe();
  18. LOGGER.debug("Attempting to acquire leader lease '{}'...", lockDescription);
  19. final AtomicBoolean succeeded = new AtomicBoolean(false);
  20. return loop(countDownLatch -> {
  21. try {
  22. if (!succeeded.get()) {
  23. succeeded.set(tryAcquireOrRenew());
  24. reportTransitionIfLeaderChanged();
  25. }
  26. if (succeeded.get()) {
  27. LOGGER.debug("Successfully Acquired leader lease '{}'", lockDescription);
  28. countDownLatch.countDown();
  29. } else {
  30. LOGGER.debug("Failed to acquire lease '{}' retrying...", lockDescription);
  31. }
  32. } catch (Exception exception) {
  33. LOGGER.error("Exception occurred while acquiring lock '{}'", lockDescription, exception);
  34. }
  35. }, jitter(leaderElectionConfig.getRetryPeriod(), JITTER_FACTOR).toMillis());
  36. }
  37. //tryAcquireOrRenew()失败,countDownLatch.countDown
  38. private void renew(CountDownLatch abortLatch, CountDownLatch renewSignal) {
  39. try {
  40. final boolean success = tryAcquireOrRenew();
  41. reportTransitionIfLeaderChanged();
  42. if (!success) {
  43. abortLatch.countDown();
  44. }
  45. } catch(LockException exception) {
  46. LOGGER.debug("Exception occurred while renewing lock: {}", exception.getMessage(), exception);
  47. }
  48. renewSignal.countDown();
  49. }
  50. //具体create和update逻辑在Lock里面,ConfigMapLock/LeaseLock
  51. private boolean tryAcquireOrRenew() throws LockException {
  52. final Lock lock = leaderElectionConfig.getLock();
  53. final ZonedDateTime now = now();
  54. final LeaderElectionRecord oldLeaderElectionRecord = lock.get(kubernetesClient);
  55. //没有lockRecord则抢占
  56. if (oldLeaderElectionRecord == null) {
  57. final LeaderElectionRecord newLeaderElectionRecord = new LeaderElectionRecord(
  58. lock.identity(), leaderElectionConfig.getLeaseDuration(), now, now, 0);
  59. //新建如果此时被另一个抢先,则抛异常;反之成功,lead申请成功。
  60. lock.create(kubernetesClient, newLeaderElectionRecord);
  61. updateObserved(newLeaderElectionRecord);
  62. return true;
  63. }
  64. updateObserved(oldLeaderElectionRecord);
  65. final boolean isLeader = isLeader(oldLeaderElectionRecord);
  66. //如果当前节点不是leader并且不能成为leader(此处也就是其他standby执行的逻辑)
  67. if (!isLeader && !canBecomeLeader(oldLeaderElectionRecord)) {
  68. LOGGER.debug("Lock is held by {} and has not yet expired", oldLeaderElectionRecord.getHolderIdentity());
  69. return false;
  70. }
  71. //刷新renewTime,如果之前的leader刷新超时,standby抢占
  72. final LeaderElectionRecord newLeaderElectionRecord = new LeaderElectionRecord(
  73. lock.identity(),
  74. leaderElectionConfig.getLeaseDuration(),
  75. isLeader ? oldLeaderElectionRecord.getAcquireTime() : now,
  76. now,
  77. isLeader ? (oldLeaderElectionRecord.getLeaderTransitions() + 1) : 0
  78. );
  79. newLeaderElectionRecord.setVersion(oldLeaderElectionRecord.getVersion());
  80. //update乐观锁,抢占只有一个会成功
  81. leaderElectionConfig.getLock().update(kubernetesClient, newLeaderElectionRecord);
  82. updateObserved(newLeaderElectionRecord);
  83. return true;
  84. }
  85. //schedule+CountDownLatch实现定时循环,countDownLatch.await()后ExecutorService.shutdownNow()
  86. protected static boolean loop(Consumer<CountDownLatch> consumer, long periodInMillis) {
  87. final ScheduledExecutorService singleThreadExecutorService = Executors.newSingleThreadScheduledExecutor();
  88. final CountDownLatch countDownLatch = new CountDownLatch(1);
  89. final Future<?> future = singleThreadExecutorService.scheduleAtFixedRate(
  90. () -> consumer.accept(countDownLatch), 0L, periodInMillis, TimeUnit.MILLISECONDS);
  91. try {
  92. countDownLatch.await();
  93. return true;
  94. } catch (InterruptedException e) {
  95. LOGGER.debug("Loop thread interrupted: {}", e.getMessage());
  96. Thread.currentThread().interrupt();
  97. return false;
  98. } finally {
  99. future.cancel(true);
  100. singleThreadExecutorService.shutdownNow();
  101. }
  102. }