ZooKeeper 的客户端主要由以下几个核心组件组成:

  • ZooKeeper 实例:客户端的入口
  • ClientWatchManager:客户端 Watcher 管理器
  • HostProvider:客户端地址列表管理器
  • ClientCnxn:客户端核心线程,其内部又包含了两个线程,即 SendThread 和 EventThread。前者是一个 I/O 线程,主要负责客户端和服务端的网络 I/O 通信,后者是一个事件线程,主要负责对服务端事件进行处理。

ZooKeeper 客户端的初始化与启动环节,实际上就是 ZooKeeper 对象的实例化过程。客户端的整个初始化和启动过程大体可分为以下三个步骤:

  1. public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
  2. boolean canBeReadOnly, HostProvider aHostProvider, ZKClientConfig clientConfig) throws IOException {
  3. if (clientConfig == null) {
  4. clientConfig = new ZKClientConfig();
  5. }
  6. this.clientConfig = clientConfig;
  7. // 设置默认 Watcher
  8. watchManager = defaultWatchManager();
  9. watchManager.defaultWatcher = watcher;
  10. // 设置 ZooKeeper 服务器地址列表
  11. ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
  12. hostProvider = aHostProvider;
  13. // 创建 ClientCnxn
  14. cnxn = createConnection(
  15. connectStringParser.getChrootPath(),
  16. hostProvider,
  17. sessionTimeout,
  18. this,
  19. watchManager,
  20. getClientCnxnSocket(),
  21. canBeReadOnly);
  22. cnxn.start();
  23. }

下面,我们详细看一下创建并初始化客户端网络连接器 ClientCnxn 的过程。
image.png

ClientCnxn 工作原理

ClientCnxn 连接器的底层 I/O 处理器是 ClientCnxnSocket,因此在这一步中,客户端首先会创建一个 ClientCnxnSocket 处理器。

  1. private ClientCnxnSocket getClientCnxnSocket() throws IOException {
  2. String clientCnxnSocketName = getClientConfig().getProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET);
  3. if (clientCnxnSocketName == null) {
  4. clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();
  5. }
  6. try {
  7. Constructor<?> clientCxnConstructor = Class.forName(clientCnxnSocketName)
  8. .getDeclaredConstructor(ZKClientConfig.class);
  9. ClientCnxnSocket clientCxnSocket = (ClientCnxnSocket) clientCxnConstructor.newInstance(getClientConfig());
  10. return clientCxnSocket;
  11. } catch (Exception e) {
  12. throw new IOException("Couldn't instantiate " + clientCnxnSocketName, e);
  13. }
  14. }

之后,在初始化 ClientCnxn 时,也会对 SendThread 和 EventThread 进行初始化。前者用于管理客户端和服务端之间的所有网络 I/O,后者则用于进行客户端的事件处理。同时,客户端还会将 ClientCnxnSocket 分配给 SendThread 作为底层网络 I/O 处理器,并初始化 EventThread 的待处理事件队列 waitingEvents,用于存放所有等待被客户端处理的事件。此外,ClientCnxn 还会初始化客户端两个核心队列 outgoingQueuependingQueue,分别作为客户端的请求发送队列和服务端响应的等待队列。

  1. public class ClientCnxn {
  2. private final Queue<Packet> pendingQueue = new ArrayDeque<>();
  3. private final LinkedBlockingDeque<Packet> outgoingQueue = new LinkedBlockingDeque<Packet>();
  4. public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout,
  5. ZooKeeper zooKeeper, ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
  6. long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
  7. ......
  8. sendThread = new SendThread(clientCnxnSocket);
  9. eventThread = new EventThread();
  10. }
  11. }
  12. class EventThread extends ZooKeeperThread {
  13. private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<>();
  14. }

之后,通过调用 ClientCnxn 的 start() 方法开启 SendThread 和 EventThread 的轮训逻辑,下面我们来看下这两个线程的内部执行逻辑。

1. SendThread

