今天我就和你聊聊如何实现一个类 etcd、支持多存储引擎的 KV 服务,我们将基于 etcd 自带的raftexample项目快速构建它。

整体架构设计

下面是我给你画的 metcd 整体架构设计,它由 API 层、Raft 层的共识模块、逻辑层及存储层组成的状态机组成。

image.png

API 设计

在设计 API 的时候,我们往往会考虑以下几个因素:

  • 性能
  • 易用性、可调试性
  • 开发效率、跨平台、可移植性
  • 安全性
  • 接口幂等性

因为我们场景的是 POC(Proof of concept)、Demo 开发,因此在 metcd 项目中,我们优先考虑点是易用性、可调试性,选择 HTTP/1.x 协议,接口上为了满足 key-value 操作,支持 Get 和 Put 接口即可。

假设 metcd 项目使用 3379 端口,Put 和 Get 接口,如下所示:

  • Put 接口,设置 key-value
  1. curl -L http://127.0.0.1:3379/hello -XPUT -d world
  • Get 接口,查询 key-value
curl -L http://127.0.0.1:3379/hello
world

复制状态机

我们知道 etcd 是基于下图复制状态机实现的分布式 KV 服务,复制状态机由共识模块、日志模块、状态机组成。

image.png

以下是复制状态机的写请求流程:

  1. client 发起一个写请求(put hello = world);
  2. server 向 Raft 共识模块提交请求,共识模块生成一个写提案日志条目。若 server 是 Leader,则把日志条目广播给其他节点,并持久化日志条目到 WAL 中;
  3. 当一半以上节点持久化日志条目后,Leader 的共识模块将此日志条目标记为已提交(committed),并通知其他节点提交;
  4. server 从共识模块获取已经提交的日志条目,异步应用到状态机存储中(boltdb/leveldb/memory),然后返回给 client。

多存储引擎

metcd 项目将基于 etcd 本身自带的 raftexample 项目进行快速开发,而 raftexample 本身只支持内存存储。

因此我们通过将 KV 存储接口进行抽象化设计,实现支持多存储引擎。KVStore interface 的定义如下所示。

type KVStore interface {
   // LookUp get key value
   Lookup(key string) (string, bool)

   // Propose propose kv request into raft state machine
   Propose(k, v string)

   // ReadCommits consume entry from raft state machine into KvStore map until error
   ReadCommits(commitC <-chan *string, errorC <-chan error)

   // Snapshot return KvStore snapshot
   Snapshot() ([]byte, error)

   // RecoverFromSnapshot recover data from snapshot
   RecoverFromSnapshot(snapshot []byte) error

   // Close close backend databases
   Close() err
}

boltdb

boltdb 是一个基于 B+ tree 实现的存储引擎库,在10中我已和你详细介绍过原理。

boltdb 为什么适合读多写少?

  • 对于读请求而言,一般情况下它可直接从内存中基于 B+ tree 遍历,快速获取数据返回给 client,不涉及经过磁盘 I/O。
  • 对于写请求,它基于 B+ tree 查找写入位置,更新 key-value。事务提交时,写请求包括 B+ tree 重平衡、分裂、持久化 ditry page、持久化 freelist、持久化 meta page 流程。同时,ditry page 可能分布在文件的各个位置,它发起的是随机写磁盘 I/O。

leveldb

如何设计适合写多读少的存储引擎呢?

  • 最简单的思路当然是写内存最快。可是内存有限的,无法支撑大容量的数据存储,不持久化数据会丢失。

能否直接将数据顺序追加到文件末尾(AOF)呢?因为磁盘的特点是顺序写性能比较快。

  • 当然可以。Bitcask存储模型就是采用 AOF 模式,把写请求顺序追加到文件。Facebook 的图片存储Haystack根据其论文介绍,也是使用类似的方案来解决大规模写入痛点。

在 AOF 写入模型中如何实现查询数据呢?

  • 很显然通过遍历文件一个个匹配 key 是可以的,但是它的性能是极差的。为了实现高性能的查询,最理想的解决方案从直接从内存中查询,但是内存是有限的,那么我们能否通过内存索引来记录一个 key-value 数据在文件中的偏移量,实现从磁盘快速读取呢?
  • 这正是Bitcask存储模型的查询的实现,它通过内存哈希表维护各个 key-value 数据的索引,实现了快速查找 key-value 数据。不过,内存中虽然只保存 key 索引信息,但是当 key 较多的时候,其对内存要求依然比较高。

leveldb 它的原理是怎样的呢?与 Bitcask 存储模型有什么不一样?

  • leveldb 是基于 LSM tree(log-structured merge-tree) 实现的 key-value 存储,它的架构如下图所示(引用自微软博客)。

它提升写性能的核心思路同样是将随机写转化为顺序写磁盘 WAL 文件和内存,结合了我们上面讨论的写内存和磁盘两种方法。数据持久化到 WAL 文件是为了确保机器 crash 后数据不丢失。

image.png

那么它要如何解决内存不足和查询的痛点问题呢?

核心解决方案是分层的设计和基于一系列对象的转换和压缩。接下来我给你分析一下上面架构图写流程和后台 compaction 任务:

  1. 首先写请求顺序写入 Log 文件 (WAL);
  2. 更新内存的 Memtable。leveldb Memtable 后端数据结构实现是 skiplist,skiplist 相比平衡二叉树,实现简单却同样拥有高性能的读写;
  3. 当 Memtable 达到一定的阈值时,转换成不可变的 Memtable,也就是只读不可写;
  4. leveldb 后台 Compact 任务会将不可变的 Memtable 生成 SSTable 文件,它有序地存储一系列 key-value 数据。注意 SST 文件按写入时间进行了分层,Level 层次越小数据越新。Manifest 文件记录了各个 SSTable 文件处于哪个层级、它的最小与最大 key 范围;
  5. 当某个 level 下的 SSTable 文件数目超过一定阈值后,Compact 任务会从这个 level 的 SSTable 中选择一个文件(level>0),将其和高一层级的 level+1 的 SSTable 文件合并;
  6. 注意 level 0 是由 Immutable 直接生成的,因此 level 0 SSTable 文件中的 key-value 存在相互重叠。而 level > 0 时,在和更高一层 SSTable 合并过程中,参与的 SSTable 文件是多个,leveldb 会确保各个 SSTable 中的 key-value 不重叠。

理解: 数据规模在一定范围内时, 使用 MemTable 来进行正常的读写 (先查 MemTable 找到偏移, 再根据偏移查找 value), 当 MemTable 超过某个大小后要整理数据, 也就是 Compact 开始执行, 其主要任务是将没有顺序的 key-value 重新排序写到 SSTable 文件中, 同时使用 level 来管理大量的 SSTable. 不太理解 level 0 SSTable 为什么存在相互重叠.

了解完写流程,读流程也就简单了,核心步骤如下:

  1. 从 Memtable 跳跃表中查询 key;
  2. 未找到则从 Immutable 中查找;
  3. Immutable 仍未命中,则按照 leveldb 的分层属性,因 level 0 SSTable 文件是直接从 Immutable 生成的,level 0 存在特殊性,因此你需要从 level 0 遍历 SSTable 查找 key;
  4. level 0 中若未命中,则从 level 1 乃至更高的层次查找。level 大于 0 时,各个 SSTable 中的 key 是不存在相互重叠的。根据 manifest 记录的 key-value 范围信息,可快递定位到具体的 SSTable。同时 leveldb 基于bloom filter实现了快速筛选 SSTable,因此查询效率较高。

理解: 如果 MemTable 中没有找到数据, 那么就从 Immutable 中找, 因为 level 机制, 所要找的数据不确定在哪个 level 上, 所以要根据 manifest 按层遍历 SSTable

更详细原理你可以参考一下leveldb源码。

实现分析

Raft 算法库

共识模块使用的是 etcd Raft 算法库,它是一个经过大量业务生产环境检验、具备良好可扩展性的共识算法库。

Raft API

Raft 作为一个库,它对外最核心的对象是一个名为Node的数据结构。Node 表示 Raft 集群中的一个节点,它的输入与输出接口如下图所示,下面我重点和你介绍它的几个接口功能:

  1. Campaign (活动),状态转换成 Candidate (候选人),发起新一轮 Leader 选举;
  2. Propose,提交提案接口;
  3. Ready,Raft 状态机输出接口,它的返回是一个输出 Ready 数据结构类型的管道,应用需要监听此管道,获取 Ready 数据,处理其中的各个消息(如持久化未提交的日志条目到 WAL 中,发送消息给其他节点等);
  4. Advance,通知 Raft 状态机,应用已处理上一个输出的 Ready 数据,等待发送下一个 Ready 数据;
  5. TransferLeaderShip,尝试将 Leader 转移到某个节点;
  6. Step,向 Raft 状态机提交收到的消息,比如当 Leader 广播完 MsgApp 消息给 Follower 节点后,Leader 收到 Follower 节点回复的 MsgAppResp 消息时,就通过 Step 接口将此消息提交给 Raft 状态机驱动其工作;
  7. ReadIndex,用于实现线性读。

