Overview

Lease 是 Etcd 实现探活机制的核心,通过定时的续租或者保持连接,就可以让 Etcd 知道客户端还存活,相当于定期发送一个心跳。

  1. service Lease {
  2. rpc LeaseGrant(LeaseGrantRequest) returns (LeaseGrantResponse)
  3. rpc LeaseRevoke(LeaseRevokeRequest) returns (LeaseRevokeResponse)
  4. rpc LeaseKeepAlive(stream LeaseKeepAliveRequest) returns (stream LeaseKeepAliveResponse)
  5. rpc LeaseTimeToLive(LeaseTimeToLiveRequest) returns (LeaseTimeToLiveResponse)
  6. rpc LeaseLeases(LeaseLeasesRequest) returns (LeaseLeasesResponse)
  7. }

etcd-lessor-Overview.drawio.png
图 1:Overview

由于 Etcd 是分布式存储,每一个 Etcd 都有一个 Lessor,所以 PromoteDemote 用于给 Lessor 升级为 Primary 和降级,只有是 Primary 的 Lessor 才能进行续租等操作。

主要的结构体和方法图 1 所示,每一个租约在服务器的外部通过 LeaseID 表示,LeaseID 实际上只是 int64 类型的别名。Lease 最核心的字段是过期时间,etcd 只支持秒级的租约。

  1. type Lease struct {
  2. ID LeaseID
  3. ttl int64 // time to live of the lease in seconds
  4. remainingTTL int64 // remaining time to live in seconds, if zero valued it is considered unset and the full ttl should be used
  5. // expiryMu protects concurrent accesses to expiry
  6. expiryMu sync.RWMutex
  7. // expiry is time when lease should expire. no expiration when expiry.IsZero() is true
  8. expiry time.Time
  9. // mu protects concurrent accesses to itemSet
  10. mu sync.RWMutex
  11. itemSet map[LeaseItem]struct{}
  12. revokec chan struct{}
  13. }
  14. type LeaseItem struct {
  15. Key string
  16. }

Expired

控制租约过期的核心结构是 LeaseExpiredNotifier,它的结构如图 2 所示,这里的 LeaseQueue 是一个根据过期时间排序的小根堆。
etcd-lessor-LeaseExpiredNotifier.drawio.png
图 2:LeaseExpiredNotifier

过期的逻辑通过 runloop 函数的 Goroutine 实现,每 500 ms 会判断一次,原理就是获取 LeaseExpiredNotifier 的头部元素,然后查看是否过期。流程示意图如图 3 所示

etcd-lessor-第 4 页.drawio.svg
图 3:runloop

Checkpoint

租约的检查点方法比较特殊,他不会自己更新过期时间,而是只更新 remainingTTL 字段,Promote 负责将其应用到过期的监测中。Promote 函数在 Raft 每次 applyEntry 的时候会调用,每次会续租 Leader 的选举超时时间。

  1. func (le *lessor) Checkpoint(id LeaseID, remainingTTL int64) error {
  2. le.mu.Lock()
  3. defer le.mu.Unlock()
  4. if l, ok := le.leaseMap[id]; ok {
  5. // when checkpointing, we only update the remainingTTL, Promote is responsible for applying this to lease expiry
  6. l.remainingTTL = remainingTTL
  7. if le.shouldPersistCheckpoints() {
  8. l.persistTo(le.b)
  9. }
  10. if le.isPrimary() {
  11. // schedule the next checkpoint as needed
  12. le.scheduleCheckpointIfNeeded(l)
  13. }
  14. }
  15. return nil
  16. }

只有 lessor 为 Primary 时,才会将更新的 lease 添加到堆中。这就说明了,Checkpoint 实际上是 Primary 节点与其他节点同步进度时使用的,这样能够保证其他节点在 Leader 切换之后,Leasor 可以继续计算。

  1. func (le *lessor) scheduleCheckpointIfNeeded(lease *Lease) {
  2. if le.cp == nil {
  3. return
  4. }
  5. if lease.RemainingTTL() > int64(le.checkpointInterval.Seconds()) {
  6. if le.lg != nil {
  7. le.lg.Debug("Scheduling lease checkpoint",
  8. zap.Int64("leaseID", int64(lease.ID)),
  9. zap.Duration("intervalSeconds", le.checkpointInterval),
  10. )
  11. }
  12. heap.Push(&le.leaseCheckpointHeap, &LeaseWithTime{
  13. id: lease.ID,
  14. time: time.Now().Add(le.checkpointInterval),
  15. })
  16. }
  17. }

etcd-lessor-第 5 页.drawio.svg
图 4:checkpoint time

图 5 展示了 Checkpoint 的全过程,更新后的 lease 会存储在堆中,然后生成 InternalRaftRequest,通知其他节点更新 Checkpoint。
etcd-lessor-第 6 页.drawio.svg
图 5:checkpoint request