又是这个时间点帅气的我出现啦.jpg<br />


组件间关系图

点击查看【processon】
当开发人员 new 了一个 ZooKeeper 对象时,它会初始化以下三个组件:

  1. HostProvider ,用来解析域名 和 负载均衡的
  2. ZKWatchManager ,用来处理事件的
  3. ClientCnxn ,用来处理网络IO的
    1. SendThread ,用来处理请求/响应
    2. EventThread ,用来处理事件回调的

最后会启动 SendThreadEventThread 循环执行任务。首先 SendThread 会判断当前客户端是否被关闭,如果被关闭了就清理一下数据退出;如果没有关闭,则判断是否和服务器建立了TCP连接,如果没有就去创建TCP连接,执行连接后可能还不会立即成功(可能是异步的),此种情况会在下几次循环 SendThread#run 时判断连接是否成功;如果(直接)连接成功了,就能创建会话请求包并放入 outgoing queue 里,等待 doTransport() 取走处理;当下几次循环获取到了响应,就构建相应的事件,设置状态~然后 EventThread 取到了待执行的事件,就会去处理。
连接的关键就是,当TCP连接建立成功后,会创建会话请求包并放入 outgoing queue 中; outgoing queue 里的数据会在某次循环中被取出来发送(优先第一个)。当收到创建会话请求的响应后,构建相应的事件并设置 ZooKeeper 的状态。

1. 初始化ZooKeeper

  1. public ZooKeeper(
  2. String connectString, // 连接的服务器列表,用 , 隔开
  3. int sessionTimeout, // 会话超时时间
  4. Watcher watcher, // 连接成功后的Watch回调
  5. boolean canBeReadOnly, // 决定当前的Client是否支持 只读模式下的Server
  6. HostProvider aHostProvider, // 自定义HostProvider
  7. // ZooKeeper 客户端的配置
  8. ZKClientConfig clientConfig) throws IOException {
  9. // 如果客户端配置为null,则使用默认配置
  10. if (clientConfig == null) {
  11. clientConfig = new ZKClientConfig();
  12. }
  13. this.clientConfig = clientConfig;
  14. // 初始化默认的WatchManager
  15. watchManager = defaultWatchManager();
  16. watchManager.defaultWatcher = watcher;
  17. ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
  18. hostProvider = aHostProvider;
  19. // 构建 ClientCnxn
  20. cnxn = createConnection(
  21. connectStringParser.getChrootPath(),
  22. hostProvider,
  23. sessionTimeout,
  24. this,
  25. watchManager,
  26. getClientCnxnSocket(),
  27. canBeReadOnly);
  28. // 启动 SendThread 、EventThread
  29. cnxn.start();
  30. }

2. 初始化ZKWatchManager

这部分略过

3. 初始化HostProvider

如果开发人员不给定 HostProvider ,那 ZooKeeper 默认会通过下面的方法创建:

private static HostProvider createDefaultHostProvider(String connectString) {
    // 处理开发人员给的connectString,服务器地址列表字符串。处理的规范一些
    // 构建StaticHostProvider
    return new StaticHostProvider(new ConnectStringParser(connectString).getServerAddresses());
}

然后创建 StaticHostProvider 的逻辑如下所示:

public StaticHostProvider(Collection<InetSocketAddress> serverAddresses) {
    // 调用init方法;这里Resolver的匿名函数对象是重点,下面要考的
    init(serverAddresses, System.currentTimeMillis() ^ this.hashCode(), new Resolver() {
        @Override
        public InetAddress[] getAllByName(String name) throws UnknownHostException {
            return InetAddress.getAllByName(name);
        }
    });
}

private void init(Collection<InetSocketAddress> serverAddresses, 
                  long randomnessSeed, Resolver resolver) {
    // 构造一个随机种子
    this.sourceOfRandomness = new Random(randomnessSeed);
    this.resolver = resolver;
    if (serverAddresses.isEmpty()) {
        throw new IllegalArgumentException("A HostProvider may not be empty!");
    }
    // 重点!!!!重新打乱服务器列表,简单的实现负载均衡!!!!
    this.serverAddresses = shuffle(serverAddresses);
    currentIndex = -1;
    lastIndex = -1;
}

