zkCli.sh
当客户端调用 zkCli.sh 脚本的时候,我们发现脚本中有这么一段话
"$JAVA" "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" \-cp "$CLASSPATH" $CLIENT_JVMFLAGS $JVMFLAGS \org.apache.zookeeper.ZooKeeperMain "$@"
我们可以看到,其实调用 zkCli.sh 的时候,就是执行了 JAVA **org.apache.zookeeper.ZooKeeperMain** 这段逻辑
NIO 中的几个概念
Channel(通道),Buffer(缓冲区),Selector(选择器)
Channel
可以将 NIO 中的 Channel 同传统 IO 中的 Stream 来类比,但是需要注意的是,传统 IO 中,Stream 是单向的,比如 InputSteam 只能进行读取操作,OutputStream 只能进行写操作。而 Channel 是双向的,既可以用来进行读操作,也可以用来进行写操作。
Buffer
在 NIO 中所有的数据的读写都离不开 Buffer,读取的数据只能放在 buffer 中,写数据也放在 buffer 中
Selector
将 Channel 和 Selector 配合使用,必须将 channel 注册到 selector 上,通过 SelectableChannel.register() 方法来实现
channel.configureBlocking(false);SelectionKey key = channel.register(selector, Selectionkey.OP_READ);
ZooKeeperMain - 客户端启动类
- ZooKeeperMain 接收客户端信息
- ZooKeeperMain 将客户端信息转化为 Request
ZooKeeper
ZooKeeper 调用 ClientCnxn.submitRequest() 方法将 Request 包装成 Packet 并添加到 outgoingQueue 队列中
ClientCnxn
SendThread
- 连接到服务端并且进行重试
- 发送 ping
- doIO()
发送数据
- 从 outgoingQueue 中取出数据并发送给服务端
- 将需要等待结果的 Packet 加入到 pendingQueue 中
读取数据
- 接收 Watcher 事件通知,并且把通知加入到 waitingEvents 队列中去
- 将 pendingQueue 中的 Packet 读取出来并且使用服务端返回的结果进行装配
- 如果是同步请求则唤醒线程
- 如果是异步请求则将 Packet 加入到 waitignEvents 队列中
EventThread
- 从 waitingEvents 队列中获取数据
- 如果是 watcher 事件通知,发出绑定的 watcher 逻辑
- 如果是异步请求,则调用对应的异步回调函数
QuorumPeerMain - 服务端启动类
- 解析配置
- 根据配置进行单机或集群模式的启动(判断条件就是配置文件中的 servers 的数量)
服务端接收请求:
- 创建事务日志
- 创建快照、Database、写入文件
- 更新内存,操作 DataTree
- 返回错误或正确的信息
QuorumPeerConfig
配置类
ZooKeeperServerMain - 单机模式启动类
- 初始化 ZooKeeperServer
- 初始化 FileTxnSnapLog
- 初始化 NIOServerCnxnFactory
- 启动 NIOServerCnxnFactory
- 启动 ZooKeeperServer
ServerConfig
单机模式下的配置类,配置属性比集群模式下要少一点
ZooKeeperServer
- 初始化 ZKDatabase
- 初始化 DataTree
- 从 SnapShot 还原 DataTree
- 开始 Session 检查器
- 设置请求处理器 RequestProcessor
FileTxnSnapLog
事务日志和快照持久化工具类
- TxnLog 事务日志
- SnapShot 快照日志
ServerCnxnFactory
服务器上下文工厂类,负责创建服务器上下文,默认为 NIOServerCnxnFactory,可配
- 开启 ServerSocketChannel
NIOServerCnxnFactory
一个线程类
- 启动 ZooKeeperThread,这个类开启的其实就是 NIOServerCnxnFactory 自己
- 接收客户端的连接事件,就初始化出来一个 NIOServerCnxn
- 接收数据或写出数据,NIOServerCnxn.doIO
读数据
- 读取的是 ConnectRequest,服务端新建一个 session,并生成一个新的 sessionid,并生成一个 Request 调用 submitRequest 方法
- 读取的正常操作请求,也会生成一个 Request 调用 submitRequest 方法
提交请求 submitRequest 方法
RequestProcessor
单机模式下:
- firstProcessor = PrepRequestProcessor(线程)
- PrepRequestProcessor.next = SyncRequestProcessor(线程)
- SyncRequestProcessor,next = FinalRequestProcessor
PrepRequestProcessor - 接收客户端请求生成 txn 事务以及节点修改记录
- processRequest 方法将 Request 添加到 submittedRequests 队列中
- 线程不停的从 submittedRequests 队列中获取请求
- 根据请求类型进入不同的处理,我们以 create 为例
- 获取父节点信息
- 校验 ACL
- 临时节点与顺序节点逻辑
- 生成 txn 事务
- 生成父节点修改记录
- 生成新增节点修改记录
- 将修改记录加入到 outstandingChanges 队列中
- 调用 nextProecessor.processRequest(request)
SyncRequestProcessor
- processRequest 方法将 Request 添加到 queuedRequests 队列中
- 负责从 queuedRequests 队列中获取 Request
- 负责将 txn 同步到磁盘,并且进行快照
- 如果同步完成了就会调用 nextProcessor.processRequest(si)
FinalRequestProcessor
- 从 outstandingChanges 队列中获取 Request
- 更新 DataTree
- 触发 Watcher
- 构造 Response
- 通过 NIOServerCnxn 中的 sendResponse 把 Response 转化成 ByteBuffer 返回给客户端
只读模式
当服务器集群一半以上的节点挂掉以后,不能进行写操作,但是可以进行读操作,这时可以开启只读模式
- 服务器开启只读模式:-Dreadonlymode.enable = true
- 客户端调用开启只读模式:zkServer.sh start -r 或者 new ZooKeeper(…, canBeReadOnly = true)
- 以上两步都开启了才是支持只读模式
总结
命令 -> Request -> Packet -> Outgoingqueue
SendThread 线程while(true) {1、如果 socket 没有连接,就去连接2、如果 socket 连接成功了,客户端就会发送一个 ConnectRequest,接收服务器端返回的 ConnectResquest(Event.none)3、从 Outgoingqueue 取 Packet,通过 socket 发送出去,同时,如果说需要等待结果的,就去 pendingqueue 放入等待结果的 Packet 去等待结果}
