go-micro

Go Micro是一个流行的微服务架构,是一个插件化的基础框架,基于此可以构建微服务,Micro的设计哲学是可插拔的插件化架构。Go Micro 简单轻巧、易于上手、功能强大、扩展方便,是基于 Go 语言进行微服务架构时非常值得推荐的一个框架。

Go Micro有以下重要特性:

  • 服务发现:自动服务注册和名称解析。服务发现是微服务开发的核心。
  • 负载均衡:基于服务发现构建的客户端负载均衡。一旦我们获得了服务的任意数量实例的地址,我们现在需要一种方法来决定要路由到哪个节点。
  • 消息编码:基于内容类型的动态消息编码。这包括默认的protobuf和json。
  • 请求/响应:基于RPC的请求/响应,支持双向流。
  • Async Messaging:PubSub是异步通信和事件驱动架构的重要设计思想。事件通知是微服务开发的核心模式。
  • 可插拔接口:Go Micro为每个分布式系统抽象使用Go接口,因此,这些接口是可插拔的,并允许Go Micro与运行时无关,可以插入任何基础技术

2.png

go-micro通信流程

通信的角色一共4个:server,client,register和broker,他们的各种的作用在于:

  1. Server监听客户端的调用,和Broker推送过来的信息进行处理。并且Server端需要向Register注册自己的存在或消亡,这样Client才能知道自己的状态;
  2. Register服务的注册的发现,Client端从Register中得到Server的信息,然后每次调用都根据算法选择一个的Server进行通信,当然通信是要经过编码/解码,选择传输协议等一系列过程;
  3. 如果有需要通知所有的Server端可以使用Broker进行信息的推送,Broker 通过队列进行信息的接收和发布;

3.png

Go Micro 框架的基础架构如上图所示,由 8 个核心接口组成,每个接口都有默认实现。Go micro 由以下接口列表组成:

  • 最顶层的 Service 接口是构建服务的主要组件,它把底层的各个包需要实现的接口,做了一次封装,包含了一系列用于初始化 Service 和 Client 的方法,使我们可以很简单的创建一个 RPC 服务;
  • server - 用于处理请求和通知。服务器是编写服务的构建基块,内置服务器是 RPC 系统
  • client - 用于高级别请求/响应和通知;
  • broker - 异步消息传递,其实就是一个消息队列系统;
  • config - 用于动态配置的。配置是一个接口, 用于从任意数量的源进行动态配置加载,其实就是一个配置进程/中心;
  • codec - 用于消息编码的(序列化和反序列化)。编解码器用于编码和解码消息, 然后再通过导线传输消息. 数据可能是 json, protobuf, beson, msgpack 等。
  • registry - 服务发现的注册表。注册表提供一种服务发现机制, 用于将名称解析为地址. 它可以由 consul, etcd zookeeper, dns, gossip 等支持. 服务应在启动时使用注册表进行注册, 并在关闭时取消注册。
  • selector - 用于负载平衡。选择器是一个负载平衡抽象, 它建立在注册表上。 户在发出请求时利用选择器. 客户端将使用选择器而不是注册表, 因为它提供了内置的负载平衡机制.
  • store - 用于数据存储。存储是一个简单的键值存储接口, 用于抽象掉轻量级数据存储,仅用于保存简单的状态信息,比如用户的验证状态。
  • transport - 用于同步通信。传输是服务之间同步请求/响应通信的接口. 它类似于 golang 网络包, 但提供了一个更高级别的抽象, 允许我们切换通信机制

4.png

Go Micro的接口间的关系如上图所示,每个接口都支持业界流行的开源方案,具体如下:

5.png

go-micro最重要的是service的创建,另外负责服务发现的
register以及负责异步通信的broker都是值得掌握的部分,下面就将介绍go-micro的这三个部分。

新建一个RPC服务(基于micro v2)

先安装相关组件

  1. go get github.com/micro/micro/v3/cmd/protoc-gen-micro@master
  2. go get github.com/micro/micro/v2