4. 初始化网络连接器ClientCnxn

这部分主要设置 ClientCnxn 的一些属性,比如: chroot 、会话ID、连接超时时间等等。它的源码如下所示:

public ClientCnxn(
    String chrootPath,
    HostProvider hostProvider,
    int sessionTimeout,
    ZooKeeper zooKeeper,
    ClientWatchManager watcher,
    ClientCnxnSocket clientCnxnSocket,
    long sessionId,
    byte[] sessionPasswd,
    boolean canBeReadOnly) {
    // Zookeeper实例
    this.zooKeeper = zooKeeper;
    // WatchManager
    this.watcher = watcher;
    // 初始化时是0
    this.sessionId = sessionId;
    // 空的16字节数组
    this.sessionPasswd = sessionPasswd;
    // 会话过期时间
    this.sessionTimeout = sessionTimeout;
    // 服务器列表解析器
    this.hostProvider = hostProvider;
    // 获取chroot,即用户指定的逻辑上的根目录,指定一个目录为根目录,后续的相对目录可以使用/替代
    this.chrootPath = chrootPath;
    // 连接超时时长 由 会话超时时长 / 服务器列表个数 决定。服务器列表个数越多,连接超时时间越短,发生断开时切换的越快
    connectTimeout = sessionTimeout / hostProvider.size();
    // 读取数据超时时间
    readTimeout = sessionTimeout * 2 / 3;
    // 看代码后面的解释
    readOnly = canBeReadOnly;
    // 5. 关键:创建SendThread
    sendThread = new SendThread(clientCnxnSocket);
    // 5. 关键:创建EventThread
    eventThread = new EventThread();
    this.clientConfig = zooKeeper.getClientConfig();
    // 根据配置文件初始化请求相关的参数
    initRequestTimeout();
}

👉👉👉不了解 chroot 的戳这里《ZooKeeper - 创建会话》👈👈👈

  • readOnly :该属性是用于启用了 Read Only Mode 功能的 ZooKeeper 集群。当 Zookeeper 集群发生分区现象,那么该属性会支撑 ZooKeeper 服务器的可读功能,但是所有的写入会被拒绝。该属性仅在发生分区时生效

Tips:第五步在代码里~

6. 启动SendThread和EventThread

// org.apache.zookeeper.ClientCnxn
public void start() {
    sendThread.start();
    eventThread.start();
}

SendThread#run

在本阶段,只会分析和 会话创建相关的内容,可能会省略掉一些不必要的琐碎的部分。

基本状态设置

public void run() {
    // 设置新的操作对象
    clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
    // 更新当前时间(此时为 Time.currentElapsedTime())
    clientCnxnSocket.updateNow();
    // 更新最后一次发送/最后一次接收的时间(此时 = now)
    clientCnxnSocket.updateLastSendAndHeard();
    // 当前时间 距离上一次 接收到响应 的间隔时间,超过一定时间会抛出异常的
    int to;
    // 最后一次ping  可read/可write的server时的时间
    long lastPingRwServer = Time.currentElapsedTime();
    // ping的间隔时间
    final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
    // 要连接的服务器地址
    InetSocketAddress serverAddress = null;
    // 循环体
    ...
}

建立TCP连接

这部分需要进入循环体, ZooKeeper 是否关闭决定了该循环体是否结束。该循环体内主要就是发送请求,接收响应,包含了早期的一些必要的TCP建立,会话建立,认证等特殊请求:

// org.apache.zookeeper.ClientCnxn.SendThread#run
while (state.isAlive()) {
                try {
                    // 判断Socket是否已经建立连接完毕,注意,这里是判断Socket是否连接
                    if (!clientCnxnSocket.isConnected()) {
                        // 如果我们手动关闭了连接,就不要再重新连接了
                        if (closing) {
                            break;
                        }
                        // 涉及readonly模式,这里不分析
                        if (rwServerAddress != null) {
                            serverAddress = rwServerAddress;
                            rwServerAddress = null;
                        } else {
                            serverAddress = hostProvider.next(1000);
                        }
                        // 重点!!!开始连接ZooKeeper(入口)
                        startConnect(serverAddress);
                        // 更新心跳时间,更新最后一次 发送/接收时间
                        clientCnxnSocket.updateLastSendAndHeard();
                    }
                   // 目前仅需要分析TCP如何建立连接的,暂时省略
                   ...
                } catch (Throwable e) {
                   ...
                }
            }

startConnect() 方法里开始执行TCP连接:

private void startConnect(InetSocketAddress addr) throws IOException {
    // sasl认证
    ...
    // 如果不是第一次连接,需要暂停1秒左右
    if (!isFirstConnect) {
        try {
            Thread.sleep(r.nextInt(1000));
        } catch (InterruptedException e) {
            LOG.warn("Unexpected exception", e);
        }
    }
    // 设置ClientCnxn的state状态,为正在连接
    state = States.CONNECTING;
    // IP地址
    String hostPort = addr.getHostString() + ":" + addr.getPort();
    // 多线程日志记录,在多线程下,给定一个唯一标志,方便后续分析
    MDC.put("myid", hostPort);
    // 设置线程名字
    setName(getName().replaceAll("\\(.*\\)", "(" + hostPort + ")"));
    // sasl认证处理
    ...
    // 记录连接日志
    logStartConnect(addr);
    // 根据不同的连接器实现执行连接,默认情况下是使用NIO方式连接
    clientCnxnSocket.connect(addr);
}

clientCnxnSocket 有不同的实现,目前 ZooKeeper 提供了 NIONetty 的方式,默认情况下使用 NIO 方式建立TCP连接:

// org.apache.zookeeper.ClientCnxnSocketNIO#connect
@Override
void connect(InetSocketAddress addr) throws IOException {
    // 创建Socket
    SocketChannel sock = createSock();
    try {
        // 注册并连接指定地址
        registerAndConnect(sock, addr);
    } catch (IOException e) {
        LOG.error("Unable to open socket to {}", addr);
        sock.close();
        throw e;
    }
    // 设置initialized为false
    initialized = false;
    /*
     * 清空缓存
     */
    lenBuffer.clear();
    incomingBuffer = lenBuffer;
}
// 连接
void registerAndConnect(SocketChannel sock, InetSocketAddress addr) throws IOException {
    // 注册 select,这部分可以看java NIO学习
    sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
    // 执行连接,可能结果不会立刻为true;如果是异步情况或者正在建立连接的,会返回false,
    boolean immediateConnect = sock.connect(addr);
    // 如若连接成功,就能立刻执行会话创建
    // 如若连接失败,就等后续while循环执行doTransport()时再执行
    if (immediateConnect) {
        sendThread.primeConnection();
    }
}

registerAndConnect 方法里,我们可以看到 SocketChannel 执行 connect() 进行连接,这个方法根据 SocketChannel 是否为异步模式来决定是否直接返回:

  • 比如 SocketChannel 是异步模式,那么 immediateConnect 可能不会直接为 true ,即仍处于连接中状态。那就要等到 doTransport() 方法判断是否连接成功,进而构建会话请求包。
  • 如果SocketChannel 是同步模式,那么会一直阻塞到方法返回,即连接成功。如果返回结果 immediateConnecttrue ,就能立刻执行 sendThread.primeConnection() 构建会话请求包并放入 outgoing queue 中。

截止目前为止,已经开始建立TCP连接了~注意如果是异步模式,到这一步可能还没成功建立连接
假设现在是同步模式,我们阻塞并等待 sock.connect(addr) 方法返回,如果连接成功,就能接下去执行 primeConnection()

