简介

上一篇文章中我们介绍了 Go 标准库net/rpc的用法。在默认情况下,rpc库内部使用gob格式传输数据。我们仿造gob的编解码器实现了一个json格式的。实际上标准库net/rpc/jsonrcp中已有实现。本文是对上一篇文章的补充。

快速使用

标准库无需安装。

首先是服务端,使用net/rpc/jsonrpc之后,我们就不用自己去编写json的编解码器了:

  1. package main
  2. import (
  3. "log"
  4. "net"
  5. "net/rpc"
  6. "net/rpc/jsonrpc"
  7. )
  8. type Args struct {
  9. A, B int
  10. }
  11. type Arith int
  12. func (t *Arith) Multiply(args *Args, reply *int) error {
  13. *reply = args.A * args.B
  14. return nil
  15. }
  16. func main() {
  17. l, err := net.Listen("tcp", ":1234")
  18. if err != nil {
  19. log.Fatal("listen error:", err)
  20. }
  21. arith := new(Arith)
  22. rpc.Register(arith)
  23. for {
  24. conn, err := l.Accept()
  25. if err != nil {
  26. log.Fatal("accept error:", err)
  27. }
  28. // 注意这一行
  29. go rpc.ServeCodec(jsonrpc.NewServerCodec(conn))
  30. }
  31. }

直接调用jsonrpc.NewServerCodec(conn)创建一个服务端的codec。客户端也是类似的:

  1. func main() {
  2. conn, err := net.Dial("tcp", ":1234")
  3. if err != nil {
  4. log.Fatal("dial error:", err)
  5. }
  6. // 这里,这里😁
  7. client := rpc.NewClientWithCodec(jsonrpc.NewClientCodec(conn))
  8. args := &Args{7, 8}
  9. var reply int
  10. err = client.Call("Arith.Multiply", args, &reply)
  11. if err != nil {
  12. log.Fatal("Multiply error:", err)
  13. }
  14. fmt.Printf("Multiply: %d*%d=%d\n", args.A, args.B, reply)
  15. }

先运行服务端程序:

  1. $ go run main.go

然后在一个新的控制台中运行客户端程序:

  1. $ go run client.go
  2. Multiply: 7*8=56

下面这段代码基本上每个使用jsonrpc的程序都要编写:

  1. conn, err := net.Dial("tcp", ":1234")
  2. if err != nil {
  3. log.Fatal("dial error:", err)
  4. }
  5. client := rpc.NewClientWithCodec(jsonrpc.NewClientCodec(conn))

因此jsonrpc为了方便直接提供了一个Dial方法。使用Dial简化上面的客户端程序:

  1. func main() {
  2. client, err := jsonrpc.Dial("tcp", ":1234")
  3. if err != nil {
  4. log.Fatal("dial error:", err)
  5. }
  6. args := &Args{7, 8}
  7. var reply int
  8. err = client.Call("Arith.Multiply", args, &reply)
  9. if err != nil {
  10. log.Fatal("Multiply error:", err)
  11. }
  12. fmt.Printf("Multiply: %d*%d=%d\n", args.A, args.B, reply)
  13. }

效果是一样的。

JSON-RPC 标准

JSON-RPC 1.0 标准在 2005 年发布,经过数年演化,于 2010 年发布了 2.0 版本。JSON-RPC 标准的内容可在https://www.jsonrpc.org/specification查看。Go 标准库net/rpc/jsonrpc实现了 1.0 版本。关于 2.0 版本的实现可以在pkg.go.dev上搜索json-rpc+2.0。本文以 1.0 版本为基础进行介绍。

JSON-RPC 传输的是单一的对象,序列化为 JSON 格式。请求对象包含以下 3 个属性:

  • method:请求调用的方法;
  • params:一个数组表示传给方法的各个参数;
  • id:请求 ID。ID 可以是任何类型,在收到响应时根据这个属性判断对应哪个请求。

响应对象包含以下 3 个属性:

  • result:方法返回的对象,如果error非空时,该属性必须为null
  • error:表示调用是否出错;
  • id:对应请求的 ID。

另外标准还定义了一种通知类型,除了id属性为null之外,通知对象的属性与请求对象完全一样。

调用client.Call("echo", "Hello JSON-RPC", &reply)时:

  1. 请求:{ "method": "echo", "params": ["Hello JSON-RPC"], "id": 1}
  2. 响应:{ "result": "Hello JSON-RPC", "error": null, "id": 1}

