<br />
组件间关系图
点击查看【processon】
当开发人员 new
了一个 ZooKeeper
对象时,它会初始化以下三个组件:
HostProvider
,用来解析域名 和 负载均衡的ZKWatchManager
,用来处理事件的ClientCnxn
,用来处理网络IO的SendThread
,用来处理请求/响应EventThread
,用来处理事件回调的
最后会启动 SendThread
和 EventThread
循环执行任务。首先 SendThread
会判断当前客户端是否被关闭,如果被关闭了就清理一下数据退出;如果没有关闭,则判断是否和服务器建立了TCP连接,如果没有就去创建TCP连接,执行连接后可能还不会立即成功(可能是异步的),此种情况会在下几次循环 SendThread#run
时判断连接是否成功;如果(直接)连接成功了,就能创建会话请求包并放入 outgoing queue
里,等待 doTransport()
取走处理;当下几次循环获取到了响应,就构建相应的事件,设置状态~然后 EventThread
取到了待执行的事件,就会去处理。
连接的关键就是,当TCP连接建立成功后,会创建会话请求包并放入 outgoing queue
中; outgoing queue
里的数据会在某次循环中被取出来发送(优先第一个)。当收到创建会话请求的响应后,构建相应的事件并设置 ZooKeeper
的状态。
1. 初始化ZooKeeper
public ZooKeeper(
String connectString, // 连接的服务器列表,用 , 隔开
int sessionTimeout, // 会话超时时间
Watcher watcher, // 连接成功后的Watch回调
boolean canBeReadOnly, // 决定当前的Client是否支持 只读模式下的Server
HostProvider aHostProvider, // 自定义HostProvider
// ZooKeeper 客户端的配置
ZKClientConfig clientConfig) throws IOException {
// 如果客户端配置为null,则使用默认配置
if (clientConfig == null) {
clientConfig = new ZKClientConfig();
}
this.clientConfig = clientConfig;
// 初始化默认的WatchManager
watchManager = defaultWatchManager();
watchManager.defaultWatcher = watcher;
ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
hostProvider = aHostProvider;
// 构建 ClientCnxn
cnxn = createConnection(
connectStringParser.getChrootPath(),
hostProvider,
sessionTimeout,
this,
watchManager,
getClientCnxnSocket(),
canBeReadOnly);
// 启动 SendThread 、EventThread
cnxn.start();
}
2. 初始化ZKWatchManager
这部分略过
3. 初始化HostProvider
如果开发人员不给定 HostProvider
,那 ZooKeeper
默认会通过下面的方法创建:
private static HostProvider createDefaultHostProvider(String connectString) {
// 处理开发人员给的connectString,服务器地址列表字符串。处理的规范一些
// 构建StaticHostProvider
return new StaticHostProvider(new ConnectStringParser(connectString).getServerAddresses());
}
然后创建 StaticHostProvider
的逻辑如下所示:
public StaticHostProvider(Collection<InetSocketAddress> serverAddresses) {
// 调用init方法;这里Resolver的匿名函数对象是重点,下面要考的
init(serverAddresses, System.currentTimeMillis() ^ this.hashCode(), new Resolver() {
@Override
public InetAddress[] getAllByName(String name) throws UnknownHostException {
return InetAddress.getAllByName(name);
}
});
}
private void init(Collection<InetSocketAddress> serverAddresses,
long randomnessSeed, Resolver resolver) {
// 构造一个随机种子
this.sourceOfRandomness = new Random(randomnessSeed);
this.resolver = resolver;
if (serverAddresses.isEmpty()) {
throw new IllegalArgumentException("A HostProvider may not be empty!");
}
// 重点!!!!重新打乱服务器列表,简单的实现负载均衡!!!!
this.serverAddresses = shuffle(serverAddresses);
currentIndex = -1;
lastIndex = -1;
}
4. 初始化网络连接器ClientCnxn
这部分主要设置 ClientCnxn
的一些属性,比如: chroot
、会话ID、连接超时时间等等。它的源码如下所示:
public ClientCnxn(
String chrootPath,
HostProvider hostProvider,
int sessionTimeout,
ZooKeeper zooKeeper,
ClientWatchManager watcher,
ClientCnxnSocket clientCnxnSocket,
long sessionId,
byte[] sessionPasswd,
boolean canBeReadOnly) {
// Zookeeper实例
this.zooKeeper = zooKeeper;
// WatchManager
this.watcher = watcher;
// 初始化时是0
this.sessionId = sessionId;
// 空的16字节数组
this.sessionPasswd = sessionPasswd;
// 会话过期时间
this.sessionTimeout = sessionTimeout;
// 服务器列表解析器
this.hostProvider = hostProvider;
// 获取chroot,即用户指定的逻辑上的根目录,指定一个目录为根目录,后续的相对目录可以使用/替代
this.chrootPath = chrootPath;
// 连接超时时长 由 会话超时时长 / 服务器列表个数 决定。服务器列表个数越多,连接超时时间越短,发生断开时切换的越快
connectTimeout = sessionTimeout / hostProvider.size();
// 读取数据超时时间
readTimeout = sessionTimeout * 2 / 3;
// 看代码后面的解释
readOnly = canBeReadOnly;
// 5. 关键:创建SendThread
sendThread = new SendThread(clientCnxnSocket);
// 5. 关键:创建EventThread
eventThread = new EventThread();
this.clientConfig = zooKeeper.getClientConfig();
// 根据配置文件初始化请求相关的参数
initRequestTimeout();
}
👉👉👉不了解 chroot
的戳这里《ZooKeeper - 创建会话》👈👈👈
readOnly
:该属性是用于启用了Read Only Mode
功能的ZooKeeper
集群。当Zookeeper
集群发生分区现象,那么该属性会支撑ZooKeeper
服务器的可读功能,但是所有的写入会被拒绝。该属性仅在发生分区时生效。
6. 启动SendThread和EventThread
// org.apache.zookeeper.ClientCnxn
public void start() {
sendThread.start();
eventThread.start();
}
SendThread#run
在本阶段,只会分析和 会话创建相关的内容,可能会省略掉一些不必要的琐碎的部分。
基本状态设置
public void run() {
// 设置新的操作对象
clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
// 更新当前时间(此时为 Time.currentElapsedTime())
clientCnxnSocket.updateNow();
// 更新最后一次发送/最后一次接收的时间(此时 = now)
clientCnxnSocket.updateLastSendAndHeard();
// 当前时间 距离上一次 接收到响应 的间隔时间,超过一定时间会抛出异常的
int to;
// 最后一次ping 可read/可write的server时的时间
long lastPingRwServer = Time.currentElapsedTime();
// ping的间隔时间
final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
// 要连接的服务器地址
InetSocketAddress serverAddress = null;
// 循环体
...
}
建立TCP连接
这部分需要进入循环体, ZooKeeper
是否关闭决定了该循环体是否结束。该循环体内主要就是发送请求,接收响应,包含了早期的一些必要的TCP建立,会话建立,认证等特殊请求:
// org.apache.zookeeper.ClientCnxn.SendThread#run
while (state.isAlive()) {
try {
// 判断Socket是否已经建立连接完毕,注意,这里是判断Socket是否连接
if (!clientCnxnSocket.isConnected()) {
// 如果我们手动关闭了连接,就不要再重新连接了
if (closing) {
break;
}
// 涉及readonly模式,这里不分析
if (rwServerAddress != null) {
serverAddress = rwServerAddress;
rwServerAddress = null;
} else {
serverAddress = hostProvider.next(1000);
}
// 重点!!!开始连接ZooKeeper(入口)
startConnect(serverAddress);
// 更新心跳时间,更新最后一次 发送/接收时间
clientCnxnSocket.updateLastSendAndHeard();
}
// 目前仅需要分析TCP如何建立连接的,暂时省略
...
} catch (Throwable e) {
...
}
}
在 startConnect()
方法里开始执行TCP连接:
private void startConnect(InetSocketAddress addr) throws IOException {
// sasl认证
...
// 如果不是第一次连接,需要暂停1秒左右
if (!isFirstConnect) {
try {
Thread.sleep(r.nextInt(1000));
} catch (InterruptedException e) {
LOG.warn("Unexpected exception", e);
}
}
// 设置ClientCnxn的state状态,为正在连接
state = States.CONNECTING;
// IP地址
String hostPort = addr.getHostString() + ":" + addr.getPort();
// 多线程日志记录,在多线程下,给定一个唯一标志,方便后续分析
MDC.put("myid", hostPort);
// 设置线程名字
setName(getName().replaceAll("\\(.*\\)", "(" + hostPort + ")"));
// sasl认证处理
...
// 记录连接日志
logStartConnect(addr);
// 根据不同的连接器实现执行连接,默认情况下是使用NIO方式连接
clientCnxnSocket.connect(addr);
}
clientCnxnSocket
有不同的实现,目前 ZooKeeper
提供了 NIO
和 Netty
的方式,默认情况下使用 NIO
方式建立TCP连接:
// org.apache.zookeeper.ClientCnxnSocketNIO#connect
@Override
void connect(InetSocketAddress addr) throws IOException {
// 创建Socket
SocketChannel sock = createSock();
try {
// 注册并连接指定地址
registerAndConnect(sock, addr);
} catch (IOException e) {
LOG.error("Unable to open socket to {}", addr);
sock.close();
throw e;
}
// 设置initialized为false
initialized = false;
/*
* 清空缓存
*/
lenBuffer.clear();
incomingBuffer = lenBuffer;
}
// 连接
void registerAndConnect(SocketChannel sock, InetSocketAddress addr) throws IOException {
// 注册 select,这部分可以看java NIO学习
sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
// 执行连接,可能结果不会立刻为true;如果是异步情况或者正在建立连接的,会返回false,
boolean immediateConnect = sock.connect(addr);
// 如若连接成功,就能立刻执行会话创建
// 如若连接失败,就等后续while循环执行doTransport()时再执行
if (immediateConnect) {
sendThread.primeConnection();
}
}
在 registerAndConnect
方法里,我们可以看到 SocketChannel
执行 connect()
进行连接,这个方法根据 SocketChannel
是否为异步模式来决定是否直接返回:
- 比如
SocketChannel
是异步模式,那么immediateConnect
可能不会直接为true
,即仍处于连接中状态。那就要等到doTransport()
方法判断是否连接成功,进而构建会话请求包。 - 如果
SocketChannel
是同步模式,那么会一直阻塞到方法返回,即连接成功。如果返回结果immediateConnect
为true
,就能立刻执行sendThread.primeConnection()
构建会话请求包并放入outgoing queue
中。
截止目前为止,已经开始建立TCP连接了~注意如果是异步模式,到这一步可能还没成功建立连接。
假设现在是同步模式,我们阻塞并等待 sock.connect(addr)
方法返回,如果连接成功,就能接下去执行 primeConnection()
:
void primeConnection() throws IOException {
isFirstConnect = false;
// 判断之前是否存在session,如果有,就用之前的sessionId
long sessId = (seenRwServerBefore) ? sessionId : 0;
// 创建连接请求,这里有最后一次的事务ID、会话超时时间、sessionID
ConnectRequest conReq = new ConnectRequest(0, lastZxid, sessionTimeout, sessId, sessionPasswd);
...
// 如果存在要认证的信息,就创建认证请求并放入队列中
for (AuthData id : authInfo) {
outgoingQueue.addFirst(
new Packet(
new RequestHeader(ClientCnxn.AUTHPACKET_XID, OpCode.auth),
null,
new AuthPacket(0, id.scheme, id.data),
null,
null));
}
// 重点!保证会话请求在所有请求之前,addFirst
outgoingQueue.addFirst(new Packet(null, null, conReq, null, null, readOnly));
// 设置SelectionKey 可读/可写,即给SelectionKey设置OP_READ、OP_WRITE
clientCnxnSocket.connectionPrimed();
// 发送请求的日志
LOG.debug("Session establishment request sent on {}", clientCnxnSocket.getRemoteSocketAddress());
}
构建会话请求的逻辑算是简单的,就是将请求封装成 Packet
,然后放入 outgoing queue
中。到时候,这个循环体的另外一个方法 doTransport()
会在TCP建立成功后,从 outgoing queue
中取出请求发送给服务端。
注意,该阶段没有发送任何请求!
处理网络IO
笔者中间省略了一些 建立在会话基础上的一些代码,先分析网络监听IO,等会话连接成功了,再回头分析中间省略的代码。
// org.apache.zookeeper.ClientCnxn.SendThread#run
while (state.isAlive()) {
try {
// 因为前面的代码,除了建立连接以外,都是需要建立在会话基础上的
// 目前先考虑网络IO,讲完了会话建立成功后再分析这部分代码
...
// 在初始化这个阶段,该方法主要用来监听连接的,如果连接成功
// 就可以接着执行sendThread.primeConnection();
clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
} catch (Throwable e) {
...
}
}
现在让我们看看核心的网络IO处理逻辑:
void doTransport(
int waitTimeOut,
Queue<Packet> pendingQueue,
ClientCnxn cnxn) throws IOException, InterruptedException {
// 这部分都是NIO的知识
selector.select(waitTimeOut);
Set<SelectionKey> selected;
synchronized (this) {
selected = selector.selectedKeys();
}
// Everything below and until we get back to the select is
// non blocking, so time is effectively a constant. That is
// Why we just have to do this once, here
updateNow();
// 遍历所有的SelectionKey
for (SelectionKey k : selected) {
SocketChannel sc = ((SocketChannel) k.channel());
// 如果SelectionKey仅处于连接状态,且已经准备就绪,就执行该条件语句
if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
// 判断异步模式下的SocketChannel的Scoket是否连接完毕
// 之前是同步模式下的SocketChannel连接,就能直接执行会话请求的构建
if (sc.finishConnect()) {
// 更新最后一次的发送/接收时间
updateLastSendAndHeard();
// 更新当前绑定的socket地址
updateSocketAddresses();
// 因为TCP已经建立完毕。准备创建会话请求,并放入待取队列中
sendThread.primeConnection();
}
// 如果SelectionKey处于 可读/可写,且已经准备就绪的状态,就执行该条件语句
} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
// 这里会处理读写请求
doIO(pendingQueue, cnxn);
}
}
// 判断是否已经建立连接,若连接建立成功,判断会话请求是否在请求队列内的首位。
// 如果确定,给予SelectionKey 可写能力
if (sendThread.getZkState().isConnected()) {
if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {
enableWrite();
}
}
// 清理引用
selected.clear();
}
关于 sendThread.primeConnection()
方法详解可以看 建立TCP连接的最后一部分 ,那里分析了 Blocking Mode SocketChannel
下的TCP连接成功后,对会话请求的构建;这边是 None Blocking Mode SocketChannel
下的TCP连接成功后,对会话请求的构建。
该方法除了 OP_CONNECT
的判断以外,还存在 OP_READ|OP_WRITE
的判断,这部分就是真正的处理读写请求的部分。当一个请求准备就绪了,并且支持读写后,就可以处理IO了。
void doIO(Queue<Packet> pendingQueue, ClientCnxn cnxn) throws InterruptedException, IOException {
SocketChannel sock = (SocketChannel) sockKey.channel();
// 如果SelectionKey可读
if (sockKey.isReadable()) {
// 读取数据
int rc = sock.read(incomingBuffer);
if (rc < 0) {
throw new EndOfStreamException("Unable to read additional data from server sessionid 0x"
+ Long.toHexString(sessionId)
+ ", likely server has closed socket");
}
// 全部读完了已经
if (!incomingBuffer.hasRemaining()) {
// 恢复
incomingBuffer.flip();
if (incomingBuffer == lenBuffer) {
recvCount.getAndIncrement();
readLength();
// 如果有数据,且当前还未初始化,那么必然是会话请求
} else if (!initialized) {
// 读取连接结果,这里开始构建事件
readConnectResult();
// 启用可读能力
enableRead();
if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {
// Since SASL authentication has completed (if client is configured to do so),
// outgoing packets waiting in the outgoingQueue can now be sent.
enableWrite();
}
// 善后,恢复到初始状态
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
initialized = true;
// 如果是普通的读,就读取信息,转换成Event放入EventThread
} else {
sendThread.readResponse(incomingBuffer);
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
}
}
}
// 如果可写
if (sockKey.isWritable()) {
// 找到待发送的Packet,基本是队首
Packet p = findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress());
if (p != null) {
// 更新最后发送时间
updateLastSend();
// 如果packet还没有序列化
if (p.bb == null) {
// 判断是否有请求头 && 请求头类型非ping && 请求头类型非auth
// 感觉是在判断请求是否为普通请求
if ((p.requestHeader != null)
&& (p.requestHeader.getType() != OpCode.ping)
&& (p.requestHeader.getType() != OpCode.auth)) {
p.requestHeader.setXid(cnxn.getXid());
}
// 序列化请求
p.createBB();
}
// 写入请求
sock.write(p.bb);
// 所有字节码都发送完毕,hasRemaining()表示是否有剩余;
if (!p.bb.hasRemaining()) {
// 发送成功数量++
sentCount.getAndIncrement();
// 从outgoing queue中移除队首(发送的包)
outgoingQueue.removeFirstOccurrence(p);
// 判断是否有请求头 && 请求头类型非ping && 请求头类型非auth
// 如果是普通请求,那么加入pendingQueue;反之不加入;
// 因为普通请求可以被开发人员增加监听;特殊事件基本都是按ZooKeeper既定的路线走
// 不需要再加入pendingQueue,直接内部消化了
if (p.requestHeader != null
&& p.requestHeader.getType() != OpCode.ping
&& p.requestHeader.getType() != OpCode.auth) {
synchronized (pendingQueue) {
pendingQueue.add(p);
}
}
}
}
// 如果待发送队列空了,就禁用写入
if (outgoingQueue.isEmpty()) {
disableWrite();
// 如果会话初始化还没成功,也禁用写入。禁用了那怎么发送会话包呢?
// 至少会执行一次,因为这在write之后
} else if (!initialized && p != null && !p.bb.hasRemaining()) {
disableWrite();
} else {
// Just in case
enableWrite();
}
}
}
构建会话事件
当在还没有初始化的情况下,收到了一个响应,会进入 readConnectResult()
方法中,主要做以下几个步骤:
- 解析响应
- 构建WatchEvent
- 放入
waitingEvents
这部分负责解析响应,当响应处理完毕,进入 onConnected()
方法。
void readConnectResult() throws IOException {
// 日志部分
...
// 读取数据
ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
// 创建ConnectResponse
ConnectResponse conRsp = new ConnectResponse();
// 解析数据,放入ConnectResponse
conRsp.deserialize(bbia, "connect");
// read "is read-only" flag
boolean isRO = false;
try {
isRO = bbia.readBool("readOnly");
} catch (IOException e) {
// this is ok -- just a packet from an old server which
// doesn't contain readOnly field
LOG.warn("Connected to an old server; r-o mode will be unavailable");
}
// 从响应里获取SessionID
this.sessionId = conRsp.getSessionId();
// 调用onConnected逻辑
sendThread.onConnected(conRsp.getTimeOut(), this.sessionId, conRsp.getPasswd(), isRO);
}
在 onConnected()
方法里构建 SyncConnected
事件,并入队等待 EventThread
处理:
void onConnected(
int _negotiatedSessionTimeout,
long _sessionId,
byte[] _sessionPasswd,
boolean isRO) throws IOException {
negotiatedSessionTimeout = _negotiatedSessionTimeout;
// 会话超时处理
if (negotiatedSessionTimeout <= 0) {
state = States.CLOSED;
eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Expired, null));
eventThread.queueEventOfDeath();
String warnInfo = String.format(
"Unable to reconnect to ZooKeeper service, session 0x%s has expired",
Long.toHexString(sessionId));
LOG.warn(warnInfo);
throw new SessionExpiredException(warnInfo);
}
// 可读可写客户端 连接进入 只读服务器,异常
if (!readOnly && isRO) {
LOG.error("Read/write client got connected to read-only server");
}
// 设置属性
readTimeout = negotiatedSessionTimeout * 2 / 3;
connectTimeout = negotiatedSessionTimeout / hostProvider.size();
// 更新当前正在使用的服务器地址下标
hostProvider.onConnected();
// 保存sessionID
sessionId = _sessionId;
// session的密码
sessionPasswd = _sessionPasswd;
// 判断连接类型
state = (isRO) ? States.CONNECTEDREADONLY : States.CONNECTED;
seenRwServerBefore |= !isRO;
LOG.info(
"Session establishment complete on server {}, session id = 0x{}, negotiated timeout = {}{}",
clientCnxnSocket.getRemoteSocketAddress(),
Long.toHexString(sessionId),
negotiatedSessionTimeout,
(isRO ? " (READ-ONLY mode)" : ""));
// 创建连接事件
KeeperState eventState = (isRO) ? KeeperState.ConnectedReadOnly : KeeperState.SyncConnected;
// 放入EventThread,开始处理
eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, eventState, null));
}
到此,剩下的就是 EventThread
的事件回调处理了,已经没有 SendThread
啥事了。所以接下来将分析在会话建立完毕之后的一些事情(补之前的坑)。
补充
在分析这个循环体的时候,还有一部分还没有分析,就是在会话建立成功后的事情。这里大概可以分为这样几块:
- 会话建立后,处理认证信息;
- 超时判断
- 会话建立后,构建PING包放入
outgoing queue
中 - 若当前连接是只读连接,那么需要隔一段时间寻找一下可读/可写的服务器。
while (state.isAlive()) { try { // 连接未建立的if块 ... // ①会话建立后的if块:用来处理认证 if (state.isConnected()) { // 如果存在zooKeeperSaslClient,就需要进行Sasl if (zooKeeperSaslClient != null) { boolean sendAuthEvent = false; if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) { try { zooKeeperSaslClient.initialize(ClientCnxn.this); } catch (SaslException e) { LOG.error("SASL authentication with Zookeeper Quorum member failed.", e); state = States.AUTH_FAILED; sendAuthEvent = true; } } KeeperState authState = zooKeeperSaslClient.getKeeperState(); if (authState != null) { if (authState == KeeperState.AuthFailed) { // An authentication error occurred during authentication with the Zookeeper Server. state = States.AUTH_FAILED; sendAuthEvent = true; } else { if (authState == KeeperState.SaslAuthenticated) { sendAuthEvent = true; } } } if (sendAuthEvent) { eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, authState, null)); if (state == States.AUTH_FAILED) { eventThread.queueEventOfDeath(); } } } // 计算耗费时间 to = readTimeout - clientCnxnSocket.getIdleRecv(); } else { // 计算耗费时间 to = connectTimeout - clientCnxnSocket.getIdleRecv(); } // ② 超时判断 if (to <= 0) { String warnInfo = String.format( "Client session timed out, have not heard from server in %dms for session id 0x%s", clientCnxnSocket.getIdleRecv(), Long.toHexString(sessionId)); LOG.warn(warnInfo); throw new SessionTimeoutException(warnInfo); } // ③ 会话建立完毕后的if块:准备发送PING包,保持心跳 if (state.isConnected()) { //1000(1 second) is to prevent race condition missing to send the second ping //also make sure not to send too many pings when readTimeout is small int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() - ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0); //send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) { sendPing(); clientCnxnSocket.updateLastSend(); } else { if (timeToNextPing < to) { to = timeToNextPing; } } } // If we are in read-only mode, seek for read/write server if (state == States.CONNECTEDREADONLY) { long now = Time.currentElapsedTime(); int idlePingRwServer = (int) (now - lastPingRwServer); if (idlePingRwServer >= pingRwTimeout) { lastPingRwServer = now; idlePingRwServer = 0; pingRwTimeout = Math.min(2 * pingRwTimeout, maxPingRwTimeout); pingRwServer(); } to = Math.min(to, pingRwTimeout - idlePingRwServer); } ... } catch (Throwable e) { ... } }
EventThread#run
该部分的逻辑比较简单:
public void run() {
try {
isRunning = true;
while (true) {
// 从队列里去除事件
Object event = waitingEvents.take();
// 判断是否为关闭事件
if (event == eventOfDeath) {
wasKilled = true;
} else {
// 处理普通事件
processEvent(event);
}
// 判断客户端是否被kill
if (wasKilled) {
synchronized (waitingEvents) {
if (waitingEvents.isEmpty()) {
isRunning = false;
break;
}
}
}
}
} catch (InterruptedException e) {
LOG.error("Event thread exiting due to interruption", e);
}
LOG.info("EventThread shut down for session: 0x{}", Long.toHexString(getSessionId()));
}
总结
客户端的会话创建主要分三个阶段,第一个阶段是初始化阶段,主要初始化客户端的一些配置、属性并开始建立TCP连接;第二个阶段是会话请求发送阶段,这部分主做了两步,一步是准备会话请求包,另一部分是让SendThread循环执行NIO监听,如果可以写了就发送请求包;最后一个阶段是响应处理,循环NIO监听,如果发现可以读取数据了,就读取数据。由于会话请求包永远是在队首,所以一开始一定会是先处理会话请求(通过一个 initialized
状态来判断是否存在会话)
比较重点的是 NIO
的知识点,然后 HostProvider
以及处理包的逻辑~