void primeConnection() throws IOException {
    isFirstConnect = false;
    // 判断之前是否存在session,如果有,就用之前的sessionId
    long sessId = (seenRwServerBefore) ? sessionId : 0;
    // 创建连接请求,这里有最后一次的事务ID、会话超时时间、sessionID
    ConnectRequest conReq = new ConnectRequest(0, lastZxid, sessionTimeout, sessId, sessionPasswd);
    ...
    // 如果存在要认证的信息,就创建认证请求并放入队列中
    for (AuthData id : authInfo) {
        outgoingQueue.addFirst(
            new Packet(
                new RequestHeader(ClientCnxn.AUTHPACKET_XID, OpCode.auth),
                null,
                new AuthPacket(0, id.scheme, id.data),
                null,
                null));
    }
    // 重点!保证会话请求在所有请求之前,addFirst
    outgoingQueue.addFirst(new Packet(null, null, conReq, null, null, readOnly));
    // 设置SelectionKey 可读/可写,即给SelectionKey设置OP_READ、OP_WRITE
    clientCnxnSocket.connectionPrimed();
    // 发送请求的日志
    LOG.debug("Session establishment request sent on {}", clientCnxnSocket.getRemoteSocketAddress());
}

构建会话请求的逻辑算是简单的,就是将请求封装成 Packet ,然后放入 outgoing queue 中。到时候,这个循环体的另外一个方法 doTransport() 会在TCP建立成功后,从 outgoing queue 中取出请求发送给服务端。
注意,该阶段没有发送任何请求!

处理网络IO

笔者中间省略了一些 建立在会话基础上的一些代码,先分析网络监听IO,等会话连接成功了,再回头分析中间省略的代码。

// org.apache.zookeeper.ClientCnxn.SendThread#run
while (state.isAlive()) {
                try {
                    // 因为前面的代码,除了建立连接以外,都是需要建立在会话基础上的
                    // 目前先考虑网络IO,讲完了会话建立成功后再分析这部分代码
                    ...
                    // 在初始化这个阶段,该方法主要用来监听连接的,如果连接成功
                    // 就可以接着执行sendThread.primeConnection();
                    clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
                } catch (Throwable e) {
                   ...
                }
            }

现在让我们看看核心的网络IO处理逻辑:

void doTransport(
    int waitTimeOut,
    Queue<Packet> pendingQueue,
    ClientCnxn cnxn) throws IOException, InterruptedException {
    // 这部分都是NIO的知识
    selector.select(waitTimeOut);
    Set<SelectionKey> selected;
    synchronized (this) {
        selected = selector.selectedKeys();
    }
    // Everything below and until we get back to the select is
    // non blocking, so time is effectively a constant. That is
    // Why we just have to do this once, here
    updateNow();
    // 遍历所有的SelectionKey
    for (SelectionKey k : selected) {
        SocketChannel sc = ((SocketChannel) k.channel());
        // 如果SelectionKey仅处于连接状态,且已经准备就绪,就执行该条件语句
        if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
            // 判断异步模式下的SocketChannel的Scoket是否连接完毕
            // 之前是同步模式下的SocketChannel连接,就能直接执行会话请求的构建
            if (sc.finishConnect()) {
                // 更新最后一次的发送/接收时间
                updateLastSendAndHeard();
                // 更新当前绑定的socket地址
                updateSocketAddresses();
                // 因为TCP已经建立完毕。准备创建会话请求,并放入待取队列中
                sendThread.primeConnection();
            }
        // 如果SelectionKey处于 可读/可写,且已经准备就绪的状态,就执行该条件语句 
        } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
            // 这里会处理读写请求
            doIO(pendingQueue, cnxn);
        }
    }
    // 判断是否已经建立连接,若连接建立成功,判断会话请求是否在请求队列内的首位。
    // 如果确定,给予SelectionKey 可写能力
    if (sendThread.getZkState().isConnected()) {
        if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {
            enableWrite();
        }
    }
    // 清理引用
    selected.clear();
}

关于 sendThread.primeConnection() 方法详解可以看 建立TCP连接的最后一部分 ,那里分析了 Blocking Mode SocketChannel 下的TCP连接成功后,对会话请求的构建;这边是 None Blocking Mode SocketChannel 下的TCP连接成功后,对会话请求的构建。

