一、Zookeeper工作流程

上图主要描述了客户端和服务端互动过程:
1.客户端把request以Packet形式传递到Zookeeper类中。
2.Zookeeper类处理request并放入outgoingqueue中。
3.sendThread把发出的request转移到pendinqueue中。
4.收到回复后,sendThread从pendingqueue中取出request,并生成event。
5.eventThread处理event并触发watchManager中的Watcher,调用callback。
Zk 客户端和服务端建立连接,主要分为三个阶段:
1.初始化阶段:主要功能类实例化,
Zookeeper:客户端核心类之一,也是入口。
HostProvider:客户端地址列表管理器
ClientCnxn:客户端连接核心类,包含SendThread和EventThread两个线程。SendThread为IO线程,主要负责zookeeper客户端和服务端之间的网络IO通信,EventThread为事件线程,主要负责对服务端事件进行处理。
ClientWatchManager:客户端Watcher管理器。
2.创建阶段:启动并创建连接
3.响应请求:响应并接收。
二、初始化阶段
初始化阶段过程:
1.初始化ZooKeeper对象通过调用ZooKeeper的构造方法实例化一个ZooKeeper对象,在初始化过程中会创建一个客户端Watcher管理器ClientWatcherManager2.设置会话默认Watcher如果构造方法中传入了一个Watcher对象,那么客户端将这个Watcher对象作为默认Watcher保存到ClientWatcherManager中3.构造ZK服务器地址列表管理器HostProvider对于构造器中传入的服务器端地址,客户端将其保存在服务器地址列表管理器HostProvider中4.创建并初始化客户端网络连接ZooKeeper客户端首先会创建一个网络连接器ClientCnxn,用来管理客户端与服务器的网络交互。另外,客户端在创建 ClientCnxn的同时还会初始化客户端两个核心队列outGoingQueue和pendingQueue,分别作为客户端请求组发送队列和服务端 响应等待队列5.初始化SendThread和EventThread客户端会创建两个核心网络线程SendThread和EventThread,前者用于管理客户端和服务端之间的所有网络I/O,后者则用于进行客户端的事件处理。同时,客户端还会将ClientCnxnSocket分配给SendThread作为底层网络I/O处理器,并初始化EventThread的待处理事件队列waitingEvents,用于存放所有等待被客户端处理的事情。
1.Zookeeper实例化
public ZooKeeper(String connectString,int sessionTimeout,Watcher watcher,
boolean canBeReadOnly,HostProvider aHostProvider,ZKClientConfig clientConfig)
throws IOException {
LOG.info(
"Initiating client connection, connectString={} sessionTimeout={} watcher={}",
connectString,
sessionTimeout,
watcher);
if (clientConfig == null) {
clientConfig = new ZKClientConfig();
}
this.clientConfig = clientConfig;
//实例化ClientWatchManager类型为ZKWatchManager
watchManager = defaultWatchManager();
//设置默认watcher
watchManager.defaultWatcher = watcher;
//负责解析server地址串
//1.加chroot(默认prefix)
//2.读字符串把server地址分开
ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
//分局字符串解析host、ip等
hostProvider = aHostProvider;
//实例化ClientCnxn类
cnxn = createConnection(
connectStringParser.getChrootPath(),
hostProvider,
sessionTimeout,
this,
watchManager,
//实例化NIOSocket
getClientCnxnSocket(),
canBeReadOnly);
//启动SendThread、EventThread
cnxn.start();
}
1.1实例化ZKWatchManager
protected ZKWatchManager defaultWatchManager() {
return new ZKWatchManager(getClientConfig().getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET));
}
public boolean getBoolean(String key) {
//默认为false
return getBoolean(key, false);
}
1.2实例化连接地址解析器ConnectStringParser
解析连接服务端地址串,并设置连接地址列表。解析chrootPath。
Chroot:客户端隔离命名空间。
如果一个Zookeeper客户端设置了Chroot,那么该客户端对服务器的任何操作,都会被限制在其自己的命名空间下。在那些多个应用公用一个Zookeeper集群的场景下,这对于实现不同应用之间的相互隔离非常有帮助。客户端可以通过在connectString添加后缀的的方式来设置Chroot。
//默认端口
private static final int DEFAULT_PORT = 2181;
//默认前缀
private final String chrootPath;
//服务端地址列表
private final ArrayList<InetSocketAddress> serverAddresses =
new ArrayList<InetSocketAddress>();
public ConnectStringParser(String connectString) {
// parse out chroot, if any
int off = connectString.indexOf('/');
if (off >= 0) {
String chrootPath = connectString.substring(off);
// ignore "/" chroot spec, same as null
if (chrootPath.length() == 1) {
this.chrootPath = null;
} else {
PathUtils.validatePath(chrootPath);
this.chrootPath = chrootPath;
}
connectString = connectString.substring(0, off);
} else {
this.chrootPath = null;
}
//拆分地址串
List<String> hostsList = split(connectString, ",");
for (String host : hostsList) {
int port = DEFAULT_PORT;
String[] hostAndPort = NetUtils.getIPV6HostAndPort(host);
if (hostAndPort.length != 0) {
host = hostAndPort[0];
if (hostAndPort.length == 2) {
port = Integer.parseInt(hostAndPort[1]);
}
} else {
int pidx = host.lastIndexOf(':');
if (pidx >= 0) {
// otherwise : is at the end of the string, ignore
if (pidx < host.length() - 1) {
port = Integer.parseInt(host.substring(pidx + 1));
}
host = host.substring(0, pidx);
}
}
serverAddresses.add(InetSocketAddress.createUnresolved(host, port));
}
}
1.3实例化HostProvider:地址列表管理器
public interface HostProvider {
int size();
InetSocketAddress next(long spinDelay);
void onConnected();
boolean updateServerList(Collection<InetSocketAddress> serverAddresses,
InetSocketAddress currentHost);
}
Zookeeper规定:任何对于该接口的实现必须满足以下3点:
1.next()方法必须要有合法的返回值。
凡是对该方法的调用,必须要返回一个合法的InetSocketAddress对象,不能是null或其他不合法的InetSocketAddress
2.next()方法必须返回已解析的InetSocketAddress对象。
服务器的地址列表存放在ConnectStringParser.ServerAddresses中,是没有被解析的InetSocketAddress对象。在传递到HostProvider后,HostProvider负责对InetSocketAddress列表进行解析。最终返回的时已解析的InetSocketAddress对象。
3.size()方法不能返回0
Zookeeper中规定,size方法不能返回0,也就是说,HostProvider中必须至少有一个服务器地址。
private static HostProvider createDefaultHostProvider(String connectString) {
return new StaticHostProvider(
new ConnectStringParser(connectString).getServerAddresses());
}
public StaticHostProvider(Collection<InetSocketAddress> serverAddresses) {
init(serverAddresses, System.currentTimeMillis() ^ this.hashCode(), new Resolver() {
@Override
public InetAddress[] getAllByName(String name) throws UnknownHostException {
return InetAddress.getAllByName(name);
}
});
}
private void init(Collection<InetSocketAddress> serverAddresses,
long randomnessSeed,
Resolver resolver) {
//获取随机数
this.sourceOfRandomness = new Random(randomnessSeed);
this.resolver = resolver;
if (serverAddresses.isEmpty()) {
throw new IllegalArgumentException("A HostProvider may not be empty!");
}
//重新解析连接地址列表
this.serverAddresses = shuffle(serverAddresses);
currentIndex = -1;
lastIndex = -1;
}
StaticHostProvider
解析服务器地址
StaticHostProvider解析服务器地址列表,使用Collections工具类的shuffle方法来将这个服务器地址列表进行随机打散。
获取可用的服务器地址
next()方法先将打散的服务器地址列表拼装成一个环形循环队列。
工作原理:
hostProvider的next方法
public InetSocketAddress next(long spinDelay) {
boolean needToSleep = false;
InetSocketAddress addr;
synchronized (this) {
if (reconfigMode) {
addr = nextHostInReconfigMode();
if (addr != null) {
currentIndex = serverAddresses.indexOf(addr);
return resolve(addr);
}
//tried all servers and couldn't connect
reconfigMode = false;
needToSleep = (spinDelay > 0);
}
//当前索引+1
++currentIndex;
//重置当前索引
if (currentIndex == serverAddresses.size()) {
currentIndex = 0;
}
//获取新的server地址
addr = serverAddresses.get(currentIndex);
needToSleep = needToSleep || (currentIndex == lastIndex && spinDelay > 0);
if (lastIndex == -1) {
// We don't want to sleep on the first ever connect attempt.
lastIndex = 0;
}
}
if (needToSleep) {
try {
Thread.sleep(spinDelay);
} catch (InterruptedException e) {
LOG.warn("Unexpected exception", e);
}
}
return resolve(addr);
}
1.4实例化ClientCnxnSocketNIO:底层socket通信层
该实现类使用Java原生的NIO接口,其核心是doIO逻辑,主要负责对请求的发送和响应接收过程。
private ClientCnxnSocket getClientCnxnSocket() throws IOException {
String clientCnxnSocketName = getClientConfig().getProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET);
if (clientCnxnSocketName == null) {
clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();
}
try {
Constructor<?> clientCxnConstructor = Class.forName(clientCnxnSocketName)
.getDeclaredConstructor(ZKClientConfig.class);
ClientCnxnSocket clientCxnSocket = (ClientCnxnSocket) clientCxnConstructor.newInstance(getClientConfig());
return clientCxnSocket;
} catch (Exception e) {
throw new IOException("Couldn't instantiate " + clientCnxnSocketName, e);
}
}
socketName = ClientCnxnSocketNIO
请求发送
正常情况下,会从outgoingQueue队列中取出一个可发送的Packet对象,同时生成一个xid并将其设置到packet的请求头中,然后将其序列化后发送。请求发送完毕后,会立即将该Packet保存到pendingQueue队列中,以便等待服务端响应返回后进行相应的处理。
响应接收
1.5实例化ClientCnxn:网络IO
ClientCnxn是Zookeeper客户端的核心工作类,负责维护客户端和服务端之间的网络连接,并进行一系列网络通信。
Packet
Packet是ClientCnxn内部定义的一个对协议层的封装。包含了最基本的请求头requestHeader、请求体request,响应头ReplyHeader、响应体response、节点路径serverPath/clientPath和注册的Watcher(WatchRegistration)等信息。
packet中的createBB()方法负责对Packet对象进行序列化,最终生成可用于底层网络传输的ByteBuffer对象。在这个过程中,只会将requestHeader、request和readOnly三个属性进行序列化,其余属性保存在客户端的上下文中,不会进行与服务端之间的网络传输。
public ClientCnxn(
String chrootPath,
HostProvider hostProvider,
int sessionTimeout,
ZooKeeper zooKeeper,
ClientWatchManager watcher,
ClientCnxnSocket clientCnxnSocket,
long sessionId,
byte[] sessionPasswd,
boolean canBeReadOnly) {
this.zooKeeper = zooKeeper;
this.watcher = watcher;
this.sessionId = sessionId;
this.sessionPasswd = sessionPasswd;
this.sessionTimeout = sessionTimeout;
this.hostProvider = hostProvider;
this.chrootPath = chrootPath;
connectTimeout = sessionTimeout / hostProvider.size();
readTimeout = sessionTimeout * 2 / 3;
readOnly = canBeReadOnly;
//创建发送线程
sendThread = new SendThread(clientCnxnSocket);
//创建事件线程
eventThread = new EventThread();
this.clientConfig = zooKeeper.getClientConfig();
initRequestTimeout();
}
outgoingQueue和pendingQueue
outgoingQueue队列是一个请求发送队列,专门用于存储那些需要发送到服务端的Packet集合。Pending队列是为了存储那些已经从客户端发送到服务端的,但是需要等待服务端响应的Packet集合。
三、创建阶段
创建阶段过程:
6.启动SendThread和EventThread。
SendThread首先会判断当前客户端的状态,进行一系列请理性工作,为客户端发送“会话创建”请求做准备。
7.获取一个服务器地址。
在开始创建TCP之前,SendThread首先需要获取一个Zookeeper服务器的目标地址, 这通常是从
HostProvider中随机获取出一个地址,然后委托给ClientCnxnSocket去创建与Zookeeper服务器之间的TCP连
接。
8.创建TCP连接。获取一个服务器地址后,ClientCnxnSocket负责和服务器创建一个TCP长连接。
9.构造ConnectRequest请求。
在TCP连接创建完毕后,可能有的读者会认为,这样是否就说明已经和Zookeeper服务器完成连接了呢?
其实不然,上面的步骤只是纯粹地从网络TCP层完成了客户端与服务端之间的Socket连接,但远未完成
Zookeeper客户端的会话创建。
SendThread会负责根据当前客户端的实际设置,构造出一个ConnectRequest请求,该请求代表了客户端试
图与服务端创建一个会话。
同时,Zookeeper客户端还会进一步将该请求包装成网络I/O层的Packet对象,放入发送队列outgoingQueue
中去。
10.发送请求。
当客户端请求准备完毕后,就可以开始向服务端发送请求了。
ClientCnxnSocket负责从outgoingQueue中取出一个待发送的Packet对象,将其序列化成ByteBuffer后,
向服务端进行发送。
SendThread线程启动,从hostProvider获取server地址,建立连接并构造请求发送。
1.获取服务器地址,建立TCP连接。
2.构造ConnectRequest请求。TCP连接建立后,client和server之间的会话并没完全建立。SendThread会根据响应的参数构造ConnectRequest,并包装成Packet对象,放入outgoingqueue中发送到Server端,这就是client和server的一个会话。
3.ClientCnxnSocket从queue中取出Packet并序列化部分属性并发送到server。
public void run() {
//ClientCnxnSocket设置sendThread、sessionId、outgoingQueue
clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
int to;
long lastPingRwServer = Time.currentElapsedTime();
final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
InetSocketAddress serverAddress = null;
while (state.isAlive()) {
try {
//判断clientCnxnSocket是否是连接状态。
//1.创建clientCnxnSocket时,没有建立连接
//2.客户端断开连接
if (!clientCnxnSocket.isConnected()) {
// don't re-establish connection if we are closing
if (closing) {
break;
}
if (rwServerAddress != null) {
serverAddress = rwServerAddress;
rwServerAddress = null;
} else {
//获取下一个连接地址
serverAddress = hostProvider.next(1000);
}
//启动连接
startConnect(serverAddress);
//更新连接时间
clientCnxnSocket.updateLastSendAndHeard();
}
if (state.isConnected()) {
// determine whether we need to send an AuthFailed event.
if (zooKeeperSaslClient != null) {
boolean sendAuthEvent = false;
if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
try {
zooKeeperSaslClient.initialize(ClientCnxn.this);
} catch (SaslException e) {
LOG.error("SASL authentication with Zookeeper Quorum member failed.", e);
state = States.AUTH_FAILED;
sendAuthEvent = true;
}
}
KeeperState authState = zooKeeperSaslClient.getKeeperState();
if (authState != null) {
if (authState == KeeperState.AuthFailed) {
// An authentication error occurred during authentication with the Zookeeper Server.
state = States.AUTH_FAILED;
sendAuthEvent = true;
} else {
if (authState == KeeperState.SaslAuthenticated) {
sendAuthEvent = true;
}
}
}
if (sendAuthEvent) {
eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, authState, null));
if (state == States.AUTH_FAILED) {
eventThread.queueEventOfDeath();
}
}
}
to = readTimeout - clientCnxnSocket.getIdleRecv();
} else {
to = connectTimeout - clientCnxnSocket.getIdleRecv();
}
if (to <= 0) {
String warnInfo = String.format(
"Client session timed out, have not heard from server in %dms for session id 0x%s",
clientCnxnSocket.getIdleRecv(),
Long.toHexString(sessionId));
LOG.warn(warnInfo);
throw new SessionTimeoutException(warnInfo);
}
if (state.isConnected()) {
//1000(1 second) is to prevent race condition missing to send the second ping
//also make sure not to send too many pings when readTimeout is small
int timeToNextPing = readTimeout / 2
- clientCnxnSocket.getIdleSend()
- ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
//send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
sendPing();
clientCnxnSocket.updateLastSend();
} else {
if (timeToNextPing < to) {
to = timeToNextPing;
}
}
}
// If we are in read-only mode, seek for read/write server
if (state == States.CONNECTEDREADONLY) {
long now = Time.currentElapsedTime();
int idlePingRwServer = (int) (now - lastPingRwServer);
if (idlePingRwServer >= pingRwTimeout) {
lastPingRwServer = now;
idlePingRwServer = 0;
pingRwTimeout = Math.min(2 * pingRwTimeout, maxPingRwTimeout);
pingRwServer();
}
to = Math.min(to, pingRwTimeout - idlePingRwServer);
}
clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
} catch (Throwable e) {
if (closing) {
// closing so this is expected
LOG.warn(
"An exception was thrown while closing send thread for session 0x{}.",
Long.toHexString(getSessionId()),
e);
break;
} else {
LOG.warn(
"Session 0x{} for sever {}, Closing socket connection. "
+ "Attempting reconnect except it is a SessionExpiredException.",
Long.toHexString(getSessionId()),
serverAddress,
e);
// At this point, there might still be new packets appended to outgoingQueue.
// they will be handled in next connection or cleared up if closed.
cleanAndNotifyState();
}
}
}
synchronized (state) {
// When it comes to this point, it guarantees that later queued
// packet to outgoingQueue will be notified of death.
cleanup();
}
clientCnxnSocket.close();
if (state.isAlive()) {
eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Disconnected, null));
}
eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Closed, null));
ZooTrace.logTraceMessage(
LOG,
ZooTrace.getTextTraceLevel(),
"SendThread exited loop for session: 0x" + Long.toHexString(getSessionId()));
}
①判断clientCnxnSocket是否是连接状态,有实例化clientCnxnSocket属性可知sockKey为null。
boolean isConnected() {
return sockKey != null;
}
②调用hostProvider的next方法从集群中获取一个服务器连接地址。
③,创建TCP连接,连接zookeeper服务器。
private void startConnect(InetSocketAddress addr) throws IOException {
// initializing it for new connection
saslLoginFailed = false;
if (!isFirstConnect) {
try {
Thread.sleep(r.nextInt(1000));
} catch (InterruptedException e) {
LOG.warn("Unexpected exception", e);
}
}
state = States.CONNECTING;
String hostPort = addr.getHostString() + ":" + addr.getPort();
MDC.put("myid", hostPort);
setName(getName().replaceAll("\\(.*\\)", "(" + hostPort + ")"));
if (clientConfig.isSaslClientEnabled()) {
try {
if (zooKeeperSaslClient != null) {
zooKeeperSaslClient.shutdown();
}
zooKeeperSaslClient = new ZooKeeperSaslClient(SaslServerPrincipal.getServerPrincipal(addr, clientConfig), clientConfig);
} catch (LoginException e) {
// An authentication error occurred when the SASL client tried to initialize:
// for Kerberos this means that the client failed to authenticate with the KDC.
// This is different from an authentication error that occurs during communication
// with the Zookeeper server, which is handled below.
LOG.warn(
"SASL configuration failed. "
+ "Will continue connection to Zookeeper server without "
+ "SASL authentication, if Zookeeper server allows it.", e);
eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.AuthFailed, null));
saslLoginFailed = true;
}
}
logStartConnect(addr);
clientCnxnSocket.connect(addr);
}
④创建客户端会话session
创建TCP网络连接成功后,更新socket地址,sendThread创建会话连接。
if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
if (sc.finishConnect()) {
updateLastSendAndHeard();
updateSocketAddresses();
sendThread.primeConnection();
}
}
设置sessId创建连接请求ConnectRequest
public ConnectRequest(int protocolVersion, long lastZxidSeen,
int timeOut, long sessionId, byte[] passwd) {
this.protocolVersion = protocolVersion;
this.lastZxidSeen = lastZxidSeen;
this.timeOut = timeOut;
this.sessionId = sessionId;
this.passwd = passwd;
}

