Introduction to Flink HA

The Flink HA source code lives mainly in three packages under the runtime project:
org.apache.flink.runtime.highavailability: common interfaces for the HA services, covering initialization, the JobStore, client-side HA, and so on.
org.apache.flink.runtime.leaderelection: leader election; the Kubernetes HA implementation of it is introduced briefly below.
org.apache.flink.runtime.leaderretrieval: leader retrieval, implemented by watching the ConfigMap through the Kubernetes watch mechanism. It is mainly used by client-side roles such as RestClusterClient and WebMonitorEndpoint.

Flink HA currently has two implementations: ZooKeeper and Kubernetes. This article analyzes only the Kubernetes implementation. A typical configuration looks like this:

  kubernetes.cluster-id: <cluster-id>
  high-availability: kubernetes
  high-availability.storageDir: hdfs:///flink/recovery
  kubernetes.jobmanager.replicas: 3  # defaults to 1; with HA enabled, setting it greater than 1 is recommended for fast failover
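
To see how these values are interpreted, here is a minimal sketch using Flink's public Configuration API (the cluster id is hypothetical): HighAvailabilityMode.fromConfig treats any high-availability value other than NONE/ZOOKEEPER as a factory class name, which is what the FACTORY_CLASS branch shown in the next section handles.

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.HighAvailabilityOptions;
    import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;

    public class HaModeCheck {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            conf.setString("kubernetes.cluster-id", "my-flink-cluster"); // hypothetical id
            conf.set(HighAvailabilityOptions.HA_MODE, "kubernetes");
            conf.set(HighAvailabilityOptions.HA_STORAGE_PATH, "hdfs:///flink/recovery");
            conf.setString("kubernetes.jobmanager.replicas", "3");

            // "kubernetes" is not a HighAvailabilityMode enum constant, so
            // fromConfig falls back to FACTORY_CLASS.
            System.out.println(HighAvailabilityMode.fromConfig(conf)); // FACTORY_CLASS
        }
    }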

Analysis of the HA Service Loading Process

  1. The entry point is HighAvailabilityServicesUtils#createHighAvailabilityServices.
    Depending on the high-availability configuration value, one of three HA implementations is chosen: NONE, ZOOKEEPER, or the class name of a HighAvailabilityServicesFactory implementation (KubernetesHaServicesFactory in the Kubernetes case).
  2. When KubernetesHaServicesFactory is configured, the class is loaded and instantiated. Kubernetes HA is only provided through this factory mechanism.

    public static HighAvailabilityServices createHighAvailabilityServices(
            Configuration configuration,
            Executor executor,
            AddressResolution addressResolution,
            RpcSystemUtils rpcSystemUtils,
            FatalErrorHandler fatalErrorHandler)
            throws Exception {
        // Derive the mode from the high-availability option; whenever a factory
        // class name is configured, the mode is uniformly FACTORY_CLASS.
        HighAvailabilityMode highAvailabilityMode = HighAvailabilityMode.fromConfig(configuration);
        switch (highAvailabilityMode) {
            case NONE:
                ...
                return new StandaloneHaServices(
                        resourceManagerRpcUrl, dispatcherRpcUrl, webMonitorAddress);
            case ZOOKEEPER:
                return createZooKeeperHaServices(configuration, executor, fatalErrorHandler);
            case FACTORY_CLASS:
                return createCustomHAServices(configuration, executor);
            default:
                throw new Exception("Recovery mode " + highAvailabilityMode + " is not supported.");
        }
    }

    // Load the factory class.
    private static HighAvailabilityServicesFactory loadCustomHighAvailabilityServicesFactory(
            String highAvailabilityServicesFactoryClassName) throws FlinkException {
        final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        return InstantiationUtil.instantiate(
                highAvailabilityServicesFactoryClassName,
                HighAvailabilityServicesFactory.class,
                classLoader);
    }
  3. For Kubernetes the factory is org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory, which uses the high-availability.use-old-ha-services option to decide whether to use the old HA service implementation.
    The main difference is whether each Flink component gets its own LeaderElectionService or they all share a single MultipleComponentLeaderElectionService, as detailed below.

    public HighAvailabilityServices createHAServices(Configuration configuration, Executor executor)
            throws Exception {
        final boolean useOldHaServices =
                configuration.get(HighAvailabilityOptions.USE_OLD_HA_SERVICES);
        if (useOldHaServices) {
            return new KubernetesHaServices(
                    FlinkKubeClientFactory.getInstance()
                            .fromConfiguration(configuration, "kubernetes-ha-services"),
                    executor,
                    configuration,
                    BlobUtils.createBlobStoreFromConfig(configuration));
        } else {
            return new KubernetesMultipleComponentLeaderElectionHaServices(
                    FlinkKubeClientFactory.getInstance()
                            .fromConfiguration(configuration, "kubernetes-ha-services"),
                    executor,
                    configuration,
                    BlobUtils.createBlobStoreFromConfig(configuration),
                    error ->
                            FatalExitExceptionHandler.INSTANCE.uncaughtException(
                                    Thread.currentThread(), error));
        }
    }

Flink leaderelection Source Code Analysis

Core Classes and Interfaces

  • HighAvailabilityServices: provides initialization of leader election, leader retrieval, and the other HA services.
  • DefaultLeaderElectionService: implements the LeaderElectionService and LeaderElectionEventHandler interfaces.
  • LeaderElectionEventHandler: callback interface for the Kubernetes leader election.
  • LeaderElectionService: interface used by one specific LeaderContender component, such as the Dispatcher, to take part in the election. Each component creates its own leader election lock object (a ConfigMap).
  • MultipleComponentLeaderElectionService: multiple components on the same node share one leader election lock (i.e., there is only a single ConfigMap).
  • LeaderElectionDriver: a member of LeaderElectionService; it performs the election operations and stores the leader information. The driver calls the KubernetesLeaderElector class to carry out the actual election.
  • LeaderContender: components that need leader election implement this interface; LeaderElectionEventHandler callbacks ultimately end up in the LeaderContender methods (see the sketch after the summary below).

    The whole LeaderElection mechanism has three key parts: the LeaderContender is the component that needs election, the LeaderElectionService drives the overall election flow, and the LeaderElectionDriver handles the concrete interaction with the underlying third-party HA backend.
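
To make the contract concrete, here is a minimal, hypothetical contender sketch (EchoContender and its address are invented for illustration; the interface methods match the LeaderContender and LeaderElectionService discussed here):

    import java.util.UUID;

    import org.apache.flink.runtime.leaderelection.LeaderContender;
    import org.apache.flink.runtime.leaderelection.LeaderElectionService;

    // Hypothetical component illustrating the contender/service contract.
    public class EchoContender implements LeaderContender {

        private final LeaderElectionService leaderElectionService;
        // Hypothetical RPC address that would be published on leadership.
        private final String address = "akka.tcp://flink@jobmanager:6123/user/rpc/echo";

        public EchoContender(LeaderElectionService leaderElectionService) {
            this.leaderElectionService = leaderElectionService;
        }

        public void start() throws Exception {
            // Registers this contender with the service and joins the election.
            leaderElectionService.start(this);
        }

        @Override
        public void grantLeadership(UUID leaderSessionID) {
            // Invoked through the LeaderElectionEventHandler once this instance wins.
            // Confirming writes the leader address back to the HA backend
            // (on Kubernetes: into the ConfigMap data).
            leaderElectionService.confirmLeadership(leaderSessionID, address);
        }

        @Override
        public void revokeLeadership() {
            // Leadership lost: stop leader-only activities here.
        }

        @Override
        public void handleError(Exception exception) {
            throw new RuntimeException("Leader election failed", exception);
        }
    }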

LeaderContender Creation & Election

Let's examine leaderElection through the Dispatcher startup flow; the entry point is org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory#create.

    dispatcherRunner =
            dispatcherRunnerFactory.createDispatcherRunner(
                    highAvailabilityServices.getDispatcherLeaderElectionService(),
                    fatalErrorHandler,
                    new HaServicesJobPersistenceComponentFactory(highAvailabilityServices),
                    ioExecutor,
                    rpcService,
                    partialDispatcherServices);

The DispatcherRunner acts as a LeaderContender, so the first step of its creation is to create the DispatcherLeaderElectionService.
Because the MultipleComponentLeaderElectionService is shared, it is initialized only once inside highAvailabilityServices.

    @Override
    public LeaderElectionService getDispatcherLeaderElectionService() {
        return createLeaderElectionService(getLeaderPathForDispatcher());
    }

    protected LeaderElectionService createLeaderElectionService(String leaderName) {
        final MultipleComponentLeaderElectionService multipleComponentLeaderElectionService =
                getOrInitializeSingleLeaderElectionService();
        // As in the non-multiple case, a DefaultLeaderElectionService is still used here.
        // Underneath, a MultipleComponentLeaderElectionDriverAdapter delegates to the
        // shared MultipleComponentLeaderElectionService.
        return new DefaultLeaderElectionService(
                multipleComponentLeaderElectionService.createDriverFactory(leaderName));
    }

    private DefaultMultipleComponentLeaderElectionService
            getOrInitializeSingleLeaderElectionService() {
        synchronized (lock) {
            // multipleComponentLeaderElectionService is initialized only once.
            if (multipleComponentLeaderElectionService == null) {
                try {
                    final KubernetesLeaderElectionConfiguration leaderElectionConfiguration =
                            new KubernetesLeaderElectionConfiguration(
                                    getClusterConfigMap(), lockIdentity, configuration);
                    multipleComponentLeaderElectionService =
                            new DefaultMultipleComponentLeaderElectionService(
                                    fatalErrorHandler,
                                    new KubernetesMultipleComponentLeaderElectionDriverFactory(
                                            kubeClient,
                                            leaderElectionConfiguration,
                                            configMapSharedWatcher,
                                            watchExecutorService,
                                            fatalErrorHandler));
                } catch (Exception e) {
                    throw new FlinkRuntimeException(
                            "Could not initialize the default single leader election service.", e);
                }
            }
            return multipleComponentLeaderElectionService;
        }
    }

Once the DispatcherLeaderElectionService has been created, createDispatcherRunner creates the dispatcherRunner and eventually reaches the DispatcherRunnerLeaderElectionLifecycleManager constructor, which starts the DispatcherLeaderElectionService. The newly created dispatcherRunner serves as the callback component of the DispatcherLeaderElectionService, and the election begins.

    private DispatcherRunnerLeaderElectionLifecycleManager(
            T dispatcherRunner, LeaderElectionService leaderElectionService) throws Exception {
        this.dispatcherRunner = dispatcherRunner;
        this.leaderElectionService = leaderElectionService;
        leaderElectionService.start(dispatcherRunner);
    }

    // During start, the current DispatcherLeaderElectionService is registered with the
    // DefaultMultipleComponentLeaderElectionService, which keeps all registered
    // LeaderContenders in leaderElectionEventHandlers.
    private final Map<String, LeaderElectionEventHandler> leaderElectionEventHandlers;

    // The KubernetesMultipleComponentLeaderElectionDriver performs the actual election.
    private final MultipleComponentLeaderElectionDriver multipleComponentLeaderElectionDriver;

    public KubernetesMultipleComponentLeaderElectionDriver(
            KubernetesLeaderElectionConfiguration leaderElectionConfiguration,
            FlinkKubeClient kubeClient,
            Listener leaderElectionListener,
            KubernetesConfigMapSharedWatcher configMapSharedWatcher,
            Executor watchExecutor,
            FatalErrorHandler fatalErrorHandler) {
        Preconditions.checkNotNull(leaderElectionConfiguration);
        this.kubeClient = Preconditions.checkNotNull(kubeClient);
        this.leaderElectionListener = Preconditions.checkNotNull(leaderElectionListener);
        this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler);
        Preconditions.checkNotNull(configMapSharedWatcher);
        Preconditions.checkNotNull(watchExecutor);
        this.configMapName = leaderElectionConfiguration.getConfigMapName();
        this.lockIdentity = leaderElectionConfiguration.getLockIdentity();
        this.leaderElector =
                kubeClient.createLeaderElector(
                        leaderElectionConfiguration, new LeaderCallbackHandlerImpl());
        this.configMapLabels =
                KubernetesUtils.getConfigMapLabels(
                        leaderElectionConfiguration.getClusterId(),
                        LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY);
        kubernetesWatch =
                configMapSharedWatcher.watch(
                        configMapName, new ConfigMapCallbackHandlerImpl(), watchExecutor);
        // leaderElector.run() starts the election.
        leaderElector.run();
        LOG.debug("Starting the {} for config map {}.", getClass().getSimpleName(), configMapName);
    }
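
kubeClient.createLeaderElector builds on the leader-election support of the fabric8 kubernetes-client. For orientation, here is a minimal standalone sketch of that underlying API (assuming fabric8 6.x; the election name, namespace, ConfigMap name, and holder identity are all hypothetical):

    import java.time.Duration;

    import io.fabric8.kubernetes.client.KubernetesClient;
    import io.fabric8.kubernetes.client.KubernetesClientBuilder;
    import io.fabric8.kubernetes.client.extended.leaderelection.LeaderCallbacks;
    import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElectionConfigBuilder;
    import io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.ConfigMapLock;

    public class LeaderElectorDemo {
        public static void main(String[] args) {
            try (KubernetesClient client = new KubernetesClientBuilder().build()) {
                client.leaderElector()
                        .withConfig(
                                new LeaderElectionConfigBuilder()
                                        .withName("demo-election") // hypothetical
                                        .withLeaseDuration(Duration.ofSeconds(15))
                                        .withRenewDeadline(Duration.ofSeconds(10))
                                        .withRetryPeriod(Duration.ofSeconds(2))
                                        // The lock is a ConfigMap, mirroring Flink's K8s HA setup.
                                        .withLock(new ConfigMapLock(
                                                "default", "demo-cm", "holder-1"))
                                        .withLeaderCallbacks(new LeaderCallbacks(
                                                () -> System.out.println("became leader"),
                                                () -> System.out.println("lost leadership"),
                                                newLeader ->
                                                        System.out.println("leader: " + newLeader)))
                                        .build())
                        .build()
                        .run(); // blocks while participating in the election
            }
        }
    }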

LeaderContender Startup & Writing Back the Leader Address

When the election succeeds, the DefaultMultipleComponentLeaderElectionService callback is executed to start the LeaderContender components.

    // The DefaultMultipleComponentLeaderElectionService isLeader callback iterates over
    // all LeaderContenders and invokes the corresponding DefaultLeaderElectionService
    // onGrantLeadership callback.
    public void isLeader() {
        final UUID newLeaderSessionId = UUID.randomUUID();
        synchronized (lock) {
            if (!running) {
                return;
            }
            currentLeaderSessionId = UUID.randomUUID();
            forEachLeaderElectionEventHandler(
                    leaderElectionEventHandler ->
                            leaderElectionEventHandler.onGrantLeadership(newLeaderSessionId));
        }
    }

    // DefaultLeaderElectionService#onGrantLeadership calls
    // leaderContender.grantLeadership(issuedLeaderSessionID).
    // The Dispatcher is used as the example here.
    public void grantLeadership(UUID leaderSessionID) {
        runActionIfRunning(
                () -> {
                    LOG.info(
                            "{} was granted leadership with leader id {}. Creating new {}.",
                            getClass().getSimpleName(),
                            leaderSessionID,
                            DispatcherLeaderProcess.class.getSimpleName());
                    // Create and start the Dispatcher.
                    startNewDispatcherLeaderProcess(leaderSessionID);
                });
    }

    private void startNewDispatcherLeaderProcess(UUID leaderSessionID) {
        stopDispatcherLeaderProcess();
        // Create the dispatcherLeaderProcess.
        dispatcherLeaderProcess = createNewDispatcherLeaderProcess(leaderSessionID);
        final DispatcherLeaderProcess newDispatcherLeaderProcess = dispatcherLeaderProcess;
        FutureUtils.assertNoException(
                previousDispatcherLeaderProcessTerminationFuture.thenRun(
                        newDispatcherLeaderProcess::start));
    }

    // startNewDispatcherLeaderProcess -> createNewDispatcherLeaderProcess
    //     -> forwardConfirmLeaderSessionFuture
    // Here the leaderAddress is written into the ConfigMap data, via the
    // leaderElectionService.confirmLeadership method.
    private void forwardConfirmLeaderSessionFuture(
            UUID leaderSessionID, DispatcherLeaderProcess newDispatcherLeaderProcess) {
        FutureUtils.assertNoException(
                newDispatcherLeaderProcess
                        .getLeaderAddressFuture()
                        .thenAccept(
                                leaderAddress -> {
                                    if (leaderElectionService.hasLeadership(leaderSessionID)) {
                                        leaderElectionService.confirmLeadership(
                                                leaderSessionID, leaderAddress);
                                    }
                                }));
    }

Flink leaderretrieval Source Code Analysis

Core Classes and Interfaces

  • HighAvailabilityServices: provides initialization of leader election, leader retrieval, and the other HA services.
  • LeaderRetrievalService: provides the leader-change notification service.
  • LeaderRetrievalListener: components that want to be notified of leader changes implement this interface and pass it to LeaderRetrievalService#start, which registers it with the service (see the sketch after this list).
  • LeaderRetrievalDriver: implements the actual leader-change monitoring.
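
A minimal, hypothetical listener sketch of this contract (LoggingRetrievalListener is invented for illustration; the interface methods match the LeaderRetrievalListener described above):

    import java.util.UUID;

    import javax.annotation.Nullable;

    import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
    import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;

    // Hypothetical listener: register via start(), then receive the leader address
    // whenever the backing ConfigMap changes.
    public class LoggingRetrievalListener implements LeaderRetrievalListener {

        public static void follow(LeaderRetrievalService retrievalService) throws Exception {
            retrievalService.start(new LoggingRetrievalListener());
        }

        @Override
        public void notifyLeaderAddress(
                @Nullable String leaderAddress, @Nullable UUID leaderSessionID) {
            // null means the leader is currently unknown (e.g. between elections).
            System.out.println("leader=" + leaderAddress + ", session=" + leaderSessionID);
        }

        @Override
        public void handleError(Exception exception) {
            exception.printStackTrace();
        }
    }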

Source Code Analysis

The logic closely mirrors leader election; change notifications are implemented with the Kubernetes watch mechanism, so the detailed walkthrough is omitted here.
Java kubernetes-client code demos: InformerExample and WatchExample.
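
Since those demos are only referenced by name, here are minimal sketches in their spirit using the fabric8 kubernetes-client (assuming version 6.x; the namespace and ConfigMap names are hypothetical). First, a plain watch:

    import io.fabric8.kubernetes.api.model.ConfigMap;
    import io.fabric8.kubernetes.client.KubernetesClient;
    import io.fabric8.kubernetes.client.KubernetesClientBuilder;
    import io.fabric8.kubernetes.client.Watcher;
    import io.fabric8.kubernetes.client.WatcherException;

    public class WatchExample {
        public static void main(String[] args) throws InterruptedException {
            try (KubernetesClient client = new KubernetesClientBuilder().build()) {
                client.configMaps()
                        .inNamespace("default") // hypothetical namespace
                        .withName("demo-cm")    // hypothetical ConfigMap
                        .watch(new Watcher<ConfigMap>() {
                            @Override
                            public void eventReceived(Action action, ConfigMap cm) {
                                // Leader retrieval reacts to MODIFIED events and reads
                                // the leader address from the ConfigMap data.
                                System.out.println(action + ": " + cm.getData());
                            }

                            @Override
                            public void onClose(WatcherException cause) {
                                System.out.println("watch closed: " + cause);
                            }
                        });
                Thread.sleep(60_000); // keep the demo process alive to observe events
            }
        }
    }

And the informer variant, which additionally keeps a local cache and re-syncs periodically:

    import io.fabric8.kubernetes.api.model.ConfigMap;
    import io.fabric8.kubernetes.client.KubernetesClient;
    import io.fabric8.kubernetes.client.KubernetesClientBuilder;
    import io.fabric8.kubernetes.client.informers.ResourceEventHandler;

    public class InformerExample {
        public static void main(String[] args) throws InterruptedException {
            try (KubernetesClient client = new KubernetesClientBuilder().build()) {
                client.configMaps()
                        .inNamespace("default") // hypothetical namespace
                        .inform(new ResourceEventHandler<ConfigMap>() {
                            @Override
                            public void onAdd(ConfigMap cm) {
                                System.out.println("added: " + cm.getMetadata().getName());
                            }

                            @Override
                            public void onUpdate(ConfigMap oldCm, ConfigMap newCm) {
                                System.out.println("updated: " + newCm.getData());
                            }

                            @Override
                            public void onDelete(ConfigMap cm, boolean unknownFinalState) {
                                System.out.println("deleted: " + cm.getMetadata().getName());
                            }
                        }, 30_000L); // resync period in milliseconds
                Thread.sleep(60_000);
            }
        }
    }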