该方法除了 OP_CONNECT 的判断以外,还存在 OP_READ|OP_WRITE 的判断,这部分就是真正的处理读写请求的部分。当一个请求准备就绪了,并且支持读写后,就可以处理IO了。

void doIO(Queue<Packet> pendingQueue, ClientCnxn cnxn) throws InterruptedException, IOException {
    SocketChannel sock = (SocketChannel) sockKey.channel();
    // 如果SelectionKey可读
    if (sockKey.isReadable()) {
        // 读取数据
        int rc = sock.read(incomingBuffer);
        if (rc < 0) {
            throw new EndOfStreamException("Unable to read additional data from server sessionid 0x"
                                           + Long.toHexString(sessionId)
                                           + ", likely server has closed socket");
        }
        // 全部读完了已经
        if (!incomingBuffer.hasRemaining()) {
            // 恢复
            incomingBuffer.flip();
            if (incomingBuffer == lenBuffer) {
                recvCount.getAndIncrement();
                readLength();
            // 如果有数据,且当前还未初始化,那么必然是会话请求    
            } else if (!initialized) {
                // 读取连接结果,这里开始构建事件
                readConnectResult();
                // 启用可读能力
                enableRead();
                if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {
                    // Since SASL authentication has completed (if client is configured to do so),
                    // outgoing packets waiting in the outgoingQueue can now be sent.
                    enableWrite();
                }
                // 善后,恢复到初始状态
                lenBuffer.clear();
                incomingBuffer = lenBuffer;
                updateLastHeard();
                initialized = true;
            // 如果是普通的读,就读取信息,转换成Event放入EventThread
            } else {
                sendThread.readResponse(incomingBuffer);
                lenBuffer.clear();
                incomingBuffer = lenBuffer;
                updateLastHeard();
            }
        }
    }
    // 如果可写
    if (sockKey.isWritable()) {
        // 找到待发送的Packet,基本是队首
        Packet p = findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress());
        if (p != null) {
            // 更新最后发送时间
            updateLastSend();
            // 如果packet还没有序列化
            if (p.bb == null) {
                // 判断是否有请求头 && 请求头类型非ping && 请求头类型非auth
                // 感觉是在判断请求是否为普通请求
                if ((p.requestHeader != null)
                    && (p.requestHeader.getType() != OpCode.ping)
                    && (p.requestHeader.getType() != OpCode.auth)) {
                    p.requestHeader.setXid(cnxn.getXid());
                }
                // 序列化请求
                p.createBB();
            }
            // 写入请求
            sock.write(p.bb);
            // 所有字节码都发送完毕,hasRemaining()表示是否有剩余;
            if (!p.bb.hasRemaining()) {
                // 发送成功数量++
                sentCount.getAndIncrement();
                // 从outgoing queue中移除队首(发送的包)
                outgoingQueue.removeFirstOccurrence(p);
                // 判断是否有请求头 && 请求头类型非ping && 请求头类型非auth
                // 如果是普通请求,那么加入pendingQueue;反之不加入;
                // 因为普通请求可以被开发人员增加监听;特殊事件基本都是按ZooKeeper既定的路线走
                // 不需要再加入pendingQueue,直接内部消化了
                if (p.requestHeader != null
                    && p.requestHeader.getType() != OpCode.ping
                    && p.requestHeader.getType() != OpCode.auth) {
                    synchronized (pendingQueue) {
                        pendingQueue.add(p);
                    }
                }
            }
        }
        // 如果待发送队列空了,就禁用写入
        if (outgoingQueue.isEmpty()) {
            disableWrite();
            // 如果会话初始化还没成功,也禁用写入。禁用了那怎么发送会话包呢?
            // 至少会执行一次,因为这在write之后
        } else if (!initialized && p != null && !p.bb.hasRemaining()) {
            disableWrite();
        } else {
            // Just in case
            enableWrite();
        }
    }
}

构建会话事件

