服务器启动

服务端整体架构图
image.png
Zookeeper服务器的启动,⼤致可以分为以下五个步骤
 1. 配置⽂件解析
 2. 初始化数据管理器
 3. 初始化⽹络I/O管理器
 4. 数据恢复
 5. 对外服务

单机版服务器启动
单机版服务器的启动其流程图如下
image.png
上图的过程可以分为预启动初始化过程。
1. 预启动

  1.  1. 统⼀由QuorumPeerMain作为启动类。⽆论单机或集群,在zkServer.cmdzkServer.sh中都配置了QuorumPeerMain作为启动⼊⼝类。
  2.  2. 解析配置⽂件zoo.cfgzoo.cfg配置运⾏时的基本参数,如tickTimedataDirclientPort等参数。
  3.  3. 创建并启动历史⽂件清理器DatadirCleanupManager。对事务⽇志和快照数据⽂件进⾏定时清理。
  4.  4. 判断当前是集群模式还是单机模式启动。若是单机模式,则委托给ZooKeeperServerMain进⾏启动。
  5.  5. 再次进⾏配置⽂件zoo.cfg的解析。
  6.  6. 创建服务器实例ZooKeeperServerZookeeper服务器⾸先会进⾏服务器实例的创建,然后对该服务器实例进⾏初始化,包括连接器、内存数据库、请求处理器等组件的初始化。

2. 初始化

  1.  1. 创建服务器统计器ServerStatsServerStatsZookeeper服务器运⾏时的统计器。
  2.  2. 创建Zookeeper数据管理器FileTxnSnapLogFileTxnSnapLogZookeeper上层服务器和底层数据存储之间的对接层,提供了⼀系列操作数据⽂件的接⼝,如事务⽇志⽂件和快照数据⽂件。Zookeeper根据zoo.cfg⽂件中解析出的快照数据⽬录dataDir和事务⽇志⽬录dataLogDir来创建
  3. FileTxnSnapLog
  4.  3. 设置服务器tickTime和会话超时时间限制。
  5.  4. 创建ServerCnxnFactory。通过配置系统属性zookeper.serverCnxnFactory来指定使⽤Zookeeper⾃⼰实现的NIO还是使⽤Netty框架作为Zookeeper服务端⽹络连接⼯⼚。
  6.  5. 初始化ServerCnxnFactoryZookeeper会初始化Thread作为ServerCnxnFactory的主线程,然后再初始化NIO服务器。
  7.  6. 启动ServerCnxnFactory主线程。进⼊Threadrun⽅法,此时服务端还不能处理客户端请求。
  8.  7. 恢复本地数据。启动时,需要从本地快照数据⽂件和事务⽇志⽂件进⾏数据恢复。
  9.  8. 创建并启动会话管理器。Zookeeper会创建会话管理器SessionTracker进⾏会话管理。
  10.  9. 初始化Zookeeper的请求处理链。Zookeeper请求处理⽅式为责任链模式的实现。会有多个请求处理器依次处理⼀个客户端请求,在服务器启动时,会将这些请求处理器串联成⼀个请求处理链。
  11.  10. 注册JMX服务。Zookeeper会将服务器运⾏时的⼀些信息以JMX的⽅式暴露给外部。
  12.  11. 注册Zookeeper服务器实例。将Zookeeper服务器实例注册给ServerCnxnFactory,之后Zookeeper就可以对外提供服务。

⾄此,单机版的Zookeeper服务器启动完毕。

集群服务器启动
单机和集群服务器的启动在很多地⽅是⼀致的,其流程图如下:
image.png
上图的过程可以分为预启动、初始化、Leader选举、Leader与Follower启动期交互、Leader与Follower启动等过程
1. 预启动

  1. 1. 统⼀由QuorumPeerMain作为启动类。
  2. 2. 解析配置⽂件zoo.cfg
  3. 3. 创建并启动历史⽂件清理器DatadirCleanupFactory
  4. 4. 判断当前是集群模式还是单机模式的启动。在集群模式中,在zoo.cfg⽂件中配置了多个服务器地址,可以选择集群启动。

2. 初始化

  1. 1. 创建ServerCnxnFactory
  2. 2. 初始化ServerCnxnFactory
  3. 3. 创建Zookeeper数据管理器FileTxnSnapLog
  4. 4. 创建QuorumPeer实例。Quorum是集群模式下特有的对象,是Zookeeper服务器实例(ZooKeeperServer)的托管者,QuorumPeer代表了集群中的⼀台机器,在运⾏期间,QuorumPeer会不断检测当前服务器实例的运⾏状态,同时根据情况发起Leader选举。
  5. 5. 创建内存数据库ZKDatabaseZKDatabase负责管理ZooKeeper的所有会话记录以及DataTree和事务⽇志的存储。
  6. 6. 初始化QuorumPeer。将核⼼组件如FileTxnSnapLogServerCnxnFactoryZKDatabase注册到QuorumPeer中,同时配置QuorumPeer的参数,如服务器列表地址、Leader选举算法和会话超时时间限制等。
  7. 7. 恢复本地数据。
  8. 8. 启动ServerCnxnFactory主线程

