缘起

最近阅读<> (刘金亮, 2021.1)
本系列笔记拟采用golang练习之

Saga模式

  • saga模式将分布式长事务切分为一系列独立短事务
  • 每个短事务是可通过补偿动作进行撤销的
  • 事务动作和补动作偿都是幂等的, 允许重复执行而不会有副作用 ``` Saga由一系列的子事务“Ti”组成, 每个Ti都有对应的补偿“Ci”, 当Ti出现问题时Ci用于处理Ti执行带来的问题。

可以通过下面的两个公式理解Saga模式。 T = T1 T2 … Tn T = TCT

Saga模式的核心理念是避免使用长期持有锁(如14.2.2节介绍的两阶段提交)的长事务, 而应该将事务切分为一组按序依次提交的短事务, Saga模式满足ACD(原子性、一致性、持久性)特征。

摘自 <> 刘金亮, 2021.1

  1. <a name="fGPkq"></a>
  2. # 目标
  3. - 为实现saga模式的分布式事务, 先撸一个pub/sub事务消息队列服务
  4. - 事务消息队列服务的功能性要求
  5. - 消息不会丢失: 消息的持久化
  6. - 消息的唯一性: 要求每个消息有全局ID和子事务ID
  7. - 确保投递成功: 投递队列持久化, 投递状态持久化, 失败重试
  8. <a name="naUyZ"></a>
  9. # 子目标(Day 5)
  10. - 重构和完善消息投递机制
  11. - iMsgHeap: 使用待投递消息堆缓存消息. 总是优先投递创建时间最小的消息
  12. - iMsgSource: 定义消息来源接口. 有两种消息来源, 1-数据库;2-eventbus
  13. - iMsgHistoryRing: 使用ring buffer记录近期已投递成功的消息, 防止重复投递
  14. - tConcurrentMsgHeap: 最小CreateTime优先的消息堆, 实现iMsgHeap接口, 并且是线程安全的.
  15. - tDBMsgSource: 从数据库拉取待投递消息, 实现iMsgSource接口
  16. - tLiveMsgSource: 监听eventbus即时推送的投递消息, 实现iMsgSource接口
  17. - tMsgHistoryRing: 历史消息的固定大小环形队列, 实现iMsgHistoryRing接口, 缓存近期已投递成功的消息
  18. - tDeliveryWorker:
  19. - 初始化时, 优先从数据库加载待投递消息
  20. - 使用iMsgHeap缓存待投递消息, 并确保有序
  21. - 使用iMsgSource接口, 分别从db和eventbus接收投递消息
  22. - 使用iMsgHistoryRing, 缓存已投递成功的消息, 防止重复投递
  23. <a name="SoPHV"></a>
  24. # iMsgHeap.go
  25. 使用待投递消息堆缓存消息. 总是优先投递创建时间最小的消息
  26. ```go
  27. package delivery
  28. import "learning/gooop/saga/mqs/models"
  29. type iMsgHeap interface {
  30. Size() int
  31. IsEmpty() bool
  32. IsNotEmpty() bool
  33. Push(msg *models.QueuedMsg)
  34. Peek() *models.QueuedMsg
  35. Pop() *models.QueuedMsg
  36. }

iMsgSource.go

定义消息来源接口. 有两种消息来源, 1-数据库;2-eventbus

  1. package delivery
  2. import "learning/gooop/saga/mqs/models"
  3. type iMsgSource interface {
  4. MsgChan() <- chan *models.QueuedMsg
  5. }
  6. type tSourceExpireFunc func() bool

iMsgHistoryRing.go

使用ring buffer记录近期已投递成功的消息, 防止重复投递

  1. package delivery
  2. import "learning/gooop/saga/mqs/models"
  3. type iMsgHistoryRing interface {
  4. Push(msg *models.QueuedMsg)
  5. Has(id int) bool
  6. }

tConcurrentMsgHeap.go

