缘起

最近阅读<> (刘金亮, 2021.1)
本系列笔记拟采用golang练习之

Saga模式

  • saga模式将分布式长事务切分为一系列独立短事务
  • 每个短事务是可通过补偿动作进行撤销的
  • 事务动作和补动作偿都是幂等的, 允许重复执行而不会有副作用 ``` Saga由一系列的子事务“Ti”组成, 每个Ti都有对应的补偿“Ci”, 当Ti出现问题时Ci用于处理Ti执行带来的问题。

可以通过下面的两个公式理解Saga模式。 T = T1 T2 … Tn T = TCT

Saga模式的核心理念是避免使用长期持有锁(如14.2.2节介绍的两阶段提交)的长事务, 而应该将事务切分为一组按序依次提交的短事务, Saga模式满足ACD(原子性、一致性、持久性)特征。

摘自 <> 刘金亮, 2021.1

  1. <a name="fGPkq"></a>
  2. # 目标
  3. - 为实现saga模式的分布式事务, 先模拟实现一个简单的pub/sub消息服务
  4. - 使用sqlx + sqlite3持久化消息, 使用gin作为http框架, 使用toml配置文件
  5. <a name="rJKhE"></a>
  6. # 代码结构

$ tree saga saga └── mqs ├── cmd │ └── boot.go ├── config │ └── config.go ├── database │ └── database.go ├── handlers │ ├── ping.go │ └── subscribe.go ├── logger │ └── logger.go └── routers └── routers.go

  1. <a name="KFVPw"></a>
  2. # 设计
  3. - config: 读取并解析toml配置文件
  4. - database: 封装sqlx + sqlite3
  5. - handlers/ping: 预留http服务保活探针接口
  6. - handlers/subscribe: 注册一个消息订阅者. 消息订阅者包含订阅者ID, 主题和回调地址(以便推送消息)
  7. - routers: 注册gin路由
  8. <a name="fjuxD"></a>
  9. # config.go
  10. 读取并解析toml配置文件
  11. ```go
  12. package config
  13. import "github.com/BurntSushi/toml"
  14. type tomlConfig struct {
  15. MQS tServiceConfig
  16. }
  17. type tServiceConfig struct {
  18. Port int
  19. LogDir string
  20. }
  21. var gServiceConfig = &tServiceConfig{}
  22. func init() {
  23. cfg := &tomlConfig{}
  24. if _,err := toml.DecodeFile("./mqs.toml", cfg);err != nil {
  25. panic(err)
  26. }
  27. *gServiceConfig = cfg.MQS
  28. }
  29. func GetPort() int {
  30. return gServiceConfig.Port
  31. }
  32. func GetLogDir() string {
  33. return gServiceConfig.LogDir
  34. }

database.go

封装sqlx + sqlite3

  1. package database
  2. import "github.com/jmoiron/sqlx"
  3. import _ "github.com/mattn/go-sqlite3"
  4. type DBFunc func(db *sqlx.DB) error
  5. type TXFunc func(db *sqlx.DB, tx *sqlx.Tx) error
  6. func init() {
  7. // must prepare tables
  8. err := DB(func(db *sqlx.DB) error {
  9. // table: sub_info
  10. _,e := db.Exec(`
  11. create table if not exists sub_info(
  12. id integer primary key autoincrement,
  13. client_id varchar(50) unique not null,
  14. topic varchar(100) not null,
  15. callback_url varchar(500) not null
  16. )`)
  17. return e
  18. })
  19. if err != nil {
  20. panic(err)
  21. }
  22. }
  23. func open() (*sqlx.DB, error) {
  24. return sqlx.Open("sqlite3", "./mqs.db")
  25. }
  26. func DB(action DBFunc) error {
  27. db,err := open()
  28. if err != nil {
  29. return err
  30. }
  31. defer func() { _ = db.Close() }()
  32. return action(db)
  33. }
  34. func TX(action TXFunc) error {
  35. return DB(func(db *sqlx.DB) error {
  36. tx, err := db.Beginx()
  37. if err != nil {
  38. return err
  39. }
  40. err = action(db, tx)
  41. if err == nil {
  42. return tx.Commit()
  43. } else {
  44. return tx.Rollback()
  45. }
  46. })
  47. }

ping.go

预留http服务保活探针接口

  1. package handlers
  2. import (
  3. "github.com/gin-gonic/gin"
  4. "net/http"
  5. "time"
  6. )
  7. func Ping(c *gin.Context) {
  8. c.JSON(http.StatusOK, gin.H{ "ok": true, "time": time.Now().Format(time.RFC3339)})
  9. }

subscribe.go

注册一个消息订阅者. 消息订阅者包含订阅者ID, 主题和回调地址(以便推送消息)

  1. package handlers
  2. import (
  3. "github.com/gin-gonic/gin"
  4. "github.com/gin-gonic/gin/binding"
  5. "github.com/jmoiron/sqlx"
  6. "learning/gooop/saga/mqs/database"
  7. "net/http"
  8. )
  9. type tSubMsg struct {
  10. ClientID string
  11. Topic string
  12. CallbackUrl string
  13. }
  14. func Subscribe(c *gin.Context) {
  15. msg := &tSubMsg{}
  16. if err := c.ShouldBindBodyWith(&msg, binding.JSON); err != nil {
  17. c.AbortWithStatusJSON(
  18. http.StatusInternalServerError,
  19. gin.H{"error": err.Error()})
  20. return
  21. }
  22. err := database.DB(func(db *sqlx.DB) error {
  23. _,e := db.Exec(
  24. "replace into sub_info(client_id, topic, callback_url) values(?, ?, ?)",
  25. msg.ClientID,
  26. msg.Topic,
  27. msg.CallbackUrl)
  28. return e
  29. })
  30. if err != nil {
  31. c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
  32. } else {
  33. c.JSON(http.StatusOK, gin.H { "ok": true })
  34. }
  35. }

routers.go

注册gin路由

  1. package routers
  2. import (
  3. "github.com/gin-gonic/gin"
  4. "learning/gooop/saga/mqs/handlers"
  5. )
  6. func RegisterRouters() *gin.Engine {
  7. r := gin.Default()
  8. r.Use(gin.Logger())
  9. r.GET("/ping", handlers.Ping)
  10. r.POST("/subscribe", handlers.Subscribe)
  11. return r
  12. }

(未完待续)