1. 服务端

服务启动（入口：org.apache.zookeeper.server.quorum.QuorumPeerMain#main）：
jetbrains://idea/navigate/reference?project=tuling-zookeeper&fqn=org.apache.zookeeper.server.quorum.QuorumPeerMain

  1. public static void main(String[] args) { // Server entry point; exit code 2 = invalid args/config, 1 = unexpected error, 0 = normal exit
  2. QuorumPeerMain main = new QuorumPeerMain();
  3. try {
  4. // All the real work happens here (step into)
  5. main.initializeAndRun(args);
  6. } catch (IllegalArgumentException e) {
  7. LOG.error("Invalid arguments, exiting abnormally", e);
  8. LOG.info(USAGE);
  9. System.err.println(USAGE);
  10. System.exit(2);
  11. } catch (ConfigException e) {
  12. LOG.error("Invalid config, exiting abnormally", e);
  13. System.err.println("Invalid config, exiting abnormally");
  14. System.exit(2);
  15. } catch (Exception e) {
  16. LOG.error("Unexpected exception, exiting abnormally", e);
  17. System.exit(1);
  18. }
  19. LOG.info("Exiting normally");
  20. System.exit(0);
  21. }

org.apache.zookeeper.server.quorum.QuorumPeerMain#initializeAndRun

  1. protected void initializeAndRun(String[] args)throws ConfigException, IOException
  2. { // Parse the config, start the purge task, then run in quorum or standalone mode
  3. // Read the config file (zoo.cfg) whose path is the single command-line argument
  4. QuorumPeerConfig config = new QuorumPeerConfig();
  5. if (args.length == 1) {
  6. config.parse(args[0]);
  7. }
  8. // Start and schedule the purge task
  9. // (cleans up dataDir/dataLogDir using snapRetainCount and purgeInterval from the config)
  10. DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
  11. .getDataDir(), config.getDataLogDir(), config
  12. .getSnapRetainCount(), config.getPurgeInterval());
  13. purgeMgr.start();
  14. if (args.length == 1 && config.servers.size() > 0) {
  15. // Cluster (quorum) mode: servers are configured, so start a quorum peer (step into)
  16. runFromConfig(config);
  17. } else {
  18. LOG.warn("Either no config or no quorum defined in config, running "
  19. + " in standalone mode");
  20. // no config file or no quorum servers defined -- run as standalone
  21. ZooKeeperServerMain.main(args);
  22. }
  23. }

org.apache.zookeeper.server.quorum.QuorumPeerMain#runFromConfig

  1. public void runFromConfig(QuorumPeerConfig config) throws IOException { // Build a QuorumPeer from the parsed config, start it, and wait for it to finish
  2. try {
  3. ManagedUtil.registerLog4jMBeans();
  4. } catch (JMException e) {
  5. LOG.warn("Unable to register log4j JMX control", e);
  6. }
  7. LOG.info("Starting quorum peer");
  8. try {
  9. ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
  10. // Server-side socket factory for client connections; two implementations:
  11. // 1. NIOServerCnxnFactory; 2. NettyServerCnxnFactory
  12. cnxnFactory.configure(config.getClientPortAddress(),
  13. config.getMaxClientCnxns());
  14. // Copy the settings parsed from zoo.cfg into a new QuorumPeer
  15. quorumPeer = new QuorumPeer();
  16. quorumPeer.setClientPortAddress(config.getClientPortAddress());
  17. quorumPeer.setTxnFactory(new FileTxnSnapLog(
  18. new File(config.getDataLogDir()),
  19. new File(config.getDataDir())));
  20. quorumPeer.setQuorumPeers(config.getServers());
  21. quorumPeer.setElectionType(config.getElectionAlg());
  22. quorumPeer.setMyid(config.getServerId());
  23. quorumPeer.setTickTime(config.getTickTime());
  24. quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
  25. quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
  26. quorumPeer.setInitLimit(config.getInitLimit());
  27. quorumPeer.setSyncLimit(config.getSyncLimit());
  28. quorumPeer.setQuorumVerifier(config.getQuorumVerifier());
  29. quorumPeer.setCnxnFactory(cnxnFactory);
  30. quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
  31. quorumPeer.setLearnerType(config.getPeerType());
  32. quorumPeer.setSyncEnabled(config.getSyncEnabled());
  33. quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
  34. // QuorumPeer overrides start(): this is not a plain Thread.start() (step into)
  35. quorumPeer.start();
  36. quorumPeer.join(); // block until the peer thread terminates
  37. } catch (InterruptedException e) {
  38. // warn, but generally this is ok
  39. LOG.warn("Quorum Peer interrupted", e);
  40. }
  41. }

