缘起

最近阅读 [云原生分布式存储基石:etcd深入解析] (杜军 , 2019.1)
本系列笔记拟采用golang练习之
gitee: https://gitee.com/ioly/learning.gooop

raft分布式一致性算法

  1. 分布式存储系统通常会通过维护多个副本来进行容错,
  2. 以提高系统的可用性。
  3. 这就引出了分布式存储系统的核心问题——如何保证多个副本的一致性?
  4. Raft算法把问题分解成了领袖选举(leader election)、
  5. 日志复制(log replication)、安全性(safety
  6. 和成员关系变化(membership changes)这几个子问题。
  7. Raft算法的基本操作只需2RPC即可完成。
  8. RequestVote RPC是在选举过程中通过旧的Leader触发的,
  9. AppendEntries RPC是领导人触发的,目的是向其他节点复制日志条目和发送心跳(heartbeat)。

目标

  • 根据raft协议,实现高可用分布式强一致的kv存储

子目标(Day 2)

  • 定义raft rpc接口
  • 定义raft lsm有限状态自动机接口(状态模式)

设计

  • config/IRaftConfig.go: 集群配置接口。简单起见, 使用静态配置模式定义节点数量和地址。
  • config/IRaftNodeConfig.go: 节点配置接口
  • roles/roles.go:raft三种角色常量
  • timeout/timeout.go:超时时间常量
  • rpc/IRaftRPC.go: raft协议的基本RPC接口及参数定义。简单起见,拟采用net/rpc实现之。
  • rpc/IRaftRPCServer.go: 支持raft协议的服务器接口。简单起见,拟采用net/rpc实现之。
  • lsm/IRaftLSM.go: raft有限状态机接口
  • lsm/IRaftState.go: 状态接口
  • lsm/tRaftStateBase.go: 基本状态数据
  • lsm/tFollowerState: follower状态的实现,未完成

config/IRaftConfig.go

集群配置接口。简单起见, 使用静态配置模式定义节点数量和地址。

  1. package config
  2. type IRaftConfig interface {
  3. ID() string
  4. Nodes() []IRaftNodeConfig
  5. }

config/IRaftNodeConfig.go

节点配置接口

  1. package config
  2. type IRaftNodeConfig interface {
  3. ID() string
  4. Endpoint() string
  5. }

roles/roles.go

raft三种角色常量

  1. package roles
  2. type RaftRole int
  3. const Follower RaftRole = 1
  4. const Candidate RaftRole = 2
  5. const Leader RaftRole = 3

timeout/timeout.go

超时时间常量

  1. package timeout
  2. import "time"
  3. const HeartbeatInterval = 150 * time.Millisecond
  4. const HeartbeatTimeout = 5 * HeartbeatInterval
  5. const ElectionTimeout = HeartbeatTimeout

rpc/IRaftRPC.go

raft协议的基本RPC接口及参数定义。简单起见,拟采用net/rpc实现之。

  1. package rpc
  2. type IRaftRPC interface {
  3. RequestVote(cmd *RequestVoteCmd, ret *RequestVoteRet) error
  4. AppendEntries(cmd *AppendEntriesCmd, ret *AppendEntriesRet) error
  5. }
  6. type RequestVoteCmd struct {
  7. CandidateID string
  8. Term int
  9. LastLogIndex int
  10. LastLogTerm int
  11. }
  12. type RequestVoteRet struct {
  13. Term int
  14. VoteGranted bool
  15. }
  16. type AppendEntriesCmd struct {
  17. Term int
  18. LeaderID string
  19. PrevLogTerm int
  20. PrevLogIndex int
  21. LeaderCommit int
  22. Entries []*LogEntry
  23. }
  24. type LogEntry struct {
  25. Tag int
  26. Content []byte
  27. }
  28. type AppendEntriesRet struct {
  29. Term int
  30. Success bool
  31. }

rpc/IRaftRPCServer.go

支持raft协议的服务器接口。简单起见,拟采用net/rpc实现之。

  1. package rpc
  2. type IRaftRPCServer interface {
  3. BeginServeTCP(port int, r IRaftRPC)
  4. }

lsm/IRaftLSM.go

raft有限状态机接口

  1. package lsm
  2. import "learning/gooop/etcd/raft/rpc"
  3. // IRaftLSM raft有限状态自动机
  4. type IRaftLSM interface {
  5. rpc.IRaftRPC
  6. State() IRaftState
  7. }

lsm/IRaftState.go

状态接口

  1. package lsm
  2. import (
  3. "learning/gooop/etcd/raft/roles"
  4. "learning/gooop/etcd/raft/rpc"
  5. )
  6. type IRaftState interface {
  7. rpc.IRaftRPC
  8. Role() roles.RaftRole
  9. Start()
  10. }

lsm/tRaftStateBase.go

基本状态数据

  1. package lsm
  2. import (
  3. "learning/gooop/etcd/raft/config"
  4. "learning/gooop/etcd/raft/roles"
  5. )
  6. //
  7. type tRaftStateBase struct {
  8. // 当前角色
  9. role roles.RaftRole
  10. // 当前任期号
  11. term int
  12. // leader.id
  13. leaderID string
  14. // 集群配置
  15. cfg config.IRaftConfig
  16. }
  17. func newRaftStateBase(term int, cfg config.IRaftConfig) *tRaftStateBase {
  18. it := new(tRaftStateBase)
  19. it.init(term, cfg)
  20. return it
  21. }
  22. // init initialize self, with term and config specified
  23. func (me *tRaftStateBase) init(term int, cfg config.IRaftConfig) {
  24. me.cfg = cfg
  25. me.role = roles.Follower
  26. me.term = term
  27. me.leaderID = ""
  28. }
  29. func (me *tRaftStateBase) Role() roles.RaftRole {
  30. return me.role
  31. }

lsm/tFollowerState

follower状态的实现,未完成

  1. package lsm
  2. import (
  3. "learning/gooop/etcd/raft/config"
  4. "learning/gooop/etcd/raft/timeout"
  5. "sync"
  6. "time"
  7. )
  8. type tFollowerState struct {
  9. tRaftStateBase
  10. mInitOnce sync.Once
  11. mStartOnce sync.Once
  12. mEventMap map[tFollowerEvent][]tFollowerEventHandler
  13. }
  14. type tFollowerEvent int
  15. const evStart tFollowerEvent = 1
  16. type tFollowerEventHandler func(e tFollowerEvent, args ...interface{})
  17. func newFollowerState(term int, cfg config.IRaftConfig) *tFollowerState {
  18. it := new(tFollowerState)
  19. it.init(term, cfg)
  20. // todo: to implement IRaftState
  21. return it
  22. }
  23. func (me *tFollowerState) init(term int, cfg config.IRaftConfig) {
  24. me.mInitOnce.Do(func() {
  25. me.tRaftStateBase = *newRaftStateBase(term, cfg)
  26. // init event map
  27. me.mEventMap = make(map[tFollowerEvent][]tFollowerEventHandler)
  28. me.registerEventHandlers()
  29. })
  30. }
  31. func (me *tFollowerState) registerEventHandlers() {
  32. me.mEventMap[evStart] = []tFollowerEventHandler{
  33. me.afterStartThenBeginWatchLeaderTimeout,
  34. }
  35. }
  36. func (me *tFollowerState) raise(e tFollowerEvent, args ...interface{}) {
  37. if handlers, ok := me.mEventMap[e]; ok {
  38. for _, it := range handlers {
  39. it(e, args...)
  40. }
  41. }
  42. }
  43. func (me *tFollowerState) Start() {
  44. me.mStartOnce.Do(func() {
  45. me.raise(evStart)
  46. })
  47. }
  48. func (me *tFollowerState) afterStartThenBeginWatchLeaderTimeout(e tFollowerEvent, args ...interface{}) {
  49. go func() {
  50. iCheckingTimeoutInterval := timeout.HeartbeatTimeout / 3
  51. for range time.Tick(iCheckingTimeoutInterval) {
  52. // todo: watch leader.AppendEntries rpc timeout
  53. }
  54. }()
  55. }

(未完待续)