缘起
最近阅读<
本系列笔记拟采用golang练习之
Saga模式
- saga模式将分布式长事务切分为一系列独立短事务
- 每个短事务是可通过补偿动作进行撤销的
- 事务动作和补偿动作都是幂等的, 允许重复执行而不会有副作用

Saga由一系列的子事务“Ti”组成, 每个Ti都有对应的补偿“Ci”, 当Ti出现问题时Ci用于处理Ti执行带来的问题。
可以通过下面的两个公式理解Saga模式。 全部子事务成功时: $T = T_1 T_2 \dots T_n$; 子事务 $T_j$ 之后需要回滚时, 按逆序执行补偿: $T = T_1 T_2 \dots T_j C_j \dots C_2 C_1$
Saga模式的核心理念是避免使用长期持有锁(如14.2.2节介绍的两阶段提交)的长事务, 而应该将事务切分为一组按序依次提交的短事务, Saga模式满足ACD(原子性、一致性、持久性)特征。
摘自 <
<a name="fGPkq"></a># 目标- 为实现saga模式的分布式事务, 先撸一个pub/sub事务消息队列服务- 事务消息队列服务的功能性要求- 消息不会丢失: 消息的持久化- 消息的唯一性: 要求每个消息有全局ID和子事务ID- 确保投递成功: 投递队列持久化, 投递状态持久化, 失败重试<a name="naUyZ"></a># 子目标(Day 4)- 完善投递worker- 未处理消息: 标记, 并尝试投递- 已处理消息: 判断是否超时, 并重试投递- 投递成功: 移动到成功投递表- 投递失败: 重置标记, 下轮重试- 数据库表相应的细节调整- delivery_queue: 去掉failed_count, 增加update_time时间戳- success_queue: 去掉sub_id, 改为client_id, 并增加create_time时间戳- failed_queue: 因为不允许失败, 因此删除失败投递表<a name="beZzA"></a># tDeliveryWorker.go- 完善投递worker- 未处理消息: 标记, 并尝试投递- 已处理消息: 判断是否超时, 并重试投递- 投递成功: 移动到成功投递表- 投递失败: 重置标记, 下轮重试```gopackage deliveryimport ("bytes""encoding/json""errors""github.com/jmoiron/sqlx""io/ioutil""learning/gooop/saga/mqs/database""learning/gooop/saga/mqs/logger""learning/gooop/saga/mqs/models""net/http""time")type tDeliveryWorker struct {info *tWorkerInfo}func newDeliveryWorker(info *tWorkerInfo) *tDeliveryWorker {it := new(tDeliveryWorker)it.info = infogo it.beginMainLoop()return it}func (me *tDeliveryWorker) beginMainLoop() {for !me.isExpired() {ok, msg := me.peek()if ok {switch msg.StatusFlag {case 0:// 未处理的消息me.handleUndeliveredMsg(msg)breakcase 1:// 处理中的消息me.handleDeliveringMsg(msg)break}} else {time.Sleep(time.Duration(1) * time.Second)}}}func (me *tDeliveryWorker) isExpired() bool {return time.Now().UnixNano() >= me.info.ExpireTime}// peek: 从待投递队列中获取最早的一条记录func (me *tDeliveryWorker) peek() (bool, *models.QueuedMsg) {msg := &models.QueuedMsg{}e := database.DB(func(db *sqlx.DB) error {rows, err := db.Queryx("select * from delivery_queue where client_id=? 
order by create_time asc limit 1",me.info.ClientID,)if err != nil {return err}if rows.Next() {err = rows.StructScan(msg)if err != nil {return err}return nil} else {return gEmptyRowsErr}})if e != nil {return false, nil} else {return true, msg}}// handleUndeliveredMsg: if msg unhandled, then try to deliver itfunc (me *tDeliveryWorker) handleUndeliveredMsg(msg *models.QueuedMsg) {err := database.DB(func(db *sqlx.DB) error {now := time.Now().UnixNano()r,e := db.Exec("update delivery_queue set status_flag=1, update_time=? where id=? and status_flag=0 and update_time=?",now,msg.ID,msg.UpdateTime,)if e != nil {return e}rows, e := r.RowsAffected()if e != nil {return e}if rows != 1 {return gOneRowsErr}msg.UpdateTime = nowreturn nil})if err != nil {logger.Logf("tDeliveryWorker.handleNewMsg, id=%v, msg=%s/%s, err=%s", me.info.ClientID, msg.GlobalID, msg.SubID, err.Error())return}if me.deliver(msg) {me.afterDeliverySuccess(msg)} else {me.afterDeliveryFailed(msg)}}// deliver: use http.Post function to delivery msgfunc (me *tDeliveryWorker) deliver(msg *models.QueuedMsg) bool {t := &models.TxMsg{GlobalID: msg.GlobalID,SubID: msg.SubID,Topic: msg.Topic,CreateTime: msg.CreateTime,Content: msg.Content,}j,e := json.Marshal(t)if e != nil {logger.Logf("tDeliveryWorker.deliver, failed json.Marshal, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)return false}r, e := http.Post(me.info.NotifyURL, "application/json", bytes.NewReader(j))if e != nil {logger.Logf("tDeliveryWorker.deliver, failed http.Post, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)return false}defer r.Body.Close()rep, e := ioutil.ReadAll(r.Body)if e != nil {logger.Logf("tDeliveryWorker.deliver, failed ioutil.ReadAll, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)return false}m := &models.OkMsg{}e = json.Unmarshal(rep, m)if e != nil {logger.Logf("tDeliveryWorker.deliver, failed json.Unmarshal, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)return false}if m.OK 
{return true} else {logger.Logf("tDeliveryWorker.deliver, failed OkMsg.OK, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)return false}}// handleDeliveringMsg: if delivery timeout, then retry deliveryfunc (me *tDeliveryWorker) handleDeliveringMsg(msg *models.QueuedMsg) {now := time.Now().UnixNano()if msg.UpdateTime + gDeliveryTimeoutNanos > now {return}// delivery timeoutme.afterDeliveryTimeout(msg)}// afterDeliverySuccess: if done, move msg to success queuefunc (me *tDeliveryWorker) afterDeliverySuccess(msg *models.QueuedMsg) {err := database.TX(func(db *sqlx.DB, tx *sqlx.Tx) error {r,e := db.Exec("delete from delivery_queue where id=? and update_time=? and status_flag=1",msg.ID,msg.UpdateTime,)if e != nil {return e}rows, e := r.RowsAffected()if e != nil {return e}if rows != 1 {return gOneRowsErr}r, e = db.Exec("insert into success_queue (msg_id, client_id, create_time) values(?, ?, ?)",msg.ID,msg.ClientID,time.Now().UnixNano(),)if e != nil {return e}rows, e = r.RowsAffected()if e != nil {return e}if rows != 1 {return gOneRowsErr}return nil})if err != nil {logger.Logf("tDeliveryWorker.afterDeliverySuccess, failed, id=%v, msg=%s/%s, err=%s", me.info.ClientID, msg.GlobalID, msg.SubID, err.Error())} else {logger.Logf("tDeliveryWorker.afterDeliverySuccess, done, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)}}// afterDeliveryFailed: if failed, do nothing but just log itfunc (me *tDeliveryWorker) afterDeliveryFailed(msg *models.QueuedMsg) {logger.Logf("tDeliveryWorker.afterDeliveryFailed, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)}// afterDeliveryTimeout: if timeout, then reset status and retryfunc (me *tDeliveryWorker) afterDeliveryTimeout(msg *models.QueuedMsg) {err := database.DB(func(db *sqlx.DB) error {r,e := db.Exec("update delivery_queue set status_flag=0 where id=? 
and status_flag=1 and update_time=?",msg.ID,msg.UpdateTime,)if e != nil {return e}rows,e := r.RowsAffected()if e != nil {return e}if rows != 1 {return gOneRowsErr}return nil})if err != nil {logger.Logf("tDeliveryWorker.afterDeliveryTimeout, failed, id=%v, msg=%s/%s, err=%s", me.info.ClientID, msg.GlobalID, msg.SubID, err.Error())} else {logger.Logf("tDeliveryWorker.afterDeliveryTimeout, done, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)}}var gEmptyRowsErr = errors.New("empty rows")var gOneRowsErr = errors.New("expecting one row affected")var gDeliveryTimeoutNanos = int64(10 * (time.Second / time.Nanosecond))
# database.go
- 数据库表相应的细节调整
- delivery_queue: 去掉failed_count, 增加update_time时间戳
- success_queue: 去掉sub_id, 改为client_id, 并增加create_time时间戳
- failed_queue: 因为不允许失败, 因此删除失败投递表 ```go package database
import “github.com/jmoiron/sqlx” import _ “github.com/mattn/go-sqlite3”
type DBFunc func(db sqlx.DB) error type TXFunc func(db sqlx.DB, tx *sqlx.Tx) error
func init() { // must prepare tables err := DB(initDB) if err != nil { panic(err) } }
func initDB(db *sqlx.DB) error {
// 订阅者/消费者: subscriber
_, e := db.Exec(create table if not exists subscriber(
id integer primary key autoincrement,
client_id varchar(50) unique not null,
topic varchar(100) not null,
notify_url varchar(500) not null,
expire_time integer
))
if e != nil {
return e
}
// 事务消息: tx_msg_, e = db.Exec(`create table if not exists tx_msg (id integer primary key autoincrement,global_id string varchar(50) not null,sub_id string varchar(50) unique not null,sender_id varchar(50) not null,create_time integer not null,topic varchar(100) not null,content nvarchar(2048) not null
)`) if e != nil { return e }
// 投递队列: delivery_queue_, e = db.Exec(`create table if not exists delivery_queue (id integer primary key autoincrement,client_id varchar(50) not null,notify_url varchar(500) not null,msg_id integer not null,global_id string varchar(50) not null,sub_id string varchar(50) unique not null,sender_id varchar(50) not null,create_time integer not null,topic varchar(100) not null,content nvarchar(2048) not null,status_flag integer not null,update_time integer not null
)`) if e != nil { return e }
// 成功投递队列: success_queue_, e = db.Exec(`create table if not exists success_queue (id integer primary key autoincrement,msg_id integer not null,client_id varchar(50) not null,create_time integer not null
)`) if e != nil { return e }
// // 投递失败队列: failedqueue
// , e = db.Exec(create table if not exists failed_queue (
// id integer primary key autoincrement,
// msg_id integer not null,
// client_id varchar(50) not null,
// create_time integer not null
//))
// if e != nil {
// return e
// }
return nil
}
func open() (*sqlx.DB, error) { return sqlx.Open(“sqlite3”, “./mqs.db”) }
func DB(action DBFunc) error { db,err := open() if err != nil { return err } defer func() { _ = db.Close() }()
return action(db)
}
func TX(action TXFunc) error { return DB(func(db *sqlx.DB) error { tx, err := db.Beginx() if err != nil { return err }
err = action(db, tx)if err == nil {return tx.Commit()} else {return tx.Rollback()}})
} ```
(未完待续)