3. Leader选举

  1. 1. 初始化Leader选举。
  2. 集群模式特有,Zookeeper⾸先会根据⾃身的服务器IDSID)、最新的ZXIDlastLoggedZxid)和当前的服务器epochcurrentEpoch)来⽣成⼀个初始化投票,在
  3. 初始化过程中,每个服务器都会给⾃⼰投票。然后,根据zoo.cfg的配置,创建相应Leader选举算法实现,Zookeeper提供了三种默认算法(LeaderElectionAuthFastLeaderElectionFastLeaderElection),
  4. 可通过zoo.cfg中的electionAlg属性来指定,但现只⽀持FastLeaderElection选举算法。在初始化阶段,Zookeeper会创建Leader选举所需的⽹络I/OQuorumCnxManager,同时启动对Leader选举端⼝的监听,等待集群中其他服务器创建连接。
  5. 2. 注册JMX服务。
  6. 3. 检测当前服务器状态
  7. 运⾏期间,QuorumPeer会不断检测当前服务器状态。在正常情况下,Zookeeper服务器的状态在LOOKINGLEADINGFOLLOWING/OBSERVING之间进⾏切换。在启动阶段,QuorumPeer的初始状态是LOOKING,因此开始进⾏Leader选举。
  8. 4. Leader选举
  9. ZooKeeperLeader选举过程,简单地讲,就是⼀个集群中所有的机器相互之间进⾏⼀系列投票,选举产⽣最合适的机器成为Leader,同时其余机器成为Follower或是Observer的集群机器⻆
  10. ⾊初始化过程。关于Leader选举算法,简⽽⾔之,就是集群中哪个机器处理的数据越新(通常我们根据每个服务器处理过的最⼤ZXID来⽐较确定其数据是否更新),其越有可能成为Leader。当然,如
  11. 果集群中的所有机器处理的ZXID⼀致的话,那么SID最⼤的服务器成为Leader,其余机器称为FollowerObserver

4. Leader和Follower启动期交互过程
到这⾥为⽌,ZooKeeper已经完成了Leader选举,并且集群中每个服务器都已经确定了⾃⼰的⻆⾊——通常情况下就分为 Leader 和 Follower 两种⻆⾊。下⾯我们来对 Leader和Follower在启动期间的交互进⾏介绍,其⼤致交互流程如图所示。
image.png
1. 创建Leader服务器和Follower服务器。完成Leader选举后,每个服务器会根据⾃⼰服务器的⻆⾊创建相应的服务器实例,并进⼊各⾃⻆⾊的主流程。
2. Leader服务器启动Follower接收器LearnerCnxAcceptor。运⾏期间,Leader服务器需要和所有其余的服务器(统称为Learner)保持连接以确集群的机器存活情况,LearnerCnxAcceptor负责接
收所有⾮Leader服务器的连接请求。
3. Learner服务器开始和Leader建⽴连接。所有Learner会找到Leader服务器,并与其建⽴连接。
4. Leader服务器创建LearnerHandler。Leader接收到来⾃其他机器连接创建请求后,会创建⼀个LearnerHandler实例,每个LearnerHandler实例都对应⼀个Leader与Learner服务器之间的连接,其负责Leader和Learner服务器之间⼏乎所有的消息通信和数据同步。
5. 向Leader注册。Learner完成和Leader的连接后,会向Leader进⾏注册,即将Learner服务器的基本信息(LearnerInfo),包括SID和ZXID,发送给Leader服务器。
6. Leader解析Learner信息,计算新的epoch。Leader接收到Learner服务器基本信息后,会解析出该Learner的SID和ZXID,然后根据ZXID解析出对应的epoch_of_learner,并和当前Leader服务器的epoch_of_leader进⾏⽐较,如果该Learner的epoch_of_learner更⼤,则更新Leader的epoch_of_leader = epoch_of_learner + 1。然后LearnHandler进⾏等待,直到过半Learner已经向Leader进⾏了注册,同时更新了epoch_of_leader后,Leader就可以确定当前集群的epoch了。
7. 发送Leader状态。计算出新的epoch后,Leader会将该信息以⼀个LEADERINFO消息的形式发送给Learner,并等待Learner的响应。
8. Learner发送ACK消息。Learner接收到LEADERINFO后,会解析出epoch和ZXID,然后向Leader反馈⼀个ACKEPOCH响应。
9. 数据同步。Leader收到Learner的ACKEPOCH后,即可进⾏数据同步。
10. 启动Leader和Learner服务器。当有过半Learner已经完成了数据同步,那么Leader和Learner服务器实例就可以启动了

5. Leader和Follower启动
1. 创建启动会话管理器。
2. 初始化Zookeeper请求处理链,集群模式的每个处理器也会在启动阶段串联请求处理链。
3. 注册JMX服务。

 ⾄此,集群版的Zookeeper服务器启动完毕

leader选举

Leader选举概述
Leader选举是zookeeper最重要的技术之⼀,也是保证分布式数据⼀致性的关键所在。
当Zookeeper集群中的⼀台服务器出现以下两种情况之⼀时,需要进⼊Leader选举。
  (1) 服务器初始化启动。
  (2) 服务器运⾏期间⽆法和Leader保持连接。
下⾯就两种情况进⾏分析讲解。
服务器启动时期的Leader选举
若进⾏Leader选举,则⾄少需要两台机器,这⾥选取3台机器组成的服务器集群为例。在集群初始化阶段,当有⼀台服务器Server1启动时,其单ᇿ⽆法进⾏和完成Leader选举,当第⼆台服务器Server2启动时,此时两台机器可以相互通信,每台机器都试图找到Leader,于是进⼊Leader选举过程。选举过程如下
(1) 每个Server发出⼀个投票
由于是初始情况,Server1(假设myid为1)和Server2假设myid为2)都会将⾃⼰作为Leader服务器来进⾏投票,每次投票会包含所推举的服务器的myid和ZXID,使⽤(myid, ZXID)来表示,此时Server1的投票为(1, 0),Server2的投票为(2, 0),然后各⾃将这个投票发给集群中其他机器
(2) 接受来⾃各个服务器的投票
集群的每个服务器收到投票后,⾸先判断该投票的有效性,如检查是否是本轮投票、是否来⾃LOOKING状态的服务器。
(3) 处理投票
针对每⼀个投票,服务器都需要将别⼈的投票和⾃⼰的投票进⾏PK,PK规则如下
    · 优先检查ZXID。ZXID⽐较⼤的服务器优先作为Leader。
    · 如果ZXID相同,那么就⽐较myid。myid较⼤的服务器作为Leader服务器。
  现在我们来看Server1和Server2实际是如何进⾏投票处理的。对于Server1来说,它⾃⼰的投票是(1,0),⽽接收到的投票为(2,0)。⾸先会对⽐两者的ZXID,因为都是0,所以⽆法决定谁是Leader。接下来会对⽐两者的myid,很显然,Server1发现接收到的投票中的myid是2,⼤于⾃⼰,于是就会更新⾃⼰的投票为(2,0),然后重新将投票发出去。⽽对于Server2来说,不需要更新⾃⼰的投票
