Introduction to K8s HA Internals
Overview
Kubernetes HA leader election can be backed by two lock types: ConfigMapLock and LeaseLock. Flink uses ConfigMapLock, so the analysis below is based on it.
Kubernetes HA relies on two guarantees of the Kubernetes API:
- Operations on the same object are serialized: if two clients create the same ConfigMap, the first create succeeds and the second fails (AlreadyExists).
- Updates implement an optimistic lock via resourceVersion:
  1. GET the resource to read its current resourceVersion.
  2. Send the update request carrying that resourceVersion.
  3. The API server compares the resourceVersion in the request with the stored one; if they differ the update fails (conflict), otherwise it succeeds and a new resourceVersion is generated.
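The optimistic-lock flow above can be sketched with an in-memory simulation. This is not the real API server, only the compare-and-swap semantics of resourceVersion; the class and method names here are made up for illustration.

```java
import java.util.concurrent.atomic.AtomicReference;

// In-memory sketch of resourceVersion optimistic locking: an update only
// succeeds when the resourceVersion it carries matches the stored one, and a
// successful update bumps the version.
public class ResourceVersionDemo {
    record Stored(String data, long resourceVersion) {}

    static final AtomicReference<Stored> store =
            new AtomicReference<>(new Stored("initial", 1));

    // Compare-and-swap on resourceVersion, mimicking the API server's update check.
    static boolean update(String newData, long expectedVersion) {
        Stored cur = store.get();
        if (cur.resourceVersion() != expectedVersion) {
            return false; // stale resourceVersion: conflict (HTTP 409 in real k8s)
        }
        return store.compareAndSet(cur, new Stored(newData, expectedVersion + 1));
    }

    public static void main(String[] args) {
        long v = store.get().resourceVersion();   // 1. GET: read current resourceVersion
        boolean first = update("leader-a", v);    // 2. update with that version: succeeds
        boolean second = update("leader-b", v);   // 3. update with the now-stale version: fails
        System.out.println(first + " " + second); // true false
    }
}
```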
The structure of a ConfigMapLock
kind: ConfigMap
apiVersion: v1
metadata:
  name: >-
    dsp-kafka-to-mysql-session-cluster-c43e57b3cd2a15abefa9cd75b1f739c8-jobmanager-leader
  namespace: flink-cluster
  labels:
    app: dsp-kafka-to-mysql-session-cluster
    configmap-type: high-availability
    type: flink-native-kubernetes
  annotations:
    control-plane.alpha.kubernetes.io/leader: >-
      {"holderIdentity":"cfdd3cf5-7860-493e-9f31-46739d8b4020","leaseDuration":15.000000000,"acquireTime":"2022-06-01T06:19:55.197000Z","renewTime":"2022-06-13T09:40:10.261000Z","leaderTransitions":209661}
data:
  address: 'akka.tcp://flink@10.70.2.114:6123/user/rpc/jobmanager_12'
  checkpointID-0000000000000016997: >-
    rO0ABXNyADtvcmcuYXBhY2hlLmZsaW5rLnJ1bnRpbWUuc3RhdGUuUmV0cmlldmFibGVTdHJlYW1TdGF0ZUhhbmRsZQABHhjxVZcrAgABTAAYd3JhcHBlZFN0cmVhbVN0YXRlSGFuZGxldAAyTG9yZy9hcGFjaGUvZmxpbmsvcnVudGltZS9zdGF0ZS9TdHJlYW1TdGF0ZUhhbmRsZTt4cHNyADlvcmcuYXBhY2hlLmZsaW5rLnJ1bnRpbWUuc3RhdGUuZmlsZXN5c3RlbS5GaWxlU3RhdGVIYW5kbGUE3HXYYr0bswIAAkoACXN0YXRlU2l6ZUwACGZpbGVQYXRodAAfTG9yZy9hcGFjaGUvZmxpbmsvY29yZS9mcy9QYXRoO3hwAAAAAAAAFCZzcgAdb3JnLmFwYWNoZS5mbGluay5jb3JlLmZzLlBhdGgAAAAAAAAAAQIAAUwAA3VyaXQADkxqYXZhL25ldC9VUkk7eHBzcgAMamF2YS5uZXQuVVJJrAF4LkOeSasDAAFMAAZzdHJpbmd0ABJMamF2YS9sYW5nL1N0cmluZzt4cHQAeGhkZnM6L3RtcC9mbGluay9oYS9kc3Ata2Fma2EtdG8tbXlzcWwtc2Vzc2lvbi1jbHVzdGVyL2RzcC1rYWZrYS10by1teXNxbC1zZXNzaW9uLWNsdXN0ZXIvY29tcGxldGVkQ2hlY2twb2ludGQ0NDRkMmFiMmIyZHg=
  counter: '16998'
  sessionId: 28f94c66-cba3-4267-b089-ced5879862c1
A ConfigMapLock consists of two parts:
- annotations: used for leader election and renewTime refreshes.
  - holderIdentity: identity of the current leader
  - leaseDuration: length of the lease
  - acquireTime: when leadership was first acquired
  - renewTime: when the leader last renewed the lease
  - leaderTransitions: number of lease refreshes
- data: information published by the leader, written after the leader has started successfully.
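To make the annotation concrete, the sketch below pulls two fields out of the control-plane.alpha.kubernetes.io/leader value shown above. The real fabric8 client deserializes this JSON with a proper mapper; the naive string extraction here (class and helper names are made up) is only for illustration.

```java
// Dependency-free sketch: extract string fields from the leader-election
// annotation JSON. Only handles "key":"value" string fields, which is enough
// for holderIdentity and the timestamps.
public class LeaderAnnotationDemo {
    static String field(String json, String key) {
        String marker = "\"" + key + "\":\"";
        int start = json.indexOf(marker);
        if (start < 0) return null;
        start += marker.length();
        return json.substring(start, json.indexOf('"', start));
    }

    public static void main(String[] args) {
        // The annotation value from the ConfigMap above
        String annotation = "{\"holderIdentity\":\"cfdd3cf5-7860-493e-9f31-46739d8b4020\","
                + "\"leaseDuration\":15.000000000,"
                + "\"acquireTime\":\"2022-06-01T06:19:55.197000Z\","
                + "\"renewTime\":\"2022-06-13T09:40:10.261000Z\","
                + "\"leaderTransitions\":209661}";
        System.out.println(field(annotation, "holderIdentity")); // current leader's id
        System.out.println(field(annotation, "renewTime"));      // last lease renewal
    }
}
```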
The leader election process
The logic lives in io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector; the annotated source below walks through it.
// Election entry point
public void run() {
    LOGGER.debug("Leader election started");
    // 1. Try to acquire leadership; blocks until it succeeds (returns false only if interrupted)
    if (!acquire()) {
        return;
    }
    // 2. Acquired: fire the leader's onStartLeading callback
    leaderElectionConfig.getLeaderCallbacks().onStartLeading();
    // 3. Periodically refresh renewTime to keep holding the lock
    renewWithTimeout();
    // 4. Renewal failed: fire the onStopLeading callback
    leaderElectionConfig.getLeaderCallbacks().onStopLeading();
}
// When tryAcquireOrRenew() succeeds, countDownLatch.countDown() ends the retry loop
private boolean acquire() {
    final String lockDescription = leaderElectionConfig.getLock().describe();
    LOGGER.debug("Attempting to acquire leader lease '{}'...", lockDescription);
    final AtomicBoolean succeeded = new AtomicBoolean(false);
    return loop(countDownLatch -> {
        try {
            if (!succeeded.get()) {
                succeeded.set(tryAcquireOrRenew());
                reportTransitionIfLeaderChanged();
            }
            if (succeeded.get()) {
                LOGGER.debug("Successfully Acquired leader lease '{}'", lockDescription);
                countDownLatch.countDown();
            } else {
                LOGGER.debug("Failed to acquire lease '{}' retrying...", lockDescription);
            }
        } catch (Exception exception) {
            LOGGER.error("Exception occurred while acquiring lock '{}'", lockDescription, exception);
        }
    }, jitter(leaderElectionConfig.getRetryPeriod(), JITTER_FACTOR).toMillis());
}
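The retry period passed to loop() is jittered so that competing candidates do not retry in lockstep. The sketch below mimics that behavior: in fabric8's LeaderElector, JITTER_FACTOR is 1.2 and jitter() adds a random fraction of the base period, though the exact signature may differ between client versions, so treat this as an approximation.

```java
import java.time.Duration;

// Sketch of retry-period jitter: base period plus a uniformly random extra
// in [0, base * factor).
public class JitterDemo {
    static final double JITTER_FACTOR = 1.2;

    static Duration jitter(Duration duration, double maxFactor) {
        double factor = maxFactor > 0 ? maxFactor : 1.0;
        return duration.plusMillis((long) (duration.toMillis() * Math.random() * factor));
    }

    public static void main(String[] args) {
        Duration base = Duration.ofSeconds(2); // stand-in for retryPeriod
        for (int i = 0; i < 5; i++) {
            long ms = jitter(base, JITTER_FACTOR).toMillis();
            // always >= base and < base * (1 + factor)
            if (ms < 2000 || ms >= (long) (2000 * (1 + JITTER_FACTOR))) {
                throw new AssertionError("jitter out of range: " + ms);
            }
        }
        System.out.println("ok");
    }
}
```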
// When tryAcquireOrRenew() fails, abortLatch.countDown() aborts the renew loop
private void renew(CountDownLatch abortLatch, CountDownLatch renewSignal) {
    try {
        final boolean success = tryAcquireOrRenew();
        reportTransitionIfLeaderChanged();
        if (!success) {
            abortLatch.countDown();
        }
    } catch (LockException exception) {
        LOGGER.debug("Exception occurred while renewing lock: {}", exception.getMessage(), exception);
    }
    renewSignal.countDown();
}
// The actual create/update calls live in the Lock implementations (ConfigMapLock / LeaseLock)
private boolean tryAcquireOrRenew() throws LockException {
    final Lock lock = leaderElectionConfig.getLock();
    final ZonedDateTime now = now();
    final LeaderElectionRecord oldLeaderElectionRecord = lock.get(kubernetesClient);
    // No existing record: claim leadership by creating one
    if (oldLeaderElectionRecord == null) {
        final LeaderElectionRecord newLeaderElectionRecord = new LeaderElectionRecord(
            lock.identity(), leaderElectionConfig.getLeaseDuration(), now, now, 0);
        // create() throws if another candidate created the record first;
        // if it returns normally, leadership is acquired
        lock.create(kubernetesClient, newLeaderElectionRecord);
        updateObserved(newLeaderElectionRecord);
        return true;
    }
    updateObserved(oldLeaderElectionRecord);
    final boolean isLeader = isLeader(oldLeaderElectionRecord);
    // Not the leader and the lease has not yet expired: this is the standby path
    if (!isLeader && !canBecomeLeader(oldLeaderElectionRecord)) {
        LOGGER.debug("Lock is held by {} and has not yet expired", oldLeaderElectionRecord.getHolderIdentity());
        return false;
    }
    // Refresh renewTime; if the previous leader's lease expired, a standby takes over here
    final LeaderElectionRecord newLeaderElectionRecord = new LeaderElectionRecord(
        lock.identity(),
        leaderElectionConfig.getLeaseDuration(),
        isLeader ? oldLeaderElectionRecord.getAcquireTime() : now,
        now,
        isLeader ? (oldLeaderElectionRecord.getLeaderTransitions() + 1) : 0
    );
    newLeaderElectionRecord.setVersion(oldLeaderElectionRecord.getVersion());
    // Optimistic-lock update via resourceVersion: among contenders, only one succeeds
    leaderElectionConfig.getLock().update(kubernetesClient, newLeaderElectionRecord);
    updateObserved(newLeaderElectionRecord);
    return true;
}
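The expiry test behind canBecomeLeader() is the heart of failover: a standby may take over only when renewTime + leaseDuration is already in the past. The sketch below mirrors that check in spirit; the real fabric8 method reads the record and config itself, so the standalone signature here is an assumption.

```java
import java.time.Duration;
import java.time.ZonedDateTime;

// Sketch of the lease-expiry check: has the old leader failed to renew
// within leaseDuration?
public class LeaseExpiryDemo {
    static boolean canBecomeLeader(ZonedDateTime renewTime, Duration leaseDuration, ZonedDateTime now) {
        return now.isAfter(renewTime.plus(leaseDuration));
    }

    public static void main(String[] args) {
        Duration lease = Duration.ofSeconds(15); // leaseDuration from the record above
        ZonedDateTime renew = ZonedDateTime.parse("2022-06-13T09:40:10.261Z");
        // 5 s after the last renew: lease still valid, standby must wait
        System.out.println(canBecomeLeader(renew, lease, renew.plusSeconds(5)));  // false
        // 20 s after the last renew: lease expired, standby may take over
        System.out.println(canBecomeLeader(renew, lease, renew.plusSeconds(20))); // true
    }
}
```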
// Fixed-rate schedule + CountDownLatch: countDownLatch.await() returns once the
// consumer counts down, then the executor is torn down via shutdownNow()
protected static boolean loop(Consumer<CountDownLatch> consumer, long periodInMillis) {
    final ScheduledExecutorService singleThreadExecutorService = Executors.newSingleThreadScheduledExecutor();
    final CountDownLatch countDownLatch = new CountDownLatch(1);
    final Future<?> future = singleThreadExecutorService.scheduleAtFixedRate(
        () -> consumer.accept(countDownLatch), 0L, periodInMillis, TimeUnit.MILLISECONDS);
    try {
        countDownLatch.await();
        return true;
    } catch (InterruptedException e) {
        LOGGER.debug("Loop thread interrupted: {}", e.getMessage());
        Thread.currentThread().interrupt();
        return false;
    } finally {
        future.cancel(true);
        singleThreadExecutorService.shutdownNow();
    }
}
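To see loop() in action outside the elector, here is a self-contained copy (logging dropped) plus a usage sketch: the consumer runs at a fixed rate, and loop() returns true only after the consumer counts the latch down, here on its third invocation, standing in for a tryAcquireOrRenew() that succeeds after two failed attempts.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class LoopDemo {
    // Same shape as LeaderElector.loop(): block on the latch, then tear down.
    static boolean loop(Consumer<CountDownLatch> consumer, long periodInMillis) {
        final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final Future<?> future = executor.scheduleAtFixedRate(
                () -> consumer.accept(countDownLatch), 0L, periodInMillis, TimeUnit.MILLISECONDS);
        try {
            countDownLatch.await(); // block until the consumer signals success
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            future.cancel(true);    // stop the periodic task
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        boolean acquired = loop(latch -> {
            if (attempts.incrementAndGet() == 3) { // "succeed" on the third try
                latch.countDown();
            }
        }, 10L);
        System.out.println(acquired); // true
    }
}
```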