简介

在一个涉及多模块交互的系统中,如果模块的交互需要手动去调用对方的方法,那么代码的耦合度就太高了。所以产生了异步消息通信。实际上,各种各样的消息队列都是基于异步消息的。不过它们大部分都有着非常复杂的设计,很多被设计成一个独立的软件来使用。今天我们介绍一个非常小巧的异步消息通信库[message-bus](https://github.com/vardius/message-bus),它只能在一个进程中使用。源代码只有一个文件,我们也简单看一下实现。

快速使用

安装:

  1. $ go get github.com/vardius/message-bus

使用:

  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. messagebus "github.com/vardius/message-bus"
  6. )
  7. func main() {
  8. queueSize := 100
  9. bus := messagebus.New(queueSize)
  10. var wg sync.WaitGroup
  11. wg.Add(2)
  12. _ = bus.Subscribe("topic", func(v bool) {
  13. defer wg.Done()
  14. fmt.Println(v)
  15. })
  16. _ = bus.Subscribe("topic", func(v bool) {
  17. defer wg.Done()
  18. fmt.Println(v)
  19. })
  20. bus.Publish("topic", true)
  21. wg.Wait()
  22. }

这是官网提供的例子,message-bus承担了模块间消息分发的角色。模块 A 和 模块 B 先向message-bus订阅主题(topic),即告诉message-bus对什么样的消息感兴趣。其他模块 C 产生某个主题的消息,通知message-bus,由message-bus分发到对此感兴趣的模块。这样就实现了模块之间的解耦,模块 A、B 和 C 之间不需要知道彼此。

上面的例子中:

  • 首先,调用messagebuss.New()创建一个消息管理器;
  • 其次调用Subscribe()方法向管理器订阅主题;
  • 调用Publish()向管理器发布主题消息,这样订阅该主题的模块就会收到通知。

更复杂的例子

其实很多人会对何时使用这个库产生疑问,message-bus GitHub 仓库中 issue 中至今还躺着这个问题,https://github.com/vardius/message-bus/issues/4。我是做游戏后端开发的,在一个游戏中会涉及各种各样的功能模块,它们需要了解其他模块产生的事件。例如每日任务有玩家升多少级的任务、成就系统有等级的成就、其他系统还可能根据玩家等级执行其他操作…如果硬写的话,最后可能是这样:

  1. func (p *Player) LevelUp() {
  2. // ...
  3. p.DailyMission.OnPlayerLevelUp(oldLevel, newLevel)
  4. p.Achievement.OnPlayerLevelUp(oldLevel, newLevel)
  5. p.OtherSystem.OnPlayerLevelUp(oldLevel, newLevel)
  6. }

需求一直在新增和迭代,如果新增一个模块,也需要在玩家升级时进行一些处理,除了实现模块自身的OnPlayerLevelUp方法,还必须在玩家的LevelUp()方法调用。这样玩家模块必须清楚地知道其他模块的情况。如果功能模块再多一点,而且由不同的人开发的,那么情况会更复杂。使用异步消息可有效解决这个问题:在升级时我们只需要向消息管理器发布这个升级“消息”,由消息管理器通知订阅该消息的模块。

我们设计的目录结构如下:

  1. game
  2. ├── achievement.go
  3. ├── daily_mission.go
  4. ├── main.go
  5. ├── manager.go
  6. └── player.go

其中manager.go负责message-bus的创建:

  1. package main
  2. import (
  3. messagebus "github.com/vardius/message-bus"
  4. )
  5. var bus = messagebus.New(10)

player.go对应玩家结构(为了简便起见,很多字段省略了):

  1. package main
  2. type Player struct {
  3. level uint32
  4. }
  5. func NewPlayer() *Player {
  6. return &Player{}
  7. }
  8. func (p *Player) LevelUp() {
  9. oldLevel := p.level
  10. newLevel := p.level+1
  11. p.level++
  12. bus.Publish("UserLevelUp", oldLevel, newLevel)
  13. }

achievement.godaily_mission.go分别是成就和每日任务(也是省略了很多无关细节):

  1. // achievement.go
  2. package main
  3. import "fmt"
  4. type Achievement struct {
  5. // ...
  6. }
  7. func NewAchievement() *Achievement {
  8. a := &Achievement{}
  9. bus.Subscribe("UserLevelUp", a.OnUserLevelUp)
  10. return a
  11. }
  12. func (a *Achievement) OnUserLevelUp(oldLevel, newLevel uint32) {
  13. fmt.Printf("daily mission old level:%d new level:%d\n", oldLevel, newLevel)
  14. }
  1. // daily_mission.go
  2. package main
  3. import "fmt"
  4. type DailyMission struct {
  5. // ...
  6. }
  7. func NewDailyMission() *DailyMission {
  8. d := &DailyMission{}
  9. bus.Subscribe("UserLevelUp", d.OnUserLevelUp)
  10. return d
  11. }
  12. func (d *DailyMission) OnUserLevelUp(oldLevel, newLevel uint32) {
  13. fmt.Printf("daily mission old level:%d new level:%d\n", oldLevel, newLevel)
  14. }

在创建这两个功能的对象时,我们订阅了UserLevelUp主题。玩家在升级时会发布这个主题。

最后main.go驱动整个程序:

  1. package main
  2. import "time"
  3. func main() {
  4. p := NewPlayer()
  5. NewDailyMission()
  6. NewAchievement()
  7. p.LevelUp()
  8. p.LevelUp()
  9. p.LevelUp()
  10. time.Sleep(1000)
  11. }

注意,由于message-bus是异步通信,为了能看到结果我特意加了time.Sleep,实际开发中不太可能使用Sleep

