缘起

最近阅读<> (刘金亮, 2021.1)
本系列笔记拟采用golang练习之

Saga模式

  • saga模式将分布式长事务切分为一系列独立短事务
  • 每个短事务是可通过补偿动作进行撤销的
  • 事务动作和补动作偿都是幂等的, 允许重复执行而不会有副作用 ``` Saga由一系列的子事务“Ti”组成, 每个Ti都有对应的补偿“Ci”, 当Ti出现问题时Ci用于处理Ti执行带来的问题。

可以通过下面的两个公式理解Saga模式。 T = T1 T2 … Tn T = TCT

Saga模式的核心理念是避免使用长期持有锁(如14.2.2节介绍的两阶段提交)的长事务, 而应该将事务切分为一组按序依次提交的短事务, Saga模式满足ACD(原子性、一致性、持久性)特征。

摘自 <> 刘金亮, 2021.1

  1. <a name="fGPkq"></a>
  2. # 目标
  3. - 为实现saga模式的分布式事务, 先撸一个pub/sub事务消息队列服务
  4. - 事务消息队列服务的功能性要求
  5. - 消息不会丢失: 消息的持久化
  6. - 消息的唯一性: 要求每个消息有全局ID和子事务ID
  7. - 确保投递成功: 投递队列持久化, 投递状态持久化, 失败重试
  8. <a name="naUyZ"></a>
  9. # 子目标(Day 2)
  10. - 消息的设计
  11. - 消息的持久化
  12. - 投递队列的持久化
  13. - 投递状态的持久化
  14. <a name="3CP5G"></a>
  15. # 设计
  16. - TxMsg: 事务消息模型
  17. - database: 添加事务消息表, 消息投递表, 成功投递表, 失败投递表
  18. - publish: 消息发布api
  19. - routers: 添加/publish路由
  20. <a name="YILum"></a>
  21. # TxMsg.go
  22. 事务消息模型
  23. ```go
  24. package models
  25. // 事务消息体
  26. type TxMsg struct {
  27. // 全局事务ID
  28. GlobalID string
  29. // 子事务ID
  30. SubID string
  31. // 发送者ID
  32. SenderID string
  33. // 时间戳, 使用time.Now().UnixNano()
  34. CreateTime int64
  35. // 主题, 即消息类型
  36. Topic string
  37. // 消息内容, 一般是json
  38. Content string
  39. }

database.go

添加事务消息表, 消息投递表, 成功投递表, 失败投递表

  1. package database
  2. import "github.com/jmoiron/sqlx"
  3. import _ "github.com/mattn/go-sqlite3"
  4. type DBFunc func(db *sqlx.DB) error
  5. type TXFunc func(db *sqlx.DB, tx *sqlx.Tx) error
  6. func init() {
  7. // must prepare tables
  8. err := DB(initDB)
  9. if err != nil {
  10. panic(err)
  11. }
  12. }
  13. func initDB(db *sqlx.DB) error {
  14. // 订阅者/消费者: subscriber
  15. _, e := db.Exec(`create table if not exists subscriber(
  16. id integer primary key autoincrement,
  17. client_id varchar(50) unique not null,
  18. topic varchar(100) not null,
  19. notify_url varchar(500) not null,
  20. expire_time integer
  21. )`)
  22. if e != nil {
  23. return e
  24. }
  25. // 事务消息: tx_msg
  26. _, e = db.Exec(`create table if not exists tx_msg (
  27. id integer primary key autoincrement,
  28. global_id string varchar(50) not null,
  29. sub_id string varchar(50) unique not null,
  30. sender_id varchar(50) not null,
  31. create_time integer not null,
  32. topic varchar(100) not null,
  33. content nvarchar(2048) not null
  34. )`)
  35. if e != nil {
  36. return e
  37. }
  38. // 投递队列: delivery_queue
  39. _, e = db.Exec(`create table if not exists delivery_queue (
  40. id integer primary key autoincrement,
  41. msg_id integer not null,
  42. sub_id integer not null,
  43. status_flag integer not null,
  44. failed_count integer not null
  45. )`)
  46. if e != nil {
  47. return e
  48. }
  49. // 成功投递队列: success_queue
  50. _, e = db.Exec(`create table if not exists success_queue (
  51. id integer primary key autoincrement,
  52. msg_id integer not null,
  53. sub_id integer not null
  54. )`)
  55. if e != nil {
  56. return e
  57. }
  58. // 投递失败队列: failed_queue
  59. _, e = db.Exec(`create table if not exists failed_queue (
  60. id integer primary key autoincrement,
  61. msg_id integer not null,
  62. sub_id integer not null
  63. )`)
  64. if e != nil {
  65. return e
  66. }
  67. return nil
  68. }
  69. func open() (*sqlx.DB, error) {
  70. return sqlx.Open("sqlite3", "./mqs.db")
  71. }
  72. func DB(action DBFunc) error {
  73. db,err := open()
  74. if err != nil {
  75. return err
  76. }
  77. defer func() { _ = db.Close() }()
  78. return action(db)
  79. }
  80. func TX(action TXFunc) error {
  81. return DB(func(db *sqlx.DB) error {
  82. tx, err := db.Beginx()
  83. if err != nil {
  84. return err
  85. }
  86. err = action(db, tx)
  87. if err == nil {
  88. return tx.Commit()
  89. } else {
  90. return tx.Rollback()
  91. }
  92. })
  93. }

