缘起
最近阅读<
本系列笔记拟采用golang练习之
Saga模式
- saga模式将分布式长事务切分为一系列独立短事务
- 每个短事务是可通过补偿动作进行撤销的
- 事务动作和补动作偿都是幂等的, 允许重复执行而不会有副作用 ``` Saga由一系列的子事务“Ti”组成, 每个Ti都有对应的补偿“Ci”, 当Ti出现问题时Ci用于处理Ti执行带来的问题。
可以通过下面的两个公式理解Saga模式。 T = T1 T2 … Tn T = TCT
Saga模式的核心理念是避免使用长期持有锁(如14.2.2节介绍的两阶段提交)的长事务, 而应该将事务切分为一组按序依次提交的短事务, Saga模式满足ACD(原子性、一致性、持久性)特征。
摘自 <
<a name="fGPkq"></a># 目标- 为实现saga模式的分布式事务, 先撸一个pub/sub事务消息队列服务- 事务消息队列服务的功能性要求- 消息不会丢失: 消息的持久化- 消息的唯一性: 要求每个消息有全局ID和子事务ID- 确保投递成功: 投递队列持久化, 投递状态持久化, 失败重试<a name="naUyZ"></a># 子目标(Day 7)- MQS已基本可用, 现在实现一个模拟的订单微服务, 并与MQ联动- 长事务: 订单创建后, 联动库存服务, 扣减库存- 补偿动作- 如果扣库成功, 更新订单状态为已出库(实际系统中, 可能还涉及物流发货等复杂流程)- 否则(库存不足), 更新订单状态为出库失败(实际系统中, 可能还涉及退款和通知客户等复杂流程)- 流程- 创建订单后, 向MQ发布[销售订单.创建]消息- 订阅MQ的[销售订单.出库.成功], [销售订单.出库.失败]消息- 接收到MQ的出库消息后, 更新订单状态<a name="WjBvI"></a># 设计- ISaleOrderService: 订单服务接口- SaleOrder: 销售订单实体- tSaleOrderService: 模拟订单服务, 实现ISaleOrderService接口- NotifyStockOutbound: 接收库存服务的扣库结果消息<a name="v6aR0"></a># ISaleOrderService.go订单服务接口```gopackage order// ISaleOrderService to manage sale order creation and modificationtype ISaleOrderService interface {// get order infoGet(orderID string) *SaleOrder// create new orderCreate(it *SaleOrder) error// update order statusUpdate(orderID string, oldStatusFlag int32, newStatusFlag int32) (error, *SaleOrder)}
SaleOrder.go
销售订单实体
package ordertype SaleOrder struct {OrderID stringCustomerID stringProductID stringQuantity intPrice float64Amount float64CreateTime int64StatusFlag int32}const StatusNotDelivered int32 = 0const StatusStockOutboundDone int32 = 1const StatusStockOutboundFailed int32 = 2const StatusMQServiceFailed int32 = 3
tSaleOrderService.go
模拟订单服务, 实现ISaleOrderService接口
package orderimport ("bytes""encoding/json""errors""io/ioutil""learning/gooop/saga/mqs/logger""learning/gooop/saga/mqs/models""net/http""sync""sync/atomic""time")type tSaleOrderService struct {rwmutex *sync.RWMutexorders map[string]*SaleOrderbMQReady boolpublishQueue chan *SaleOrder}func newSaleOrderService() ISaleOrderService {it := new(tSaleOrderService)it.init()return it}func (me *tSaleOrderService) init() {me.rwmutex = new(sync.RWMutex)me.orders = make(map[string]*SaleOrder)me.bMQReady = falseme.publishQueue = make(chan *SaleOrder, gMQMaxQueuedMsg)go me.beginSubscribeMQ()go me.beginPublishMQ()}func (me *tSaleOrderService) beginSubscribeMQ() {expireDuration := int64(1 * time.Hour)subscribeDuration := 20 * time.MinutepauseDuration := 3*time.SecondlastSubscribeTime := int64(0)for {now := time.Now().UnixNano()if now - lastSubscribeTime >= int64(subscribeDuration) {expireTime := now + expireDurationerr := fnSubscribeMQ(expireTime)if err != nil {me.bMQReady = falselogger.Logf("tSaleOrderService.beginSubscribeMQ, failed, err=%v", err)} else {lastSubscribeTime = nowme.bMQReady = truelogger.Logf("tSaleOrderService.beginSubscribeMQ, done")}}time.Sleep(pauseDuration)}}func fnSubscribeMQ(expireTime int64) error {msg := &models.SubscribeMsg{ClientID: gMQClientID,Topic: gMQSubscribeTopic,NotifyUrl: gMQServerURL + PathOfNotifyStockOutbound,ExpireTime: expireTime,}url := gMQServerURL + "/subscribe"return fnPost(msg, url)}func fnPost(msg interface{}, url string) error {body,_ := json.Marshal(msg)rsp, err := http.Post(url, "application/json;charset=utf-8", bytes.NewReader(body))if err != nil {return err}defer rsp.Body.Close()j, err := ioutil.ReadAll(rsp.Body)if err != nil {return err}ok := &models.OkMsg{}err = json.Unmarshal(j, ok)if err != nil {return err}if !ok.OK {return gMQReplyFalse}return nil}func (me *tSaleOrderService) beginPublishMQ() {for {select {case msg := <- me.publishQueue :me.publishMQ(msg)break}}}func (me *tSaleOrderService) Get(orderID string) *SaleOrder {me.rwmutex.RLock()defer me.rwmutex.RUnlock()it,ok := me.orders[orderID]if ok {return it} else {return nil}}func (me *tSaleOrderService) Create(it *SaleOrder) error {me.rwmutex.Lock()defer me.rwmutex.Unlock()if len(me.publishQueue) >= gMQMaxQueuedMsg {return gMQNotAvailableError}me.orders[it.OrderID] = itme.publishQueue <- itreturn nil}func (me *tSaleOrderService) publishMQ(it *SaleOrder) {url := gMQServerURL + "/publish"j,_ := json.Marshal(it)msg := &models.TxMsg{GlobalID: it.OrderID,SubID: it.OrderID,SenderID: gMQClientID,Topic: gMQPublishTopic,CreateTime: it.CreateTime,Content: string(j),}for i := 0;i < gMQMaxPublishRetry;i++ {err := fnPost(msg, url)if err != nil {logger.Logf("tSaleOrderService.publishMQ, failed, err=%v, order=%v", err, it)time.Sleep(gMQPublishInterval)} else {logger.Logf("tSaleOrderService.publishMQ, done, order=%v", it)return}}// publish failedlogger.Logf("tSaleOrderService.publishMQ, failed max retries, order=%v", it)_,_ = me.Update(it.OrderID, StatusNotDelivered, StatusMQServiceFailed)}func (me *tSaleOrderService) Update(orderID string, oldStatusFlag int32, newStatusFlag int32) (error, *SaleOrder) {me.rwmutex.RLock()defer me.rwmutex.RUnlock()it, ok := me.orders[orderID]if !ok {return gNotFoundError, nil}if !atomic.CompareAndSwapInt32(&it.StatusFlag, oldStatusFlag, newStatusFlag) {return gStatusChangedError, it}it.StatusFlag = newStatusFlagreturn nil, it}var gMQReplyFalse = errors.New("mq reply false")var gMQNotAvailableError = errors.New("mq not ready")var gNotFoundError = errors.New("order not found")var gStatusChangedError = errors.New("status changed")var gMQMaxPublishRetry = 3var gMQPublishInterval = 1*time.Secondvar gMQSubscribeTopic = "sale-order.stock.outbound"var gMQPublishTopic = "sale-order.created"var gMQClientID = "sale-order-service"var gMQServerURL = "http://localhost:333"var gMQMaxQueuedMsg = 1024var SaleOrderService = newSaleOrderService()
NotifyStockOutbound.go
接收库存服务的扣库结果消息
package orderimport ("encoding/json""github.com/gin-gonic/gin""io/ioutil""learning/gooop/saga/mqs/logger""learning/gooop/saga/mqs/models""net/http")func NotifyStockOutbound(c *gin.Context) {body := c.Request.Bodydefer body.Close()j, e := ioutil.ReadAll(body)if e != nil {logger.Logf("order.NotifyStockOutbound, failed ioutil.ReadAll")c.JSON(http.StatusBadRequest, gin.H { "ok": false, "error": e.Error()})return}msg := &models.TxMsg{}e = json.Unmarshal(j, msg)if e != nil {logger.Logf("order.NotifyStockOutbound, failed json.Unmarshal")c.JSON(http.StatusBadRequest, gin.H { "ok": false, "error": e.Error()})return}orderID := msg.GlobalIDsucceeded := msg.Content == "1"logger.Logf("order.NotifyStockOutbound, orderID=%v, succeeded=%s", orderID, succeeded)var newStatusFlag int32if succeeded {newStatusFlag = StatusStockOutboundDone} else {newStatusFlag = StatusStockOutboundFailed}err, order := SaleOrderService.Update(orderID, StatusNotDelivered, newStatusFlag)if err != nil {logger.Logf("order.NotifyStockOutbound, failed SaleOrderService.Update, err=%v, order=%v", err, order)}}var PathOfNotifyStockOutbound = "/notify/sale-order.stock.outbound"
(未完待续)
