缘起

最近阅读《Go微服务实战》(刘金亮, 2021.1)
本系列笔记拟采用golang练习之

Saga模式

  • saga模式将分布式长事务切分为一系列独立短事务
  • 每个短事务是可通过补偿动作进行撤销的
  • 事务动作和补偿动作都是幂等的, 允许重复执行而不会有副作用

Saga由一系列的子事务 $T_i$ 组成, 每个 $T_i$ 都有对应的补偿 $C_i$, 当 $T_i$ 出现问题时 $C_i$ 用于处理 $T_i$ 执行带来的问题。

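可以通过下面的两个公式理解Saga模式:

$$T = T_1 T_2 \cdots T_n$$

$$T = T_1 T_2 \cdots T_j C_j \cdots C_2 C_1 \quad (0 \le j < n)$$

前者是所有子事务都成功提交的情形; 后者是在 $T_j$ 之后中止的情形, 此时按逆序执行已提交子事务的补偿。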

Saga模式的核心理念是避免使用长期持有锁(如14.2.2节介绍的两阶段提交)的长事务, 而应该将事务切分为一组按序依次提交的短事务, Saga模式满足ACD(原子性、一致性、持久性)特征。

摘自《Go微服务实战》刘金亮, 2021.1

<a name="fGPkq"></a>
# 目标
- 为实现saga模式的分布式事务, 先撸一个pub/sub事务消息队列服务
- 事务消息队列服务的功能性要求
   - 消息不会丢失: 消息的持久化
   - 消息的唯一性: 要求每个消息有全局ID和子事务ID
   - 确保投递成功: 投递队列持久化, 投递状态持久化, 失败重试
<a name="naUyZ"></a>
# 子目标(Day 4)
- 完善投递worker
   - 未处理消息: 标记, 并尝试投递
   - 处理中的消息: 判断是否超时, 并重试投递
   - 投递成功: 移动到成功投递表
   - 投递失败: 仅记录日志, 待超时后重置标记, 下轮重试
- 数据库表相应的细节调整
   - delivery_queue: 去掉failed_count, 增加update_time时间戳
   - success_queue: 去掉sub_id, 改为client_id, 并增加create_time时间戳
   - failed_queue: 因为不允许失败, 因此删除失败投递表
<a name="beZzA"></a>
# tDeliveryWorker.go
- 完善投递worker
   - 未处理消息: 标记, 并尝试投递
   - 处理中的消息: 判断是否超时, 并重试投递
   - 投递成功: 移动到成功投递表
   - 投递失败: 仅记录日志, 待超时后重置标记, 下轮重试
  26. ```go
  27. package delivery
  28. import (
  29. "bytes"
  30. "encoding/json"
  31. "errors"
  32. "github.com/jmoiron/sqlx"
  33. "io/ioutil"
  34. "learning/gooop/saga/mqs/database"
  35. "learning/gooop/saga/mqs/logger"
  36. "learning/gooop/saga/mqs/models"
  37. "net/http"
  38. "time"
  39. )
  40. type tDeliveryWorker struct {
  41. info *tWorkerInfo
  42. }
  43. func newDeliveryWorker(info *tWorkerInfo) *tDeliveryWorker {
  44. it := new(tDeliveryWorker)
  45. it.info = info
  46. go it.beginMainLoop()
  47. return it
  48. }
  49. func (me *tDeliveryWorker) beginMainLoop() {
  50. for !me.isExpired() {
  51. ok, msg := me.peek()
  52. if ok {
  53. switch msg.StatusFlag {
  54. case 0:
  55. // 未处理的消息
  56. me.handleUndeliveredMsg(msg)
  57. break
  58. case 1:
  59. // 处理中的消息
  60. me.handleDeliveringMsg(msg)
  61. break
  62. }
  63. } else {
  64. time.Sleep(time.Duration(1) * time.Second)
  65. }
  66. }
  67. }
  68. func (me *tDeliveryWorker) isExpired() bool {
  69. return time.Now().UnixNano() >= me.info.ExpireTime
  70. }
  71. // peek: 从待投递队列中获取最早的一条记录
  72. func (me *tDeliveryWorker) peek() (bool, *models.QueuedMsg) {
  73. msg := &models.QueuedMsg{}
  74. e := database.DB(func(db *sqlx.DB) error {
  75. rows, err := db.Queryx(
  76. "select * from delivery_queue where client_id=? order by create_time asc limit 1",
  77. me.info.ClientID,
  78. )
  79. if err != nil {
  80. return err
  81. }
  82. if rows.Next() {
  83. err = rows.StructScan(msg)
  84. if err != nil {
  85. return err
  86. }
  87. return nil
  88. } else {
  89. return gEmptyRowsErr
  90. }
  91. })
  92. if e != nil {
  93. return false, nil
  94. } else {
  95. return true, msg
  96. }
  97. }
  98. // handleUndeliveredMsg: if msg unhandled, then try to deliver it
  99. func (me *tDeliveryWorker) handleUndeliveredMsg(msg *models.QueuedMsg) {
  100. err := database.DB(func(db *sqlx.DB) error {
  101. now := time.Now().UnixNano()
  102. r,e := db.Exec(
  103. "update delivery_queue set status_flag=1, update_time=? where id=? and status_flag=0 and update_time=?",
  104. now,
  105. msg.ID,
  106. msg.UpdateTime,
  107. )
  108. if e != nil {
  109. return e
  110. }
  111. rows, e := r.RowsAffected()
  112. if e != nil {
  113. return e
  114. }
  115. if rows != 1 {
  116. return gOneRowsErr
  117. }
  118. msg.UpdateTime = now
  119. return nil
  120. })
  121. if err != nil {
  122. logger.Logf("tDeliveryWorker.handleNewMsg, id=%v, msg=%s/%s, err=%s", me.info.ClientID, msg.GlobalID, msg.SubID, err.Error())
  123. return
  124. }
  125. if me.deliver(msg) {
  126. me.afterDeliverySuccess(msg)
  127. } else {
  128. me.afterDeliveryFailed(msg)
  129. }
  130. }
  131. // deliver: use http.Post function to delivery msg
  132. func (me *tDeliveryWorker) deliver(msg *models.QueuedMsg) bool {
  133. t := &models.TxMsg{
  134. GlobalID: msg.GlobalID,
  135. SubID: msg.SubID,
  136. Topic: msg.Topic,
  137. CreateTime: msg.CreateTime,
  138. Content: msg.Content,
  139. }
  140. j,e := json.Marshal(t)
  141. if e != nil {
  142. logger.Logf("tDeliveryWorker.deliver, failed json.Marshal, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)
  143. return false
  144. }
  145. r, e := http.Post(me.info.NotifyURL, "application/json", bytes.NewReader(j))
  146. if e != nil {
  147. logger.Logf("tDeliveryWorker.deliver, failed http.Post, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)
  148. return false
  149. }
  150. defer r.Body.Close()
  151. rep, e := ioutil.ReadAll(r.Body)
  152. if e != nil {
  153. logger.Logf("tDeliveryWorker.deliver, failed ioutil.ReadAll, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)
  154. return false
  155. }
  156. m := &models.OkMsg{}
  157. e = json.Unmarshal(rep, m)
  158. if e != nil {
  159. logger.Logf("tDeliveryWorker.deliver, failed json.Unmarshal, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)
  160. return false
  161. }
  162. if m.OK {
  163. return true
  164. } else {
  165. logger.Logf("tDeliveryWorker.deliver, failed OkMsg.OK, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)
  166. return false
  167. }
  168. }
  169. // handleDeliveringMsg: if delivery timeout, then retry delivery
  170. func (me *tDeliveryWorker) handleDeliveringMsg(msg *models.QueuedMsg) {
  171. now := time.Now().UnixNano()
  172. if msg.UpdateTime + gDeliveryTimeoutNanos > now {
  173. return
  174. }
  175. // delivery timeout
  176. me.afterDeliveryTimeout(msg)
  177. }
  178. // afterDeliverySuccess: if done, move msg to success queue
  179. func (me *tDeliveryWorker) afterDeliverySuccess(msg *models.QueuedMsg) {
  180. err := database.TX(func(db *sqlx.DB, tx *sqlx.Tx) error {
  181. r,e := db.Exec(
  182. "delete from delivery_queue where id=? and update_time=? and status_flag=1",
  183. msg.ID,
  184. msg.UpdateTime,
  185. )
  186. if e != nil {
  187. return e
  188. }
  189. rows, e := r.RowsAffected()
  190. if e != nil {
  191. return e
  192. }
  193. if rows != 1 {
  194. return gOneRowsErr
  195. }
  196. r, e = db.Exec(
  197. "insert into success_queue (msg_id, client_id, create_time) values(?, ?, ?)",
  198. msg.ID,
  199. msg.ClientID,
  200. time.Now().UnixNano(),
  201. )
  202. if e != nil {
  203. return e
  204. }
  205. rows, e = r.RowsAffected()
  206. if e != nil {
  207. return e
  208. }
  209. if rows != 1 {
  210. return gOneRowsErr
  211. }
  212. return nil
  213. })
  214. if err != nil {
  215. logger.Logf("tDeliveryWorker.afterDeliverySuccess, failed, id=%v, msg=%s/%s, err=%s", me.info.ClientID, msg.GlobalID, msg.SubID, err.Error())
  216. } else {
  217. logger.Logf("tDeliveryWorker.afterDeliverySuccess, done, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)
  218. }
  219. }
  220. // afterDeliveryFailed: if failed, do nothing but just log it
  221. func (me *tDeliveryWorker) afterDeliveryFailed(msg *models.QueuedMsg) {
  222. logger.Logf("tDeliveryWorker.afterDeliveryFailed, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)
  223. }
  224. // afterDeliveryTimeout: if timeout, then reset status and retry
  225. func (me *tDeliveryWorker) afterDeliveryTimeout(msg *models.QueuedMsg) {
  226. err := database.DB(func(db *sqlx.DB) error {
  227. r,e := db.Exec(
  228. "update delivery_queue set status_flag=0 where id=? and status_flag=1 and update_time=?",
  229. msg.ID,
  230. msg.UpdateTime,
  231. )
  232. if e != nil {
  233. return e
  234. }
  235. rows,e := r.RowsAffected()
  236. if e != nil {
  237. return e
  238. }
  239. if rows != 1 {
  240. return gOneRowsErr
  241. }
  242. return nil
  243. })
  244. if err != nil {
  245. logger.Logf("tDeliveryWorker.afterDeliveryTimeout, failed, id=%v, msg=%s/%s, err=%s", me.info.ClientID, msg.GlobalID, msg.SubID, err.Error())
  246. } else {
  247. logger.Logf("tDeliveryWorker.afterDeliveryTimeout, done, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)
  248. }
  249. }
  // Sentinel errors returned from the database callbacks in this file.
  var (
  	// gEmptyRowsErr: the query matched no row.
  	gEmptyRowsErr = errors.New("empty rows")
  	// gOneRowsErr: a guarded statement did not affect exactly one row.
  	gOneRowsErr = errors.New("expecting one row affected")
  )

  // gDeliveryTimeoutNanos is how long, in nanoseconds, a claimed message
  // (status_flag=1) may stay untouched before it is reset for redelivery.
  var gDeliveryTimeoutNanos = int64(10 * time.Second)

# database.go

  • 数据库表相应的细节调整
    • delivery_queue: 去掉failed_count, 增加update_time时间戳
    • success_queue: 去掉sub_id, 改为client_id, 并增加create_time时间戳
    • failed_queue: 因为不允许失败, 因此删除失败投递表 ```go package database

import “github.com/jmoiron/sqlx” import _ “github.com/mattn/go-sqlite3”

type DBFunc func(db sqlx.DB) error type TXFunc func(db sqlx.DB, tx *sqlx.Tx) error

func init() { // must prepare tables err := DB(initDB) if err != nil { panic(err) } }

func initDB(db *sqlx.DB) error { // 订阅者/消费者: subscriber _, e := db.Exec(create table if not exists subscriber( id integer primary key autoincrement, client_id varchar(50) unique not null, topic varchar(100) not null, notify_url varchar(500) not null, expire_time integer )) if e != nil { return e }

  1. // 事务消息: tx_msg
  2. _, e = db.Exec(`create table if not exists tx_msg (
  3. id integer primary key autoincrement,
  4. global_id string varchar(50) not null,
  5. sub_id string varchar(50) unique not null,
  6. sender_id varchar(50) not null,
  7. create_time integer not null,
  8. topic varchar(100) not null,
  9. content nvarchar(2048) not null

)`) if e != nil { return e }

  1. // 投递队列: delivery_queue
  2. _, e = db.Exec(`create table if not exists delivery_queue (
  3. id integer primary key autoincrement,
  4. client_id varchar(50) not null,
  5. notify_url varchar(500) not null,
  6. msg_id integer not null,
  7. global_id string varchar(50) not null,
  8. sub_id string varchar(50) unique not null,
  9. sender_id varchar(50) not null,
  10. create_time integer not null,
  11. topic varchar(100) not null,
  12. content nvarchar(2048) not null,
  13. status_flag integer not null,
  14. update_time integer not null

)`) if e != nil { return e }

  1. // 成功投递队列: success_queue
  2. _, e = db.Exec(`create table if not exists success_queue (
  3. id integer primary key autoincrement,
  4. msg_id integer not null,
  5. client_id varchar(50) not null,
  6. create_time integer not null

)`) if e != nil { return e }

// // 投递失败队列: failedqueue // , e = db.Exec(create table if not exists failed_queue ( // id integer primary key autoincrement, // msg_id integer not null, // client_id varchar(50) not null, // create_time integer not null //)) // if e != nil { // return e // }

  1. return nil

}

func open() (*sqlx.DB, error) { return sqlx.Open(“sqlite3”, “./mqs.db”) }

func DB(action DBFunc) error { db,err := open() if err != nil { return err } defer func() { _ = db.Close() }()

  1. return action(db)

}

func TX(action TXFunc) error { return DB(func(db *sqlx.DB) error { tx, err := db.Beginx() if err != nil { return err }

  1. err = action(db, tx)
  2. if err == nil {
  3. return tx.Commit()
  4. } else {
  5. return tx.Rollback()
  6. }
  7. })

} ```

(未完待续)