缘起

最近阅读<> (刘金亮, 2021.1)
本系列笔记拟采用golang练习之

Saga模式

  • saga模式将分布式长事务切分为一系列独立短事务
  • 每个短事务是可通过补偿动作进行撤销的
  • 事务动作和补动作偿都是幂等的, 允许重复执行而不会有副作用 ``` Saga由一系列的子事务“Ti”组成, 每个Ti都有对应的补偿“Ci”, 当Ti出现问题时Ci用于处理Ti执行带来的问题。

可以通过下面的两个公式理解Saga模式。 T = T1 T2 … Tn T = TCT

Saga模式的核心理念是避免使用长期持有锁(如14.2.2节介绍的两阶段提交)的长事务, 而应该将事务切分为一组按序依次提交的短事务, Saga模式满足ACD(原子性、一致性、持久性)特征。

摘自 <> 刘金亮, 2021.1

  1. <a name="fGPkq"></a>
  2. # 目标
  3. - 为实现saga模式的分布式事务, 先撸一个pub/sub事务消息队列服务
  4. - 事务消息队列服务的功能性要求
  5. - 消息不会丢失: 消息的持久化
  6. - 消息的唯一性: 要求每个消息有全局ID和子事务ID
  7. - 确保投递成功: 投递队列持久化, 投递状态持久化, 失败重试
  8. <a name="naUyZ"></a>
  9. # 子目标(Day 7)
  10. - MQS已基本可用, 现在实现一个模拟的订单微服务, 并与MQ联动
  11. - 长事务: 订单创建后, 联动库存服务, 扣减库存
  12. - 补偿动作
  13. - 如果扣库成功, 更新订单状态为已出库(实际系统中, 可能还涉及物流发货等复杂流程)
  14. - 否则(库存不足), 更新订单状态为出库失败(实际系统中, 可能还涉及退款和通知客户等复杂流程)
  15. - 流程
  16. - 创建订单后, 向MQ发布[销售订单.创建]消息
  17. - 订阅MQ的[销售订单.出库.成功], [销售订单.出库.失败]消息
  18. - 接收到MQ的出库消息后, 更新订单状态
  19. <a name="WjBvI"></a>
  20. # 设计
  21. - ISaleOrderService: 订单服务接口
  22. - SaleOrder: 销售订单实体
  23. - tSaleOrderService: 模拟订单服务, 实现ISaleOrderService接口
  24. - NotifyStockOutbound: 接收库存服务的扣库结果消息
  25. <a name="v6aR0"></a>
  26. # ISaleOrderService.go
  27. 订单服务接口
  28. ```go
  29. package order
  30. // ISaleOrderService to manage sale order creation and modification
  31. type ISaleOrderService interface {
  32. // get order info
  33. Get(orderID string) *SaleOrder
  34. // create new order
  35. Create(it *SaleOrder) error
  36. // update order status
  37. Update(orderID string, oldStatusFlag int32, newStatusFlag int32) (error, *SaleOrder)
  38. }

SaleOrder.go

销售订单实体

  1. package order
  2. type SaleOrder struct {
  3. OrderID string
  4. CustomerID string
  5. ProductID string
  6. Quantity int
  7. Price float64
  8. Amount float64
  9. CreateTime int64
  10. StatusFlag int32
  11. }
  12. const StatusNotDelivered int32 = 0
  13. const StatusStockOutboundDone int32 = 1
  14. const StatusStockOutboundFailed int32 = 2
  15. const StatusMQServiceFailed int32 = 3

tSaleOrderService.go