我们先定义一个简单的protobuf协议文件greet.proto,定义的服务名称为Greet。

  1. syntax = "proto3"; // 指定proto版本
  2. // 指定golang包名
  3. option go_package = "pb/proto_demo";
  4. service Greeter {
  5. rpc Greet(Request) returns (Response) {}
  6. }
  7. message Request {
  8. string name = 1;
  9. }
  10. message Response {
  11. string msg = 1;
  12. }

安装之后,利用protoc-gen-go和protoc-gen-micro生成协议go文件。这次一共会生成两个协议文件:greet.pb.go和greet.pb.micro.go。

protoc --micro_out=. --go_out=. ./greet.proto

着重观察下生成的micro版的go协议文件greet.pb.micro.go

// Code generated by protoc-gen-micro. DO NOT EDIT.
// source: greet.proto

package proto_demo

import (
    fmt "fmt"
    proto "github.com/golang/protobuf/proto"
    math "math"
)

import (
    context "context"
    client "github.com/micro/go-micro/v2/client"
     server "github.com/micro/go-micro/v2/server"
    api "github.com/micro/micro/v3/service/api"
)

// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf

// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package

// Reference imports to suppress errors if they are not otherwise used.
var _ api.Endpoint
var _ context.Context
var _ client.Option
var _ server.Option

// Api Endpoints for Greeter service

func NewGreeterEndpoints() []*api.Endpoint {
    return []*api.Endpoint{}
}

// Client API for Greeter service

type GreeterService interface {
    Greet(ctx context.Context, in *Request, opts ...client.CallOption) (*Response, error)
}

type greeterService struct {
    c    client.Client
    name string
}

func NewGreeterService(name string, c client.Client) GreeterService {
    return &greeterService{
        c:    c,
        name: name,
    }
}

func (c *greeterService) Greet(ctx context.Context, in *Request, opts ...client.CallOption) (*Response, error) {
    req := c.c.NewRequest(c.name, "Greeter.Greet", in)
    out := new(Response)
    err := c.c.Call(ctx, req, out, opts...)
    if err != nil {
        return nil, err
    }
    return out, nil
}

// Server API for Greeter service

type GreeterHandler interface {
    Greet(context.Context, *Request, *Response) error
}

func RegisterGreeterHandler(s server.Server, hdlr GreeterHandler, opts ...server.HandlerOption) error {
    type greeter interface {
        Greet(ctx context.Context, in *Request, out *Response) error
    }
    type Greeter struct {
        greeter
    }
    h := &greeterHandler{hdlr}
    return s.Handle(s.NewHandler(&Greeter{h}, opts...))
}

type greeterHandler struct {
    GreeterHandler
}

func (h *greeterHandler) Greet(ctx context.Context, in *Request, out *Response) error {
    return h.GreeterHandler.Greet(ctx, in, out)
}

这个文件里定义了服务结构体GreeterService,RPC函数实现Greet。

现在先编写服务端代码micro_server/main.go

package main

import (
    "context"
    "fmt"

    micro "github.com/micro/go-micro/v2"
    proto "web_demo/proto/pb/proto_demo"
)

type Greeter struct{}

func (g *Greeter) Greet(ctx context.Context, req *proto.Request, rsp *proto.Response) error {
    rsp.Msg = "Greet " + req.Name
    return nil
}

func main() {
    // 创建一个新服务
    service := micro.NewService(
        micro.Name("greeter"),
    )

    // 服务初始化
    service.Init()

    // 注册 handler处理函数
    proto.RegisterGreeterHandler(service.Server(), new(Greeter))

    // 启动服务
    if err := service.Run(); err != nil {
        fmt.Println(err)
    }
}

编写客户端micro_client/main.go,客户端向服务器发起RPC调用。

package main

import (
    "context"
    "fmt"

    micro "github.com/micro/go-micro/v2"
    proto  "web_demo/proto/pb/proto_demo"
)

func main() {
    // 创建新服务
    service := micro.NewService(micro.Name("greeter.client"))
    // 服务初始化
    service.Init()

    // 创建RPC的客户端实例
    greeter := proto.NewGreeterService("greeter", service.Client())

    // 发起RPC调用
    rsp, err := greeter.Greet(context.TODO(), &proto.Request{Name: "John"})
    if err != nil {
        fmt.Println(err)
    }

    // 打印返回值
    fmt.Println(rsp.Msg)
}