image.png

上面提到的 Raft 状态机的输出Ready 结构含有哪些信息呢? 下图是其详细字段,含义如下:

  1. SoftState,软状态。包括集群 Leader 和节点状态,不需要持久化到 WAL;
  2. pb.HardState,硬状态。与软状态相反,包括了节点当前 Term、Vote 等信息,需要持久化到 WAL 中;
  3. ReadStates,用于线性一致性读;
  4. Entries,在向其他节点发送消息之前需持久化到 WAL 中;
  5. Messages,持久化 Entries 后,发送给其他节点的消息;
  6. Committed Entries,已提交的日志条目,需要应用到存储状态机中;
  7. Snapshot,快照需保存到持久化存储中;
  8. MustSync,HardState 和 Entries 是否要持久化到 WAL 中;

image.png

正如我在04中和你介绍的,etcd Raft 库的设计抽象了网络、Raft 日志存储等模块,它本身并不会进行网络、存储相关的操作,上层应用需结合自己业务场景选择内置的模块或自定义实现网络、存储、日志等模块。

因此我们在使用 Raft 库时,需要先自定义好相关网络、存储等模块,再结合上面介绍的 Raft Node API,就可以完成一个 Node 的核心操作了。其数据结构定义如下:

// A key-value stream backed by raft
type raftNode struct {
   proposeC    <-chan string            // proposed messages (k,v)
   confChangeC <-chan raftpb.ConfChange // proposed cluster config changes
   commitC     chan<- *string           // entries committed to log (k,v)
   errorC      chan<- error             // errors from raft session
   id          int      // client ID for raft session
   ......
   node        raft.Node
   raftStorage *raft.MemoryStorage
   wal         *wal.WAL
   transport *rafthttp.Transport
}

它提供了三个核心的管道与业务逻辑模块、存储状态机交互:

  • proposeC,它用来接收 client 发送的写请求提案消息;
  • confChangeC,它用来接收集群配置变化消息;
  • commitC,它用来输出 Raft 共识模块已提交的日志条目消息。

在 metcd 项目中因为我们是直接基于 raftexample 定制开发,因此日志持久化存储、网络都使用的是 etcd 自带的 WAL 和 rafthttp 模块。

  • WAL模块中提供了核心的保存未持久化的日志条目和快照功能接口,你可以参考03节写请求中我和你介绍的原理。
  • rafthttp模块基于 HTTP 协议提供了各个节点间的消息发送能力,metcd 使用如下:
rc.transport = &rafthttp.Transport{
   Logger:      zap.NewExample(),
   ID:          types.ID(rc.id),
   ClusterID:   0x1000,
   Raft:        rc,
   ServerStats: stats.NewServerStats("", ""),
   LeaderStats: stats.NewLeaderStats(strconv.Itoa(rc.id)),
   ErrorC:      make(chan error),
}

搞清楚 Raft 模块的输入、输出 API,设计好 raftNode 结构,复用 etcd 的 WAL、网络等模块后,接下来我们就只需要实现如下两个循环逻辑,处理业务层发送给 proposeC 和 confChangeC 消息、将 Raft 的 Node 输出 Ready 结构进行相对应的处理即可。精简后的代码如下所示:


func (rc *raftNode) serveChannels() {
   // send proposals over raft
   go func() {
      confChangeCount := uint64(0)
      for rc.proposeC != nil && rc.confChangeC != nil {
         select {
         // proposeC,它用来接收 client 发送的写请求提案消息
         case prop, ok := <-rc.proposeC:
            if !ok {
               rc.proposeC = nil
            } else {
               // blocks until accepted by raft state machine
               rc.node.Propose(context.TODO(), []byte(prop))
            }
         // confChangeC,它用来接收集群配置变化消息
         case cc, ok := <-rc.confChangeC:
            if !ok {
               rc.confChangeC = nil
            } else {
               confChangeCount++
               cc.ID = confChangeCount
               rc.node.ProposeConfChange(context.TODO(), cc)
            }
         }
      }
   }()

   // event loop on raft state machine updates
   for {
      select {
      case <-ticker.C:
         rc.node.Tick()

      // store raft entries to wal, then publish over commit channel
      case rd := <-rc.node.Ready():
         rc.wal.Save(rd.HardState, rd.Entries)
         if !raft.IsEmptySnap(rd.Snapshot) {
            rc.saveSnap(rd.Snapshot)
            rc.raftStorage.ApplySnapshot(rd.Snapshot)
            rc.publishSnapshot(rd.Snapshot)
         }
         rc.raftStorage.Append(rd.Entries)
         rc.transport.Send(rd.Messages)
         if ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries)); !ok {
            rc.stop()
            return
         }
         rc.maybeTriggerSnapshot()
         rc.node.Advance()
      }
   }
}

代码简要分析如下:

  • 从 proposeC 中取出提案消息,通过 raft.Node.Propose API 提交提案;
  • 从 confChangeC 取出配置变更消息,通过 raft.Node.ProposeConfChange API 提交配置变化消息;
  • 从 raft.Node 中获取 Raft 算法状态机输出到 Ready 结构中,将 rd.Entries 和 rd.HardState 通过 WAL 模块持久化,将 rd.Messages 通过 rafthttp 模块,发送给其他节点。将 rd.CommittedEntries 应用到业务存储状态机。

支持多存储引擎

在整体架构设计时,我和你介绍了为了使 metcd 项目能支撑多存储引擎,我们将 KVStore 进行了抽象化设计,因此我们只需要实现各个存储引擎相对应的 API 即可。

boltdb


func (s *boltdbKVStore) Put(key, value string) error {
   s.mu.Lock()
   defer s.mu.Unlock()
   // Start a writable transaction.
   tx, err := s.db.Begin(true)
   if err != nil {
      return err
   }
   defer tx.Rollback()

   // Use the transaction...
   bucket, err := tx.CreateBucketIfNotExists([]byte("keys"))
   if err != nil {
      log.Printf("failed to put key %s, value %s, err is %v", key, value, err)
      return err
   }
   err = bucket.Put([]byte(key), []byte(value))
   if err != nil {
      log.Printf("failed to put key %s, value %s, err is %v", key, value, err)
      return err
   }

   // Commit the transaction and check for error.
   if err := tx.Commit(); err != nil {
      log.Printf("failed to commit transaction, key %s, err is %v", key, err)
      return err
   }
   log.Printf("backend:%s,put key:%s,value:%s succ", s.config.backend, key, value)
   return nil

leveldb

我们使用的是goleveldb,它基于 Google 开源的 c++ leveldb版本实现。它提供的常用 API 如下所示。

  • 通过 OpenFile API 创建或打开一个 leveldb 数据库。
db, err := leveldb.OpenFile("path/to/db", nil)
...
defer db.Close()
  • 通过 DB.Get/Put/Delete API 操作数据。
data, err := db.Get([]byte("key"), nil)
...
err = db.Put([]byte("key"), []byte("value"), nil)
...
err = db.Delete([]byte("key"), nil)
...

了解其接口后,通过 goleveldb 的库,client 调用就非常简单了,下面是 metcd 项目中,leveldb 存储引擎 Put 接口的实现。

func (s *leveldbKVStore) Put(key, value string) error {
   err := s.db.Put([]byte(key), []byte(value), nil)
   if err != nil {
      log.Printf("failed to put key %s, value %s, err is %v", key, value, err)
      return err
   }
   log.Printf("backend:%s,put key:%s,value:%s succ", s.config.backend, key, value)
   return nil
}

读写流程

写流程

当你通过如下 curl 命令发起一个写操作时,写流程如下面架构图序号所示:

curl -L http://127.0.0.1:3379/hello -XPUT -d world
  1. client 通过 curl 发送 HTTP PUT 请求到 server;
  2. server 收到后,将消息写入到 KVStore 的 ProposeC 管道;
  3. raftNode 循环逻辑将消息通过 Raft 模块的 Propose 接口提交;
  4. Raft 模块输出 Ready 结构,server 将日志条目持久化后,并发送给其他节点;
  5. 集群多数节点持久化此日志条目后,这个日志条目被提交给存储状态机 KVStore 执行;
  6. KVStore 根据启动的 backend 存储引擎名称,调用对应的 Put 接口即可。

image.png

读流程

当你通过如下 curl 命令发起一个读操作时,读流程如下面架构图序号所示:

curl -L http://127.0.0.1:3379/hello
world
  • client 通过 curl 发送 HTTP Get 请求到 server;
  • server 收到后,根据 KVStore 的存储引擎,从后端查询出对应的 key-value 数据。

image.png