Zookeeper引入了Watcher机制来实现了分布式数据的发布和订阅功能,使得多个订阅者可以监听某一个主题对象,当主题对象自身状态发生改变时,就会通知所有订阅者。本质上来说,就是分布式环境的观察者模式实现。

一、注册监听

Zookeeper通过以下几个API来注册监听。

1.Zookeeper构造函数注册监听

image.png

2.通过getChildren()、getData()、exist()注册监听

可以出传入自定义Watcher,也可以使用默认watcher

  1. public byte[] getData(final String path, Watcher watcher, Stat stat)
  2. public byte[] getData(String path, boolean watch, Stat stat);
  3. public void getData(final String path, Watcher watcher, DataCallback cb, Object ctx);
  4. public void getData(String path, boolean watch, DataCallback cb, Object ctx)
  5. public List<String> getChildren(String path, boolean watch);
  6. public void getChildren(final String path, Watcher watcher, ChildrenCallback cb, Object ctx);
  7. public void getChildren(String path, boolean watch, ChildrenCallback cb, Object ctx);
  8. public List<String> getChildren(final String path, Watcher watcher,Stat stat);
  9. public List<String> getChildren(String path,boolean watch,Stat stat);
  10. public void getChildren(final String path, Watcher watcher, Children2Callback cb, Object ctx);
  11. public void getChildren(String path, boolean watch, Children2Callback cb, Object ctx);
  12. public Stat exists(String path, boolean watch);
  13. public Stat exists(final String path, Watcher watcher);
  14. public void exists(final String path, Watcher watcher, StatCallback cb, Object ctx);
  15. public void exists(String path, boolean watch, StatCallback cb, Object ctx);

二、触发监听

触发监听分两种情况:
1.构造器绑定的默认监听器会在连接成功后触发。通常利用该方式来保证client端在操作节点时zookeeper成功接。
2.通过exists()/getData()/getChildren()绑定的事件,会监听到相应节点的变化事件,即setData()/delete()/create()操作。

三、事件类型

事件类型

四、Watcher触发机制

每次注册绑定的Watcher都只会触发一次,而不是一直存在。如果Watcher一直存在,某些节点更新非常频繁时,服务端就会不停的通知客户端,使得服务端的压力非常大。对于需要一直保持监听的节点,只需要嵌套注册监听器。

五、Client端和Server端管理Watcher

1.Client端

ZKWatchManager,通过HashMap对Watcher的维护,map的key是节点路径,value是注册在这个节点上的所有Watcher。

  1. private final Map<String, Set<Watcher>> dataWatches =
  2. new HashMap<String, Set<Watcher>>();
  3. private final Map<String, Set<Watcher>> existWatches =
  4. new HashMap<String, Set<Watcher>>();
  5. private final Map<String, Set<Watcher>> childWatches =
  6. new HashMap<String, Set<Watcher>>();
  7. private final Map<String, Set<Watcher>> persistentWatches =
  8. new HashMap<String, Set<Watcher>>();
  9. private final Map<String, Set<Watcher>> persistentRecursiveWatches =
  10. new HashMap<String, Set<Watcher>>();

2.Server端

WatchManager对Watcher进行管理,这里的Watcher不是用户定义的Watcher而是ServerCnxn,代表一个client到server的连接。

private final Map<String, Set<Watcher>> watchTable = new HashMap<>();

private final Map<Watcher, Set<String>> watch2Paths = new HashMap<>();

watchTable定义了节点Node和Client连接的映射关系,一个节点对应到一组注册到此节点上的client,这样节点发生变化就可以通过这样的映射找到所有应该通过的client
watch2Paths和watchTable是相反的数据结构,是一个client连接和节点的映射关系,主要作用是:一个连接关闭的时候,需要将这个连接相关的所有Watcher都移除。

六、Watcher工作原理


Watcher机制 - 图2

1.客户端注册Watcher

1.1初始化客户端绑定Watcher

构造方法入口:

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) 
    throws IOException {
    this(connectString, sessionTimeout, watcher, false);
}

执行构造方法:

public ZooKeeper(
    String connectString,
    int sessionTimeout,
    Watcher watcher,
    boolean canBeReadOnly,
    HostProvider aHostProvider,
    ZKClientConfig clientConfig) throws IOException {
    LOG.info(
        "Initiating client connection, connectString={} sessionTimeout={} watcher={}",
        connectString,
        sessionTimeout,
        watcher);

    if (clientConfig == null) {
        clientConfig = new ZKClientConfig();
    }
    this.clientConfig = clientConfig;
    //实例化watchManager为ZKWatchManager类型
    watchManager = defaultWatchManager();
    //设置为默认的watcher
    watchManager.defaultWatcher = watcher;
    ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
    hostProvider = aHostProvider;
    //创建client到server的连接,ClientCnxn类型。
    cnxn = createConnection(
        connectStringParser.getChrootPath(),
        hostProvider,
        sessionTimeout,
        this,
        watchManager,
        //初始化客户端连接对象,默认为ClientCnxnSocketNIO类型
        getClientCnxnSocket(),
        canBeReadOnly);
    //启动
    cnxn.start();
}

public void start() {
    //发送线程启动
    sendThread.start();
    //事件线程启动
    eventThread.start();
}

构造参数详解及默认值:
connectString|String:连接地址,集群环境用逗号隔开,host:port。
sessionTimeout|int:会话超时时间
watcher|Watcher:事件通知处理器。
canBeReadOnly|boolean:设置当前会话是否支持只读模式,zookeeper集群发生故障时,仍能提供读服务。默认为false。
aHostProvider|HostProvider:host提供器

private static HostProvider createDefaultHostProvider(String connectString) {
    return new StaticHostProvider(
        new ConnectStringParser(connectString).getServerAddresses());
}

clientConfig|ZKClientConfig:客户端配置类

if (clientConfig == null) {
    clientConfig = new ZKClientConfig();
}

1.2exists/getData/getChildren绑定Watcher以及发送请求

通过构造器注册的监听器,在连接成功出发后就移除了。以getData为例分析: