缘起
最近阅读 [云原生分布式存储基石:etcd深入解析] (杜军 , 2019.1)
本系列笔记拟采用golang练习之
raft分布式一致性算法
分布式存储系统通常会通过维护多个副本来进行容错,以提高系统的可用性。这就引出了分布式存储系统的核心问题——如何保证多个副本的一致性?Raft算法把问题分解成了四个子问题:1. 领袖选举(leader election)、2. 日志复制(log replication)、3. 安全性(safety)、4. 成员关系变化(membership changes)。源码gitee地址:https://gitee.com/ioly/learning.gooop
目标
- 根据raft协议,实现高可用分布式强一致的kv存储
子目标(Day 11)
- 虽然Leader State还有细节没处理完,但应该能启动并提供基本服务了
- 添加外围功能,为首次“点火”做准备:
- config/tRaftConfig:从本地json文件读取集群节点配置,提供IRaftConfig/IRaftNodeConfig的实现
- lsm/tRaftLSMImplement: 提供对顶层接口IRaftLSM的实现,将“配置/kv存储/节点通讯”三大块粘合起来
- server/IRaftKVServer:server启动器接口
- server/tRaftKVServer: server启动器的实现,监听raft rpc和kv rpc
config/tRaftConfig.go
从本地json文件读取集群节点配置,提供IRaftConfig/IRaftNodeConfig的实现
package config

import (
	"encoding/json"
	"os"
)

// tRaftConfig is the JSON-backed implementation of IRaftConfig.
// The fields carry no struct tags, so encoding/json matches the JSON keys
// against the field names.
type tRaftConfig struct {
	ID    string
	Nodes []*tRaftNodeConfig
}

// tRaftNodeConfig is the JSON-backed implementation of IRaftNodeConfig.
type tRaftNodeConfig struct {
	ID       string
	Endpoint string
}

// GetID returns the ID field of the configuration.
func (me *tRaftConfig) GetID() string {
	return me.ID
}

// GetNodes returns the configured nodes as a freshly allocated slice of
// IRaftNodeConfig; the slice is never nil, even when no nodes are configured.
func (me *tRaftConfig) GetNodes() []IRaftNodeConfig {
	nodes := make([]IRaftNodeConfig, 0, len(me.Nodes))
	for _, node := range me.Nodes {
		nodes = append(nodes, node)
	}
	return nodes
}

// GetID returns the ID field of the node.
func (me *tRaftNodeConfig) GetID() string {
	return me.ID
}

// GetEndpoint returns the Endpoint field of the node.
func (me *tRaftNodeConfig) GetEndpoint() string {
	return me.Endpoint
}

// LoadJSONFile reads file and decodes it into an IRaftConfig.
// It panics if the file cannot be read or is not valid JSON for tRaftConfig.
func LoadJSONFile(file string) IRaftConfig {
	raw, err := os.ReadFile(file)
	if err != nil {
		panic(err)
	}

	cfg := &tRaftConfig{}
	if err := json.Unmarshal(raw, cfg); err != nil {
		panic(err)
	}
	return cfg
}
lsm/tRaftLSMImplement.go
提供对顶层接口IRaftLSM的实现,将“配置/kv存储/节点通讯”三大块粘合起来,并添加诊断日志。
package lsm

import (
	"learning/gooop/etcd/raft/common"
	"learning/gooop/etcd/raft/config"
	"learning/gooop/etcd/raft/logger"
	"learning/gooop/etcd/raft/rpc"
	"learning/gooop/etcd/raft/rpc/client"
	"learning/gooop/etcd/raft/store"
	"sync"
)

// tRaftLSMImplement is the top-level IRaftLSM implementation. It glues
// together the cluster configuration, the log store and the peer rpc client
// service, and delegates every rpc call to the current raft state.
type tRaftLSMImplement struct {
	tEventDrivenModel

	mInitOnce      sync.Once
	mConfig        config.IRaftConfig
	mStore         store.ILogStore
	mClientService client.IRaftClientService
	mState         IRaftState
}