org.apache.zookeeper.server.quorum.QuorumPeer#start

  1. @Override
  2. public synchronized void start() { // Overrides Thread.start(): load data, start the client socket, prepare the election, then start the peer thread
  3. loadDataBase(); // restore the in-memory database from the on-disk snapshot/txn log (disk -> memory)
  4. cnxnFactory.start(); // start the client-facing server socket (NIO/Netty implementation) (step into)
  5. startLeaderElection(); // cast the initial self-vote and create the election algorithm
  6. super.start(); // only now is Thread.start() called, so run() executes on the peer thread
  7. }

org.apache.zookeeper.server.NIOServerCnxnFactory#run 服务端接收客户端连接并处理读写（即上面 cnxnFactory.start() 启动的线程所执行的 run 方法）

  1. public void run() { // Accept/dispatch loop; runs until the server socket is closed
  2. while (!ss.socket().isClosed()) {
  3. try {
  4. selector.select(1000); // wait at most 1 s for ready channels
  5. Set<SelectionKey> selected;
  6. synchronized (this) {
  7. selected = selector.selectedKeys();
  8. }
  9. ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(
  10. selected);
  11. Collections.shuffle(selectedList); // handle ready keys in random order (presumably for fairness between connections)
  12. for (SelectionKey k : selectedList) {
  13. if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) { // a new client connection is waiting
  14. SocketChannel sc = ((ServerSocketChannel) k
  15. .channel()).accept();
  16. InetAddress ia = sc.socket().getInetAddress();
  17. int cnxncount = getClientCnxnCount(ia);
  18. // Reject if this client IP already holds maxClientCnxns (from zoo.cfg) connections; 0 disables the limit
  19. if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
  20. LOG.warn("Too many connections from " + ia
  21. + " - max is " + maxClientCnxns );
  22. sc.close();
  23. } else {
  24. LOG.info("Accepted socket connection from "
  25. + sc.socket().getRemoteSocketAddress());
  26. sc.configureBlocking(false);
  27. // Register the new connection for read events
  28. SelectionKey sk = sc.register(selector,
  29. SelectionKey.OP_READ);
  30. NIOServerCnxn cnxn = createConnection(sc, sk);
  31. sk.attach(cnxn);
  32. addCnxn(cnxn);
  33. }
  34. } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) { // existing connection is readable/writable
  35. NIOServerCnxn c = (NIOServerCnxn) k.attachment();
  36. c.doIO(k);
  37. } else {
  38. if (LOG.isDebugEnabled()) {
  39. LOG.debug("Unexpected ops in select "
  40. + k.readyOps());
  41. }
  42. }
  43. }
  44. // Clear the selected-key set so it is ready for the next select()
  45. selected.clear();
  46. } catch (RuntimeException e) {
  47. LOG.warn("Ignoring unexpected runtime exception", e);
  48. } catch (Exception e) {
  49. LOG.warn("Ignoring exception", e);
  50. }
  51. }
  52. closeAll();
  53. LOG.info("NIOServerCnxn factory exited run method");
  54. }

org.apache.zookeeper.server.quorum.QuorumPeer#startLeaderElection 选举开始

  1. synchronized public void startLeaderElection() { // Prepare leader election: initial vote, own quorum address, election algorithm
  2. try {
  3. // Initial vote is for ourselves: (myid, last logged zxid, current epoch)
  4. currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
  5. } catch(IOException e) {
  6. RuntimeException re = new RuntimeException(e.getMessage());
  7. re.setStackTrace(e.getStackTrace()); // NOTE: copies the stack trace but does not keep e as the cause
  8. throw re;
  9. }
  10. // Look up our own quorum address (p.addr) for myid in the configured server view
  11. for (QuorumServer p : getView().values()) {
  12. if (p.id == myid) {
  13. myQuorumAddr = p.addr;
  14. break;
  15. }
  16. }
  17. if (myQuorumAddr == null) {
  18. throw new RuntimeException("My id " + myid + " not in the peer list");
  19. }
  20. if (electionType == 0) { // electionType 0 additionally needs a UDP socket and a ResponderThread
  21. try {
  22. udpSocket = new DatagramSocket(myQuorumAddr.getPort());
  23. responder = new ResponderThread();
  24. responder.start();
  25. } catch (SocketException e) {
  26. throw new RuntimeException(e);
  27. }
  28. }
  29. // Create the election algorithm for electionType; the election itself runs later from QuorumPeer#run
  30. this.electionAlg = createElectionAlgorithm(electionType);
  31. }

