缘起

最近阅读 [云原生分布式存储基石:etcd深入解析] (杜军 , 2019.1)
本系列笔记拟采用golang练习之

raft分布式一致性算法

  1. 分布式存储系统通常会通过维护多个副本来进行容错,
  2. 以提高系统的可用性。
  3. 这就引出了分布式存储系统的核心问题——如何保证多个副本的一致性?
  4. Raft算法把问题分解成了四个子问题:
  5. 1. 领袖选举(leader election)、
  6. 2. 日志复制(log replication)、
  7. 3. 安全性(safety)、
  8. 4. 成员关系变化(membership changes)
  9. 这几个子问题。
  10. 源码gitee地址:
  11. https://gitee.com/ioly/learning.gooop

目标

  • 根据raft协议,实现高可用分布式强一致的kv存储

子目标(Day 11)

  • 虽然Leader State还有细节没处理完,但应该能启动并提供基本服务了
  • 添加外围功能,为首次“点火”做准备:
    • config/tRaftConfig:从本地json文件读取集群节点配置,提供IRaftConfig/IRaftNodeConfig的实现
    • lsm/tRaftLSMImplement: 提供对顶层接口IRaftLSM的实现,将“配置/kv存储/节点通讯”三大块粘合起来
    • server/IRaftKVServer:server启动器接口
    • server/tRaftKVServer: server启动器的实现,监听raft rpc和kv rpc

config/tRaftConfig.go

从本地json文件读取集群节点配置,提供IRaftConfig/IRaftNodeConfig的实现

  1. package config
  2. import (
  3. "encoding/json"
  4. "os"
  5. )
  6. type tRaftConfig struct {
  7. ID string
  8. Nodes []*tRaftNodeConfig
  9. }
  10. type tRaftNodeConfig struct {
  11. ID string
  12. Endpoint string
  13. }
  14. func (me *tRaftConfig) GetID() string {
  15. return me.ID
  16. }
  17. func (me *tRaftConfig) GetNodes() []IRaftNodeConfig {
  18. a := make([]IRaftNodeConfig, len(me.Nodes))
  19. for i,it := range me.Nodes {
  20. a[i] = it
  21. }
  22. return a
  23. }
  24. func (me *tRaftNodeConfig) GetID() string {
  25. return me.ID
  26. }
  27. func (me *tRaftNodeConfig) GetEndpoint() string {
  28. return me.Endpoint
  29. }
  30. func LoadJSONFile(file string) IRaftConfig {
  31. data, err := os.ReadFile(file)
  32. if err != nil {
  33. panic(err)
  34. }
  35. c := new(tRaftConfig)
  36. err = json.Unmarshal(data, c)
  37. if err != nil {
  38. panic(err)
  39. }
  40. return c
  41. }

lsm/tRaftLSMImplement.go