// meInit is raised once by init().
// args: empty
const meInit = "lsm.Init"

// meStateChanged is raised by HandleStateChanged().
// args: IRaftState
// The value is misspelled ("Chnaged"); it is kept as-is because it is only an
// opaque event key that is always referenced through this constant.
const meStateChanged = "lsm.StateChnaged"

// init registers the event handlers and raises meInit, at most once.
func (me *tRaftLSMImplement) init() {
	me.mInitOnce.Do(func() {
		me.initEventHandlers()
		me.raise(meInit)
	})
}

// initEventHandlers registers all hooks.
// NOTE(review): the peer-service hook reads mConfig and the state hook reads
// mStore, both of which are assigned by earlier meInit hooks. This presumably
// relies on tEventDrivenModel invoking handlers synchronously in registration
// order - confirm before reordering.
func (me *tRaftLSMImplement) initEventHandlers() {
	me.hookEventsForConfig()
	me.hookEventsForStore()
	me.hookEventsForPeerService()
	me.hookEventsForState()
}

// hookEventsForConfig loads the cluster configuration from common.ConfigFile
// on meInit.
func (me *tRaftLSMImplement) hookEventsForConfig() {
	me.hook(meInit, func(e string, args ...interface{}) {
		logger.Logf("tRaftLSMImplement.init, ConfigFile = %v", common.ConfigFile)
		me.mConfig = config.LoadJSONFile(common.ConfigFile)
	})
}

// hookEventsForStore opens the bolt store at common.DataFile on meInit and
// panics if the store cannot be created.
func (me *tRaftLSMImplement) hookEventsForStore() {
	me.hook(meInit, func(e string, args ...interface{}) {
		logger.Logf("tRaftLSMImplement.init, DataFile = %v", common.DataFile)
		err, db := store.NewBoltStore(common.DataFile)
		if err != nil {
			panic(err)
		}
		me.mStore = db
	})
}

// hookEventsForPeerService creates the raft client service from the loaded
// configuration on meInit.
func (me *tRaftLSMImplement) hookEventsForPeerService() {
	me.hook(meInit, func(e string, args ...interface{}) {
		me.mClientService = client.NewRaftClientService(me.mConfig)
	})
}

// hookEventsForState starts the node as a follower on meInit, and switches to
// (and starts) the new state whenever meStateChanged is raised.
func (me *tRaftLSMImplement) hookEventsForState() {
	me.hook(meInit, func(e string, args ...interface{}) {
		me.mState = newFollowerState(me, me.mStore.LastCommittedTerm())
		me.mState.Start()
	})

	me.hook(meStateChanged, func(e string, args ...interface{}) {
		state := args[0].(IRaftState)
		logger.Logf("tRaftLSMImplement.StateChanged, %v", state.Role())
		me.mState = state
		state.Start()
	})
}

// Config returns the loaded cluster configuration.
func (me *tRaftLSMImplement) Config() config.IRaftConfig {
	return me.mConfig
}

// Store returns the log store.
func (me *tRaftLSMImplement) Store() store.ILogStore {
	return me.mStore
}

// HandleStateChanged raises meStateChanged with the new state.
func (me *tRaftLSMImplement) HandleStateChanged(state IRaftState) {
	me.raise(meStateChanged, state)
}

// RaftClientService returns the peer rpc client service.
func (me *tRaftLSMImplement) RaftClientService() client.IRaftClientService {
	return me.mClientService
}

// Heartbeat delegates the heartbeat rpc to the current state and logs the
// outcome together with the role of the state that handled it.
func (me *tRaftLSMImplement) Heartbeat(cmd *rpc.HeartbeatCmd, ret *rpc.HeartbeatRet) error {
	// Capture the state once so the handler and the log line agree.
	state := me.mState
	e := state.Heartbeat(cmd, ret)
	logger.Logf("tRaftLSMImplement.Heartbeat, state=%v, cmd=%v, ret=%v, err=%v", state.Role(), cmd, ret, e)
	return e
}

