缘起

最近阅读 [Go微服务实战] (刘金亮, 2021.1)
本系列笔记拟采用golang练习之
gitee:

ES-CQRS模式

  1. ES(Event Sourcing)事件溯源非常好理解,
  2. 指的是将每次的事件都记录下来,
  3. 而不是去记录对象的状态。
  4. 比如新建、修改等都会作为事件记录下来,
  5. 当需要最新的状态时,通过事件的堆叠来计算最新的状态。
  6. 按照事件溯源的模式进行架构设计,
  7. 就是事件驱动架构(Event DrivenArchitecture, EDA)。
  8. 命令查询职责分离(CQRS)最早来自Betrand Meyer写的
  9. Object-OrientedSoftware Construction一书,
  10. 指的是命令查询分离(Command Query Separation,CQS)。
  11. 其基本思想是任何一个对象的方法都可以分为以下两大类:
  12. 命令(Command):不返回任何结果(void),但会改变对象的状态。
  13. 查询(Query):返回结果,但是不会改变对象的状态,对系统没有副作用。
  14. CQRS的核心出发点就是把系统分为读和写两部分,从而方便分别进行优化。

目标(Day 2)

  • 根据ES-CQRS模式, 大幅重构Day 1的设计, 并进行单元测试

设计

  • TodoDTO: 待办事宜数值对象
  • OperationTag: todo写入事件的类型标记
  • TodoEvent: todo写入事件
  • ClassTag: json序列化的类型标记
  • tJSONData: json序列化的数据容器
  • IEventBus: 事件总线接口
  • iTodoEventSerializer: 事件序列化到JSON数据的接口
  • iTodoReader: todo读取接口
  • iTodoWriter: todo写入接口
  • iJSONStore: json文件读写接口
  • ITodoService: todo待办事宜服务接口
  • tEventBus: 事件总线的实现
  • tTodoEventSerializer: 事件序列化到JSON的实现
  • tTodoWriter: 事件写入器的实现, 监听write指令, 并持久化到json存储
  • tMockJSONStore: 虚拟的JSON文件读写实现
  • tTodoReader: 待办事宜读取器, 监听write和load指令, 并计算todo列表的当前状态
  • tMockTodoService: 待办事宜服务的实现

单元测试

todo_app_test.go

  1. package es_cqrs
  2. import (
  3. td "learning/gooop/es_cqrs/todo_app"
  4. "testing"
  5. )
  6. func fnAssertTrue (t *testing.T, b bool, msg string) {
  7. if !b {
  8. t.Fatal(msg)
  9. }
  10. }
  11. func Test_TodoApp(t *testing.T) {
  12. t1 := &td.TodoDTO{ 1, "title-1", "content-1" }
  13. td.MockTodoService.Create(t1)
  14. all := td.MockTodoService.GetAll()
  15. fnAssertTrue(t, len(all) == 1, "expecting 1 item")
  16. fnAssertTrue(t, all[0].Title == t1.Title, "expecting " + t1.Title)
  17. t.Log("pass creating")
  18. t1.Content = "content-1 updated"
  19. t1.Title = "title-1 updated"
  20. td.MockTodoService.Update(t1)
  21. all = td.MockTodoService.GetAll()
  22. fnAssertTrue(t, len(all) == 1, "expecting 1 item")
  23. fnAssertTrue(t, all[0].Content == t1.Content, "expecting " + t1.Content)
  24. t.Log("pass updating")
  25. td.MockTodoService.Delete(t1)
  26. all = td.MockTodoService.GetAll()
  27. fnAssertTrue(t, len(all) == 0, "expecting 0 items")
  28. t.Log("pass deleting")
  29. }

