一、Zookeeper工作流程

客户端会话创建 - 图1
上图主要描述了客户端和服务端互动过程:
1.客户端把request以Packet形式传递到Zookeeper类中。
2.Zookeeper类处理request并放入outgoingqueue中。
3.sendThread把发出的request转移到pendinqueue中。
4.收到回复后,sendThread从pendingqueue中取出request,并生成event。
5.eventThread处理event并触发watchManager中的Watcher,调用callback。
zk Client和Server建立连接.png
Zk 客户端和服务端建立连接,主要分为三个阶段:
1.初始化阶段:主要功能类实例化,
Zookeeper:客户端核心类之一,也是入口。
HostProvider:客户端地址列表管理器
ClientCnxn:客户端连接核心类,包含SendThread和EventThread两个线程。SendThread为IO线程,主要负责zookeeper客户端和服务端之间的网络IO通信,EventThread为事件线程,主要负责对服务端事件进行处理。
ClientWatchManager:客户端Watcher管理器。
2.创建阶段:启动并创建连接
3.响应请求:响应并接收。

二、初始化阶段

初始化阶段过程:

  1. 1.初始化ZooKeeper对象
  2. 通过调用ZooKeeper的构造方法实例化一个ZooKeeper对象,在初始化过程中会创建一个客户端Watcher管理器ClientWatcherManager
  3. 2.设置会话默认Watcher
  4. 如果构造方法中传入了一个Watcher对象,那么客户端将这个Watcher对象作为默认Watcher保存到ClientWatcherManager
  5. 3.构造ZK服务器地址列表管理器HostProvider
  6. 对于构造器中传入的服务器端地址,客户端将其保存在服务器地址列表管理器HostProvider
  7. 4.创建并初始化客户端网络连接
  8. ZooKeeper客户端首先会创建一个网络连接器ClientCnxn,用来管理客户端与服务器的网络交互。
  9. 另外,客户端在创建 ClientCnxn的同时还会初始化客户端两个核心队列outGoingQueuependingQueue
  10. 分别作为客户端请求组发送队列和服务端 响应等待队列
  11. 5.初始化SendThreadEventThread
  12. 客户端会创建两个核心网络线程SendThreadEventThread,前者用于管理客户端和服务端之间的所有网络
  13. I/O,后者则用于进行客户端的事件处理。同时,客户端还会将ClientCnxnSocket分配给SendThread作为底层
  14. 网络I/O处理器,并初始化EventThread的待处理事件队列waitingEvents,用于存放所有等待被客户端处理的
  15. 事情。

1.Zookeeper实例化

public ZooKeeper(String connectString,int sessionTimeout,Watcher watcher,
    boolean canBeReadOnly,HostProvider aHostProvider,ZKClientConfig clientConfig) 
    throws IOException {
    LOG.info(
        "Initiating client connection, connectString={} sessionTimeout={} watcher={}",
        connectString,
        sessionTimeout,
        watcher);

    if (clientConfig == null) {
        clientConfig = new ZKClientConfig();
    }
    this.clientConfig = clientConfig;
    //实例化ClientWatchManager类型为ZKWatchManager
    watchManager = defaultWatchManager();
    //设置默认watcher
    watchManager.defaultWatcher = watcher;
    //负责解析server地址串
    //1.加chroot(默认prefix)
    //2.读字符串把server地址分开
    ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
    //分局字符串解析host、ip等
    hostProvider = aHostProvider;
    //实例化ClientCnxn类
    cnxn = createConnection(
        connectStringParser.getChrootPath(),
        hostProvider,
        sessionTimeout,
        this,
        watchManager,
        //实例化NIOSocket
        getClientCnxnSocket(),
        canBeReadOnly);
    //启动SendThread、EventThread
    cnxn.start();
}

1.1实例化ZKWatchManager

protected ZKWatchManager defaultWatchManager() {
    return new ZKWatchManager(getClientConfig().getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET));
}

public boolean getBoolean(String key) {
    //默认为false
    return getBoolean(key, false);
}

1.2实例化连接地址解析器ConnectStringParser

解析连接服务端地址串,并设置连接地址列表。解析chrootPath。

Chroot:客户端隔离命名空间。

如果一个Zookeeper客户端设置了Chroot,那么该客户端对服务器的任何操作,都会被限制在其自己的命名空间下。在那些多个应用公用一个Zookeeper集群的场景下,这对于实现不同应用之间的相互隔离非常有帮助。客户端可以通过在connectString添加后缀的的方式来设置Chroot。
image.png