(4) 统计投票

  1. 每次投票后,服务器都会统计所有投票,判断是否已经有过半的机器接收到相同的投票信息。对于Server1Server2服务器来说,都统计出集群中已经有两台机器接受了(20)这个投票信息。这⾥我
  2. 们需要对“过半”的概念做⼀个简单的介绍。所谓“过半”就是指⼤于集群机器数量的⼀半,即⼤于或等于(n/2+1)。对于这⾥由3台机器构成的集群,⼤于等于2台即为达到“过半”要求。

那么,当Server1和Server2都收到相同的投票信息(2,0)的时候,即认为已经选出了Leader。

(5) 改变服务器状态
⼀旦确定了 Leader,每个服务器就会更新⾃⼰的状态:如果是 Follower,那么就变更为FOLLOWING,如果是Leader,那么就变更为LEADING。

服务器运⾏时期的Leader选举
在ZooKeeper集群正常运⾏过程中,⼀旦选出⼀个Leader,那么所有服务器的集群⻆⾊⼀般不会再发⽣变化——也就是说,Leader服务器将⼀直作为集群的Leader,即使集群中有⾮Leader机器挂了或是有新机器加⼊集群也不会影响Leader。但是⼀旦Leader所在的机器挂了,那么整个集群将暂时⽆法对外服务,⽽是进⼊新⼀轮的Leader选举。服务器运⾏期间的Leader选举和启动时期的Leader选举基本过程是⼀致的。
我们还是假设当前正在运⾏的 ZooKeeper 机器由 3 台机器组成,分别是 Server1、Server2和Server3,当前的Leader是Server2。假设在某⼀个瞬间,Leader挂了,这个时候便开始了Leader选举。
(1) 变更状态
Leader挂后,余下的⾮Observer服务器都会将⾃⼰的服务器状态变更为LOOKING,然后开始进⼊Leader选举过程。

(2) 每个Server会发出⼀个投票
在运⾏期间,每个服务器上的ZXID可能不同,此时假定Server1的ZXID为123,Server3的ZXID为122;在第⼀轮投票中,Server1和Server3都会投⾃⼰,产⽣投票(1, 123),(3, 122),然后各⾃将投票发送给集群中所有机器。
(3) 接收来⾃各个服务器的投票,与启动时过程相同
(4) 处理投票。与启动时过程相同,此时,Server1将会成为Leader
(5) 统计投票。与启动时过程相同
(6) 改变服务器的状态。与启动时过程相同

单机启动源码

单机模式的委托启动类为:QuorumPeerMain
服务端启动过程
看下QuorumPeerMain⾥⾯的main函数代码:

  1. public static void main(String[] args) {
  2. QuorumPeerMain main = new QuorumPeerMain();
  3. try {
  4. //初始化并加载配置文件
  5. main.initializeAndRun(args);
  6. }
  7. //....其他省略....
  8. LOG.info("Exiting normally");
  9. System.exit(0);
  10. }
  1. protected void initializeAndRun(String[] args)
  2. throws ConfigException, IOException, AdminServerException
  3. {
  4. QuorumPeerConfig config = new QuorumPeerConfig();
  5. if (args.length == 1) {
  6. config.parse(args[0]);
  7. }
  8. // Start and schedule the the purge task
  9. //历史文件清理器
  10. DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
  11. .getDataDir(), config.getDataLogDir(), config
  12. .getSnapRetainCount(), config.getPurgeInterval());
  13. purgeMgr.start();
  14. // 当配置了多节点信息,config.isDistributed()=true
  15. if (args.length == 1 && config.isDistributed()) {
  16. // 集群模式
  17. runFromConfig(config);
  18. } else {
  19. LOG.warn("Either no config or no quorum defined in config, running "
  20. + " in standalone mode");
  21. // there is only server in the quorum -- run as standalone
  22. // 单机模式
  23. ZooKeeperServerMain.main(args);
  24. }
  25. }

image.png
判断集群还是单机
image.png
判断完成后委托为ZooKeeperServerMain类去启动
进入ZooKeeperServerMain类main方法
image.png
=>_protected void _initializeAndRun(String[] args)
创建ServerConfig对象 进行解析config文件

  1. // 解析单机模式的配置对象,并启动单机模式
  2. protected void initializeAndRun(String[] args)
  3. throws ConfigException, IOException, AdminServerException
  4. {
  5. try {
  6. //注册jmx
  7. // JMX的全称为Java Management Extensions.是管理Java的一种扩展。
  8. // 这种机制可以方便的管理、监控正在运行中的Java程序。常用于管理线程,内存,日志Level,服务重启,系统环境等
  9. ManagedUtil.registerLog4jMBeans();
  10. } catch (JMException e) {
  11. LOG.warn("Unable to register log4j JMX control", e);
  12. }
  13. // 创建服务配置对象
  14. ServerConfig config = new ServerConfig();
  15. //如果入参只有一个,则认为是配置文件的路径
  16. if (args.length == 1) {
  17. // 解析配置文件
  18. config.parse(args[0]);
  19. } else {
  20. // 参数有多个,解析参数
  21. config.parse(args);
  22. }
  23. // 根据配置运行服务
  24. runFromConfig(config);
  25. }