测试输出

  1. $ go test -v todo_app_test.go
  2. === RUN Test_TodoApp
  3. 22:38:08.180382833 eventbus.Pub, event=todo.write.cmd, handler=tTodoWriter.1
  4. 22:38:08.180533659 tMockJSONStore.items: {"Tag":1,"Data":{"NO":1,"Title":"title-1","Content":"content-1"}}
  5. 22:38:08.180539669 eventbus.Pub, event=todo.write.cmd, handler=tTodoReader.2
  6. 22:38:08.180552255 tTodoReader.items: [&{1 title-1 content-1}]
  7. 22:38:08.180557245 eventbus.Pub, event=todo.read.cmd, handler=tTodoReader.2
  8. 22:38:08.180560995 eventbus.Pub, event=todo.read.ret, handler=tMockTodoService
  9. todo_app_test.go:21: pass creating
  10. 22:38:08.180580644 eventbus.Pub, event=todo.write.cmd, handler=tTodoWriter.1
  11. 22:38:08.180604465 tMockJSONStore.items: {"Tag":1,"Data":{"NO":1,"Title":"title-1","Content":"content-1"}}, {"Tag":2,"Data":{"NO":1,"Title":"title-1 updated","Content":"content-1 updated"}}
  12. 22:38:08.180612665 eventbus.Pub, event=todo.write.cmd, handler=tTodoReader.2
  13. 22:38:08.180618512 tTodoReader.items: [&{1 title-1 updated content-1 updated}]
  14. 22:38:08.18062244 eventbus.Pub, event=todo.read.cmd, handler=tTodoReader.2
  15. 22:38:08.180626445 eventbus.Pub, event=todo.read.ret, handler=tMockTodoService
  16. todo_app_test.go:29: pass updating
  17. 22:38:08.180642172 eventbus.Pub, event=todo.write.cmd, handler=tTodoWriter.1
  18. 22:38:08.180656612 tMockJSONStore.items: {"Tag":1,"Data":{"NO":1,"Title":"title-1","Content":"content-1"}}, {"Tag":2,"Data":{"NO":1,"Title":"title-1 updated","Content":"content-1 updated"}}, {"Tag":3,"Data":{"NO":1,"Title":"title-1 updated","Content":"content-1 updated"}}
  19. 22:38:08.180669129 eventbus.Pub, event=todo.write.cmd, handler=tTodoReader.2
  20. 22:38:08.180672774 tTodoReader.items: []
  21. 22:38:08.180675952 eventbus.Pub, event=todo.read.cmd, handler=tTodoReader.2
  22. 22:38:08.180679309 eventbus.Pub, event=todo.read.ret, handler=tMockTodoService
  23. todo_app_test.go:34: pass deleting
  24. --- PASS: Test_TodoApp (0.00s)
  25. PASS
  26. ok command-line-arguments 0.002s

TodoDTO.go

待办事宜数值对象

  1. package todo_app
  2. type TodoDTO struct {
  3. NO int
  4. Title string
  5. Content string
  6. }
  7. func (me *TodoDTO) Clone() *TodoDTO {
  8. return &TodoDTO{
  9. me.NO, me.Title, me.Content,
  10. }
  11. }

OperationTag.go

todo写入事件的类型标记

  1. package todo_app
  2. type OperationTag int
  3. const OPCreated OperationTag = 1
  4. const OPUpdated OperationTag = 2
  5. const OPDeleted OperationTag = 3

TodoEvent.go

todo写入事件

  1. package todo_app
  2. type TodoEvent struct {
  3. Tag OperationTag
  4. Data *TodoDTO
  5. }

ClassTag.go

json序列化的类型标记

  1. package todo_app
  2. type ClassTag int
  3. const TodoEventClass ClassTag = 1

tJSONData.go

json序列化的数据容器

  1. package todo_app
  2. import "encoding/json"
  3. type tJSONData struct {
  4. Tag ClassTag
  5. Content []byte
  6. }
  7. func (me *tJSONData) Set(tag ClassTag, it interface{}) error {
  8. me.Tag = tag
  9. j, e := json.Marshal(it)
  10. if e != nil {
  11. return e
  12. }
  13. me.Content = j
  14. return nil
  15. }
  16. func (me *tJSONData) Get(it interface{}) error {
  17. return json.Unmarshal(me.Content, it)
  18. }

IEventBus.go