//默认端口
private static final int DEFAULT_PORT = 2181;
//默认前缀
private final String chrootPath;
//服务端地址列表
private final ArrayList<InetSocketAddress> serverAddresses = 
    new ArrayList<InetSocketAddress>();

public ConnectStringParser(String connectString) {
    // parse out chroot, if any
    int off = connectString.indexOf('/');
    if (off >= 0) {
        String chrootPath = connectString.substring(off);
        // ignore "/" chroot spec, same as null
        if (chrootPath.length() == 1) {
            this.chrootPath = null;
        } else {
            PathUtils.validatePath(chrootPath);
            this.chrootPath = chrootPath;
        }
        connectString = connectString.substring(0, off);
    } else {
        this.chrootPath = null;
    }
    //拆分地址串
    List<String> hostsList = split(connectString, ",");
    for (String host : hostsList) {
        int port = DEFAULT_PORT;
        String[] hostAndPort = NetUtils.getIPV6HostAndPort(host);
        if (hostAndPort.length != 0) {
            host = hostAndPort[0];
            if (hostAndPort.length == 2) {
                port = Integer.parseInt(hostAndPort[1]);
            }
        } else {
            int pidx = host.lastIndexOf(':');
            if (pidx >= 0) {
                // otherwise : is at the end of the string, ignore
                if (pidx < host.length() - 1) {
                    port = Integer.parseInt(host.substring(pidx + 1));
                }
                host = host.substring(0, pidx);
            }
        }
        serverAddresses.add(InetSocketAddress.createUnresolved(host, port));
    }
}

1.3实例化HostProvider:地址列表管理器

public interface HostProvider {

    int size();

    InetSocketAddress next(long spinDelay);

    void onConnected();

    boolean updateServerList(Collection<InetSocketAddress> serverAddresses, 
                             InetSocketAddress currentHost);

}

Zookeeper规定:任何对于该接口的实现必须满足以下3点:
1.next()方法必须要有合法的返回值。
凡是对该方法的调用,必须要返回一个合法的InetSocketAddress对象,不能是null或其他不合法的InetSocketAddress
2.next()方法必须返回已解析的InetSocketAddress对象。
服务器的地址列表存放在ConnectStringParser.ServerAddresses中,是没有被解析的InetSocketAddress对象。在传递到HostProvider后,HostProvider负责对InetSocketAddress列表进行解析。最终返回的时已解析的InetSocketAddress对象。
3.size()方法不能返回0
Zookeeper中规定,size方法不能返回0,也就是说,HostProvider中必须至少有一个服务器地址。

private static HostProvider createDefaultHostProvider(String connectString) {
    return new StaticHostProvider(
        new ConnectStringParser(connectString).getServerAddresses());
}

public StaticHostProvider(Collection<InetSocketAddress> serverAddresses) {
    init(serverAddresses, System.currentTimeMillis() ^ this.hashCode(), new Resolver() {
        @Override
        public InetAddress[] getAllByName(String name) throws UnknownHostException {
            return InetAddress.getAllByName(name);
        }
    });
}

private void init(Collection<InetSocketAddress> serverAddresses, 
                  long randomnessSeed, 
                  Resolver resolver) {
    //获取随机数
    this.sourceOfRandomness = new Random(randomnessSeed);
    this.resolver = resolver;
    if (serverAddresses.isEmpty()) {
        throw new IllegalArgumentException("A HostProvider may not be empty!");
    }
    //重新解析连接地址列表
    this.serverAddresses = shuffle(serverAddresses);
    currentIndex = -1;
    lastIndex = -1;
}

StaticHostProvider

解析服务器地址
StaticHostProvider解析服务器地址列表,使用Collections工具类的shuffle方法来将这个服务器地址列表进行随机打散。
获取可用的服务器地址
next()方法先将打散的服务器地址列表拼装成一个环形循环队列。
工作原理:
客户端会话创建 - 图4

hostProvider的next方法

