Watch 特性初体验

初体验下 Watch 特性。

启动一个空集群,更新两次 key hello 后,使用 Watch 特性如何获取 key hello 的历史修改记录呢?

  • watch 命令, 带版本号监听 key hello
  • 集群版本号可通过 endpoint status 命令获取,空集群启动后的版本号为 1。
  1. $ etcdctl put hello world1
  2. $ etcdctl put hello world2
  3. $ etcdctl watch hello -w=json --rev=1
  4. {
  5. "Events":[
  6. {
  7. "kv":{
  8. "key":"aGVsbG8=",
  9. "create_revision":2,
  10. "mod_revision":2,
  11. "version":1,
  12. "value":"d29ybGQx"
  13. }
  14. },
  15. {
  16. "kv":{
  17. "key":"aGVsbG8=",
  18. "create_revision":2,
  19. "mod_revision":3,
  20. "version":2,
  21. "value":"d29ybGQy"
  22. }
  23. }
  24. ],
  25. "CompactRevision":0,
  26. "Canceled":false,
  27. "Created":false
  28. }

四大核心问题

  1. client 获取事件的机制,etcd 是使用轮询模式还是推送模式呢?两者各有什么优缺点?
  2. 事件是如何存储的? 会保留多久?watch 命令中的版本号具有什么作用?
  3. 当 client 和 server 端出现短暂网络波动等异常因素后,导致事件堆积时,server 端会丢弃事件吗?若你监听的历史版本号 server 端不存在了,你的代码该如何处理?
  4. 如果你创建了上万个 watcher 监听 key 变化,当 server 端收到一个写请求后,etcd 是如何根据变化的 key 快速找到监听它的 watcher 呢?

轮询 vs 流式推送

两种机制 etcd 都使用过。

etcd v2 Watch 机制实现中,使用的是 HTTP/1.x 协议,实现简单、兼容性好,每个 watcher 对应一个 TCP 连接。client 通过 HTTP/1.1 协议长连接定时轮询 server,获取最新的数据变化事件。

在 etcd v3 中,为了解决 etcd v2 的以上缺陷,使用的是基于 HTTP/2 的 gRPC 协议,双向流的 Watch API 设计,实现了连接多路复用。

HTTP/2 协议为什么能实现多路复用呢?

image.png

在 HTTP/2 协议中,HTTP 消息被分解独立的帧(Frame),交错发送,帧是最小的数据单位。每个帧会标识属于哪个流(Stream),流由多个数据帧组成,每个流拥有一个唯一的 ID,一个数据流对应一个请求或响应包

  • 一个连接上有并行的三个数据流,HTTP/2 可基于帧的流 ID 将并行、交错发送的帧重新组装成完整的消息

etcd 基于以上介绍的 HTTP/2 协议的多路复用等机制,实现了一个 client/TCP 连接支持多 gRPC Stream, 一个 gRPC Stream 又支持多个 watcher,如下图所示。同时事件通知模式也从 client 轮询优化成 server 流式推送,极大降低了 server 端 socket、内存等资源。

image.png

在 clientv3 库中,Watch 特性被抽象成 Watch、Close、RequestProgress 三个简单 API 提供给开发者使用

滑动窗口 vs MVCC

第二个问题的本质是历史版本存储,etcd 经历了从滑动窗口到 MVCC 机制的演变,滑动窗口是仅保存有限的最近历史版本到内存中,而 MVCC 机制则将历史版本保存在磁盘中,避免了历史版本的丢失,极大的提升了 Watch 机制的可靠性。

etcd v2 滑动窗口是如何实现的?它有什么缺点呢?

  • 环形数组
  • 若超过 eventQueue 的容量,则淘汰最旧的事件。在 etcd v2 中,eventQueue 的容量是固定的 1000
  1. type EventHistory struct {
  2. Queue eventQueue
  3. StartIndex uint64
  4. LastIndex uint64
  5. rwl sync.RWMutex
  6. }