然后根据配置运行服务:
image.png
=>org.apache.zookeeper.server.ZooKeeperServerMain#runFromConfig
源码:

  1. public void runFromConfig(ServerConfig config)
  2. throws IOException, AdminServerException {
  3. LOG.info("Starting server");
  4. FileTxnSnapLog txnLog = null;
  5. try {
  6. // Note that this thread isn't going to be doing anything else,
  7. // so rather than spawning another thread, we will just call
  8. // run() in this thread.
  9. // create a file logger url from the command line args
  10. //初始化日志文件 创建数据管理器 提供一系列操作数据文件接口
  11. txnLog = new FileTxnSnapLog(config.dataLogDir, config.dataDir);
  12. // 初始化zkServer对象
  13. final ZooKeeperServer zkServer = new ZooKeeperServer(txnLog,
  14. config.tickTime, config.minSessionTimeout, config.maxSessionTimeout, null);
  15. // 服务结束钩子,用于知道服务器错误或关闭状态更改。 解除等待阻塞
  16. final CountDownLatch shutdownLatch = new CountDownLatch(1);
  17. zkServer.registerServerShutdownHandler(
  18. new ZooKeeperServerShutdownHandler(shutdownLatch));
  19. // Start Admin server
  20. // 创建admin服务,用于接收请求(创建jetty服务)
  21. adminServer = AdminServerFactory.createAdminServer();
  22. // 设置zookeeper服务
  23. adminServer.setZooKeeperServer(zkServer);
  24. // AdminServer是3.5.0之后支持的特性,启动了一个jettyserver,默认端口是8080,访问此端口可以获取Zookeeper运行时的相关信息
  25. adminServer.start();
  26. boolean needStartZKServer = true;
  27. //---启动ZooKeeperServer
  28. //判断配置文件中 clientportAddress是否为null
  29. if (config.getClientPortAddress() != null) {
  30. //ServerCnxnFactory是Zookeeper中的重要组件,负责处理客户端与服务器的连接
  31. //初始化server端IO对象,默认是NIOServerCnxnFactory:Java原生NIO处理网络IO事件
  32. cnxnFactory = ServerCnxnFactory.createFactory();
  33. //初始化配置信息
  34. cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), false);
  35. //启动服务:此方法除了启动ServerCnxnFactory,还会启动ZooKeeper
  36. cnxnFactory.startup(zkServer);
  37. // zkServer has been started. So we don't need to start it again in secureCnxnFactory.
  38. needStartZKServer = false;
  39. }
  40. if (config.getSecureClientPortAddress() != null) {
  41. secureCnxnFactory = ServerCnxnFactory.createFactory();
  42. secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), true);
  43. secureCnxnFactory.startup(zkServer, needStartZKServer);
  44. }
  45. // 定时清除容器节点
  46. //container ZNodes是3.6版本之后新增的节点类型,Container类型的节点会在它没有子节点时
  47. // 被删除(新创建的Container节点除外),该类就是用来周期性的进行检查清理工作
  48. containerManager = new ContainerManager(zkServer.getZKDatabase(), zkServer.firstProcessor,
  49. Integer.getInteger("znode.container.checkIntervalMs", (int) TimeUnit.MINUTES.toMillis(1)),
  50. Integer.getInteger("znode.container.maxPerMinute", 10000)
  51. );
  52. containerManager.start();
  53. // Watch status of ZooKeeper server. It will do a graceful shutdown
  54. // if the server is not running or hits an internal error.
  55. // ZooKeeperServerShutdownHandler处理逻辑,只有在服务运行不正常的情况下,才会往下执行
  56. shutdownLatch.await();
  57. // 关闭服务
  58. shutdown();
  59. if (cnxnFactory != null) {
  60. cnxnFactory.join();
  61. }
  62. if (secureCnxnFactory != null) {
  63. secureCnxnFactory.join();
  64. }
  65. if (zkServer.canShutdown()) {
  66. zkServer.shutdown(true);
  67. }
  68. } catch (InterruptedException e) {
  69. // warn, but generally this is ok
  70. LOG.warn("Server interrupted", e);
  71. } finally {
  72. if (txnLog != null) {
  73. txnLog.close();
  74. }
  75. }
  76. }

下面分析runFromConfig方法
image.png
跟踪进入ZooKeeperServer创建过程,在创建ZooKeeperServer的内部创建了ServerStats
image.png
接下来开始创建ServerCnxnFactory. 这里可以支持使用Netty去或者原生NIO处理网络IO请求,默认生成的是NIOServerCnxnFactory,然后接下来根据配置去初始化一些其他信息
image.png
继续跟踪走到startup方法 此处开始真正启动服务
image.png
=>org.apache.zookeeper.server.ServerCnxnFactory#startup(org.apache.zookeeper.server.ZooKeeperServer)
此处startup方法是抽象方法,具体逻辑由子类实现
image.png
继续跟踪方法进入到=>org.apache.zookeeper.server.NIOServerCnxnFactory#startup
image.png
=>org.apache.zookeeper.server.NIOServerCnxnFactory#start 可以看出在start中主要是用来启动NIO所需要的多个线程
image.png
NIO相关线程启动完成之后 跟踪进入 数据加载和启动流程
image.png
首先看startdata() 在这里创建了默认的zookeeper节点,然后加载可能已经存有的数据即恢复本地数据
=>image.png
然后调用zks.startup() 启动服务
startup源码

  1. public synchronized void startup() {
  2. //初始化session追踪器 即会话管理器
  3. if (sessionTracker == null) {
  4. createSessionTracker();
  5. }
  6. //启动session追踪器
  7. startSessionTracker();
  8. //建立请求处理链路
  9. setupRequestProcessors();
  10. //注册jmx
  11. registerJMX();
  12. setState(State.RUNNING);
  13. notifyAll();
  14. }