public InetSocketAddress next(long spinDelay) {
    boolean needToSleep = false;
    InetSocketAddress addr;

    synchronized (this) {
        if (reconfigMode) {
            addr = nextHostInReconfigMode();
            if (addr != null) {
                currentIndex = serverAddresses.indexOf(addr);
                return resolve(addr);
            }
            //tried all servers and couldn't connect
            reconfigMode = false;
            needToSleep = (spinDelay > 0);
        }
        //当前索引+1
        ++currentIndex;
        //重置当前索引
        if (currentIndex == serverAddresses.size()) {
            currentIndex = 0;
        }
        //获取新的server地址
        addr = serverAddresses.get(currentIndex);
        needToSleep = needToSleep || (currentIndex == lastIndex && spinDelay > 0);
        if (lastIndex == -1) {
            // We don't want to sleep on the first ever connect attempt.
            lastIndex = 0;
        }
    }
    if (needToSleep) {
        try {
            Thread.sleep(spinDelay);
        } catch (InterruptedException e) {
            LOG.warn("Unexpected exception", e);
        }
    }

    return resolve(addr);
}

1.4实例化ClientCnxnSocketNIO:底层socket通信层

该实现类使用Java原生的NIO接口,其核心是doIO逻辑,主要负责对请求的发送和响应接收过程。

private ClientCnxnSocket getClientCnxnSocket() throws IOException {
    String clientCnxnSocketName = getClientConfig().getProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET);
    if (clientCnxnSocketName == null) {
        clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();
    }
    try {
        Constructor<?> clientCxnConstructor = Class.forName(clientCnxnSocketName)
            .getDeclaredConstructor(ZKClientConfig.class);
        ClientCnxnSocket clientCxnSocket = (ClientCnxnSocket) clientCxnConstructor.newInstance(getClientConfig());
        return clientCxnSocket;
    } catch (Exception e) {
        throw new IOException("Couldn't instantiate " + clientCnxnSocketName, e);
    }
}

socketName = ClientCnxnSocketNIO
image.png
image.png

请求发送

正常情况下,会从outgoingQueue队列中取出一个可发送的Packet对象,同时生成一个xid并将其设置到packet的请求头中,然后将其序列化后发送。请求发送完毕后,会立即将该Packet保存到pendingQueue队列中,以便等待服务端响应返回后进行相应的处理。

响应接收

image.png

1.5实例化ClientCnxn:网络IO

ClientCnxn是Zookeeper客户端的核心工作类,负责维护客户端和服务端之间的网络连接,并进行一系列网络通信。

Packet

Packet是ClientCnxn内部定义的一个对协议层的封装。包含了最基本的请求头requestHeader、请求体request,响应头ReplyHeader、响应体response、节点路径serverPath/clientPath和注册的Watcher(WatchRegistration)等信息。
packet中的createBB()方法负责对Packet对象进行序列化,最终生成可用于底层网络传输的ByteBuffer对象。在这个过程中,只会将requestHeader、request和readOnly三个属性进行序列化,其余属性保存在客户端的上下文中,不会进行与服务端之间的网络传输。
image.png

public ClientCnxn(
    String chrootPath,
    HostProvider hostProvider,
    int sessionTimeout,
    ZooKeeper zooKeeper,
    ClientWatchManager watcher,
    ClientCnxnSocket clientCnxnSocket,
    long sessionId,
    byte[] sessionPasswd,
    boolean canBeReadOnly) {
    this.zooKeeper = zooKeeper;
    this.watcher = watcher;
    this.sessionId = sessionId;
    this.sessionPasswd = sessionPasswd;
    this.sessionTimeout = sessionTimeout;
    this.hostProvider = hostProvider;
    this.chrootPath = chrootPath;

    connectTimeout = sessionTimeout / hostProvider.size();
    readTimeout = sessionTimeout * 2 / 3;
    readOnly = canBeReadOnly;
    //创建发送线程
    sendThread = new SendThread(clientCnxnSocket);
    //创建事件线程
    eventThread = new EventThread();
    this.clientConfig = zooKeeper.getClientConfig();
    initRequestTimeout();
}

clientCnxn属性:
image.png

outgoingQueue和pendingQueue

outgoingQueue队列是一个请求发送队列,专门用于存储那些需要发送到服务端的Packet集合。Pending队列是为了存储那些已经从客户端发送到服务端的,但是需要等待服务端响应的Packet集合。

三、创建阶段

创建阶段过程:

6.启动SendThread和EventThread。
  SendThread首先会判断当前客户端的状态,进行一系列请理性工作,为客户端发送“会话创建”请求做准备。

