缘起
最近阅读 [Go微服务实战] (刘金亮, 2021.1)
本系列笔记拟采用golang练习之
git地址: https://gitee.com/ioly/learning.gooop
ES-CQRS模式
ES(Event Sourcing)事件溯源非常好理解,指的是将每次的事件都记录下来,而不是去记录对象的状态。比如新建、修改等都会作为事件记录下来,当需要最新的状态时,通过事件的堆叠来计算最新的状态。按照事件溯源的模式进行架构设计,就是事件驱动架构(Event-Driven Architecture, EDA)。命令查询职责分离(Command Query Responsibility Segregation, CQRS)的思想最早来自Bertrand Meyer写的Object-Oriented Software Construction一书中提出的命令查询分离(Command Query Separation, CQS)。其基本思想是任何一个对象的方法都可以分为以下两大类:▪ 命令(Command):不返回任何结果(void),但会改变对象的状态。▪ 查询(Query):返回结果,但是不会改变对象的状态,对系统没有副作用。CQRS的核心出发点就是把系统分为读和写两部分,从而方便分别进行优化。
目标(Day 1)
- 根据ES-CQRS模式, 设计“TODO - 待办事宜”程序
设计
- TodoDTO: 待办事宜数值对象
- TodoCreatedEvent: 创建todo事件
- TodoUpdatedEvent: 修改todo事件
- TodoRemovedEvent: 删除todo事件
- IEventBus: 事件总线接口
- iTodoEventSerializer: 事件序列化到JSON数据的接口
- iTodoReader: todo读取接口
- iTodoWriter: todo写入接口
- iJSONStore: json文件读写接口
- tEventBus: 事件总线的实现
- tTodoEventSerializer: 事件序列化到JSON的实现
- tTodoWriter: 事件写入器的实现
- tMockJSONStore: 虚拟的JSON文件读写实现
- tTodoReader: 未完成
TodoDTO.go
待办事宜数值对象
package todo_apptype TodoDTO struct {NO intTitle stringContent string}
TodoCreatedEvent.go
todo事项创建事件
package todo_apptype TodoCreatedEvent struct {Data *TodoDTO}
TodoUpdatedEvent.go
todo事项修改事件
package todo_apptype TodoUpdatedEvent struct {Data *TodoDTO}
TodoRemovedEvent.go
todo事项删除事件
package todo_apptype TodoRemovedEvent struct {NO int}
IEventBus.go
事件总线接口
package todo_apptype EventHandleFunc func(e string, args interface{})type EventHandler struct {ID stringHandler EventHandleFunc}type IEventBus interface {Pub(e string, args interface{})Sub(e string, id string, handleFunc EventHandleFunc)Unsub(e string, id string)}const EventTodoCreated = "todo.created"const EventTodoUpdated = "todo.updated"const EventTodoRemoved = "todo.removed"
iTodoEventSerializer.go
事件序列化到JSON数据的接口
package todo_apptype iTodoEventSerializer interface {SerializeCreatedEvent(it *TodoCreatedEvent) *tJSONDataSerializeUpdatedEvent(it *TodoUpdatedEvent) *tJSONDataSerializeRemovedEvent(it *TodoRemovedEvent) *tJSONData}
iTodoReader.go
todo读取接口
package todo_apptype iTodoReader interface {All() []*TodoDTO}
iTodoWriter.go
todo写入接口
package todo_apptype iTodoWriter interface {HandleCreated(e *TodoCreatedEvent)HandleUpdated(e *TodoUpdatedEvent)HandleRemoved(e *TodoRemovedEvent)}
iJSONStore.go
json文件读写接口
package todo_apptype iJSONStore interface {Load()Append(it *tJSONData)}
tEventBus.go
事件总线的实现
package todo_app

import (
	"sync"

	"learning/gooop/saga/mqs/logger"
)

// tEventBus is an in-memory IEventBus. Handlers are kept per event name in
// items, and every access to items is guarded by rwmutex.
type tEventBus struct {
	rwmutex *sync.RWMutex
	items   map[string][]*EventHandler
}

// newEventHandler builds the EventHandler stored for one subscription.
func newEventHandler(id string, handleFunc EventHandleFunc) *EventHandler {
	return &EventHandler{ID: id, Handler: handleFunc}
}

// newEventBus returns an initialized, empty event bus.
func newEventBus() IEventBus {
	it := new(tEventBus)
	it.init()
	return it
}

// init allocates the lock and the handler table.
func (me *tEventBus) init() {
	me.rwmutex = new(sync.RWMutex)
	me.items = make(map[string][]*EventHandler)
}

// Pub publishes args under the event name e. The handlers are looked up
// under the read lock and each one is started on its own goroutine, so Pub
// does not wait for handlers to finish.
func (me *tEventBus) Pub(e string, args interface{}) {
	me.rwmutex.RLock()
	defer me.rwmutex.RUnlock()

	for _, it := range me.items[e] {
		logger.Logf("eventbus.Pub, event=%s, handler=%s", e, it.ID)
		go it.Handler(e, args)
	}
}

// Sub registers handleFunc under id for the event name e. IDs are not
// checked for uniqueness.
func (me *tEventBus) Sub(e string, id string, handleFunc EventHandleFunc) {
	me.rwmutex.Lock()
	defer me.rwmutex.Unlock()

	// append on a missing (nil) entry creates the slice.
	me.items[e] = append(me.items[e], newEventHandler(id, handleFunc))
}