org.apache.zookeeper.server.quorum.FastLeaderElection#starter 选举初始化

  1. private void starter(QuorumPeer self, QuorumCnxManager manager) { // Initialize election state, the notification queues and the Messenger
  2. this.self = self;
  3. proposedLeader = -1; // sentinel initial value (presumably "no proposal yet")
  4. proposedZxid = -1; // sentinel initial value (presumably "no proposal yet")
  5. sendqueue = new LinkedBlockingQueue<ToSend>(); // outgoing messages (ToSend)
  6. recvqueue = new LinkedBlockingQueue<Notification>(); // incoming Notifications
  7. this.messenger = new Messenger(manager); // presumably moves messages between these queues and the QuorumCnxManager -- verify
  8. }

org.apache.zookeeper.server.quorum.QuorumPeer#run 选举开始
（startLeaderElection 只负责投出初始票并创建选举算法，真正的选举在 QuorumPeer 线程的 run 方法中进行），这里就不贴代码了。

2. 客户端

org.apache.zookeeper.ZooKeeper#ZooKeeper 客户端构造方法（原文此处的配图 image.png 已缺失）

  1. public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
  2. boolean canBeReadOnly)
  3. throws IOException
  4. { // Client constructor: parse the connect string, create the ClientCnxn and start it
  5. LOG.info("Initiating client connection, connectString=" + connectString
  6. + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
  7. watchManager.defaultWatcher = watcher; // the watcher passed here becomes the default watcher
  8. ConnectStringParser connectStringParser = new ConnectStringParser(
  9. connectString);
  10. HostProvider hostProvider = new StaticHostProvider(
  11. connectStringParser.getServerAddresses());// server host:port addresses parsed from the connect string
  12. cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
  13. hostProvider, sessionTimeout, this, watchManager,
  14. getClientCnxnSocket(), canBeReadOnly);// create the ClientCnxn (its threads are not running yet)
  15. cnxn.start();// ClientCnxn is not a Thread; this starts its SendThread and EventThread
  16. }

org.apache.zookeeper.ClientCnxn#ClientCnxn 构造方法：创建了 SendThread 和 EventThread 两个线程（这里只是创建，真正启动在 start() 中）

  1. public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
  2. ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
  3. long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) { // Store session settings and create (but do not start) the send/event threads
  4. this.zooKeeper = zooKeeper;
  5. this.watcher = watcher;
  6. this.sessionId = sessionId;
  7. this.sessionPasswd = sessionPasswd;
  8. this.sessionTimeout = sessionTimeout;
  9. this.hostProvider = hostProvider;
  10. this.chrootPath = chrootPath;
  11. connectTimeout = sessionTimeout / hostProvider.size(); // session timeout divided evenly by the number of known servers
  12. readTimeout = sessionTimeout * 2 / 3; // read timeout is 2/3 of the session timeout
  13. readOnly = canBeReadOnly;
  14. sendThread = new SendThread(clientCnxnSocket); // network I/O thread (connect, ping, doTransport)
  15. eventThread = new EventThread(); // consumes queued events and processes them
  16. }

org.apache.zookeeper.ClientCnxn#start 启动这两个线程（调用 Thread.start() 后各自执行 run 方法）

  1. public void start() { // Launch both client threads; each then executes its own run() method
  2. sendThread.start(); // I/O loop: connect, heartbeats, send/receive packets
  3. eventThread.start(); // event loop: process queued events
  4. }