7.获取一个服务器地址。
  在开始创建TCP之前,SendThread首先需要获取一个Zookeeper服务器的目标地址, 这通常是从
HostProvider中随机获取出一个地址,然后委托给ClientCnxnSocket去创建与Zookeeper服务器之间的TCP连
接。

8.创建TCP连接。获取一个服务器地址后,ClientCnxnSocket负责和服务器创建一个TCP长连接。
9.构造ConnectRequest请求。
  在TCP连接创建完毕后,可能有的读者会认为,这样是否就说明已经和Zookeeper服务器完成连接了呢?
其实不然,上面的步骤只是纯粹地从网络TCP层完成了客户端与服务端之间的Socket连接,但远未完成
Zookeeper客户端的会话创建。
  SendThread会负责根据当前客户端的实际设置,构造出一个ConnectRequest请求,该请求代表了客户端试
图与服务端创建一个会话。
同时,Zookeeper客户端还会进一步将该请求包装成网络I/O层的Packet对象,放入发送队列outgoingQueue
中去。

10.发送请求。
  当客户端请求准备完毕后,就可以开始向服务端发送请求了。
  ClientCnxnSocket负责从outgoingQueue中取出一个待发送的Packet对象,将其序列化成ByteBuffer后,
向服务端进行发送。

SendThread线程启动,从hostProvider获取server地址,建立连接并构造请求发送。
1.获取服务器地址,建立TCP连接。
2.构造ConnectRequest请求。TCP连接建立后,client和server之间的会话并没完全建立。SendThread会根据响应的参数构造ConnectRequest,并包装成Packet对象,放入outgoingqueue中发送到Server端,这就是client和server的一个会话。
3.ClientCnxnSocket从queue中取出Packet并序列化部分属性并发送到server。

public void run() {
    //ClientCnxnSocket设置sendThread、sessionId、outgoingQueue
    clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
    clientCnxnSocket.updateNow();
    clientCnxnSocket.updateLastSendAndHeard();
    int to;
    long lastPingRwServer = Time.currentElapsedTime();
    final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
    InetSocketAddress serverAddress = null;
    while (state.isAlive()) {
        try {
            //判断clientCnxnSocket是否是连接状态。
            //1.创建clientCnxnSocket时,没有建立连接
            //2.客户端断开连接
            if (!clientCnxnSocket.isConnected()) {
                // don't re-establish connection if we are closing
                if (closing) {
                    break;
                }
                if (rwServerAddress != null) {
                    serverAddress = rwServerAddress;
                    rwServerAddress = null;
                } else {
                    //获取下一个连接地址
                    serverAddress = hostProvider.next(1000);
                }
                //启动连接
                startConnect(serverAddress);
                //更新连接时间
                clientCnxnSocket.updateLastSendAndHeard();
            }

            if (state.isConnected()) {
                // determine whether we need to send an AuthFailed event.
                if (zooKeeperSaslClient != null) {
                    boolean sendAuthEvent = false;
                    if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
                        try {
                            zooKeeperSaslClient.initialize(ClientCnxn.this);
                        } catch (SaslException e) {
                            LOG.error("SASL authentication with Zookeeper Quorum member failed.", e);
                            state = States.AUTH_FAILED;
                            sendAuthEvent = true;
                        }
                    }
                    KeeperState authState = zooKeeperSaslClient.getKeeperState();
                    if (authState != null) {
                        if (authState == KeeperState.AuthFailed) {
                            // An authentication error occurred during authentication with the Zookeeper Server.
                            state = States.AUTH_FAILED;
                            sendAuthEvent = true;
                        } else {
                            if (authState == KeeperState.SaslAuthenticated) {
                                sendAuthEvent = true;
                            }
                        }
                    }

                    if (sendAuthEvent) {
                        eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, authState, null));
                        if (state == States.AUTH_FAILED) {
                            eventThread.queueEventOfDeath();
                        }
                    }
                }
                to = readTimeout - clientCnxnSocket.getIdleRecv();
            } else {
                to = connectTimeout - clientCnxnSocket.getIdleRecv();
            }

            if (to <= 0) {
                String warnInfo = String.format(
                    "Client session timed out, have not heard from server in %dms for session id 0x%s",
                    clientCnxnSocket.getIdleRecv(),
                    Long.toHexString(sessionId));
                LOG.warn(warnInfo);
                throw new SessionTimeoutException(warnInfo);
            }
            if (state.isConnected()) {
                //1000(1 second) is to prevent race condition missing to send the second ping
                //also make sure not to send too many pings when readTimeout is small
                int timeToNextPing = readTimeout / 2
                    - clientCnxnSocket.getIdleSend()
                    - ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
                //send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
                if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
                    sendPing();
                    clientCnxnSocket.updateLastSend();
                } else {
                    if (timeToNextPing < to) {
                        to = timeToNextPing;
                    }
                }
            }

            // If we are in read-only mode, seek for read/write server
            if (state == States.CONNECTEDREADONLY) {
                long now = Time.currentElapsedTime();
                int idlePingRwServer = (int) (now - lastPingRwServer);
                if (idlePingRwServer >= pingRwTimeout) {
                    lastPingRwServer = now;
                    idlePingRwServer = 0;
                    pingRwTimeout = Math.min(2 * pingRwTimeout, maxPingRwTimeout);
                    pingRwServer();
                }
                to = Math.min(to, pingRwTimeout - idlePingRwServer);
            }

            clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
        } catch (Throwable e) {
            if (closing) {
                // closing so this is expected
                LOG.warn(
                    "An exception was thrown while closing send thread for session 0x{}.",
                    Long.toHexString(getSessionId()),
                    e);
                break;
            } else {
                LOG.warn(
                    "Session 0x{} for sever {}, Closing socket connection. "
                    + "Attempting reconnect except it is a SessionExpiredException.",
                    Long.toHexString(getSessionId()),
                    serverAddress,
                    e);

                // At this point, there might still be new packets appended to outgoingQueue.
                // they will be handled in next connection or cleared up if closed.
                cleanAndNotifyState();
            }
        }
    }

    synchronized (state) {
        // When it comes to this point, it guarantees that later queued
        // packet to outgoingQueue will be notified of death.
        cleanup();
    }
    clientCnxnSocket.close();
    if (state.isAlive()) {
        eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Disconnected, null));
    }
    eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Closed, null));
    ZooTrace.logTraceMessage(
        LOG,
        ZooTrace.getTextTraceLevel(),
        "SendThread exited loop for session: 0x" + Long.toHexString(getSessionId()));
}

