缘起
最近阅读<
本系列笔记拟采用golang练习之
Saga模式
- saga模式将分布式长事务切分为一系列独立短事务
- 每个短事务是可通过补偿动作进行撤销的
- 事务动作和补偿动作都是幂等的, 允许重复执行而不会有副作用

> Saga由一系列的子事务 $T_i$ 组成, 每个 $T_i$ 都有对应的补偿 $C_i$, 当 $T_i$ 出现问题时, $C_i$ 用于处理 $T_i$ 执行带来的问题。
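可以通过下面的两个公式理解Saga模式:

$$T = T_1 T_2 \dots T_n$$

$$T = T_1 T_2 \dots T_j C_j \dots C_2 C_1 \quad (0 \le j < n)$$

第一个公式表示所有子事务都成功提交; 第二个公式表示 Saga 在执行完 $T_j$ 后被中止, 此时按逆序执行补偿 $C_j, \dots, C_2, C_1$, 撤销已提交子事务的影响。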
Saga模式的核心理念是避免使用长期持有锁(如14.2.2节介绍的两阶段提交)的长事务, 而应该将事务切分为一组按序依次提交的短事务, Saga模式满足ACD(原子性、一致性、持久性)特征。
摘自 <
<a name="fGPkq"></a>
# 目标

- 为实现saga模式的分布式事务, 先实现一个pub/sub事务消息队列服务
- 事务消息队列服务的功能性要求
  - 消息不会丢失: 消息的持久化
  - 消息的唯一性: 要求每个消息有全局ID和子事务ID
  - 确保投递成功: 投递队列持久化, 投递状态持久化, 失败重试

<a name="naUyZ"></a>
# 子目标(Day 3)

- 有序、可靠地投递消息
  - 有序: 每个订阅者绑定独立的投递worker, 按时间戳投递
  - 可靠: 投递状态的持久化

<a name="XyCWB"></a>
# 设计

- QueuedMsg: 把投递队列的消息设计得胖一点, 以减少join的使用
- IEventBus: 设计一个内部消息总线, 以减少逻辑耦合
- tDeliveryService: 投递服务, 管理若干个投递worker
- tWorkerInfo: 投递worker的初始化参数
- tDeliveryWorker: 投递worker(未完成)

<a name="HbILr"></a>
# QueuedMsg.go

把待投递消息设计得胖一点, 以减少join的使用

```go
package models

type QueuedMsg struct {
	ID          int
	ClientID    string
	NotifyURL   string
	MsgID       int
	GlobalID    string
	SubID       string
	SenderID    string
	CreateTime  int64
	Topic       string
	Content     string
	StatusFlag  int
	FailedCount int
}
```
IEventBus.go
设计一个内部消息总线, 以减少逻辑耦合
// Package eventbus provides a small in-process publish/subscribe bus used to
// decouple the components of the message queue service.
package eventbus

import "sync"

// EventHandleFunc handles event e carrying an arbitrary payload args.
type EventHandleFunc func(e string, args interface{})

// IEventBus is an in-process event bus: Sub registers a handler for an event
// name and Pub dispatches a payload to every handler registered for it.
type IEventBus interface {
	Pub(e string, args interface{})
	Sub(e string, handler EventHandleFunc)
}

// tEventBus is the default IEventBus: a list of handlers per event name,
// guarded by a read/write mutex so Pub and Sub may be called concurrently.
type tEventBus struct {
	rwmutex *sync.RWMutex
	items   map[string][]EventHandleFunc
}

// newEventBus returns an empty, ready-to-use bus.
func newEventBus() IEventBus {
	it := new(tEventBus)
	it.init()
	return it
}

func (me *tEventBus) init() {
	me.rwmutex = new(sync.RWMutex)
	me.items = make(map[string][]EventHandleFunc)
}

// Pub dispatches args to every handler subscribed to e. Each handler runs in
// its own goroutine, so Pub does not wait for the handlers and they may run
// in any order. Publishing an event nobody subscribed to is a no-op.
func (me *tEventBus) Pub(e string, args interface{}) {
	// Snapshot the handler slice under the read lock. Sub only ever appends,
	// which never rewrites elements inside this snapshot's length, so it is
	// safe to range over after unlocking.
	me.rwmutex.RLock()
	handlers := me.items[e]
	me.rwmutex.RUnlock()

	for _, h := range handlers {
		go h(e, args)
	}
}

// Sub registers handler for event e. A nil handler is ignored, because Pub
// would otherwise start a goroutine that panics calling it.
func (me *tEventBus) Sub(e string, handler EventHandleFunc) {
	if handler == nil {
		return
	}
	me.rwmutex.Lock()
	defer me.rwmutex.Unlock()
	// Appending to the nil slice of a missing key allocates a new one, so no
	// existence check is needed.
	me.items[e] = append(me.items[e], handler)
}

// GlobalEventBus is the process-wide event bus shared by all packages.
var GlobalEventBus = newEventBus()
tDeliveryService.go
投递服务, 管理若干个投递worker
package deliveryimport ("github.com/jmoiron/sqlx""learning/gooop/saga/mqs/database""learning/gooop/saga/mqs/eventbus""learning/gooop/saga/mqs/logger""learning/gooop/saga/mqs/models""sync""time")type tDeliveryService struct {rwmutex *sync.RWMutexworkers map[string]*tDeliveryWorker}func newDeliveryService() *tDeliveryService {it := new(tDeliveryService)it.init()return it}func (me *tDeliveryService) init() {}func (me *tDeliveryService) handleBootEvent(e string, args interface{}) {go me.beginCreatingWorkers()go me.beginCleanExpiredWorkers()}func (me *tDeliveryService) beginCreatingWorkers() {for {e := database.DB(func(db *sqlx.DB) error {now := time.Now().UnixNano()rows, err := db.Queryx("select client_id, notify_url, expire_time from subscriber where expire_time>?", now)if err != nil {return err}for rows.Next() {it := new(tWorkerInfo)err = rows.StructScan(it)if err != nil {return err}me.createWorker(it)}return nil})if e != nil {logger.Logf("tDeliveryService.beginCreatingWorkers, error = %s", e.Error())}time.Sleep(time.Duration(5) * time.Second)}}func (me *tDeliveryService) beginCleanExpiredWorkers() {for range time.Tick(time.Duration(30) * time.Second) {me.clean()}}func (me *tDeliveryService) clean() {me.rwmutex.RLock()var keys []stringfor k,v := range me.workers {if v.isExpired() {keys = append(keys, k)}}me.rwmutex.RUnlock()if len(keys) == 0 {return}me.rwmutex.Lock()defer me.rwmutex.Unlock()for _,k := range keys {delete(me.workers, k)}}func (me *tDeliveryService) handleNewSubscriber(args interface{}) {it, ok := args.(*models.SubMsg)if !ok {return}me.createWorker(&tWorkerInfo{it.ClientID, it.NotifyUrl, it.ExpireTime,})}func maxInt64(a, b int64) int64 {if a >= b {return a}return b}func (me *tDeliveryService) createWorker(info *tWorkerInfo) {me.rwmutex.RLock()w,ok := me.workers[info.ClientID]me.rwmutex.RUnlock()if ok {w.info.ExpireTime = maxInt64(w.info.ExpireTime, info.ExpireTime)return}me.rwmutex.Lock()defer me.rwmutex.Unlock()me.workers[info.ClientID] = 
newDeliveryWorker(info)}var gDeliveryService = newDeliveryService()func init() {eventbus.GlobalEventBus.Sub("boot", gDeliveryService.handleBootEvent)eventbus.GlobalEventBus.Sub("subscriber.new", gDeliveryService.handleBootEvent)}
tWorkerInfo.go
投递worker的初始化参数
package delivery

// tWorkerInfo holds the parameters a delivery worker is created with:
//   - the subscriber's client ID;
//   - the URL that messages are POSTed to;
//   - the subscription's expiry, as Unix time in nanoseconds (it is compared
//     against time.Now().UnixNano()).
//
// The db tags map the subscriber table's client_id / notify_url /
// expire_time columns onto the fields for sqlx StructScan. sqlx's default
// mapper would otherwise look for "clientid", "notifyurl" and "expiretime".
// The tags are harmless if a custom mapper is configured elsewhere.
type tWorkerInfo struct {
	ClientID   string `db:"client_id"`
	NotifyURL  string `db:"notify_url"`
	ExpireTime int64  `db:"expire_time"`
}
tDeliveryWorker.go
投递worker, 未完成
package deliveryimport ("bytes""encoding/json""errors""github.com/jmoiron/sqlx""io/ioutil""learning/gooop/saga/mqs/database""learning/gooop/saga/mqs/logger""learning/gooop/saga/mqs/models""net/http""time")type tDeliveryWorker struct {info *tWorkerInfo}func newDeliveryWorker(info *tWorkerInfo) *tDeliveryWorker {it := new(tDeliveryWorker)it.info = infogo it.beginMainLoop()return it}func (me *tDeliveryWorker) beginMainLoop() {for !me.isExpired() {ok, msg := me.peek()if ok {switch msg.StatusFlag {case 0:// 未处理的消息me.handleNewMsg(msg)breakcase 1:// 处理中的消息me.handleDeliveringMsg(msg)break}} else {time.Sleep(time.Duration(1) * time.Second)}}}func (me *tDeliveryWorker) isExpired() bool {return time.Now().UnixNano() >= me.info.ExpireTime}var gEmptyRowsErr = errors.New("empty rows")func (me *tDeliveryWorker) peek() (bool, *models.QueuedMsg) {msg := &models.QueuedMsg{}e := database.DB(func(db *sqlx.DB) error {rows, err := db.Queryx("select * from delivery_queue where client_id=? order by create_time asc limit 1", me.info.ClientID)if err != nil {return err}if rows.Next() {err = rows.StructScan(msg)if err != nil {return err}return nil} else {return gEmptyRowsErr}})if e != nil {return false, nil} else {return true, msg}}func (me *tDeliveryWorker) handleNewMsg(msg *models.QueuedMsg) bool {err := database.DB(func(db *sqlx.DB) error {r,e := db.Exec("update delivery_queue set status_flag=1 where id=?", msg.ID)if e != nil {return e}rows, e := r.RowsAffected()if e != nil {return e}if rows != 1 {return gEmptyRowsErr}return nil})if err != nil {logger.Logf("tDeliveryWorker.handleNewMsg, id=%v, msg=%s/%s, err=%s", me.info.ClientID, msg.GlobalID, msg.SubID, err.Error())return false}if me.deliver(msg) {// todo: move msg to success queue} else {// todo: msg.failed_count++}return false}func (me *tDeliveryWorker) deliver(msg *models.QueuedMsg) bool {t := &models.TxMsg{GlobalID: msg.GlobalID,SubID: msg.SubID,Topic: msg.Topic,CreateTime: msg.CreateTime,Content: msg.Content,}j,e := 
json.Marshal(t)if e != nil {return false}r, e := http.Post(me.info.NotifyURL, "application/json", bytes.NewReader(j))if e != nil {return false}defer r.Body.Close()rep, e := ioutil.ReadAll(r.Body)if e != nil {return false}m := &models.OkMsg{}e = json.Unmarshal(rep, m)if e != nil {return false}return m.OK}func (me *tDeliveryWorker) handleDeliveringMsg(msg *models.QueuedMsg) bool {// todo: check if delivery timeoutreturn false}
(未完待续)
