缘起
最近阅读《云原生分布式存储基石:etcd深入解析》(杜军, 2019.1)
本系列笔记拟采用golang练习之
gitee: https://gitee.com/ioly/learning.gooop
raft分布式一致性算法
分布式存储系统通常会通过维护多个副本来进行容错,以提高系统的可用性。这就引出了分布式存储系统的核心问题——如何保证多个副本的一致性?Raft算法把这个问题分解成了四个子问题:1. 领袖选举(leader election)、2. 日志复制(log replication)、3. 安全性(safety)、4. 成员关系变化(membership changes)。
目标
- 根据raft协议,实现高可用分布式强一致的kv存储
子目标(Day 7)
- 实现各raft节点之间的rpc通讯
- 定义IRaftClientService服务,管理所有节点的tcp长连接
- 定义IRaftClient接口,封装节点间的rpc调用
- 基于状态模式,区分已连接状态和已断开状态
- 基于事件驱动的逻辑编排
- 基于读写分离的字段管理
设计
- model/IEventDrivenModel: 事件驱动的逻辑编排基类
- IRaftClientService:管理所有的节点间rpc连接
- IRaftClient:管理当前节点与某个节点间的rpc连接
- iClientState:基于状态模式的rpc连接状态接口
- iStateContext:状态模式下的连接状态上下文接口
- tRaftClient:IRaftClient接口的具体实现,并实现iStateContext接口。
- tConnectedState: 管理已连接状态的rpc连接
    - 定时Ping以检测连接状态
    - 基于事件驱动的逻辑编排
    - 基于读写分离的字段管理
- tBrokenState: 管理已断开状态的rpc连接
    - 定时Dial以尝试重连接
    - 基于事件驱动的逻辑编排
    - 基于读写分离的字段管理
