缘起

最近阅读<<Go微服务实战>> (刘金亮, 2021.1)
本系列笔记拟采用golang练习之

Saga模式

  • saga模式将分布式长事务切分为一系列独立短事务
  • 每个短事务是可通过补偿动作进行撤销的
  • 事务动作和补偿动作都是幂等的, 允许重复执行而不会有副作用 ``` Saga由一系列的子事务“Ti”组成, 每个Ti都有对应的补偿“Ci”, 当Ti出现问题时Ci用于处理Ti执行带来的问题。

可以通过下面的两个公式理解Saga模式。 $T = T_1 T_2 \dots T_n$ ; $T = T_1 T_2 \dots T_j C_j \dots C_2 C_1$

Saga模式的核心理念是避免使用长期持有锁(如14.2.2节介绍的两阶段提交)的长事务, 而应该将事务切分为一组按序依次提交的短事务, Saga模式满足ACD(原子性、一致性、持久性)特征。

摘自 <<Go微服务实战>> 刘金亮, 2021.1

  1. <a name="fGPkq"></a>
  2. # 目标
  3. - 为实现saga模式的分布式事务, 先撸一个pub/sub事务消息队列服务
  4. - 事务消息队列服务的功能性要求
  5. - 消息不会丢失: 消息的持久化
  6. - 消息的唯一性: 要求每个消息有全局ID和子事务ID
  7. - 确保投递成功: 投递队列持久化, 投递状态持久化, 失败重试
  8. <a name="naUyZ"></a>
  9. # 子目标(Day 8)
  10. - 创建虚拟的库存服务
  11. - 启动时, 注册到MQ
  12. - 接收到订单创建的消息时, 扣减库存
  13. - 扣库成功时, 经MQ通知订单服务扣库成功
  14. - 扣库失败时, 经MQ通知订单服务扣库失败
  15. <a name="KVAqs"></a>
  16. # 设计
  17. - IStockService: 模拟的库存服务接口
  18. - tStockService: 虚拟库存服务, 实现IStockService接口
  19. - NotifySaleOrderCreated: 用于监听订单创建消息的http回调处理器
  20. <a name="qkOFr"></a>
  21. # 单元测试
  22. order_test.go
  23. 1. 初始化10个产品库存
  24. 1. 订单服务, 创建订单1, 尝试扣减1个库存, 预期成功
  25. 1. 订单服务, 创建订单2, 尝试扣减10个库存, 预期失败
  26. 1. 校验订单1的最终状态为出库成功
  27. 1. 校验订单2的最终状态为出库失败
  28. ```go
  29. package saga
  30. import (
  31. "github.com/jmoiron/sqlx"
  32. "learning/gooop/saga/mqs/cmd"
  33. "learning/gooop/saga/mqs/database"
  34. "learning/gooop/saga/mqs/logger"
  35. "learning/gooop/saga/order"
  36. "learning/gooop/saga/stock"
  37. "sync"
  38. "testing"
  39. "time"
  40. )
  41. var gRunOnce sync.Once
  42. func fnBootMQS() {
  43. gRunOnce.Do(func() {
  44. // boot mqs
  45. go cmd.BootMQS()
  46. // wait for mqs up
  47. time.Sleep(1 * time.Second)
  48. })
  49. }
  // fnAssertTrue aborts the running test with msg when b is false.
  //
  // t   - the test being run
  // b   - the condition that is expected to hold
  // msg - failure message reported through t.Fatal
  func fnAssertTrue(t *testing.T, b bool, msg string) {
      // Mark this function as a helper so a failure is reported at the
      // caller's line instead of inside this function.
      t.Helper()
      if !b {
          t.Fatal(msg)
      }
  }
  55. func Test_SagaSaleOrder(t *testing.T) {
  56. // prepare mqs
  57. fnClearDB(t)
  58. fnBootMQS()
  59. // 1 create prod stock
  60. prodID := "test-prod-1"
  61. err := stock.MockStockService.AddStock(prodID, 10)
  62. if err != nil {
  63. t.Fatal(err)
  64. }
  65. // create order 1
  66. o1 := &order.SaleOrder{
  67. OrderID: "test-order-1",
  68. ProductID: prodID,
  69. CustomerID: "test-customer-1",
  70. Quantity: 1,
  71. Price: 100,
  72. Amount: 100,
  73. CreateTime: time.Now().UnixNano(),
  74. StatusFlag: order.StatusNotDelivered,
  75. }
  76. err = order.MockSaleOrderService.Create(o1)
  77. if err != nil {
  78. t.Fatal(err)
  79. }
  80. // create order 2
  81. time.Sleep(10*time.Millisecond)
  82. o2 := &order.SaleOrder{
  83. OrderID: "test-order-2",
  84. ProductID: prodID,
  85. CustomerID: "test-customer-2",
  86. Quantity: 10,
  87. Price: 100,
  88. Amount: 1000,
  89. CreateTime: time.Now().UnixNano(),
  90. StatusFlag: order.StatusNotDelivered,
  91. }
  92. err = order.MockSaleOrderService.Create(o2)
  93. if err != nil {
  94. t.Fatal(err)
  95. }
  96. time.Sleep(1 * time.Second)
  97. logger.Logf("============================================")
  98. log := "tSaleOrderService.beginSubscribeMQ, done"
  99. fnAssertTrue(t, logger.Count(log)==1, "expecting log: " + log)
  100. log = "tSaleOrderService.publishMQ, done, order=test-order-1"
  101. fnAssertTrue(t, logger.Count(log)==1, "expecting log: " + log)
  102. log = "tSaleOrderService.publishMQ, done, order=test-order-2"
  103. fnAssertTrue(t, logger.Count(log)==1, "expecting log: " + log)
  104. log = "stock.NotifySaleOrderCreated, order=test-order-1"
  105. fnAssertTrue(t, logger.Count(log)==1, "expecting log: " + log)
  106. log = "stock.NotifySaleOrderCreated, order=test-order-2"
  107. fnAssertTrue(t, logger.Count(log)==1, "expecting log: " + log)
  108. o1 = order.MockSaleOrderService.Get(o1.OrderID)
  109. fnAssertTrue(t, o1.StatusFlag == order.StatusStockOutboundDone, "expecting o1 done")
  110. o2 = order.MockSaleOrderService.Get(o2.OrderID)
  111. fnAssertTrue(t, o2.StatusFlag == order.StatusStockOutboundFailed, "expecting o2 failed")
  112. logger.Logf("test passed")
  113. }
  114. func fnClearDB(t *testing.T) {
  115. fnDBExec(t, "delete from subscriber")
  116. fnDBExec(t, "delete from tx_msg")
  117. fnDBExec(t, "delete from delivery_queue")
  118. fnDBExec(t, "delete from success_queue")
  119. }
  120. func fnDBExec(t *testing.T, sql string, args... interface{}) int {
  121. rows := []int64{ 0 }
  122. err := database.DB(func(db *sqlx.DB) error {
  123. r,e := db.Exec(sql, args...)
  124. if e != nil {
  125. return e
  126. }
  127. rows[0], e = r.RowsAffected()
  128. if e != nil {
  129. return e
  130. }
  131. return nil
  132. })
  133. if err != nil {
  134. t.Fatal(err)
  135. }
  136. return int(rows[0])
  137. }