事件总线接口

  1. package todo_app
  2. type EventHandleFunc func(e string, args interface{})
  3. type EventHandler struct {
  4. ID string
  5. Handler EventHandleFunc
  6. }
  7. type IEventBus interface {
  8. Pub(e string, args interface{})
  9. Sub(e string, id string, handleFunc EventHandleFunc)
  10. Unsub(e string, id string)
  11. }
  12. const EventWriteTodoCmd = "todo.write.cmd"
  13. const EventReadTodoCmd = "todo.read.cmd"
  14. const EventReadTodoRet = "todo.read.ret"
  15. const EventLoadTodoCmd = "todo.load.cmd"

iTodoEventSerializer.go

事件序列化到JSON数据的接口

  1. package todo_app
  2. type iTodoEventSerializer interface {
  3. Serialize(it *TodoEvent) *tJSONData
  4. }

iTodoReader.go

todo读取接口

  1. package todo_app
  2. type iTodoReader interface {
  3. All() []*TodoDTO
  4. HandleTodoEvent(e *TodoEvent)
  5. }

iTodoWriter.go

todo写入接口

  1. package todo_app
  2. type iTodoWriter interface {
  3. HandleTodoEvent(e *TodoEvent)
  4. }

iJSONStore.go

json文件读写接口

  1. package todo_app
  2. type iJSONStore interface {
  3. Load()
  4. Append(it *tJSONData)
  5. }

ITodoService.go

todo待办事宜服务接口

  1. package todo_app
  2. type ITodoService interface {
  3. Create(it *TodoDTO)
  4. Update(it *TodoDTO)
  5. Delete(it *TodoDTO)
  6. GetAll() []*TodoDTO
  7. }

tEventBus.go

事件总线的实现

  1. package todo_app
  2. import (
  3. "learning/gooop/saga/mqs/logger"
  4. "sync"
  5. )
  6. type tEventBus struct {
  7. rwmutex *sync.RWMutex
  8. items map[string][]*EventHandler
  9. }
  10. func newEventHandler(id string, handleFunc EventHandleFunc) *EventHandler {
  11. return &EventHandler{
  12. id, handleFunc,
  13. }
  14. }
  15. func newEventBus() IEventBus {
  16. it := new(tEventBus)
  17. it.init()
  18. return it
  19. }
  20. func (me *tEventBus) init() {
  21. me.rwmutex = new(sync.RWMutex)
  22. me.items = make(map[string][]*EventHandler)
  23. }
  24. func (me *tEventBus) Pub(e string, args interface{}) {
  25. me.rwmutex.RLock()
  26. defer me.rwmutex.RUnlock()
  27. handlers, ok := me.items[e]
  28. if ok {
  29. for _, it := range handlers {
  30. logger.Logf("eventbus.Pub, event=%s, handler=%s", e, it.ID)
  31. it.Handler(e, args)
  32. }
  33. }
  34. }
  35. func (me *tEventBus) Sub(e string, id string, handleFunc EventHandleFunc) {
  36. me.rwmutex.Lock()
  37. defer me.rwmutex.Unlock()
  38. handler := newEventHandler(id, handleFunc)
  39. handlers, ok := me.items[e]
  40. if ok {
  41. me.items[e] = append(handlers, handler)
  42. } else {
  43. me.items[e] = []*EventHandler{handler}
  44. }
  45. }
  46. func (me *tEventBus) Unsub(e string, id string) {
  47. me.rwmutex.Lock()
  48. defer me.rwmutex.Unlock()
  49. handlers, ok := me.items[e]
  50. if ok {
  51. for i, it := range handlers {
  52. if it.ID == id {
  53. lastI := len(handlers) - 1
  54. if i != lastI {
  55. handlers[i], handlers[lastI] = handlers[lastI], handlers[i]
  56. }
  57. me.items[e] = handlers[:lastI]
  58. }
  59. }
  60. }
  61. }
  62. var GlobalEventBus = newEventBus()

tTodoEventSerializer.go

事件序列化到JSON的实现

  1. package todo_app
  2. type tTodoEventSerializer struct {
  3. }
  4. func newEventSeiralizer() iTodoEventSerializer {
  5. it := new(tTodoEventSerializer)
  6. return it
  7. }
  8. func (me *tTodoEventSerializer) Serialize(e *TodoEvent) *tJSONData {
  9. it := new(tJSONData)
  10. err := it.Set(TodoEventClass, e)
  11. if err != nil {
  12. return nil
  13. }
  14. return it
  15. }
  16. var gDefaultEventSerializer = newEventSeiralizer()

