缘起
最近阅读<
本系列笔记拟采用golang练习之
Saga模式
- saga模式将分布式长事务切分为一系列独立短事务
- 每个短事务是可通过补偿动作进行撤销的
- 事务动作和补动作偿都是幂等的, 允许重复执行而不会有副作用 ``` Saga由一系列的子事务“Ti”组成, 每个Ti都有对应的补偿“Ci”, 当Ti出现问题时Ci用于处理Ti执行带来的问题。
可以通过下面的两个公式理解Saga模式。 T = T1 T2 … Tn T = TCT
Saga模式的核心理念是避免使用长期持有锁(如14.2.2节介绍的两阶段提交)的长事务, 而应该将事务切分为一组按序依次提交的短事务, Saga模式满足ACD(原子性、一致性、持久性)特征。
摘自 <
<a name="fGPkq"></a># 目标- 为实现saga模式的分布式事务, 先撸一个pub/sub事务消息队列服务- 事务消息队列服务的功能性要求- 消息不会丢失: 消息的持久化- 消息的唯一性: 要求每个消息有全局ID和子事务ID- 确保投递成功: 投递队列持久化, 投递状态持久化, 失败重试<a name="naUyZ"></a># 子目标(Day 2)- 消息的设计- 消息的持久化- 投递队列的持久化- 投递状态的持久化<a name="3CP5G"></a># 设计- TxMsg: 事务消息模型- database: 添加事务消息表, 消息投递表, 成功投递表, 失败投递表- publish: 消息发布api- routers: 添加/publish路由<a name="YILum"></a># TxMsg.go事务消息模型```gopackage models// 事务消息体type TxMsg struct {// 全局事务IDGlobalID string// 子事务IDSubID string// 发送者IDSenderID string// 时间戳, 使用time.Now().UnixNano()CreateTime int64// 主题, 即消息类型Topic string// 消息内容, 一般是jsonContent string}
database.go
添加事务消息表, 消息投递表, 成功投递表, 失败投递表
package databaseimport "github.com/jmoiron/sqlx"import _ "github.com/mattn/go-sqlite3"type DBFunc func(db *sqlx.DB) errortype TXFunc func(db *sqlx.DB, tx *sqlx.Tx) errorfunc init() {// must prepare tableserr := DB(initDB)if err != nil {panic(err)}}func initDB(db *sqlx.DB) error {// 订阅者/消费者: subscriber_, e := db.Exec(`create table if not exists subscriber(id integer primary key autoincrement,client_id varchar(50) unique not null,topic varchar(100) not null,notify_url varchar(500) not null,expire_time integer)`)if e != nil {return e}// 事务消息: tx_msg_, e = db.Exec(`create table if not exists tx_msg (id integer primary key autoincrement,global_id string varchar(50) not null,sub_id string varchar(50) unique not null,sender_id varchar(50) not null,create_time integer not null,topic varchar(100) not null,content nvarchar(2048) not null)`)if e != nil {return e}// 投递队列: delivery_queue_, e = db.Exec(`create table if not exists delivery_queue (id integer primary key autoincrement,msg_id integer not null,sub_id integer not null,status_flag integer not null,failed_count integer not null)`)if e != nil {return e}// 成功投递队列: success_queue_, e = db.Exec(`create table if not exists success_queue (id integer primary key autoincrement,msg_id integer not null,sub_id integer not null)`)if e != nil {return e}// 投递失败队列: failed_queue_, e = db.Exec(`create table if not exists failed_queue (id integer primary key autoincrement,msg_id integer not null,sub_id integer not null)`)if e != nil {return e}return nil}func open() (*sqlx.DB, error) {return sqlx.Open("sqlite3", "./mqs.db")}func DB(action DBFunc) error {db,err := open()if err != nil {return err}defer func() { _ = db.Close() }()return action(db)}func TX(action TXFunc) error {return DB(func(db *sqlx.DB) error {tx, err := db.Beginx()if err != nil {return err}err = action(db, tx)if err == nil {return tx.Commit()} else {return tx.Rollback()}})}
publish.go
消息发布api
package handlersimport ("github.com/gin-gonic/gin""github.com/gin-gonic/gin/binding""github.com/jmoiron/sqlx""learning/gooop/saga/mqs/database""learning/gooop/saga/mqs/logger""learning/gooop/saga/mqs/models""net/http""time")func Publish(c *gin.Context) {// parse requestmsg := &models.TxMsg{}if err := c.ShouldBindBodyWith(&msg, binding.JSON); err != nil {c.AbortWithStatusJSON(http.StatusInternalServerError,gin.H{ "ok": false, "error": err.Error()})return}// fixme: validate msg// save to dbids := []int64{0}err := database.TX(func(db *sqlx.DB, tx *sqlx.Tx) error {id, e := saveTxMsg(db, tx, msg)if e != nil {return e}ids[0] = idreturn nil})if ids[0] > 0 {newMsgId := ids[0]logger.Logf("publish new msg: %d", newMsgId)// todo: 新增消息, 开始投送}// replyif err != nil {c.JSON(http.StatusInternalServerError, gin.H{ "ok": false, "error": err.Error()})} else {c.JSON(http.StatusOK, gin.H { "ok": true })}}func saveTxMsg(db *sqlx.DB, tx *sqlx.Tx, msg *models.TxMsg) (int64,error) {// insert tx_msgr, e := db.Exec(`replace into tx_msg(global_id, sub_id, sender_id, create_time, topic, content) values(?,?,?,?,?,?)`,msg.GlobalID, msg.SubID, msg.SenderID, msg.CreateTime, msg.Topic, msg.Content,)if e != nil {return 0,e}// get last insert idid,e := r.LastInsertId()if e != nil {return 0,e}if id > 0 {// copy to delivery queuenow := time.Now().UnixNano()r, e = db.Exec(`insert into delivery_queue(msg_id, sub_id, status_flag, failed_count)select?, s.id, 0, 0fromsub_info swheres.expire_time>?`, id, now)if e != nil {return 0,e}}return id, nil}
routers.go
添加/publish路由
package routersimport ("github.com/gin-gonic/gin""learning/gooop/saga/mqs/handlers")func RegisterRouters() *gin.Engine {r := gin.Default()r.Use(gin.Logger())r.GET("/ping", handlers.Ping)r.POST("/subscribe", handlers.Subscribe)r.POST("/publish", handlers.Publish)return r}
(未完待续)
