ZooKeeper 的客户端主要由以下几个核心组件组成:
- ZooKeeper 实例:客户端的入口
- ClientWatchManager:客户端 Watcher 管理器
- HostProvider:客户端地址列表管理器
- ClientCnxn:客户端核心线程,其内部又包含了两个线程,即 SendThread 和 EventThread。前者是一个 I/O 线程,主要负责客户端和服务端的网络 I/O 通信,后者是一个事件线程,主要负责对服务端事件进行处理。
ZooKeeper 客户端的初始化与启动环节,实际上就是 ZooKeeper 对象的实例化过程。客户端的整个初始化和启动过程大体可分为以下三个步骤:
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
boolean canBeReadOnly, HostProvider aHostProvider, ZKClientConfig clientConfig) throws IOException {
if (clientConfig == null) {
clientConfig = new ZKClientConfig();
}
this.clientConfig = clientConfig;
// 设置默认 Watcher
watchManager = defaultWatchManager();
watchManager.defaultWatcher = watcher;
// 设置 ZooKeeper 服务器地址列表
ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
hostProvider = aHostProvider;
// 创建 ClientCnxn
cnxn = createConnection(
connectStringParser.getChrootPath(),
hostProvider,
sessionTimeout,
this,
watchManager,
getClientCnxnSocket(),
canBeReadOnly);
cnxn.start();
}
下面,我们详细看一下创建并初始化客户端网络连接器 ClientCnxn 的过程。
ClientCnxn 工作原理
ClientCnxn 连接器的底层 I/O 处理器是 ClientCnxnSocket,因此在这一步中,客户端首先会创建一个 ClientCnxnSocket 处理器。
private ClientCnxnSocket getClientCnxnSocket() throws IOException {
String clientCnxnSocketName = getClientConfig().getProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET);
if (clientCnxnSocketName == null) {
clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();
}
try {
Constructor<?> clientCxnConstructor = Class.forName(clientCnxnSocketName)
.getDeclaredConstructor(ZKClientConfig.class);
ClientCnxnSocket clientCxnSocket = (ClientCnxnSocket) clientCxnConstructor.newInstance(getClientConfig());
return clientCxnSocket;
} catch (Exception e) {
throw new IOException("Couldn't instantiate " + clientCnxnSocketName, e);
}
}
之后,在初始化 ClientCnxn 时,也会对 SendThread 和 EventThread 进行初始化。前者用于管理客户端和服务端之间的所有网络 I/O,后者则用于进行客户端的事件处理。同时,客户端还会将 ClientCnxnSocket 分配给 SendThread 作为底层网络 I/O 处理器,并初始化 EventThread 的待处理事件队列 waitingEvents,用于存放所有等待被客户端处理的事件。此外,ClientCnxn 还会初始化客户端两个核心队列 outgoingQueue 和 pendingQueue,分别作为客户端的请求发送队列和服务端响应的等待队列。
public class ClientCnxn {
private final Queue<Packet> pendingQueue = new ArrayDeque<>();
private final LinkedBlockingDeque<Packet> outgoingQueue = new LinkedBlockingDeque<Packet>();
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout,
ZooKeeper zooKeeper, ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
......
sendThread = new SendThread(clientCnxnSocket);
eventThread = new EventThread();
}
}
class EventThread extends ZooKeeperThread {
private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<>();
}
之后,通过调用 ClientCnxn 的 start() 方法开启 SendThread 和 EventThread 的轮训逻辑,下面我们来看下这两个线程的内部执行逻辑。
1. SendThread
SendThread 是客户端 ClientCnxn 内部一个核心的 I/O 调度线程,用于管理客户端和服务端之间的所有网络 I/O 操作。在 ZooKeeper 客户端的实际运行过程中,一方面,SendThread 维护了客户端与服务端之间的会话生命周期,其通过在一定的周期频率内向服务端发送一个 PING 包来实现心跳检测。同时,在会话周期内,如果客户端与服务端之间出现 TCP 连接断开的情况,那么就会自动且透明化地完成重连操作。另一方面,SendThread 管理了客户端所有的请求发送和响应接收操作,其将上层客户端 API 操作转换成相应的请求协议并发送到服务端,并完成对同步调用的返回和异步调用的回调。
在开始创建 TCP 连接之前,SendThread 首先需要获取一个 Zookeeper 服务器的目标地址,这通常是从HostProvider 中随机获取出一个地址,然后委托给 ClientCnxnSocket 去创建与 ZooKeeper 服务器之间的 TCP 连接。
serverAddress = hostProvider.next(1000);
startConnect(serverAddress);
clientCnxnSocket.updateLastSendAndHeard();
在 TCP 连接创建完毕后,只是从网络 TCP 层面完成了客户端与服务端之间的 Socket 连接,但 ZooKeeper 客户端的会话层还没有创建。SendThread 会根据当前客户端的实际设置构造出一个 ConnectRequest 请求,该请求代表了客户端试图与服务器创建一个会话。同时,ZooKeeper 客户端还会进一步将该请求包装成网络 I/O 层的 Packet 对象,并放入请求发送队列 outgoingQueue 中去。
/**
* Setup session, previous watches, authentication.
*/
void primeConnection() throws IOException {
// 创建请求
ConnectRequest conReq = new ConnectRequest(0, lastZxid, sessionTimeout, sessId, sessionPasswd);
......
// 添加到请求发送队列
outgoingQueue.addFirst(new Packet(null, null, conReq, null, null, readOnly));
}
当客户端请求准备完毕后,就可以开始向服务端发送请求了。当 ClientCnxnSocket 根据 NIO Selector 获取到一个可写事件,会负责从 outgoingQueue 中取出一个待发送的 Packet 对象,将其序列化成 ByteBuffer 后,向服务端进行发送,并在接收到服务端响应之前先暂存在 pendingQueue 中。
// 监听到可写事件,向服务端发送请求
if (sockKey.isWritable()) {
// 获取待发送队列的头Packet
Packet p = findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress());
if (p != null) {
......
// 通过socket发送到服务端
sock.write(p.bb);
if (!p.bb.hasRemaining()) {
sentCount.getAndIncrement();
outgoingQueue.removeFirstOccurrence(p);
if (p.requestHeader != null
&& p.requestHeader.getType() != OpCode.ping
&& p.requestHeader.getType() != OpCode.auth) {
// 添加到等待响应队列
synchronized (pendingQueue) {
pendingQueue.add(p);
}
}
}
}
当 ClientCnxnSocket 接收到服务端的响应后,会先判断当前的客户端状态是否是 “已初始化”,如果尚未完成初始化,那么就认为该响应一定是会话创建请求的响应,直接交由 readConnectResult 方法来处理该响应。该方法会对接收到的服务端响应进行反序列化,得到 ConnectResponse 对象,并从中获取到 ZooKeeper 服务端分配的会话 sessionld。
void readConnectResult() throws IOException {
// 序列化
ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ConnectResponse conRsp = new ConnectResponse();
conRsp.deserialize(bbia, "connect");
......
this.sessionId = conRsp.getSessionId();
// 建立连接后的回调
sendThread.onConnected(conRsp.getTimeOut(), this.sessionId, conRsp.getPasswd(), isRO);
}
连接成功后,一方面需要通知 SendThread 线程进一步对客户端进行会话参数的设置,包括 readTimeout 和 connectTimeout 等,并更新客户端状态;另一方面,需要通知地址管理器 HostProvider 当前成功连接的服务器地址。为了能够让上层应用感知到会话的成功创建,SendThread 还会生成一个 SyncConnected-None 事件,代表客户端与服务器会话创建成功,并将该事件传递给 EventThread 线程。
void onConnected(int _negotiatedSessionTimeout, long _sessionId,
byte[] _sessionPasswd, boolean isRO) throws IOException {
......
readTimeout = negotiatedSessionTimeout * 2 / 3;
connectTimeout = negotiatedSessionTimeout / hostProvider.size();
hostProvider.onConnected();
sessionId = _sessionId;
sessionPasswd = _sessionPasswd;
// 更新客户端状态
state = (isRO) ? States.CONNECTEDREADONLY : States.CONNECTED;
seenRwServerBefore |= !isRO;
// 生成一个连接事件
KeeperState eventState = (isRO) ? KeeperState.ConnectedReadOnly : KeeperState.SyncConnected;
eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, eventState, null));
}
2. EventThread
EventThread 是客户端 ClientCnxn 内部的另一个核心线程,负责客户端的事件处理,并触发客户端注册的 Watcher 监听。EventThread 中有一个 waitingEvents 队列,用于临时存放那些需要被触发的对象,包括那些客户端注册的 Watcher 和异步接口中注册的回调器 AsyncCallback。同时,EventThread 会不断地从 waitingEvents 这个队列中取出 Object,识别出其具体类型(Watcher 或 AsyncCallback),并分别调用 process 和 processResult 接口方法来实现对事件的触发和回调。
EventThread 收到事件后会从 ClientWatchManager 管理器中查询出对应的 Watcher,针对 Syncconnected-None 事件,会直接找出在初始化 ZooKeeper 时存储的默认 Watcher,然后将其放到 EventThread 的 waitingEvents 队列中。
private void queueEvent(WatchedEvent event, Set<Watcher> materializedWatchers) {
......
final Set<Watcher> watchers;
if (materializedWatchers == null) {
// 获取关联的Watcher
watchers = watcher.materialize(event.getState(), event.getType(), event.getPath());
} else {
watchers = new HashSet<Watcher>();
watchers.addAll(materializedWatchers);
}
// 保存到waitingEvents队列中等待后续被轮训执行
WatcherSetEventPair pair = new WatcherSetEventPair(watchers, event);
waitingEvents.add(pair);
}
EventThread 会不断从 waitingEvents 队列中取出待处理的 Watcher 对象,然后直接调用该对象的 process 接口方法,以达到触发 Watcher 的目的。
public void run() {
try {
isRunning = true;
while (true) {
// 从队列中获取待处理事件
Object event = waitingEvents.take();
processEvent(event);
......
}
} catch (InterruptedException e) {
LOG.error("Event thread exiting due to interruption", e);
}
}
if (event instanceof WatcherSetEventPair) {
WatcherSetEventPair pair = (WatcherSetEventPair) event;
for (Watcher watcher : pair.watchers) {
try {
// 触发watcher回调
watcher.process(pair.event);
} catch (Throwable t) {
LOG.error("Error while calling watcher ", t);
}
}
}
3. Packet
Packet 是 ClientCnxn 内部定义的一个对协议层的封装,作为 ZooKeeper 中请求与响应的载体。
从图中可以看到,Packet 中包含了最基本的请求头(requestHeader)、响应头(replyHeader)、请求体(request)、响应体(response)、节点路径(clientPath、serverPath)和注册的 Watcher(watchRegistration)等信息。
虽然 Packet 有这么多属性,但并不是每一个属性都会进行序列化并通过网络发送出去。Packet 的 createBB() 方法负责对 Packet 对象进行序列化,最终生成可用于底层网络传输的 ByteBuffer 对象。在这个过程中,只会将 requestHeader、request 和 readonly 三个属性进行序列化,其余属性都保存在客户端的上下文中,不会进行与服务端之间的网络传输。