先启动server

unshideMacBook-Pro:micro_server junshili$ go run main.go 
2021-05-07 00:50:11  file=v2@v2.9.1/service.go:200 level=info Starting [service] greeter
2021-05-07 00:50:11  file=grpc/grpc.go:864 level=info Server [grpc] Listening on [::]:56014
2021-05-07 00:50:11  file=grpc/grpc.go:697 level=info Registry [mdns] Registering node: greeter-147bbaba-7f7e-46af-bb5e-81d311da0d1a

当然,我们也可以指定端口启动服务,这个参数传入的端口是优先于代码设置的

 go run main.go --server_address :8088

我们可以从服务器启动时打印的这些日志得到一些信息:

  1. RPC通信框架用的是gRPC,这是go-mico默认的;
  2. 我们启动的这个服务的名字叫greeter;
  3. 服务监听的端口是56014;
  4. 服务发现用的组件是mdns,这是go-mirco默认的;

我们也可以指定端口号来启动服务

junshideMacBook-Pro:micro_server junshili$ go run main.go --server_address :8088
2021-05-07 01:38:25  file=v2@v2.9.1/service.go:200 level=info Starting [service] greeter
2021-05-07 01:38:25  file=grpc/grpc.go:864 level=info Server [grpc] Listening on [::]:8088

再启动client发起RPC调用

junshideMacBook-Pro:micro_client junshili$ go run main.go 
Greet John

当我kill掉server时,会打印以下日志,表明:1.服务从服务发现组件注销注册了;2.Broker和该服务断开了连接。

2021-05-07 01:32:38  file=grpc/grpc.go:791 level=info Deregistering node: greeter-5e32cdd2-4099-4ee9-b2df-bdeee2cd5ff4
2021-05-07 01:32:38  file=grpc/grpc.go:959 level=info Broker [http] Disconnected from 127.0.0.1:0

注意,使用micro/v2时,protoc3生成micro.protoc文件可能会导致版本冲突,在go run main.go时会报错:

使用micro/v2时,protoc3生成micro.protoc文件导致的版本冲突
cannot use service.Server() (type
“github.com/micro/go-micro/v2/server”.Server) as type
“github.com/micro/micro/v3/service/server”

解决方案:
可将生成的*.pb.micro.go文件中的v3依赖改为v2依赖即可

import (     
     context "context"     
    client "github.com/micro/go-micro/v2/client"
     server "github.com/micro/go-micro/v2/server"
    api "github.com/micro/micro/v3/service/api" 
 )

新建服务(基于micro v3)

现在go micro已经推出了V3版本,因为V3和V2版本有较多的不同,且存在兼容性不足的问题,如果V2和V3混用的话,会有较多难以解决的兼容性问题,因此这里强烈推荐使用V3作为go micro的实战版本,会少走很多弯路。下面的所有例子都是基于V3的实践。

首先给出官方的上手文档:https://micro.mu/getting-started

首先安装/升级自己的micro版本至V3

go get github.com/micro/micro/v3

安装完之后,启动micro相关的服务进程

junshideMacBook-Pro:~ junshili$ micro server
2021-05-09 00:32:01  file=server/server.go:86 level=info Starting server
2021-05-09 00:32:01  file=server/server.go:114 level=info Registering network
2021-05-09 00:32:01  file=server/server.go:114 level=info Registering runtime
2021-05-09 00:32:01  file=server/server.go:114 level=info Registering registry
2021-05-09 00:32:01  file=server/server.go:114 level=info Registering config
2021-05-09 00:32:01  file=server/server.go:114 level=info Registering store
2021-05-09 00:32:01  file=server/server.go:114 level=info Registering broker
2021-05-09 00:32:01  file=server/server.go:114 level=info Registering events
2021-05-09 00:32:01  file=server/server.go:114 level=info Registering auth
2021-05-09 00:32:01  file=server/server.go:114 level=info Registering proxy
2021-05-09 00:32:01  file=server/server.go:114 level=info Registering api
2021-05-09 00:32:01  file=server/server.go:201 level=info Starting server runtime
2021-05-09 00:32:01  file=service/service.go:195 level=info Starting [service] server
2021-05-09 00:32:01  file=grpc/grpc.go:939 level=info Server [grpc] Listening on [::]:10001
2021-05-09 00:32:01  file=grpc/grpc.go:769 level=info Registry [mdns] Registering node: server-fc07e464-99c4-406d-b608-b54ef3c9bdde