org.apache.zookeeper.ClientCnxn.SendThread#run

  1. @Override
  2. public void run() { // Main client I/O loop: (re)connect, check timeouts, send pings, then do network I/O
  3. clientCnxnSocket.introduce(this,sessionId);
  4. clientCnxnSocket.updateNow();
  5. clientCnxnSocket.updateLastSendAndHeard(); // reset the last-send / last-heard timestamps of the client socket
  6. int to;
  7. long lastPingRwServer = System.currentTimeMillis();
  8. final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
  9. while (state.isAlive()) {
  10. try {
  11. if (!clientCnxnSocket.isConnected()) {
  12. if(!isFirstConnect){
  13. try {
  14. Thread.sleep(r.nextInt(1000)); // random back-off (< 1 s) before reconnecting
  15. } catch (InterruptedException e) {
  16. LOG.warn("Unexpected exception", e);
  17. }
  18. }
  19. // don't re-establish connection if we are closing
  20. if (closing || !state.isAlive()) {
  21. break;
  22. }
  23. startConnect();
  24. clientCnxnSocket.updateLastSendAndHeard();
  25. }
  26. if (state.isConnected()) {
  27. // determine whether we need to send an AuthFailed event.
  28. if (zooKeeperSaslClient != null) {
  29. boolean sendAuthEvent = false;
  30. if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
  31. try {
  32. zooKeeperSaslClient.initialize(ClientCnxn.this);
  33. } catch (SaslException e) {
  34. LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e);
  35. state = States.AUTH_FAILED;
  36. sendAuthEvent = true;
  37. }
  38. }
  39. KeeperState authState = zooKeeperSaslClient.getKeeperState();
  40. if (authState != null) {
  41. if (authState == KeeperState.AuthFailed) {
  42. // An authentication error occurred during authentication with the Zookeeper Server.
  43. state = States.AUTH_FAILED;
  44. sendAuthEvent = true;
  45. } else {
  46. if (authState == KeeperState.SaslAuthenticated) {
  47. sendAuthEvent = true;
  48. }
  49. }
  50. }
  51. if (sendAuthEvent == true) {
  52. eventThread.queueEvent(new WatchedEvent(
  53. Watcher.Event.EventType.None,
  54. authState,null));
  55. }
  56. }
  57. to = readTimeout - clientCnxnSocket.getIdleRecv(); // time left before the read timeout expires
  58. } else {
  59. to = connectTimeout - clientCnxnSocket.getIdleRecv(); // time left before the connect timeout expires
  60. }
  61. if (to <= 0) {
  62. String warnInfo;
  63. warnInfo = "Client session timed out, have not heard from server in "
  64. + clientCnxnSocket.getIdleRecv()
  65. + "ms"
  66. + " for sessionid 0x"
  67. + Long.toHexString(sessionId);
  68. LOG.warn(warnInfo);
  69. throw new SessionTimeoutException(warnInfo);
  70. }
  71. if (state.isConnected()) {
  72. //1000(1 second) is to prevent race condition missing to send the second ping
  73. //also make sure not to send too many pings when readTimeout is small
  74. int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() -
  75. ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
  76. //send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
  77. if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
  78. sendPing();// send heartbeat (ping)
  79. clientCnxnSocket.updateLastSend();
  80. } else {
  81. if (timeToNextPing < to) {
  82. to = timeToNextPing;
  83. }
  84. }
  85. }
  86. // If we are in read-only mode, seek for read/write server
  87. if (state == States.CONNECTEDREADONLY) {
  88. long now = System.currentTimeMillis();
  89. int idlePingRwServer = (int) (now - lastPingRwServer);
  90. if (idlePingRwServer >= pingRwTimeout) {
  91. lastPingRwServer = now;
  92. idlePingRwServer = 0;
  93. pingRwTimeout =
  94. Math.min(2*pingRwTimeout, maxPingRwTimeout); // exponential back-off, capped at maxPingRwTimeout
  95. pingRwServer();
  96. }
  97. to = Math.min(to, pingRwTimeout - idlePingRwServer);
  98. }
  99. clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
  100. // key call above: doTransport performs the actual network I/O (see ClientCnxnSocketNIO#doTransport)
  101. } catch (Throwable e) {
  102. if (closing) {
  103. if (LOG.isDebugEnabled()) {
  104. // closing so this is expected
  105. LOG.debug("An exception was thrown while closing send thread for session 0x"
  106. + Long.toHexString(getSessionId())
  107. + " : " + e.getMessage());
  108. }
  109. break;
  110. } else {
  111. // this is ugly, you have a better way speak up
  112. if (e instanceof SessionExpiredException) {
  113. LOG.info(e.getMessage() + ", closing socket connection");
  114. } else if (e instanceof SessionTimeoutException) {
  115. LOG.info(e.getMessage() + RETRY_CONN_MSG);
  116. } else if (e instanceof EndOfStreamException) {
  117. LOG.info(e.getMessage() + RETRY_CONN_MSG);
  118. } else if (e instanceof RWServerFoundException) {
  119. LOG.info(e.getMessage());
  120. } else {
  121. LOG.warn(
  122. "Session 0x"
  123. + Long.toHexString(getSessionId())
  124. + " for server "
  125. + clientCnxnSocket.getRemoteSocketAddress()
  126. + ", unexpected error"
  127. + RETRY_CONN_MSG, e);
  128. }
  129. cleanup(); // not closing: tear down this connection, report Disconnected, and loop around to reconnect
  130. if (state.isAlive()) {
  131. eventThread.queueEvent(new WatchedEvent(
  132. Event.EventType.None,
  133. Event.KeeperState.Disconnected,
  134. null));
  135. }
  136. clientCnxnSocket.updateNow();
  137. clientCnxnSocket.updateLastSendAndHeard();
  138. }
  139. }
  140. }
  141. cleanup();
  142. clientCnxnSocket.close();
  143. if (state.isAlive()) {
  144. eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
  145. Event.KeeperState.Disconnected, null));
  146. }
  147. ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
  148. "SendThread exited loop for session: 0x"
  149. + Long.toHexString(getSessionId()));
  150. }