当在还没有初始化的情况下,收到了一个响应,会进入 readConnectResult() 方法中,主要做以下几个步骤:

  1. 解析响应
  2. 构建WatchEvent
  3. 放入 waitingEvents

这部分负责解析响应,当响应处理完毕,进入 onConnected() 方法。

void readConnectResult() throws IOException {
    // 日志部分
    ...
    // 读取数据
    ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
    BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
    // 创建ConnectResponse
    ConnectResponse conRsp = new ConnectResponse();
    // 解析数据,放入ConnectResponse
    conRsp.deserialize(bbia, "connect");
    // read "is read-only" flag
    boolean isRO = false;
    try {
        isRO = bbia.readBool("readOnly");
    } catch (IOException e) {
        // this is ok -- just a packet from an old server which
        // doesn't contain readOnly field
        LOG.warn("Connected to an old server; r-o mode will be unavailable");
    }
    //  从响应里获取SessionID
    this.sessionId = conRsp.getSessionId();
    // 调用onConnected逻辑
    sendThread.onConnected(conRsp.getTimeOut(), this.sessionId, conRsp.getPasswd(), isRO);
}

onConnected() 方法里构建 SyncConnected 事件,并入队等待 EventThread 处理:

void onConnected(
    int _negotiatedSessionTimeout,
    long _sessionId,
    byte[] _sessionPasswd,
    boolean isRO) throws IOException {
    negotiatedSessionTimeout = _negotiatedSessionTimeout;
    // 会话超时处理
    if (negotiatedSessionTimeout <= 0) {
        state = States.CLOSED;
        eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Expired, null));
        eventThread.queueEventOfDeath();
        String warnInfo = String.format(
            "Unable to reconnect to ZooKeeper service, session 0x%s has expired",
            Long.toHexString(sessionId));
        LOG.warn(warnInfo);
        throw new SessionExpiredException(warnInfo);
    }
    // 可读可写客户端 连接进入 只读服务器,异常
    if (!readOnly && isRO) {
        LOG.error("Read/write client got connected to read-only server");
    }
    // 设置属性
    readTimeout = negotiatedSessionTimeout * 2 / 3;
    connectTimeout = negotiatedSessionTimeout / hostProvider.size();
    // 更新当前正在使用的服务器地址下标
    hostProvider.onConnected();
    // 保存sessionID
    sessionId = _sessionId;
    // session的密码
    sessionPasswd = _sessionPasswd;
    // 判断连接类型
    state = (isRO) ? States.CONNECTEDREADONLY : States.CONNECTED;
    seenRwServerBefore |= !isRO;
    LOG.info(
        "Session establishment complete on server {}, session id = 0x{}, negotiated timeout = {}{}",
        clientCnxnSocket.getRemoteSocketAddress(),
        Long.toHexString(sessionId),
        negotiatedSessionTimeout,
        (isRO ? " (READ-ONLY mode)" : ""));
    // 创建连接事件
    KeeperState eventState = (isRO) ? KeeperState.ConnectedReadOnly : KeeperState.SyncConnected;
    // 放入EventThread,开始处理
    eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, eventState, null));
}

到此,剩下的就是 EventThread 的事件回调处理了,已经没有 SendThread 啥事了。所以接下来将分析在会话建立完毕之后的一些事情(补之前的坑)。

补充

