Zookeeper引入了Watcher机制来实现了分布式数据的发布和订阅功能,使得多个订阅者可以监听某一个主题对象,当主题对象自身状态发生改变时,就会通知所有订阅者。本质上来说,就是分布式环境的观察者模式实现。
一、注册监听
1.Zookeeper构造函数注册监听
2.通过getChildren()、getData()、exist()注册监听
可以出传入自定义Watcher,也可以使用默认watcher
public byte[] getData(final String path, Watcher watcher, Stat stat)public byte[] getData(String path, boolean watch, Stat stat);public void getData(final String path, Watcher watcher, DataCallback cb, Object ctx);public void getData(String path, boolean watch, DataCallback cb, Object ctx)public List<String> getChildren(String path, boolean watch);public void getChildren(final String path, Watcher watcher, ChildrenCallback cb, Object ctx);public void getChildren(String path, boolean watch, ChildrenCallback cb, Object ctx);public List<String> getChildren(final String path, Watcher watcher,Stat stat);public List<String> getChildren(String path,boolean watch,Stat stat);public void getChildren(final String path, Watcher watcher, Children2Callback cb, Object ctx);public void getChildren(String path, boolean watch, Children2Callback cb, Object ctx);public Stat exists(String path, boolean watch);public Stat exists(final String path, Watcher watcher);public void exists(final String path, Watcher watcher, StatCallback cb, Object ctx);public void exists(String path, boolean watch, StatCallback cb, Object ctx);
二、触发监听
触发监听分两种情况:
1.构造器绑定的默认监听器会在连接成功后触发。通常利用该方式来保证client端在操作节点时zookeeper成功接。
2.通过exists()/getData()/getChildren()绑定的事件,会监听到相应节点的变化事件,即setData()/delete()/create()操作。
三、事件类型
四、Watcher触发机制
每次注册绑定的Watcher都只会触发一次,而不是一直存在。如果Watcher一直存在,某些节点更新非常频繁时,服务端就会不停的通知客户端,使得服务端的压力非常大。对于需要一直保持监听的节点,只需要嵌套注册监听器。
五、Client端和Server端管理Watcher
1.Client端
ZKWatchManager,通过HashMap对Watcher的维护,map的key是节点路径,value是注册在这个节点上的所有Watcher。
private final Map<String, Set<Watcher>> dataWatches =new HashMap<String, Set<Watcher>>();private final Map<String, Set<Watcher>> existWatches =new HashMap<String, Set<Watcher>>();private final Map<String, Set<Watcher>> childWatches =new HashMap<String, Set<Watcher>>();private final Map<String, Set<Watcher>> persistentWatches =new HashMap<String, Set<Watcher>>();private final Map<String, Set<Watcher>> persistentRecursiveWatches =new HashMap<String, Set<Watcher>>();
2.Server端
WatchManager对Watcher进行管理,这里的Watcher不是用户定义的Watcher而是ServerCnxn,代表一个client到server的连接。
private final Map<String, Set<Watcher>> watchTable = new HashMap<>();
private final Map<Watcher, Set<String>> watch2Paths = new HashMap<>();
watchTable定义了节点Node和Client连接的映射关系,一个节点对应到一组注册到此节点上的client,这样节点发生变化就可以通过这样的映射找到所有应该通过的client
watch2Paths和watchTable是相反的数据结构,是一个client连接和节点的映射关系,主要作用是:一个连接关闭的时候,需要将这个连接相关的所有Watcher都移除。
六、Watcher工作原理
1.客户端注册Watcher
1.1初始化客户端绑定Watcher
构造方法入口:
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
throws IOException {
this(connectString, sessionTimeout, watcher, false);
}
执行构造方法:
public ZooKeeper(
String connectString,
int sessionTimeout,
Watcher watcher,
boolean canBeReadOnly,
HostProvider aHostProvider,
ZKClientConfig clientConfig) throws IOException {
LOG.info(
"Initiating client connection, connectString={} sessionTimeout={} watcher={}",
connectString,
sessionTimeout,
watcher);
if (clientConfig == null) {
clientConfig = new ZKClientConfig();
}
this.clientConfig = clientConfig;
//实例化watchManager为ZKWatchManager类型
watchManager = defaultWatchManager();
//设置为默认的watcher
watchManager.defaultWatcher = watcher;
ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
hostProvider = aHostProvider;
//创建client到server的连接,ClientCnxn类型。
cnxn = createConnection(
connectStringParser.getChrootPath(),
hostProvider,
sessionTimeout,
this,
watchManager,
//初始化客户端连接对象,默认为ClientCnxnSocketNIO类型
getClientCnxnSocket(),
canBeReadOnly);
//启动
cnxn.start();
}
public void start() {
//发送线程启动
sendThread.start();
//事件线程启动
eventThread.start();
}
构造参数详解及默认值:
connectString|String:连接地址,集群环境用逗号隔开,host:port。
sessionTimeout|int:会话超时时间
watcher|Watcher:事件通知处理器。
canBeReadOnly|boolean:设置当前会话是否支持只读模式,zookeeper集群发生故障时,仍能提供读服务。默认为false。
aHostProvider|HostProvider:host提供器
private static HostProvider createDefaultHostProvider(String connectString) {
return new StaticHostProvider(
new ConnectStringParser(connectString).getServerAddresses());
}
clientConfig|ZKClientConfig:客户端配置类
if (clientConfig == null) {
clientConfig = new ZKClientConfig();
}
1.2exists/getData/getChildren绑定Watcher以及发送请求
通过构造器注册的监听器,在连接成功出发后就移除了。以getData为例分析:
