Background

I have recently been reading [云原生分布式存储基石:etcd深入解析] (杜军, 2019.1).
This series of notes works through the material by practicing it in golang.
gitee: https://gitee.com/ioly/learning.gooop

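The raft distributed consensus algorithm

  Distributed storage systems usually tolerate faults by maintaining multiple replicas, which improves the availability of the system.
  This raises the core problem of distributed storage systems: how to keep the replicas consistent?
  The Raft algorithm decomposes the problem into four subproblems:
  1. leader election
  2. log replication
  3. safety
  4. membership changes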

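Goal

  • Following the raft protocol, implement a highly available, distributed, strongly consistent kv store

Sub-goals (Day 7)

  • Implement rpc communication between the raft nodes
    • Define the IRaftClientService service to manage the long-lived tcp connections to all nodes
    • Define the IRaftClient interface to wrap the rpc calls between nodes
      • State pattern to distinguish the connected state from the broken state
      • Event-driven logic orchestration
      • Field management with read/write separation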

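Design

  • model/IEventDrivenModel: base class for event-driven logic orchestration
  • IRaftClientService: manages all rpc connections between nodes
  • IRaftClient: manages the rpc connection between the current node and one peer node
  • iClientState: state-pattern interface for the rpc connection state
  • iStateContext: context interface for the connection state under the state pattern
  • tRaftClient: concrete implementation of IRaftClient, which also implements iStateContext
  • tConnectedState: manages an rpc connection in the connected state
    • Pings periodically to detect the connection status
    • Event-driven logic orchestration
    • Field management with read/write separation
  • tBrokenState: manages an rpc connection in the broken state
    • Redials periodically to try to reconnect
    • Event-driven logic orchestration
    • Field management with read/write separation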
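To make the state-pattern part of the design concrete, here is a minimal, self-contained sketch. It is not the project's code: `link`, `connected`, `broken` and the "fail" message are hypothetical names invented for illustration. The context (`link`) delegates every call to its current state, and the states themselves decide when to switch.

```go
package main

import (
	"errors"
	"fmt"
)

// errBroken is what the broken state returns for every call.
var errBroken = errors.New("peer connection broken")

// state is the behaviour that differs between connection states.
type state interface {
	Name() string
	Send(msg string) error
}

// link is the context: it owns the current state and delegates to it.
type link struct {
	current state
}

func newLink() *link {
	l := &link{}
	l.switchTo(&connected{ctx: l})
	return l
}

func (l *link) Send(msg string) error { return l.current.Send(msg) }
func (l *link) switchTo(s state)      { l.current = s }

// connected forwards messages; the message "fail" simulates a network error.
type connected struct{ ctx *link }

func (c *connected) Name() string { return "connected" }

func (c *connected) Send(msg string) error {
	if msg == "fail" {
		c.ctx.switchTo(&broken{ctx: c.ctx})
		return errBroken
	}
	return nil
}

// broken rejects every message until redial succeeds.
type broken struct{ ctx *link }

func (b *broken) Name() string      { return "broken" }
func (b *broken) Send(string) error { return errBroken }

// redial simulates a successful reconnect.
func (b *broken) redial() { b.ctx.switchTo(&connected{ctx: b.ctx}) }

func main() {
	l := newLink()
	err := l.Send("hello")
	fmt.Println(l.current.Name(), err) // prints "connected <nil>"

	err = l.Send("fail")
	fmt.Println(l.current.Name(), err) // prints "broken peer connection broken"

	if b, ok := l.current.(*broken); ok {
		b.redial()
	}
	fmt.Println(l.current.Name()) // prints "connected"
}
```

Callers only ever talk to `link`, so they never need to check whether the connection is up before sending.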

model/IEventDrivenModel.go

Base class for event-driven logic orchestration

    package model

    // TEventHandleFunc handles event e with optional arguments.
    type TEventHandleFunc func(e string, args ...interface{})

    // IEventDrivenModel is the base interface for event-driven logic orchestration.
    // The method names are exported so that they match TEventDrivenModel below.
    type IEventDrivenModel interface {
        Hook(e string, handleFunc TEventHandleFunc)
        Raise(e string, args ...interface{})
    }

    type TEventDrivenModel struct {
        items map[string][]TEventHandleFunc
    }

    // Hook registers handler for event e.
    func (me *TEventDrivenModel) Hook(e string, handler TEventHandleFunc) {
        if me.items == nil {
            // create the map lazily: writing to a nil map panics, and the states
            // embed TEventDrivenModel as a zero value
            me.items = make(map[string][]TEventHandleFunc)
        }
        me.items[e] = append(me.items[e], handler)
    }

    // Raise calls every handler hooked on event e, in registration order.
    func (me *TEventDrivenModel) Raise(e string, args ...interface{}) {
        for _, it := range me.items[e] {
            it(e, args...)
        }
    }
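The sketch below shows how the event model and the "read/write separation" idea can fit together. It is an illustration under assumptions, not the author's code: `events` is a stripped-down stand-in for TEventDrivenModel, and `door`, `evOpen` and `evClose` are hypothetical names. The field `mOpen` is written only inside `hookEventsForOpenFlag`; every other handler only reads it.

```go
package main

import "fmt"

type handler func(e string, args ...interface{})

// events is a stripped-down stand-in for TEventDrivenModel.
type events struct {
	items map[string][]handler
}

func (me *events) Hook(e string, h handler) {
	if me.items == nil {
		me.items = make(map[string][]handler)
	}
	me.items[e] = append(me.items[e], h)
}

func (me *events) Raise(e string, args ...interface{}) {
	for _, h := range me.items[e] {
		h(e, args...)
	}
}

const (
	evOpen  = "door.open"
	evClose = "door.close"
)

// door is a hypothetical model with a single state field.
type door struct {
	events
	mOpen bool     // update: evOpen, evClose (written nowhere else)
	log   []string // what the read-only handler observed
}

func newDoor() *door {
	d := new(door)
	// write-only logic is hooked first, so readers see the updated field
	d.hookEventsForOpenFlag()
	// read-only logic
	d.Hook(evOpen, d.whenChangedThenLog)
	d.Hook(evClose, d.whenChangedThenLog)
	return d
}

func (d *door) hookEventsForOpenFlag() {
	d.Hook(evOpen, func(string, ...interface{}) { d.mOpen = true })
	d.Hook(evClose, func(string, ...interface{}) { d.mOpen = false })
}

func (d *door) whenChangedThenLog(e string, args ...interface{}) {
	d.log = append(d.log, fmt.Sprintf("%s by %v, open=%v", e, args[0], d.mOpen))
}

func main() {
	d := newDoor()
	d.Raise(evOpen, "alice")
	d.Raise(evClose, "bob")
	for _, line := range d.log {
		fmt.Println(line)
	}
	// prints:
	// door.open by alice, open=true
	// door.close by bob, open=false
}
```

Because handlers run in registration order, hooking the writers before the readers is what guarantees that a reader never observes a stale value for the same event.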

IRaftClientService.go

Manages all rpc connections between nodes

    package client

    import (
        "fmt"
        "learning/gooop/etcd/raft/config"
        "learning/gooop/etcd/raft/rpc"
        netrpc "net/rpc"
        "sync"
    )

    // IRaftClientService is the interface returned by NewRaftClientService;
    // it is declared here from the one method implemented below.
    type IRaftClientService interface {
        Using(peerID string, action func(client rpc.IRaftRPC) error) error
    }

    type tRaftClientService struct {
        cfg     config.IRaftConfig
        rwmutex *sync.RWMutex
        clients map[string]IRaftClient
    }

    func NewRaftClientService(cfg config.IRaftConfig) IRaftClientService {
        it := new(tRaftClientService)
        it.init(cfg)
        return it
    }

    func (me *tRaftClientService) init(cfg config.IRaftConfig) {
        me.cfg = cfg
        me.rwmutex = new(sync.RWMutex)
        me.clients = make(map[string]IRaftClient)
    }

    func (me *tRaftClientService) Using(peerID string, action func(client rpc.IRaftRPC) error) error {
        // fast path: reuse an existing client (release the read lock before calling action)
        me.rwmutex.RLock()
        it, ok := me.clients[peerID]
        me.rwmutex.RUnlock()
        if ok {
            return action(it)
        }

        // look up the peer's config
        var nodeCfg config.IRaftNodeConfig
        for _, n := range me.cfg.Nodes() {
            if n.ID() == peerID {
                nodeCfg = n
                break
            }
        }
        if nodeCfg == nil {
            return fmt.Errorf("unknown peer: %s", peerID)
        }

        // dial to peer, outside of any lock
        conn, err := netrpc.Dial("tcp", nodeCfg.Endpoint())
        if err != nil {
            return err
        }

        // recheck under the write lock: another goroutine may have created the client
        me.rwmutex.Lock()
        it, ok = me.clients[peerID]
        if ok {
            me.rwmutex.Unlock()
            _ = conn.Close()
            return action(it)
        }

        // create the new client and remember it for later calls
        it = newRaftClient(nodeCfg, conn)
        me.clients[peerID] = it
        me.rwmutex.Unlock()
        return action(it)
    }
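The locking in Using follows a check, create, recheck shape. A minimal standalone sketch of that shape is given below; `registry` and `getOrCreate` are hypothetical names, and plain strings stand in for rpc connections. The slow creation step (the dial, in Using) runs outside the lock, so under contention it may run more than once, but only one result is ever stored.

```go
package main

import (
	"fmt"
	"sync"
)

// registry caches one value per key.
type registry struct {
	mu    sync.RWMutex
	items map[string]string
	made  int // number of values actually stored
}

func newRegistry() *registry {
	return &registry{items: make(map[string]string)}
}

// getOrCreate returns the cached value for key, creating it on first use.
func (r *registry) getOrCreate(key string, create func() (string, error)) (string, error) {
	// 1. check under the read lock
	r.mu.RLock()
	v, ok := r.items[key]
	r.mu.RUnlock()
	if ok {
		return v, nil
	}

	// 2. create outside the lock, because this step may be slow
	fresh, err := create()
	if err != nil {
		return "", err
	}

	// 3. recheck under the write lock before storing
	r.mu.Lock()
	defer r.mu.Unlock()
	if v, ok := r.items[key]; ok {
		return v, nil // lost the race; a real connection would be closed here
	}
	r.items[key] = fresh
	r.made++
	return fresh, nil
}

func main() {
	r := newRegistry()
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			_, _ = r.getOrCreate("peer-1", func() (string, error) {
				return fmt.Sprintf("conn-%d", i), nil
			})
		}(i)
	}
	wg.Wait()
	fmt.Println("values stored:", r.made) // prints "values stored: 1"
}
```

The trade-off is that a losing goroutine does wasted work, in exchange for never holding the lock across a network call.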

IRaftClient.go

Manages the rpc connection between the current node and one peer node

    package client

    import "learning/gooop/etcd/raft/rpc"

    type IRaftClient interface {
        rpc.IRaftRPC
        iStateContext
        Ping(cmd *PingCmd, ret *PingRet) error
    }

    type PingCmd struct {
        SenderID  string
        Timestamp int64
    }

    type PingRet struct {
        SenderID  string
        Timestamp int64
    }

iClientState.go

State-pattern interface for the rpc connection state

    package client

    import "learning/gooop/etcd/raft/rpc"

    type iClientState interface {
        rpc.IRaftRPC
        Start()
        Ping(cmd *PingCmd, ret *PingRet) error
    }

iStateContext.go

Context interface for the connection state under the state pattern

    package client

    import (
        "learning/gooop/etcd/raft/config"
        "net/rpc"
    )

    type iStateContext interface {
        Config() config.IRaftNodeConfig
        GetConn() *rpc.Client
        SetConn(client *rpc.Client)
        HandleStateChanged(state iClientState)
    }

tRaftClient.go

Concrete implementation of the IRaftClient interface, which also implements the iStateContext interface.

    package client

    import (
        "learning/gooop/etcd/raft/config"
        rrpc "learning/gooop/etcd/raft/rpc"
        "net/rpc"
        "sync"
    )

    type tRaftClient struct {
        cfg config.IRaftNodeConfig

        // mutex guards conn and state, which are touched both by callers
        // and by the background ping/dial goroutines of the states
        mutex sync.RWMutex
        conn  *rpc.Client
        state iClientState
    }

    func newRaftClient(cfg config.IRaftNodeConfig, conn *rpc.Client) IRaftClient {
        it := new(tRaftClient)
        it.init(cfg, conn)
        return it
    }

    func (me *tRaftClient) init(cfg config.IRaftNodeConfig, conn *rpc.Client) {
        me.cfg = cfg
        me.conn = conn
        // a freshly dialed client starts in the connected state;
        // without this, me.state would be nil on the first rpc call
        me.HandleStateChanged(newConnectedState(me))
    }

    func (me *tRaftClient) Config() config.IRaftNodeConfig {
        return me.cfg
    }

    func (me *tRaftClient) GetConn() *rpc.Client {
        me.mutex.RLock()
        defer me.mutex.RUnlock()
        return me.conn
    }

    func (me *tRaftClient) SetConn(conn *rpc.Client) {
        me.mutex.Lock()
        defer me.mutex.Unlock()
        me.conn = conn
    }

    func (me *tRaftClient) HandleStateChanged(state iClientState) {
        me.mutex.Lock()
        me.state = state
        me.mutex.Unlock()
        state.Start()
    }

    // currentState returns the state that should serve the next call.
    func (me *tRaftClient) currentState() iClientState {
        me.mutex.RLock()
        defer me.mutex.RUnlock()
        return me.state
    }

    func (me *tRaftClient) Heartbeat(cmd *rrpc.HeartbeatCmd, ret *rrpc.HeartbeatRet) error {
        return me.currentState().Heartbeat(cmd, ret)
    }

    func (me *tRaftClient) AppendLog(cmd *rrpc.AppendLogCmd, ret *rrpc.AppendLogRet) error {
        return me.currentState().AppendLog(cmd, ret)
    }

    func (me *tRaftClient) CommitLog(cmd *rrpc.CommitLogCmd, ret *rrpc.CommitLogRet) error {
        return me.currentState().CommitLog(cmd, ret)
    }

    func (me *tRaftClient) RequestVote(cmd *rrpc.RequestVoteCmd, ret *rrpc.RequestVoteRet) error {
        return me.currentState().RequestVote(cmd, ret)
    }

    func (me *tRaftClient) Ping(cmd *PingCmd, ret *PingRet) error {
        return me.currentState().Ping(cmd, ret)
    }

tConnectedState.go

Manages an rpc connection in the connected state

  • Pings periodically to detect the connection status
  • Event-driven logic orchestration
  • Field management with read/write separation

```go
package client

import (
	"learning/gooop/etcd/raft/model"
	"learning/gooop/etcd/raft/rpc"
	"learning/gooop/etcd/raft/timeout"
	"sync"
	"time"
)

type tConnectedState struct {
	model.TEventDrivenModel
	context iStateContext

	mInitOnce  sync.Once
	mStartOnce sync.Once

	// update: ceInit, ceDisposing
	mDisposedFlag bool
}

// trigger: init()
// args: empty
const ceInit = "connected.init"

// trigger: Start()
// args: empty
const ceStart = "connected.Start"

// trigger: whenPingFailedThenSwitchToBrokenState()
// args: empty
const ceDisposing = "connected.Disposing"

// trigger: whenStartThenBeginPing()
// args: empty
const cePingFailed = "connected.PingFailed"

func newConnectedState(ctx iStateContext) iClientState {
	it := new(tConnectedState)
	it.init(ctx)
	return it
}

func (me *tConnectedState) init(ctx iStateContext) {
	me.mInitOnce.Do(func() {
		me.context = ctx
		me.initEventHandlers()
		me.Raise(ceInit)
	})
}

func (me *tConnectedState) initEventHandlers() {
	// write only logic
	me.hookEventsForDisposedFlag()

	// read only logic
	me.Hook(ceStart,
		me.whenStartThenBeginPing)
	me.Hook(cePingFailed,
		me.whenPingFailedThenSwitchToBrokenState)
	me.Hook(ceDisposing,
		me.whenDisposingThenCloseConn)
}

func (me *tConnectedState) Start() {
	me.mStartOnce.Do(func() {
		me.Raise(ceStart)
	})
}

func (me *tConnectedState) hookEventsForDisposedFlag() {
	me.Hook(ceInit, func(e string, args ...interface{}) {
		me.mDisposedFlag = false
	})

	me.Hook(ceDisposing, func(e string, args ...interface{}) {
		me.mDisposedFlag = true
	})
}

func (me *tConnectedState) whenStartThenBeginPing(_ string, _ ...interface{}) {
	go func() {
		cmd := &PingCmd{SenderID: me.context.Config().ID()}
		ret := &PingRet{}

		// NewTicker plus Stop releases the ticker once the loop exits
		ticker := time.NewTicker(timeout.ClientPingInterval)
		defer ticker.Stop()

		for range ticker.C {
			if me.mDisposedFlag {
				return
			}

			cmd.Timestamp = time.Now().UnixNano()
			if err := me.Ping(cmd, ret); err != nil {
				// this state is finished after the switch, so stop pinging
				me.Raise(cePingFailed)
				return
			}
		}
	}()
}

func (me *tConnectedState) whenPingFailedThenSwitchToBrokenState(_ string, _ ...interface{}) {
	me.Raise(ceDisposing)
	me.context.HandleStateChanged(newBrokenState(me.context))
}

func (me *tConnectedState) whenDisposingThenCloseConn(_ string, _ ...interface{}) {
	it := me.context.GetConn()
	if it != nil {
		_ = it.Close()
	}

	me.context.SetConn(nil)
}

// call forwards an rpc to the peer. It reports a broken connection instead of
// panicking if the conn was already cleared by ceDisposing.
func (me *tConnectedState) call(method string, cmd interface{}, ret interface{}) error {
	conn := me.context.GetConn()
	if conn == nil {
		return gErrorConnectionBroken
	}
	return conn.Call(method, cmd, ret)
}

func (me *tConnectedState) Heartbeat(cmd *rpc.HeartbeatCmd, ret *rpc.HeartbeatRet) error {
	return me.call("TRaftRPCServer.Heartbeat", cmd, ret)
}

func (me *tConnectedState) AppendLog(cmd *rpc.AppendLogCmd, ret *rpc.AppendLogRet) error {
	return me.call("TRaftRPCServer.AppendLog", cmd, ret)
}

func (me *tConnectedState) CommitLog(cmd *rpc.CommitLogCmd, ret *rpc.CommitLogRet) error {
	return me.call("TRaftRPCServer.CommitLog", cmd, ret)
}

func (me *tConnectedState) RequestVote(cmd *rpc.RequestVoteCmd, ret *rpc.RequestVoteRet) error {
	return me.call("TRaftRPCServer.RequestVote", cmd, ret)
}

func (me *tConnectedState) Ping(cmd *PingCmd, ret *PingRet) error {
	return me.call("TRaftRPCServer.Ping", cmd, ret)
}
```
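The connected state addresses the peer through net/rpc method names of the form "TRaftRPCServer.Ping". The server side is not shown in these notes, so the following round trip is only a sketch under assumptions: the `TRaftRPCServer` stub, its `id` field, and the helpers `serve` and `ping` are hypothetical, and the stub simply echoes the caller's timestamp.

```go
package main

import (
	"fmt"
	"net"
	"net/rpc"
	"time"
)

type PingCmd struct {
	SenderID  string
	Timestamp int64
}

type PingRet struct {
	SenderID  string
	Timestamp int64
}

// TRaftRPCServer is a hypothetical server stub; only Ping is implemented.
type TRaftRPCServer struct{ id string }

// Ping echoes the caller's timestamp and reports the server's own ID.
func (s *TRaftRPCServer) Ping(cmd *PingCmd, ret *PingRet) error {
	ret.SenderID = s.id
	ret.Timestamp = cmd.Timestamp
	return nil
}

// serve starts an rpc server on a free local port and returns its listener.
func serve(id string) (net.Listener, error) {
	srv := rpc.NewServer()
	// Register exposes the methods under the type name "TRaftRPCServer"
	if err := srv.Register(&TRaftRPCServer{id: id}); err != nil {
		return nil, err
	}
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return nil, err
	}
	go srv.Accept(ln)
	return ln, nil
}

// ping dials addr, sends a single Ping and returns the reply.
func ping(addr, senderID string, ts int64) (*PingRet, error) {
	conn, err := rpc.Dial("tcp", addr)
	if err != nil {
		return nil, err
	}
	defer conn.Close()

	ret := &PingRet{}
	err = conn.Call("TRaftRPCServer.Ping", &PingCmd{SenderID: senderID, Timestamp: ts}, ret)
	if err != nil {
		return nil, err
	}
	return ret, nil
}

func main() {
	ln, err := serve("node-2")
	if err != nil {
		fmt.Println("serve failed:", err)
		return
	}
	defer ln.Close()

	ret, err := ping(ln.Addr().String(), "node-1", time.Now().UnixNano())
	if err != nil {
		fmt.Println("ping failed:", err)
		return
	}
	fmt.Println("reply from", ret.SenderID)
}
```

Any error returned by `Call`, for example after the peer has gone away, is the signal that the connected state turns into the cePingFailed event.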

tBrokenState.go

Manages an rpc connection in the broken state

  • Redials periodically to try to reconnect
  • Event-driven logic orchestration
  • Field management with read/write separation
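The periodic redial idea can be looked at in isolation. The sketch below is not the project's code: `redial` and `failNTimes` are hypothetical helpers, and a stop channel is used here in place of a disposed flag so that the wait between attempts can be interrupted.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// redial calls dial until it succeeds or stop is closed, waiting interval
// between attempts. It returns the number of attempts made and whether the
// last one succeeded.
func redial(dial func() error, interval time.Duration, stop <-chan struct{}) (attempts int, ok bool) {
	for {
		// give up before dialing if we were asked to stop
		select {
		case <-stop:
			return attempts, false
		default:
		}

		attempts++
		if err := dial(); err == nil {
			return attempts, true
		}

		// wait for the next attempt, but wake up early on stop
		select {
		case <-stop:
			return attempts, false
		case <-time.After(interval):
		}
	}
}

// failNTimes returns a fake dialer that fails n times and then succeeds.
func failNTimes(n int) func() error {
	calls := 0
	return func() error {
		calls++
		if calls <= n {
			return errors.New("connection refused")
		}
		return nil
	}
}

func main() {
	stop := make(chan struct{})
	attempts, ok := redial(failNTimes(2), 10*time.Millisecond, stop)
	fmt.Println(attempts, ok) // prints "3 true"
}
```

Closing `stop` from another goroutine ends the loop even while it is sleeping, which a plain `time.Sleep` cannot do.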
```go
package client

import (
	"errors"
	"learning/gooop/etcd/raft/model"
	rrpc "learning/gooop/etcd/raft/rpc"
	"learning/gooop/etcd/raft/timeout"
	"net/rpc"
	"sync"
	"time"
)

type tBrokenState struct {
	model.TEventDrivenModel
	context iStateContext

	mInitOnce  sync.Once
	mStartOnce sync.Once

	mDisposedFlag bool
}

// trigger: init()
// args: empty
const beInit = "broken.init"

// trigger: Start()
// args: empty
const beStart = "broken.Start"

// trigger: whenStartThenBeginDial
// args: *rpc.Client
const beDialOK = "broken.DialOK"

// trigger: whenDialOKThenSwitchToConnectedState
// args: empty
const beDisposing = "broken.Disposing"

func newBrokenState(ctx iStateContext) iClientState {
	it := new(tBrokenState)
	it.init(ctx)
	return it
}

func (me *tBrokenState) init(ctx iStateContext) {
	me.mInitOnce.Do(func() {
		me.context = ctx
		me.initEventHandlers()
		me.Raise(beInit)
	})
}

func (me *tBrokenState) initEventHandlers() {
	// write only logic
	me.hookEventsForDisposedFlag()

	// read only logic
	me.Hook(beStart,
		me.whenStartThenBeginDial)
	me.Hook(beDialOK,
		me.whenDialOKThenSetConn)
	me.Hook(beDialOK,
		me.whenDialOKThenSwitchToConnectedState)
}

func (me *tBrokenState) hookEventsForDisposedFlag() {
	me.Hook(beInit, func(e string, args ...interface{}) {
		me.mDisposedFlag = false
	})

	me.Hook(beDisposing, func(e string, args ...interface{}) {
		me.mDisposedFlag = true
	})
}

func (me *tBrokenState) Start() {
	me.mStartOnce.Do(func() {
		me.Raise(beStart)
	})
}

func (me *tBrokenState) whenStartThenBeginDial(_ string, _ ...interface{}) {
	go func() {
		for !me.mDisposedFlag {
			conn, err := rpc.Dial("tcp", me.context.Config().Endpoint())
			if err == nil {
				me.Raise(beDialOK, conn)
				break
			}
			time.Sleep(timeout.ClientRedialInterval)
		}
	}()
}

func (me *tBrokenState) whenDialOKThenSetConn(_ string, args ...interface{}) {
	conn := args[0].(*rpc.Client)
	me.context.SetConn(conn)
}

func (me *tBrokenState) whenDialOKThenSwitchToConnectedState(_ string, _ ...interface{}) {
	me.Raise(beDisposing)
	me.context.HandleStateChanged(newConnectedState(me.context))
}

func (me *tBrokenState) Heartbeat(cmd *rrpc.HeartbeatCmd, ret *rrpc.HeartbeatRet) error {
	return gErrorConnectionBroken
}

func (me *tBrokenState) AppendLog(cmd *rrpc.AppendLogCmd, ret *rrpc.AppendLogRet) error {
	return gErrorConnectionBroken
}

func (me *tBrokenState) CommitLog(cmd *rrpc.CommitLogCmd, ret *rrpc.CommitLogRet) error {
	return gErrorConnectionBroken
}

func (me *tBrokenState) RequestVote(cmd *rrpc.RequestVoteCmd, ret *rrpc.RequestVoteRet) error {
	return gErrorConnectionBroken
}

func (me *tBrokenState) Ping(cmd *PingCmd, ret *PingRet) error {
	return gErrorConnectionBroken
}

var gErrorConnectionBroken = errors.New("peer connection broken")
```

(To be continued)