最小CreateTime优先的消息堆, 实现iMsgHeap接口, 并且是线程安全的.

  1. package delivery
  2. import (
  3. "learning/gooop/saga/mqs/models"
  4. "sync"
  5. )
  6. type tConcurrentMsgHeap struct {
  7. items []*models.QueuedMsg
  8. size int
  9. mutex *sync.Mutex
  10. }
  11. func newMsgHeap() iMsgHeap {
  12. it := new(tConcurrentMsgHeap)
  13. it.init()
  14. return it
  15. }
  16. func (me *tConcurrentMsgHeap) init() {
  17. me.items = make([]*models.QueuedMsg, 0)
  18. me.size = 0
  19. me.mutex = new(sync.Mutex)
  20. }
  21. func (me *tConcurrentMsgHeap) Size() int {
  22. return me.size
  23. }
  24. func (me *tConcurrentMsgHeap) IsEmpty() bool {
  25. return me.size <= 0
  26. }
  27. func (me *tConcurrentMsgHeap) IsNotEmpty() bool {
  28. return !me.IsEmpty()
  29. }
  30. func (me *tConcurrentMsgHeap) has(msgID int) bool {
  31. for _,it := range me.items {
  32. if it.MsgID == msgID {
  33. return true
  34. }
  35. }
  36. return false
  37. }
  38. func (me *tConcurrentMsgHeap) Push(msg *models.QueuedMsg) {
  39. me.mutex.Lock()
  40. defer me.mutex.Unlock()
  41. if me.has(msg.MsgID) {
  42. return
  43. }
  44. me.ensureSize(me.size + 1)
  45. me.items[me.size] = msg
  46. me.size++
  47. me.shiftUp(me.size - 1)
  48. }
  49. func (me *tConcurrentMsgHeap) ensureSize(size int) {
  50. for ;len(me.items) < size; {
  51. me.items = append(me.items, nil)
  52. }
  53. }
  54. func (me *tConcurrentMsgHeap) parentOf(i int) int {
  55. return (i - 1) / 2
  56. }
  57. func (me *tConcurrentMsgHeap) leftChildOf(i int) int {
  58. return i*2 + 1
  59. }
  60. func (me *tConcurrentMsgHeap) rightChildOf(i int) int {
  61. return me.leftChildOf(i) + 1
  62. }
  63. func (me *tConcurrentMsgHeap) last() (i int, v *models.QueuedMsg) {
  64. if me.IsEmpty() {
  65. return -1, nil
  66. }
  67. i = me.size - 1
  68. v = me.items[i]
  69. return i,v
  70. }
  71. func (me *tConcurrentMsgHeap) shiftUp(i int) {
  72. if i <= 0 {
  73. return
  74. }
  75. v := me.items[i]
  76. pi := me.parentOf(i)
  77. pv := me.items[pi]
  78. if me.less(v, pv) {
  79. me.items[pi], me.items[i] = v, pv
  80. me.shiftUp(pi)
  81. }
  82. }
  83. func (me *tConcurrentMsgHeap) less(a, b *models.QueuedMsg) bool {
  84. return a.CreateTime < b.CreateTime
  85. }
  86. func (me *tConcurrentMsgHeap) Pop() *models.QueuedMsg {
  87. me.mutex.Lock()
  88. defer me.mutex.Unlock()
  89. if me.IsEmpty() {
  90. return nil
  91. }
  92. top := me.items[0]
  93. li, lv := me.last()
  94. me.items[0] = nil
  95. me.size--
  96. if me.IsEmpty() {
  97. return top
  98. }
  99. me.items[0] = lv
  100. me.items[li] = nil
  101. me.shiftDown(0)
  102. return top
  103. }
  104. func (me *tConcurrentMsgHeap) Peek() *models.QueuedMsg {
  105. me.mutex.Lock()
  106. defer me.mutex.Unlock()
  107. if me.IsEmpty() {
  108. return nil
  109. }
  110. return me.items[0]
  111. }
  112. func (me *tConcurrentMsgHeap) shiftDown(i int) {
  113. pv := me.items[i]
  114. ok, ci, cv := me.minChildOf(i)
  115. if ok && me.less(cv, pv) {
  116. me.items[i], me.items[ci] = cv, pv
  117. me.shiftDown(ci)
  118. }
  119. }
  120. func (me *tConcurrentMsgHeap) minChildOf(p int) (ok bool, i int, v *models.QueuedMsg) {
  121. li := me.leftChildOf(p)
  122. if li >= me.size {
  123. return false, 0, nil
  124. }
  125. lv := me.items[li]
  126. ri := me.rightChildOf(p)
  127. if ri >= me.size {
  128. return true, li, lv
  129. }
  130. rv := me.items[ri]
  131. if me.less(lv, rv) {
  132. return true, li, lv
  133. } else {
  134. return true, ri, rv
  135. }
  136. }