相比 etcd v2 直接保存事件到内存的环形数组中,etcd v3 则是将一个 key 的历史修改版本保存在 boltdb 里面。boltdb 是一个基于磁盘文件的持久化存储,因此它重启后历史事件不像 etcd v2 一样会丢失,同时你可通过配置压缩策略,来控制保存的历史版本数

watch 命令中的版本号具有什么作用呢?

  • 通过版本号,它就可从 server 端的 boltdb 中获取错过的历史事件,而无需全量同步,它是 etcd Watch 机制数据增量同步的核心。

可靠的事件推送机制

第三个问题的本质是可靠事件推送机制,要搞懂它,我们就得弄懂 etcd Watch 特性的整体架构、核心流程,下图是 Watch 特性整体架构图。

image.png

watch 请求流程:

  1. 当你通过 etcdctl 或 API 发起一个 watch key 请求的时候,etcd 的 gRPCWatchServer 收到 watch 请求后,会创建一个 serverWatchStream, 它负责接收 client 的 gRPC Stream 的 create/cancel watcher 请求 (recvLoop goroutine),并将从 MVCC 模块接收的 Watch 事件转发给 client(sendLoop goroutine)。
  2. 当 serverWatchStream 收到 create watcher 请求后,serverWatchStream 会调用 MVCC 模块的 WatchStream 子模块分配一个 watcher id,并将 watcher 注册到 MVCC 的 WatchableKV 模块。
  3. 在 etcd 启动的时候,WatchableKV 模块会运行 syncWatchersLoop 和 syncVictimsLoop goroutine,分别负责不同场景下的事件推送,它们也是 Watch 特性可靠性的核心之一。

etcd 根据不同场景,对问题进行了分解,将 watcher 按场景分类,实现了轻重分离、低耦合。我首先给你介绍下 synced watcher、unsynced watcher 它们各自的含义。

synced watcher:

  • 表示此类 watcher 监听的数据都已经同步完毕,在等待新的变更。
  • 如果你创建的 watcher 未指定版本号 (默认 0)、或指定的版本号大于 etcd sever 当前最新的版本号 (currentRev),那么它就会保存到 synced watcherGroup 中。
  • watcherGroup 负责管理多个 watcher,能够根据 key 快速找到监听该 key 的一个或多个 watcher。

unsynced watcher:

  • 表示此类 watcher 监听的数据还未同步完成,落后于当前最新数据变更,正在努力追赶。
  • 如果你创建的 watcher 指定版本号小于 etcd server 当前最新版本号,那么它就会保存到 unsynced watcherGroup 中。比如我们的这个案例中 watch 带指定版本号 1 监听时,版本号 1 和 etcd server 当前版本之间的数据并未同步给你,因此它就属于此类。

将可靠的事件推送机制拆分成最新事件推送、异常场景重试、历史事件推送机制三个子问题来进行分析。

最新事件推送机制

当 etcd 收到一个写请求,key-value 发生变化的时候,处于 syncedGroup 中的 watcher,是如何获取到最新变化事件并推送给 client 的呢?

image.png

  1. 当你创建完成 watcher 后,此时你执行 put hello 修改操作时,如上图所示,请求经过 KVServer、Raft 模块后 Apply 到状态机时,在 MVCC 的 put 事务中,它会将本次修改的后的 mvccpb.KeyValue 保存到一个 changes 数组中。
  2. 在 put 事务结束时,如下面的精简代码所示,它会将 KeyValue 转换成 Event 事件,然后回调 watchableStore.notify 函数(流程 5)。notify 会匹配出监听过此 key 并处于 synced watcherGroup 中的 watcher,同时事件中的版本号要大于等于 watcher 监听的最小版本号,才能将事件发送到此 watcher 的事件 channel 中。
  3. serverWatchStream 的 sendLoop goroutine 监听到 channel 消息后,读出消息立即推送给 client(流程 6 和 7),至此,完成一个最新修改事件推送。
evs := make([]mvccpb.Event, len(changes))
for i, change := range changes {
   evs[i].Kv = &changes[i]
   if change.CreateRevision == 0 {
      evs[i].Type = mvccpb.DELETE
      evs[i].Kv.ModRevision = rev
   } else {
      evs[i].Type = mvccpb.PUT
   }
}
tw.s.notify(rev, evs)