可以注意到,registry,auth等Micro组件都启动起来了。与V2版本不一样的是,接下来需要登录账号,做身份验证。这一步很重要,不然没有登录,后续操作micro会提示权限不足。username固定为admin,password固定为micro。

$ micro login
Enter username: admin
Enter password:
Successfully logged in.

查看micro框架下跑着哪些服务,可以看到这些服务都是micro框架自带的,在我们执行micro server时启动。

junshideMacBook-Pro:~ junshili$ micro services
api
auth
broker
config
events
network
proxy
registry
runtime
server
store

现在我们打算启动一个自己的服务,micro v3提供了一个非常好用的服务生成工具,可以帮我们直接生成服务模板,我们只需要在模板上增添自己的内容就可以了,生产效率大幅提升。使用micro new 服务名就可以生成模板代码。服务名不要使用下划线

junshideMacBook-Pro:~ junshili$ micro new hellodemo
Creating service hellodemo

.
├── micro.mu
├── main.go
├── generate.go
├── handler
│   └── hellodemo.go
├── proto
│   └── hellodemo.proto
├── Dockerfile
├── Makefile
├── README.md
├── .gitignore
└── go.mod

进入项目目录,编译协议

junshideMacBook-Pro:hellodemo junshili$ make proto
protoc --proto_path=. --micro_out=. --go_out=:. proto/hellodemo.proto

在proto目录下自动生成了pb.go和.pb.micro.go的文件。

../hellodemo/
├── Dockerfile
├── Makefile
├── README.md
├── generate.go
├── go.mod
├── handler
│   └── hellodemo.go
├── main.go
├── micro.mu
└── proto
    ├── hellodemo.pb.go
    ├── hellodemo.pb.micro.go
    └── hellodemo.proto

启动我们的服务

junshideMacBook-Pro:hellodemo junshili$ micro run .

查看我们的服务是否正常启动

junshideMacBook-Pro:hellodemo junshili$ micro services
api
auth
broker
config
events
hellodemo
network
proxy
registry
runtime
server
store

junshideMacBook-Pro:hellodemo junshili$ micro status
NAME        VERSION    SOURCE                STATUS    BUILD    UPDATED        METADATA
hellodemo    latest    /Users/junshili/hellodemo    running    n/a    1m58s ago    owner=admin, group=micro

可以看到,我们的服务正常运行在micro框架之内。现在我们要基于这个服务模板做点自己的修改。

现在协议文件上增加rpc 函数和消息结构体。

service Hellodemo {
    rpc Call(Request) returns (Response) {}
    rpc Stream(StreamingRequest) returns (stream StreamingResponse) {}
    rpc PingPong(stream Ping) returns (stream Pong) {}
    rpc Greet(GreetReq) returns (GreetRsp) {}
}

message GreetReq {
    string name = 1;
    int32 age = 2;
}

message GreetRsp {
    string msg = 1;
    int32 status = 2;
}

再到在handler/hellodemo.go这个文件里,加一个自己的Greet的实现。

func (e *Hellodemo) Greet(ctx context.Context, req *hellodemo.GreetReq, rsp *hellodemo.GreetRsp) error {
    log.Info("Received Hellodemo.Greet request")
    rsp.Msg = fmt.Sprintf("Greet %s, your age is %d", req.Name, req.Age) 
    rsp.Status = 200
    return nil
}

