Overview
Lease 是 Etcd 实现探活机制的核心,通过定时的续租或者保持连接,就可以让 Etcd 知道客户端还存活,相当于定期发送一个心跳。
service Lease {
rpc LeaseGrant(LeaseGrantRequest) returns (LeaseGrantResponse)
rpc LeaseRevoke(LeaseRevokeRequest) returns (LeaseRevokeResponse)
rpc LeaseKeepAlive(stream LeaseKeepAliveRequest) returns (stream LeaseKeepAliveResponse)
rpc LeaseTimeToLive(LeaseTimeToLiveRequest) returns (LeaseTimeToLiveResponse)
rpc LeaseLeases(LeaseLeasesRequest) returns (LeaseLeasesResponse)
}
图 1:Overview
由于 Etcd 是分布式存储,每一个 Etcd 都有一个 Lessor,所以 Promote
和 Demote
用于给 Lessor 升级为 Primary 和降级,只有是 Primary 的 Lessor 才能进行续租等操作。
主要的结构体和方法图 1 所示,每一个租约在服务器的外部通过 LeaseID 表示,LeaseID 实际上只是 int64
类型的别名。Lease 最核心的字段是过期时间,etcd 只支持秒级的租约。
type Lease struct {
ID LeaseID
ttl int64 // time to live of the lease in seconds
remainingTTL int64 // remaining time to live in seconds, if zero valued it is considered unset and the full ttl should be used
// expiryMu protects concurrent accesses to expiry
expiryMu sync.RWMutex
// expiry is time when lease should expire. no expiration when expiry.IsZero() is true
expiry time.Time
// mu protects concurrent accesses to itemSet
mu sync.RWMutex
itemSet map[LeaseItem]struct{}
revokec chan struct{}
}
type LeaseItem struct {
Key string
}
Expired
控制租约过期的核心结构是 LeaseExpiredNotifier
,它的结构如图 2 所示,这里的 LeaseQueue 是一个根据过期时间排序的小根堆。
图 2:LeaseExpiredNotifier
过期的逻辑通过 runloop 函数的 Goroutine 实现,每 500 ms 会判断一次,原理就是获取 LeaseExpiredNotifier 的头部元素,然后查看是否过期。流程示意图如图 3 所示
图 3:runloop
Checkpoint
租约的检查点方法比较特殊,他不会自己更新过期时间,而是只更新 remainingTTL
字段,Promote
负责将其应用到过期的监测中。Promote
函数在 Raft 每次 applyEntry 的时候会调用,每次会续租 Leader 的选举超时时间。
func (le *lessor) Checkpoint(id LeaseID, remainingTTL int64) error {
le.mu.Lock()
defer le.mu.Unlock()
if l, ok := le.leaseMap[id]; ok {
// when checkpointing, we only update the remainingTTL, Promote is responsible for applying this to lease expiry
l.remainingTTL = remainingTTL
if le.shouldPersistCheckpoints() {
l.persistTo(le.b)
}
if le.isPrimary() {
// schedule the next checkpoint as needed
le.scheduleCheckpointIfNeeded(l)
}
}
return nil
}
只有 lessor 为 Primary 时,才会将更新的 lease 添加到堆中。这就说明了,Checkpoint 实际上是 Primary 节点与其他节点同步进度时使用的,这样能够保证其他节点在 Leader 切换之后,Leasor 可以继续计算。
func (le *lessor) scheduleCheckpointIfNeeded(lease *Lease) {
if le.cp == nil {
return
}
if lease.RemainingTTL() > int64(le.checkpointInterval.Seconds()) {
if le.lg != nil {
le.lg.Debug("Scheduling lease checkpoint",
zap.Int64("leaseID", int64(lease.ID)),
zap.Duration("intervalSeconds", le.checkpointInterval),
)
}
heap.Push(&le.leaseCheckpointHeap, &LeaseWithTime{
id: lease.ID,
time: time.Now().Add(le.checkpointInterval),
})
}
}
图 4:checkpoint time
图 5 展示了 Checkpoint 的全过程,更新后的 lease 会存储在堆中,然后生成 InternalRaftRequest,通知其他节点更新 Checkpoint。
图 5:checkpoint request