Background

I have recently been reading [Go微服务实战] (刘金亮, 2021.1).
This series of notes works through the material with exercises in Go.
Git repository: https://gitee.com/ioly/learning.gooop

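The ES-CQRS pattern

  ES (Event Sourcing) is easy to understand: every event is recorded, rather than the state of the object. Creating, modifying and so on are all recorded as events, and when the latest state is needed it is computed by stacking (replaying) the events. An architecture designed around event sourcing is an event-driven architecture (Event-Driven Architecture, EDA).

  Command Query Responsibility Segregation (CQRS) goes back to Bertrand Meyer's book Object-Oriented Software Construction, where the idea appears as Command Query Separation (CQS). The basic idea is that every method of an object falls into one of two categories:

  • Command: returns no result (void), but changes the state of the object.
  • Query: returns a result, but does not change the state of the object and has no side effects on the system.

  The core motivation of CQRS is to split the system into a read part and a write part, so that each can be optimized separately.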
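
The two ideas above (computing the latest state by stacking events, and separating commands from queries) can be sketched in a few lines of Go. This is only an illustration: the names `event`, `eventLog`, `Append` and `Current` are hypothetical and belong neither to the book nor to the project in this post.

```go
package main

import "fmt"

// event is a hypothetical, simplified event record used only in this sketch.
type event struct {
	Kind  string // "created", "updated" or "removed"
	NO    int    // number of the to-do item the event refers to
	Title string // title carried by "created" and "updated" events
}

// eventLog stores events instead of the current state.
type eventLog struct {
	events []event
}

// Append is a command: it changes state and returns nothing.
func (l *eventLog) Append(e event) {
	l.events = append(l.events, e)
}

// Current is a query: it replays the events in order to compute the
// latest state (item number -> title) and has no side effects.
func (l *eventLog) Current() map[int]string {
	state := make(map[int]string)
	for _, e := range l.events {
		switch e.Kind {
		case "created", "updated":
			state[e.NO] = e.Title
		case "removed":
			delete(state, e.NO)
		}
	}
	return state
}

func main() {
	l := &eventLog{}
	l.Append(event{Kind: "created", NO: 1, Title: "buy milk"})
	l.Append(event{Kind: "created", NO: 2, Title: "write notes"})
	l.Append(event{Kind: "updated", NO: 1, Title: "buy oat milk"})
	l.Append(event{Kind: "removed", NO: 2})

	state := l.Current()
	fmt.Println(len(state), state[1]) // prints "1 buy oat milk"
}
```

Note that nothing is ever overwritten in the log itself: the update and the removal are simply further entries, and the current state exists only as the result of the replay.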

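Goal (Day 1)

  • Design a "TODO" (to-do items) program following the ES-CQRS pattern

Design

  • TodoDTO: value object for a to-do item
  • TodoCreatedEvent: event for creating a todo
  • TodoUpdatedEvent: event for modifying a todo
  • TodoRemovedEvent: event for deleting a todo
  • IEventBus: event bus interface
  • iTodoEventSerializer: interface for serializing events to JSON data
  • iTodoReader: todo read interface
  • iTodoWriter: todo write interface
  • iJSONStore: interface for reading and writing the JSON file
  • tEventBus: implementation of the event bus
  • tTodoEventSerializer: implementation of event serialization to JSON
  • tTodoWriter: implementation of the event writer
  • tMockJSONStore: mock implementation of JSON file reading and writing
  • tTodoReader: not yet implemented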

TodoDTO.go

Value object for a to-do item

    package todo_app

    // TodoDTO is the value object for a to-do item.
    type TodoDTO struct {
        NO      int // item number
        Title   string
        Content string
    }

TodoCreatedEvent.go

Event for the creation of a to-do item

    package todo_app

    // TodoCreatedEvent is published when a to-do item is created.
    type TodoCreatedEvent struct {
        Data *TodoDTO
    }

TodoUpdatedEvent.go

Event for the modification of a to-do item

    package todo_app

    // TodoUpdatedEvent is published when a to-do item is modified.
    type TodoUpdatedEvent struct {
        Data *TodoDTO
    }

TodoRemovedEvent.go

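Event for the deletion of a to-do item

    package todo_app

    // TodoRemovedEvent is published when a to-do item is deleted.
    type TodoRemovedEvent struct {
        NO int // number of the deleted item
    }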

IEventBus.go

Event bus interface

    package todo_app

    // EventHandleFunc handles event e; args carries the event payload.
    type EventHandleFunc func(e string, args interface{})

    // EventHandler pairs a subscriber ID with its handler function.
    type EventHandler struct {
        ID      string
        Handler EventHandleFunc
    }

    // IEventBus is a publish/subscribe event bus.
    type IEventBus interface {
        Pub(e string, args interface{})
        Sub(e string, id string, handleFunc EventHandleFunc)
        Unsub(e string, id string)
    }

    // Event names.
    const (
        EventTodoCreated = "todo.created"
        EventTodoUpdated = "todo.updated"
        EventTodoRemoved = "todo.removed"
    )

iTodoEventSerializer.go

Interface for serializing events to JSON data

    package todo_app

    // iTodoEventSerializer converts todo events into tJSONData records.
    type iTodoEventSerializer interface {
        SerializeCreatedEvent(it *TodoCreatedEvent) *tJSONData
        SerializeUpdatedEvent(it *TodoUpdatedEvent) *tJSONData
        SerializeRemovedEvent(it *TodoRemovedEvent) *tJSONData
    }

iTodoReader.go

Todo read interface

    package todo_app

    // iTodoReader is the read (query) side.
    type iTodoReader interface {
        All() []*TodoDTO
    }
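
Since tTodoReader is not implemented yet, the following is only a guess at what a read side behind `All()` could look like: an in-memory projection that is updated as events arrive. The names `todoReadModel`, `put` and `remove` are hypothetical and are not the author's design; the sketch is also not safe for concurrent use.

```go
package main

import (
	"fmt"
	"sort"
)

// TodoDTO mirrors the value object defined earlier in this post.
type TodoDTO struct {
	NO      int
	Title   string
	Content string
}

// todoReadModel is a hypothetical read-side projection keyed by item number.
type todoReadModel struct {
	items map[int]*TodoDTO
}

func newTodoReadModel() *todoReadModel {
	return &todoReadModel{items: make(map[int]*TodoDTO)}
}

// put applies the payload of a created or updated event.
func (m *todoReadModel) put(d *TodoDTO) {
	m.items[d.NO] = d
}

// remove applies a removed event.
func (m *todoReadModel) remove(no int) {
	delete(m.items, no)
}

// All returns the current items ordered by NO without modifying the model.
func (m *todoReadModel) All() []*TodoDTO {
	all := make([]*TodoDTO, 0, len(m.items))
	for _, d := range m.items {
		all = append(all, d)
	}
	sort.Slice(all, func(i, j int) bool { return all[i].NO < all[j].NO })
	return all
}

func main() {
	m := newTodoReadModel()
	m.put(&TodoDTO{NO: 2, Title: "write notes"})
	m.put(&TodoDTO{NO: 1, Title: "buy milk"})
	m.put(&TodoDTO{NO: 1, Title: "buy oat milk"}) // an update replaces item 1
	m.remove(2)

	for _, d := range m.All() {
		fmt.Println(d.NO, d.Title) // prints "1 buy oat milk"
	}
}
```

Sorting in `All()` matters because Go map iteration order is unspecified; without it, callers would see the items in a different order on each call.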

iTodoWriter.go

Todo write interface

    package todo_app

    // iTodoWriter is the write (command) side: one handler per event type.
    type iTodoWriter interface {
        HandleCreated(e *TodoCreatedEvent)
        HandleUpdated(e *TodoUpdatedEvent)
        HandleRemoved(e *TodoRemovedEvent)
    }

iJSONStore.go

Interface for reading and writing the JSON file

    package todo_app

    // iJSONStore loads stored event records and appends new ones.
    type iJSONStore interface {
        Load()
        Append(it *tJSONData)
    }

tEventBus.go

Implementation of the event bus

    package todo_app

    import (
        "sync"

        "learning/gooop/saga/mqs/logger"
    )

    // tEventBus is an in-memory IEventBus guarded by a read-write mutex.
    type tEventBus struct {
        rwmutex *sync.RWMutex
        items   map[string][]*EventHandler
    }

    func newEventHandler(id string, handleFunc EventHandleFunc) *EventHandler {
        return &EventHandler{ID: id, Handler: handleFunc}
    }

    func newEventBus() IEventBus {
        it := new(tEventBus)
        it.init()
        return it
    }

    func (me *tEventBus) init() {
        me.rwmutex = new(sync.RWMutex)
        me.items = make(map[string][]*EventHandler)
    }

    // Pub dispatches e to every subscriber, each on its own goroutine,
    // so handlers run asynchronously and in no guaranteed order.
    func (me *tEventBus) Pub(e string, args interface{}) {
        me.rwmutex.RLock()
        defer me.rwmutex.RUnlock()

        for _, it := range me.items[e] {
            logger.Logf("eventbus.Pub, event=%s, handler=%s", e, it.ID)
            go it.Handler(e, args)
        }
    }

    // Sub registers handleFunc for event e under the subscriber id.
    func (me *tEventBus) Sub(e string, id string, handleFunc EventHandleFunc) {
        me.rwmutex.Lock()
        defer me.rwmutex.Unlock()

        // append works on a nil slice, so no existence check is needed.
        me.items[e] = append(me.items[e], newEventHandler(id, handleFunc))
    }

    // Unsub removes the first handler registered for event e under id.
    func (me *tEventBus) Unsub(e string, id string) {
        me.rwmutex.Lock()
        defer me.rwmutex.Unlock()

        handlers := me.items[e]
        for i, it := range handlers {
            if it.ID == id {
                // Move the last element into slot i, then shrink the slice.
                lastI := len(handlers) - 1
                handlers[i] = handlers[lastI]
                handlers[lastI] = nil
                me.items[e] = handlers[:lastI]
                return // stop here: the slice was modified during iteration
            }
        }
    }

    // GlobalEventBus is the process-wide event bus.
    var GlobalEventBus = newEventBus()
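
Because Pub starts every handler with `go`, the order in which handlers run is not deterministic, which can make unit tests awkward. One possible workaround, shown here purely as a sketch, is a synchronous bus with the same three methods. The names `syncBus` and `subscription` are hypothetical, and the type is not safe for concurrent use or for calling Unsub from inside a handler.

```go
package main

import "fmt"

// EventHandleFunc has the same signature as in IEventBus.go.
type EventHandleFunc func(e string, args interface{})

// subscription pairs a subscriber id with its handler (hypothetical type).
type subscription struct {
	id string
	fn EventHandleFunc
}

// syncBus is a hypothetical single-goroutine bus intended for tests.
type syncBus struct {
	subs map[string][]subscription
}

func newSyncBus() *syncBus {
	return &syncBus{subs: make(map[string][]subscription)}
}

// Pub calls every handler for e in subscription order, on the caller's goroutine.
func (b *syncBus) Pub(e string, args interface{}) {
	for _, s := range b.subs[e] {
		s.fn(e, args)
	}
}

// Sub registers fn for event e under id.
func (b *syncBus) Sub(e string, id string, fn EventHandleFunc) {
	b.subs[e] = append(b.subs[e], subscription{id: id, fn: fn})
}

// Unsub removes every handler registered for e under id, keeping the order of the rest.
func (b *syncBus) Unsub(e string, id string) {
	kept := b.subs[e][:0]
	for _, s := range b.subs[e] {
		if s.id != id {
			kept = append(kept, s)
		}
	}
	b.subs[e] = kept
}

func main() {
	bus := newSyncBus()
	var got []string
	bus.Sub("todo.created", "a", func(e string, args interface{}) { got = append(got, "a") })
	bus.Sub("todo.created", "b", func(e string, args interface{}) { got = append(got, "b") })

	bus.Pub("todo.created", nil)
	bus.Unsub("todo.created", "a")
	bus.Pub("todo.created", nil)

	fmt.Println(got) // prints "[a b b]"
}
```

The trade-off is that a slow handler now blocks the publisher, so a bus like this suits tests rather than production use.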

tTodoEventSerializer.go

Implementation of event serialization to JSON

    package todo_app

    // Tags identifying the kind of event stored in a tJSONData record.
    const (
        TagCreated = 1
        TagUpdated = 2
        TagRemoved = 3
    )

    type tTodoEventSerializer struct {
    }

    func newEventSerializer() iTodoEventSerializer {
        return new(tTodoEventSerializer)
    }

    // serializeWithTag wraps v in a tJSONData record marked with tag.
    // It returns nil if serialization fails.
    func (me *tTodoEventSerializer) serializeWithTag(tag int, v interface{}) *tJSONData {
        it := new(tJSONData)
        if err := it.Set(tag, v); err != nil {
            return nil
        }
        return it
    }

    func (me *tTodoEventSerializer) SerializeCreatedEvent(e *TodoCreatedEvent) *tJSONData {
        return me.serializeWithTag(TagCreated, e)
    }

    func (me *tTodoEventSerializer) SerializeUpdatedEvent(e *TodoUpdatedEvent) *tJSONData {
        return me.serializeWithTag(TagUpdated, e)
    }

    func (me *tTodoEventSerializer) SerializeRemovedEvent(e *TodoRemovedEvent) *tJSONData {
        return me.serializeWithTag(TagRemoved, e)
    }

    var gDefaultEventSerializer = newEventSerializer()
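
The serializer relies on a type tJSONData whose definition is not shown in this post. Judging only from how it is used here (`Set(tag, v)` returning an error, `Get(v)` returning an error, and a `Tag` field), it might look roughly like the sketch below. The `Content` field and the use of `encoding/json` are assumptions, not the author's code.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// TodoDTO and TodoCreatedEvent mirror the types defined earlier in this post.
type TodoDTO struct {
	NO      int
	Title   string
	Content string
}

type TodoCreatedEvent struct {
	Data *TodoDTO
}

// tJSONData is an ASSUMED shape: a tag naming the event kind plus the
// JSON-encoded event. The Content field is hypothetical.
type tJSONData struct {
	Tag     int
	Content []byte
}

// Set encodes v as JSON and stores it together with tag.
func (me *tJSONData) Set(tag int, v interface{}) error {
	b, err := json.Marshal(v)
	if err != nil {
		return err
	}
	me.Tag = tag
	me.Content = b
	return nil
}

// Get decodes the stored JSON into v, which must be a pointer.
func (me *tJSONData) Get(v interface{}) error {
	return json.Unmarshal(me.Content, v)
}

func main() {
	in := &TodoCreatedEvent{Data: &TodoDTO{NO: 1, Title: "buy milk", Content: "2 litres"}}

	j := new(tJSONData)
	if err := j.Set(1, in); err != nil {
		fmt.Println("set failed:", err)
		return
	}

	out := new(TodoCreatedEvent)
	if err := j.Get(out); err != nil {
		fmt.Println("get failed:", err)
		return
	}
	fmt.Println(out.Data.Title) // prints "buy milk"
}
```

Storing the tag next to the payload is what lets a loader decide which event type to decode into before calling `Get`.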

tTodoWriter.go

Implementation of the event writer

    package todo_app

    // tTodoWriter is the write side: it persists every todo event to the JSON store.
    type tTodoWriter struct {
    }

    func newTodoWriter() iTodoWriter {
        it := new(tTodoWriter)
        it.init()
        return it
    }

    // init subscribes the writer to all three todo events.
    func (me *tTodoWriter) init() {
        const id = "tTodoWriter"
        GlobalEventBus.Sub(EventTodoCreated, id, me.handleEvent)
        GlobalEventBus.Sub(EventTodoUpdated, id, me.handleEvent)
        GlobalEventBus.Sub(EventTodoRemoved, id, me.handleEvent)
    }

    // handleEvent routes an event to its typed handler.
    // Go switch cases do not fall through, so no break is needed.
    func (me *tTodoWriter) handleEvent(e string, args interface{}) {
        switch e {
        case EventTodoCreated:
            if it, ok := args.(*TodoCreatedEvent); ok {
                me.HandleCreated(it)
            }
        case EventTodoUpdated:
            if it, ok := args.(*TodoUpdatedEvent); ok {
                me.HandleUpdated(it)
            }
        case EventTodoRemoved:
            if it, ok := args.(*TodoRemovedEvent); ok {
                me.HandleRemoved(it)
            }
        }
    }

    func (me *tTodoWriter) HandleCreated(e *TodoCreatedEvent) {
        j := gDefaultEventSerializer.SerializeCreatedEvent(e)
        if j != nil {
            MockJSONStore.Append(j)
        }
    }

    func (me *tTodoWriter) HandleUpdated(e *TodoUpdatedEvent) {
        j := gDefaultEventSerializer.SerializeUpdatedEvent(e)
        if j != nil {
            MockJSONStore.Append(j)
        }
    }

    func (me *tTodoWriter) HandleRemoved(e *TodoRemovedEvent) {
        j := gDefaultEventSerializer.SerializeRemovedEvent(e)
        if j != nil {
            MockJSONStore.Append(j)
        }
    }

tMockJSONStore.go

Mock implementation of JSON file reading and writing

    package todo_app

    import "sync"

    // tMockJSONStore keeps the event records in memory instead of a file.
    type tMockJSONStore struct {
        rwmutex *sync.RWMutex
        once    sync.Once
        items   []*tJSONData
    }

    func newMockJSONStore() iJSONStore {
        it := new(tMockJSONStore)
        it.init()
        return it
    }

    func (me *tMockJSONStore) init() {
        me.rwmutex = new(sync.RWMutex)
        me.items = []*tJSONData{}
    }

    // Load replays the stored records once, publishing each decoded event.
    // Caution: a subscriber that appends to this store (such as tTodoWriter)
    // will append the replayed events again.
    func (me *tMockJSONStore) Load() {
        me.once.Do(func() {
            me.rwmutex.RLock()
            defer me.rwmutex.RUnlock()

            for _, it := range me.items {
                switch it.Tag {
                case TagCreated:
                    v := new(TodoCreatedEvent)
                    if err := it.Get(v); err == nil {
                        GlobalEventBus.Pub(EventTodoCreated, v) // publish the event, not the error
                    }
                case TagUpdated:
                    v := new(TodoUpdatedEvent)
                    if err := it.Get(v); err == nil {
                        GlobalEventBus.Pub(EventTodoUpdated, v)
                    }
                case TagRemoved:
                    v := new(TodoRemovedEvent)
                    if err := it.Get(v); err == nil {
                        GlobalEventBus.Pub(EventTodoRemoved, v)
                    }
                }
            }
        })
    }

    // Append adds one record to the end of the store.
    func (me *tMockJSONStore) Append(it *tJSONData) {
        me.rwmutex.Lock()
        defer me.rwmutex.Unlock()
        me.items = append(me.items, it)
    }

    var MockJSONStore = newMockJSONStore()

(To be continued)