SendThread 是客户端 ClientCnxn 内部一个核心的 I/O 调度线程,用于管理客户端和服务端之间的所有网络 I/O 操作。在 ZooKeeper 客户端的实际运行过程中,一方面,SendThread 维护了客户端与服务端之间的会话生命周期,其通过在一定的周期频率内向服务端发送一个 PING 包来实现心跳检测。同时,在会话周期内,如果客户端与服务端之间出现 TCP 连接断开的情况,那么就会自动且透明化地完成重连操作。另一方面,SendThread 管理了客户端所有的请求发送和响应接收操作,其将上层客户端 API 操作转换成相应的请求协议并发送到服务端,并完成对同步调用的返回和异步调用的回调。

在开始创建 TCP 连接之前,SendThread 首先需要获取一个 Zookeeper 服务器的目标地址,这通常是从HostProvider 中随机获取出一个地址,然后委托给 ClientCnxnSocket 去创建与 ZooKeeper 服务器之间的 TCP 连接。

serverAddress = hostProvider.next(1000);
startConnect(serverAddress);
clientCnxnSocket.updateLastSendAndHeard();

在 TCP 连接创建完毕后,只是从网络 TCP 层面完成了客户端与服务端之间的 Socket 连接,但 ZooKeeper 客户端的会话层还没有创建。SendThread 会根据当前客户端的实际设置构造出一个 ConnectRequest 请求,该请求代表了客户端试图与服务器创建一个会话。同时,ZooKeeper 客户端还会进一步将该请求包装成网络 I/O 层的 Packet 对象,并放入请求发送队列 outgoingQueue 中去。

/**
 * Setup session, previous watches, authentication.
 */
void primeConnection() throws IOException {
    // 创建请求
    ConnectRequest conReq = new ConnectRequest(0, lastZxid, sessionTimeout, sessId, sessionPasswd);
    ......
    // 添加到请求发送队列
    outgoingQueue.addFirst(new Packet(null, null, conReq, null, null, readOnly));
}

当客户端请求准备完毕后,就可以开始向服务端发送请求了。当 ClientCnxnSocket 根据 NIO Selector 获取到一个可写事件,会负责从 outgoingQueue 中取出一个待发送的 Packet 对象,将其序列化成 ByteBuffer 后,向服务端进行发送,并在接收到服务端响应之前先暂存在 pendingQueue 中。

// 监听到可写事件,向服务端发送请求
if (sockKey.isWritable()) {
    // 获取待发送队列的头Packet
    Packet p = findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress());
    if (p != null) {
        ......
        // 通过socket发送到服务端
        sock.write(p.bb);
        if (!p.bb.hasRemaining()) {
            sentCount.getAndIncrement();
            outgoingQueue.removeFirstOccurrence(p);
            if (p.requestHeader != null
                && p.requestHeader.getType() != OpCode.ping
                && p.requestHeader.getType() != OpCode.auth) {
                // 添加到等待响应队列
                synchronized (pendingQueue) {
                    pendingQueue.add(p);
                }
            }
        }
    }

当 ClientCnxnSocket 接收到服务端的响应后,会先判断当前的客户端状态是否是 “已初始化”,如果尚未完成初始化,那么就认为该响应一定是会话创建请求的响应,直接交由 readConnectResult 方法来处理该响应。该方法会对接收到的服务端响应进行反序列化,得到 ConnectResponse 对象,并从中获取到 ZooKeeper 服务端分配的会话 sessionld。

void readConnectResult() throws IOException {
    // 序列化
    ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
    BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
    ConnectResponse conRsp = new ConnectResponse();
    conRsp.deserialize(bbia, "connect");
    ......
    this.sessionId = conRsp.getSessionId();
    // 建立连接后的回调
    sendThread.onConnected(conRsp.getTimeOut(), this.sessionId, conRsp.getPasswd(), isRO);
}

连接成功后,一方面需要通知 SendThread 线程进一步对客户端进行会话参数的设置,包括 readTimeout 和 connectTimeout 等,并更新客户端状态;另一方面,需要通知地址管理器 HostProvider 当前成功连接的服务器地址。为了能够让上层应用感知到会话的成功创建,SendThread 还会生成一个 SyncConnected-None 事件,代表客户端与服务器会话创建成功,并将该事件传递给 EventThread 线程。

