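Introduction to Flink HA
The Flink HA source code lives mainly in the following three packages of the flink-runtime module:
- org.apache.flink.runtime.highavailability: common interfaces of the HA services, mainly covering initialization, the JobStore, client HA, and so on.
- org.apache.flink.runtime.leaderelection: leader election. The underlying principle was briefly introduced in the K8s HA write-up.
- org.apache.flink.runtime.leaderretrieval: leader retrieval, which (for Kubernetes) listens to the ConfigMap through the Kubernetes watch mechanism. It is used mainly by components acting in a client role, such as RestClusterClient and WebMonitorEndpoint.
Flink HA currently has two implementations, ZooKeeper and Kubernetes; only the Kubernetes implementation is analyzed here. A Kubernetes HA configuration looks like this: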
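```yaml
kubernetes.cluster-id: <cluster-id>
# The code analyzed below expects the factory class name here;
# newer Flink releases also accept the shorthand "kubernetes".
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: hdfs:///flink/recovery
# Defaults to 1; with HA, a value greater than 1 starts standby JobManagers for faster recovery.
kubernetes.jobmanager.replicas: 3
```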
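Loading the HA services
- The entry point is HighAvailabilityServicesUtils#createHighAvailabilityServices.
It selects an HA implementation based on the high-availability option, whose value can be NONE, ZOOKEEPER, or the fully qualified class name of a HighAvailabilityServicesFactory implementation (such as KubernetesHaServicesFactory). When a factory class name is configured, that class is loaded and instantiated. Kubernetes HA can only be configured this way, through its factory class.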
```java
public static HighAvailabilityServices createHighAvailabilityServices(
        Configuration configuration,
        Executor executor,
        AddressResolution addressResolution,
        RpcSystemUtils rpcSystemUtils,
        FatalErrorHandler fatalErrorHandler)
        throws Exception {
    // Derive the mode from the high-availability option; any value that names
    // a factory class is mapped to the FACTORY_CLASS mode.
    HighAvailabilityMode highAvailabilityMode = HighAvailabilityMode.fromConfig(configuration);
    switch (highAvailabilityMode) {
        case NONE:
            ...
            return new StandaloneHaServices(
                    resourceManagerRpcUrl, dispatcherRpcUrl, webMonitorAddress);
        case ZOOKEEPER:
            return createZooKeeperHaServices(configuration, executor, fatalErrorHandler);
        case FACTORY_CLASS:
            return createCustomHAServices(configuration, executor);
        default:
            throw new Exception("Recovery mode " + highAvailabilityMode + " is not supported.");
    }
}

// Load the factory class
private static HighAvailabilityServicesFactory loadCustomHighAvailabilityServicesFactory(
        String highAvailabilityServicesFactoryClassName) throws FlinkException {
    final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
    return InstantiationUtil.instantiate(
            highAvailabilityServicesFactoryClassName,
            HighAvailabilityServicesFactory.class,
            classLoader);
}
```
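The dispatch above comes down to two steps: map the high-availability value to a mode, then, for FACTORY_CLASS, instantiate the named factory reflectively. The plain-Java sketch below models both steps without Flink on the classpath. `HaMode`, `HaServicesFactory`, `DemoFactory` and `HaModeResolver` are hypothetical stand-ins for HighAvailabilityMode, HighAvailabilityServicesFactory and a custom factory; the resolution rule is a simplification (the real HighAvailabilityMode.fromConfig differs in details), and `Class.forName` plus the no-arg constructor only roughly approximates what InstantiationUtil.instantiate does.

```java
import java.util.Locale;

// Hypothetical stand-in for HighAvailabilityMode.
enum HaMode { NONE, ZOOKEEPER, FACTORY_CLASS }

// Hypothetical stand-in for HighAvailabilityServicesFactory.
interface HaServicesFactory {
    String createHaServices();
}

// Hypothetical custom factory; it needs a public no-arg constructor to be loaded reflectively.
class DemoFactory implements HaServicesFactory {
    public DemoFactory() {}

    @Override
    public String createHaServices() {
        return "demo-ha-services";
    }
}

class HaModeResolver {
    // Simplified rule: unset -> NONE, a known mode name -> that mode,
    // anything else is treated as a factory class name.
    static HaMode fromConfig(String haValue) {
        if (haValue == null || haValue.isEmpty()) {
            return HaMode.NONE;
        }
        switch (haValue.toUpperCase(Locale.ROOT)) {
            case "NONE":
                return HaMode.NONE;
            case "ZOOKEEPER":
                return HaMode.ZOOKEEPER;
            default:
                return HaMode.FACTORY_CLASS;
        }
    }

    // Load the class, check that it implements the factory interface and call its
    // no-arg constructor. Flink passes the thread context class loader at this point.
    static HaServicesFactory loadFactory(String className, ClassLoader classLoader) {
        try {
            return Class.forName(className, true, classLoader)
                    .asSubclass(HaServicesFactory.class)
                    .getDeclaredConstructor()
                    .newInstance();
        } catch (ReflectiveOperationException | ClassCastException e) {
            throw new IllegalStateException("Could not instantiate HA factory " + className, e);
        }
    }
}
```

For example, `fromConfig("zookeeper")` yields ZOOKEEPER, while a value such as `org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory` yields FACTORY_CLASS and is then handed to `loadFactory`.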
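For Kubernetes, the configured factory is org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory. Depending on the high-availability.use-old-ha-services option, it creates either the legacy HA services or the new implementation.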
The main difference is whether each Flink component gets its own LeaderElectionService or all components share one MultipleComponentLeaderElectionService; this is explained in detail below.
```java
public HighAvailabilityServices createHAServices(Configuration configuration, Executor executor)
        throws Exception {
    final boolean useOldHaServices =
            configuration.get(HighAvailabilityOptions.USE_OLD_HA_SERVICES);
    if (useOldHaServices) {
        return new KubernetesHaServices(
                FlinkKubeClientFactory.getInstance()
                        .fromConfiguration(configuration, "kubernetes-ha-services"),
                executor,
                configuration,
                BlobUtils.createBlobStoreFromConfig(configuration));
    } else {
        return new KubernetesMultipleComponentLeaderElectionHaServices(
                FlinkKubeClientFactory.getInstance()
                        .fromConfiguration(configuration, "kubernetes-ha-services"),
                executor,
                configuration,
                BlobUtils.createBlobStoreFromConfig(configuration),
                error ->
                        FatalExitExceptionHandler.INSTANCE.uncaughtException(
                                Thread.currentThread(), error));
    }
}
```
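In practice, the two variants differ in how many leader ConfigMaps a cluster uses. The following sketch only illustrates that count: with the legacy services every component gets its own lock, with the multiple-component services all components share one. The ConfigMap names it produces are illustrative assumptions, not a statement of Flink's exact naming scheme.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class LeaderLockLayout {
    // Returns the lock ConfigMap names used by the given components.
    // The naming pattern is an illustrative assumption.
    static Set<String> lockConfigMaps(
            String clusterId, List<String> components, boolean useOldHaServices) {
        Set<String> names = new LinkedHashSet<>();
        for (String component : components) {
            if (useOldHaServices) {
                // Legacy services: one LeaderElectionService, and one lock, per component.
                names.add(clusterId + "-" + component + "-leader");
            } else {
                // Multiple-component services: all components share a single lock.
                names.add(clusterId + "-cluster-config-map");
            }
        }
        return names;
    }
}
```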
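Flink leader election source code analysis
Core classes and interfaces
- HighAvailabilityServices: initializes leader election, leader retrieval and the other HA services.
- DefaultLeaderElectionService: implements the LeaderElectionService and LeaderElectionEventHandler interfaces.
- LeaderElectionEventHandler: callback interface invoked by the Kubernetes leader election.
- LeaderElectionService: used by one specific LeaderContender component, such as the Dispatcher, to take part in the election. With the legacy services, each component creates its own LeaderElectionLock object (a ConfigMap).
- MultipleComponentLeaderElectionService: multiple components on the same node share one LeaderElectionLock, so only a single ConfigMap exists.
- LeaderElectionDriver: a member of LeaderElectionService that performs the election and stores the leader information. The driver delegates the actual election to the KubernetesLeaderElector class.
- LeaderContender: interface implemented by every component that needs leader election; LeaderElectionEventHandler callbacks are ultimately forwarded to the methods of this interface.
Leader election therefore has three key parts: the LeaderContender is the component that needs election, the LeaderElectionService runs the overall election flow, and the LeaderElectionDriver handles the concrete interaction with the underlying third-party HA backend.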
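The division of labor can be illustrated with a minimal in-memory model. The interfaces below are hypothetical, trimmed-down versions of LeaderContender, LeaderElectionEventHandler and LeaderElectionDriver (the real Flink interfaces have more methods and different signatures), and `InMemoryDriver` replaces the Kubernetes-backed driver with methods a test can call to simulate winning or losing leadership.

```java
import java.util.UUID;

// Simplified stand-in for LeaderContender: the component that wants leadership.
interface Contender {
    void grantLeadership(UUID sessionId);
    void revokeLeadership();
}

// Callbacks the driver fires into the service (the role of LeaderElectionEventHandler).
interface ElectionEventHandler {
    void onGrantLeadership(UUID sessionId);
    void onRevokeLeadership();
}

// Simplified stand-in for LeaderElectionDriver: the only part that talks to the HA backend.
interface ElectionDriver {
    void start(ElectionEventHandler handler);
}

// Simplified stand-in for DefaultLeaderElectionService: forwards driver events to the contender.
class ElectionService implements ElectionEventHandler {
    private final ElectionDriver driver;
    private Contender contender;

    ElectionService(ElectionDriver driver) {
        this.driver = driver;
    }

    void start(Contender contender) {
        this.contender = contender;
        driver.start(this);
    }

    @Override
    public void onGrantLeadership(UUID sessionId) {
        contender.grantLeadership(sessionId);
    }

    @Override
    public void onRevokeLeadership() {
        contender.revokeLeadership();
    }
}

// Fake backend: the caller decides when leadership is won or lost.
class InMemoryDriver implements ElectionDriver {
    private ElectionEventHandler handler;

    @Override
    public void start(ElectionEventHandler handler) {
        this.handler = handler;
    }

    void simulateElected() {
        handler.onGrantLeadership(UUID.randomUUID());
    }

    void simulateLost() {
        handler.onRevokeLeadership();
    }
}

// Example contender that just records its current session id.
class RecordingContender implements Contender {
    volatile UUID sessionId;

    @Override
    public void grantLeadership(UUID sessionId) {
        this.sessionId = sessionId;
    }

    @Override
    public void revokeLeadership() {
        this.sessionId = null;
    }
}
```

Because only the driver knows about the backend, swapping ZooKeeper for Kubernetes (or for a fake, as here) leaves the contender and the service untouched.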
Creating a LeaderContender and starting the election
We follow leader election through the Dispatcher startup. The entry point is org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory#create.
```java
dispatcherRunner =
        dispatcherRunnerFactory.createDispatcherRunner(
                highAvailabilityServices.getDispatcherLeaderElectionService(),
                fatalErrorHandler,
                new HaServicesJobPersistenceComponentFactory(highAvailabilityServices),
                ioExecutor,
                rpcService,
                partialDispatcherServices);
```
The DispatcherRunner acts as a LeaderContender, so creating it first requires the Dispatcher's LeaderElectionService.
Because the MultipleComponentLeaderElectionService is shared, highAvailabilityServices initializes it only once.
```java
@Override
public LeaderElectionService getDispatcherLeaderElectionService() {
    return createLeaderElectionService(getLeaderPathForDispatcher());
}

protected LeaderElectionService createLeaderElectionService(String leaderName) {
    final MultipleComponentLeaderElectionService multipleComponentLeaderElectionService =
            getOrInitializeSingleLeaderElectionService();
    // As in the non-multiple case, a DefaultLeaderElectionService is still used here.
    // Underneath, MultipleComponentLeaderElectionDriverAdapter forwards its calls
    // to the shared MultipleComponentLeaderElectionService.
    return new DefaultLeaderElectionService(
            multipleComponentLeaderElectionService.createDriverFactory(leaderName));
}

private DefaultMultipleComponentLeaderElectionService
        getOrInitializeSingleLeaderElectionService() {
    synchronized (lock) {
        // multipleComponentLeaderElectionService is initialized only once.
        if (multipleComponentLeaderElectionService == null) {
            try {
                final KubernetesLeaderElectionConfiguration leaderElectionConfiguration =
                        new KubernetesLeaderElectionConfiguration(
                                getClusterConfigMap(), lockIdentity, configuration);
                multipleComponentLeaderElectionService =
                        new DefaultMultipleComponentLeaderElectionService(
                                fatalErrorHandler,
                                new KubernetesMultipleComponentLeaderElectionDriverFactory(
                                        kubeClient,
                                        leaderElectionConfiguration,
                                        configMapSharedWatcher,
                                        watchExecutorService,
                                        fatalErrorHandler));
            } catch (Exception e) {
                throw new FlinkRuntimeException(
                        "Could not initialize the default single leader election service.", e);
            }
        }
        return multipleComponentLeaderElectionService;
    }
}
```
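getOrInitializeSingleLeaderElectionService is a lazily created, lock-guarded shared instance: the first component to ask builds the MultipleComponentLeaderElectionService, and every later component reuses it and only receives its own driver factory. A minimal sketch of that pattern follows; `SharedElectionService`, `HaServicesHolder` and the string returned by `driverFor` are hypothetical placeholders for the Flink types.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical shared service: one instance (and one lock ConfigMap) for all components.
class SharedElectionService {
    private final List<String> components = new ArrayList<>();

    // Each component gets its own "driver" view on the shared service.
    synchronized String driverFor(String componentName) {
        components.add(componentName);
        return "driver[" + componentName + "]";
    }

    synchronized List<String> registeredComponents() {
        return new ArrayList<>(components);
    }
}

// Hypothetical HA services object showing lazy, lock-guarded initialization.
class HaServicesHolder {
    private final Object lock = new Object();
    private SharedElectionService shared; // guarded by lock
    private int initializations; // guarded by lock

    private SharedElectionService getOrInitializeShared() {
        synchronized (lock) {
            if (shared == null) {
                shared = new SharedElectionService();
                initializations++;
            }
            return shared;
        }
    }

    // Per-component entry point, in the spirit of createLeaderElectionService(leaderName).
    String createLeaderElectionService(String leaderName) {
        return getOrInitializeShared().driverFor(leaderName);
    }

    SharedElectionService sharedService() {
        return getOrInitializeShared();
    }

    int initializations() {
        synchronized (lock) {
            return initializations;
        }
    }
}
```

Holding the lock for the whole check-and-create step keeps two components that start concurrently from building two shared services.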
After the Dispatcher's LeaderElectionService has been created, createDispatcherRunner creates the DispatcherRunner and then runs the DispatcherRunnerLeaderElectionLifecycleManager constructor. That constructor starts the LeaderElectionService with the new DispatcherRunner as its callback component (the contender), which starts the election.
```java
private DispatcherRunnerLeaderElectionLifecycleManager(
        T dispatcherRunner, LeaderElectionService leaderElectionService) throws Exception {
    this.dispatcherRunner = dispatcherRunner;
    this.leaderElectionService = leaderElectionService;
    leaderElectionService.start(dispatcherRunner);
}

// During start(), the Dispatcher's LeaderElectionService registers itself with
// DefaultMultipleComponentLeaderElectionService.
// DefaultMultipleComponentLeaderElectionService keeps every registered component
// in leaderElectionEventHandlers.
private final Map<String, LeaderElectionEventHandler> leaderElectionEventHandlers;
// The actual election is performed by KubernetesMultipleComponentLeaderElectionDriver.
private final MultipleComponentLeaderElectionDriver multipleComponentLeaderElectionDriver;

public KubernetesMultipleComponentLeaderElectionDriver(
        KubernetesLeaderElectionConfiguration leaderElectionConfiguration,
        FlinkKubeClient kubeClient,
        Listener leaderElectionListener,
        KubernetesConfigMapSharedWatcher configMapSharedWatcher,
        Executor watchExecutor,
        FatalErrorHandler fatalErrorHandler) {
    Preconditions.checkNotNull(leaderElectionConfiguration);
    this.kubeClient = Preconditions.checkNotNull(kubeClient);
    this.leaderElectionListener = Preconditions.checkNotNull(leaderElectionListener);
    this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler);
    Preconditions.checkNotNull(configMapSharedWatcher);
    Preconditions.checkNotNull(watchExecutor);
    this.configMapName = leaderElectionConfiguration.getConfigMapName();
    this.lockIdentity = leaderElectionConfiguration.getLockIdentity();
    this.leaderElector =
            kubeClient.createLeaderElector(
                    leaderElectionConfiguration, new LeaderCallbackHandlerImpl());
    this.configMapLabels =
            KubernetesUtils.getConfigMapLabels(
                    leaderElectionConfiguration.getClusterId(),
                    LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY);
    kubernetesWatch =
            configMapSharedWatcher.watch(
                    configMapName, new ConfigMapCallbackHandlerImpl(), watchExecutor);
    // leaderElector.run() starts the election
    leaderElector.run();
    LOG.debug("Starting the {} for config map {}.", getClass().getSimpleName(), configMapName);
}
```
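Flink's KubernetesLeaderElector builds on the leader election support of the fabric8 Kubernetes client, which records the current holder and its last renew time on the lock object (here, the ConfigMap); another candidate may take over only once that lease has expired. The sketch below is a simplified, in-memory model of this lease idea, not fabric8's implementation: `compareAndSet` on an `AtomicReference` plays the role of the optimistic-concurrency update of the ConfigMap, and the current time is passed in explicitly so the behavior is deterministic. `LeaseRecord` and `LeaseLock` are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicReference;

// Immutable lease record, similar in spirit to the leader record kept on the lock ConfigMap.
final class LeaseRecord {
    final String holderIdentity;
    final long renewTimeMillis;

    LeaseRecord(String holderIdentity, long renewTimeMillis) {
        this.holderIdentity = holderIdentity;
        this.renewTimeMillis = renewTimeMillis;
    }
}

// Simplified in-memory lease lock; compareAndSet stands in for an optimistic ConfigMap update.
class LeaseLock {
    private final AtomicReference<LeaseRecord> record = new AtomicReference<>();
    private final long leaseDurationMillis;

    LeaseLock(long leaseDurationMillis) {
        this.leaseDurationMillis = leaseDurationMillis;
    }

    // Try to acquire (or, for the current holder, renew) the lease at time nowMillis.
    boolean tryAcquireOrRenew(String identity, long nowMillis) {
        LeaseRecord current = record.get();
        boolean free = current == null;
        boolean ownedByMe = !free && current.holderIdentity.equals(identity);
        boolean expired = !free && nowMillis - current.renewTimeMillis >= leaseDurationMillis;
        if (!free && !ownedByMe && !expired) {
            return false; // another candidate holds a valid lease
        }
        // Fails if someone else updated the record since we read it.
        return record.compareAndSet(current, new LeaseRecord(identity, nowMillis));
    }

    String holder() {
        LeaseRecord current = record.get();
        return current == null ? null : current.holderIdentity;
    }
}
```

In the real election loop a candidate retries acquisition periodically, and a leader that cannot renew within its deadline gives up leadership, which is roughly what eventually triggers the revoke callbacks.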
Starting the LeaderContender and writing back the leader address
Once the election is won, the callbacks of DefaultMultipleComponentLeaderElectionService start the LeaderContender components.
```java
// isLeader callback of DefaultMultipleComponentLeaderElectionService: iterate over all
// registered components and call onGrantLeadership on each DefaultLeaderElectionService.
public void isLeader() {
    final UUID newLeaderSessionId = UUID.randomUUID();
    synchronized (lock) {
        if (!running) {
            return;
        }
        currentLeaderSessionId = UUID.randomUUID();
        forEachLeaderElectionEventHandler(
                leaderElectionEventHandler ->
                        leaderElectionEventHandler.onGrantLeadership(newLeaderSessionId));
    }
}

// DefaultLeaderElectionService#onGrantLeadership calls leaderContender.grantLeadership(issuedLeaderSessionID).
// The Dispatcher is used as the example here.
public void grantLeadership(UUID leaderSessionID) {
    runActionIfRunning(
            () -> {
                LOG.info(
                        "{} was granted leadership with leader id {}. Creating new {}.",
                        getClass().getSimpleName(),
                        leaderSessionID,
                        DispatcherLeaderProcess.class.getSimpleName());
                // Create and start the Dispatcher
                startNewDispatcherLeaderProcess(leaderSessionID);
            });
}

private void startNewDispatcherLeaderProcess(UUID leaderSessionID) {
    stopDispatcherLeaderProcess();
    // Create the dispatcherLeaderProcess
    dispatcherLeaderProcess = createNewDispatcherLeaderProcess(leaderSessionID);
    final DispatcherLeaderProcess newDispatcherLeaderProcess = dispatcherLeaderProcess;
    FutureUtils.assertNoException(
            previousDispatcherLeaderProcessTerminationFuture.thenRun(
                    newDispatcherLeaderProcess::start));
}

// Call chain: startNewDispatcherLeaderProcess -> createNewDispatcherLeaderProcess -> forwardConfirmLeaderSessionFuture
// This writes leaderAddress into the ConfigMap data via leaderElectionService.confirmLeadership.
private void forwardConfirmLeaderSessionFuture(
        UUID leaderSessionID, DispatcherLeaderProcess newDispatcherLeaderProcess) {
    FutureUtils.assertNoException(
            newDispatcherLeaderProcess
                    .getLeaderAddressFuture()
                    .thenAccept(
                            leaderAddress -> {
                                if (leaderElectionService.hasLeadership(leaderSessionID)) {
                                    leaderElectionService.confirmLeadership(
                                            leaderSessionID, leaderAddress);
                                }
                            }));
}
```
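The grant path just shown (isLeader hands one new session ID to every registered handler, each contender starts a new leader process, and the leader address is written back only if that session is still current) can be modeled in plain Java. `MultiComponentService`, `ComponentHandler` and the `leaderInfo` map, which stands in for the ConfigMap data, are hypothetical simplifications of DefaultMultipleComponentLeaderElectionService, the per-component DefaultLeaderElectionService/contender pair and the Kubernetes write.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

// Hypothetical per-component handler (election service and contender collapsed into one class).
class ComponentHandler {
    private final String name;
    private final MultiComponentService service;
    private CompletableFuture<String> leaderAddressFuture; // guarded by this

    ComponentHandler(String name, MultiComponentService service) {
        this.name = name;
        this.service = service;
    }

    // Like grantLeadership: start a new leader process whose address arrives asynchronously.
    void onGrantLeadership(UUID sessionId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        synchronized (this) {
            leaderAddressFuture = future;
        }
        future.thenAccept(
                address -> {
                    // Like forwardConfirmLeaderSessionFuture: confirm only for the current session.
                    if (service.hasLeadership(sessionId)) {
                        service.confirmLeadership(name, sessionId, address);
                    }
                });
    }

    // Simulates the leader process reporting its RPC address.
    void leaderProcessStarted(String address) {
        CompletableFuture<String> future;
        synchronized (this) {
            future = leaderAddressFuture;
        }
        if (future != null) {
            future.complete(address);
        }
    }
}

// Hypothetical shared service; leaderInfo stands in for the ConfigMap data.
class MultiComponentService {
    private final Map<String, ComponentHandler> handlers = new LinkedHashMap<>();
    private final Map<String, String> leaderInfo = new HashMap<>();
    private UUID currentSessionId;

    synchronized ComponentHandler register(String name) {
        ComponentHandler handler = new ComponentHandler(name, this);
        handlers.put(name, handler);
        return handler;
    }

    // Driver callback on winning the election: one new session id for all components.
    synchronized void isLeader() {
        UUID newSessionId = UUID.randomUUID();
        currentSessionId = newSessionId;
        handlers.values().forEach(handler -> handler.onGrantLeadership(newSessionId));
    }

    // Driver callback on losing the election.
    synchronized void notLeader() {
        currentSessionId = null;
        leaderInfo.clear();
    }

    synchronized boolean hasLeadership(UUID sessionId) {
        return sessionId.equals(currentSessionId);
    }

    synchronized void confirmLeadership(String component, UUID sessionId, String address) {
        // Re-check under the lock so a stale session can never publish an address.
        if (sessionId.equals(currentSessionId)) {
            leaderInfo.put(component, address);
        }
    }

    synchronized Map<String, String> leaderInfo() {
        return new HashMap<>(leaderInfo);
    }
}
```

The session check is what prevents a leader process that finishes starting after leadership was lost from publishing a stale address.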
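Flink leader retrieval source code analysis
Core classes and interfaces
- HighAvailabilityServices: initializes leader election, leader retrieval and the other HA services.
- LeaderRetrievalService: delivers callbacks when the leader changes.
- LeaderRetrievalListener: interface implemented by components that need to be told about leader changes. It is passed to LeaderRetrievalService#start to register with the service.
- LeaderRetrievalDriver: implements the actual monitoring of leader changes.
Source code analysis
The logic is similar to leader election; changes are detected with the Kubernetes watch mechanism.
The details are omitted here.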
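Since the details are omitted above, here is a sketch of the retrieval side: a listener registers with the retrieval service, which watches the shared ConfigMap and forwards changes of its own component's entry. `WatchedConfigMap` is a hypothetical in-memory stand-in for a Kubernetes watch on the ConfigMap, `LeaderEntry` is an assumed data layout, and `RetrievalListener` is a trimmed-down LeaderRetrievalListener (the real interface also has a handleError method).

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;

// Assumed layout of one component's leader entry in the ConfigMap.
final class LeaderEntry {
    final String address;
    final UUID sessionId;

    LeaderEntry(String address, UUID sessionId) {
        this.address = address;
        this.sessionId = sessionId;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof LeaderEntry)) {
            return false;
        }
        LeaderEntry other = (LeaderEntry) o;
        return address.equals(other.address) && sessionId.equals(other.sessionId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(address, sessionId);
    }
}

// Trimmed-down stand-in for LeaderRetrievalListener.
interface RetrievalListener {
    void notifyLeaderAddress(String leaderAddress, UUID leaderSessionId);
}

// Hypothetical in-memory ConfigMap that pushes every update to its watchers.
class WatchedConfigMap {
    private final Map<String, LeaderEntry> data = new ConcurrentHashMap<>();
    private final List<BiConsumer<String, LeaderEntry>> watchers = new CopyOnWriteArrayList<>();

    void watch(BiConsumer<String, LeaderEntry> onModified) {
        watchers.add(onModified);
    }

    void put(String key, LeaderEntry entry) {
        data.put(key, entry);
        watchers.forEach(watcher -> watcher.accept(key, entry));
    }
}

// Simplified retrieval service: watches one component's key and reports real changes only.
class RetrievalService {
    private final WatchedConfigMap configMap;
    private final String leaderKey;
    private LeaderEntry lastSeen; // guarded by this

    RetrievalService(WatchedConfigMap configMap, String leaderKey) {
        this.configMap = configMap;
        this.leaderKey = leaderKey;
    }

    void start(RetrievalListener listener) {
        configMap.watch(
                (key, entry) -> {
                    if (!leaderKey.equals(key)) {
                        return; // another component's entry
                    }
                    synchronized (this) {
                        if (entry.equals(lastSeen)) {
                            return; // unchanged entry: nothing to report
                        }
                        lastSeen = entry;
                    }
                    listener.notifyLeaderAddress(entry.address, entry.sessionId);
                });
    }
}
```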
k8s-client Java code demos
InformerExample
WatchExample