缘起
最近阅读<
本系列笔记拟采用golang练习之
Saga模式
- saga模式将分布式长事务切分为一系列独立短事务
- 每个短事务是可通过补偿动作进行撤销的
- 事务动作和补偿动作都是幂等的, 允许重复执行而不会有副作用 ``` Saga由一系列的子事务“Ti”组成, 每个Ti都有对应的补偿“Ci”, 当Ti出现问题时Ci用于处理Ti执行带来的问题。
可以通过下面的两个公式理解Saga模式。 全部子事务成功时: $T = T_1 T_2 \dots T_n$; 第 $j$ 个子事务之后出现问题时, 按逆序执行补偿: $T = T_1 T_2 \dots T_j C_j \dots C_2 C_1$
Saga模式的核心理念是避免使用长期持有锁(如14.2.2节介绍的两阶段提交)的长事务, 而应该将事务切分为一组按序依次提交的短事务, Saga模式满足ACD(原子性、一致性、持久性)特征。
摘自 <
<a name="fGPkq"></a>
# 目标
- 为实现saga模式的分布式事务, 先撸一个pub/sub事务消息队列服务
- 事务消息队列服务的功能性要求
    - 消息不会丢失: 消息的持久化
    - 消息的唯一性: 要求每个消息有全局ID和子事务ID
    - 确保投递成功: 投递队列持久化, 投递状态持久化, 失败重试

<a name="naUyZ"></a>
# 子目标(Day 5)
- 重构和完善消息投递机制
    - iMsgHeap: 使用待投递消息堆缓存消息. 总是优先投递创建时间最小的消息
    - iMsgSource: 定义消息来源接口. 有两种消息来源, 1-数据库; 2-eventbus
    - iMsgHistoryRing: 使用ring buffer记录近期已投递成功的消息, 防止重复投递
    - tConcurrentMsgHeap: 最小CreateTime优先的消息堆, 实现iMsgHeap接口, 并且是线程安全的.
    - tDBMsgSource: 从数据库拉取待投递消息, 实现iMsgSource接口
    - tLiveMsgSource: 监听eventbus即时推送的投递消息, 实现iMsgSource接口
    - tMsgHistoryRing: 历史消息的固定大小环形队列, 实现iMsgHistoryRing接口, 缓存近期已投递成功的消息
    - tDeliveryWorker:
        - 初始化时, 优先从数据库加载待投递消息
        - 使用iMsgHeap缓存待投递消息, 并确保有序
        - 使用iMsgSource接口, 分别从db和eventbus接收投递消息
        - 使用iMsgHistoryRing, 缓存已投递成功的消息, 防止重复投递

<a name="SoPHV"></a>
# iMsgHeap.go
使用待投递消息堆缓存消息. 总是优先投递创建时间最小的消息
```go
package delivery

import "learning/gooop/saga/mqs/models"

