缘起

最近阅读《Go微服务实战》(刘金亮, 2021.1)
本系列笔记拟采用golang练习之

Saga模式

  • saga模式将分布式长事务切分为一系列独立短事务
  • 每个短事务是可通过补偿动作进行撤销的
  • 事务动作和补偿动作都是幂等的, 允许重复执行而不会有副作用

Saga由一系列的子事务“Ti”组成, 每个Ti都有对应的补偿“Ci”, 当Ti出现问题时Ci用于处理Ti执行带来的问题。

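可以通过下面的两个公式理解Saga模式:

$$T = T_1 T_2 \cdots T_n$$

$$T = T_1 T_2 \cdots T_j C_j \cdots C_2 C_1 \quad (0 < j < n)$$

第一个公式表示所有子事务依次成功提交; 第二个公式表示子事务 $T_{j+1}$ 失败时, 按相反顺序执行已提交子事务的补偿动作。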

Saga模式的核心理念是避免使用长期持有锁(如14.2.2节介绍的两阶段提交)的长事务, 而应该将事务切分为一组按序依次提交的短事务, Saga模式满足ACD(原子性、一致性、持久性)特征。

摘自《Go微服务实战》刘金亮, 2021.1

<a name="fGPkq"></a>
# 目标
- 为实现saga模式的分布式事务, 先撸一个pub/sub事务消息队列服务
- 事务消息队列服务的功能性要求
  - 消息不会丢失: 消息的持久化
  - 消息的唯一性: 要求每个消息有全局ID和子事务ID
  - 确保投递成功: 投递队列持久化, 投递状态持久化, 失败重试

<a name="naUyZ"></a>
# 子目标(Day 3)
- 有序, 可靠地投递消息
  - 有序: 每个订阅者绑定独立的投递worker, 按时间戳投递
  - 可靠: 投递状态的持久化

<a name="XyCWB"></a>
# 设计
- QueuedMsg: 把投递队列的消息设计得胖一点, 以减少join的使用
- IEventBus: 设计一个内部消息总线, 以减少逻辑耦合
- tDeliveryService: 投递服务, 管理若干个投递worker
- tWorkerInfo: 投递worker的初始化参数
- tDeliveryWorker: 投递worker(未完成)

<a name="HbILr"></a>
# QueuedMsg.go
把待投递消息设计得胖一点, 以减少join的使用
  24. package models
  // QueuedMsg is one row of the delivery_queue table: a message waiting to be
  // delivered to one subscriber. It deliberately duplicates subscriber and
  // message fields (NotifyURL, Topic, Content, ...) so the delivery worker can
  // post it without joining other tables.
  //
  // The db tags are needed by sqlx's StructScan. Its default name mapper only
  // lower-cases field names (ClientID -> "clientid"), which does not match the
  // snake_case columns (client_id, create_time, status_flag) that the delivery
  // queries use. Explicit tags are honoured by sqlx regardless of the mapper.
  // NOTE(review): columns not named in any visible query (msg_id, global_id,
  // sub_id, sender_id, topic, content, notify_url) are assumed to follow the
  // same snake_case convention — confirm against the delivery_queue schema.
  type QueuedMsg struct {
  	ID          int    `db:"id"`
  	ClientID    string `db:"client_id"`
  	NotifyURL   string `db:"notify_url"`
  	MsgID       int    `db:"msg_id"`
  	GlobalID    string `db:"global_id"`
  	SubID       string `db:"sub_id"`
  	SenderID    string `db:"sender_id"`
  	CreateTime  int64  `db:"create_time"`
  	Topic       string `db:"topic"`
  	Content     string `db:"content"`
  	StatusFlag  int    `db:"status_flag"` // 0 = not yet handled, 1 = delivery in progress
  	FailedCount int    `db:"failed_count"`
  }

IEventBus.go

设计一个内部消息总线, 以减少逻辑耦合

  1. package eventbus
  2. import "sync"
  // EventHandleFunc is the callback signature for bus subscribers. It receives
  // the event name and the payload that was published with it.
  type EventHandleFunc func(e string, args interface{})

  // IEventBus is an in-process publish/subscribe bus used to decouple modules.
  type IEventBus interface {
  	// Pub invokes every handler subscribed to e, each on its own goroutine.
  	Pub(e string, args interface{})
  	// Sub registers handler to be called whenever e is published.
  	Sub(e string, handler EventHandleFunc)
  }

  // tEventBus is the default IEventBus: a table from event name to handlers,
  // guarded by a read/write lock so concurrent Pub calls do not block each other.
  type tEventBus struct {
  	rwmutex *sync.RWMutex
  	items   map[string][]EventHandleFunc
  }

  // newEventBus returns an empty, ready-to-use bus.
  func newEventBus() IEventBus {
  	bus := &tEventBus{}
  	bus.init()
  	return bus
  }

  // init allocates the handler table and its lock.
  func (bus *tEventBus) init() {
  	bus.items = map[string][]EventHandleFunc{}
  	bus.rwmutex = &sync.RWMutex{}
  }

  // Pub starts every handler registered for e on a new goroutine and returns
  // without waiting for them. Events with no subscribers are dropped.
  func (bus *tEventBus) Pub(e string, args interface{}) {
  	bus.rwmutex.RLock()
  	defer bus.rwmutex.RUnlock()
  	for _, handle := range bus.items[e] {
  		go handle(e, args)
  	}
  }

  // Sub appends handler to the list for e. Handlers are stored in subscription
  // order, but Pub runs them concurrently.
  func (bus *tEventBus) Sub(e string, handler EventHandleFunc) {
  	bus.rwmutex.Lock()
  	defer bus.rwmutex.Unlock()
  	bus.items[e] = append(bus.items[e], handler)
  }

  // GlobalEventBus is the process-wide bus shared by all modules.
  var GlobalEventBus = newEventBus()

tDeliveryService.go

投递服务, 管理若干个投递worker

  1. package delivery
  2. import (
  3. "github.com/jmoiron/sqlx"
  4. "learning/gooop/saga/mqs/database"
  5. "learning/gooop/saga/mqs/eventbus"
  6. "learning/gooop/saga/mqs/logger"
  7. "learning/gooop/saga/mqs/models"
  8. "sync"
  9. "time"
  10. )
  11. type tDeliveryService struct {
  12. rwmutex *sync.RWMutex
  13. workers map[string]*tDeliveryWorker
  14. }
  15. func newDeliveryService() *tDeliveryService {
  16. it := new(tDeliveryService)
  17. it.init()
  18. return it
  19. }
  20. func (me *tDeliveryService) init() {
  21. }
  22. func (me *tDeliveryService) handleBootEvent(e string, args interface{}) {
  23. go me.beginCreatingWorkers()
  24. go me.beginCleanExpiredWorkers()
  25. }
  26. func (me *tDeliveryService) beginCreatingWorkers() {
  27. for {
  28. e := database.DB(func(db *sqlx.DB) error {
  29. now := time.Now().UnixNano()
  30. rows, err := db.Queryx("select client_id, notify_url, expire_time from subscriber where expire_time>?", now)
  31. if err != nil {
  32. return err
  33. }
  34. for rows.Next() {
  35. it := new(tWorkerInfo)
  36. err = rows.StructScan(it)
  37. if err != nil {
  38. return err
  39. }
  40. me.createWorker(it)
  41. }
  42. return nil
  43. })
  44. if e != nil {
  45. logger.Logf("tDeliveryService.beginCreatingWorkers, error = %s", e.Error())
  46. }
  47. time.Sleep(time.Duration(5) * time.Second)
  48. }
  49. }
  50. func (me *tDeliveryService) beginCleanExpiredWorkers() {
  51. for range time.Tick(time.Duration(30) * time.Second) {
  52. me.clean()
  53. }
  54. }
  55. func (me *tDeliveryService) clean() {
  56. me.rwmutex.RLock()
  57. var keys []string
  58. for k,v := range me.workers {
  59. if v.isExpired() {
  60. keys = append(keys, k)
  61. }
  62. }
  63. me.rwmutex.RUnlock()
  64. if len(keys) == 0 {
  65. return
  66. }
  67. me.rwmutex.Lock()
  68. defer me.rwmutex.Unlock()
  69. for _,k := range keys {
  70. delete(me.workers, k)
  71. }
  72. }
  73. func (me *tDeliveryService) handleNewSubscriber(args interface{}) {
  74. it, ok := args.(*models.SubMsg)
  75. if !ok {
  76. return
  77. }
  78. me.createWorker(&tWorkerInfo{
  79. it.ClientID, it.NotifyUrl, it.ExpireTime,
  80. })
  81. }
  // maxInt64 returns the larger of a and b. It is used so that subscription
  // renewals never shorten a worker's expiry.
  func maxInt64(a, b int64) int64 {
  	if b > a {
  		return b
  	}
  	return a
  }
  88. func (me *tDeliveryService) createWorker(info *tWorkerInfo) {
  89. me.rwmutex.RLock()
  90. w,ok := me.workers[info.ClientID]
  91. me.rwmutex.RUnlock()
  92. if ok {
  93. w.info.ExpireTime = maxInt64(w.info.ExpireTime, info.ExpireTime)
  94. return
  95. }
  96. me.rwmutex.Lock()
  97. defer me.rwmutex.Unlock()
  98. me.workers[info.ClientID] = newDeliveryWorker(info)
  99. }
  100. var gDeliveryService = newDeliveryService()
  101. func init() {
  102. eventbus.GlobalEventBus.Sub("boot", gDeliveryService.handleBootEvent)
  103. eventbus.GlobalEventBus.Sub("subscriber.new", gDeliveryService.handleBootEvent)
  104. }

tWorkerInfo.go

投递worker的初始化参数

  1. package delivery
  // tWorkerInfo holds the parameters a delivery worker is started with. It is
  // filled either by StructScan from the subscriber table or from a
  // subscriber.new event.
  //
  // The db tags map the fields to the client_id / notify_url / expire_time
  // columns selected in beginCreatingWorkers. sqlx's default mapper only
  // lower-cases field names ("clientid"), which would not match them.
  type tWorkerInfo struct {
  	// ClientID identifies the subscriber; the service keeps one worker per ClientID.
  	ClientID string `db:"client_id"`
  	// NotifyURL is the endpoint the worker POSTs messages to.
  	NotifyURL string `db:"notify_url"`
  	// ExpireTime is the subscription deadline in Unix nanoseconds; the worker
  	// stops once time.Now().UnixNano() reaches it.
  	ExpireTime int64 `db:"expire_time"`
  }

tDeliveryWorker.go

投递worker, 未完成

  1. package delivery
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "errors"
  6. "github.com/jmoiron/sqlx"
  7. "io/ioutil"
  8. "learning/gooop/saga/mqs/database"
  9. "learning/gooop/saga/mqs/logger"
  10. "learning/gooop/saga/mqs/models"
  11. "net/http"
  12. "time"
  13. )
  14. type tDeliveryWorker struct {
  15. info *tWorkerInfo
  16. }
  17. func newDeliveryWorker(info *tWorkerInfo) *tDeliveryWorker {
  18. it := new(tDeliveryWorker)
  19. it.info = info
  20. go it.beginMainLoop()
  21. return it
  22. }
  23. func (me *tDeliveryWorker) beginMainLoop() {
  24. for !me.isExpired() {
  25. ok, msg := me.peek()
  26. if ok {
  27. switch msg.StatusFlag {
  28. case 0:
  29. // 未处理的消息
  30. me.handleNewMsg(msg)
  31. break
  32. case 1:
  33. // 处理中的消息
  34. me.handleDeliveringMsg(msg)
  35. break
  36. }
  37. } else {
  38. time.Sleep(time.Duration(1) * time.Second)
  39. }
  40. }
  41. }
  42. func (me *tDeliveryWorker) isExpired() bool {
  43. return time.Now().UnixNano() >= me.info.ExpireTime
  44. }
  45. var gEmptyRowsErr = errors.New("empty rows")
  46. func (me *tDeliveryWorker) peek() (bool, *models.QueuedMsg) {
  47. msg := &models.QueuedMsg{}
  48. e := database.DB(func(db *sqlx.DB) error {
  49. rows, err := db.Queryx("select * from delivery_queue where client_id=? order by create_time asc limit 1", me.info.ClientID)
  50. if err != nil {
  51. return err
  52. }
  53. if rows.Next() {
  54. err = rows.StructScan(msg)
  55. if err != nil {
  56. return err
  57. }
  58. return nil
  59. } else {
  60. return gEmptyRowsErr
  61. }
  62. })
  63. if e != nil {
  64. return false, nil
  65. } else {
  66. return true, msg
  67. }
  68. }
  69. func (me *tDeliveryWorker) handleNewMsg(msg *models.QueuedMsg) bool {
  70. err := database.DB(func(db *sqlx.DB) error {
  71. r,e := db.Exec("update delivery_queue set status_flag=1 where id=?", msg.ID)
  72. if e != nil {
  73. return e
  74. }
  75. rows, e := r.RowsAffected()
  76. if e != nil {
  77. return e
  78. }
  79. if rows != 1 {
  80. return gEmptyRowsErr
  81. }
  82. return nil
  83. })
  84. if err != nil {
  85. logger.Logf("tDeliveryWorker.handleNewMsg, id=%v, msg=%s/%s, err=%s", me.info.ClientID, msg.GlobalID, msg.SubID, err.Error())
  86. return false
  87. }
  88. if me.deliver(msg) {
  89. // todo: move msg to success queue
  90. } else {
  91. // todo: msg.failed_count++
  92. }
  93. return false
  94. }
  95. func (me *tDeliveryWorker) deliver(msg *models.QueuedMsg) bool {
  96. t := &models.TxMsg{
  97. GlobalID: msg.GlobalID,
  98. SubID: msg.SubID,
  99. Topic: msg.Topic,
  100. CreateTime: msg.CreateTime,
  101. Content: msg.Content,
  102. }
  103. j,e := json.Marshal(t)
  104. if e != nil {
  105. return false
  106. }
  107. r, e := http.Post(me.info.NotifyURL, "application/json", bytes.NewReader(j))
  108. if e != nil {
  109. return false
  110. }
  111. defer r.Body.Close()
  112. rep, e := ioutil.ReadAll(r.Body)
  113. if e != nil {
  114. return false
  115. }
  116. m := &models.OkMsg{}
  117. e = json.Unmarshal(rep, m)
  118. if e != nil {
  119. return false
  120. }
  121. return m.OK
  122. }
  123. func (me *tDeliveryWorker) handleDeliveringMsg(msg *models.QueuedMsg) bool {
  124. // todo: check if delivery timeout
  125. return false
  126. }

(未完待续)