我们的功能代码已经完成编写,准备把服务更新一下,因为服务已经在run了,所以我们直接使用micro update .把服务更出去,类似于我们常用的热更,不停服更新。
然后用micro logs hellodemo看下输出日志是否正常。确认启动正常后,我们可以使用micro框架cli给该服务直接发送请求,测试服务可用性,而无须再写测试client。

先用help指令查看hellodemo对外提供了哪些可调用的方法。

junshideMacBook-Pro:hellodemo junshili$ micro  hellodemo  --help
NAME:
    micro hellodemo

VERSION:
    latest

USAGE:
    micro hellodemo [command]

COMMANDS:
    call
    greet
    pingPong
    stream

我们测试下call和greet方法。

junshideMacBook-Pro:hellodemo junshili$ micro  hellodemo  call  --name=James
{
    "msg": "Hello James"
}
junshideMacBook-Pro:hellodemo junshili$ micro  hellodemo  greet --name=James --age=20
{
    "msg": "Greet James, your age is 20",
    "status": 200
}

我们同样可以通过http的方式访问请求RPC,中介是micro的api服务,请求会通过api服务,再调到我们制定的服务。默认API的address是127.0.0.1:8080。

junshideMacBook-Pro:helloworld junshili$ curl -XPOST --header "Content-Type: application/json" -d '{"name":"Joe", "age":30}' http://127.0.0.1:8080/hellodemo/greet
{"msg":"Greet Joe, your age is 30","status":200}

搭建Http服务

上面介绍了搭建RPC服务的流程,如果要搭建HTTP服务,其实跟上面流程一样,区别只在于在main.go中调用http相关相关处理即可,比如我们使用gin来实现我们的http服务。

第一步先micro new helloweb,创建服务模板,然后我们之间在main.go添加http处理的相关代码:


package main

import (

    "net/http"

    "github.com/gin-gonic/gin"

)

func HelloWeb(c *gin.Context) {
    c.String(http.StatusOK, "Hello, Go\n")
}

func HiWeb(c *gin.Context) {
    c.String(http.StatusOK, "Hi, Go\n")
}

func main() {
    // 使用gin作为路由
    r := gin.Default()
    r.GET("/hello", func(c *gin.Context) {
        HelloWeb(c)
    })
    r.POST("/hi", func(c *gin.Context) {
        HiWeb(c)
    })

    r.Run(":9999") //  listen and serve on 0.0.0.0:9999
}

请求和响应如下,如果想要构建更复杂的http项目,请参考我的上一篇文章:Go快速上手—Web服务器篇.

junshideMacBook-Pro:blog junshili$ curl -X POST http://127.0.0.1:9999/hi
Hi, Go
junshideMacBook-Pro:blog junshili$ curl http://127.0.0.1:9999/hello
Hello, Go

需要kill掉这个服务,需要使用

micro kill example

发布订阅(基于消息队列的异步通信)

发布订阅模式在后台服务中广泛使用,go micro框架中,Broker服务就是用于支持发布订阅模式(pub/sub)。发布订阅模式,我们一般使用消息队列作为中间件实现异步通信,从而做到系统解耦、削峰稳流。在go micro框架中,Broker就是负责消息队列这个功能,消息生产者负责生产消息,推送给Broker,消息消费者server向Broker订阅指定类型的消息(我们称为topic),即Broker注册自己的消息处理,一旦有消息到来,则调用响应的消息处理逻辑。Micro的Broker默认使用了Nats作为消息队列,同时他也支持业界常用的消息队列,比如Kafka,RabbitMQ等,因此我们可以根据我们的需求选择消息队列组件作为Broker的底层支持。

Micro内置的Pub/Sub模式很简单易用,用户只需要定义好publisher, subscriber以及消息内容,其他工作都将由框架实现。这里给出micor pub/sub的实践案例。

Broker、publisher、subscriber三者通信的消息结构体为Message,Message结构体的定义如下。

type Message struct {
    Header map[string]string
    Body   []byte
}

消息订阅者的实现消息订阅如下:

  1. subscrber先连接上broker,broker.Connect()
  2. 对指定的topic进行注册,broker.Subscribe。特殊地,如果是点对点通信,那需要在,broker.Subscribe的第三个参数加上broker.Queue(topic)。如果是发布订阅模式则不需要。
  3. 实现消息处理函数,传入broker.Subscribe第二个参数。