创建Packet,放入发送队列outgoingQueue
outgoingQueue.addFirst(new Packet(null, null, conReq, null, null, readOnly));

clientCnxnSocke注册IO读事件和写事件
void connectionPrimed() {
sockKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
}
1.SendThread功能
1.1维护client和server的心跳连接,一旦失去连接会立即重连
1.2管理了所有客户端请求发送和响应接收操作。将上层客户端API操作转换成相应的请求协议并发送到服务端,并完成同步调用的返回和异步调用的回调。
1.3接收请求的返回并传递给eventThread去处理。
2.EventThread
public void run() {
try {
isRunning = true;
while (true) {
Object event = waitingEvents.take();
if (event == eventOfDeath) {
wasKilled = true;
} else {
processEvent(event);
}
if (wasKilled) {
synchronized (waitingEvents) {
if (waitingEvents.isEmpty()) {
isRunning = false;
break;
}
}
}
}
} catch (InterruptedException e) {
LOG.error("Event thread exiting due to interruption", e);
}
LOG.info("EventThread shut down for session: 0x{}", Long.toHexString(getSessionId()));
}
EventThread是客户端ClientCnxn内部的一个事件处理线程,负责客户端的事件处理,并触发客户端的Watcher监听。EventThread中的waitingEvents队列用于临时存放需要被触发的对象,包括客户端注册的watcher和异步接口注册的回调器AsyncCallback。同时EventThread会不断从waitingEvents中取出对象,识别具体类型Watcher或AsyncCallback,并分别调用process和processResult接口方法来实现对事件的触发和回调。
3.Packet
Packet序列化的时候createBB方法里只有部分属性序列化了,包括watcher在内的很多变量都没序列化,这是Watcher轻量性的保证。
outgoingqueue和pendingqueue,他们内部放置的对象都是Packet。在发送时,sendThread从outgoingqueue中取出Packet序列化(带有请求序号XID)并发送,然后这个Packet被转移到pendingqueue中,等待响应处理。
四、响应阶段
相应处理阶段过程:
11.响应处理阶段
接受服务器端响应。ClientCnxnSocket接受到服务端响应后,会首先判断当前的客户端状态是否是“已初始
化”,如果尚未完成初始化,那么就认为该响应一定是会话创建请求的响应,直接交由readConnectResult方
法来处理该响应。
12.处理Response。
ClientCnxnSocket会对接受到的服务端响应进行反序列化,得到ConnectResponse对象,并从中获取到
Zookeeper服务端分配的会话SessionId。
13.连接成功。
连接成功后,一方面需要通知SendThread线程,进一步对客户端进行会话参数的设置,包括readTimeout和
connectTimeout等,并更新客户端状态,另一方面,需要通知地址管理器HostProvider当前成功连接的服务
器地址。
14.生成事件:SyncConnected-None。
为了能够让上层应用感知到会话的成功创建,SendThread会生成一个事件SyncConnected-None,
代表客户端与服务器会话创建成功,并将该事件传递给EventThread线程。
15.查询Watcher。
EventThread线程收到事件后,会从ClientWatchManager管理器中查询出对应的Watcher,
针对SyncConnected-None事件,那么就直接找出存储的默认Watcher,然后将其放到EventThread的
watingEvents队列中去。
16.处理事件。
EventThread不断的从watingEvents队列中取出待处理的Watcher对象,然后直接调用该对象的process接
口方法,以达到触发Watcher的目的。
1.ClientCnxnSocket接收到响应后,首先判断客户端状态是否初始化,若未初始化,说明客户端和服务端之间正在进行会话创建并反序列化reponse,生成ConnectReponse(带有sessionId),然后通知SendThread和HostProvider进行相应的设置。
2.如果为初始化状态,且收到的是事件,那么会反序列化为WatcherEvent,并放到EventThread的等待队列中。
3.如果是常规的请求,如getData、exists等,会从pendingqueue中取出一个Packet来处理。
