# Background
I have recently been reading <
In this series of notes I practice it in golang.
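# Saga pattern
- The saga pattern splits a long-running distributed transaction into a series of independent short transactions
- Each short transaction can be undone by a compensating action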
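- Both the transaction actions and the compensating actions are idempotent, so they can be executed repeatedly without side effects

> A saga consists of a series of sub-transactions $T_i$, each with a corresponding compensation $C_i$; when $T_i$ runs into a problem, $C_i$ is used to deal with the effects of having executed $T_i$.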
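>
> The saga pattern can be understood through the following two formulas:
>
> $$T = T_1 T_2 \cdots T_n$$
>
> $$T = T_1 T_2 \cdots T_j C_j \cdots C_2 C_1$$
>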
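> The core idea of the saga pattern is to avoid long transactions that hold locks for a long time (such as the two-phase commit introduced in Section 14.2.2); instead, the transaction is split into a group of short transactions that are committed one after another in order. The saga pattern satisfies the ACD properties (atomicity, consistency, durability).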
>
> Excerpted from <
<a name="fGPkq"></a>
# Goals
- To implement distributed transactions with the saga pattern, first build a pub/sub transactional message queue service
- Functional requirements of the transactional message queue service:
    - No message loss: messages are persisted
    - Message uniqueness: every message carries a global ID and a sub-transaction ID
    - Guaranteed delivery: the delivery queue and delivery status are persisted, and failed deliveries are retried

<a name="naUyZ"></a>
# Sub-goals (Day 6)
- Finally able to run basic functional tests
    - Subscribe API test: /subscribe
    - Publish API test: /publish
    - Notify API test: /notify
- Refactoring
    - database.go: refactor the DDL statements to be compatible with sqlx's StructScan field mapping
    - Every rows object returned by Queryx must be followed by `defer rows.Close()`
    - Add plenty of diagnostic logging along the processing path
    - tLiveMsgSource: fix the bug where handleMsgPublished returned before receiving any data

<a name="mLMji"></a>
# Unit test
mqs_test.go clears the database, then tests the subscribe, publish and notify APIs in turn, checking the diagnostic logs and the changes in database records.

```go
package saga

import (
	"bytes"
	"encoding/json"
	"errors"
	"fmt"
	"io/ioutil"
	"net/http"
	"sync"
	"testing"
	"time"

	"github.com/jmoiron/sqlx"

	"learning/gooop/saga/mqs/cmd"
	"learning/gooop/saga/mqs/database"
	"learning/gooop/saga/mqs/logger"
	"learning/gooop/saga/mqs/models"
)

var gRunOnce sync.Once

// fnBootMQS starts the MQS server once per test process.
func fnBootMQS() {
	gRunOnce.Do(func() {
		// boot mqs
		go cmd.BootMQS()

		// wait for mqs to come up
		time.Sleep(1 * time.Second)
	})
}

func fnAssertTrue(t *testing.T, b bool, msg string) {
	if !b {
		t.Fatal(msg)
	}
}

func Test_MQS(t *testing.T) {
	// prepare mqs
	fnClearDB(t)
	fnBootMQS()

	// subscribe
	fnTestSubscribe(t)
	t.Log("passed fnTestSubscribe")

	// publish and notify
	fnTestPublishAndNotify(t)
	t.Log("passed fnTestPublishAndNotify")
}

func fnTestPublishAndNotify(t *testing.T) {
	t.Log("testing fnTestPublishAndNotify")

	msg := &models.TxMsg{
		GlobalID: "test-global-id",
		SubID:    "test-sub-id",
		SenderID: "test-sender-id",
		Topic:    "test-topic",
		Content:  "test-content",
	}
	fnPost(t, msg, "http://localhost:3333/publish")

	// check log
	fnAssertTrue(t, logger.Count("handlers.Publish, msg=test-global-id") == 1, "expecting log: handlers.Publish, msg=test-global-id")

	// check notify (delivery is asynchronous, so give it a moment)
	time.Sleep(100 * time.Millisecond)
	fnAssertTrue(t, logger.Count("tLiveMsgSource.handleMsgPublished, clientID=test-client, msg=test-global-id") == 1, "expecting log: tLiveMsgSource.handleMsgPublished")
	fnAssertTrue(t, logger.Count("tDeliveryWorker.afterDeliverySuccess, done, id=test-client, msg=test-global-id") == 1, "expecting log: tDeliveryWorker.afterDeliverySuccess")
	fnAssertTrue(t, logger.Count("handlers.Notify, msg=") == 1, "expecting log: handlers.Notify, msg=")

	// check success queue
	fnAssertTrue(t, fnDBCount(t, "select count(1) from success_queue where ClientID='test-client'") == 1, "expecting db.success_queue.count == 1")
}

func fnClearDB(t *testing.T) {
	fnDBExec(t, "delete from subscriber")
	fnDBExec(t, "delete from tx_msg")
	fnDBExec(t, "delete from delivery_queue")
	fnDBExec(t, "delete from success_queue")
}

// fnDBCount runs a count query and returns the first column of the first row.
func fnDBCount(t *testing.T, sql string, args ...interface{}) int {
	sum := 0
	err := database.DB(func(db *sqlx.DB) error {
		r, e := db.Queryx(sql, args...)
		if e != nil {
			return e
		}
		// always release the rows, otherwise the connection stays busy
		defer r.Close()

		if !r.Next() {
			return errors.New("empty rows")
		}
		return r.Scan(&sum)
	})
	if err != nil {
		t.Fatal(err)
	}
	return sum
}

// fnDBExec runs a statement and returns the number of affected rows.
func fnDBExec(t *testing.T, sql string, args ...interface{}) int {
	var rows int64
	err := database.DB(func(db *sqlx.DB) error {
		r, e := db.Exec(sql, args...)
		if e != nil {
			return e
		}
		rows, e = r.RowsAffected()
		return e
	})
	if err != nil {
		t.Fatal(err)
	}
	return int(rows)
}

func fnTestSubscribe(t *testing.T) {
	t.Log("testing fnTestSubscribe")

	// clear subscriber
	fnDBExec(t, "delete from subscriber")

	msg := &models.SubscribeMsg{
		ClientID:   "test-client",
		Topic:      "test-topic",
		NotifyUrl:  "http://localhost:3333/notify",
		ExpireTime: time.Now().UnixNano() + int64(30*time.Second),
	}
	fnPost(t, msg, "http://localhost:3333/subscribe")

	// check log
	fnAssertTrue(t, logger.Count("handlers.Subscribe, event=subscriber.registered") == 1, "expecting event=subscriber.registered")

	// check db
	count := fnDBCount(t, "select count(1) as n from subscriber where ClientID=? and topic=?", msg.ClientID, msg.Topic)
	fnAssertTrue(t, count == 1, "expecting subscriber.count == 1")
}

// fnPost posts msg as JSON to url and expects an OkMsg reply with OK == true.
func fnPost(t *testing.T, msg interface{}, url string) {
	body, err := json.Marshal(msg)
	if err != nil {
		t.Fatal(err)
	}
	rsp, err := http.Post(url, "application/json;charset=utf-8", bytes.NewReader(body))
	if err != nil {
		t.Fatal(err)
	}
	defer rsp.Body.Close()

	j, err := ioutil.ReadAll(rsp.Body)
	if err != nil {
		t.Fatal(err)
	}
	ok := &models.OkMsg{}
	err = json.Unmarshal(j, ok)
	if err != nil {
		t.Fatal(err)
	}
	fnAssertTrue(t, ok.OK, fmt.Sprintf("expecting replying ok from %s", url))
}
```
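The assertions in mqs_test.go rely on `logger.Count`, whose implementation the post does not show. Judging only by how it is used, it counts logged lines that contain a given substring. The following is a minimal, thread-safe sketch of such a recorder under that assumption; `memLogger`, `Logf` and `Count` are hypothetical names, not the project's logger API.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// memLogger is a hypothetical stand-in for a test-friendly logger:
// it prints every line and also remembers it so tests can count it later.
type memLogger struct {
	mu    sync.Mutex
	lines []string
}

// Logf formats a line, records it, and prints it to stdout.
func (l *memLogger) Logf(format string, args ...interface{}) {
	line := fmt.Sprintf(format, args...)
	l.mu.Lock()
	l.lines = append(l.lines, line)
	l.mu.Unlock()
	fmt.Println(line)
}

// Count returns how many recorded lines contain substr.
func (l *memLogger) Count(substr string) int {
	l.mu.Lock()
	defer l.mu.Unlock()
	n := 0
	for _, line := range l.lines {
		if strings.Contains(line, substr) {
			n++
		}
	}
	return n
}

func main() {
	lg := &memLogger{}
	lg.Logf("handlers.Publish, msg=%s, msgId=%d", "test-global-id/test-sub-id/test-topic", 15)
	lg.Logf("handlers.Publish, pubLiveMsg %d", 15)

	fmt.Println(lg.Count("handlers.Publish, msg=test-global-id")) // 1
	fmt.Println(lg.Count("handlers.Publish"))                      // 2
}
```

Since such a recorder keeps every line for the life of the process, an exact `== 1` check also catches unexpected duplicate log lines, but it assumes each test ID is used only once per `go test` run.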
# Test output
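Following the assert failure messages, I worked through the diagnostic logs step by step. The problems were mainly sqlx field-mapping errors and rows that were not closed in time with rows.Close().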
```
$ go test -v mqs_test.go
=== RUN Test_MQS
eventbus.Pub, event=system.boot, handler=gDeliveryService.handleBootEvent
[GIN-debug] [WARNING] Creating an Engine instance with the Logger and Recovery middleware already attached.
[GIN-debug] [WARNING] Running in "debug" mode. Switch to "release" mode in production.
 - using env: export GIN_MODE=release
 - using code: gin.SetMode(gin.ReleaseMode)
[GIN-debug] GET /ping --> learning/gooop/saga/mqs/handlers.Ping (4 handlers)
[GIN-debug] POST /subscribe --> learning/gooop/saga/mqs/handlers.Subscribe (4 handlers)
[GIN-debug] POST /publish --> learning/gooop/saga/mqs/handlers.Publish (4 handlers)
[GIN-debug] POST /notify --> learning/gooop/saga/mqs/handlers.Notify (4 handlers)
[GIN-debug] Listening and serving HTTP on :3333
tDeliveryService.beginCreatingWorkers
tDeliveryService.beginCleanExpiredWorkers
    mqs_test.go:139: testing fnTestSubscribe
handlers.Subscribe, event=subscriber.registered, msg=&{test-client test-topic http://localhost:3333/notify 1615868155394998875}
eventbus.Pub, event=subscriber.registered, handler=gDeliveryService.handleSubscriberRegistered
[GIN] 2021/03/16 - 12:15:25 | 200 | 8.118873ms | ::1 | POST "/subscribe"
[GIN] 2021/03/16 - 12:15:25 | 200 | 8.214968ms | ::1 | POST "/subscribe"
tDeliveryWorker.afterInitialLoad, clientID=test-client, rows=0
database.DB, err=empty rows
    mqs_test.go:45: passed fnTestSubscribe
    mqs_test.go:54: testing fnTestPublishAndNotify
handlers.Publish, msg=test-global-id/test-sub-id/test-topic, msgId=15
[GIN] 2021/03/16 - 12:15:25 | 200 | 15.200109ms | ::1 | POST "/publish"
[GIN] 2021/03/16 - 12:15:25 | 200 | 15.216578ms | ::1 | POST "/publish"
handlers.Publish, pubLiveMsg 15
handlers.Publish, pubLiveMsg, msgId=15, rows=1
handlers.Publish, event=msg.published, clientID=test-client, msg=test-global-id/test-sub-id/http://localhost:3333/notify
eventbus.Pub, event=msg.published, handler=tLiveMsgSource.test-client
tLiveMsgSource.handleMsgPublished, clientID=test-client, msg=test-global-id/test-sub-id/test-topic
handlers.Notify, msg=&{test-global-id test-sub-id 0 test-topic test-content}
[GIN] 2021/03/16 - 12:15:25 | 200 | 48.38µs | ::1 | POST "/notify"
[GIN] 2021/03/16 - 12:15:25 | 200 | 110.659µs | ::1 | POST "/notify"
tDeliveryWorker.afterDeliverySuccess, done, id=test-client, msg=test-global-id/test-sub-id
    mqs_test.go:49: passed fnTestPublishAndNotify
--- PASS: Test_MQS (1.18s)
PASS
ok command-line-arguments 1.187s
```
(To be continued)