使用 zookeeper 实现简单的负载均衡

下面我们使用zookeeper实现一个简单的客户端侧的负载均衡。zookeeper中记录所有的可提供服务的服务器,客户端每次请求时都随机挑选一个。我们的示例中,请求必须是无状态的。首先,我们改造一下服务端程序,将监听地址提取出来,通过flag指定:

  1. package main
  2. import (
  3. "flag"
  4. "log"
  5. "net"
  6. "net/rpc"
  7. "net/rpc/jsonrpc"
  8. )
  9. var (
  10. addr *string
  11. )
  12. type Args struct {
  13. A, B int
  14. }
  15. type Arith int
  16. func (t *Arith) Multiply(args *Args, reply *int) error {
  17. *reply = args.A * args.B
  18. return nil
  19. }
  20. func init() {
  21. addr = flag.String("addr", ":1111", "addr to listen")
  22. }
  23. func main() {
  24. flag.Parse()
  25. l, err := net.Listen("tcp", *addr)
  26. if err != nil {
  27. log.Fatal("listen error:", err)
  28. }
  29. arith := new(Arith)
  30. rpc.Register(arith)
  31. for {
  32. conn, err := l.Accept()
  33. if err != nil {
  34. log.Fatal("accept error:", err)
  35. }
  36. go rpc.ServeCodec(jsonrpc.NewServerCodec(conn))
  37. }
  38. }

关于有哪些服务器可用,我们存储在zookeeper中。

首先要启动一个zookeeper的程序。在 Apache Zookeeper 官网可以下载能直接运行的 Windows 程序。下载之后解压,将conf文件夹中的样板配置zoo_sample.cfg复制一份,文件名改为zoo.cfg。在编辑器中打开zoo.cfg,将dataDir改为一个已存在的目录,或创建一个新目录。我在bin同级目录中创建了一个data目录,然后设置dataDir=../data。切换到bin目录下执行zkServer.batzookeeper程序就运行起来了。使用zkClient.bat连接上这个zookeeper,增加一个节点,设置数据:

  1. $ create /rpcserver
  2. $ set /rpcserver 127.0.0.1:1111,127.0.0.1:1112,127.0.0.1:1113

我们用,分隔多个服务器地址。

准备工作完成后,接下来就开始编写客户端代码了。我们实现一个代理类,负责监听zookeeper的数据变化,根据zookeeper中新的地址创建到服务器的连接,删除老的连接,将调用请求随机转发到一个服务器处理:

  1. type Proxy struct {
  2. zookeeper string
  3. clients map[string]*rpc.Client
  4. events <-chan zk.Event
  5. zookeeperConn *zk.Conn
  6. mutex sync.Mutex
  7. }
  8. func NewProxy(addr string) *Proxy {
  9. return &Proxy{
  10. zookeeper: addr,
  11. clients: make(map[string]*rpc.Client),
  12. }
  13. }

这里我们使用了go-zookeeper这个库,需要额外安装:

  1. $ go get github.com/samuel/go-zookeeper/zk

程序启动时,代理对象从zookeeper中获取服务端地址,创建连接:

  1. func (p *Proxy) Connect() {
  2. c, _, err := zk.Connect([]string{p.zookeeper}, time.Second) //*10)
  3. if err != nil {
  4. panic(err)
  5. }
  6. data, _, event, err := c.GetW("/rpcserver")
  7. if err != nil {
  8. panic(err)
  9. }
  10. p.events = event
  11. p.zookeeperConn = c
  12. p.CreateClients(string(data))
  13. }
  14. func (p *Proxy) CreateClients(server string) {
  15. p.mutex.Lock()
  16. defer p.mutex.Unlock()
  17. addrs := strings.Split(server, ",")
  18. allAddr := make(map[string]struct{})
  19. for _, addr := range addrs {
  20. allAddr[addr] = struct{}{}
  21. if _, exist := p.clients[addr]; exist {
  22. continue
  23. }
  24. client, err := jsonrpc.Dial("tcp", addr)
  25. if err != nil {
  26. log.Println("jsonrpc Dial error:", err)
  27. continue
  28. }
  29. p.clients[addr] = client
  30. log.Println("new addr:", addr)
  31. }
  32. for addr := range p.clients {
  33. if _, exist := allAddr[addr]; !exist {
  34. // 不在 zookeeper 中的地址,删除对应连接
  35. oldClient.Close()
  36. delete(p.clients, addr)
  37. log.Println("delete addr", addr)
  38. }
  39. }
  40. }