// AppendLog delegates the append-log rpc to the current state and logs the
// outcome together with the role of the state that handled it.
func (me *tRaftLSMImplement) AppendLog(cmd *rpc.AppendLogCmd, ret *rpc.AppendLogRet) error {
	state := me.mState
	e := state.AppendLog(cmd, ret)
	logger.Logf("tRaftLSMImplement.AppendLog, state=%v, cmd=%v, ret=%v, err=%v", state.Role(), cmd, ret, e)
	return e
}

// CommitLog delegates the commit-log rpc to the current state and logs the
// outcome together with the role of the state that handled it.
func (me *tRaftLSMImplement) CommitLog(cmd *rpc.CommitLogCmd, ret *rpc.CommitLogRet) error {
	state := me.mState
	e := state.CommitLog(cmd, ret)
	logger.Logf("tRaftLSMImplement.CommitLog, state=%v, cmd=%v, ret=%v, err=%v", state.Role(), cmd, ret, e)
	return e
}

// RequestVote delegates the request-vote rpc to the current state and logs
// the outcome together with the role of the state that handled it.
func (me *tRaftLSMImplement) RequestVote(cmd *rpc.RequestVoteCmd, ret *rpc.RequestVoteRet) error {
	state := me.mState
	e := state.RequestVote(cmd, ret)
	logger.Logf("tRaftLSMImplement.RequestVote, state=%v, cmd=%v, ret=%v, err=%v", state.Role(), cmd, ret, e)
	return e
}

// ExecuteKVCmd delegates the kv command to the current state and logs the
// outcome together with the role of the state that handled it.
func (me *tRaftLSMImplement) ExecuteKVCmd(cmd *rpc.KVCmd, ret *rpc.KVRet) error {
	state := me.mState
	e := state.ExecuteKVCmd(cmd, ret)
	logger.Logf("tRaftLSMImplement.ExecuteKVCmd, state=%v, cmd=%v, ret=%v, err=%v", state.Role(), cmd, ret, e)
	return e
}

// State returns the current raft state.
func (me *tRaftLSMImplement) State() IRaftState {
	return me.mState
}

// NewRaftLSM creates and initializes a raft state machine. Initialization
// loads the configuration, opens the store and starts the follower state.
func NewRaftLSM() IRaftLSM {
	it := new(tRaftLSMImplement)
	it.init()
	return it
}
server/IRaftKVServer.go
server启动器接口
package server

// IRaftKVServer is the launcher interface of the raft kv server.
type IRaftKVServer interface {
	// BeginServeTCP starts serving on the given TCP port and reports any
	// error encountered while starting.
	BeginServeTCP(port int) error
}
server/tRaftKVServer.go
server启动器的实现,监听raft rpc和kv rpc
package server

import (
	"fmt"
	"learning/gooop/etcd/raft/logger"
	"learning/gooop/etcd/raft/lsm"
	rrpc "learning/gooop/etcd/raft/rpc"
	"net"
	"net/rpc"
	"time"
)

// tRaftKVServer is the IRaftKVServer implementation. It serves both the raft
// rpc service and the kv rpc service on one TCP listener.
type tRaftKVServer int