package main

import (
    "fmt"
    "github.com/micro/micro/v3/service"
    "github.com/micro/micro/v3/service/logger"
    "github.com/micro/go-micro/broker"
)

var topic1 = "topic1"
var topic2 = "topic2"

func handleEvent(b broker.Event) error {
    msg := string(b.Message().Body)
    logger.Infof("[sub] recieve message: %s, header: %s\n", msg, b.Message().Header)
    return nil
}

// 点对点消息通信,消息不会复制,只会被一个消费者消费
func subTopicQ(topic string) {
    _, err := broker.Subscribe(topic, handleEvent, broker.Queue(topic))

    if err != nil {
        fmt.Println(err)
    }
}

// 发布订阅模式消息通信,消息会复制,可以被多个消费者消费
func subTopic(topic string) {
    _, err := broker.Subscribe(topic, handleEvent)

    if err != nil {
        fmt.Println(err)
    }
}

func main() {
    // Create service
    srv := service.New(
        service.Name("hellosubscriber"),
        service.Version("latest"),
    )

    srv.Init()

    if err := broker.Connect(); err != nil {
        logger.Error("Broker Connect error: ", err)
    }

    subTopic(topic1)
    subTopicQ(topic2)

    // Run service
    if err := srv.Run(); err != nil {
        logger.Fatal(err)
    }
}

消息发布的实现消息发布的步骤如下:

  1. publisher先连接上broker,broker.Connect()
  2. broker.Publish(topic, msg) 直接对指定topic发送消息。
package main

import (
    "fmt"
    "github.com/micro/micro/v3/service"
    "github.com/micro/micro/v3/service/logger"
    "github.com/micro/go-micro/broker"
    "time"
    "github.com/pborman/uuid"
)

var topic1 = "topic1"
var topic2 = "topic2"

func handleEvent(b broker.Event) error {
    msg := string(b.Message().Body)
    logger.Infof("[sub] recieve message: %s, header: %s\n", msg, b.Message().Header)
    return nil
}

// 发布消息
func pubTopic(topic string) {
    for range time.Tick(time.Second) {
        msg := &broker.Message {
            Header: map[string]string {
                "id": uuid.NewUUID().String(),
            },
            Body: []byte(fmt.Sprintf("Messaging you all day on %s", topic)),
        }

        if err := broker.Publish(topic, msg); err != nil {
            logger.Error("Broker Publish error: ", err)
        } else {
            logger.Infof("Broker Publish topic:%s msg: %s", topic, msg)
        }
    }
}

func main() {
    // Create service
    srv := service.New(
        service.Name("hellopublisher"),
        service.Version("latest"),
    )

    srv.Init()

    if err := broker.Connect(); err != nil {
        logger.Error("Broker Connect error: ", err)
    }

    go pubTopic(topic1)
    go pubTopic(topic2)

    // Run service
    if err := srv.Run(); err != nil {
        logger.Fatal(err)
    }
}

实践过程:

  1. 订阅服务,我起了两个,命名为sub1,sub2
  2. 发布服务,我起了pub,pub里两个协程向topic1和topic2发送消息
micro run --name sub1 .
micro run --name sub2 .
micro run --name pub .

unshideMacBook-Pro:hellosubcriber junshili$ micro status 
NAME    VERSION    SOURCE                STATUS    BUILD    UPDATED        METADATA
pub    latest    /Users/junshili/hellopublisher    running    n/a    1m48s ago    owner=admin, group=micro
sub1    latest    /Users/junshili/hellosubcriber    running    n/a    13s ago        owner=admin, group=micro
sub2    latest    /Users/junshili/hellosubcriber    running    n/a    6s ago        owner=admin, group=micro




junshideMacBook-Pro:hellosubcriber junshili$ micro stats --all custom
SERVICE    hellopublisher

VERSION    latest

