今天我就和你聊聊如何实现一个类 etcd、支持多存储引擎的 KV 服务,我们将基于 etcd 自带的raftexample项目快速构建它。
整体架构设计
下面是我给你画的 metcd 整体架构设计,它由 API 层、Raft 层的共识模块、逻辑层及存储层组成的状态机组成。
API 设计
在设计 API 的时候,我们往往会考虑以下几个因素:
- 性能
- 易用性、可调试性
- 开发效率、跨平台、可移植性
- 安全性
- 接口幂等性
因为我们场景的是 POC(Proof of concept)、Demo 开发,因此在 metcd 项目中,我们优先考虑点是易用性、可调试性,选择 HTTP/1.x 协议,接口上为了满足 key-value 操作,支持 Get 和 Put 接口即可。
假设 metcd 项目使用 3379 端口,Put 和 Get 接口,如下所示:
- Put 接口,设置 key-value
curl -L http://127.0.0.1:3379/hello -XPUT -d world
- Get 接口,查询 key-value
curl -L http://127.0.0.1:3379/hello
world
复制状态机
我们知道 etcd 是基于下图复制状态机实现的分布式 KV 服务,复制状态机由共识模块、日志模块、状态机组成。
以下是复制状态机的写请求流程:
- client 发起一个写请求(put hello = world);
- server 向 Raft 共识模块提交请求,共识模块生成一个写提案日志条目。若 server 是 Leader,则把日志条目广播给其他节点,并持久化日志条目到 WAL 中;
- 当一半以上节点持久化日志条目后,Leader 的共识模块将此日志条目标记为已提交(committed),并通知其他节点提交;
- server 从共识模块获取已经提交的日志条目,异步应用到状态机存储中(boltdb/leveldb/memory),然后返回给 client。
多存储引擎
metcd 项目将基于 etcd 本身自带的 raftexample 项目进行快速开发,而 raftexample 本身只支持内存存储。
因此我们通过将 KV 存储接口进行抽象化设计,实现支持多存储引擎。KVStore interface 的定义如下所示。
type KVStore interface {
// LookUp get key value
Lookup(key string) (string, bool)
// Propose propose kv request into raft state machine
Propose(k, v string)
// ReadCommits consume entry from raft state machine into KvStore map until error
ReadCommits(commitC <-chan *string, errorC <-chan error)
// Snapshot return KvStore snapshot
Snapshot() ([]byte, error)
// RecoverFromSnapshot recover data from snapshot
RecoverFromSnapshot(snapshot []byte) error
// Close close backend databases
Close() err
}
boltdb
boltdb 是一个基于 B+ tree 实现的存储引擎库,在10中我已和你详细介绍过原理。
boltdb 为什么适合读多写少?
- 对于读请求而言,一般情况下它可直接从内存中基于 B+ tree 遍历,快速获取数据返回给 client,不涉及经过磁盘 I/O。
- 对于写请求,它基于 B+ tree 查找写入位置,更新 key-value。事务提交时,写请求包括 B+ tree 重平衡、分裂、持久化 ditry page、持久化 freelist、持久化 meta page 流程。同时,ditry page 可能分布在文件的各个位置,它发起的是随机写磁盘 I/O。
leveldb
如何设计适合写多读少的存储引擎呢?
- 最简单的思路当然是写内存最快。可是内存有限的,无法支撑大容量的数据存储,不持久化数据会丢失。
能否直接将数据顺序追加到文件末尾(AOF)呢?因为磁盘的特点是顺序写性能比较快。
在 AOF 写入模型中如何实现查询数据呢?
- 很显然通过遍历文件一个个匹配 key 是可以的,但是它的性能是极差的。为了实现高性能的查询,最理想的解决方案从直接从内存中查询,但是内存是有限的,那么我们能否通过内存索引来记录一个 key-value 数据在文件中的偏移量,实现从磁盘快速读取呢?
- 这正是Bitcask存储模型的查询的实现,它通过内存哈希表维护各个 key-value 数据的索引,实现了快速查找 key-value 数据。不过,内存中虽然只保存 key 索引信息,但是当 key 较多的时候,其对内存要求依然比较高。
leveldb 它的原理是怎样的呢?与 Bitcask 存储模型有什么不一样?
- leveldb 是基于 LSM tree(log-structured merge-tree) 实现的 key-value 存储,它的架构如下图所示(引用自微软博客)。
它提升写性能的核心思路同样是将随机写转化为顺序写磁盘 WAL 文件和内存,结合了我们上面讨论的写内存和磁盘两种方法。数据持久化到 WAL 文件是为了确保机器 crash 后数据不丢失。
那么它要如何解决内存不足和查询的痛点问题呢?
核心解决方案是分层的设计和基于一系列对象的转换和压缩。接下来我给你分析一下上面架构图写流程和后台 compaction 任务:
- 首先写请求顺序写入 Log 文件 (WAL);
- 更新内存的 Memtable。leveldb Memtable 后端数据结构实现是 skiplist,skiplist 相比平衡二叉树,实现简单却同样拥有高性能的读写;
- 当 Memtable 达到一定的阈值时,转换成不可变的 Memtable,也就是只读不可写;
- leveldb 后台 Compact 任务会将不可变的 Memtable 生成 SSTable 文件,它有序地存储一系列 key-value 数据。注意 SST 文件按写入时间进行了分层,Level 层次越小数据越新。Manifest 文件记录了各个 SSTable 文件处于哪个层级、它的最小与最大 key 范围;
- 当某个 level 下的 SSTable 文件数目超过一定阈值后,Compact 任务会从这个 level 的 SSTable 中选择一个文件(level>0),将其和高一层级的 level+1 的 SSTable 文件合并;
- 注意 level 0 是由 Immutable 直接生成的,因此 level 0 SSTable 文件中的 key-value 存在相互重叠。而 level > 0 时,在和更高一层 SSTable 合并过程中,参与的 SSTable 文件是多个,leveldb 会确保各个 SSTable 中的 key-value 不重叠。
理解: 数据规模在一定范围内时, 使用 MemTable 来进行正常的读写 (先查 MemTable 找到偏移, 再根据偏移查找 value), 当 MemTable 超过某个大小后要整理数据, 也就是 Compact 开始执行, 其主要任务是将没有顺序的 key-value 重新排序写到 SSTable 文件中, 同时使用 level 来管理大量的 SSTable. 不太理解 level 0 SSTable 为什么存在相互重叠.
了解完写流程,读流程也就简单了,核心步骤如下:
- 从 Memtable 跳跃表中查询 key;
- 未找到则从 Immutable 中查找;
- Immutable 仍未命中,则按照 leveldb 的分层属性,因 level 0 SSTable 文件是直接从 Immutable 生成的,level 0 存在特殊性,因此你需要从 level 0 遍历 SSTable 查找 key;
- level 0 中若未命中,则从 level 1 乃至更高的层次查找。level 大于 0 时,各个 SSTable 中的 key 是不存在相互重叠的。根据 manifest 记录的 key-value 范围信息,可快递定位到具体的 SSTable。同时 leveldb 基于bloom filter实现了快速筛选 SSTable,因此查询效率较高。
理解: 如果 MemTable 中没有找到数据, 那么就从 Immutable 中找, 因为 level 机制, 所要找的数据不确定在哪个 level 上, 所以要根据 manifest 按层遍历 SSTable
更详细原理你可以参考一下leveldb源码。
实现分析
Raft 算法库
共识模块使用的是 etcd Raft 算法库,它是一个经过大量业务生产环境检验、具备良好可扩展性的共识算法库。
Raft API
Raft 作为一个库,它对外最核心的对象是一个名为Node的数据结构。Node 表示 Raft 集群中的一个节点,它的输入与输出接口如下图所示,下面我重点和你介绍它的几个接口功能:
- Campaign (活动),状态转换成 Candidate (候选人),发起新一轮 Leader 选举;
- Propose,提交提案接口;
- Ready,Raft 状态机输出接口,它的返回是一个输出 Ready 数据结构类型的管道,应用需要监听此管道,获取 Ready 数据,处理其中的各个消息(如持久化未提交的日志条目到 WAL 中,发送消息给其他节点等);
- Advance,通知 Raft 状态机,应用已处理上一个输出的 Ready 数据,等待发送下一个 Ready 数据;
- TransferLeaderShip,尝试将 Leader 转移到某个节点;
- Step,向 Raft 状态机提交收到的消息,比如当 Leader 广播完 MsgApp 消息给 Follower 节点后,Leader 收到 Follower 节点回复的 MsgAppResp 消息时,就通过 Step 接口将此消息提交给 Raft 状态机驱动其工作;
- ReadIndex,用于实现线性读。
上面提到的 Raft 状态机的输出Ready 结构含有哪些信息呢? 下图是其详细字段,含义如下:
- SoftState,软状态。包括集群 Leader 和节点状态,不需要持久化到 WAL;
- pb.HardState,硬状态。与软状态相反,包括了节点当前 Term、Vote 等信息,需要持久化到 WAL 中;
- ReadStates,用于线性一致性读;
- Entries,在向其他节点发送消息之前需持久化到 WAL 中;
- Messages,持久化 Entries 后,发送给其他节点的消息;
- Committed Entries,已提交的日志条目,需要应用到存储状态机中;
- Snapshot,快照需保存到持久化存储中;
- MustSync,HardState 和 Entries 是否要持久化到 WAL 中;
正如我在04中和你介绍的,etcd Raft 库的设计抽象了网络、Raft 日志存储等模块,它本身并不会进行网络、存储相关的操作,上层应用需结合自己业务场景选择内置的模块或自定义实现网络、存储、日志等模块。
因此我们在使用 Raft 库时,需要先自定义好相关网络、存储等模块,再结合上面介绍的 Raft Node API,就可以完成一个 Node 的核心操作了。其数据结构定义如下:
// A key-value stream backed by raft
type raftNode struct {
proposeC <-chan string // proposed messages (k,v)
confChangeC <-chan raftpb.ConfChange // proposed cluster config changes
commitC chan<- *string // entries committed to log (k,v)
errorC chan<- error // errors from raft session
id int // client ID for raft session
......
node raft.Node
raftStorage *raft.MemoryStorage
wal *wal.WAL
transport *rafthttp.Transport
}
它提供了三个核心的管道与业务逻辑模块、存储状态机交互:
- proposeC,它用来接收 client 发送的写请求提案消息;
- confChangeC,它用来接收集群配置变化消息;
- commitC,它用来输出 Raft 共识模块已提交的日志条目消息。
在 metcd 项目中因为我们是直接基于 raftexample 定制开发,因此日志持久化存储、网络都使用的是 etcd 自带的 WAL 和 rafthttp 模块。
rc.transport = &rafthttp.Transport{
Logger: zap.NewExample(),
ID: types.ID(rc.id),
ClusterID: 0x1000,
Raft: rc,
ServerStats: stats.NewServerStats("", ""),
LeaderStats: stats.NewLeaderStats(strconv.Itoa(rc.id)),
ErrorC: make(chan error),
}
搞清楚 Raft 模块的输入、输出 API,设计好 raftNode 结构,复用 etcd 的 WAL、网络等模块后,接下来我们就只需要实现如下两个循环逻辑,处理业务层发送给 proposeC 和 confChangeC 消息、将 Raft 的 Node 输出 Ready 结构进行相对应的处理即可。精简后的代码如下所示:
func (rc *raftNode) serveChannels() {
// send proposals over raft
go func() {
confChangeCount := uint64(0)
for rc.proposeC != nil && rc.confChangeC != nil {
select {
// proposeC,它用来接收 client 发送的写请求提案消息
case prop, ok := <-rc.proposeC:
if !ok {
rc.proposeC = nil
} else {
// blocks until accepted by raft state machine
rc.node.Propose(context.TODO(), []byte(prop))
}
// confChangeC,它用来接收集群配置变化消息
case cc, ok := <-rc.confChangeC:
if !ok {
rc.confChangeC = nil
} else {
confChangeCount++
cc.ID = confChangeCount
rc.node.ProposeConfChange(context.TODO(), cc)
}
}
}
}()
// event loop on raft state machine updates
for {
select {
case <-ticker.C:
rc.node.Tick()
// store raft entries to wal, then publish over commit channel
case rd := <-rc.node.Ready():
rc.wal.Save(rd.HardState, rd.Entries)
if !raft.IsEmptySnap(rd.Snapshot) {
rc.saveSnap(rd.Snapshot)
rc.raftStorage.ApplySnapshot(rd.Snapshot)
rc.publishSnapshot(rd.Snapshot)
}
rc.raftStorage.Append(rd.Entries)
rc.transport.Send(rd.Messages)
if ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries)); !ok {
rc.stop()
return
}
rc.maybeTriggerSnapshot()
rc.node.Advance()
}
}
}
代码简要分析如下:
- 从 proposeC 中取出提案消息,通过 raft.Node.Propose API 提交提案;
- 从 confChangeC 取出配置变更消息,通过 raft.Node.ProposeConfChange API 提交配置变化消息;
- 从 raft.Node 中获取 Raft 算法状态机输出到 Ready 结构中,将 rd.Entries 和 rd.HardState 通过 WAL 模块持久化,将 rd.Messages 通过 rafthttp 模块,发送给其他节点。将 rd.CommittedEntries 应用到业务存储状态机。
支持多存储引擎
在整体架构设计时,我和你介绍了为了使 metcd 项目能支撑多存储引擎,我们将 KVStore 进行了抽象化设计,因此我们只需要实现各个存储引擎相对应的 API 即可。
boltdb
func (s *boltdbKVStore) Put(key, value string) error {
s.mu.Lock()
defer s.mu.Unlock()
// Start a writable transaction.
tx, err := s.db.Begin(true)
if err != nil {
return err
}
defer tx.Rollback()
// Use the transaction...
bucket, err := tx.CreateBucketIfNotExists([]byte("keys"))
if err != nil {
log.Printf("failed to put key %s, value %s, err is %v", key, value, err)
return err
}
err = bucket.Put([]byte(key), []byte(value))
if err != nil {
log.Printf("failed to put key %s, value %s, err is %v", key, value, err)
return err
}
// Commit the transaction and check for error.
if err := tx.Commit(); err != nil {
log.Printf("failed to commit transaction, key %s, err is %v", key, err)
return err
}
log.Printf("backend:%s,put key:%s,value:%s succ", s.config.backend, key, value)
return nil
leveldb
我们使用的是goleveldb,它基于 Google 开源的 c++ leveldb版本实现。它提供的常用 API 如下所示。
- 通过 OpenFile API 创建或打开一个 leveldb 数据库。
db, err := leveldb.OpenFile("path/to/db", nil)
...
defer db.Close()
- 通过 DB.Get/Put/Delete API 操作数据。
data, err := db.Get([]byte("key"), nil)
...
err = db.Put([]byte("key"), []byte("value"), nil)
...
err = db.Delete([]byte("key"), nil)
...
了解其接口后,通过 goleveldb 的库,client 调用就非常简单了,下面是 metcd 项目中,leveldb 存储引擎 Put 接口的实现。
func (s *leveldbKVStore) Put(key, value string) error {
err := s.db.Put([]byte(key), []byte(value), nil)
if err != nil {
log.Printf("failed to put key %s, value %s, err is %v", key, value, err)
return err
}
log.Printf("backend:%s,put key:%s,value:%s succ", s.config.backend, key, value)
return nil
}
读写流程
写流程
当你通过如下 curl 命令发起一个写操作时,写流程如下面架构图序号所示:
curl -L http://127.0.0.1:3379/hello -XPUT -d world
- client 通过 curl 发送 HTTP PUT 请求到 server;
- server 收到后,将消息写入到 KVStore 的 ProposeC 管道;
- raftNode 循环逻辑将消息通过 Raft 模块的 Propose 接口提交;
- Raft 模块输出 Ready 结构,server 将日志条目持久化后,并发送给其他节点;
- 集群多数节点持久化此日志条目后,这个日志条目被提交给存储状态机 KVStore 执行;
- KVStore 根据启动的 backend 存储引擎名称,调用对应的 Put 接口即可。
读流程
当你通过如下 curl 命令发起一个读操作时,读流程如下面架构图序号所示:
curl -L http://127.0.0.1:3379/hello
world
- client 通过 curl 发送 HTTP Get 请求到 server;
- server 收到后,根据 KVStore 的存储引擎,从后端查询出对应的 key-value 数据。