tDBMsgSource.go

从数据库拉取待投递消息, 实现iMsgSource接口

  1. package delivery
  2. import (
  3. "github.com/jmoiron/sqlx"
  4. "learning/gooop/saga/mqs/database"
  5. "learning/gooop/saga/mqs/models"
  6. "time"
  7. )
  8. type tDBMsgSource struct {
  9. clientID string
  10. expireFunc tSourceExpireFunc
  11. msgChan chan *models.QueuedMsg
  12. }
  13. func newDBMsgSource(clientID string, expireFunc tSourceExpireFunc) iMsgSource {
  14. it := new(tDBMsgSource)
  15. it.init(clientID, expireFunc)
  16. return it
  17. }
  18. func (me *tDBMsgSource) init(clientID string, expireFunc tSourceExpireFunc) {
  19. me.clientID = clientID
  20. me.expireFunc = expireFunc
  21. me.msgChan = make(chan *models.QueuedMsg, 1)
  22. go me.beginPollDB()
  23. }
  24. func (me *tDBMsgSource) MsgChan() <- chan *models.QueuedMsg {
  25. return me.msgChan
  26. }
  27. func (me *tDBMsgSource) beginPollDB() {
  28. interval := time.Duration(1) * time.Second
  29. for !me.expireFunc() {
  30. if len(me.msgChan) <= 0 {
  31. ok, msg := me.poll()
  32. if ok {
  33. me.msgChan <- msg
  34. continue
  35. }
  36. }
  37. // poll failed, or chan full
  38. time.Sleep(interval)
  39. }
  40. close(me.msgChan)
  41. }
  42. func (me *tDBMsgSource) poll() (bool, *models.QueuedMsg) {
  43. msg := &models.QueuedMsg{}
  44. e := database.DB(func(db *sqlx.DB) error {
  45. rows, err := db.Queryx(
  46. "select * from delivery_queue where client_id=? order by create_time asc limit 1",
  47. me.clientID,
  48. )
  49. if err != nil {
  50. return err
  51. }
  52. if rows.Next() {
  53. err = rows.StructScan(msg)
  54. if err != nil {
  55. return err
  56. }
  57. return nil
  58. } else {
  59. return gEmptyRowsErr
  60. }
  61. })
  62. if e != nil {
  63. return false, nil
  64. } else {
  65. return true, msg
  66. }
  67. }

tLiveMsgSource.go

监听eventbus即时推送的投递消息, 实现iMsgSource接口

  1. package delivery
  2. import (
  3. "fmt"
  4. "learning/gooop/saga/mqs/eventbus"
  5. "learning/gooop/saga/mqs/logger"
  6. "learning/gooop/saga/mqs/models"
  7. "learning/gooop/saga/mqs/models/events"
  8. "time"
  9. )
  10. type tLiveMsgSource struct {
  11. clientID string
  12. expireFunc tSourceExpireFunc
  13. msgChan chan *models.QueuedMsg
  14. }
  15. func newLiveMsgSource(clientID string, expireFunc tSourceExpireFunc) iMsgSource {
  16. it := new(tLiveMsgSource)
  17. it.init(clientID, expireFunc)
  18. return it
  19. }
  20. func (me *tLiveMsgSource) init(clientID string, expireFunc tSourceExpireFunc) {
  21. me.clientID = clientID
  22. me.expireFunc = expireFunc
  23. me.msgChan = make(chan *models.QueuedMsg, 1)
  24. eventbus.GlobalEventBus.Sub(events.MsgPublishedEvent,
  25. me.id(),
  26. me.handleMsgPublished)
  27. go me.beginWatchExpire()
  28. }
  29. func (me *tLiveMsgSource) id() string {
  30. return fmt.Sprintf("tLiveMsgSource.%s", me.clientID)
  31. }
  32. func (me *tLiveMsgSource) beginWatchExpire() {
  33. for range time.Tick(1 * time.Second) {
  34. if me.expireFunc() {
  35. me.afterExpired()
  36. return
  37. }
  38. }
  39. }
  40. func (me *tLiveMsgSource) afterExpired() {
  41. eventbus.GlobalEventBus.Unsub(events.MsgPublishedEvent, me.id())
  42. close(me.msgChan)
  43. }
  44. func (me *tLiveMsgSource) handleMsgPublished(_ string, args interface{}) {
  45. msg, ok := args.(*models.QueuedMsg)
  46. if !ok {
  47. return
  48. }
  49. if msg.ClientID != me.clientID {
  50. return
  51. }
  52. if len(me.msgChan) >= 0 {
  53. return
  54. }
  55. logger.Logf(
  56. "tLiveMsgSource.handleMsgPublished, clientID=%s, msg=%s/%s/%s",
  57. me.clientID, msg.GlobalID, msg.SubID, msg.Topic )
  58. me.msgChan <- msg
  59. }
  60. func (me *tLiveMsgSource) MsgChan() <- chan *models.QueuedMsg {
  61. return me.msgChan
  62. }