提供对顶层接口IRaftLSM的实现,将“配置/kv存储/节点通讯”三大块粘合起来,并添加诊断日志。

  1. package lsm
  2. import (
  3. "learning/gooop/etcd/raft/common"
  4. "learning/gooop/etcd/raft/config"
  5. "learning/gooop/etcd/raft/logger"
  6. "learning/gooop/etcd/raft/rpc"
  7. "learning/gooop/etcd/raft/rpc/client"
  8. "learning/gooop/etcd/raft/store"
  9. "sync"
  10. )
  11. type tRaftLSMImplement struct {
  12. tEventDrivenModel
  13. mInitOnce sync.Once
  14. mConfig config.IRaftConfig
  15. mStore store.ILogStore
  16. mClientService client.IRaftClientService
  17. mState IRaftState
  18. }
  // Event names raised on the embedded event model.
  const (
      // meInit is raised by init(); handlers receive no args.
      meInit = "lsm.Init"

      // meStateChanged is raised by HandleStateChanged(); args[0] is the new
      // IRaftState. The name was previously misspelled "lsm.StateChnaged".
      meStateChanged = "lsm.StateChanged"
  )
  25. func (me *tRaftLSMImplement) init() {
  26. me.mInitOnce.Do(func() {
  27. me.initEventHandlers()
  28. me.raise(meInit)
  29. })
  30. }
  31. func (me *tRaftLSMImplement) initEventHandlers() {
  32. // write only
  33. me.hookEventsForConfig()
  34. me.hookEventsForStore()
  35. me.hookEventsForPeerService()
  36. me.hookEventsForState()
  37. }
  38. func (me *tRaftLSMImplement) hookEventsForConfig() {
  39. me.hook(meInit, func(e string, args ...interface{}) {
  40. logger.Logf("tRaftLSMImplement.init, ConfigFile = %v", common.ConfigFile)
  41. me.mConfig = config.LoadJSONFile(common.ConfigFile)
  42. })
  43. }
  44. func (me *tRaftLSMImplement) hookEventsForStore() {
  45. me.hook(meInit, func(e string, args ...interface{}) {
  46. logger.Logf("tRaftLSMImplement.init, DataFile = %v", common.DataFile)
  47. err, db := store.NewBoltStore(common.DataFile)
  48. if err != nil {
  49. panic(err)
  50. }
  51. me.mStore = db
  52. })
  53. }
  54. func (me *tRaftLSMImplement) hookEventsForPeerService() {
  55. me.hook(meInit, func(e string, args ...interface{}) {
  56. me.mClientService = client.NewRaftClientService(me.mConfig)
  57. })
  58. }
  59. func (me *tRaftLSMImplement) hookEventsForState() {
  60. me.hook(meInit, func(e string, args ...interface{}) {
  61. me.mState = newFollowerState(me, me.mStore.LastCommittedTerm())
  62. me.mState.Start()
  63. })
  64. me.hook(meStateChanged, func(e string, args ...interface{}) {
  65. state := args[0].(IRaftState)
  66. logger.Logf("tRaftLSMImplement.StateChanged, %v", state.Role())
  67. me.mState = state
  68. state.Start()
  69. })
  70. }
  71. func (me *tRaftLSMImplement) Config() config.IRaftConfig {
  72. return me.mConfig
  73. }
  74. func (me *tRaftLSMImplement) Store() store.ILogStore {
  75. return me.mStore
  76. }
  77. func (me *tRaftLSMImplement) HandleStateChanged(state IRaftState) {
  78. me.raise(meStateChanged, state)
  79. }
  80. func (me *tRaftLSMImplement) RaftClientService() client.IRaftClientService {
  81. return me.mClientService
  82. }
  83. func (me *tRaftLSMImplement) Heartbeat(cmd *rpc.HeartbeatCmd, ret *rpc.HeartbeatRet) error {
  84. state := me.mState
  85. e := state.Heartbeat(cmd, ret)
  86. logger.Logf("tRaftLSMImplement.Heartbeat, state=%v, cmd=%v, ret=%v, err=%v", cmd, ret, e)
  87. return e
  88. }
  89. func (me *tRaftLSMImplement) AppendLog(cmd *rpc.AppendLogCmd, ret *rpc.AppendLogRet) error {
  90. state := me.mState
  91. e := state.AppendLog(cmd, ret)
  92. logger.Logf("tRaftLSMImplement.AppendLog, state=%v, cmd=%v, ret=%v, err=%v", cmd, ret, e)
  93. return e
  94. }
  95. func (me *tRaftLSMImplement) CommitLog(cmd *rpc.CommitLogCmd, ret *rpc.CommitLogRet) error {
  96. state := me.mState
  97. e := state.CommitLog(cmd, ret)
  98. logger.Logf("tRaftLSMImplement.CommitLog, state=%v, cmd=%v, ret=%v, err=%v", cmd, ret, e)
  99. return e
  100. }
  101. func (me *tRaftLSMImplement) RequestVote(cmd *rpc.RequestVoteCmd, ret *rpc.RequestVoteRet) error {
  102. state := me.mState
  103. e := state.RequestVote(cmd, ret)
  104. logger.Logf("tRaftLSMImplement.RequestVote, state=%v, cmd=%v, ret=%v, err=%v", cmd, ret, e)
  105. return e
  106. }
  107. func (me *tRaftLSMImplement) ExecuteKVCmd(cmd *rpc.KVCmd, ret *rpc.KVRet) error {
  108. state := me.mState
  109. e := state.ExecuteKVCmd(cmd, ret)
  110. logger.Logf("tRaftLSMImplement.ExecuteKVCmd, state=%v, cmd=%v, ret=%v, err=%v", cmd, ret, e)
  111. return e
  112. }
  113. func (me *tRaftLSMImplement) State() IRaftState {
  114. return me.mState
  115. }
  116. func NewRaftLSM() IRaftLSM {
  117. it := new(tRaftLSMImplement)
  118. it.init()
  119. return it
  120. }

server/IRaftKVServer.go

server启动器接口

  1. package server
  // IRaftKVServer is the server starter interface.
  type IRaftKVServer interface {
      // BeginServeTCP starts serving on the given tcp port and reports any
      // startup error.
      BeginServeTCP(port int) error
  }