NODE                                                   ADDRESS:PORT           STARTED            UPTIME    MEMORY    THREADS    GC
hellopublisher-a5c3156e-cb26-40af-b899-092195a49fd4    192.168.1.101:54250    May 13 00:46:28    2m5s      2.88mb    43         10.670273ms

SERVICE    hellosubscriber

VERSION    latest

NODE                                                    ADDRESS:PORT           STARTED            UPTIME    MEMORY    THREADS    GC
hellosubscriber-7035f6b4-ebd8-4dbf-bb10-eb49cf7ca34c    192.168.1.101:54578    May 13 00:48:13    21s       2.54mb    33         114.399µs

hellosubscriber-aa456e83-c035-43dc-9a71-a459c2c058c3    192.168.1.101:54572    May 13 00:48:13    21s    2.63mb    33    121.528µs

我们使用micro logs -f helloworld 或者 micro logs helloworld查看服务输出的日志。

先观察pub服务的日志,pub在向发送topic1和topic2消息

2021-05-13 00:46:29  file=blob-940620946/main.go:34 level=info Broker Publish topic:topic2 msg: &{map[id:99b30102-b341-11eb-b5c8-acde48001122] Messaging you all day on topic2}
2021-05-13 00:46:29  file=blob-940620946/main.go:34 level=info Broker Publish topic:topic1 msg: &{map[id:99b30184-b341-11eb-b5c8-acde48001122] Messaging you all day on topic1}
2021-05-13 00:46:30  file=blob-940620946/main.go:34 level=info Broker Publish topic:topic2 msg: &{map[id:9a4af598-b341-11eb-b5c8-acde48001122] Messaging you all day on topic2}
2021-05-13 00:46:30  file=blob-940620946/main.go:34 level=info Broker Publish topic:topic1 msg: &{map[id:9a4af692-b341-11eb-b5c8-acde48001122] Messaging you all day on topic1}
2021-05-13 00:46:31  file=blob-940620946/main.go:34 level=info Broker Publish topic:topic1 msg: &{map[id:9ae409d6-b341-11eb-b5c8-acde48001122] Messaging you all day on topic1}
2021-05-13 00:46:31  file=blob-940620946/main.go:34 level=info Broker Publish topic:topic2 msg: &{map[id:9ae40ae4-b341-11eb-b5c8-acde48001122] Messaging you all day on topic2}
2021-05-13 00:46:32  file=blob-940620946/main.go:34 level=info Broker Publish topic:topic2 msg: &{map[id:9b7ca894-b341-11eb-b5c8-acde48001122] Messaging you all day on topic2}
2021-05-13 00:46:32  file=blob-940620946/main.go:34 level=info Broker Publish topic:topic1 msg: &{map[id:9b7ca7ea-b341-11eb-b5c8-acde48001122] Messaging you all day on topic1}
2021-05-13 00:46:33  file=blob-940620946/main.go:34 level=info Broker Publish topic:topic1 msg: &{map[id:9c152dc6-b341-11eb-b5c8-acde48001122] Messaging you all day on topic1}
2021-05-13 00:46:33  file=blob-940620946/main.go:34 level=info Broker Publish topic:topic2 msg: &{map[id:9c152a88-b341-11eb-b5c8-acde48001122] Messaging you all day on topic2}

观察sub1的日志

2021-05-13 00:50:14  file=blob-924495561/main.go:15 level=info [sub] recieve message: Messaging you all day on topic2, header: map[id:1fcede3c-b342-11eb-b5c8-acde48001122]

2021-05-13 00:50:14  file=blob-924495561/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:1fcedfb8-b342-11eb-b5c8-acde48001122]

2021-05-13 00:50:15  file=blob-924495561/main.go:15 level=info [sub] recieve message: Messaging you all day on topic2, header: map[id:2067d614-b342-11eb-b5c8-acde48001122]

2021-05-13 00:50:15  file=blob-924495561/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:2067d876-b342-11eb-b5c8-acde48001122]

2021-05-13 00:50:16  file=blob-924495561/main.go:15 level=info [sub] recieve message: Messaging you all day on topic2, header: map[id:20ffd928-b342-11eb-b5c8-acde48001122]