模拟订单服务, 实现ISaleOrderService接口

  1. package order
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "errors"
  6. "io/ioutil"
  7. "learning/gooop/saga/mqs/logger"
  8. "learning/gooop/saga/mqs/models"
  9. "net/http"
  10. "sync"
  11. "sync/atomic"
  12. "time"
  13. )
  14. type tSaleOrderService struct {
  15. rwmutex *sync.RWMutex
  16. orders map[string]*SaleOrder
  17. bMQReady bool
  18. publishQueue chan *SaleOrder
  19. }
  20. func newSaleOrderService() ISaleOrderService {
  21. it := new(tSaleOrderService)
  22. it.init()
  23. return it
  24. }
  25. func (me *tSaleOrderService) init() {
  26. me.rwmutex = new(sync.RWMutex)
  27. me.orders = make(map[string]*SaleOrder)
  28. me.bMQReady = false
  29. me.publishQueue = make(chan *SaleOrder, gMQMaxQueuedMsg)
  30. go me.beginSubscribeMQ()
  31. go me.beginPublishMQ()
  32. }
  33. func (me *tSaleOrderService) beginSubscribeMQ() {
  34. expireDuration := int64(1 * time.Hour)
  35. subscribeDuration := 20 * time.Minute
  36. pauseDuration := 3*time.Second
  37. lastSubscribeTime := int64(0)
  38. for {
  39. now := time.Now().UnixNano()
  40. if now - lastSubscribeTime >= int64(subscribeDuration) {
  41. expireTime := now + expireDuration
  42. err := fnSubscribeMQ(expireTime)
  43. if err != nil {
  44. me.bMQReady = false
  45. logger.Logf("tSaleOrderService.beginSubscribeMQ, failed, err=%v", err)
  46. } else {
  47. lastSubscribeTime = now
  48. me.bMQReady = true
  49. logger.Logf("tSaleOrderService.beginSubscribeMQ, done")
  50. }
  51. }
  52. time.Sleep(pauseDuration)
  53. }
  54. }
  55. func fnSubscribeMQ(expireTime int64) error {
  56. msg := &models.SubscribeMsg{
  57. ClientID: gMQClientID,
  58. Topic: gMQSubscribeTopic,
  59. NotifyUrl: gMQServerURL + PathOfNotifyStockOutbound,
  60. ExpireTime: expireTime,
  61. }
  62. url := gMQServerURL + "/subscribe"
  63. return fnPost(msg, url)
  64. }
  65. func fnPost(msg interface{}, url string) error {
  66. body,_ := json.Marshal(msg)
  67. rsp, err := http.Post(url, "application/json;charset=utf-8", bytes.NewReader(body))
  68. if err != nil {
  69. return err
  70. }
  71. defer rsp.Body.Close()
  72. j, err := ioutil.ReadAll(rsp.Body)
  73. if err != nil {
  74. return err
  75. }
  76. ok := &models.OkMsg{}
  77. err = json.Unmarshal(j, ok)
  78. if err != nil {
  79. return err
  80. }
  81. if !ok.OK {
  82. return gMQReplyFalse
  83. }
  84. return nil
  85. }
  86. func (me *tSaleOrderService) beginPublishMQ() {
  87. for {
  88. select {
  89. case msg := <- me.publishQueue :
  90. me.publishMQ(msg)
  91. break
  92. }
  93. }
  94. }
  95. func (me *tSaleOrderService) Get(orderID string) *SaleOrder {
  96. me.rwmutex.RLock()
  97. defer me.rwmutex.RUnlock()
  98. it,ok := me.orders[orderID]
  99. if ok {
  100. return it
  101. } else {
  102. return nil
  103. }
  104. }
  105. func (me *tSaleOrderService) Create(it *SaleOrder) error {
  106. me.rwmutex.Lock()
  107. defer me.rwmutex.Unlock()
  108. if len(me.publishQueue) >= gMQMaxQueuedMsg {
  109. return gMQNotAvailableError
  110. }
  111. me.orders[it.OrderID] = it
  112. me.publishQueue <- it
  113. return nil
  114. }
  115. func (me *tSaleOrderService) publishMQ(it *SaleOrder) {
  116. url := gMQServerURL + "/publish"
  117. j,_ := json.Marshal(it)
  118. msg := &models.TxMsg{
  119. GlobalID: it.OrderID,
  120. SubID: it.OrderID,
  121. SenderID: gMQClientID,
  122. Topic: gMQPublishTopic,
  123. CreateTime: it.CreateTime,
  124. Content: string(j),
  125. }
  126. for i := 0;i < gMQMaxPublishRetry;i++ {
  127. err := fnPost(msg, url)
  128. if err != nil {
  129. logger.Logf("tSaleOrderService.publishMQ, failed, err=%v, order=%v", err, it)
  130. time.Sleep(gMQPublishInterval)
  131. } else {
  132. logger.Logf("tSaleOrderService.publishMQ, done, order=%v", it)
  133. return
  134. }
  135. }
  136. // publish failed
  137. logger.Logf("tSaleOrderService.publishMQ, failed max retries, order=%v", it)
  138. _,_ = me.Update(it.OrderID, StatusNotDelivered, StatusMQServiceFailed)
  139. }
  140. func (me *tSaleOrderService) Update(orderID string, oldStatusFlag int32, newStatusFlag int32) (error, *SaleOrder) {
  141. me.rwmutex.RLock()
  142. defer me.rwmutex.RUnlock()
  143. it, ok := me.orders[orderID]
  144. if !ok {
  145. return gNotFoundError, nil
  146. }
  147. if !atomic.CompareAndSwapInt32(&it.StatusFlag, oldStatusFlag, newStatusFlag) {
  148. return gStatusChangedError, it
  149. }
  150. it.StatusFlag = newStatusFlag
  151. return nil, it
  152. }
  153. var gMQReplyFalse = errors.New("mq reply false")
  154. var gMQNotAvailableError = errors.New("mq not ready")
  155. var gNotFoundError = errors.New("order not found")
  156. var gStatusChangedError = errors.New("status changed")
  157. var gMQMaxPublishRetry = 3
  158. var gMQPublishInterval = 1*time.Second
  159. var gMQSubscribeTopic = "sale-order.stock.outbound"
  160. var gMQPublishTopic = "sale-order.created"
  161. var gMQClientID = "sale-order-service"
  162. var gMQServerURL = "http://localhost:333"
  163. var gMQMaxQueuedMsg = 1024
  164. var SaleOrderService = newSaleOrderService()