void onConnected(int _negotiatedSessionTimeout, long _sessionId, 
                 byte[] _sessionPasswd, boolean isRO) throws IOException {
    ......
    readTimeout = negotiatedSessionTimeout * 2 / 3;
    connectTimeout = negotiatedSessionTimeout / hostProvider.size();
    hostProvider.onConnected();
    sessionId = _sessionId;
    sessionPasswd = _sessionPasswd;
    // 更新客户端状态
    state = (isRO) ? States.CONNECTEDREADONLY : States.CONNECTED;
    seenRwServerBefore |= !isRO;
    // 生成一个连接事件
    KeeperState eventState = (isRO) ? KeeperState.ConnectedReadOnly : KeeperState.SyncConnected;
    eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, eventState, null));
}

2. EventThread

EventThread 是客户端 ClientCnxn 内部的另一个核心线程,负责客户端的事件处理,并触发客户端注册的 Watcher 监听。EventThread 中有一个 waitingEvents 队列,用于临时存放那些需要被触发的对象,包括那些客户端注册的 Watcher 和异步接口中注册的回调器 AsyncCallback。同时,EventThread 会不断地从 waitingEvents 这个队列中取出 Object,识别出其具体类型(Watcher 或 AsyncCallback),并分别调用 process 和 processResult 接口方法来实现对事件的触发和回调。

EventThread 收到事件后会从 ClientWatchManager 管理器中查询出对应的 Watcher,针对 Syncconnected-None 事件,会直接找出在初始化 ZooKeeper 时存储的默认 Watcher,然后将其放到 EventThread 的 waitingEvents 队列中。

private void queueEvent(WatchedEvent event, Set<Watcher> materializedWatchers) {
    ......
    final Set<Watcher> watchers;
    if (materializedWatchers == null) {
        // 获取关联的Watcher
        watchers = watcher.materialize(event.getState(), event.getType(), event.getPath());
    } else {
        watchers = new HashSet<Watcher>();
        watchers.addAll(materializedWatchers);
    }
    // 保存到waitingEvents队列中等待后续被轮训执行
    WatcherSetEventPair pair = new WatcherSetEventPair(watchers, event);
    waitingEvents.add(pair);
}

EventThread 会不断从 waitingEvents 队列中取出待处理的 Watcher 对象,然后直接调用该对象的 process 接口方法,以达到触发 Watcher 的目的。

public void run() {
    try {
        isRunning = true;
        while (true) {
            // 从队列中获取待处理事件
            Object event = waitingEvents.take();
            processEvent(event);
            ......
         }
    } catch (InterruptedException e) {
        LOG.error("Event thread exiting due to interruption", e);
    }
}

if (event instanceof WatcherSetEventPair) {
    WatcherSetEventPair pair = (WatcherSetEventPair) event;
    for (Watcher watcher : pair.watchers) {
        try {
            // 触发watcher回调
            watcher.process(pair.event);
        } catch (Throwable t) {
            LOG.error("Error while calling watcher ", t);
        }
    }
}

3. Packet

Packet 是 ClientCnxn 内部定义的一个对协议层的封装,作为 ZooKeeper 中请求与响应的载体。
image.png
从图中可以看到,Packet 中包含了最基本的请求头(requestHeader)、响应头(replyHeader)、请求体(request)、响应体(response)、节点路径(clientPath、serverPath)和注册的 Watcher(watchRegistration)等信息。

虽然 Packet 有这么多属性,但并不是每一个属性都会进行序列化并通过网络发送出去。Packet 的 createBB() 方法负责对 Packet 对象进行序列化,最终生成可用于底层网络传输的 ByteBuffer 对象。在这个过程中,只会将 requestHeaderrequestreadonly 三个属性进行序列化,其余属性都保存在客户端的上下文中,不会进行与服务端之间的网络传输。