最后我们运行整个程序:

  1. $ go run .

因为要运行的是一个多文件程序,不能使用go run main.go

实际上,当年我因为苦于模块之间调来调去太麻烦了,自己用 C++ 撸了一个event-managerhttps://github.com/go-quiz/event-manager。思路是一样的。

缺点

message-bus订阅主题时传入一个函数,函数的参数可任意设置,发布时必须使用相同数量的参数,这个限制感觉有点勉强。如果我们传入的参数个数不一致,程序就panic了。我认为可以只用一个参数interface{},传入对象即可。例如,上面的升级事件可以使用EventUserLevelUp的对象:

  1. type EventUserLevelUp struct {
  2. oldLevel uint32
  3. newLevel uint32
  4. }

对应地修改一下PlayerLevelUp方法:

  1. func (p *Player) LevelUp() {
  2. event := &EventUserLevelUp {
  3. oldLevel: p.level,
  4. newLevel: p.level+1,
  5. }
  6. p.level++
  7. bus.Publish("UserLevelUp", event)
  8. }

和处理方法:

  1. func (d *DailyMission) OnUserLevelUp(arg interface{}) {
  2. event := arg.(*EventUserLevelUp)
  3. fmt.Printf("daily mission old level:%d new level:%d\n", event.oldLevel, event.newLevel)
  4. }

这样一来,我们似乎用不上反射了,订阅者都是func (interface{})类型的函数或方法。感兴趣的可自己实现一下,我 fork 了message-bus,做了这个修改。改动在这里:https://github.com/go-quiz/message-busmessage-bus有测试和性能用例,改完跑一下😄。

源码分析

message-bus的源码只有一个文件,加上注释还不到 130 行,我们简单来看一下。

MessageBus就是一个简单的接口:

  1. type MessageBus interface {
  2. Publish(topic string, args ...interface{})
  3. Close(topic string)
  4. Subscribe(topic string, fn interface{}) error
  5. Unsubscribe(topic string, fn interface{}) error
  6. }

PublishSubscribe都讲过了,Unsubscribe表示对某个主题不感兴趣了,取消订阅,Close直接关闭某个主题的队列,删除所有订阅者。

message-bus内部,每个主题对应一组订阅者。每个订阅者使用handler结构存储回调和参数通道:

  1. type handler struct {
  2. callback reflect.Value
  3. queue chan []reflect.Value
  4. }

所有订阅者都存储在一个 map 中:

  1. type handlersMap map[string][]*handler
  2. type messageBus struct {
  3. handlerQueueSize int
  4. mtx sync.RWMutex
  5. handlers handlersMap
  6. }

messageBusMessageBus接口的实现。我们来看看各个方法是如何实现的。

  1. func (b *messageBus) Subscribe(topic string, fn interface{}) error {
  2. h := &handler{
  3. callback: reflect.ValueOf(fn),
  4. queue: make(chan []reflect.Value, b.handlerQueueSize),
  5. }
  6. go func() {
  7. for args := range h.queue {
  8. h.callback.Call(args)
  9. }
  10. }()
  11. b.handlers[topic] = append(b.handlers[topic], h)
  12. return nil
  13. }

调用Subscribe时传入一个函数,message-bus为每个订阅者创建一个handler对象,在该对象中创建一个带缓冲的参数通道,缓冲大小由message-bus创建时的参数指定。
同时启动一个goroutine,监听通道,每当有参数到来时就执行注册的回调。

  1. func (b *messageBus) Publish(topic string, args ...interface{}) {
  2. rArgs := buildHandlerArgs(args)
  3. if hs, ok := b.handlers[topic]; ok {
  4. for _, h := range hs {
  5. h.queue <- rArgs
  6. }
  7. }
  8. }

Publish发布主题,buildHandlerArgs将传入的参数转为[]reflect.Value,以便反射调用回调时传入。发送参数到该主题下所有handler的通道中。由Subscribe时创建的goroutine读取并触发回调。

  1. func (b *messageBus) Unsubscribe(topic string, fn interface{}) error {
  2. rv := reflect.ValueOf(fn)
  3. if _, ok := b.handlers[topic]; ok {
  4. for i, h := range b.handlers[topic] {
  5. if h.callback == rv {
  6. close(h.queue)
  7. b.handlers[topic] = append(b.handlers[topic][:i], b.handlers[topic][i+1:]...)
  8. }
  9. }
  10. return nil
  11. }
  12. return fmt.Errorf("topic %s doesn't exist", topic)
  13. }

Unsubscribe将某个订阅者从message-bus中移除,移除时需要关闭通道,否则会造成订阅者的 goroutine 泄露。

  1. func (b *messageBus) Close(topic string) {
  2. if _, ok := b.handlers[topic]; ok {
  3. for _, h := range b.handlers[topic] {
  4. close(h.queue)
  5. }
  6. delete(b.handlers, topic)
  7. return
  8. }
  9. }

Close关闭某主题下所有的订阅者参数通道,并删除该主题。

注意,为了保证并发安全,每个方法都加了锁,分析实现时先忽略锁和错误处理。

为了更直观的理解,我画了一个message-bus内部结构图:

每日一库之16:message-bus - 图1

总结

message-bus是一个小巧的异步通信库,实际使用可能不多,但却是学习源码的好资源。

大家如果发现好玩、好用的 Go 语言库,欢迎到 Go 每日一库 GitHub 上提交 issue😄

参考

  1. message-bus GitHub:https://github.com/vardius/message-bus
  2. Go 每日一库 GitHub:https://github.com/go-quiz/go-daily-lib