NotifyStockOutbound.go

接收库存服务的扣库结果消息

  1. package order
  2. import (
  3. "encoding/json"
  4. "github.com/gin-gonic/gin"
  5. "io/ioutil"
  6. "learning/gooop/saga/mqs/logger"
  7. "learning/gooop/saga/mqs/models"
  8. "net/http"
  9. )
  10. func NotifyStockOutbound(c *gin.Context) {
  11. body := c.Request.Body
  12. defer body.Close()
  13. j, e := ioutil.ReadAll(body)
  14. if e != nil {
  15. logger.Logf("order.NotifyStockOutbound, failed ioutil.ReadAll")
  16. c.JSON(http.StatusBadRequest, gin.H { "ok": false, "error": e.Error()})
  17. return
  18. }
  19. msg := &models.TxMsg{}
  20. e = json.Unmarshal(j, msg)
  21. if e != nil {
  22. logger.Logf("order.NotifyStockOutbound, failed json.Unmarshal")
  23. c.JSON(http.StatusBadRequest, gin.H { "ok": false, "error": e.Error()})
  24. return
  25. }
  26. orderID := msg.GlobalID
  27. succeeded := msg.Content == "1"
  28. logger.Logf("order.NotifyStockOutbound, orderID=%v, succeeded=%s", orderID, succeeded)
  29. var newStatusFlag int32
  30. if succeeded {
  31. newStatusFlag = StatusStockOutboundDone
  32. } else {
  33. newStatusFlag = StatusStockOutboundFailed
  34. }
  35. err, order := SaleOrderService.Update(orderID, StatusNotDelivered, newStatusFlag)
  36. if err != nil {
  37. logger.Logf("order.NotifyStockOutbound, failed SaleOrderService.Update, err=%v, order=%v", err, order)
  38. }
  39. }
  40. var PathOfNotifyStockOutbound = "/notify/sale-order.stock.outbound"

(未完待续)