同时,需要监听zookeeper中的数据变化,当新增或删除某个服务端地址时,Proxy要及时更新连接:

  1. func (p *Proxy) Run() {
  2. for {
  3. select {
  4. case event := <-p.events:
  5. if event.Type == zk.EventNodeDataChanged {
  6. data, _, err := p.zookeeperConn.Get("/rpcserver")
  7. if err != nil {
  8. log.Println("get zookeeper data failed:", err)
  9. continue
  10. }
  11. p.CreateClients(string(data))
  12. }
  13. }
  14. }
  15. }

客户端主体程序使用Proxy结构非常方便:

  1. package main
  2. import (
  3. "flag"
  4. "fmt"
  5. "math/rand"
  6. )
  7. var (
  8. zookeeperAddr *string
  9. )
  10. func init() {
  11. zookeeperAddr = flag.String("addr", ":2181", "zookeeper address")
  12. }
  13. type Args struct {
  14. A, B int
  15. }
  16. func main() {
  17. flag.Parse()
  18. fmt.Println(*zookeeperAddr)
  19. p := NewProxy(*zookeeperAddr)
  20. p.Connect()
  21. go p.Run()
  22. for i := 0; i < 10; i++ {
  23. var reply int
  24. args := &Args{rand.Intn(1000), rand.Intn(1000)}
  25. p.Call("Arith.Multiply", args, &reply)
  26. fmt.Printf("%d*%d=%d\n", args.A, args.B, reply)
  27. }
  28. // sleep 过程中可以修改 zookeeper 中的数据
  29. time.Sleep(1 * time.Minute)
  30. // 使用新的地址做随机
  31. for i := 0; i < 100; i++ {
  32. var reply int
  33. args := &Args{rand.Intn(1000), rand.Intn(1000)}
  34. p.Call("Arith.Multiply", args, &reply)
  35. fmt.Printf("%d*%d=%d\n", args.A, args.B, reply)
  36. }
  37. }

创建一个代理对象,在一个新的 goroutine 中监听zookeeper事件。然后通过ProxyCall调用远程服务端的方法:

  1. func (p *Proxy) Call(method string, args interface{}, reply interface{}) error {
  2. var client *rpc.Client
  3. var addr string
  4. idx := rand.Int31n(int32(len(p.clients)))
  5. var i int32
  6. p.mutex.Lock()
  7. for a, c := range p.clients {
  8. if i == idx {
  9. client = c
  10. addr = a
  11. break
  12. }
  13. i++
  14. }
  15. p.mutex.Unlock()
  16. fmt.Println("use", addr)
  17. return client.Call(method, args, reply)
  18. }

首先我们要启动 3 个服务端程序,分别监听端口 1111、1112、1113,需要 3 个控制台:

控制台 1:

  1. $ go run main.go -addr :1111

控制台 2:

  1. $ go run main.go -addr :1112

控制台 3:

  1. $ go run main.go -addr :1113

客户端在一个新的控制台启动,指定zookeeper地址:

  1. $ go run . -addr=127.0.0.1:2181

在输出中,我们可以看到是怎么随机挑选服务器的。

我们可以尝试在客户端程序运行的过程中,将某个服务器地址从zookeeper中删除。我特意在程序中加了一个 1 分钟的延迟。在sleep过程中,通过zkClient.cmd127.0.0.1:1113这个地址从zookeeper中删除:

  1. $ set /rpcserver 127.0.0.1:1111,127.0.0.1:1112

控制台输出:

  1. $ 2020/05/10 23:47:47 delete addr 127.0.0.1:1113

并且后续的请求不会再发到127.0.0.1:1113这个服务器了。

其实,在实际的项目中,Proxy一般是一个独立的服务器,而不是放在客户端侧。上面示例这样处理只是为了方便。

总结

RPC 底层可以使用各种协议传输数据,JSON/XML/Protobuf 都可以。对 rpc 感兴趣的建议看看rpcx这个库,https://github.com/smallnest/rpcx。非常强大!

大家如果发现好玩、好用的 Go 语言库,欢迎到 Go 每日一库 GitHub 上提交 issue😄

参考

  1. jsonrpc GitHub:https://golang.org/pkg/net/rpc/jsonrpc/
  2. Go 每日一库 GitHub:https://github.com/go-quiz/go-daily-lib