跟踪进入org.apache.zookeeper.server.ZooKeeperServer#setupRequestProcessors 看看处理链路
//这⾥可以看出,单机模式下请求的处理链路为:
//PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor

  1. /* PrepRequestProcessor主要内容:对请求进行区分是否是事务请求,如果是事务请求则创建出事务请求头,
  2. 同时执行一些检查操作,对于增删改等影响数据状态的操作都被认为是事务,需要创建出事务请求头
  3. */
  4. /*
  5. SyncRequestProcessor处理器 主要对事务请求进行日志记录,同时事务请求达到一定次数后,就会执行一次快照
  6. */
  7. /*
  8. FinalRequestProcessor处理器### 作为处理器链上的最后一个处理器,负责执行请求的具体任务,前面几个处理器都是辅助操作,
  9. PrepRequestProcessor为请求添加事务请求头和执行一些检查工作,
  10. SyncRequestProcessor也仅仅是把该请求记录下来保存到事务日志中。
  11. 该请求的具体内容,如获取所有的子节点,创建node的这些具体的操作就是由FinalRequestProcessor来完成的
  12. */
  13. protected void setupRequestProcessors() {
  14. RequestProcessor finalProcessor = new FinalRequestProcessor(this);
  15. RequestProcessor syncProcessor = new SyncRequestProcessor(this,
  16. finalProcessor);
  17. ((SyncRequestProcessor)syncProcessor).start();
  18. firstProcessor = new PrepRequestProcessor(this, syncProcessor);
  19. ((PrepRequestProcessor)firstProcessor).start();
  20. }

最后设置状态为running 到此启动流程完成

源码分析之Leader选举

分析Zookeeper中⼀个核⼼的模块,Leader选举。
总体框架图
对于Leader选举,其总体框架图如下图所示
image.png
AuthFastLeaderElection,LeaderElection其在3.4.0之后的版本中已经不建议使⽤。
Election源码分析

  1. package org.apache.zookeeper.server.quorum;
  2. /*
  3. * 实现类:FastLeaderElection 其是标准的fast paxos算法的实现,基于TCP协议进行选举
  4. * */
  5. public interface Election {
  6. //标识寻找Leader
  7. public Vote lookForLeader() throws InterruptedException;
  8. //关闭服务端之间的连接
  9. public void shutdown();
  10. }

说明:
  选举的⽗接⼝为Election,其定义了lookForLeader和shutdown两个⽅法,lookForLeader表示寻找Leader,shutdown则表示关闭,如关闭服务端之间的连接。

Zookeeper中默认的选举策略,FastLeaderElection。
FastLeaderElection源码分析
类的继承关系

  1. public class FastLeaderElection implements Election {}

说明:FastLeaderElection实现了Election接⼝,重写了接⼝中定义的lookForLeader⽅法和shutdown⽅法

在源码分析之前,我们⾸先介绍⼏个概念:

  • 外部投票:特指其他服务器发来的投票。
  • 内部投票:服务器⾃身当前的投票。
  • 选举轮次:ZooKeeper服务器Leader选举的轮次,即logical clock(逻辑时钟)。
  • PK:指对内部投票和外部投票进⾏⼀个对⽐来确定是否需要变更内部投票。选票管理
  • sendqueue:选票发送队列,⽤于保存待发送的选票。
  • recvqueue:选票接收队列,⽤于保存接收到的外部投票。

image.png
三个内部类:

Notification:表示收到的选举投票信息(其他服务器发来的选举投票信息),其包含了被选举者的id、zxid、选举周期等信息, 其buildMsg方法将选举信息封装至ByteBuffer中再进行发送
ToSend表示发送给其他服务器的选举投票信息,也包含了被选举者的id、zxid、选举周期等信息
Messenger:多线程消息处理器,workerSender 发送选票信息, workerReceiver 接收选票信息

