go-micro
Go Micro是一个流行的微服务架构,是一个插件化的基础框架,基于此可以构建微服务,Micro的设计哲学是可插拔的插件化架构。Go Micro 简单轻巧、易于上手、功能强大、扩展方便,是基于 Go 语言进行微服务架构时非常值得推荐的一个框架。
Go Micro有以下重要特性:
- 服务发现:自动服务注册和名称解析。服务发现是微服务开发的核心。
- 负载均衡:基于服务发现构建的客户端负载均衡。一旦我们获得了服务的任意数量实例的地址,我们现在需要一种方法来决定要路由到哪个节点。
- 消息编码:基于内容类型的动态消息编码。这包括默认的protobuf和json。
- 请求/响应:基于RPC的请求/响应,支持双向流。
- Async Messaging:PubSub是异步通信和事件驱动架构的重要设计思想。事件通知是微服务开发的核心模式。
- 可插拔接口:Go Micro为每个分布式系统抽象使用Go接口,因此,这些接口是可插拔的,并允许Go Micro与运行时无关,可以插入任何基础技术
go-micro通信流程
通信的角色一共4个:server,client,register和broker,他们的各种的作用在于:
- Server监听客户端的调用,和Broker推送过来的信息进行处理。并且Server端需要向Register注册自己的存在或消亡,这样Client才能知道自己的状态;
- Register服务的注册的发现,Client端从Register中得到Server的信息,然后每次调用都根据算法选择一个的Server进行通信,当然通信是要经过编码/解码,选择传输协议等一系列过程;
- 如果有需要通知所有的Server端可以使用Broker进行信息的推送,Broker 通过队列进行信息的接收和发布;
Go Micro 框架的基础架构如上图所示,由 8 个核心接口组成,每个接口都有默认实现。Go micro 由以下接口列表组成:
- 最顶层的 Service 接口是构建服务的主要组件,它把底层的各个包需要实现的接口,做了一次封装,包含了一系列用于初始化 Service 和 Client 的方法,使我们可以很简单的创建一个 RPC 服务;
- server - 用于处理请求和通知。服务器是编写服务的构建基块,内置服务器是 RPC 系统
- client - 用于高级别请求/响应和通知;
- broker - 异步消息传递,其实就是一个消息队列系统;
- config - 用于动态配置的。配置是一个接口, 用于从任意数量的源进行动态配置加载,其实就是一个配置进程/中心;
- codec - 用于消息编码的(序列化和反序列化)。编解码器用于编码和解码消息, 然后再通过导线传输消息. 数据可能是 json, protobuf, beson, msgpack 等。
- registry - 服务发现的注册表。注册表提供一种服务发现机制, 用于将名称解析为地址. 它可以由 consul, etcd zookeeper, dns, gossip 等支持. 服务应在启动时使用注册表进行注册, 并在关闭时取消注册。
- selector - 用于负载平衡。选择器是一个负载平衡抽象, 它建立在注册表上。 户在发出请求时利用选择器. 客户端将使用选择器而不是注册表, 因为它提供了内置的负载平衡机制.
- store - 用于数据存储。存储是一个简单的键值存储接口, 用于抽象掉轻量级数据存储,仅用于保存简单的状态信息,比如用户的验证状态。
- transport - 用于同步通信。传输是服务之间同步请求/响应通信的接口. 它类似于 golang 网络包, 但提供了一个更高级别的抽象, 允许我们切换通信机制
Go Micro的接口间的关系如上图所示,每个接口都支持业界流行的开源方案,具体如下:
go-micro最重要的是service的创建,另外负责服务发现的
register以及负责异步通信的broker都是值得掌握的部分,下面就将介绍go-micro的这三个部分。
新建一个RPC服务(基于micro v2)
先安装相关组件
- go get github.com/micro/micro/v3/cmd/protoc-gen-micro@master
- go get github.com/micro/micro/v2
我们先定义一个简单的protobuf协议文件greet.proto,定义的服务名称为Greet。
syntax = "proto3"; // 指定proto版本
// 指定golang包名
option go_package = "pb/proto_demo";
service Greeter {
rpc Greet(Request) returns (Response) {}
}
message Request {
string name = 1;
}
message Response {
string msg = 1;
}
安装之后,利用protoc-gen-go和protoc-gen-micro生成协议go文件。这次一共会生成两个协议文件:greet.pb.go和greet.pb.micro.go。
protoc --micro_out=. --go_out=. ./greet.proto
着重观察下生成的micro版的go协议文件greet.pb.micro.go
// Code generated by protoc-gen-micro. DO NOT EDIT.
// source: greet.proto
package proto_demo
import (
fmt "fmt"
proto "github.com/golang/protobuf/proto"
math "math"
)
import (
context "context"
client "github.com/micro/go-micro/v2/client"
server "github.com/micro/go-micro/v2/server"
api "github.com/micro/micro/v3/service/api"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
// Reference imports to suppress errors if they are not otherwise used.
var _ api.Endpoint
var _ context.Context
var _ client.Option
var _ server.Option
// Api Endpoints for Greeter service
func NewGreeterEndpoints() []*api.Endpoint {
return []*api.Endpoint{}
}
// Client API for Greeter service
type GreeterService interface {
Greet(ctx context.Context, in *Request, opts ...client.CallOption) (*Response, error)
}
type greeterService struct {
c client.Client
name string
}
func NewGreeterService(name string, c client.Client) GreeterService {
return &greeterService{
c: c,
name: name,
}
}
func (c *greeterService) Greet(ctx context.Context, in *Request, opts ...client.CallOption) (*Response, error) {
req := c.c.NewRequest(c.name, "Greeter.Greet", in)
out := new(Response)
err := c.c.Call(ctx, req, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// Server API for Greeter service
type GreeterHandler interface {
Greet(context.Context, *Request, *Response) error
}
func RegisterGreeterHandler(s server.Server, hdlr GreeterHandler, opts ...server.HandlerOption) error {
type greeter interface {
Greet(ctx context.Context, in *Request, out *Response) error
}
type Greeter struct {
greeter
}
h := &greeterHandler{hdlr}
return s.Handle(s.NewHandler(&Greeter{h}, opts...))
}
type greeterHandler struct {
GreeterHandler
}
func (h *greeterHandler) Greet(ctx context.Context, in *Request, out *Response) error {
return h.GreeterHandler.Greet(ctx, in, out)
}
这个文件里定义了服务结构体GreeterService,RPC函数实现Greet。
现在先编写服务端代码micro_server/main.go
package main
import (
"context"
"fmt"
micro "github.com/micro/go-micro/v2"
proto "web_demo/proto/pb/proto_demo"
)
type Greeter struct{}
func (g *Greeter) Greet(ctx context.Context, req *proto.Request, rsp *proto.Response) error {
rsp.Msg = "Greet " + req.Name
return nil
}
func main() {
// 创建一个新服务
service := micro.NewService(
micro.Name("greeter"),
)
// 服务初始化
service.Init()
// 注册 handler处理函数
proto.RegisterGreeterHandler(service.Server(), new(Greeter))
// 启动服务
if err := service.Run(); err != nil {
fmt.Println(err)
}
}
编写客户端micro_client/main.go,客户端向服务器发起RPC调用。
package main
import (
"context"
"fmt"
micro "github.com/micro/go-micro/v2"
proto "web_demo/proto/pb/proto_demo"
)
func main() {
// 创建新服务
service := micro.NewService(micro.Name("greeter.client"))
// 服务初始化
service.Init()
// 创建RPC的客户端实例
greeter := proto.NewGreeterService("greeter", service.Client())
// 发起RPC调用
rsp, err := greeter.Greet(context.TODO(), &proto.Request{Name: "John"})
if err != nil {
fmt.Println(err)
}
// 打印返回值
fmt.Println(rsp.Msg)
}
先启动server
unshideMacBook-Pro:micro_server junshili$ go run main.go
2021-05-07 00:50:11 file=v2@v2.9.1/service.go:200 level=info Starting [service] greeter
2021-05-07 00:50:11 file=grpc/grpc.go:864 level=info Server [grpc] Listening on [::]:56014
2021-05-07 00:50:11 file=grpc/grpc.go:697 level=info Registry [mdns] Registering node: greeter-147bbaba-7f7e-46af-bb5e-81d311da0d1a
当然,我们也可以指定端口启动服务,这个参数传入的端口是优先于代码设置的
go run main.go --server_address :8088
我们可以从服务器启动时打印的这些日志得到一些信息:
- RPC通信框架用的是gRPC,这是go-mico默认的;
- 我们启动的这个服务的名字叫greeter;
- 服务监听的端口是56014;
- 服务发现用的组件是mdns,这是go-mirco默认的;
我们也可以指定端口号来启动服务
junshideMacBook-Pro:micro_server junshili$ go run main.go --server_address :8088
2021-05-07 01:38:25 file=v2@v2.9.1/service.go:200 level=info Starting [service] greeter
2021-05-07 01:38:25 file=grpc/grpc.go:864 level=info Server [grpc] Listening on [::]:8088
再启动client发起RPC调用
junshideMacBook-Pro:micro_client junshili$ go run main.go
Greet John
当我kill掉server时,会打印以下日志,表明:1.服务从服务发现组件注销注册了;2.Broker和该服务断开了连接。
2021-05-07 01:32:38 file=grpc/grpc.go:791 level=info Deregistering node: greeter-5e32cdd2-4099-4ee9-b2df-bdeee2cd5ff4
2021-05-07 01:32:38 file=grpc/grpc.go:959 level=info Broker [http] Disconnected from 127.0.0.1:0
注意,使用micro/v2时,protoc3生成micro.protoc文件可能会导致版本冲突,在go run main.go时会报错:
使用micro/v2时,protoc3生成micro.protoc文件导致的版本冲突
cannot use service.Server() (type
“github.com/micro/go-micro/v2/server”.Server) as type
“github.com/micro/micro/v3/service/server”
解决方案:
可将生成的*.pb.micro.go文件中的v3依赖改为v2依赖即可
import (
context "context"
client "github.com/micro/go-micro/v2/client"
server "github.com/micro/go-micro/v2/server"
api "github.com/micro/micro/v3/service/api"
)
新建服务(基于micro v3)
现在go micro已经推出了V3版本,因为V3和V2版本有较多的不同,且存在兼容性不足的问题,如果V2和V3混用的话,会有较多难以解决的兼容性问题,因此这里强烈推荐使用V3作为go micro的实战版本,会少走很多弯路。下面的所有例子都是基于V3的实践。
首先给出官方的上手文档:https://micro.mu/getting-started
首先安装/升级自己的micro版本至V3
go get github.com/micro/micro/v3
安装完之后,启动micro相关的服务进程
junshideMacBook-Pro:~ junshili$ micro server
2021-05-09 00:32:01 file=server/server.go:86 level=info Starting server
2021-05-09 00:32:01 file=server/server.go:114 level=info Registering network
2021-05-09 00:32:01 file=server/server.go:114 level=info Registering runtime
2021-05-09 00:32:01 file=server/server.go:114 level=info Registering registry
2021-05-09 00:32:01 file=server/server.go:114 level=info Registering config
2021-05-09 00:32:01 file=server/server.go:114 level=info Registering store
2021-05-09 00:32:01 file=server/server.go:114 level=info Registering broker
2021-05-09 00:32:01 file=server/server.go:114 level=info Registering events
2021-05-09 00:32:01 file=server/server.go:114 level=info Registering auth
2021-05-09 00:32:01 file=server/server.go:114 level=info Registering proxy
2021-05-09 00:32:01 file=server/server.go:114 level=info Registering api
2021-05-09 00:32:01 file=server/server.go:201 level=info Starting server runtime
2021-05-09 00:32:01 file=service/service.go:195 level=info Starting [service] server
2021-05-09 00:32:01 file=grpc/grpc.go:939 level=info Server [grpc] Listening on [::]:10001
2021-05-09 00:32:01 file=grpc/grpc.go:769 level=info Registry [mdns] Registering node: server-fc07e464-99c4-406d-b608-b54ef3c9bdde
可以注意到,registry,auth等Micro组件都启动起来了。与V2版本不一样的是,接下来需要登录账号,做身份验证。这一步很重要,不然没有登录,后续操作micro会提示权限不足。username固定为admin,password固定为micro。
$ micro login
Enter username: admin
Enter password:
Successfully logged in.
查看micro框架下跑着哪些服务,可以看到这些服务都是micro框架自带的,在我们执行micro server时启动。
junshideMacBook-Pro:~ junshili$ micro services
api
auth
broker
config
events
network
proxy
registry
runtime
server
store
现在我们打算启动一个自己的服务,micro v3提供了一个非常好用的服务生成工具,可以帮我们直接生成服务模板,我们只需要在模板上增添自己的内容就可以了,生产效率大幅提升。使用micro new 服务名就可以生成模板代码。服务名不要使用下划线。
junshideMacBook-Pro:~ junshili$ micro new hellodemo
Creating service hellodemo
.
├── micro.mu
├── main.go
├── generate.go
├── handler
│ └── hellodemo.go
├── proto
│ └── hellodemo.proto
├── Dockerfile
├── Makefile
├── README.md
├── .gitignore
└── go.mod
进入项目目录,编译协议
junshideMacBook-Pro:hellodemo junshili$ make proto
protoc --proto_path=. --micro_out=. --go_out=:. proto/hellodemo.proto
在proto目录下自动生成了pb.go和.pb.micro.go的文件。
../hellodemo/
├── Dockerfile
├── Makefile
├── README.md
├── generate.go
├── go.mod
├── handler
│ └── hellodemo.go
├── main.go
├── micro.mu
└── proto
├── hellodemo.pb.go
├── hellodemo.pb.micro.go
└── hellodemo.proto
启动我们的服务
junshideMacBook-Pro:hellodemo junshili$ micro run .
查看我们的服务是否正常启动
junshideMacBook-Pro:hellodemo junshili$ micro services
api
auth
broker
config
events
hellodemo
network
proxy
registry
runtime
server
store
junshideMacBook-Pro:hellodemo junshili$ micro status
NAME VERSION SOURCE STATUS BUILD UPDATED METADATA
hellodemo latest /Users/junshili/hellodemo running n/a 1m58s ago owner=admin, group=micro
可以看到,我们的服务正常运行在micro框架之内。现在我们要基于这个服务模板做点自己的修改。
现在协议文件上增加rpc 函数和消息结构体。
service Hellodemo {
rpc Call(Request) returns (Response) {}
rpc Stream(StreamingRequest) returns (stream StreamingResponse) {}
rpc PingPong(stream Ping) returns (stream Pong) {}
rpc Greet(GreetReq) returns (GreetRsp) {}
}
message GreetReq {
string name = 1;
int32 age = 2;
}
message GreetRsp {
string msg = 1;
int32 status = 2;
}
再到在handler/hellodemo.go这个文件里,加一个自己的Greet的实现。
func (e *Hellodemo) Greet(ctx context.Context, req *hellodemo.GreetReq, rsp *hellodemo.GreetRsp) error {
log.Info("Received Hellodemo.Greet request")
rsp.Msg = fmt.Sprintf("Greet %s, your age is %d", req.Name, req.Age)
rsp.Status = 200
return nil
}
我们的功能代码已经完成编写,准备把服务更新一下,因为服务已经在run了,所以我们直接使用micro update .
把服务更出去,类似于我们常用的热更,不停服更新。
然后用micro logs hellodemo
看下输出日志是否正常。确认启动正常后,我们可以使用micro框架cli给该服务直接发送请求,测试服务可用性,而无须再写测试client。
先用help指令查看hellodemo对外提供了哪些可调用的方法。
junshideMacBook-Pro:hellodemo junshili$ micro hellodemo --help
NAME:
micro hellodemo
VERSION:
latest
USAGE:
micro hellodemo [command]
COMMANDS:
call
greet
pingPong
stream
我们测试下call和greet方法。
junshideMacBook-Pro:hellodemo junshili$ micro hellodemo call --name=James
{
"msg": "Hello James"
}
junshideMacBook-Pro:hellodemo junshili$ micro hellodemo greet --name=James --age=20
{
"msg": "Greet James, your age is 20",
"status": 200
}
我们同样可以通过http的方式访问请求RPC,中介是micro的api服务,请求会通过api服务,再调到我们制定的服务。默认API的address是127.0.0.1:8080。
junshideMacBook-Pro:helloworld junshili$ curl -XPOST --header "Content-Type: application/json" -d '{"name":"Joe", "age":30}' http://127.0.0.1:8080/hellodemo/greet
{"msg":"Greet Joe, your age is 30","status":200}
搭建Http服务
上面介绍了搭建RPC服务的流程,如果要搭建HTTP服务,其实跟上面流程一样,区别只在于在main.go中调用http相关相关处理即可,比如我们使用gin来实现我们的http服务。
第一步先micro new helloweb,创建服务模板,然后我们之间在main.go添加http处理的相关代码:
package main
import (
"net/http"
"github.com/gin-gonic/gin"
)
func HelloWeb(c *gin.Context) {
c.String(http.StatusOK, "Hello, Go\n")
}
func HiWeb(c *gin.Context) {
c.String(http.StatusOK, "Hi, Go\n")
}
func main() {
// 使用gin作为路由
r := gin.Default()
r.GET("/hello", func(c *gin.Context) {
HelloWeb(c)
})
r.POST("/hi", func(c *gin.Context) {
HiWeb(c)
})
r.Run(":9999") // listen and serve on 0.0.0.0:9999
}
请求和响应如下,如果想要构建更复杂的http项目,请参考我的上一篇文章:Go快速上手—Web服务器篇.
junshideMacBook-Pro:blog junshili$ curl -X POST http://127.0.0.1:9999/hi
Hi, Go
junshideMacBook-Pro:blog junshili$ curl http://127.0.0.1:9999/hello
Hello, Go
需要kill掉这个服务,需要使用
micro kill example
发布订阅(基于消息队列的异步通信)
发布订阅模式在后台服务中广泛使用,go micro框架中,Broker服务就是用于支持发布订阅模式(pub/sub)。发布订阅模式,我们一般使用消息队列作为中间件实现异步通信,从而做到系统解耦、削峰稳流。在go micro框架中,Broker就是负责消息队列这个功能,消息生产者负责生产消息,推送给Broker,消息消费者server向Broker订阅指定类型的消息(我们称为topic),即Broker注册自己的消息处理,一旦有消息到来,则调用响应的消息处理逻辑。Micro的Broker默认使用了Nats作为消息队列,同时他也支持业界常用的消息队列,比如Kafka,RabbitMQ等,因此我们可以根据我们的需求选择消息队列组件作为Broker的底层支持。
Micro内置的Pub/Sub模式很简单易用,用户只需要定义好publisher, subscriber以及消息内容,其他工作都将由框架实现。这里给出micor pub/sub的实践案例。
Broker、publisher、subscriber三者通信的消息结构体为Message,Message结构体的定义如下。
type Message struct {
Header map[string]string
Body []byte
}
消息订阅者的实现消息订阅如下:
- subscrber先连接上broker,broker.Connect()
- 对指定的topic进行注册,broker.Subscribe。特殊地,如果是点对点通信,那需要在,broker.Subscribe的第三个参数加上broker.Queue(topic)。如果是发布订阅模式则不需要。
- 实现消息处理函数,传入broker.Subscribe第二个参数。
package main
import (
"fmt"
"github.com/micro/micro/v3/service"
"github.com/micro/micro/v3/service/logger"
"github.com/micro/go-micro/broker"
)
var topic1 = "topic1"
var topic2 = "topic2"
func handleEvent(b broker.Event) error {
msg := string(b.Message().Body)
logger.Infof("[sub] recieve message: %s, header: %s\n", msg, b.Message().Header)
return nil
}
// 点对点消息通信,消息不会复制,只会被一个消费者消费
func subTopicQ(topic string) {
_, err := broker.Subscribe(topic, handleEvent, broker.Queue(topic))
if err != nil {
fmt.Println(err)
}
}
// 发布订阅模式消息通信,消息会复制,可以被多个消费者消费
func subTopic(topic string) {
_, err := broker.Subscribe(topic, handleEvent)
if err != nil {
fmt.Println(err)
}
}
func main() {
// Create service
srv := service.New(
service.Name("hellosubscriber"),
service.Version("latest"),
)
srv.Init()
if err := broker.Connect(); err != nil {
logger.Error("Broker Connect error: ", err)
}
subTopic(topic1)
subTopicQ(topic2)
// Run service
if err := srv.Run(); err != nil {
logger.Fatal(err)
}
}
消息发布的实现消息发布的步骤如下:
- publisher先连接上broker,broker.Connect()
- broker.Publish(topic, msg) 直接对指定topic发送消息。
package main
import (
"fmt"
"github.com/micro/micro/v3/service"
"github.com/micro/micro/v3/service/logger"
"github.com/micro/go-micro/broker"
"time"
"github.com/pborman/uuid"
)
var topic1 = "topic1"
var topic2 = "topic2"
func handleEvent(b broker.Event) error {
msg := string(b.Message().Body)
logger.Infof("[sub] recieve message: %s, header: %s\n", msg, b.Message().Header)
return nil
}
// 发布消息
func pubTopic(topic string) {
for range time.Tick(time.Second) {
msg := &broker.Message {
Header: map[string]string {
"id": uuid.NewUUID().String(),
},
Body: []byte(fmt.Sprintf("Messaging you all day on %s", topic)),
}
if err := broker.Publish(topic, msg); err != nil {
logger.Error("Broker Publish error: ", err)
} else {
logger.Infof("Broker Publish topic:%s msg: %s", topic, msg)
}
}
}
func main() {
// Create service
srv := service.New(
service.Name("hellopublisher"),
service.Version("latest"),
)
srv.Init()
if err := broker.Connect(); err != nil {
logger.Error("Broker Connect error: ", err)
}
go pubTopic(topic1)
go pubTopic(topic2)
// Run service
if err := srv.Run(); err != nil {
logger.Fatal(err)
}
}
实践过程:
- 订阅服务,我起了两个,命名为sub1,sub2
- 发布服务,我起了pub,pub里两个协程向topic1和topic2发送消息
micro run --name sub1 .
micro run --name sub2 .
micro run --name pub .
unshideMacBook-Pro:hellosubcriber junshili$ micro status
NAME VERSION SOURCE STATUS BUILD UPDATED METADATA
pub latest /Users/junshili/hellopublisher running n/a 1m48s ago owner=admin, group=micro
sub1 latest /Users/junshili/hellosubcriber running n/a 13s ago owner=admin, group=micro
sub2 latest /Users/junshili/hellosubcriber running n/a 6s ago owner=admin, group=micro
junshideMacBook-Pro:hellosubcriber junshili$ micro stats --all custom
SERVICE hellopublisher
VERSION latest
NODE ADDRESS:PORT STARTED UPTIME MEMORY THREADS GC
hellopublisher-a5c3156e-cb26-40af-b899-092195a49fd4 192.168.1.101:54250 May 13 00:46:28 2m5s 2.88mb 43 10.670273ms
SERVICE hellosubscriber
VERSION latest
NODE ADDRESS:PORT STARTED UPTIME MEMORY THREADS GC
hellosubscriber-7035f6b4-ebd8-4dbf-bb10-eb49cf7ca34c 192.168.1.101:54578 May 13 00:48:13 21s 2.54mb 33 114.399µs
hellosubscriber-aa456e83-c035-43dc-9a71-a459c2c058c3 192.168.1.101:54572 May 13 00:48:13 21s 2.63mb 33 121.528µs
我们使用micro logs -f helloworld
或者 micro logs helloworld
查看服务输出的日志。
先观察pub服务的日志,pub在向发送topic1和topic2消息
2021-05-13 00:46:29 file=blob-940620946/main.go:34 level=info Broker Publish topic:topic2 msg: &{map[id:99b30102-b341-11eb-b5c8-acde48001122] Messaging you all day on topic2}
2021-05-13 00:46:29 file=blob-940620946/main.go:34 level=info Broker Publish topic:topic1 msg: &{map[id:99b30184-b341-11eb-b5c8-acde48001122] Messaging you all day on topic1}
2021-05-13 00:46:30 file=blob-940620946/main.go:34 level=info Broker Publish topic:topic2 msg: &{map[id:9a4af598-b341-11eb-b5c8-acde48001122] Messaging you all day on topic2}
2021-05-13 00:46:30 file=blob-940620946/main.go:34 level=info Broker Publish topic:topic1 msg: &{map[id:9a4af692-b341-11eb-b5c8-acde48001122] Messaging you all day on topic1}
2021-05-13 00:46:31 file=blob-940620946/main.go:34 level=info Broker Publish topic:topic1 msg: &{map[id:9ae409d6-b341-11eb-b5c8-acde48001122] Messaging you all day on topic1}
2021-05-13 00:46:31 file=blob-940620946/main.go:34 level=info Broker Publish topic:topic2 msg: &{map[id:9ae40ae4-b341-11eb-b5c8-acde48001122] Messaging you all day on topic2}
2021-05-13 00:46:32 file=blob-940620946/main.go:34 level=info Broker Publish topic:topic2 msg: &{map[id:9b7ca894-b341-11eb-b5c8-acde48001122] Messaging you all day on topic2}
2021-05-13 00:46:32 file=blob-940620946/main.go:34 level=info Broker Publish topic:topic1 msg: &{map[id:9b7ca7ea-b341-11eb-b5c8-acde48001122] Messaging you all day on topic1}
2021-05-13 00:46:33 file=blob-940620946/main.go:34 level=info Broker Publish topic:topic1 msg: &{map[id:9c152dc6-b341-11eb-b5c8-acde48001122] Messaging you all day on topic1}
2021-05-13 00:46:33 file=blob-940620946/main.go:34 level=info Broker Publish topic:topic2 msg: &{map[id:9c152a88-b341-11eb-b5c8-acde48001122] Messaging you all day on topic2}
观察sub1的日志
2021-05-13 00:50:14 file=blob-924495561/main.go:15 level=info [sub] recieve message: Messaging you all day on topic2, header: map[id:1fcede3c-b342-11eb-b5c8-acde48001122]
2021-05-13 00:50:14 file=blob-924495561/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:1fcedfb8-b342-11eb-b5c8-acde48001122]
2021-05-13 00:50:15 file=blob-924495561/main.go:15 level=info [sub] recieve message: Messaging you all day on topic2, header: map[id:2067d614-b342-11eb-b5c8-acde48001122]
2021-05-13 00:50:15 file=blob-924495561/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:2067d876-b342-11eb-b5c8-acde48001122]
2021-05-13 00:50:16 file=blob-924495561/main.go:15 level=info [sub] recieve message: Messaging you all day on topic2, header: map[id:20ffd928-b342-11eb-b5c8-acde48001122]
2021-05-13 00:50:16 file=blob-924495561/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:20ffd6c6-b342-11eb-b5c8-acde48001122]
2021-05-13 00:50:17 file=blob-924495561/main.go:15 level=info [sub] recieve message: Messaging you all day on topic2, header: map[id:2198c3d6-b342-11eb-b5c8-acde48001122]
2021-05-13 00:50:17 file=blob-924495561/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:2198c2e6-b342-11eb-b5c8-acde48001122]
2021-05-13 00:50:18 file=blob-924495561/main.go:15 level=info [sub] recieve message: Messaging you all day on topic2, header: map[id:22317f90-b342-11eb-b5c8-acde48001122]
2021-05-13 00:50:18 file=blob-924495561/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:22318634-b342-11eb-b5c8-acde48001122]
2021-05-13 00:50:19 file=blob-924495561/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:22ca283a-b342-11eb-b5c8-acde48001122]
2021-05-13 00:50:19 file=blob-924495561/main.go:15 level=info [sub] recieve message: Messaging you all day on topic2, header: map[id:22ca25b0-b342-11eb-b5c8-acde48001122]
2021-05-13 00:50:20 file=blob-924495561/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:23628e5e-b342-11eb-b5c8-acde48001122]
观察sub2的日志
2021-05-13 00:50:14 file=blob-694627220/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:1fcedfb8-b342-11eb-b5c8-acde48001122]
2021-05-13 00:50:15 file=blob-694627220/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:2067d876-b342-11eb-b5c8-acde48001122]
2021-05-13 00:50:16 file=blob-694627220/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:20ffd6c6-b342-11eb-b5c8-acde48001122]
2021-05-13 00:50:17 file=blob-694627220/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:2198c2e6-b342-11eb-b5c8-acde48001122]
2021-05-13 00:50:18 file=blob-694627220/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:22318634-b342-11eb-b5c8-acde48001122]
2021-05-13 00:50:19 file=blob-694627220/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:22ca283a-b342-11eb-b5c8-acde48001122]
2021-05-13 00:50:20 file=blob-694627220/main.go:15 level=info [sub] recieve message: Messaging you all day on topic2, header: map[id:23628d1e-b342-11eb-b5c8-acde48001122]
2021-05-13 00:50:20 file=blob-694627220/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:23628e5e-b342-11eb-b5c8-acde48001122]
2021-05-13 00:50:21 file=blob-694627220/main.go:15 level=info [sub] recieve message: Messaging you all day on topic2, header: map[id:23fb6cc8-b342-11eb-b5c8-acde48001122]
2021-05-13 00:50:21 file=blob-694627220/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:23fb6d90-b342-11eb-b5c8-acde48001122]
2021-05-13 00:50:22 file=blob-694627220/main.go:15 level=info [sub] recieve message: Messaging you all day on topic2, header: map[id:24940438-b342-11eb-b5c8-acde48001122]
2021-05-13 00:50:22 file=blob-694627220/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:2494015e-b342-11eb-b5c8-acde48001122]
对比sub1和sub2的日志可以看出,sub1和sub2都能收到topic1的消息,证实topic1可以被多个订阅者消费,符合发布订阅模式。至于topic2,只能被一个消费者消费,不存在一条消息被多个消费者消费的情况,对应的是点对点消息队列通信机制。因此,需要区分这两种通信机制的go micro写法。
go micro默认的消息队列组件是自己实现的,实际上其实就是一个map,topic是map的key,发布消息时就往map里存,然后broker调度给订阅了该topic的服务推送。生产环境请更换为kafka,rabbitmq这类专用消息队列。
https://github.com/micro/micro/blob/master/service/broker/memory/memory.go
type memoryBroker struct {
opts broker.Options
addr string
sync.RWMutex
connected bool
Subscribers map[string][]*memorySubscriber
}
type memorySubscriber struct {
id string
topic string
exit chan bool
handler broker.Handler
opts broker.SubscribeOptions
}
go micro broker支持的消息队列在以下链接可以获取:
https://github.com/microhq/go-plugins/tree/master/broker