org.apache.zookeeper.ClientCnxnSocketNIO#doTransport 真正执行网络 I/O（完成连接、读写数据）的地方

  1. @Override
  2. void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, // one select() round: finish connects, do reads/writes, enable writes if packets are pending
  3. ClientCnxn cnxn)
  4. throws IOException, InterruptedException {
  5. selector.select(waitTimeOut); // wait up to waitTimeOut ms for a ready channel
  6. Set<SelectionKey> selected;
  7. synchronized (this) {
  8. selected = selector.selectedKeys();
  9. }
  10. // Everything below and until we get back to the select is
  11. // non blocking, so time is effectively a constant. That is
  12. // Why we just have to do this once, here
  13. updateNow();
  14. for (SelectionKey k : selected) {
  15. SocketChannel sc = ((SocketChannel) k.channel());
  16. if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
  17. if (sc.finishConnect()) { // the non-blocking connect has completed
  18. updateLastSendAndHeard();
  19. sendThread.primeConnection();
  20. }
  21. } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
  22. // Connected: handle reads and writes between client and server (not traced further in these notes)
  23. doIO(pendingQueue, outgoingQueue, cnxn);
  24. }
  25. }
  26. if (sendThread.getZkState().isConnected()) {
  27. synchronized(outgoingQueue) {
  28. if (findSendablePacket(outgoingQueue,
  29. cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
  30. enableWrite(); // a sendable packet is queued: enable write interest
  31. }
  32. }
  33. }
  34. selected.clear(); // clear the selected-key set for the next select()
  35. }

回调（Watcher 事件）是怎么处理的？在 org.apache.zookeeper.ClientCnxn.SendThread#run
里面调用了 org.apache.zookeeper.ClientCnxn.EventThread#queueEvent，它负责往事件队列里放数据。

org.apache.zookeeper.ClientCnxn.EventThread#run 则从事件队列（waitingEvents）中取出事件并处理：

  1. public void run() { // Event loop: take events from waitingEvents and process them until killed and drained
  2. try {
  3. isRunning = true;
  4. while (true) {
  5. Object event = waitingEvents.take(); // blocks until an event is queued (presumably via queueEvent)
  6. if (event == eventOfDeath) { // shutdown marker
  7. wasKilled = true;
  8. } else {
  9. processEvent(event);
  10. }
  11. if (wasKilled) // after the shutdown marker, keep processing until the queue is empty
  12. synchronized (waitingEvents) {
  13. if (waitingEvents.isEmpty()) {
  14. isRunning = false;
  15. break;
  16. }
  17. }
  18. }
  19. } catch (InterruptedException e) {
  20. LOG.error("Event thread exiting due to interruption", e); // note: the thread's interrupt flag is not restored here
  21. }
  22. LOG.info("EventThread shut down for session: 0x{}",
  23. Long.toHexString(getSessionId()));
  24. }