tTodoWriter.go

事件写入器的实现, 监听write指令, 并持久化到json存储

  1. package todo_app
  2. import (
  3. "fmt"
  4. "sync/atomic"
  5. )
  6. type tTodoWriter struct {
  7. id string
  8. }
  9. func newTodoWriter() iTodoWriter {
  10. it := new(tTodoWriter)
  11. it.init()
  12. return it
  13. }
  14. func (me *tTodoWriter) init() {
  15. me.id = fmt.Sprintf("tTodoWriter.%d", atomic.AddInt32(&gWriterCounter, 1))
  16. GlobalEventBus.Sub(EventWriteTodoCmd, me.id, me.handleWriteTodoCmd)
  17. }
  18. func (me *tTodoWriter) handleWriteTodoCmd(e string, args interface{}) {
  19. switch e {
  20. case EventWriteTodoCmd:
  21. if it, ok := args.(*TodoEvent); ok {
  22. me.HandleTodoEvent(it)
  23. }
  24. break
  25. }
  26. }
  27. func (me *tTodoWriter) HandleTodoEvent(e *TodoEvent) {
  28. j := gDefaultEventSerializer.Serialize(e)
  29. if j != nil {
  30. MockJSONStore.Append(j)
  31. }
  32. }
  33. var gWriterCounter int32 = 0

tMockJSONStore.go

虚拟的JSON文件读写实现

  1. package todo_app
  2. import (
  3. "fmt"
  4. "learning/gooop/saga/mqs/logger"
  5. "strings"
  6. "sync"
  7. )
  8. type tMockJSONStore struct {
  9. rwmutex *sync.RWMutex
  10. once sync.Once
  11. items []*tJSONData
  12. }
  13. func newMockJSONStore() iJSONStore {
  14. it := new(tMockJSONStore)
  15. it.init()
  16. return it
  17. }
  18. func (me *tMockJSONStore) init() {
  19. me.rwmutex = new(sync.RWMutex)
  20. me.items = []*tJSONData{}
  21. }
  22. func (me *tMockJSONStore) Load() {
  23. me.once.Do(func() {
  24. me.rwmutex.RLock()
  25. defer me.rwmutex.RUnlock()
  26. for _, it := range me.items {
  27. switch it.Tag {
  28. case TodoEventClass:
  29. v := new(TodoEvent)
  30. e := it.Get(v)
  31. if e == nil {
  32. GlobalEventBus.Pub(EventLoadTodoCmd, e)
  33. }
  34. break
  35. }
  36. }
  37. })
  38. }
  39. func (me *tMockJSONStore) Append(it *tJSONData) {
  40. me.rwmutex.Lock()
  41. defer me.rwmutex.Unlock()
  42. me.items = append(me.items, it)
  43. lines := []string{}
  44. for _,it := range me.items {
  45. lines = append(lines, fmt.Sprintf("%s", string(it.Content)))
  46. }
  47. logger.Logf("tMockJSONStore.items: %s", strings.Join(lines, ", "))
  48. }
  49. var MockJSONStore = newMockJSONStore()

tTodoReader.go

