数据模型

ZooKeeper 的视图结构和标准的 Unix 文件系统非常类似,但没有引入传统文件系统中目录和文件等概念,而是使用了其特有的数据节点概念,我们称之为 ZNode。ZNode 是 ZooKeeper 中数据的最小单元,每个 ZNode 上都可以保存数据,同时还可以挂载子节点,因此构成了一个层次化的命名空间,我们称之为树。
image.png
在 ZooKeeper 中,每一个数据节点都被称为一个 ZNode,所有 ZNode 按层次化结构组织成一棵树。ZNode 的节点路径标识方式和 Unix 文件系统路径非常相似,都是由一系列使用斜杠(/)进行分割的路径表示,我们可以向这个节点中写入数据,也可以在节点下创建子节点。

对于每一个事务请求,ZooKeeper 都会为其分配一个全局唯一的事务 ID,用 ZXID 来表示,通常是一个 64 位的数字。每一个 ZXID 对应一次更新操作,从 ZXID 中可以间接识别出 ZooKeeper 处理这些更新操作请求的全局顺序。

节点特性

1. 节点类型

在 ZooKeeper 中,每个数据节点都是有生命周期的,其生命周期的长短取决于数据节点的节点类型。在 ZooKeeper 中,节点类型可以分为持久节点(PERSISTENT)、临时节点(EPHEMERAL)和顺序节点 (SEQUENTIAL)三大类,具体在节点创建过程中,通过组合使用,可生成以下四种节点类型:

  1. PERSISTENT
  2. PERSISTENT_SEQUENTIAL
  3. EPHEMERAL
  4. EPHEMERAL_SEQUENTIAL

持久节点是 ZooKeeper 中最常见的一种节点类型。所谓持久节点,是指该数据节点被创建后,就会一直存在于 ZooKeeper 服务器上,直到有删除操作来主动清除这个节点。

临时节点的生命周期和客户端的会话绑定在一起,当客户端会话失效时该节点会自动被清理。注意,这里是指客户端会话失效,而非 TCP 连接断开。另外,ZooKeeper 规定了不能基于临时节点来创建子节点,即临时节点只能作为叶子节点。

顺序节点的特性表现在顺序性上,在 ZooKeeper 中,每个父节点都会为它的第一级子节点维护一份顺序,用于记录下每个子节点创建的先后顺序。基于这个顺序特性,在创建子节点的时候可以设置这个标记,那么在创建节点的过程中,ZooKeeper 会自动为给定节点名加上一个数字后缀,作为一个新的、完整的节点名。

2. 节点状态信息

实际上,每个数据节点除了存储了数据内容之外,还存储了数据节点本身的一些状态信息。Stat 类中包含了一个数据节点的所有状态信息。

public class Stat implements Record {
    // 即 Created ZXID,表示该数据节点被创建时的事务 ID
    private long czxid;

    // 即 Modified ZXID,表示该节点最后一次被更新时的事务 ID
    private long mzxid;

    // 即 Created Time,表示节点被创建的时间
    private long ctime;

    // 即 Modified Time,表示该节点最后一次被更新的时间
    private long mtime;

    // 数据节点的版本号
    private int version;

    // 子节点的版本号
    private int cversion;

    // 节点的 ACL 版本号
    private int aversion;

    // 创建该临时节点的会话的 sessionID,如果该节点是持久节点,则该属性为 0
    private long ephemeralOwner;

    // 数据内容长度
    private int dataLength;

    // 当前节点的子节点个数
    private int numChildren;

    // 表示该节点的子节点列表最后一次被修改时的事务 ID,子节点内容修改不影响
    private long pzxid;
}

版本

在 ZooKeeper 中为数据节点引入了版本的概念,每个数据节点都具有三种类型的版本信息,即上面提到的 version、cversion、aversion,对数据节点的任何更新操作都会引起版本号的变化,以此来保证在分布式环境下对数据的原子性操作。注意,ZooKeeper 中的版本强调的是变更次数,即使前后两次变更并没有使得数据内容的值发生变化,版本号依然会变。