在分析这个循环体的时候,还有一部分还没有分析,就是在会话建立成功后的事情。这里大概可以分为这样几块:

  1. 会话建立后,处理认证信息;
  2. 超时判断
  3. 会话建立后,构建PING包放入 outgoing queue
  4. 若当前连接是只读连接,那么需要隔一段时间寻找一下可读/可写的服务器。
    while (state.isAlive()) {
     try {
         // 连接未建立的if块
         ...
         // ①会话建立后的if块:用来处理认证
         if (state.isConnected()) {
             // 如果存在zooKeeperSaslClient,就需要进行Sasl
             if (zooKeeperSaslClient != null) {
                 boolean sendAuthEvent = false;
                 if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
                     try {
                         zooKeeperSaslClient.initialize(ClientCnxn.this);
                     } catch (SaslException e) {
                         LOG.error("SASL authentication with Zookeeper Quorum member failed.", e);
                         state = States.AUTH_FAILED;
                         sendAuthEvent = true;
                     }
                 }
                 KeeperState authState = zooKeeperSaslClient.getKeeperState();
                 if (authState != null) {
                     if (authState == KeeperState.AuthFailed) {
                         // An authentication error occurred during authentication with the Zookeeper Server.
                         state = States.AUTH_FAILED;
                         sendAuthEvent = true;
                     } else {
                         if (authState == KeeperState.SaslAuthenticated) {
                             sendAuthEvent = true;
                         }
                     }
                 }
                 if (sendAuthEvent) {
                     eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, authState, null));
                     if (state == States.AUTH_FAILED) {
                         eventThread.queueEventOfDeath();
                     }
                 }
             }
             // 计算耗费时间
             to = readTimeout - clientCnxnSocket.getIdleRecv();
         } else {
             // 计算耗费时间
             to = connectTimeout - clientCnxnSocket.getIdleRecv();
         }
         // ② 超时判断
         if (to <= 0) {
             String warnInfo = String.format(
                 "Client session timed out, have not heard from server in %dms for session id 0x%s",
                 clientCnxnSocket.getIdleRecv(),
                 Long.toHexString(sessionId));
             LOG.warn(warnInfo);
             throw new SessionTimeoutException(warnInfo);
         }
         // ③ 会话建立完毕后的if块:准备发送PING包,保持心跳
         if (state.isConnected()) {
             //1000(1 second) is to prevent race condition missing to send the second ping
             //also make sure not to send too many pings when readTimeout is small
             int timeToNextPing = readTimeout / 2
                                  - clientCnxnSocket.getIdleSend()
                                  - ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
             //send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
             if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
                 sendPing();
                 clientCnxnSocket.updateLastSend();
             } else {
                 if (timeToNextPing < to) {
                     to = timeToNextPing;
                 }
             }
         }
         // If we are in read-only mode, seek for read/write server
         if (state == States.CONNECTEDREADONLY) {
             long now = Time.currentElapsedTime();
             int idlePingRwServer = (int) (now - lastPingRwServer);
             if (idlePingRwServer >= pingRwTimeout) {
                 lastPingRwServer = now;
                 idlePingRwServer = 0;
                 pingRwTimeout = Math.min(2 * pingRwTimeout, maxPingRwTimeout);
                 pingRwServer();
             }
             to = Math.min(to, pingRwTimeout - idlePingRwServer);
         }
         ...
     } catch (Throwable e) {
         ...
     }
    }
    

EventThread#run

该部分的逻辑比较简单:

public void run() {
    try {
        isRunning = true;
        while (true) {
            // 从队列里去除事件
            Object event = waitingEvents.take();
            // 判断是否为关闭事件
            if (event == eventOfDeath) {
                wasKilled = true;
            } else {
                // 处理普通事件
                processEvent(event);
            }
            // 判断客户端是否被kill
            if (wasKilled) {
                synchronized (waitingEvents) {
                    if (waitingEvents.isEmpty()) {
                        isRunning = false;
                        break;
                    }
                }
            }
        }
    } catch (InterruptedException e) {
        LOG.error("Event thread exiting due to interruption", e);
    }
    LOG.info("EventThread shut down for session: 0x{}", Long.toHexString(getSessionId()));
}

总结

客户端的会话创建主要分三个阶段,第一个阶段是初始化阶段,主要初始化客户端的一些配置、属性并开始建立TCP连接;第二个阶段是会话请求发送阶段,这部分主做了两步,一步是准备会话请求包,另一部分是让SendThread循环执行NIO监听,如果可以写了就发送请求包;最后一个阶段是响应处理,循环NIO监听,如果发现可以读取数据了,就读取数据。由于会话请求包永远是在队首,所以一开始一定会是先处理会话请求(通过一个 initialized 状态来判断是否存在会话)

比较重点的是 NIO 的知识点,然后 HostProvider 以及处理包的逻辑~