测试输出

  1. $ go test -v order_test.go
  2. === RUN Test_SagaSaleOrder
  3. 23:55:54.292132442 eventbus.Pub, event=system.boot, handler=gDeliveryService.handleBootEvent
  4. [GIN-debug] [WARNING] Creating an Engine instance with the Logger and Recovery middleware already attached.
  5. [GIN-debug] [WARNING] Running in "debug" mode. Switch to "release" mode in production.
  6. - using env: export GIN_MODE=release
  7. - using code: gin.SetMode(gin.ReleaseMode)
  8. [GIN-debug] GET /ping --> learning/gooop/saga/mqs/handlers.Ping (4 handlers)
  9. [GIN-debug] POST /subscribe --> learning/gooop/saga/mqs/handlers.Subscribe (4 handlers)
  10. [GIN-debug] POST /publish --> learning/gooop/saga/mqs/handlers.Publish (4 handlers)
  11. [GIN-debug] POST /notify --> learning/gooop/saga/mqs/handlers.Notify (4 handlers)
  12. [GIN-debug] POST /notify/sale-order.stock.outbound --> learning/gooop/saga/order.NotifyStockOutbound (4 handlers)
  13. [GIN-debug] POST /notify/sale-order.created --> learning/gooop/saga/stock.NotifySaleOrderCreated (4 handlers)
  14. [GIN-debug] Listening and serving HTTP on :3333
  15. 23:55:54.292287032 tDeliveryService.beginCleanExpiredWorkers
  16. 23:55:54.292345845 tDeliveryService.beginCreatingWorkers
  17. 23:55:54.356542981 handlers.Subscribe, msg=&{sale-order-service sale-order.stock.outbound http://localhost:3333/notify/sale-order.stock.outbound 1616086554355593476}
  18. 23:55:54.356524325 handlers.Subscribe, msg=&{stock-service sale-order.created http://localhost:3333/notify/sale-order.created 1616086554355598830}
  19. 23:55:54.365256441 handlers.Subscribe, event=subscriber.registered, msg=&{sale-order-service sale-order.stock.outbound http://localhost:3333/notify/sale-order.stock.outbound 1616086554355593476}
  20. 23:55:54.365271105 eventbus.Pub, event=subscriber.registered, handler=gDeliveryService.handleSubscriberRegistered
  21. [GIN] 2021/03/18 - 23:55:54 | 200 | 8.865173ms | ::1 | POST "/subscribe"
  22. [GIN] 2021/03/18 - 23:55:54 | 200 | 8.882138ms | ::1 | POST "/subscribe"
  23. 23:55:54.365488163 tSaleOrderService.beginSubscribeMQ, done
  24. 23:55:54.365861542 database.DB, err=empty rows
  25. 23:55:54.366239244 tDeliveryWorker.afterInitialLoad, clientID=sale-order-service, rows=0
  26. 23:55:54.373588493 handlers.Subscribe, event=subscriber.registered, msg=&{stock-service sale-order.created http://localhost:3333/notify/sale-order.created 1616086554355598830}
  27. 23:55:54.373605972 eventbus.Pub, event=subscriber.registered, handler=gDeliveryService.handleSubscriberRegistered
  28. [GIN] 2021/03/18 - 23:55:54 | 200 | 17.189632ms | ::1 | POST "/subscribe"
  29. [GIN] 2021/03/18 - 23:55:54 | 200 | 17.205549ms | ::1 | POST "/subscribe"
  30. 23:55:54.373843032 tStockService.beginSubscribeMQ, done
  31. 23:55:54.3743926 database.DB, err=empty rows
  32. 23:55:54.374499757 tDeliveryWorker.afterInitialLoad, clientID=stock-service, rows=0
  33. 23:55:55.292336699 tStockService.AddStock, done, prodId=test-prod-1, stock=0, delta=0, after=10
  34. 23:55:55.323746568 handlers.Publish, msg=test-order-1/test-order-1/sale-order.created, msgId=112
  35. [GIN] 2021/03/18 - 23:55:55 | 200 | 31.112478ms | ::1 | POST "/publish"
  36. [GIN] 2021/03/18 - 23:55:55 | 200 | 31.125855ms | ::1 | POST "/publish"
  37. 23:55:55.323811205 handlers.Publish, pubLiveMsg 112
  38. 23:55:55.323910377 tSaleOrderService.publishMQ, done, order=test-order-1/&{test-order-1 test-customer-1 test-prod-1 1 100 100 1616082955292352151 0}
  39. 23:55:55.324227736 handlers.Publish, pubLiveMsg, msgId=112, rows=1
  40. 23:55:55.324273573 handlers.Publish, event=msg.published, clientID=stock-service, msg=test-order-1/test-order-1/http://localhost:3333/notify/sale-order.created
  41. 23:55:55.32428051 eventbus.Pub, event=msg.published, handler=tLiveMsgSource.sale-order-service
  42. 23:55:55.324285512 eventbus.Pub, event=msg.published, handler=tLiveMsgSource.stock-service
  43. 23:55:55.324292286 tLiveMsgSource.handleMsgPublished, clientID=stock-service, msg=test-order-1/test-order-1/sale-order.created
  44. 23:55:55.324346678 tDeliveryWorker.beginPollAndDeliver, msg from live=&{98 stock-service http://localhost:3333/notify/sale-order.created 112 test-order-1 test-order-1 sale-order-service 1616082955292352151 sale-order.created {"OrderID":"test-order-1","CustomerID":"test-customer-1","ProductID":"test-prod-1","Quantity":1,"Price":100,"Amount":100,"CreateTime":1616082955292352151,"StatusFlag":0} 0 0}
  45. 23:55:55.33925766 handlers.Publish, msg=test-order-2/test-order-2/sale-order.created, msgId=113
  46. [GIN] 2021/03/18 - 23:55:55 | 200 | 15.264561ms | ::1 | POST "/publish"
  47. [GIN] 2021/03/18 - 23:55:55 | 200 | 15.280884ms | ::1 | POST "/publish"
  48. 23:55:55.339353768 handlers.Publish, pubLiveMsg 113
  49. 23:55:55.339446893 tSaleOrderService.publishMQ, done, order=test-order-2/&{test-order-2 test-customer-2 test-prod-1 10 100 1000 1616082955302734821 0}
  50. 23:55:55.339909493 handlers.Publish, pubLiveMsg, msgId=113, rows=1
  51. 23:55:55.339919874 handlers.Publish, event=msg.published, clientID=stock-service, msg=test-order-2/test-order-2/http://localhost:3333/notify/sale-order.created
  52. 23:55:55.339925049 eventbus.Pub, event=msg.published, handler=tLiveMsgSource.sale-order-service
  53. 23:55:55.339929964 eventbus.Pub, event=msg.published, handler=tLiveMsgSource.stock-service
  54. 23:55:55.339935935 tLiveMsgSource.handleMsgPublished, clientID=stock-service, msg=test-order-2/test-order-2/sale-order.created
  55. 23:55:55.350117186 tDeliveryWorker.deliver, begin, id=stock-service, msg=test-order-1/test-order-1
  56. 23:55:55.35041833 stock.NotifySaleOrderCreated, order=test-order-1/&{test-order-1 test-customer-1 test-prod-1 1 100 100 1616082955292352151 0}
  57. 23:55:55.350429178 tStockService.AddStock, done, prodId=test-prod-1, stock=10, delta=-1, after=9
  58. [GIN] 2021/03/18 - 23:55:55 | 200 | 88.872µs | ::1 | POST "/notify/sale-order.created"
  59. [GIN] 2021/03/18 - 23:55:55 | 200 | 133.617µs | ::1 | POST "/notify/sale-order.created"
  60. 23:55:55.350592351 tDeliveryWorker.deliver, OK, id=stock-service, msg=test-order-1/test-order-1
  61. 23:55:55.367336707 tDeliveryWorker.afterDeliverySuccess, done, id=stock-service, msg=test-order-1/test-order-1
  62. 23:55:55.36738322 tDeliveryWorker.beginPollAndDeliver, msg from live=&{99 stock-service http://localhost:3333/notify/sale-order.created 113 test-order-2 test-order-2 sale-order-service 1616082955302734821 sale-order.created {"OrderID":"test-order-2","CustomerID":"test-customer-2","ProductID":"test-prod-1","Quantity":10,"Price":100,"Amount":1000,"CreateTime":1616082955302734821,"StatusFlag":0} 0 0}
  63. 23:55:55.367530495 database.DB, err=empty rows
  64. 23:55:55.374978535 tDeliveryWorker.deliver, begin, id=stock-service, msg=test-order-2/test-order-2
  65. 23:55:55.375201115 stock.NotifySaleOrderCreated, order=test-order-2/&{test-order-2 test-customer-2 test-prod-1 10 100 1000 1616082955302734821 0}
  66. 23:55:55.375211216 tStockService.AddStock, failed, prodId=test-prod-1, stock=9, delta=-10
  67. 23:55:55.375219558 tStockService.HandleSaleOrderCreated, err=insufficient stock, order=&{test-order-2 test-customer-2 test-prod-1 10 100 1000 1616082955302734821 0}
  68. [GIN] 2021/03/18 - 23:55:55 | 200 | 102.52µs | ::1 | POST "/notify/sale-order.created"
  69. [GIN] 2021/03/18 - 23:55:55 | 200 | 116.933µs | ::1 | POST "/notify/sale-order.created"
  70. 23:55:55.375354895 tDeliveryWorker.deliver, OK, id=stock-service, msg=test-order-2/test-order-2
  71. 23:55:55.389901711 tDeliveryWorker.afterDeliverySuccess, done, id=stock-service, msg=test-order-2/test-order-2
  72. 23:55:55.38993077 tDeliveryWorker.beginPollAndDeliver, msg from db=&{99 stock-service http://localhost:3333/notify/sale-order.created 113 test-order-2 test-order-2 sale-order-service 1616082955302734821 sale-order.created {"OrderID":"test-order-2","CustomerID":"test-customer-2","ProductID":"test-prod-1","Quantity":10,"Price":100,"Amount":1000,"CreateTime":1616082955302734821,"StatusFlag":0} 1 1616082955367401386}
  73. 23:55:55.420121681 handlers.Publish, msg=test-order-1/test-order-1.outbound/sale-order.stock.outbound, msgId=114
  74. [GIN] 2021/03/18 - 23:55:55 | 200 | 69.507171ms | ::1 | POST "/publish"
  75. [GIN] 2021/03/18 - 23:55:55 | 200 | 69.520805ms | ::1 | POST "/publish"
  76. 23:55:55.420220719 handlers.Publish, pubLiveMsg 114
  77. 23:55:55.420321792 tStockService.publishMQ, done, msg=&{test-order-1 test-order-1.outbound stock-service 1616082955350432496 sale-order.stock.outbound 1}
  78. 23:55:55.42071623 handlers.Publish, pubLiveMsg, msgId=114, rows=1
  79. 23:55:55.420731889 handlers.Publish, event=msg.published, clientID=sale-order-service, msg=test-order-1/test-order-1.outbound/http://localhost:3333/notify/sale-order.stock.outbound
  80. 23:55:55.420741935 eventbus.Pub, event=msg.published, handler=tLiveMsgSource.sale-order-service
  81. 23:55:55.420746401 eventbus.Pub, event=msg.published, handler=tLiveMsgSource.stock-service
  82. 23:55:55.420755367 tLiveMsgSource.handleMsgPublished, clientID=sale-order-service, msg=test-order-1/test-order-1.outbound/sale-order.stock.outbound
  83. 23:55:55.42079505 tDeliveryWorker.beginPollAndDeliver, msg from live=&{100 sale-order-service http://localhost:3333/notify/sale-order.stock.outbound 114 test-order-1 test-order-1.outbound stock-service 1616082955350432496 sale-order.stock.outbound 1 0 0}
  84. 23:55:55.435844021 handlers.Publish, msg=test-order-2/test-order-2.outbound/sale-order.stock.outbound, msgId=115
  85. [GIN] 2021/03/18 - 23:55:55 | 200 | 15.407267ms | ::1 | POST "/publish"
  86. [GIN] 2021/03/18 - 23:55:55 | 200 | 15.420327ms | ::1 | POST "/publish"
  87. 23:55:55.4359058 handlers.Publish, pubLiveMsg 115
  88. 23:55:55.436026025 tStockService.publishMQ, done, msg=&{test-order-2 test-order-2.outbound stock-service 1616082955375214295 sale-order.stock.outbound 0}
  89. 23:55:55.436398324 handlers.Publish, pubLiveMsg, msgId=115, rows=1
  90. 23:55:55.436409937 handlers.Publish, event=msg.published, clientID=sale-order-service, msg=test-order-2/test-order-2.outbound/http://localhost:3333/notify/sale-order.stock.outbound
  91. 23:55:55.43642793 eventbus.Pub, event=msg.published, handler=tLiveMsgSource.sale-order-service
  92. 23:55:55.436433697 eventbus.Pub, event=msg.published, handler=tLiveMsgSource.stock-service
  93. 23:55:55.43644379 tLiveMsgSource.handleMsgPublished, clientID=sale-order-service, msg=test-order-2/test-order-2.outbound/sale-order.stock.outbound
  94. 23:55:55.446599314 tDeliveryWorker.deliver, begin, id=sale-order-service, msg=test-order-1/test-order-1.outbound
  95. 23:55:55.446809726 order.NotifyStockOutbound, orderID=test-order-1, succeeded=true
  96. [GIN] 2021/03/18 - 23:55:55 | 200 | 61.898µs | ::1 | POST "/notify/sale-order.stock.outbound"
  97. [GIN] 2021/03/18 - 23:55:55 | 200 | 81.911µs | ::1 | POST "/notify/sale-order.stock.outbound"
  98. 23:55:55.446951354 tDeliveryWorker.deliver, OK, id=sale-order-service, msg=test-order-1/test-order-1.outbound
  99. 23:55:55.462584405 tDeliveryWorker.afterDeliverySuccess, done, id=sale-order-service, msg=test-order-1/test-order-1.outbound
  100. 23:55:55.462615131 tDeliveryWorker.beginPollAndDeliver, msg from live=&{101 sale-order-service http://localhost:3333/notify/sale-order.stock.outbound 115 test-order-2 test-order-2.outbound stock-service 1616082955375214295 sale-order.stock.outbound 0 0 0}
  101. 23:55:55.469999185 tDeliveryWorker.deliver, begin, id=sale-order-service, msg=test-order-2/test-order-2.outbound
  102. 23:55:55.470163043 order.NotifyStockOutbound, orderID=test-order-2, succeeded=false
  103. [GIN] 2021/03/18 - 23:55:55 | 200 | 85.14µs | ::1 | POST "/notify/sale-order.stock.outbound"
  104. [GIN] 2021/03/18 - 23:55:55 | 200 | 105.638µs | ::1 | POST "/notify/sale-order.stock.outbound"
  105. 23:55:55.470369408 tDeliveryWorker.deliver, OK, id=sale-order-service, msg=test-order-2/test-order-2.outbound
  106. 23:55:55.486229145 tDeliveryWorker.afterDeliverySuccess, done, id=sale-order-service, msg=test-order-2/test-order-2.outbound
  107. 23:55:56.302885199 ============================================
  108. 23:55:56.303470422 test passed
  109. --- PASS: Test_SagaSaleOrder (2.05s)
  110. PASS
  111. ok command-line-arguments 2.057s

IStockService.go

模拟的库存服务接口

  1. package stock;
  2. import "learning/gooop/saga/order"
  // IStockService is the interface of the simulated stock (inventory) service.
  3. type IStockService interface {
  // GetStock returns the stock quantity recorded for prodId.
  4. GetStock(prodId string) int
  // AddStock adjusts the stock of prodId by delta (negative to deduct) and
  // returns an error when the adjustment is rejected.
  5. AddStock(prodId string, delta int) error
  // HandleSaleOrderCreated processes a newly created sale order; the
  // tStockService implementation deducts it.Quantity from stock and queues
  // a result message for the MQ.
  6. HandleSaleOrderCreated(it *order.SaleOrder) error
  7. }

tStockService.go

虚拟库存服务, 实现IStockService接口

  1. package stock
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "errors"
  6. "io/ioutil"
  7. "learning/gooop/saga/mqs/logger"
  8. "learning/gooop/saga/mqs/models"
  9. "learning/gooop/saga/order"
  10. "net/http"
  11. "sync"
  12. "time"
  13. )
  // tStockService is the in-memory mock stock service implementing IStockService.
  14. type tStockService struct {
  // rwmutex is the lock taken by GetStock and AddStock around access to stock.
  15. rwmutex *sync.RWMutex
  // stock maps a product id to its current quantity.
  16. stock map[string]int
  // bMQReady records whether the latest MQ subscribe attempt succeeded;
  // it is written by beginSubscribeMQ.
  17. bMQReady bool
  // publishQueue buffers outgoing messages (capacity gMQMaxQueuedMsg) that
  // beginPublishMQ posts to the MQ server.
  18. publishQueue chan *models.TxMsg
  19. }
  20. func newStockService() IStockService {
  21. it := new(tStockService)
  22. it.init()
  23. return it
  24. }
  25. func (me *tStockService) init() {
  26. me.rwmutex = new(sync.RWMutex)
  27. me.stock = make(map[string]int)
  28. me.bMQReady = false
  29. me.publishQueue = make(chan *models.TxMsg, gMQMaxQueuedMsg)
  30. go func() {
  31. time.Sleep(100*time.Millisecond)
  32. go me.beginSubscribeMQ()
  33. go me.beginPublishMQ()
  34. }()
  35. }
  36. func (me *tStockService) GetStock(prodId string) int {
  37. me.rwmutex.RLock()
  38. defer me.rwmutex.RUnlock()
  39. it,ok := me.stock[prodId]
  40. if ok {
  41. return it
  42. } else {
  43. return 0
  44. }
  45. }
  46. func (me *tStockService) AddStock(prodId string, delta int) error {
  47. me.rwmutex.RLock()
  48. defer me.rwmutex.RUnlock()
  49. it,ok := me.stock[prodId]
  50. if ok {
  51. n := it + delta
  52. if n < 0 {
  53. logger.Logf("tStockService.AddStock, failed, prodId=%s, stock=%d, delta=%d", prodId, it, delta)
  54. return gInsufficientStockError
  55. } else {
  56. logger.Logf("tStockService.AddStock, done, prodId=%s, stock=%d, delta=%d, after=%d", prodId, it, delta, n)
  57. me.stock[prodId] = n
  58. }
  59. } else {
  60. if delta < 0 {
  61. logger.Logf("tStockService.AddStock, failed, prodId=%s, stock=0, delta=%d", prodId, delta)
  62. return gInsufficientStockError
  63. } else {
  64. logger.Logf("tStockService.AddStock, done, prodId=%s, stock=0, delta=%d, after=%d", prodId, it, delta)
  65. me.stock[prodId] = delta
  66. }
  67. }
  68. return nil
  69. }
  70. func (me *tStockService) beginSubscribeMQ() {
  71. expireDuration := int64(1 * time.Hour)
  72. subscribeDuration := 20 * time.Minute
  73. pauseDuration := 3*time.Second
  74. lastSubscribeTime := int64(0)
  75. for {
  76. now := time.Now().UnixNano()
  77. if now - lastSubscribeTime >= int64(subscribeDuration) {
  78. expireTime := now + expireDuration
  79. err := fnSubscribeMQ(expireTime)
  80. if err != nil {
  81. me.bMQReady = false
  82. logger.Logf("tStockService.beginSubscribeMQ, failed, err=%v", err)
  83. } else {
  84. lastSubscribeTime = now
  85. me.bMQReady = true
  86. logger.Logf("tStockService.beginSubscribeMQ, done")
  87. }
  88. }
  89. time.Sleep(pauseDuration)
  90. }
  91. }
  92. func fnSubscribeMQ(expireTime int64) error {
  93. msg := &models.SubscribeMsg{
  94. ClientID: gMQClientID,
  95. Topic: gMQSubscribeTopic,
  96. NotifyUrl: gMQServerURL + PathOfNotifySaleOrderCreated,
  97. ExpireTime: expireTime,
  98. }
  99. url := gMQServerURL + "/subscribe"
  100. return fnPost(msg, url)
  101. }
  102. func fnPost(msg interface{}, url string) error {
  103. body,_ := json.Marshal(msg)
  104. rsp, err := http.Post(url, "application/json;charset=utf-8", bytes.NewReader(body))
  105. if err != nil {
  106. return err
  107. }
  108. defer rsp.Body.Close()
  109. j, err := ioutil.ReadAll(rsp.Body)
  110. if err != nil {
  111. return err
  112. }
  113. ok := &models.OkMsg{}
  114. err = json.Unmarshal(j, ok)
  115. if err != nil {
  116. return err
  117. }
  118. if !ok.OK {
  119. return gMQReplyFalse
  120. }
  121. return nil
  122. }
  123. func (me *tStockService) beginPublishMQ() {
  124. for {
  125. select {
  126. case msg := <- me.publishQueue :
  127. me.publishMQ(msg)
  128. break
  129. }
  130. }
  131. }
  132. func (me *tStockService) publishMQ(msg *models.TxMsg) {
  133. url := gMQServerURL + "/publish"
  134. for i := 0;i < gMQMaxPublishRetry;i++ {
  135. err := fnPost(msg, url)
  136. if err != nil {
  137. logger.Logf("tStockService.publishMQ, failed, err=%v, msg=%v", err, msg)
  138. time.Sleep(gMQPublishInterval)
  139. } else {
  140. logger.Logf("tStockService.publishMQ, done, msg=%v", msg)
  141. return
  142. }
  143. }
  144. // publish failed
  145. logger.Logf("tStockService.publishMQ, failed max retries, msg=%v", msg)
  146. }
  147. func (me *tStockService) HandleSaleOrderCreated(it *order.SaleOrder) error {
  148. msg := &models.TxMsg{}
  149. msg.GlobalID = it.OrderID
  150. msg.SubID = it.OrderID + ".outbound"
  151. msg.SenderID = gMQClientID
  152. msg.Topic = gMQPublishTopic
  153. err := me.AddStock(it.ProductID, -it.Quantity)
  154. msg.CreateTime = time.Now().UnixNano()
  155. if err != nil {
  156. logger.Logf("tStockService.HandleSaleOrderCreated, err=%s, order=%v", err.Error(), it)
  157. msg.Content = "0"
  158. } else {
  159. msg.Content = "1"
  160. }
  161. if len(me.publishQueue) >= gMQMaxQueuedMsg {
  162. logger.Logf("tStockService.HandleSaleOrderCreated, err=%s, order=%v", gMQBlocked.Error(), it)
  163. return gMQBlocked
  164. } else {
  165. me.publishQueue <- msg
  166. return err
  167. }
  168. }
  169. var gInsufficientStockError = errors.New("insufficient stock")
  170. var gMQReplyFalse = errors.New("mq reply false")
  171. var gMQBlocked = errors.New("mq blocked")
  172. var gMQMaxPublishRetry = 10
  173. var gMQPublishInterval = 1*time.Second
  174. var gMQSubscribeTopic = "sale-order.created"
  175. var gMQPublishTopic = "sale-order.stock.outbound"
  176. var gMQClientID = "stock-service"
  177. var gMQServerURL = "http://localhost:3333"
  178. var gMQMaxQueuedMsg = 1024
  179. var MockStockService = newStockService()

NotifySaleOrderCreated.go

用于监听订单创建消息的http回调处理器

package stock

import (
    "encoding/json"
    "github.com/gin-gonic/gin"
    "io/ioutil"
    "learning/gooop/saga/mqs/logger"
    "learning/gooop/saga/mqs/models"
    "learning/gooop/saga/order"
    "net/http"
)

// NotifySaleOrderCreated is the gin handler for the sale-order.created
// callback. It reads the request body as a models.TxMsg, decodes the
// message's Content as an order.SaleOrder and passes the order to
// MockStockService.
//
// Any read or decode failure is answered with 400 and
// {"ok": false, "error": ...}. Once the order has been decoded the reply is
// always 200 {"ok": true}, whatever the stock service returns.
func NotifySaleOrderCreated(c *gin.Context) {
    reqBody := c.Request.Body
    defer reqBody.Close()

    // All failure paths share the same response shape.
    replyBadRequest := func(cause error) {
        c.JSON(http.StatusBadRequest, gin.H{"ok": false, "error": cause.Error()})
    }

    raw, err := ioutil.ReadAll(reqBody)
    if err != nil {
        logger.Logf("stock.NotifySaleOrderCreated, failed ioutil.ReadAll")
        replyBadRequest(err)
        return
    }

    // Outer envelope: the MQ transaction message.
    envelope := &models.TxMsg{}
    if err = json.Unmarshal(raw, envelope); err != nil {
        logger.Logf("stock.NotifySaleOrderCreated, failed json.Unmarshal msg")
        replyBadRequest(err)
        return
    }

    // Inner payload: the sale order, carried as JSON text in Content.
    saleOrder := &order.SaleOrder{}
    if err = json.Unmarshal([]byte(envelope.Content), saleOrder); err != nil {
        logger.Logf("stock.NotifySaleOrderCreated, failed json.Unmarshal order")
        replyBadRequest(err)
        return
    }
    logger.Logf("stock.NotifySaleOrderCreated, order=%s/%v", saleOrder.OrderID, saleOrder)

    // The result is deliberately discarded: the delivery is acknowledged
    // regardless of the stock outcome.
    _ = MockStockService.HandleSaleOrderCreated(saleOrder)
    c.JSON(http.StatusOK, gin.H{"ok": true})
}

// PathOfNotifySaleOrderCreated is the HTTP path of the sale-order.created
// callback; fnSubscribeMQ appends it to gMQServerURL to build the NotifyUrl
// registered with the MQ.
var PathOfNotifySaleOrderCreated = "/notify/sale-order.created"

(未完待续)