lookForLeader函数
当 ZooKeeper 服务器检测到当前服务器状态变成 LOOKING 时,就会触发 Leader选举,即调⽤lookForLeader⽅法来进⾏Leader选举。
image.png
更新逻辑时钟和初始化选票信息

  1. public Vote lookForLeader() throws InterruptedException {
  2. synchronized(this){
  3. // ⾸先会将逻辑时钟⾃增,每进⾏⼀轮新的leader选举,都需要更新逻辑时钟
  4. logicalclock++;
  5. // 更新选票(初始化选票)
  6. updateProposal(getInitId(), getInitLastLoggedZxid(),
  7. getPeerEpoch());
  8. }
  9. LOG.info("New election. My id = " + self.getId() + ", proposed zxid=0x" + Long.toHexString(proposedZxid));
  10. // 向其他服务器发送⾃⼰的选票(已更新的选票)
  11. sendNotifications();

选票初始化完成后 将选票发送出去
image.png
跟踪sendNotifications方法 在此封装toSend 将选票放入阻塞队列发送出去
image.png

接下来继续回到lookForLeader中,循环获取选票信息
image.png
之后每台服务器会不断地从recvqueue队列中获取外部选票。如果服务器发现⽆法获取到任何外部投票,就⽴即确认⾃⼰是否和集群中其他服务器保持着有效的连接,如果没有连接,则⻢上建⽴连接,如果已经建⽴了连接,则再次发送⾃⼰当前的内部投票,其流程如下

  1. //4.接收外部选票:每台服务器会不断从recvqueue中去获取外部投票
  2. Notification n = recvqueue.poll(notTimeout,
  3. TimeUnit.MILLISECONDS);
  4. /*
  5. * Sends more notifications if haven't received enough.
  6. * Otherwise processes new notification.
  7. */
  8. //n为null时 未获取到任何选票信息
  9. if(n == null){
  10. //判断自己是否与集群断开连接
  11. if(manager.haveDelivered()){
  12. //没有断开:发送自己本身的投票信息
  13. sendNotifications();
  14. } else {
  15. //d断开:马上重新连接
  16. manager.connectAll();
  17. }
  18. /*
  19. * Exponential backoff
  20. */
  21. int tmpTimeOut = notTimeout*2;
  22. notTimeout = (tmpTimeOut < maxNotificationInterval?
  23. tmpTimeOut : maxNotificationInterval);
  24. LOG.info("Notification time out: " + notTimeout);
  25. }
  26. //处理外部投票(判断选票轮次 外部和内部投票的选举是否在同一轮次) 同时校验被投票成为leader的服务器是否有效
  27. else if (validVoter(n.sid) && validVoter(n.leader)) {

在发送完初始化选票之后,接着开始处理外部投票。在处理外部投票时,会根据选举轮次来进⾏不同的处理。  

 · 外部投票的选举轮次⼤于内部投票。若服务器⾃身的选举轮次落后于该外部投票对应服务器的选举轮次,那么就会⽴即更新⾃⼰的选举轮次(logicalclock),并且清空所有已经收到的投票,然后使⽤初始化的投票来进⾏PK以确定是否变更内部投票。最终再将内部投票发送出去。
· 外部投票的选举轮次⼩于内部投票。若服务器接收的外选票的选举轮次落后于⾃身的选举轮次,那么Zookeeper就会直接忽略该外部投票,不做任何处理。
· 外部投票的选举轮次等于内部投票。此时可以开始进⾏选票PK,如果消息中的选票更优,则需要更新本服务器内部选票,再发送给其他服务器。
  之后再对选票进⾏归档操作,⽆论是否变更了投票,都会将刚刚收到的那份外部投票放⼊选票集合recvset中进⾏归档,其中recvset⽤于记录当前服务器在本轮次的Leader选举中收到的所有外部投票,然后开始统计投票,统计投票是为了统计集群中是否已经有过半的服务器认可了当前的内部投票,如果确定已经有过半服务器认可了该投票,然后再进⾏最后⼀次确认,判断是否⼜有更优的选票产⽣,若⽆,则终⽌投票,然后最终的选票,其流程如下

  1. //处理外部投票(判断选票轮次 外部和内部投票的选举是否在同一轮次) 同时校验被投票成为leader的服务器是否有效
  2. else if (validVoter(n.sid) && validVoter(n.leader)) {
  3. /*
  4. * Only proceed if the vote comes from a replica in the current or next
  5. * voting view for a replica in the current or next voting view.
  6. */
  7. switch (n.state) { //判断选票状态
  8. case LOOKING: //集群启动的时候 都是looking状态
  9. // If notification > current, replace and send messages out
  10. //外部投票的选举轮次大于内部投票
  11. if (n.electionEpoch > logicalclock.get()) {
  12. //更新选举轮次 以外部轮次为准
  13. logicalclock.set(n.electionEpoch);
  14. //同时清空所有已经收到的选票
  15. recvset.clear();
  16. //此处先与外部比较epoch值、epoch相同再比较zxid,zxid相同再比较myid
  17. // 如果外部的大则下面更新为外部的zxid和epoch // 进⾏PK,选出较优的服务器
  18. if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
  19. getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
  20. // 更新选票
  21. updateProposal(n.leader, n.zxid, n.peerEpoch);
  22. } else { // ⽆法选出较优的服务器
  23. // 更新选票
  24. updateProposal(getInitId(),
  25. getInitLastLoggedZxid(),
  26. getPeerEpoch());
  27. }
  28. //再次发送自己本身的选票
  29. sendNotifications();
  30. //如果外部投票的选举轮次小于内部投票 直接忽略外部投票
  31. // // 选举周期⼩于逻辑时钟,不做处理,直接忽略
  32. } else if (n.electionEpoch < logicalclock.get()) {
  33. if(LOG.isDebugEnabled()){
  34. LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
  35. + Long.toHexString(n.electionEpoch)
  36. + ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
  37. }
  38. break;
  39. //外部投票的选举轮次和内部投票一致,也是绝大多数情况:
  40. //6.进行选票PK
  41. } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
  42. proposedLeader, proposedZxid, proposedEpoch)) {
  43. //更新自己本身的投票
  44. updateProposal(n.leader, n.zxid, n.peerEpoch);
  45. //7、变更选票 重新发送选票信息
  46. sendNotifications();
  47. }
  48. if(LOG.isDebugEnabled()){
  49. LOG.debug("Adding vote: from=" + n.sid +
  50. ", proposed leader=" + n.leader +
  51. ", proposed zxid=0x" + Long.toHexString(n.zxid) +
  52. ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
  53. }
  54. //8.选票归档:将收到的外部投票放进选票集合recvse中
  55. recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
  56. //判断当前节点收到的票数 是否已经可以结束选举
  57. if (termPredicate(recvset,
  58. new Vote(proposedLeader, proposedZxid,
  59. logicalclock.get(), proposedEpoch))) {
  60. // Verify if there is any change in the proposed leader
  61. while((n = recvqueue.poll(finalizeWait,
  62. TimeUnit.MILLISECONDS)) != null){
  63. if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
  64. proposedLeader, proposedZxid, proposedEpoch)){
  65. recvqueue.put(n);
  66. break;
  67. }
  68. }
  69. /*
  70. * This predicate is true once we don't read any new
  71. * relevant message from the reception queue
  72. */
  73. if (n == null) {
  74. //10.设置服务器状态
  75. //会等待⼀段时间(默认是 200 毫秒)
  76. self.setPeerState((proposedLeader == self.getId()) ?
  77. ServerState.LEADING: learningState());
  78. Vote endVote = new Vote(proposedLeader,
  79. proposedZxid, proposedEpoch);
  80. leaveInstance(endVote);
  81. return endVote;
  82. }
  83. }
  84. break;
  85. case OBSERVING:

1.⾃增选举轮次。 在 FastLeaderElection 实现中,有⼀个 logicalclock 属性,⽤于标识当前Leader的选举轮次,ZooKeeper规定了所有有效的投票都必须在同⼀轮次中。ZooKeeper在开始新⼀轮的投票时,会⾸先对logicalclock进⾏⾃增操作。