server/tRaftKVServer.go

server启动器的实现,监听raft rpc和kv rpc

  1. package server
  2. import (
  3. "fmt"
  4. "learning/gooop/etcd/raft/lsm"
  5. rrpc "learning/gooop/etcd/raft/rpc"
  6. "learning/gooop/saga/mqs/logger"
  7. "net"
  8. "net/rpc"
  9. "time"
  10. )
  11. type tRaftKVServer int
  12. func (me *tRaftKVServer) BeginServeTCP(port int) error {
  13. logger.Logf("tRaftKVServer.BeginServeTCP, starting, port=%v", port)
  14. // resolve address
  15. addy, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("0.0.0.0:%d", port))
  16. if err != nil {
  17. return err
  18. }
  19. // create raft lsm singleton
  20. raftLSM := lsm.NewRaftLSM()
  21. // register raft rpc server
  22. rserver := &RaftRPCServer {
  23. mRaftLSM : raftLSM,
  24. }
  25. err = rpc.Register(rserver)
  26. if err != nil {
  27. return err
  28. }
  29. // register kv rpc server
  30. kserver := &KVStoreRPCServer{
  31. mRaftLSM: raftLSM,
  32. }
  33. err = rpc.Register(kserver)
  34. if err != nil {
  35. return err
  36. }
  37. inbound, err := net.ListenTCP("tcp", addy)
  38. if err != nil {
  39. return err
  40. }
  41. go rpc.Accept(inbound)
  42. logger.Logf("tRaftKVServer.BeginServeTCP, service ready at port=%v", port)
  43. return nil
  44. }
  45. // RaftRPCServer exposes a raft rpc service
  46. type RaftRPCServer struct {
  47. mRaftLSM lsm.IRaftLSM
  48. }
  49. // Heartbeat leader to follower
  50. func (me *RaftRPCServer) Heartbeat(cmd *rrpc.HeartbeatCmd, ret *rrpc.HeartbeatRet) error {
  51. e := me.mRaftLSM.Heartbeat(cmd, ret)
  52. logger.Logf("RaftRPCServer.Heartbeat, cmd=%v, ret=%v, e=%v", cmd, ret, e)
  53. return e
  54. }
  55. // AppendLog leader to follower
  56. func (me *RaftRPCServer) AppendLog(cmd *rrpc.AppendLogCmd, ret *rrpc.AppendLogRet) error {
  57. e := me.mRaftLSM.AppendLog(cmd, ret)
  58. logger.Logf("RaftRPCServer.AppendLog, cmd=%v, ret=%v, e=%v", cmd, ret, e)
  59. return e
  60. }
  61. // CommitLog leader to follower
  62. func (me *RaftRPCServer) CommitLog(cmd *rrpc.CommitLogCmd, ret *rrpc.CommitLogRet) error {
  63. e := me.mRaftLSM.CommitLog(cmd, ret)
  64. logger.Logf("RaftRPCServer.CommitLog, cmd=%v, ret=%v, e=%v", cmd, ret, e)
  65. return e
  66. }
  67. // RequestVote candidate to others
  68. func (me *RaftRPCServer) RequestVote(cmd *rrpc.RequestVoteCmd, ret *rrpc.RequestVoteRet) error {
  69. e := me.mRaftLSM.RequestVote(cmd, ret)
  70. logger.Logf("RaftRPCServer.RequestVote, cmd=%v, ret=%v, e=%v", cmd, ret, e)
  71. return e
  72. }
  73. // Ping to keep alive
  74. func (me *RaftRPCServer) Ping(cmd *rrpc.PingCmd, ret *rrpc.PingRet) error {
  75. ret.SenderID = me.mRaftLSM.Config().GetID()
  76. ret.Timestamp = time.Now().UnixNano()
  77. logger.Logf("RaftRPCServer.Ping, cmd=%v, ret=%v", cmd, ret)
  78. return nil
  79. }
  80. // KVStoreRPCServer expose a kv storage service
  81. type KVStoreRPCServer struct {
  82. mRaftLSM lsm.IRaftLSM
  83. }
  84. // ExecuteKVCmd leader to follower
  85. func (me *KVStoreRPCServer) ExecuteKVCmd(cmd *rrpc.KVCmd, ret *rrpc.KVRet) error {
  86. e := me.mRaftLSM.ExecuteKVCmd(cmd, ret)
  87. logger.Logf("KVStoreRPCServer.ExecuteKVCmd, cmd=%v, ret=%v, e=%v", cmd, ret, e)
  88. return e
  89. }

(未完待续)