type iMsgHeap interface {
	Size() int
	IsEmpty() bool
	IsNotEmpty() bool

	Push(msg *models.QueuedMsg)
	Peek() *models.QueuedMsg
	Pop() *models.QueuedMsg
}
```
iMsgSource.go
定义消息来源接口. 有两种消息来源, 1-数据库;2-eventbus
package delivery

import "learning/gooop/saga/mqs/models"

// iMsgSource is a producer of queued messages awaiting delivery.
// Implementations in this package pull from the database (tDBMsgSource)
// or listen on the event bus (tLiveMsgSource).
type iMsgSource interface {
	// MsgChan returns the receive-only channel on which the source
	// publishes messages.
	MsgChan() <-chan *models.QueuedMsg
}

// tSourceExpireFunc is a callback the sources poll to decide when to stop;
// once it returns true they shut down and close their channel.
type tSourceExpireFunc func() bool
iMsgHistoryRing.go
使用ring buffer记录近期已投递成功的消息, 防止重复投递
package delivery

import "learning/gooop/saga/mqs/models"

// iMsgHistoryRing records recently delivered messages so that a duplicate
// delivery can be detected by message id.
type iMsgHistoryRing interface {
	// Push records msg in the history.
	Push(msg *models.QueuedMsg)
	// Has reports whether a message with the given id is in the history.
	Has(id int) bool
}
tConcurrentMsgHeap.go
最小CreateTime优先的消息堆, 实现iMsgHeap接口, 并且是线程安全的.
package deliveryimport ("learning/gooop/saga/mqs/models""sync")type tConcurrentMsgHeap struct {items []*models.QueuedMsgsize intmutex *sync.Mutex}func newMsgHeap() iMsgHeap {it := new(tConcurrentMsgHeap)it.init()return it}func (me *tConcurrentMsgHeap) init() {me.items = make([]*models.QueuedMsg, 0)me.size = 0me.mutex = new(sync.Mutex)}func (me *tConcurrentMsgHeap) Size() int {return me.size}func (me *tConcurrentMsgHeap) IsEmpty() bool {return me.size <= 0}func (me *tConcurrentMsgHeap) IsNotEmpty() bool {return !me.IsEmpty()}func (me *tConcurrentMsgHeap) has(msgID int) bool {for _,it := range me.items {if it.MsgID == msgID {return true}}return false}func (me *tConcurrentMsgHeap) Push(msg *models.QueuedMsg) {me.mutex.Lock()defer me.mutex.Unlock()if me.has(msg.MsgID) {return}me.ensureSize(me.size + 1)me.items[me.size] = msgme.size++me.shiftUp(me.size - 1)}func (me *tConcurrentMsgHeap) ensureSize(size int) {for ;len(me.items) < size; {me.items = append(me.items, nil)}}func (me *tConcurrentMsgHeap) parentOf(i int) int {return (i - 1) / 2}func (me *tConcurrentMsgHeap) leftChildOf(i int) int {return i*2 + 1}func (me *tConcurrentMsgHeap) rightChildOf(i int) int {return me.leftChildOf(i) + 1}func (me *tConcurrentMsgHeap) last() (i int, v *models.QueuedMsg) {if me.IsEmpty() {return -1, nil}i = me.size - 1v = me.items[i]return i,v}func (me *tConcurrentMsgHeap) shiftUp(i int) {if i <= 0 {return}v := me.items[i]pi := me.parentOf(i)pv := me.items[pi]if me.less(v, pv) {me.items[pi], me.items[i] = v, pvme.shiftUp(pi)}}func (me *tConcurrentMsgHeap) less(a, b *models.QueuedMsg) bool {return a.CreateTime < b.CreateTime}func (me *tConcurrentMsgHeap) Pop() *models.QueuedMsg {me.mutex.Lock()defer me.mutex.Unlock()if me.IsEmpty() {return nil}top := me.items[0]li, lv := me.last()me.items[0] = nilme.size--if me.IsEmpty() {return top}me.items[0] = lvme.items[li] = nilme.shiftDown(0)return top}func (me *tConcurrentMsgHeap) Peek() *models.QueuedMsg {me.mutex.Lock()defer me.mutex.Unlock()if 
me.IsEmpty() {return nil}return me.items[0]}func (me *tConcurrentMsgHeap) shiftDown(i int) {pv := me.items[i]ok, ci, cv := me.minChildOf(i)if ok && me.less(cv, pv) {me.items[i], me.items[ci] = cv, pvme.shiftDown(ci)}}func (me *tConcurrentMsgHeap) minChildOf(p int) (ok bool, i int, v *models.QueuedMsg) {li := me.leftChildOf(p)if li >= me.size {return false, 0, nil}lv := me.items[li]ri := me.rightChildOf(p)if ri >= me.size {return true, li, lv}rv := me.items[ri]if me.less(lv, rv) {return true, li, lv} else {return true, ri, rv}}
tDBMsgSource.go
从数据库拉取待投递消息, 实现iMsgSource接口
package deliveryimport ("github.com/jmoiron/sqlx""learning/gooop/saga/mqs/database""learning/gooop/saga/mqs/models""time")type tDBMsgSource struct {clientID stringexpireFunc tSourceExpireFuncmsgChan chan *models.QueuedMsg}func newDBMsgSource(clientID string, expireFunc tSourceExpireFunc) iMsgSource {it := new(tDBMsgSource)it.init(clientID, expireFunc)return it}func (me *tDBMsgSource) init(clientID string, expireFunc tSourceExpireFunc) {me.clientID = clientIDme.expireFunc = expireFuncme.msgChan = make(chan *models.QueuedMsg, 1)go me.beginPollDB()}func (me *tDBMsgSource) MsgChan() <- chan *models.QueuedMsg {return me.msgChan}func (me *tDBMsgSource) beginPollDB() {interval := time.Duration(1) * time.Secondfor !me.expireFunc() {if len(me.msgChan) <= 0 {ok, msg := me.poll()if ok {me.msgChan <- msgcontinue}}// poll failed, or chan fulltime.Sleep(interval)}close(me.msgChan)}func (me *tDBMsgSource) poll() (bool, *models.QueuedMsg) {msg := &models.QueuedMsg{}e := database.DB(func(db *sqlx.DB) error {rows, err := db.Queryx("select * from delivery_queue where client_id=? order by create_time asc limit 1",me.clientID,)if err != nil {return err}if rows.Next() {err = rows.StructScan(msg)if err != nil {return err}return nil} else {return gEmptyRowsErr}})if e != nil {return false, nil} else {return true, msg}}
tLiveMsgSource.go
监听eventbus即时推送的投递消息, 实现iMsgSource接口
package deliveryimport ("fmt""learning/gooop/saga/mqs/eventbus""learning/gooop/saga/mqs/logger""learning/gooop/saga/mqs/models""learning/gooop/saga/mqs/models/events""time")type tLiveMsgSource struct {clientID stringexpireFunc tSourceExpireFuncmsgChan chan *models.QueuedMsg}func newLiveMsgSource(clientID string, expireFunc tSourceExpireFunc) iMsgSource {it := new(tLiveMsgSource)it.init(clientID, expireFunc)return it}func (me *tLiveMsgSource) init(clientID string, expireFunc tSourceExpireFunc) {me.clientID = clientIDme.expireFunc = expireFuncme.msgChan = make(chan *models.QueuedMsg, 1)eventbus.GlobalEventBus.Sub(events.MsgPublishedEvent,me.id(),me.handleMsgPublished)go me.beginWatchExpire()}func (me *tLiveMsgSource) id() string {return fmt.Sprintf("tLiveMsgSource.%s", me.clientID)}func (me *tLiveMsgSource) beginWatchExpire() {for range time.Tick(1 * time.Second) {if me.expireFunc() {me.afterExpired()return}}}func (me *tLiveMsgSource) afterExpired() {eventbus.GlobalEventBus.Unsub(events.MsgPublishedEvent, me.id())close(me.msgChan)}func (me *tLiveMsgSource) handleMsgPublished(_ string, args interface{}) {msg, ok := args.(*models.QueuedMsg)if !ok {return}if msg.ClientID != me.clientID {return}if len(me.msgChan) >= 0 {return}logger.Logf("tLiveMsgSource.handleMsgPublished, clientID=%s, msg=%s/%s/%s",me.clientID, msg.GlobalID, msg.SubID, msg.Topic )me.msgChan <- msg}func (me *tLiveMsgSource) MsgChan() <- chan *models.QueuedMsg {return me.msgChan}
tMsgHistoryRing.go
历史消息的固定大小环形队列, 实现iMsgHistoryRing接口, 缓存近期已投递成功的消息
package deliveryimport "learning/gooop/saga/mqs/models"type tMsgHistoryRing struct {items []*models.QueuedMsgcapacity intindex int}func newMsgHistoryRing(capacity int) iMsgHistoryRing {it := new(tMsgHistoryRing)it.init(capacity)return it}func (me *tMsgHistoryRing) init(capacity int) {me.items = make([]*models.QueuedMsg, capacity)me.capacity = capacityme.index = 0}func (me *tMsgHistoryRing) Has(id int) bool {for _,it := range me.items {if it != nil && it.ID == id {return true}}return false}func (me *tMsgHistoryRing) Push(msg *models.QueuedMsg) {me.items[me.index] = msgme.index++if me.index >= me.capacity {me.index = 0}}
tDeliveryWorker.go
- 初始化时, 优先从数据库加载待投递消息
- 使用iMsgHeap缓存待投递消息, 并确保有序
- 使用iMsgSource接口, 分别从db和eventbus接收投递消息
- 使用iMsgHistoryRing, 缓存已投递成功的消息, 防止重复投递 ```go package delivery
import (
	"bytes"
	"encoding/json"
	"errors"
	"io/ioutil"
	"net/http"
	"time"

	"github.com/jmoiron/sqlx"

	"learning/gooop/saga/mqs/database"
	"learning/gooop/saga/mqs/logger"
	"learning/gooop/saga/mqs/models"
)
// tDeliveryWorker delivers the queued messages of one client. It collects
// messages from a database source and a live (event bus) source into a heap
// and posts them to the client's notify URL.
type tDeliveryWorker struct {
	// info carries the client id, notify URL and expire time of this worker.
	info *tWorkerInfo
	// successRing remembers recently delivered messages to avoid re-posting.
	successRing iMsgHistoryRing
	// dbSource yields messages polled from the database.
	dbSource iMsgSource
	// liveSource yields messages pushed through the event bus.
	liveSource iMsgSource
	// msgHeap caches pending messages until they are handled.
	msgHeap iMsgHeap
}
func newDeliveryWorker(info tWorkerInfo) tDeliveryWorker { it := new(tDeliveryWorker) it.init(info) return it }
// init: do initialization, and start initial load func (me tDeliveryWorker) init(info tWorkerInfo) { me.info = info me.successRing = newMsgHistoryRing(64)
me.dbSource = newDBMsgSource(info.ClientID, me.isExpired)me.liveSource = newLiveMsgSource(info.ClientID, me.isExpired)me.msgHeap = newMsgHeap()go me.beginInitialLoadFromDB()
}
// beginInitialLoadFromDB: initially, load queued msg from database func (me tDeliveryWorker) beginInitialLoadFromDB() { buf := [][]models.QueuedMsg{ nil } for !me.isExpired() { err := database.DB(func(db *sqlx.DB) error { e, rows := me.loadFromDB(db) if e != nil { return e }
buf[0] = rowsreturn nil})if err != nil {logger.Logf("tDeliveryWorker.initialLoadFromDB, clientID=%s, err=%s", me.info.ClientID, err.Error())time.Sleep(3 * time.Second)} else {me.afterInitialLoad(buf[0])}}
}
// loadFromDB: load queued msg from database func (me tDeliveryWorker) loadFromDB(db sqlx.DB) (error, []models.QueuedMsg) { rows, err := db.Queryx( “select from delivery_queue where client_id=? order by create_time asc limit ?”, me.info.ClientID, gInitialLoadRows, ) if err != nil { return err, nil }
msgList := []*models.QueuedMsg{}for rows.Next() {msg := &models.QueuedMsg{}err = rows.StructScan(msg)if err != nil {return err, nil}msgList = append(msgList, msg)}return nil, msgList
}
// afterInitialLoad: after initial load done, push msgs into heap, and start delivery loop func (me tDeliveryWorker) afterInitialLoad(msgList []models.QueuedMsg) { logger.Logf(“tDeliveryWorker.afterInitialLoad, clientID=%s, rows=%d”, me.info.ClientID, len(msgList)) for _,it := range msgList { me.msgHeap.Push(it) }
go me.beginPollAndDeliver()
}
// beginPollAndDeliver: poll msg from heap, and then deliver it func (me *tDeliveryWorker) beginPollAndDeliver() { for !me.isExpired() { select { case msg := <- me.dbSource.MsgChan(): me.msgHeap.Push(msg) break
case msg := <- me.liveSource.MsgChan():me.msgHeap.Push(msg)break}if me.msgHeap.IsEmpty() {continue}msg := me.msgHeap.Pop()if msg == nil {continue}switch msg.StatusFlag {case 0:// 未处理的消息me.handleUndeliveredMsg(msg)breakcase 1:// 处理中的消息me.handleDeliveringMsg(msg)break}}
}
// isExpired: is me expired? func (me *tDeliveryWorker) isExpired() bool { return time.Now().UnixNano() >= me.info.ExpireTime }
// handleUndeliveredMsg: if msg unhandled, then try to deliver it func (me tDeliveryWorker) handleUndeliveredMsg(msg models.QueuedMsg) { err := database.DB(func(db *sqlx.DB) error { now := time.Now().UnixNano() r,e := db.Exec( “update delivery_queue set status_flag=1, update_time=? where id=? and status_flag=0 and update_time=?”, now, msg.ID, msg.UpdateTime, ) if e != nil { return e }
rows, e := r.RowsAffected()if e != nil {return e}if rows != 1 {return gOneRowsErr}msg.StatusFlag = 1msg.UpdateTime = nowreturn nil})if err != nil {logger.Logf("tDeliveryWorker.handleNewMsg, id=%v, msg=%s/%s, err=%s", me.info.ClientID, msg.GlobalID, msg.SubID, err.Error())return}if me.deliver(msg) {me.afterDeliverySuccess(msg)} else {me.afterDeliveryFailed(msg)}
}
// deliver: use http.Post function to delivery msg func (me tDeliveryWorker) deliver(msg models.QueuedMsg) bool { if me.successRing.Has(msg.ID) { return true }
t := &models.TxMsg{GlobalID: msg.GlobalID,SubID: msg.SubID,Topic: msg.Topic,CreateTime: msg.CreateTime,Content: msg.Content,}j,e := json.Marshal(t)if e != nil {logger.Logf("tDeliveryWorker.deliver, failed json.Marshal, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)return false}r, e := http.Post(me.info.NotifyURL, "application/json", bytes.NewReader(j))if e != nil {logger.Logf("tDeliveryWorker.deliver, failed http.Post, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)return false}defer r.Body.Close()rep, e := ioutil.ReadAll(r.Body)if e != nil {logger.Logf("tDeliveryWorker.deliver, failed ioutil.ReadAll, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)return false}m := &models.OkMsg{}e = json.Unmarshal(rep, m)if e != nil {logger.Logf("tDeliveryWorker.deliver, failed json.Unmarshal, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)return false}if m.OK {return true} else {logger.Logf("tDeliveryWorker.deliver, failed OkMsg.OK, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)return false}
}
// handleDeliveringMsg: if delivery timeout, then retry delivery func (me tDeliveryWorker) handleDeliveringMsg(msg models.QueuedMsg) { now := time.Now().UnixNano() if msg.UpdateTime + gDeliveryTimeoutNanos > now { return }
// delivery timeoutme.afterDeliveryTimeout(msg)
}
// afterDeliverySuccess: if done, move msg to success queue func (me tDeliveryWorker) afterDeliverySuccess(msg models.QueuedMsg) { if me.successRing.Has(msg.ID) { return } me.successRing.Push(msg)
err := database.TX(func(db *sqlx.DB, tx *sqlx.Tx) error {r,e := db.Exec("delete from delivery_queue where id=? and update_time=? and status_flag=1",msg.ID,msg.UpdateTime,)if e != nil {return e}rows, e := r.RowsAffected()if e != nil {return e}if rows != 1 {return gOneRowsErr}r, e = db.Exec("insert into success_queue (msg_id, client_id, create_time) values(?, ?, ?)",msg.ID,msg.ClientID,time.Now().UnixNano(),)if e != nil {return e}rows, e = r.RowsAffected()if e != nil {return e}if rows != 1 {return gOneRowsErr}return nil})if err != nil {logger.Logf("tDeliveryWorker.afterDeliverySuccess, failed, id=%v, msg=%s/%s, err=%s", me.info.ClientID, msg.GlobalID, msg.SubID, err.Error())} else {logger.Logf("tDeliveryWorker.afterDeliverySuccess, done, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)}
}
// afterDeliveryFailed: if failed, do nothing but just log it func (me tDeliveryWorker) afterDeliveryFailed(msg models.QueuedMsg) { logger.Logf(“tDeliveryWorker.afterDeliveryFailed, id=%v, msg=%s/%s”, me.info.ClientID, msg.GlobalID, msg.SubID) }
// afterDeliveryTimeout: if timeout, then reset status and retry func (me tDeliveryWorker) afterDeliveryTimeout(msg models.QueuedMsg) { err := database.DB(func(db *sqlx.DB) error { r,e := db.Exec( “update delivery_queue set status_flag=0 where id=? and status_flag=1 and update_time=?”, msg.ID, msg.UpdateTime, ) if e != nil { return e }
rows,e := r.RowsAffected()if e != nil {return e}if rows != 1 {return gOneRowsErr}return nil})if err != nil {logger.Logf("tDeliveryWorker.afterDeliveryTimeout, failed, id=%v, msg=%s/%s, err=%s", me.info.ClientID, msg.GlobalID, msg.SubID, err.Error())} else {logger.Logf("tDeliveryWorker.afterDeliveryTimeout, done, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)}
}
var gEmptyRowsErr = errors.New(“empty rows”) var gOneRowsErr = errors.New(“expecting one row affected”) var gDeliveryTimeoutNanos = int64(10 * (time.Second / time.Nanosecond)) var gInitialLoadRows = 100 ```
(未完待续)