待办事宜读取器, 监听write和load指令, 并计算todo列表的当前状态

  1. package todo_app
  2. import (
  3. "fmt"
  4. "learning/gooop/saga/mqs/logger"
  5. "strings"
  6. "sync"
  7. "sync/atomic"
  8. )
  9. type tTodoReader struct {
  10. id string
  11. rwmutex *sync.RWMutex
  12. items []*TodoDTO
  13. }
  14. func newTodoReader() iTodoReader {
  15. it := new(tTodoReader)
  16. it.init()
  17. return it
  18. }
  19. func (me *tTodoReader) init() {
  20. id := fmt.Sprintf("tTodoReader.%d", atomic.AddInt32(&gReaderCounter, 1))
  21. me.id = id
  22. me.rwmutex = new(sync.RWMutex)
  23. GlobalEventBus.Sub(EventWriteTodoCmd, me.id, me.handleEvent)
  24. GlobalEventBus.Sub(EventLoadTodoCmd, me.id, me.handleEvent)
  25. GlobalEventBus.Sub(EventReadTodoCmd, me.id, me.handleEvent)
  26. }
  27. func (me *tTodoReader) handleEvent(e string, args interface{}) {
  28. switch e {
  29. case EventWriteTodoCmd:
  30. fallthrough
  31. case EventLoadTodoCmd:
  32. if v,ok := args.(*TodoEvent);ok {
  33. me.HandleTodoEvent(v)
  34. }
  35. break
  36. case EventReadTodoCmd:
  37. me.handleReadTodoList()
  38. }
  39. }
  40. func (me *tTodoReader) handleReadTodoList() {
  41. GlobalEventBus.Pub(EventReadTodoRet, me.All())
  42. }
  43. func (me *tTodoReader) All() []*TodoDTO {
  44. me.rwmutex.RLock()
  45. defer me.rwmutex.RUnlock()
  46. lst := make([]*TodoDTO, len(me.items))
  47. for i,it := range me.items {
  48. lst[i] = it
  49. }
  50. return lst
  51. }
  52. func (me *tTodoReader) HandleTodoEvent(e *TodoEvent) {
  53. me.rwmutex.Lock()
  54. defer me.rwmutex.Unlock()
  55. switch e.Tag {
  56. case OPCreated:
  57. me.items = append(me.items, e.Data.Clone())
  58. break
  59. case OPUpdated:
  60. for i,it := range me.items {
  61. if it.NO == e.Data.NO {
  62. me.items[i] = e.Data.Clone()
  63. break
  64. }
  65. }
  66. break
  67. case OPDeleted:
  68. for i,it := range me.items {
  69. if it.NO == e.Data.NO {
  70. lastI := len(me.items) - 1
  71. if i == lastI {
  72. me.items[i] = nil
  73. } else {
  74. me.items[i], me.items[lastI] = me.items[lastI], nil
  75. }
  76. me.items = me.items[:lastI]
  77. break
  78. }
  79. }
  80. break
  81. }
  82. lines := []string{}
  83. for _,it := range me.items {
  84. lines = append(lines, fmt.Sprintf("%v", it))
  85. }
  86. logger.Logf("tTodoReader.items: [%s]", strings.Join(lines, ", "))
  87. }
  88. var gReaderCounter int32 = 1

tMockTodoService.go

待办事宜服务的实现, 提供todo项的CRUD

package todo_app

type tMockTodoService struct {
    items []*TodoDTO
    writer iTodoWriter
    reader iTodoReader
}

func newMockTodoService() ITodoService {
    it := new(tMockTodoService)
    it.init()
    return it
}

func (me *tMockTodoService) init() {
    me.writer = newTodoWriter()
    me.reader = newTodoReader()

    GlobalEventBus.Sub(EventReadTodoRet, "tMockTodoService", me.handleReadTodoRet)
}

func (me *tMockTodoService) handleReadTodoRet(e string, args interface{}) {
    switch e {
    case EventReadTodoRet:
        if it,ok := args.([]*TodoDTO);ok {
            me.items = it
        }
        break
    }
}

func (me *tMockTodoService) Create(it *TodoDTO) {
    GlobalEventBus.Pub(EventWriteTodoCmd, &TodoEvent{ OPCreated, it.Clone() })
}

func (me *tMockTodoService) Update(it *TodoDTO) {
    GlobalEventBus.Pub(EventWriteTodoCmd, &TodoEvent{ OPUpdated, it.Clone() })
}

func (me *tMockTodoService) Delete(it *TodoDTO) {
    GlobalEventBus.Pub(EventWriteTodoCmd, &TodoEvent{ OPDeleted, it.Clone() })
}

func (me *tMockTodoService) GetAll() []*TodoDTO {
    me.items = nil
    GlobalEventBus.Pub(EventReadTodoCmd, nil)

    lst := me.items
    me.items = nil
    return lst
}

var MockTodoService = newMockTodoService()

(ES-CQRS end)