2.初始化选票。 在开始进⾏新⼀轮的投票之前,每个服务器都会⾸先初始化⾃⼰的选票。在图7-33中我们已经讲解了 Vote 数据结构,初始化选票也就是对 Vote 属性的初始化。在初始化阶段,每台服务器都会将⾃⼰推举为Leader,表7-10展示了⼀个初始化的选票。
3.发送初始化选票。 在完成选票的初始化后,服务器就会发起第⼀次投票。ZooKeeper 会将刚刚初始化好的选票放⼊sendqueue队列中,由发送器WorkerSender负责

4.接收外部投票。 每台服务器都会不断地从 recvqueue 队列中获取外部投票。如果服务器发现⽆法获取到任何的外部投票,那么就会⽴即确认⾃⼰是否和集群中其他服务器保持着有效连接。如果发现没有建⽴连接,那么就会⻢上建⽴连接。如果已经建⽴了连接,那么就再次发送⾃⼰当前的内部投票。

5.判断选举轮次。 当发送完初始化选票之后,接下来就要开始处理外部投票了。在处理外部投票的时候,会根据选举轮次来进⾏不同的处理。 · 外部投票的选举轮次⼤于内部投票。如果服务器发现⾃⼰的选举轮次已经落后于该外部投票对应服务器的选举轮次,那么就会⽴即更新⾃⼰的选举轮次(logicalclock),并且清空所有已经收到的投票,然后使⽤初始化的投票来进⾏PK以确定是否变更内部投票(关于P K的逻辑会在步骤6中统⼀讲解),最终再将内部投票发送出去。 · 外部投票的选举轮次⼩于内部投票。 如果接收到的选票的选举轮次落后于服务器⾃身的,那么ZooKeeper就会直接忽略该外部投票,不做任何处理,并返回步骤4。
· 外部投票的选举轮次和内部投票⼀致。 这也是绝⼤多数投票的场景,如外部投票的选举轮次和内部投票⼀致的话,那么就开始进⾏选票PK。 总的来说,只有在同⼀个选举轮次的投票才是有效的投票。

6.选票PK。 在步骤5中提到,在收到来⾃其他服务器有效的外部投票后,就要进⾏选票PK了——也就是FastLeaderElection.totalOrderPredicate⽅法的核⼼逻辑。选票PK的⽬的是为了确定当前服务器是否需要变更投票,主要从选举轮次、ZXID和 SID 三个因素来考虑,具体条件如下:在选票 PK 的时候依次判断,符合任意⼀个条件就需要进⾏投票变更。
· 如果外部投票中被推举的Leader服务器的选举轮次⼤于内部投票,那么就需要进⾏投票变更。 · 如果选举轮次⼀致的话,那么就对⽐两者的ZXID。如果外部投票的ZXID⼤于内部投票,那么就需要进⾏投票变更。 · 如果两者的 ZXID ⼀致,那么就对⽐两者的SID。如果外部投票的 SID ⼤于内部投票,那么就需要进⾏投票变更。 7.变更投票。 通过选票PK后,如果确定了外部投票优于内部投票(所谓的“优于”,是指外部投票所推举的服务器更适合成为Leader),那么就进⾏投票变更——使⽤外部投票的选票信息来覆盖内部投票。变更完成后,再次将这个变更后的内部投票发送出去。

8.选票归档。 ⽆论是否进⾏了投票变更,都会将刚刚收到的那份外部投票放⼊“选票集合”recvset中进⾏归档。recvset⽤于记录当前服务器在本轮次的Leader选举中收到的所有外部投票——按照服务器对应的SID来区分,例如,{(1,vote1),(2,vote2),…}。
9.统计投票。 完成了选票归档之后,就可以开始统计投票了。统计投票的过程就是为了统计集群中是否已经有过半的服务器认可了当前的内部投票。如果确定已经有过半的服务器认可了该内部投票,则终⽌投票。否则返回步骤4。 10.更新服务器状态。 统计投票后,如果已经确定可以终⽌投票,那么就开始更新服务器状态。服务器会⾸先判断当前被过半服务器认可的投票所对应的Leader服务器是否是⾃⼰,如果是⾃⼰的话,那么就会将⾃⼰的服务器状态更新为 LEADING。如果⾃⼰不是被选举产⽣的 Leader 的话,那么就会根据具体情况来确定⾃⼰是FOLLOWING或是OBSERVING。 以上 10 个步骤,就是 FastLeaderElection 选举算法的核⼼步骤,其中步骤 4~9 会经过⼏轮循环,直到Leader选举产⽣。另外还有⼀个细节需要注意,就是在完成步骤9之后,如果统计投票发现已经有过半的服务器认可了当前的选票,这个时候,ZooKeeper 并不会⽴即进⼊步骤 10 来更新服务器状态,⽽是会等待⼀段时间(默认是 200 毫秒)来确定是否有新的更优的投票

zookeeper源码分析之集群模式服务端

执⾏流程图
image.png
源码分析
集群模式下启动所有的ZK节点启动⼊⼝都是QuorumPeerMain类的main⽅法。 main⽅法加载配置⽂件以后,最终会调⽤到QuorumPeer的start⽅法,来看下:

  1. @Override
  2. public synchronized void start() {
  3. // 校验serverid如果不在peer列表中,抛异常
  4. if (!getView().containsKey(myid)) {
  5. throw new RuntimeException("My id " + myid + " not in the peer list");
  6. }
  7. // 加载zk数据库:载入之前持久化的一些信息 单机模式也是调用此方法恢复数据
  8. loadDataBase();
  9. // 启动连接服务端
  10. startServerCnxnFactory();
  11. try {
  12. adminServer.start();
  13. } catch (AdminServerException e) {
  14. LOG.warn("Problem starting AdminServer", e);
  15. System.out.println(e);
  16. }
  17. // 启动之后马上进行选举,主要是创建选举必须的环境,比如:启动相关线程
  18. startLeaderElection();
  19. // 执行选举逻辑
  20. super.start();
  21. }

