Introduction to Flink HA
The Flink HA source code mainly lives in the following three packages of the runtime module:
- org.apache.flink.runtime.highavailability: common interfaces and classes of the HA services, mainly covering initialization, the JobStore, client-side HA, and so on.
- org.apache.flink.runtime.leaderelection: leader election; how the Kubernetes-based HA implements it is briefly covered below.
- org.apache.flink.runtime.leaderretrieval: leader retrieval, implemented by watching a ConfigMap through the Kubernetes watch mechanism. It is mainly used by client-side roles such as RestClusterClient and WebMonitorEndpoint.
Flink HA currently has two implementations: ZooKeeper and Kubernetes. This article only analyzes the Kubernetes implementation. A typical configuration looks like this:
kubernetes.cluster-id: <cluster-id>
high-availability: kubernetes
high-availability.storageDir: hdfs:///flink/recovery
kubernetes.jobmanager.replicas: 3  # defaults to 1; for HA it is better to set it greater than 1 so that failover is fast
HA service loading process analysis
- The entry point is HighAvailabilityServicesUtils#createHighAvailabilityServices.
Depending on the value of the high-availability option, a different HA implementation is chosen; the option accepts NONE, ZOOKEEPER, or the class name of a HighAvailabilityServicesFactory implementation. When a factory class is configured, that class is loaded and instantiated. The Kubernetes implementation is only provided through the factory mechanism.
public static HighAvailabilityServices createHighAvailabilityServices(
        Configuration configuration,
        Executor executor,
        AddressResolution addressResolution,
        RpcSystemUtils rpcSystemUtils,
        FatalErrorHandler fatalErrorHandler)
        throws Exception {
    // Derive the HA mode from the high-availability option; when a factory class name is
    // configured, the mode resolves to FACTORY_CLASS.
    HighAvailabilityMode highAvailabilityMode = HighAvailabilityMode.fromConfig(configuration);
    switch (highAvailabilityMode) {
        case NONE:
            ...
            return new StandaloneHaServices(
                    resourceManagerRpcUrl, dispatcherRpcUrl, webMonitorAddress);
        case ZOOKEEPER:
            return createZooKeeperHaServices(configuration, executor, fatalErrorHandler);
        case FACTORY_CLASS:
            return createCustomHAServices(configuration, executor);
        default:
            throw new Exception("Recovery mode " + highAvailabilityMode + " is not supported.");
    }
}

// Load the configured factory class.
private static HighAvailabilityServicesFactory loadCustomHighAvailabilityServicesFactory(
        String highAvailabilityServicesFactoryClassName) throws FlinkException {
    final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
    return InstantiationUtil.instantiate(
            highAvailabilityServicesFactoryClassName,
            HighAvailabilityServicesFactory.class,
            classLoader);
}
For Kubernetes the factory is org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory, which decides whether to use the old HA service implementation according to the high-availability.use-old-ha-services option.
The main difference is whether every Flink component has its own LeaderElectionService or all components share a single MultipleComponentLeaderElectionService; this is explained in detail below.

public HighAvailabilityServices createHAServices(Configuration configuration, Executor executor)
        throws Exception {
    final boolean useOldHaServices =
            configuration.get(HighAvailabilityOptions.USE_OLD_HA_SERVICES);
    if (useOldHaServices) {
        return new KubernetesHaServices(
                FlinkKubeClientFactory.getInstance()
                        .fromConfiguration(configuration, "kubernetes-ha-services"),
                executor,
                configuration,
                BlobUtils.createBlobStoreFromConfig(configuration));
    } else {
        return new KubernetesMultipleComponentLeaderElectionHaServices(
                FlinkKubeClientFactory.getInstance()
                        .fromConfiguration(configuration, "kubernetes-ha-services"),
                executor,
                configuration,
                BlobUtils.createBlobStoreFromConfig(configuration),
                error ->
                        FatalExitExceptionHandler.INSTANCE.uncaughtException(
                                Thread.currentThread(), error));
    }
}
Flink leaderelection source code analysis
Core classes and interfaces
- HighAvailabilityServices: initializes leader election, leader retrieval, and the other HA services.
- DefaultLeaderElectionService: implements the LeaderElectionService and LeaderElectionEventHandler interfaces.
- LeaderElectionEventHandler: interface for the K8s leader election callbacks.
- LeaderElectionService: interface used by one specific LeaderContender component, such as the Dispatcher, to take part in the election. Each component creates its own leader election lock object (a ConfigMap).
- MultipleComponentLeaderElectionService: multiple components on the same node share one leader election lock (i.e. only a single ConfigMap exists).
- LeaderElectionDriver: a member of LeaderElectionService that performs the election operations and stores the leader information. Inside the driver, the KubernetesLeaderElector class carries out the actual election.
- LeaderContender: components that need leader election implement this interface; the LeaderElectionEventHandler callbacks eventually end up in the LeaderContender methods.
The whole leader election consists of three key parts: LeaderContender is the component that needs election, LeaderElectionService runs the overall election workflow, and LeaderElectionDriver handles the concrete interaction with the underlying third-party HA backend.
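To make this division of responsibilities concrete, here is a self-contained sketch that mirrors the three roles with deliberately simplified interfaces (these are not the real Flink signatures). The fake driver "wins" immediately, whereas Flink's Kubernetes driver wins by acquiring the ConfigMap-based lock.

```java
import java.util.UUID;

public class LeaderElectionSketch {

    // Role of LeaderContender: the component that wants leadership (simplified).
    interface Contender {
        void grantLeadership(UUID leaderSessionId);
        void revokeLeadership();
    }

    // Role of LeaderElectionDriver: talks to the HA backend (e.g. a K8s ConfigMap lock).
    interface Driver {
        void start(Runnable onLeadershipAcquired);
    }

    // Role of LeaderElectionService: wires a contender to a driver and routes callbacks.
    static class ElectionService {
        private final Driver driver;

        ElectionService(Driver driver) {
            this.driver = driver;
        }

        void start(Contender contender) {
            // When the driver wins the backend election, leadership is granted to the contender.
            driver.start(() -> contender.grantLeadership(UUID.randomUUID()));
        }
    }

    public static void main(String[] args) {
        // A fake driver that wins immediately, just to show the callback flow.
        Driver fakeDriver = Runnable::run;
        new ElectionService(fakeDriver).start(new Contender() {
            @Override
            public void grantLeadership(UUID leaderSessionId) {
                System.out.println("Granted leadership, session " + leaderSessionId);
            }

            @Override
            public void revokeLeadership() {
                System.out.println("Leadership revoked");
            }
        });
    }
}
```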
LeaderContender creation & election
Let's look at leader election through the Dispatcher startup flow; the entry point is org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory#create.
dispatcherRunner =
        dispatcherRunnerFactory.createDispatcherRunner(
                highAvailabilityServices.getDispatcherLeaderElectionService(),
                fatalErrorHandler,
                new HaServicesJobPersistenceComponentFactory(highAvailabilityServices),
                ioExecutor,
                rpcService,
                partialDispatcherServices);
The DispatcherRunner acts as a LeaderContender. During its creation, the DispatcherLeaderElectionService has to be created first. Since the MultipleComponentLeaderElectionService is shared, it is only initialized once inside highAvailabilityServices.
@Override
public LeaderElectionService getDispatcherLeaderElectionService() {
    return createLeaderElectionService(getLeaderPathForDispatcher());
}

protected LeaderElectionService createLeaderElectionService(String leaderName) {
    final MultipleComponentLeaderElectionService multipleComponentLeaderElectionService =
            getOrInitializeSingleLeaderElectionService();
    // As in the non-multiple case, a DefaultLeaderElectionService is still used here.
    // Under the hood a MultipleComponentLeaderElectionDriverAdapter delegates to the shared
    // MultipleComponentLeaderElectionService.
    return new DefaultLeaderElectionService(
            multipleComponentLeaderElectionService.createDriverFactory(leaderName));
}

private DefaultMultipleComponentLeaderElectionService getOrInitializeSingleLeaderElectionService() {
    synchronized (lock) {
        // multipleComponentLeaderElectionService is initialized only once.
        if (multipleComponentLeaderElectionService == null) {
            try {
                final KubernetesLeaderElectionConfiguration leaderElectionConfiguration =
                        new KubernetesLeaderElectionConfiguration(
                                getClusterConfigMap(), lockIdentity, configuration);
                multipleComponentLeaderElectionService =
                        new DefaultMultipleComponentLeaderElectionService(
                                fatalErrorHandler,
                                new KubernetesMultipleComponentLeaderElectionDriverFactory(
                                        kubeClient,
                                        leaderElectionConfiguration,
                                        configMapSharedWatcher,
                                        watchExecutorService,
                                        fatalErrorHandler));
            } catch (Exception e) {
                throw new FlinkRuntimeException(
                        "Could not initialize the default single leader election service.", e);
            }
        }
        return multipleComponentLeaderElectionService;
    }
}
Once the DispatcherLeaderElectionService has been created, createDispatcherRunner creates the DispatcherRunner and reaches the DispatcherRunnerLeaderElectionLifecycleManager constructor, where the DispatcherLeaderElectionService is started. The newly created DispatcherRunner is registered as the callback component of the DispatcherLeaderElectionService, and the election begins.
private DispatcherRunnerLeaderElectionLifecycleManager(
        T dispatcherRunner, LeaderElectionService leaderElectionService) throws Exception {
    this.dispatcherRunner = dispatcherRunner;
    this.leaderElectionService = leaderElectionService;
    leaderElectionService.start(dispatcherRunner);
}

// During start(), the current DispatcherLeaderElectionService is registered with the
// DefaultMultipleComponentLeaderElectionService, which keeps all registered LeaderContenders
// in leaderElectionEventHandlers.
private final Map<String, LeaderElectionEventHandler> leaderElectionEventHandlers;

// The KubernetesMultipleComponentLeaderElectionDriver performs the actual election.
private final MultipleComponentLeaderElectionDriver multipleComponentLeaderElectionDriver;

public KubernetesMultipleComponentLeaderElectionDriver(
        KubernetesLeaderElectionConfiguration leaderElectionConfiguration,
        FlinkKubeClient kubeClient,
        Listener leaderElectionListener,
        KubernetesConfigMapSharedWatcher configMapSharedWatcher,
        Executor watchExecutor,
        FatalErrorHandler fatalErrorHandler) {
    Preconditions.checkNotNull(leaderElectionConfiguration);
    this.kubeClient = Preconditions.checkNotNull(kubeClient);
    this.leaderElectionListener = Preconditions.checkNotNull(leaderElectionListener);
    this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler);
    Preconditions.checkNotNull(configMapSharedWatcher);
    Preconditions.checkNotNull(watchExecutor);
    this.configMapName = leaderElectionConfiguration.getConfigMapName();
    this.lockIdentity = leaderElectionConfiguration.getLockIdentity();
    this.leaderElector =
            kubeClient.createLeaderElector(leaderElectionConfiguration, new LeaderCallbackHandlerImpl());
    this.configMapLabels =
            KubernetesUtils.getConfigMapLabels(
                    leaderElectionConfiguration.getClusterId(), LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY);
    kubernetesWatch =
            configMapSharedWatcher.watch(configMapName, new ConfigMapCallbackHandlerImpl(), watchExecutor);
    // leaderElector.run() starts the election.
    leaderElector.run();
    LOG.debug("Starting the {} for config map {}.", getClass().getSimpleName(), configMapName);
}
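Flink's KubernetesLeaderElector is built on top of the leader-election support of the fabric8 kubernetes-client. The standalone sketch below shows what such an election looks like at the client level, assuming the fabric8 kubernetes-client 6.x API; the namespace, ConfigMap name, and holder identity are made up for illustration. The leaderElector.run() call above is essentially this step wrapped by Flink.

```java
import java.time.Duration;

import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
import io.fabric8.kubernetes.client.extended.leaderelection.LeaderCallbacks;
import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElectionConfigBuilder;
import io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.ConfigMapLock;

public class LeaderElectorExample {
    public static void main(String[] args) {
        try (KubernetesClient client = new KubernetesClientBuilder().build()) {
            client.leaderElector()
                    .withConfig(
                            new LeaderElectionConfigBuilder()
                                    .withName("demo-leader-election")
                                    // Namespace, ConfigMap name and holder identity are hypothetical.
                                    .withLock(new ConfigMapLock("default", "demo-leader-cm", "holder-1"))
                                    .withLeaseDuration(Duration.ofSeconds(15))
                                    .withRenewDeadline(Duration.ofSeconds(10))
                                    .withRetryPeriod(Duration.ofSeconds(2))
                                    .withLeaderCallbacks(new LeaderCallbacks(
                                            () -> System.out.println("Became leader"),
                                            () -> System.out.println("Lost leadership"),
                                            newLeader -> System.out.println("Current leader: " + newLeader)))
                                    .build())
                    .build()
                    .run(); // blocks while this instance participates in the election
        }
    }
}
```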
LeaderContender startup & writing back the leader address
When the election succeeds, the DefaultMultipleComponentLeaderElectionService callback fires and starts the LeaderContender components.
// The DefaultMultipleComponentLeaderElectionService isLeader callback iterates over all
// LeaderContenders and invokes each DefaultLeaderElectionService's onGrantLeadership callback.
public void isLeader() {
    final UUID newLeaderSessionId = UUID.randomUUID();
    synchronized (lock) {
        if (!running) {
            return;
        }
        currentLeaderSessionId = UUID.randomUUID();
        forEachLeaderElectionEventHandler(
                leaderElectionEventHandler ->
                        leaderElectionEventHandler.onGrantLeadership(newLeaderSessionId));
    }
}

// DefaultLeaderElectionService#onGrantLeadership calls leaderContender.grantLeadership(issuedLeaderSessionID).
// The Dispatcher is used as the example here.
public void grantLeadership(UUID leaderSessionID) {
    runActionIfRunning(
            () -> {
                LOG.info(
                        "{} was granted leadership with leader id {}. Creating new {}.",
                        getClass().getSimpleName(),
                        leaderSessionID,
                        DispatcherLeaderProcess.class.getSimpleName());
                // Create and start the Dispatcher.
                startNewDispatcherLeaderProcess(leaderSessionID);
            });
}

private void startNewDispatcherLeaderProcess(UUID leaderSessionID) {
    stopDispatcherLeaderProcess();
    // Create the dispatcherLeaderProcess.
    dispatcherLeaderProcess = createNewDispatcherLeaderProcess(leaderSessionID);
    final DispatcherLeaderProcess newDispatcherLeaderProcess = dispatcherLeaderProcess;
    FutureUtils.assertNoException(
            previousDispatcherLeaderProcessTerminationFuture.thenRun(newDispatcherLeaderProcess::start));
}

// startNewDispatcherLeaderProcess -> createNewDispatcherLeaderProcess -> forwardConfirmLeaderSessionFuture
// Here the leader address is written into the ConfigMap data, via leaderElectionService.confirmLeadership.
private void forwardConfirmLeaderSessionFuture(
        UUID leaderSessionID, DispatcherLeaderProcess newDispatcherLeaderProcess) {
    FutureUtils.assertNoException(
            newDispatcherLeaderProcess
                    .getLeaderAddressFuture()
                    .thenAccept(
                            leaderAddress -> {
                                if (leaderElectionService.hasLeadership(leaderSessionID)) {
                                    leaderElectionService.confirmLeadership(leaderSessionID, leaderAddress);
                                }
                            }));
}
Flink leaderretrieval source code analysis
Core classes and interfaces
- HighAvailabilityServices: initializes leader election, leader retrieval, and the other HA services.
- LeaderRetrievalService: provides callbacks on leader changes.
- LeaderRetrievalListener: components that need to learn about leader changes implement this interface; the listener is passed to LeaderRetrievalService#start and thereby registered with the service.
- LeaderRetrievalDriver: the concrete implementation of the leader-change monitoring.
Source code analysis
The overall logic is similar to leader election; leader changes are observed through the Kubernetes watch mechanism. The details are omitted here.
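As a rough illustration of the callback flow, the sketch below uses deliberately simplified interfaces (not the real Flink signatures) to show how a client-side component learns the new leader address once the driver's ConfigMap watch observes a change; the address string is made up.

```java
import java.util.UUID;

public class LeaderRetrievalSketch {

    // Role of LeaderRetrievalListener: the component interested in leader changes (simplified).
    interface RetrievalListener {
        void notifyLeaderAddress(String leaderAddress, UUID leaderSessionId);
    }

    // Role of LeaderRetrievalService: keeps the registered listener and forwards driver events.
    static class RetrievalService {
        private RetrievalListener listener;

        void start(RetrievalListener listener) {
            this.listener = listener;
        }

        // In Flink, the driver's ConfigMap watch callback ends up in a method like this.
        void onLeaderInformationChanged(String address, UUID sessionId) {
            if (listener != null) {
                listener.notifyLeaderAddress(address, sessionId);
            }
        }
    }

    public static void main(String[] args) {
        RetrievalService service = new RetrievalService();
        // What a client such as RestClusterClient conceptually does: register and react to changes.
        service.start((address, sessionId) ->
                System.out.println("Leader is now " + address + " (session " + sessionId + ")"));
        // Simulate a ConfigMap update observed by the watch.
        service.onLeaderInformationChanged("http://jobmanager-1:8081", UUID.randomUUID());
    }
}
```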
Java kubernetes-client code demos
InformerExample
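A minimal ConfigMap informer sketch, assuming the fabric8 kubernetes-client 6.x inform() API; the namespace and resync interval are arbitrary. An informer keeps a local cache of the watched resources in sync and fires add/update/delete callbacks.

```java
import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;

public class InformerExample {
    public static void main(String[] args) throws Exception {
        try (KubernetesClient client = new KubernetesClientBuilder().build()) {
            // Start an informer on all ConfigMaps in a namespace.
            SharedIndexInformer<ConfigMap> informer = client.configMaps()
                    .inNamespace("default")
                    .inform(new ResourceEventHandler<ConfigMap>() {
                        @Override
                        public void onAdd(ConfigMap cm) {
                            System.out.println("ADD " + cm.getMetadata().getName());
                        }

                        @Override
                        public void onUpdate(ConfigMap oldCm, ConfigMap newCm) {
                            System.out.println("UPDATE " + newCm.getMetadata().getName());
                        }

                        @Override
                        public void onDelete(ConfigMap cm, boolean deletedFinalStateUnknown) {
                            System.out.println("DELETE " + cm.getMetadata().getName());
                        }
                    }, 60_000L); // resync interval in milliseconds
            Thread.sleep(60_000); // let the informer deliver some events
            informer.stop();
        }
    }
}
```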
WatchExample
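A minimal sketch of a raw watch on a single ConfigMap, again assuming the fabric8 kubernetes-client 6.x API; the namespace and ConfigMap name are made up. This is the lower-level mechanism behind leader retrieval: every modification of the leader ConfigMap arrives as a watch event.

```java
import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.WatcherException;

public class WatchExample {
    public static void main(String[] args) throws Exception {
        try (KubernetesClient client = new KubernetesClientBuilder().build()) {
            // Watch a single ConfigMap; name and namespace are hypothetical.
            Watch watch = client.configMaps()
                    .inNamespace("default")
                    .withName("demo-flink-cluster-dispatcher-leader")
                    .watch(new Watcher<ConfigMap>() {
                        @Override
                        public void eventReceived(Action action, ConfigMap configMap) {
                            // ADDED / MODIFIED / DELETED events carry the current ConfigMap data.
                            System.out.println(action + ": " + configMap.getData());
                        }

                        @Override
                        public void onClose(WatcherException cause) {
                            System.out.println("Watch closed: " + cause);
                        }
                    });
            Thread.sleep(60_000); // keep the JVM alive so events can arrive
            watch.close();
        }
    }
}
```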