model/IEventDirvenModel.go
事件驱动的逻辑编排基类
package modeltype TEventHandleFunc func(e string, args ...interface{})type IEventDrivenModel interface {hook(e string, handleFunc TEventHandleFunc)raise(e string, args ...interface{})}type TEventDrivenModel struct {items map[string][]TEventHandleFunc}func (me *TEventDrivenModel) Hook(e string, handler TEventHandleFunc) {arr, ok := me.items[e]if ok {me.items[e] = append(arr, handler)} else {me.items[e] = []TEventHandleFunc{handler}}}func (me *TEventDrivenModel) Raise(e string, args ...interface{}) {if handlers, ok := me.items[e]; ok {for _, it := range handlers {it(e, args...)}}}
IRaftClientService.go
管理所有的节点间rpc连接
package clientimport ("learning/gooop/etcd/raft/config""learning/gooop/etcd/raft/rpc"netrpc "net/rpc""sync")type tRaftClientService struct {cfg config.IRaftConfigrwmutex *sync.RWMutexclients map[string]IRaftClient}func NewRaftClientService(cfg config.IRaftConfig) IRaftClientService {it := new(tRaftClientService)it.init(cfg)return it}func (me *tRaftClientService) init(cfg config.IRaftConfig) {me.cfg = cfgme.rwmutex = new(sync.RWMutex)me.clients = make(map[string]IRaftClient)}func (me *tRaftClientService) Using(peerID string, action func(client rpc.IRaftRPC) error) error {// check client exists?me.rwmutex.RLock()it,ok := me.clients[peerID]if ok {return action(it)}var nodeCfg config.IRaftNodeConfigfor _,it := range me.cfg.Nodes() {if it.ID() == peerID {nodeCfg = itbreak}}me.rwmutex.RUnlock()// dial to peerconn, err := netrpc.Dial("tcp", nodeCfg.Endpoint())if err != nil {return err}// to create new clientme.rwmutex.Lock()defer me.rwmutex.Unlock()// recheck client_,ok = me.clients[peerID]if ok {defer conn.Close()return action(it)}// create new clientreturn action(newRaftClient(nodeCfg, conn))}
IRaftClient.go
管理当前节点与某个节点间的rpc连接
package clientimport "learning/gooop/etcd/raft/rpc"type IRaftClient interface {rpc.IRaftRPCiStateContextPing(cmd *PingCmd, ret *PingRet) error}type PingCmd struct {SenderID stringTimestamp int64}type PingRet struct {SenderID stringTimestamp int64}
iClientState.go
基于状态模式的rpc连接状态接口
package clientimport "learning/gooop/etcd/raft/rpc"type iClientState interface {rpc.IRaftRPCStart()Ping(cmd *PingCmd, ret *PingRet) error}
iStateContext.go
状态模式下的连接状态上下文接口
package clientimport ("learning/gooop/etcd/raft/config""net/rpc")type iStateContext interface {Config() config.IRaftNodeConfigGetConn() *rpc.ClientSetConn(client *rpc.Client)HandleStateChanged(state iClientState)}
tRaftClient.go
IRaftClient接口的具体实现,并实现iStateContext接口。
package clientimport ("learning/gooop/etcd/raft/config""net/rpc"rrpc "learning/gooop/etcd/raft/rpc")type tRaftClient struct {cfg config.IRaftNodeConfigconn *rpc.Clientstate iClientState}func newRaftClient(cfg config.IRaftNodeConfig, conn *rpc.Client) IRaftClient {it := new(tRaftClient)it.init(cfg, conn)return it}func (me *tRaftClient) init(cfg config.IRaftNodeConfig, conn *rpc.Client) {me.cfg = cfgme.conn = conn}func (me *tRaftClient) Config() config.IRaftNodeConfig {return me.cfg}func (me *tRaftClient) GetConn() *rpc.Client {return me.conn}func (me *tRaftClient) SetConn(conn *rpc.Client) {me.conn = conn}func (me *tRaftClient) HandleStateChanged(state iClientState) {me.state = statestate.Start()}func (me *tRaftClient) Heartbeat(cmd *rrpc.HeartbeatCmd, ret *rrpc.HeartbeatRet) error {return me.state.Heartbeat(cmd, ret)}func (me *tRaftClient) AppendLog(cmd *rrpc.AppendLogCmd, ret *rrpc.AppendLogRet) error {return me.state.AppendLog(cmd, ret)}func (me *tRaftClient) CommitLog(cmd *rrpc.CommitLogCmd, ret *rrpc.CommitLogRet) error {return me.state.CommitLog(cmd, ret)}func (me *tRaftClient) RequestVote(cmd *rrpc.RequestVoteCmd, ret *rrpc.RequestVoteRet) error {return me.state.RequestVote(cmd, ret)}func (me *tRaftClient) Ping(cmd *PingCmd, ret *PingRet) error {return me.state.Ping(cmd, ret)}
tConnectedState.go
管理已连接状态的rpc连接
- 定时Ping以检测连接状态
- 基于事件驱动的逻辑编排
- 基于读写分离的字段管理

```go
package client
import ( “learning/gooop/etcd/raft/model” “learning/gooop/etcd/raft/rpc” “learning/gooop/etcd/raft/timeout” “sync” “time” )
type tConnectedState struct { model.TEventDrivenModel context iStateContext
mInitOnce sync.OncemStartOnce sync.Once// update: ceInit, ceDisposingmDisposedFlag bool
}
// trigger: init() // args: empty const ceInit = “connected.init”
// trigger: Start() // args: empty const ceStart = “connected.Start”
// trigger: // args: empty const ceDisposing = “connected.Disposing”
// trigger: whenStartThenBeginPing() // args: empty const cePingFailed = “connected.PingFailed”
func newConnectedState(ctx iStateContext) iClientState { it := new(tConnectedState) it.init(ctx) return it }
func (me *tConnectedState) init(ctx iStateContext) { me.mInitOnce.Do(func() { me.context = ctx me.initEventHandlers() me.Raise(ceInit) }) }
func (me *tConnectedState) initEventHandlers() { // write only logic me.hookEventsForDisposedFlag()
// read only logicme.Hook(ceStart,me.whenStartThenBeginPing)me.Hook(cePingFailed,me.whenPingFailedThenSwitchToBrokenState)me.Hook(ceDisposing,me.whenDisposingThenCloseConn)
}
func (me *tConnectedState) Start() { me.mStartOnce.Do(func() { me.Raise(ceStart) }) }
func (me *tConnectedState) hookEventsForDisposedFlag() { me.Hook(ceInit, func(e string, args …interface{}) { me.mDisposedFlag = false })
me.Hook(ceDisposing, func(e string, args ...interface{}) {me.mDisposedFlag = true})
}
func (me *tConnectedState) whenStartThenBeginPing( string, …interface{}) { go func() { cmd := &PingCmd{ SenderID: me.context.Config().ID(), Timestamp: time.Now().UnixNano(), } ret := &PingRet{} for range time.Tick(timeout.ClientPingInterval) { if me.mDisposedFlag { return }
cmd.Timestamp = time.Now().UnixNano()err := me.Ping(cmd, ret)if err != nil {me.Raise(cePingFailed)}}}()
}
func (me *tConnectedState) whenPingFailedThenSwitchToBrokenState( string, …interface{}) { me.Raise(ceDisposing) me.context.HandleStateChanged(newBrokenState(me.context)) }
func (me *tConnectedState) whenDisposingThenCloseConn( string, …interface{}) { it := me.context.GetConn() if it != nil { it.Close() }
me.context.SetConn(nil)
}
func (me tConnectedState) Heartbeat(cmd rpc.HeartbeatCmd, ret *rpc.HeartbeatRet) error { return me.context.GetConn().Call(“TRaftRPCServer.Heartbeat”, cmd, ret) }
func (me tConnectedState) AppendLog(cmd rpc.AppendLogCmd, ret *rpc.AppendLogRet) error { return me.context.GetConn().Call(“TRaftRPCServer.AppendLog”, cmd, ret) }
func (me tConnectedState) CommitLog(cmd rpc.CommitLogCmd, ret *rpc.CommitLogRet) error { return me.context.GetConn().Call(“TRaftRPCServer.CommitLog”, cmd, ret) }
func (me tConnectedState) RequestVote(cmd rpc.RequestVoteCmd, ret *rpc.RequestVoteRet) error { return me.context.GetConn().Call(“TRaftRPCServer.RequestVote”, cmd, ret) }
func (me tConnectedState) Ping(cmd PingCmd, ret *PingRet) error { return me.context.GetConn().Call(“TRaftRPCServer.Ping”, cmd, ret) }
<a name="YeHX8"></a># tBrokenState.go管理已断开状态的rpc连接- 定时Dial以尝试重连接- 基于事件驱动的逻辑编排- 基于读写分离的字段管理```gopackage clientimport ("errors""learning/gooop/etcd/raft/model"rrpc "learning/gooop/etcd/raft/rpc""learning/gooop/etcd/raft/timeout""sync""net/rpc""time")type tBrokenState struct {model.TEventDrivenModelcontext iStateContextmInitOnce sync.OncemStartOnce sync.OncemDisposedFlag bool}// trigger : init()// args: emptyconst beInit = "broken.init"// trigger: Start()// args: emptyconst beStart = "broken.Start"// trigger: whenStartThenBeginDial// args: *rpc.Clientconst beDialOK = "broken.DialOK"// trigger: whenDialOKThenSwitchToConnectedState// args: emptyconst beDisposing = "broken.Disposing"func newBrokenState(ctx iStateContext) iClientState {it := new(tBrokenState)it.init(ctx)return it}func (me *tBrokenState) init(ctx iStateContext) {me.mInitOnce.Do(func() {me.context = ctxme.initEventHandlers()me.Raise(beInit)})}func (me *tBrokenState) initEventHandlers() {// write only logicme.hookEventsForDisposedFlag()// read only logicme.Hook(beStart,me.whenStartThenBeginDial)me.Hook(beDialOK,me.whenDialOKThenSetConn)me.Hook(beDialOK,me.whenDialOKThenSwitchToConnectedState)}func (me *tBrokenState) hookEventsForDisposedFlag() {me.Hook(beInit, func(e string, args ...interface{}) {me.mDisposedFlag = false})me.Hook(beDisposing, func(e string, args ...interface{}) {me.mDisposedFlag = true})}func (me *tBrokenState) Start() {me.mStartOnce.Do(func() {me.Raise(beStart)})}func (me *tBrokenState) whenStartThenBeginDial(_ string, _ ...interface{}) {go func() {for !me.mDisposedFlag {conn, err := rpc.Dial("tcp", me.context.Config().Endpoint())if err == nil {me.Raise(beDialOK, conn)break} else {time.Sleep(timeout.ClientRedialInterval)}}}()}func (me *tBrokenState) whenDialOKThenSetConn(_ string, args ...interface{}) {conn := args[0].(*rpc.Client)me.context.SetConn(conn)}func (me *tBrokenState) whenDialOKThenSwitchToConnectedState(_ string, _ ...interface{}) 
{me.Raise(beDisposing)me.context.HandleStateChanged(newConnectedState(me.context))}func (me *tBrokenState) Heartbeat(cmd *rrpc.HeartbeatCmd, ret *rrpc.HeartbeatRet) error {return gErrorConnectionBroken}func (me *tBrokenState) AppendLog(cmd *rrpc.AppendLogCmd, ret *rrpc.AppendLogRet) error {return gErrorConnectionBroken}func (me *tBrokenState) CommitLog(cmd *rrpc.CommitLogCmd, ret *rrpc.CommitLogRet) error {return gErrorConnectionBroken}func (me *tBrokenState) RequestVote(cmd *rrpc.RequestVoteCmd, ret *rrpc.RequestVoteRet) error {return gErrorConnectionBroken}func (me *tBrokenState) Ping(cmd *PingCmd, ret *PingRet) error {return gErrorConnectionBroken}var gErrorConnectionBroken = errors.New("peer connection broken")
(未完待续)