// Unsub removes every handler registered under id for the event name e.
// The relative order of the remaining handlers is preserved.
func (me *tEventBus) Unsub(e string, id string) {
	me.rwmutex.Lock()
	defer me.rwmutex.Unlock()

	handlers, ok := me.items[e]
	if !ok {
		return
	}

	// Filter in place. The previous swap-and-truncate kept ranging over the
	// stale slice without a break, so the element swapped into slot i was
	// never examined and duplicate IDs could leave a matching handler behind.
	kept := handlers[:0]
	for _, it := range handlers {
		if it.ID != id {
			kept = append(kept, it)
		}
	}

	// Drop references in the now-unused tail so removed handlers can be
	// garbage collected.
	for i := len(kept); i < len(handlers); i++ {
		handlers[i] = nil
	}

	me.items[e] = kept
}

// GlobalEventBus is the package-wide event bus instance.
var GlobalEventBus = newEventBus()
tTodoEventSerializer.go
事件序列化到JSON的实现
package todo_apptype tTodoEventSerializer struct {}func newEventSeiralizer() iTodoEventSerializer {it := new(tTodoEventSerializer)return it}func (me *tTodoEventSerializer) serializeWithTag(tag int, v interface{}) *tJSONData {it := new(tJSONData)err := it.Set(TagCreated, v)if err != nil {return nil}return it}func (me *tTodoEventSerializer) SerializeCreatedEvent(e *TodoCreatedEvent) *tJSONData {return me.serializeWithTag(TagCreated, e)}func (me *tTodoEventSerializer) SerializeUpdatedEvent(e *TodoUpdatedEvent) *tJSONData {return me.serializeWithTag(TagUpdated, e)}func (me *tTodoEventSerializer) SerializeRemovedEvent(e *TodoRemovedEvent) *tJSONData {return me.serializeWithTag(TagRemoved, e)}const TagCreated = 1const TagUpdated = 2const TagRemoved = 3var gDefaultEventSerializer = newEventSeiralizer()
tTodoWriter.go
事件写入器的实现
package todo_apptype tTodoWriter struct {}func newTodoWriter() iTodoWriter {it := new(tTodoWriter)it.init()return it}func (me *tTodoWriter) init() {GlobalEventBus.Sub("todo.created", "", me.handleEvent)}func (me *tTodoWriter) handleEvent(e string, args interface{}) {switch e {case EventTodoCreated:if it,ok := args.(*TodoCreatedEvent);ok {me.HandleCreated(it)}breakcase EventTodoUpdated:if it,ok := args.(*TodoUpdatedEvent);ok {me.HandleUpdated(it)}breakcase EventTodoRemoved:if it,ok := args.(*TodoRemovedEvent);ok {me.HandleRemoved(it)}break}}func (me *tTodoWriter) HandleCreated(e *TodoCreatedEvent) {j := gDefaultEventSerializer.SerializeCreatedEvent(e)if j != nil {MockJSONStore.Append(j)}}func (me *tTodoWriter) HandleUpdated(e *TodoUpdatedEvent) {j := gDefaultEventSerializer.SerializeUpdatedEvent(e)if j != nil {MockJSONStore.Append(j)}}func (me *tTodoWriter) HandleRemoved(e *TodoRemovedEvent) {j := gDefaultEventSerializer.SerializeRemovedEvent(e)if j != nil {MockJSONStore.Append(j)}}
tMockJSONStore.go
虚拟的JSON文件读写实现
package todo_app

import "sync"

// tMockJSONStore is an in-memory stand-in for a JSON file store. Records are
// kept in items, guarded by rwmutex.
type tMockJSONStore struct {
	rwmutex *sync.RWMutex
	once    sync.Once
	items   []*tJSONData
}

// newMockJSONStore returns an initialized, empty store.
func newMockJSONStore() iJSONStore {
	it := new(tMockJSONStore)
	it.init()
	return it
}

// init allocates the lock and the empty record list.
func (me *tMockJSONStore) init() {
	me.rwmutex = new(sync.RWMutex)
	me.items = []*tJSONData{}
}

// Load replays the stored records onto GlobalEventBus. Each record is
// decoded into the event type selected by its Tag and published under the
// matching event name. Records that fail to decode, or carry an unknown
// tag, are skipped. The replay runs at most once per store.
//
// NOTE(review): subscribers that append to this store (tTodoWriter does)
// will re-append replayed events — confirm whether that is intended.
func (me *tMockJSONStore) Load() {
	me.once.Do(func() {
		me.rwmutex.RLock()
		defer me.rwmutex.RUnlock()

		for _, it := range me.items {
			switch it.Tag {
			case TagCreated:
				v := new(TodoCreatedEvent)
				// Publish the decoded event v. The previous code published
				// the nil result of Get, so subscribers never saw the event.
				if err := it.Get(v); err == nil {
					GlobalEventBus.Pub(EventTodoCreated, v)
				}
			case TagUpdated:
				v := new(TodoUpdatedEvent)
				if err := it.Get(v); err == nil {
					GlobalEventBus.Pub(EventTodoUpdated, v)
				}
			case TagRemoved:
				v := new(TodoRemovedEvent)
				if err := it.Get(v); err == nil {
					GlobalEventBus.Pub(EventTodoRemoved, v)
				}
			}
		}
	})
}

// Append adds it to the end of the record list under the write lock.
func (me *tMockJSONStore) Append(it *tJSONData) {
	me.rwmutex.Lock()
	defer me.rwmutex.Unlock()
	me.items = append(me.items, it)
}

// MockJSONStore is the package-wide store instance.
var MockJSONStore = newMockJSONStore()
(未完待续)
