缘起
最近阅读<
本系列笔记拟采用golang练习之
Saga模式
- saga模式将分布式长事务切分为一系列独立短事务
- 每个短事务是可通过补偿动作进行撤销的
- 事务动作和补动作偿都是幂等的, 允许重复执行而不会有副作用 ``` Saga由一系列的子事务“Ti”组成, 每个Ti都有对应的补偿“Ci”, 当Ti出现问题时Ci用于处理Ti执行带来的问题。
可以通过下面的两个公式理解Saga模式。 T = T1 T2 … Tn T = TCT
Saga模式的核心理念是避免使用长期持有锁(如14.2.2节介绍的两阶段提交)的长事务, 而应该将事务切分为一组按序依次提交的短事务, Saga模式满足ACD(原子性、一致性、持久性)特征。
摘自 <
<a name="fGPkq"></a># 目标- 为实现saga模式的分布式事务, 先模拟实现一个简单的pub/sub消息服务- 使用sqlx + sqlite3持久化消息, 使用gin作为http框架, 使用toml配置文件<a name="rJKhE"></a># 代码结构
$ tree saga saga └── mqs ├── cmd │ └── boot.go ├── config │ └── config.go ├── database │ └── database.go ├── handlers │ ├── ping.go │ └── subscribe.go ├── logger │ └── logger.go └── routers └── routers.go
<a name="KFVPw"></a># 设计- config: 读取并解析toml配置文件- database: 封装sqlx + sqlite3- handlers/ping: 预留http服务保活探针接口- handlers/subscribe: 注册一个消息订阅者. 消息订阅者包含订阅者ID, 主题和回调地址(以便推送消息)- routers: 注册gin路由<a name="fjuxD"></a># config.go读取并解析toml配置文件```gopackage configimport "github.com/BurntSushi/toml"type tomlConfig struct {MQS tServiceConfig}type tServiceConfig struct {Port intLogDir string}var gServiceConfig = &tServiceConfig{}func init() {cfg := &tomlConfig{}if _,err := toml.DecodeFile("./mqs.toml", cfg);err != nil {panic(err)}*gServiceConfig = cfg.MQS}func GetPort() int {return gServiceConfig.Port}func GetLogDir() string {return gServiceConfig.LogDir}
database.go
封装sqlx + sqlite3
package databaseimport "github.com/jmoiron/sqlx"import _ "github.com/mattn/go-sqlite3"type DBFunc func(db *sqlx.DB) errortype TXFunc func(db *sqlx.DB, tx *sqlx.Tx) errorfunc init() {// must prepare tableserr := DB(func(db *sqlx.DB) error {// table: sub_info_,e := db.Exec(`create table if not exists sub_info(id integer primary key autoincrement,client_id varchar(50) unique not null,topic varchar(100) not null,callback_url varchar(500) not null)`)return e})if err != nil {panic(err)}}func open() (*sqlx.DB, error) {return sqlx.Open("sqlite3", "./mqs.db")}func DB(action DBFunc) error {db,err := open()if err != nil {return err}defer func() { _ = db.Close() }()return action(db)}func TX(action TXFunc) error {return DB(func(db *sqlx.DB) error {tx, err := db.Beginx()if err != nil {return err}err = action(db, tx)if err == nil {return tx.Commit()} else {return tx.Rollback()}})}
ping.go
预留http服务保活探针接口
package handlersimport ("github.com/gin-gonic/gin""net/http""time")func Ping(c *gin.Context) {c.JSON(http.StatusOK, gin.H{ "ok": true, "time": time.Now().Format(time.RFC3339)})}
subscribe.go
注册一个消息订阅者. 消息订阅者包含订阅者ID, 主题和回调地址(以便推送消息)
package handlersimport ("github.com/gin-gonic/gin""github.com/gin-gonic/gin/binding""github.com/jmoiron/sqlx""learning/gooop/saga/mqs/database""net/http")type tSubMsg struct {ClientID stringTopic stringCallbackUrl string}func Subscribe(c *gin.Context) {msg := &tSubMsg{}if err := c.ShouldBindBodyWith(&msg, binding.JSON); err != nil {c.AbortWithStatusJSON(http.StatusInternalServerError,gin.H{"error": err.Error()})return}err := database.DB(func(db *sqlx.DB) error {_,e := db.Exec("replace into sub_info(client_id, topic, callback_url) values(?, ?, ?)",msg.ClientID,msg.Topic,msg.CallbackUrl)return e})if err != nil {c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})} else {c.JSON(http.StatusOK, gin.H { "ok": true })}}
routers.go
注册gin路由
package routersimport ("github.com/gin-gonic/gin""learning/gooop/saga/mqs/handlers")func RegisterRouters() *gin.Engine {r := gin.Default()r.Use(gin.Logger())r.GET("/ping", handlers.Ping)r.POST("/subscribe", handlers.Subscribe)return r}
(未完待续)