实际上,在 ZooKeeper 中,version 属性正是用来实现乐观锁机制中的 “写入校验” 的。下面我们看下
ZooKeeper 的 setData 方法的内部实现。在 ZooKeeper 服务器的 PrepRequestProcessor 处理器类中,在处理每一个数据更新请求(SetDataRequest)前都会进行版本检查。

version = setDataRequest.getVersion();
currentVersion = nodeRecord.stat.getVersion();
// version为-1,说明客户端并不要求使用乐观锁,可以忽略版本比对
if (version != -1 && version != currentVersion) {
    throw new BadVersionException(path);
}
version = currentVersion + 1;

Watcher

ZooKeeper 引入了 Watcher 机制来实现分布式的通知功能。ZooKeeper 允许客户端向服务端注册一个 Watcher 监听,当服务端的一些指定事件触发了这个 Watcher,那么就会向指定客户端发送一个事件通知来实现分布式的通知功能。Watcher 机制主要包括客户端线程、客户端 WatcherManager 和 ZooKeeper 服务器三部分。整个注册与通知过程如下图所示:
image.png
客户端在向 ZooKeeper 注册 Watcher 的同时,会将 Watcher 对象存储在客户端的 WatcherManager 中。当 ZooKeeper 服务器端触发 Watcher 事件向客户端发送通知时,客户端线程从 WatcherManager 中取出对应 Watcher 对象来执行回调。

public interface ClientWatchManager {
    // 返回一组应该被通知的watcher
    Set<Watcher> materialize(Watcher.Event.KeeperState state, Watcher.Event.EventType type, String path);
}

1. Watcher 接口

Watcher 接口定义了事件通知相关的逻辑,包含 KeeperState 和 EventType 两个枚举类,分别代表了通知状态和事件类型,同时定义了事件的回调方法 process(WatchedEvent event)。

1.1 Watcher 事件

同一个事件类型在不同的通知状态中代表的含义有所不同,下表列举了常见的通知状态和事件类型。

KeeperState EventType 触发条件 说明
SyncConnected None 客户端与服务器成功建立会话 此时客户端和服务器处于连接状态
NodeCreated Watcher 监听的对应数据节点被创建
NodeDeleted Watcher 监听的对应数据节点被删除
NodeDataChanged Watcher 监听的对应数据节点的数据内容发生变更
NodeChildrenChanged Watcher 监听的对应数据节点的子节点列表发生变更,子节点数据内容变化不会触发该事件
Disconnected None 客户端与 ZooKeeper 服务器断开连接 此时客户端和服务器处于断开连接状态
Expired None 会话超时 此时客户端会话失效
AuthFailed None
- 使用错误的 scheme 进行权限检查
- SASL 权限检查失败
通常会收到 AuthFailedException 异常

1.2 process 回调

process 方法是 Watcher 接口中的一个回调方法,当 ZooKeeper 向客户端发送一个 Watcher 事件通知时,客户端就会对相应的 process 方法进行回调,从而实现对事件的处理。process 方法定义如下:

void process(WatchedEvent event);

ZooKeeper 使用 WatchedEvent 对象来封装服务端事件并传递给 Watcher,从而方便回调方法 process 对服务端事件进行处理。其数据结构如下:

public class WatchedEvent {
    final private KeeperState keeperState;
    final private EventType eventType;
    private String path;
}

服务端在生成 WatchedEvent 事件之后,会调用 getWrapper 方法将自己包装成一个可序列化的 WatcherEvent事件,以便通过网络传输到客户端。客户端在接收到服务端的这个事件对象后,首先会将 WatcherEvent 事件还原成一个 WatchedEvent 事件,并传递给 process 方法处理。

public WatcherEvent getWrapper() {
    return new WatcherEvent(eventType.getIntValue(), keeperState.getIntValue(), path);
}