2021-05-13 00:50:16  file=blob-924495561/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:20ffd6c6-b342-11eb-b5c8-acde48001122]

2021-05-13 00:50:17  file=blob-924495561/main.go:15 level=info [sub] recieve message: Messaging you all day on topic2, header: map[id:2198c3d6-b342-11eb-b5c8-acde48001122]

2021-05-13 00:50:17  file=blob-924495561/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:2198c2e6-b342-11eb-b5c8-acde48001122]

2021-05-13 00:50:18  file=blob-924495561/main.go:15 level=info [sub] recieve message: Messaging you all day on topic2, header: map[id:22317f90-b342-11eb-b5c8-acde48001122]

2021-05-13 00:50:18  file=blob-924495561/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:22318634-b342-11eb-b5c8-acde48001122]

2021-05-13 00:50:19  file=blob-924495561/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:22ca283a-b342-11eb-b5c8-acde48001122]

2021-05-13 00:50:19  file=blob-924495561/main.go:15 level=info [sub] recieve message: Messaging you all day on topic2, header: map[id:22ca25b0-b342-11eb-b5c8-acde48001122]

2021-05-13 00:50:20  file=blob-924495561/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:23628e5e-b342-11eb-b5c8-acde48001122]

观察sub2的日志

2021-05-13 00:50:14  file=blob-694627220/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:1fcedfb8-b342-11eb-b5c8-acde48001122]

2021-05-13 00:50:15  file=blob-694627220/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:2067d876-b342-11eb-b5c8-acde48001122]

2021-05-13 00:50:16  file=blob-694627220/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:20ffd6c6-b342-11eb-b5c8-acde48001122]

2021-05-13 00:50:17  file=blob-694627220/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:2198c2e6-b342-11eb-b5c8-acde48001122]

2021-05-13 00:50:18  file=blob-694627220/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:22318634-b342-11eb-b5c8-acde48001122]

2021-05-13 00:50:19  file=blob-694627220/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:22ca283a-b342-11eb-b5c8-acde48001122]

2021-05-13 00:50:20  file=blob-694627220/main.go:15 level=info [sub] recieve message: Messaging you all day on topic2, header: map[id:23628d1e-b342-11eb-b5c8-acde48001122]

2021-05-13 00:50:20  file=blob-694627220/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:23628e5e-b342-11eb-b5c8-acde48001122]

2021-05-13 00:50:21  file=blob-694627220/main.go:15 level=info [sub] recieve message: Messaging you all day on topic2, header: map[id:23fb6cc8-b342-11eb-b5c8-acde48001122]

2021-05-13 00:50:21  file=blob-694627220/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:23fb6d90-b342-11eb-b5c8-acde48001122]

2021-05-13 00:50:22  file=blob-694627220/main.go:15 level=info [sub] recieve message: Messaging you all day on topic2, header: map[id:24940438-b342-11eb-b5c8-acde48001122]

2021-05-13 00:50:22  file=blob-694627220/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:2494015e-b342-11eb-b5c8-acde48001122]

对比sub1和sub2的日志可以看出,sub1和sub2都能收到topic1的消息,证实topic1可以被多个订阅者消费,符合发布订阅模式。至于topic2,只能被一个消费者消费,不存在一条消息被多个消费者消费的情况,对应的是点对点消息队列通信机制。因此,需要区分这两种通信机制的go micro写法。

go micro默认的消息队列组件是自己实现的,实际上其实就是一个map,topic是map的key,发布消息时就往map里存,然后broker调度给订阅了该topic的服务推送。生产环境请更换为kafka,rabbitmq这类专用消息队列。

https://github.com/micro/micro/blob/master/service/broker/memory/memory.go

type memoryBroker struct {
    opts broker.Options

    addr string
    sync.RWMutex
    connected   bool
    Subscribers map[string][]*memorySubscriber
}

type memorySubscriber struct {
    id      string
    topic   string
    exit    chan bool
    handler broker.Handler
    opts    broker.SubscribeOptions
}

go micro broker支持的消息队列在以下链接可以获取:
https://github.com/microhq/go-plugins/tree/master/broker