我们已经知道了当⼀个节点启动时需要先发起选举寻找Leader节点,然后再根据Leader节点的事务信息进⾏同步,最后开始对外提供服务,这⾥我们先来看下初始化选举的逻辑,即上⾯的startLeaderElection⽅法:

  1. synchronized public void startLeaderElection() {
  2. try {
  3. // 所有节点启动的初始状态都是LOOKING,因此这里都会是创建一张投自己为Leader的票
  4. if (getPeerState() == ServerState.LOOKING) {
  5. //创建一个投自己的选票
  6. currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
  7. }
  8. } catch(IOException e) {
  9. RuntimeException re = new RuntimeException(e.getMessage());
  10. re.setStackTrace(e.getStackTrace());
  11. throw re;
  12. }
  13. // if (!getView().containsKey(myid)) {
  14. // throw new RuntimeException("My id " + myid + " not in the peer list");
  15. //}
  16. if (electionType == 0) {
  17. try {
  18. udpSocket = new DatagramSocket(myQuorumAddr.getPort());
  19. responder = new ResponderThread();
  20. responder.start();
  21. } catch (SocketException e) {
  22. throw new RuntimeException(e);
  23. }
  24. }
  25. //初始化选举算法, 选举类型 electionType默认为3
  26. this.electionAlg = createElectionAlgorithm(electionType);
  27. }
  28. @SuppressWarnings("deprecation")
  29. protected Election createElectionAlgorithm(int electionAlgorithm){
  30. Election le=null;
  31. //TODO: use a factory rather than a switch
  32. switch (electionAlgorithm) {
  33. case 0:
  34. le = new LeaderElection(this);
  35. break;
  36. case 1:
  37. le = new AuthFastLeaderElection(this);
  38. break;
  39. case 2:
  40. le = new AuthFastLeaderElection(this, true);
  41. break;
  42. case 3: //默认
  43. //electionAlgorithm默认是3,直接走到这里
  44. qcm = createCnxnManager();
  45. //监听选举事件的listener
  46. QuorumCnxManager.Listener listener = qcm.listener;
  47. if(listener != null){
  48. //开启监听器
  49. listener.start();
  50. //初始化选举算法
  51. FastLeaderElection fle = new FastLeaderElection(this, qcm);
  52. //发起选举
  53. fle.start();
  54. le = fle;
  55. } else {
  56. LOG.error("Null listener when initializing cnx manager");
  57. }
  58. break;
  59. default:
  60. assert false;
  61. }
  62. return le;
  63. }

接下来,回到QuorumPeer类中start⽅法的最后⼀⾏super.start(),QuorumPeer本身也是⼀个线程类,⼀起来看下它的run⽅法

  1. @Override
  2. public void run() {
  3. updateThreadName();
  4. try {
  5. /*
  6. * Main loop
  7. */
  8. while (running) {
  9. //根据当前节点的状态执行不同的流程
  10. switch (getPeerState()) {
  11. case LOOKING:
  12. LOG.info("LOOKING");
  13. if (Boolean.getBoolean("readonlymode.enabled")) {
  14. try {
  15. roZkMgr.start();
  16. reconfigFlagClear();
  17. if (shuttingDownLE) {
  18. shuttingDownLE = false;
  19. startLeaderElection();
  20. }
  21. //寻找Leader节点
  22. setCurrentVote(makeLEStrategy().lookForLeader());
  23. } catch (Exception e) {
  24. } finally {
  25. }
  26. } else {
  27. }
  28. break;
  29. case OBSERVING:
  30. try {
  31. //当前节点启动模式为Observer
  32. LOG.info("OBSERVING");
  33. setObserver(makeObserver(logFactory));
  34. //与Leader节点进⾏数据同步
  35. observer.observeLeader();
  36. } catch (Exception e) {
  37. } finally {
  38. }
  39. break;
  40. case FOLLOWING:
  41. try {
  42. LOG.info("FOLLOWING");
  43. //当前节点启动模式为Follower
  44. setFollower(makeFollower(logFactory));
  45. //与Leader节点进⾏数据同步
  46. follower.followLeader();
  47. } catch (Exception e) {
  48. } finally {
  49. }
  50. break;
  51. case LEADING:
  52. LOG.info("LEADING");
  53. try {
  54. //当前节点启动模式为Leader
  55. setLeader(makeLeader(logFactory));
  56. //发送⾃⼰成为Leader的通知
  57. leader.lead();
  58. setLeader(null);
  59. } catch (Exception e) {
  60. } finally {
  61. }
  62. break;
  63. }
  64. start_fle = Time.currentElapsedTime();
  65. }
  66. } finally {
  67. }
  68. }

节点初始化的状态为LOOKING,因此启动时直接会调⽤lookForLeader⽅法发起Leader选举,此方法就是上小节中leader选举算法
image.png
image.png
image.png
经过上⾯的发起投票,统计投票信息最终每个节点都会确认⾃⼰的身份,节点根据类型的不同会执⾏以下逻辑:
1. 如果是Leader节点,⾸先会想其他节点发送⼀条NEWLEADER信息,确认⾃⼰的身份,等到各个节点的ACK消息以后开始正式对外提供服务,同时开启新的监听器,处理新节点加⼊的逻辑。
2. 如果是Follower节点,⾸先向Leader节点发送⼀条FOLLOWERINFO信息,告诉Leader节点⾃⼰已处理的事务的最⼤Zxid,然后Leader节点会根据⾃⼰的最⼤Zxid与Follower节点进⾏同步,如果Follower节点落后的不多则会收到Leader的DIFF信息通过内存同步,如果Follower节点落后的很多则会收到SNAP通过快照同步,如果Follower节点的Zxid⼤于Leader节点则会收到TRUNC信息忽略多余的事务。
3. 如果是Observer节点,则与Follower节点相同