缘起

最近阅读《Go微服务实战》(刘金亮, 2021.1)
本系列笔记拟采用golang练习之

Saga模式

  • saga模式将分布式长事务切分为一系列独立短事务
  • 每个短事务是可通过补偿动作进行撤销的
  • 事务动作和补偿动作都是幂等的, 允许重复执行而不会有副作用

Saga由一系列的子事务“Ti”组成, 每个Ti都有对应的补偿“Ci”, 当Ti出现问题时, Ci用于处理Ti执行带来的问题。

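可以通过下面的两个公式理解Saga模式: 所有子事务都成功时, $T = T_1 T_2 \cdots T_n$; 若子事务 $T_j$ 执行失败, 则按逆序执行补偿, 即 $T = T_1 T_2 \cdots T_j C_j \cdots C_2 C_1$ ($0 < j < n$)。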

Saga模式的核心理念是避免使用长期持有锁(如14.2.2节介绍的两阶段提交)的长事务, 而应该将事务切分为一组按序依次提交的短事务, Saga模式满足ACD(原子性、一致性、持久性)特征。

摘自《Go微服务实战》刘金亮, 2021.1

<a name="fGPkq"></a>
# 目标
- 为实现saga模式的分布式事务, 先撸一个pub/sub事务消息队列服务
- 事务消息队列服务的功能性要求
    - 消息不会丢失: 消息的持久化
    - 消息的唯一性: 要求每个消息有全局ID和子事务ID
    - 确保投递成功: 投递队列持久化, 投递状态持久化, 失败重试

<a name="naUyZ"></a>
# 子目标(Day 6)
- 终于可以进行基本功能测试了
    - 订阅接口测试: /subscribe
    - 发布接口测试: /publish
    - 通知接口测试: /notify
- 重构
    - database.go: 重构DDL语句, 以兼容sqlx的StructScan字段映射
    - 所有Queryx返回的rows, 必须加上defer rows.Close()
    - 添加大量的过程诊断日志
    - tLiveMsgSource: fix handleMsgPublished未接收数据就return的bug

<a name="mLMji"></a>
# 单元测试
mqs_test.go: 依次清空数据库, 测试订阅接口、发布接口和通知接口, 并通过过程诊断日志和数据库记录的变化进行验证.
  22. ```go
  23. package saga
  24. import (
  25. "bytes"
  26. "encoding/json"
  27. "errors"
  28. "fmt"
  29. "github.com/jmoiron/sqlx"
  30. "io/ioutil"
  31. "learning/gooop/saga/mqs/cmd"
  32. "learning/gooop/saga/mqs/database"
  33. "learning/gooop/saga/mqs/logger"
  34. "learning/gooop/saga/mqs/models"
  35. "net/http"
  36. "sync"
  37. "testing"
  38. "time"
  39. )
  40. var gRunOnce sync.Once
  41. func fnBootMQS() {
  42. gRunOnce.Do(func() {
  43. // boot mqs
  44. go cmd.BootMQS()
  45. // wait for mqs up
  46. time.Sleep(1 * time.Second)
  47. })
  48. }
  49. func fnAssertTrue (t *testing.T, b bool, msg string) {
  50. if !b {
  51. t.Fatal(msg)
  52. }
  53. }
  54. func Test_MQS(t *testing.T) {
  55. // prepare mqs
  56. fnClearDB(t)
  57. fnBootMQS()
  58. // subscribe
  59. fnTestSubscribe(t)
  60. t.Log("passed fnTestSubscribe")
  61. // publish and notify
  62. fnTestPublishAndNotify(t)
  63. t.Log("passed fnTestPublishAndNotify")
  64. }
  65. func fnTestPublishAndNotify(t *testing.T) {
  66. t.Log("testing fnTestPublishAndNotify")
  67. msg := &models.TxMsg{
  68. GlobalID: "test-global-id",
  69. SubID: "test-sub-id",
  70. SenderID: "test-sender-id",
  71. Topic: "test-topic",
  72. Content: "test-content",
  73. }
  74. fnPost(t, msg, "http://localhost:3333/publish")
  75. // check log
  76. fnAssertTrue(t, logger.Count("handlers.Publish, msg=test-global-id")==1, "expecting log: handlers.Publish, msg=test-global-id")
  77. // check notify
  78. time.Sleep(100 * time.Millisecond)
  79. fnAssertTrue(t, logger.Count("tLiveMsgSource.handleMsgPublished, clientID=test-client, msg=test-global-id")==1, "expecting log: tLiveMsgSource.handleMsgPublished")
  80. fnAssertTrue(t, logger.Count("tDeliveryWorker.afterDeliverySuccess, done, id=test-client, msg=test-global-id")==1, "expecting log: tDeliveryWorker.afterDeliverySuccess")
  81. fnAssertTrue(t, logger.Count("handlers.Notify, msg=")==1, "expecting log: handlers.Notify, msg=")
  82. // check success queue
  83. fnAssertTrue(t, fnDBCount(t, "select count(1) from success_queue where ClientID='test-client'")==1, "expectiang db.success_queue.count == 1")
  84. }
  85. func fnClearDB(t *testing.T) {
  86. fnDBExec(t, "delete from subscriber")
  87. fnDBExec(t, "delete from tx_msg")
  88. fnDBExec(t, "delete from delivery_queue")
  89. fnDBExec(t, "delete from success_queue")
  90. }
  91. func fnDBCount(t *testing.T, sql string, args... interface{}) int {
  92. sum := []int{ 0 }
  93. err := database.DB(func(db *sqlx.DB) error {
  94. r,e := db.Queryx(sql, args...)
  95. if e != nil {
  96. return e
  97. }
  98. defer r.Close()
  99. if !r.Next() {
  100. return errors.New("empty rows")
  101. }
  102. e = r.Scan(&sum[0])
  103. if e != nil {
  104. return e
  105. }
  106. return nil
  107. })
  108. if err != nil {
  109. t.Fatal(err)
  110. }
  111. return sum[0]
  112. }
  113. func fnDBExec(t *testing.T, sql string, args... interface{}) int {
  114. rows := []int64{ 0 }
  115. err := database.DB(func(db *sqlx.DB) error {
  116. r,e := db.Exec(sql, args...)
  117. if e != nil {
  118. return e
  119. }
  120. rows[0], e = r.RowsAffected()
  121. if e != nil {
  122. return e
  123. }
  124. return nil
  125. })
  126. if err != nil {
  127. t.Fatal(err)
  128. }
  129. return int(rows[0])
  130. }
  131. func fnTestSubscribe(t *testing.T) {
  132. t.Log("testing fnTestSubscribe")
  133. // clear subscriber
  134. fnDBExec(t, "delete from subscriber")
  135. msg := &models.SubscribeMsg{
  136. ClientID: "test-client",
  137. Topic: "test-topic",
  138. NotifyUrl: "http://localhost:3333/notify",
  139. ExpireTime: time.Now().UnixNano() + int64(30*time.Second),
  140. }
  141. fnPost(t, msg, "http://localhost:3333/subscribe")
  142. // check log
  143. fnAssertTrue(t, logger.Count("handlers.Subscribe, event=subscriber.registered") == 1, "expecting event=subscriber.registered")
  144. // check db
  145. count := fnDBCount(t, "select count(1) as n from subscriber where ClientID=? and topic=?", msg.ClientID, msg.Topic)
  146. fnAssertTrue(t, count == 1, "expecting subscriber.count == 1")
  147. }
  148. func fnPost(t *testing.T, msg interface{}, url string) {
  149. body,_ := json.Marshal(msg)
  150. rsp, err := http.Post(url, "application/json;charset=utf-8", bytes.NewReader(body))
  151. if err != nil {
  152. t.Fatal(err)
  153. }
  154. defer rsp.Body.Close()
  155. j, err := ioutil.ReadAll(rsp.Body)
  156. if err != nil {
  157. t.Fatal(err)
  158. }
  159. ok := &models.OkMsg{}
  160. err = json.Unmarshal(j, ok)
  161. if err != nil {
  162. t.Fatal(err)
  163. }
  164. fnAssertTrue(t, ok.OK, fmt.Sprintf("expecting replying ok from %s", url))
  165. }

测试输出

根据assert的失败提示逐步排查诊断日志, 定位到的问题主要是sqlx字段映射错误, 以及未及时调用rows.Close()导致的错误.

```text
$ go test -v mqs_test.go
=== RUN Test_MQS
eventbus.Pub, event=system.boot, handler=gDeliveryService.handleBootEvent
[GIN-debug] [WARNING] Creating an Engine instance with the Logger and Recovery middleware already attached.
[GIN-debug] [WARNING] Running in "debug" mode. Switch to "release" mode in production.
 - using env: export GIN_MODE=release
 - using code: gin.SetMode(gin.ReleaseMode)
[GIN-debug] GET /ping --> learning/gooop/saga/mqs/handlers.Ping (4 handlers)
[GIN-debug] POST /subscribe --> learning/gooop/saga/mqs/handlers.Subscribe (4 handlers)
[GIN-debug] POST /publish --> learning/gooop/saga/mqs/handlers.Publish (4 handlers)
[GIN-debug] POST /notify --> learning/gooop/saga/mqs/handlers.Notify (4 handlers)
[GIN-debug] Listening and serving HTTP on :3333
tDeliveryService.beginCreatingWorkers
tDeliveryService.beginCleanExpiredWorkers
mqs_test.go:139: testing fnTestSubscribe
handlers.Subscribe, event=subscriber.registered, msg=&{test-client test-topic http://localhost:3333/notify 1615868155394998875}
eventbus.Pub, event=subscriber.registered, handler=gDeliveryService.handleSubscriberRegistered
[GIN] 2021/03/16 - 12:15:25 | 200 | 8.118873ms | ::1 | POST "/subscribe"
[GIN] 2021/03/16 - 12:15:25 | 200 | 8.214968ms | ::1 | POST "/subscribe"
tDeliveryWorker.afterInitialLoad, clientID=test-client, rows=0
database.DB, err=empty rows
mqs_test.go:45: passed fnTestSubscribe
mqs_test.go:54: testing fnTestPublishAndNotify
handlers.Publish, msg=test-global-id/test-sub-id/test-topic, msgId=15
[GIN] 2021/03/16 - 12:15:25 | 200 | 15.200109ms | ::1 | POST "/publish"
[GIN] 2021/03/16 - 12:15:25 | 200 | 15.216578ms | ::1 | POST "/publish"
handlers.Publish, pubLiveMsg 15
handlers.Publish, pubLiveMsg, msgId=15, rows=1
handlers.Publish, event=msg.published, clientID=test-client, msg=test-global-id/test-sub-id/http://localhost:3333/notify
eventbus.Pub, event=msg.published, handler=tLiveMsgSource.test-client
tLiveMsgSource.handleMsgPublished, clientID=test-client, msg=test-global-id/test-sub-id/test-topic
handlers.Notify, msg=&{test-global-id test-sub-id 0 test-topic test-content}
[GIN] 2021/03/16 - 12:15:25 | 200 | 48.38µs | ::1 | POST "/notify"
[GIN] 2021/03/16 - 12:15:25 | 200 | 110.659µs | ::1 | POST "/notify"
tDeliveryWorker.afterDeliverySuccess, done, id=test-client, msg=test-global-id/test-sub-id
mqs_test.go:49: passed fnTestPublishAndNotify
--- PASS: Test_MQS (1.18s)
PASS
ok command-line-arguments 1.187s
```

(未完待续)