// BeginServeTCP creates the raft state machine, registers RaftRPCServer and
// KVStoreRPCServer on net/rpc's default server, and starts accepting
// connections on 0.0.0.0:port in a background goroutine.
//
// It returns as soon as the listener is ready; it does not block while
// serving. A non-nil error is returned if the address cannot be resolved, a
// service cannot be registered, or the port cannot be bound.
func (me *tRaftKVServer) BeginServeTCP(port int) error {
	logger.Logf("tRaftKVServer.BeginServeTCP, starting, port=%v", port)

	// resolve address
	addy, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("0.0.0.0:%d", port))
	if err != nil {
		return err
	}

	// create raft lsm singleton, shared by both rpc services
	raftLSM := lsm.NewRaftLSM()

	// register raft rpc server
	rserver := &RaftRPCServer{
		mRaftLSM: raftLSM,
	}
	err = rpc.Register(rserver)
	if err != nil {
		return err
	}

	// register kv rpc server
	kserver := &KVStoreRPCServer{
		mRaftLSM: raftLSM,
	}
	err = rpc.Register(kserver)
	if err != nil {
		return err
	}

	inbound, err := net.ListenTCP("tcp", addy)
	if err != nil {
		return err
	}

	// serve in the background so the caller is not blocked
	go rpc.Accept(inbound)

	logger.Logf("tRaftKVServer.BeginServeTCP, service ready at port=%v", port)
	return nil
}

// RaftRPCServer exposes the raft rpc service.
type RaftRPCServer struct {
	mRaftLSM lsm.IRaftLSM
}

// Heartbeat leader to follower. The call is delegated to the raft state
// machine and the outcome is logged.
func (me *RaftRPCServer) Heartbeat(cmd *rrpc.HeartbeatCmd, ret *rrpc.HeartbeatRet) error {
	e := me.mRaftLSM.Heartbeat(cmd, ret)
	logger.Logf("RaftRPCServer.Heartbeat, cmd=%v, ret=%v, e=%v", cmd, ret, e)
	return e
}

// AppendLog leader to follower. The call is delegated to the raft state
// machine and the outcome is logged.
func (me *RaftRPCServer) AppendLog(cmd *rrpc.AppendLogCmd, ret *rrpc.AppendLogRet) error {
	e := me.mRaftLSM.AppendLog(cmd, ret)
	logger.Logf("RaftRPCServer.AppendLog, cmd=%v, ret=%v, e=%v", cmd, ret, e)
	return e
}

// CommitLog leader to follower. The call is delegated to the raft state
// machine and the outcome is logged.
func (me *RaftRPCServer) CommitLog(cmd *rrpc.CommitLogCmd, ret *rrpc.CommitLogRet) error {
	e := me.mRaftLSM.CommitLog(cmd, ret)
	logger.Logf("RaftRPCServer.CommitLog, cmd=%v, ret=%v, e=%v", cmd, ret, e)
	return e
}

// RequestVote candidate to others. The call is delegated to the raft state
// machine and the outcome is logged.
func (me *RaftRPCServer) RequestVote(cmd *rrpc.RequestVoteCmd, ret *rrpc.RequestVoteRet) error {
	e := me.mRaftLSM.RequestVote(cmd, ret)
	logger.Logf("RaftRPCServer.RequestVote, cmd=%v, ret=%v, e=%v", cmd, ret, e)
	return e
}

// Ping to keep alive. The reply carries the configured id of this server and
// the current time in nanoseconds since the Unix epoch.
func (me *RaftRPCServer) Ping(cmd *rrpc.PingCmd, ret *rrpc.PingRet) error {
	ret.SenderID = me.mRaftLSM.Config().GetID()
	ret.Timestamp = time.Now().UnixNano()
	logger.Logf("RaftRPCServer.Ping, cmd=%v, ret=%v", cmd, ret)
	return nil
}

// KVStoreRPCServer exposes the kv storage service.
type KVStoreRPCServer struct {
	mRaftLSM lsm.IRaftLSM
}

// ExecuteKVCmd hands a kv command to the raft state machine and logs the
// outcome.
func (me *KVStoreRPCServer) ExecuteKVCmd(cmd *rrpc.KVCmd, ret *rrpc.KVRet) error {
	e := me.mRaftLSM.ExecuteKVCmd(cmd, ret)
	logger.Logf("KVStoreRPCServer.ExecuteKVCmd, cmd=%v, ret=%v, e=%v", cmd, ret, e)
	return e
}
(未完待续)