注意接收 Watch 事件 channel 的 buffer 容量默认 1024(etcd v3.4.9)。若 client 与 server 端因网络波动、高负载等原因导致推送缓慢,buffer 满了,事件会丢失吗?这就是第二个子问题,异常场景的重试机制。

异常场景重试机制

若出现 channel buffer 满了,etcd 为了保证 Watch 事件的高可靠性,并不会丢弃它,而是将此 watcher 从 synced watcherGroup 中删除,然后将此 watcher 和事件列表保存到一个名为受害者 victim 的 watcherBatch 结构中,通过异步机制重试保证事件的可靠性。

  • syncVictimsLoop,正是它负责 slower watcher 的堆积的事件推送
  • 它的基本工作原理是,遍历 victim watcherBatch 数据结构,尝试将堆积的事件再次推送到 watcher 的接收 channel 中。若推送失败,则再次加入到 victim watcherBatch 数据结构中等待下次重试。
  • 若推送成功,watcher 监听的最小版本号 (minRev) 小于等于 server 当前版本号 (currentRev),说明可能还有历史事件未推送,需加入到 unsynced watcherGroup 中,由下面介绍的历史事件推送机制,推送 minRev 到 currentRev 之间的事件。
  • 若 watcher 的最小版本号大于 server 当前版本号,则加入到 synced watcher 集合中,进入上面介绍的最新事件通知机制。

各类 watcher 状态转换关系:

image.png

历史事件推送机制

WatchableKV 模块的另一个 goroutine,syncWatchersLoop,正是负责 unsynced watcherGroup 中的 watcher 历史事件推送。

在历史事件推送机制中,如果你监听老的版本号已经被 etcd 压缩了,client 该如何处理?

  • syncWatchersLoop,它会遍历处于 unsynced watcherGroup 中的每个 watcher,为了优化性能,它会选择一批 unsynced watcher 批量同步,找出这一批 unsynced watcher 中监听的最小版本号。
  • 因 boltdb 的 key 是按版本号存储的,因此可通过指定查询的 key 范围的最小版本号作为开始区间,当前 server 最大版本号作为结束区间,遍历 boltdb 获得所有历史数据。
  • 然后将 KeyValue 结构转换成事件,匹配出监听过事件中 key 的 watcher 后,将事件发送给对应的 watcher 事件接收 channel 即可。发送完成后,watcher 从 unsynced watcherGroup 中移除、添加到 synced watcherGroup 中,如下面的 watcher 状态转换图黑色虚线框所示。

image.png

若 watcher 监听的版本号已经小于当前 etcd server 压缩的版本号,历史变更数据就可能已丢失,因此 etcd server 会返回 ErrCompacted 错误给 client。client 收到此错误后,需重新获取数据最新版本号后,再次 Watch。你在业务开发过程中,使用 Watch API 最常见的一个错误之一就是未处理此错误。

高效的事件匹配

那使用什么数据结构来快速查找哪些 watcher 监听了一个事件中的 key 呢?

  • 也许你会说使用 map 记录下哪些 watcher 监听了什么 key 不就可以了吗? etcd 的确使用 map 记录了监听单个 key 的 watcher,
  • 但是你要注意的是 Watch 特性不仅仅可以监听单 key,它还可以指定监听 key 范围、key 前缀,因此 etcd 还使用了如下的区间树

image.png

当收到创建 watcher 请求的时候,它会把 watcher 监听的 key 范围插入到上面的区间树中,区间的值保存了监听同样 key 范围的 watcher 集合 /watcherSet

  • 当产生一个事件时,etcd 首先需要从 map 查找是否有 watcher 监听了单 key,其次它还需要从区间树找出与此 key 相交的所有区间,然后从区间的值获取监听的 watcher 集合。(相当于有两个映射: 1. watcher -> keys 2. key -> watchers)
  • 区间树支持快速查找一个 key 是否在某个区间内,时间复杂度 O(LogN),因此 etcd 基于 map 和区间树实现了 watcher 与事件快速匹配,具备良好的扩展性。