可以看到,无论是 WatcherEvent 还是 WatchedEvent,其对 ZooKeeper 服务端事件的封装都是极其简单的。因此,客户端无法直接从该事件中获取到对应数据节点到变化内容,需要客户端再次主动去重新获取数据。

2. 工作机制

2.1 客户端注册 Watcher

在创建一个 ZooKeeper 客户端对象实例时,可以向构造方法中传入一个 Watcher,这个 Watcher 将作为整个 ZooKeeper 会话期间的默认 Watcher,会一直被保存在客户端 ZKWatchManager 的 defaultWatcher 属性。另外,客户端也可以通过 getData、getChildren 和 exist 三个接口来向 ZooKeeper 注册 Watcher,无论使用哪种方式,注册 Watcher 的工作原理都是一致的。

下面,我们以 getData 方法为例,来分析下客户端注册 Watcher 的实现过程:

public byte[] getData(final String path, Watcher watcher, Stat stat)

1)封装 Watcher
客户端首先会对当前客户端请求 request 进行标记,将其设置为 “使用 Watcher 监听”,同时会封装一个 Watcher 的注册信息 WatchRegistration 对象,用于暂时保存数据节点的路径和 Watcher 的对应关系。

WatchRegistration wcb = null;
if (watcher != null) {
    wcb = new DataWatchRegistration(watcher, clientPath);
}
......
// 该值用于服务端判断是否需要进行Watcher注册
request.setWatch(watcher != null);
// 放入发送队列中等待客户端发送
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);

2)发送 Watcher
在 ZooKeeper 中,Packet可以被看作是一个最小的通信协议单元,用于进行客户端与服务端之间的网络传输,任何需要传输的对象都需要包装成一个 Packet 对象。因此,在 ClientCnxn 中的 WatchRegistration 又会被封装到 Packet 中去,然后放入发送队列中等待客户端发送。

public ReplyHeader submitRequest(RequestHeader h, Record request,
                                 Record response, WatchRegistration watchRegistration)
    throws InterruptedException {
    ReplyHeader r = new ReplyHeader();
    Packet packet = queuePacket(h, r, request, response, null, null, null,
                                null, watchRegistration);
    synchronized (packet) {
        if (requestTimeout > 0) {
            // Wait for request completion with timeout
            waitForPacketFinish(r, packet);
        } else {
            // Wait for request completion infinitely
            while (!packet.finished) {
                packet.wait();
            }
        }
    }
    if (r.getErr() == Code.REQUESTTIMEOUT.intValue()) {
        sendThread.cleanAndNotifyState();
    }
    return r;
}

3)等待响应
随后,ZooKeeper 客户端会向服务端发送这个请求,同时等待请求返回。完成请求发送后,会由客户端 SendThread 线程的 readResponse 方法负责接收来自服务端的响应,finishPacket 方法会从 Packet 中取出对应 Watcher 注册到 ZKWatchManager 中。

protected void finishPacket(Packet p) {
    int err = p.replyHeader.getErr();
    if (p.watchRegistration != null) {
        p.watchRegistration.register(err);
    }
    ......
}

public void register(int rc) {
    if (shouldAddWatch(rc)) {
        Map<String, Set<Watcher>> watches = getWatches(rc);
        synchronized (watches) {
            Set<Watcher> watchers = watches.get(clientPath);
            if (watchers == null) {
                watchers = new HashSet<Watcher>();
                watches.put(clientPath, watchers);
            }
            watchers.add(watcher);
        }
    }
}

在 register 方法中,客户端会将之前暂时保存的 Watcher 对象转交给 ZKWatchManager 并最终保存到 datawatches 中去。ZKWatchManager.datawatches 是一个 Map> 类型的数据结构,用于将数据节点的路径和 Watcher 对象进行映射后管理起来。

4)存在的问题
考虑到每调用一次 getData 方法就注册一个 Watcher,如果客户端注册的所有 Watcher 都被传递到服务端的话,那么服务端肯定会出现内存紧张或其他性能问题了。因此在 ZooKeeper 底层的网络传输序列化过程中,只会将 Packet 对象中的 requestHeaderrequest 两个属性进行序列化,而并没有将 WatchRegistration 序列化到底层字节数组中去。我们看下 Packet 的序列化过程:

public void createBB() {
    try {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
        boa.writeInt(-1, "len");
        if (requestHeader != null) {
            requestHeader.serialize(boa, "header");
        }
        if (request instanceof ConnectRequest) {
            request.serialize(boa, "connect");
            boa.writeBool(readOnly, "readOnly");
        } else if (request != null) {
            request.serialize(boa, "request");
        }
        ......
    } catch (IOException e) {
        LOG.warn("Ignoring unexpected exception", e);
    }
}

2.2 服务端处理 Watcher

1)ServerCnxn 存储
服务端收到来自客户端的请求之后,会在 FinalRequestProcessor 类中的 processRequest() 中判断当前请求是否需要注册 Watcher,当 getWatch() 为 true 时,ZooKeeper 就认为当前客户端请求需要进行 Watcher 注册,于是就会将当前的 ServerCnxn 对象和数据节点路径传入例如 getData 方法中去。

// 假设当前请求是getData请求,则会被路由到该分支
case OpCode.getData: {
    GetDataRequest getDataRequest = new GetDataRequest();
    // 反序列化request
    ByteBufferInputStream.byteBuffer2Record(request.request, getDataRequest);
    DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());
    ......
    Stat stat = new Stat();
    byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
                                           getDataRequest.getWatch() ? cnxn : null);
    rsp = new GetDataResponse(b, stat);
    break;
}

ServerCnxn 是一个 ZooKeeper 客户端和服务器之间的连接接口,代表了一个客户端和服务器的连接。这个类实现了 Watcher 接口。数据节点路径和 ServerCnxn 会被存储在 DataTree 的 dataWatches 中。

public class DataTree {
    private final WatchManager dataWatches = new WatchManager();
    ......
}

dataWatches 是 WatchManager 类的一个实例,最终数据会保存在 WatchManager 的 watchTablewatch2Paths 中。WatchManager 是 ZooKeeper 服务端 Watcher 的管理者,负责事件的触发,并移除那些已经被触发的 Watcher。

public class WatchManager {
    // 从数据节点的粒度来托管Watcher
    private final HashMap<String, HashSet<Watcher>> watchTable = new HashMap<>();
    // 从Watcher的粒度来控制事件触发需要触发的数据节点
    private final HashMap<Watcher, HashSet<String>> watch2Paths = new HashMap<>();
}

2)Watcher 触发
比如 NodeDataChanged 事件的触发条件是 “Watcher 监听的节点的数据内容发生变更”,具体实现如下:

public Stat setData(String path, byte data[], int version, long zxid, long time) throws KeeperException.NoNodeException {
    Stat s = new Stat();
    DataNode n = nodes.get(path);
    byte lastdata[] = null;
    synchronized (n) {
        lastdata = n.data;
        n.data = data;
        n.stat.setMtime(time);
        n.stat.setMzxid(zxid);
        n.stat.setVersion(version);
        n.copyStat(s);
    }
    ......
    dataWatches.triggerWatch(path, EventType.NodeDataChanged);
    return s;
}

在对指定节点进行数据更新后,通过调用 WatchManager 的 triggerWatch 方法来触发相关事件。

public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
    WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
    HashSet<Watcher> watchers;
    synchronized (this) {
        watchers = watchTable.remove(path);
        if (watchers == null || watchers.isEmpty()) {
            return null;
        }
        for (Watcher w : watchers) {
            HashSet<String> paths = watch2Paths.get(w);
            if (paths != null) {
                paths.remove(path);
            }
        }
    }
    for (Watcher w : watchers) {
        if (supress != null && supress.contains(w)) {
            continue;
        }
        w.process(e);
    }
    return watchers;
}
  • 首先将通知状态、事件类型以及节点路径封装成一个 WatchedEvent 对象。


  • 根据节点路径从 watchTable 中取出对应的 Watcher。如果没有找到 Watcher,说明没有任何客户端在该节点上注册过 Watcher,直接返回。如果找到了会将其提取出来,并从 watchTable 和 watch2Paths 中将其删除——从这里我们可以看出,Watcher 在服务端是一次性的。


  • 调用 process 方法来触发 Watcher,这里调用的 process 实际上就是 ServerCnxn 的对应方法。


  • 该 process 内部将 WatchedEvent 包装成 WatcherEvent,同时向客户端发送该通知。
synchronized public void process(WatchedEvent event) {
    // 标记XID为-1,代表这是一个通知类型的响应
    ReplyHeader h = new ReplyHeader(-1, -1L, 0);
    WatcherEvent e = event.getWrapper();
    sendResponse(h, e, "notification");
}

2.3 客户端回调 Watcher

1)SendThread 接收事件通知
对于一个来自服务端的响应,客户端都是由 SendThread 的 readResponse() 方法来统一进行处理的。

void readResponse(ByteBuffer incomingBuffer) throws IOException {
    ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
    BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
    ReplyHeader replyHdr = new ReplyHeader();
    // 反序列化
    replyHdr.deserialize(bbia, "header");

    if (replyHdr.getXid() == -1) {
        WatcherEvent event = new WatcherEvent();
        event.deserialize(bbia, "response");

        if (chrootPath != null) {
            String serverPath = event.getPath();
            if(serverPath.compareTo(chrootPath)==0)
                event.setPath("/");
            else if (serverPath.length() > chrootPath.length())
                event.setPath(serverPath.substring(chrootPath.length()));
        }

        WatchedEvent we = new WatchedEvent(event);
        eventThread.queueEvent(we);
        return;
    }
    ......
}

如果响应头 replyHdr 中标识了 XID 为 -1,表明这是一个通知类型的响应,对其的处理大体分四个步骤:

  • 反序列化为 WatcherEvent 对象。
  • 处理 chrootPath,如果客户端设置了 chrootPath 属性,那么需要对服务端传过来的完整的节点路径进行 chrootPath 处理,生成客户端的一个相对节点路径。
  • 还原 WatchedEvent。
  • 回调 Watcher,将 WatchedEvent 对象交给 EventThread 线程,在下一个轮询周期中进行回调。

2)EventThread 处理事件通知
服务端的 Watcher 事件通知,最终交给了 EventThread 线程来处理。EventThread 线程是 ZooKeeper 客户端中专门用来处理服务端通知事件的线程,SendThread 接收到服务端的事件通知后,会调用 EventThread 的 queueEvent 方法来将事件传给 EventThread 线程,其逻辑如下:

public void queueEvent(WatchedEvent event) {
    if (event.getType() == EventType.None && sessionState == event.getState()) {
        return;
    }
    sessionState = event.getState();
    WatcherSetEventPair pair = new WatcherSetEventPair(
        watcher.materialize(event.getState(), event.getType(),
                            event.getPath()), event);
    waitingEvents.add(pair);
}
  • materialize 方法用于从 ZKWatchManager 中取出所有相关类型的 Watcher。
  • 获取到所有相关 Watcher 后,会将其放入 waitingEvents 队列中去,EventThread 的 run 方法会不断对该队列进行处理。

3. 特性

  1. 一次性:无论是服务端还是客户端,一旦一个 Watcher 被触发,ZooKeeper 都会将其从相应的存储中移除,这样的设计有效地减轻了服务端的压力。

  2. 客户端串行执行:客户端 Watcher 回调的过程是一个串行同步的过程,因此要注意不要因为一个 Watcher 的处理逻辑影响了整个客户端的回调。

  3. 轻量:Watcher 通知非常简单,只会告诉客户端发生了事件,而不会说明事件的具体内容。