①判断clientCnxnSocket是否是连接状态,有实例化clientCnxnSocket属性可知sockKey为null。

boolean isConnected() {
    return sockKey != null;
}

②调用hostProvider的next方法从集群中获取一个服务器连接地址。
image.png
③,创建TCP连接,连接zookeeper服务器。

private void startConnect(InetSocketAddress addr) throws IOException {
    // initializing it for new connection
    saslLoginFailed = false;
    if (!isFirstConnect) {
        try {
            Thread.sleep(r.nextInt(1000));
        } catch (InterruptedException e) {
            LOG.warn("Unexpected exception", e);
        }
    }
    state = States.CONNECTING;

    String hostPort = addr.getHostString() + ":" + addr.getPort();
    MDC.put("myid", hostPort);
    setName(getName().replaceAll("\\(.*\\)", "(" + hostPort + ")"));
    if (clientConfig.isSaslClientEnabled()) {
        try {
            if (zooKeeperSaslClient != null) {
                zooKeeperSaslClient.shutdown();
            }
            zooKeeperSaslClient = new ZooKeeperSaslClient(SaslServerPrincipal.getServerPrincipal(addr, clientConfig), clientConfig);
        } catch (LoginException e) {
            // An authentication error occurred when the SASL client tried to initialize:
            // for Kerberos this means that the client failed to authenticate with the KDC.
            // This is different from an authentication error that occurs during communication
            // with the Zookeeper server, which is handled below.
            LOG.warn(
                "SASL configuration failed. "
                + "Will continue connection to Zookeeper server without "
                + "SASL authentication, if Zookeeper server allows it.", e);
            eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.AuthFailed, null));
            saslLoginFailed = true;
        }
    }
    logStartConnect(addr);

    clientCnxnSocket.connect(addr);
}

④创建客户端会话session
image.png
创建TCP网络连接成功后,更新socket地址,sendThread创建会话连接。

if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
    if (sc.finishConnect()) {
        updateLastSendAndHeard();
        updateSocketAddresses();
        sendThread.primeConnection();
    }
}

设置sessId创建连接请求ConnectRequest
image.png