tMsgHistoryRing.go

历史消息的固定大小环形队列, 实现iMsgHistoryRing接口, 缓存近期已投递成功的消息

  1. package delivery
  2. import "learning/gooop/saga/mqs/models"
  3. type tMsgHistoryRing struct {
  4. items []*models.QueuedMsg
  5. capacity int
  6. index int
  7. }
  8. func newMsgHistoryRing(capacity int) iMsgHistoryRing {
  9. it := new(tMsgHistoryRing)
  10. it.init(capacity)
  11. return it
  12. }
  13. func (me *tMsgHistoryRing) init(capacity int) {
  14. me.items = make([]*models.QueuedMsg, capacity)
  15. me.capacity = capacity
  16. me.index = 0
  17. }
  18. func (me *tMsgHistoryRing) Has(id int) bool {
  19. for _,it := range me.items {
  20. if it != nil && it.ID == id {
  21. return true
  22. }
  23. }
  24. return false
  25. }
  26. func (me *tMsgHistoryRing) Push(msg *models.QueuedMsg) {
  27. me.items[me.index] = msg
  28. me.index++
  29. if me.index >= me.capacity {
  30. me.index = 0
  31. }
  32. }

tDeliveryWorker.go

  • 初始化时, 优先从数据库加载待投递消息
  • 使用iMsgHeap缓存待投递消息, 并确保有序
  • 使用iMsgSource接口, 分别从db和eventbus接收投递消息
  • 使用iMsgHistoryRing, 缓存已投递成功的消息, 防止重复投递 ```go package delivery

import ( “bytes” “encoding/json” “errors” “github.com/jmoiron/sqlx” “io/ioutil” “learning/gooop/saga/mqs/database” “learning/gooop/saga/mqs/logger” “learning/gooop/saga/mqs/models” “net/http” “time” )

type tDeliveryWorker struct { info *tWorkerInfo successRing iMsgHistoryRing dbSource iMsgSource liveSource iMsgSource msgHeap iMsgHeap }

func newDeliveryWorker(info tWorkerInfo) tDeliveryWorker { it := new(tDeliveryWorker) it.init(info) return it }

// init: do initialization, and start initial load func (me tDeliveryWorker) init(info tWorkerInfo) { me.info = info me.successRing = newMsgHistoryRing(64)

  1. me.dbSource = newDBMsgSource(info.ClientID, me.isExpired)
  2. me.liveSource = newLiveMsgSource(info.ClientID, me.isExpired)
  3. me.msgHeap = newMsgHeap()
  4. go me.beginInitialLoadFromDB()

}

// beginInitialLoadFromDB: initially, load queued msg from database func (me tDeliveryWorker) beginInitialLoadFromDB() { buf := [][]models.QueuedMsg{ nil } for !me.isExpired() { err := database.DB(func(db *sqlx.DB) error { e, rows := me.loadFromDB(db) if e != nil { return e }

  1. buf[0] = rows
  2. return nil
  3. })
  4. if err != nil {
  5. logger.Logf("tDeliveryWorker.initialLoadFromDB, clientID=%s, err=%s", me.info.ClientID, err.Error())
  6. time.Sleep(3 * time.Second)
  7. } else {
  8. me.afterInitialLoad(buf[0])
  9. }
  10. }

}

// loadFromDB: load queued msg from database func (me tDeliveryWorker) loadFromDB(db sqlx.DB) (error, []models.QueuedMsg) { rows, err := db.Queryx( “select from delivery_queue where client_id=? order by create_time asc limit ?”, me.info.ClientID, gInitialLoadRows, ) if err != nil { return err, nil }

  1. msgList := []*models.QueuedMsg{}
  2. for rows.Next() {
  3. msg := &models.QueuedMsg{}
  4. err = rows.StructScan(msg)
  5. if err != nil {
  6. return err, nil
  7. }
  8. msgList = append(msgList, msg)
  9. }
  10. return nil, msgList

}

// afterInitialLoad: after initial load done, push msgs into heap, and start delivery loop func (me tDeliveryWorker) afterInitialLoad(msgList []models.QueuedMsg) { logger.Logf(“tDeliveryWorker.afterInitialLoad, clientID=%s, rows=%d”, me.info.ClientID, len(msgList)) for _,it := range msgList { me.msgHeap.Push(it) }

  1. go me.beginPollAndDeliver()

}

// beginPollAndDeliver: poll msg from heap, and then deliver it func (me *tDeliveryWorker) beginPollAndDeliver() { for !me.isExpired() { select { case msg := <- me.dbSource.MsgChan(): me.msgHeap.Push(msg) break

  1. case msg := <- me.liveSource.MsgChan():
  2. me.msgHeap.Push(msg)
  3. break
  4. }
  5. if me.msgHeap.IsEmpty() {
  6. continue
  7. }
  8. msg := me.msgHeap.Pop()
  9. if msg == nil {
  10. continue
  11. }
  12. switch msg.StatusFlag {
  13. case 0:
  14. // 未处理的消息
  15. me.handleUndeliveredMsg(msg)
  16. break
  17. case 1:
  18. // 处理中的消息
  19. me.handleDeliveringMsg(msg)
  20. break
  21. }
  22. }

}

// isExpired: is me expired? func (me *tDeliveryWorker) isExpired() bool { return time.Now().UnixNano() >= me.info.ExpireTime }

// handleUndeliveredMsg: if msg unhandled, then try to deliver it func (me tDeliveryWorker) handleUndeliveredMsg(msg models.QueuedMsg) { err := database.DB(func(db *sqlx.DB) error { now := time.Now().UnixNano() r,e := db.Exec( “update delivery_queue set status_flag=1, update_time=? where id=? and status_flag=0 and update_time=?”, now, msg.ID, msg.UpdateTime, ) if e != nil { return e }

  1. rows, e := r.RowsAffected()
  2. if e != nil {
  3. return e
  4. }
  5. if rows != 1 {
  6. return gOneRowsErr
  7. }
  8. msg.StatusFlag = 1
  9. msg.UpdateTime = now
  10. return nil
  11. })
  12. if err != nil {
  13. logger.Logf("tDeliveryWorker.handleNewMsg, id=%v, msg=%s/%s, err=%s", me.info.ClientID, msg.GlobalID, msg.SubID, err.Error())
  14. return
  15. }
  16. if me.deliver(msg) {
  17. me.afterDeliverySuccess(msg)
  18. } else {
  19. me.afterDeliveryFailed(msg)
  20. }

}

// deliver: use http.Post function to delivery msg func (me tDeliveryWorker) deliver(msg models.QueuedMsg) bool { if me.successRing.Has(msg.ID) { return true }

  1. t := &models.TxMsg{
  2. GlobalID: msg.GlobalID,
  3. SubID: msg.SubID,
  4. Topic: msg.Topic,
  5. CreateTime: msg.CreateTime,
  6. Content: msg.Content,
  7. }
  8. j,e := json.Marshal(t)
  9. if e != nil {
  10. logger.Logf("tDeliveryWorker.deliver, failed json.Marshal, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)
  11. return false
  12. }
  13. r, e := http.Post(me.info.NotifyURL, "application/json", bytes.NewReader(j))
  14. if e != nil {
  15. logger.Logf("tDeliveryWorker.deliver, failed http.Post, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)
  16. return false
  17. }
  18. defer r.Body.Close()
  19. rep, e := ioutil.ReadAll(r.Body)
  20. if e != nil {
  21. logger.Logf("tDeliveryWorker.deliver, failed ioutil.ReadAll, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)
  22. return false
  23. }
  24. m := &models.OkMsg{}
  25. e = json.Unmarshal(rep, m)
  26. if e != nil {
  27. logger.Logf("tDeliveryWorker.deliver, failed json.Unmarshal, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)
  28. return false
  29. }
  30. if m.OK {
  31. return true
  32. } else {
  33. logger.Logf("tDeliveryWorker.deliver, failed OkMsg.OK, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)
  34. return false
  35. }

}

// handleDeliveringMsg: if delivery timeout, then retry delivery func (me tDeliveryWorker) handleDeliveringMsg(msg models.QueuedMsg) { now := time.Now().UnixNano() if msg.UpdateTime + gDeliveryTimeoutNanos > now { return }

  1. // delivery timeout
  2. me.afterDeliveryTimeout(msg)

}

// afterDeliverySuccess: if done, move msg to success queue func (me tDeliveryWorker) afterDeliverySuccess(msg models.QueuedMsg) { if me.successRing.Has(msg.ID) { return } me.successRing.Push(msg)

  1. err := database.TX(func(db *sqlx.DB, tx *sqlx.Tx) error {
  2. r,e := db.Exec(
  3. "delete from delivery_queue where id=? and update_time=? and status_flag=1",
  4. msg.ID,
  5. msg.UpdateTime,
  6. )
  7. if e != nil {
  8. return e
  9. }
  10. rows, e := r.RowsAffected()
  11. if e != nil {
  12. return e
  13. }
  14. if rows != 1 {
  15. return gOneRowsErr
  16. }
  17. r, e = db.Exec(
  18. "insert into success_queue (msg_id, client_id, create_time) values(?, ?, ?)",
  19. msg.ID,
  20. msg.ClientID,
  21. time.Now().UnixNano(),
  22. )
  23. if e != nil {
  24. return e
  25. }
  26. rows, e = r.RowsAffected()
  27. if e != nil {
  28. return e
  29. }
  30. if rows != 1 {
  31. return gOneRowsErr
  32. }
  33. return nil
  34. })
  35. if err != nil {
  36. logger.Logf("tDeliveryWorker.afterDeliverySuccess, failed, id=%v, msg=%s/%s, err=%s", me.info.ClientID, msg.GlobalID, msg.SubID, err.Error())
  37. } else {
  38. logger.Logf("tDeliveryWorker.afterDeliverySuccess, done, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)
  39. }

}

// afterDeliveryFailed: if failed, do nothing but just log it func (me tDeliveryWorker) afterDeliveryFailed(msg models.QueuedMsg) { logger.Logf(“tDeliveryWorker.afterDeliveryFailed, id=%v, msg=%s/%s”, me.info.ClientID, msg.GlobalID, msg.SubID) }

// afterDeliveryTimeout: if timeout, then reset status and retry func (me tDeliveryWorker) afterDeliveryTimeout(msg models.QueuedMsg) { err := database.DB(func(db *sqlx.DB) error { r,e := db.Exec( “update delivery_queue set status_flag=0 where id=? and status_flag=1 and update_time=?”, msg.ID, msg.UpdateTime, ) if e != nil { return e }

  1. rows,e := r.RowsAffected()
  2. if e != nil {
  3. return e
  4. }
  5. if rows != 1 {
  6. return gOneRowsErr
  7. }
  8. return nil
  9. })
  10. if err != nil {
  11. logger.Logf("tDeliveryWorker.afterDeliveryTimeout, failed, id=%v, msg=%s/%s, err=%s", me.info.ClientID, msg.GlobalID, msg.SubID, err.Error())
  12. } else {
  13. logger.Logf("tDeliveryWorker.afterDeliveryTimeout, done, id=%v, msg=%s/%s", me.info.ClientID, msg.GlobalID, msg.SubID)
  14. }

}

var gEmptyRowsErr = errors.New(“empty rows”) var gOneRowsErr = errors.New(“expecting one row affected”) var gDeliveryTimeoutNanos = int64(10 * (time.Second / time.Nanosecond)) var gInitialLoadRows = 100 ```

(未完待续)