publish.go

消息发布api

  1. package handlers
  2. import (
  3. "github.com/gin-gonic/gin"
  4. "github.com/gin-gonic/gin/binding"
  5. "github.com/jmoiron/sqlx"
  6. "learning/gooop/saga/mqs/database"
  7. "learning/gooop/saga/mqs/logger"
  8. "learning/gooop/saga/mqs/models"
  9. "net/http"
  10. "time"
  11. )
  12. func Publish(c *gin.Context) {
  13. // parse request
  14. msg := &models.TxMsg{}
  15. if err := c.ShouldBindBodyWith(&msg, binding.JSON); err != nil {
  16. c.AbortWithStatusJSON(
  17. http.StatusInternalServerError,
  18. gin.H{ "ok": false, "error": err.Error()})
  19. return
  20. }
  21. // fixme: validate msg
  22. // save to db
  23. ids := []int64{0}
  24. err := database.TX(func(db *sqlx.DB, tx *sqlx.Tx) error {
  25. id, e := saveTxMsg(db, tx, msg)
  26. if e != nil {
  27. return e
  28. }
  29. ids[0] = id
  30. return nil
  31. })
  32. if ids[0] > 0 {
  33. newMsgId := ids[0]
  34. logger.Logf("publish new msg: %d", newMsgId)
  35. // todo: 新增消息, 开始投送
  36. }
  37. // reply
  38. if err != nil {
  39. c.JSON(http.StatusInternalServerError, gin.H{ "ok": false, "error": err.Error()})
  40. } else {
  41. c.JSON(http.StatusOK, gin.H { "ok": true })
  42. }
  43. }
  44. func saveTxMsg(db *sqlx.DB, tx *sqlx.Tx, msg *models.TxMsg) (int64,error) {
  45. // insert tx_msg
  46. r, e := db.Exec(
  47. `replace into tx_msg(global_id, sub_id, sender_id, create_time, topic, content) values(?,?,?,?,?,?)`,
  48. msg.GlobalID, msg.SubID, msg.SenderID, msg.CreateTime, msg.Topic, msg.Content,
  49. )
  50. if e != nil {
  51. return 0,e
  52. }
  53. // get last insert id
  54. id,e := r.LastInsertId()
  55. if e != nil {
  56. return 0,e
  57. }
  58. if id > 0 {
  59. // copy to delivery queue
  60. now := time.Now().UnixNano()
  61. r, e = db.Exec(`
  62. insert into delivery_queue(msg_id, sub_id, status_flag, failed_count)
  63. select
  64. ?, s.id, 0, 0
  65. from
  66. sub_info s
  67. where
  68. s.expire_time>?
  69. `, id, now)
  70. if e != nil {
  71. return 0,e
  72. }
  73. }
  74. return id, nil
  75. }

routers.go

添加/publish路由

  1. package routers
  2. import (
  3. "github.com/gin-gonic/gin"
  4. "learning/gooop/saga/mqs/handlers"
  5. )
  6. func RegisterRouters() *gin.Engine {
  7. r := gin.Default()
  8. r.Use(gin.Logger())
  9. r.GET("/ping", handlers.Ping)
  10. r.POST("/subscribe", handlers.Subscribe)
  11. r.POST("/publish", handlers.Publish)
  12. return r
  13. }

(未完待续)