public ConnectRequest(int protocolVersion, long lastZxidSeen, 
                      int timeOut, long sessionId, byte[] passwd) {
    this.protocolVersion = protocolVersion;
    this.lastZxidSeen = lastZxidSeen;
    this.timeOut = timeOut;
    this.sessionId = sessionId;
    this.passwd = passwd;
}

image.png
创建Packet,放入发送队列outgoingQueue

outgoingQueue.addFirst(new Packet(null, null, conReq, null, null, readOnly));

image.png
clientCnxnSocke注册IO读事件和写事件

void connectionPrimed() {
    sockKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
}

1.SendThread功能

1.1维护client和server的心跳连接,一旦失去连接会立即重连
1.2管理了所有客户端请求发送和响应接收操作。将上层客户端API操作转换成相应的请求协议并发送到服务端,并完成同步调用的返回和异步调用的回调。
1.3接收请求的返回并传递给eventThread去处理。
客户端会话创建 - 图15

2.EventThread

public void run() {
    try {
        isRunning = true;
        while (true) {
            Object event = waitingEvents.take();
            if (event == eventOfDeath) {
                wasKilled = true;
            } else {
                processEvent(event);
            }
            if (wasKilled) {
                synchronized (waitingEvents) {
                    if (waitingEvents.isEmpty()) {
                        isRunning = false;
                        break;
                    }
                }
            }
        }
    } catch (InterruptedException e) {
        LOG.error("Event thread exiting due to interruption", e);
    }

    LOG.info("EventThread shut down for session: 0x{}", Long.toHexString(getSessionId()));
}

EventThread是客户端ClientCnxn内部的一个事件处理线程,负责客户端的事件处理,并触发客户端的Watcher监听。EventThread中的waitingEvents队列用于临时存放需要被触发的对象,包括客户端注册的watcher和异步接口注册的回调器AsyncCallback。同时EventThread会不断从waitingEvents中取出对象,识别具体类型Watcher或AsyncCallback,并分别调用process和processResult接口方法来实现对事件的触发和回调。

3.Packet

Packet序列化的时候createBB方法里只有部分属性序列化了,包括watcher在内的很多变量都没序列化,这是Watcher轻量性的保证。
outgoingqueue和pendingqueue,他们内部放置的对象都是Packet。在发送时,sendThread从outgoingqueue中取出Packet序列化(带有请求序号XID)并发送,然后这个Packet被转移到pendingqueue中,等待响应处理。

四、响应阶段

相应处理阶段过程:

11.响应处理阶段
    接受服务器端响应。ClientCnxnSocket接受到服务端响应后,会首先判断当前的客户端状态是否是“已初始
化”,如果尚未完成初始化,那么就认为该响应一定是会话创建请求的响应,直接交由readConnectResult方
法来处理该响应。

12.处理Response。
    ClientCnxnSocket会对接受到的服务端响应进行反序列化,得到ConnectResponse对象,并从中获取到
Zookeeper服务端分配的会话SessionId。
13.连接成功。
    连接成功后,一方面需要通知SendThread线程,进一步对客户端进行会话参数的设置,包括readTimeout和
connectTimeout等,并更新客户端状态,另一方面,需要通知地址管理器HostProvider当前成功连接的服务
器地址。

14.生成事件:SyncConnected-None。
    为了能够让上层应用感知到会话的成功创建,SendThread会生成一个事件SyncConnected-None,
代表客户端与服务器会话创建成功,并将该事件传递给EventThread线程。

15.查询Watcher。
    EventThread线程收到事件后,会从ClientWatchManager管理器中查询出对应的Watcher,
  针对SyncConnected-None事件,那么就直接找出存储的默认Watcher,然后将其放到EventThread的
watingEvents队列中去。

16.处理事件。
    EventThread不断的从watingEvents队列中取出待处理的Watcher对象,然后直接调用该对象的process接
 口方法,以达到触发Watcher的目的。

1.ClientCnxnSocket接收到响应后,首先判断客户端状态是否初始化,若未初始化,说明客户端和服务端之间正在进行会话创建并反序列化reponse,生成ConnectReponse(带有sessionId),然后通知SendThread和HostProvider进行相应的设置。
2.如果为初始化状态,且收到的是事件,那么会反序列化为WatcherEvent,并放到EventThread的等待队列中。
3.如果是常规的请求,如getData、exists等,会从pendingqueue中取出一个Packet来处理。