简介

在之前的两篇文章rpcjson-rpc中,我们介绍了 Go 标准库提供的rpc实现。在实际开发中,rpc库的功能还是有所欠缺。今天我们介绍一个非常优秀的 Go RPC 库——rpcxrpcx是一位国人大牛开发的,详细开发历程可以在rpcx官方博客了解。rpcx拥有媲美,甚至某种程度上超越gRPC的性能,有完善的中文文档,提供服务发现和治理的插件。

快速使用

本文示例使用go modules

首先是安装:

  1. $ go get -v -tags "reuseport quic kcp zookeeper etcd consul ping" github.com/smallnest/rpcx/...

可以看出rpcx的安装有点特殊。使用go get -v github.com/smallnest/rpcx/...命令只会安装rpcx的基础功能。扩展功能都是通过build tags指定。为了使用方便,一般安装所有的tags,如上面命令所示。这也是官方推荐的安装方式。

我们先编写服务端程序,实际上这个程序与用rpc标准库编写的程序几乎一模一样:

  1. package main
  2. import (
  3. "context"
  4. "errors"
  5. "github.com/smallnest/rpcx/server"
  6. )
  7. type Args struct {
  8. A, B int
  9. }
  10. type Quotient struct {
  11. Quo, Rem int
  12. }
  13. type Arith int
  14. func (t *Arith) Mul(cxt context.Context, args *Args, reply *int) error {
  15. *reply = args.A * args.B
  16. return nil
  17. }
  18. func (t *Arith) Div(cxt context.Context, args *Args, quo *Quotient) error {
  19. if args.B == 0 {
  20. return errors.New("divide by 0")
  21. }
  22. quo.Quo = args.A / args.B
  23. quo.Rem = args.A % args.B
  24. return nil
  25. }
  26. func main() {
  27. s := server.NewServer()
  28. s.RegisterName("Arith", new(Arith), "")
  29. s.Serve("tcp", ":8972")
  30. }

首先创建一个Server对象,调用它的RegisterName()方法在服务路径Arith下注册MulDiv方法。与标准库相比,rpcx要求注册方法的第一个参数必须为context.Context类型。最后调用s.Serve("tcp", ":8972")监听 TCP 端口 8972。是不是很简单?启动服务器:

  1. $ go run main.go

然后是客户端程序:

  1. package main
  2. import (
  3. "context"
  4. "flag"
  5. "log"
  6. "github.com/smallnest/rpcx/client"
  7. )
  8. var (
  9. addr = flag.String("addr", ":8972", "service address")
  10. )
  11. func main() {
  12. flag.Parse()
  13. d := client.NewPeer2PeerDiscovery("tcp@"+*addr, "")
  14. xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
  15. defer xclient.Close()
  16. args := &Args{A:10, B:20}
  17. var reply int
  18. err :=xclient.Call(context.Background(), "Mul", args, &reply)
  19. if err != nil {
  20. log.Fatalf("failed to call: %v", err)
  21. }
  22. fmt.Printf("%d * %d = %d\n", args.A, args.B, reply)
  23. args = &Args{50, 20}
  24. var quo Quotient
  25. err = xclient.Call(context.Background(), "Div", args, &quo)
  26. if err != nil {
  27. log.Fatalf("failed to call: %v", err)
  28. }
  29. fmt.Printf("%d * %d = %d...%d\n", args.A, args.B, quo.Quo, quo.Rem)
  30. }

rpcx支持多种服务发现的方式让客户端找到服务器。上面代码中我们使用的是最简单的点到点的方式,也就是直连。要调用服务端的方法,必须先创建一个Client对象。使用Client对象来调用远程方法。运行客户端:

  1. $ go run main.go
  2. 10 * 20 = 200
  3. 50 * 20 = 2...10

注意到,创建Client对象的参数有client.Failtryclient.RandomSelect。这两个参数分别为失败模式如何选择服务器

传输

rpcx支持多种传输协议:

  • TCP:TCP 协议,网络名称为tcp
  • HTTP:HTTP 协议,网络名称为http
  • UnixDomain:unix 域协议,网络名称为unix
  • QUIC:是 Quick UDP Internet Connections 的缩写,意为快速UDP网络连接。HTTP/3 底层就是 QUIC 协议,Google 出品。网络名称为quic
  • KCP:快速并且可靠的 ARQ 协议,网络名称为kcp

rpcx对这些协议做了非常好的封装。除了在创建服务器和客户端连接时需要指定协议名称,其它时候的使用基本是透明的。我们将上面的例子改装成使用http协议的:

服务端改动:

  1. s.Serve("http", ":8972")

客户端改动:

  1. d := client.NewPeer2PeerDiscovery("http@"+*addr, "")

QUICKCP的使用有点特殊,QUIC必须与 TLS 一起使用,KCP也需要做传输加密。使用 Go 语言我们能很方便地生成一个证书和私钥:

  1. package main
  2. import (
  3. "crypto/rand"
  4. "crypto/rsa"
  5. "crypto/x509"
  6. "crypto/x509/pkix"
  7. "encoding/pem"
  8. "math/big"
  9. "net"
  10. "os"
  11. "time"
  12. )
  13. func main() {
  14. max := new(big.Int).Lsh(big.NewInt(1), 128)
  15. serialNumber, _ := rand.Int(rand.Reader, max)
  16. subject := pkix.Name{
  17. Organization: []string{"Go Daily Lib"},
  18. OrganizationalUnit: []string{"TechBlog"},
  19. CommonName: "go daily lib",
  20. }
  21. template := x509.Certificate{
  22. SerialNumber: serialNumber,
  23. Subject: subject,
  24. NotBefore: time.Now(),
  25. NotAfter: time.Now().Add(365 * 24 * time.Hour),
  26. KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature,
  27. ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
  28. IPAddresses: []net.IP{net.ParseIP("127.0.0.1")},
  29. }
  30. pk, _ := rsa.GenerateKey(rand.Reader, 2048)
  31. derBytes, _ := x509.CreateCertificate(rand.Reader, &template, &template, &pk.PublicKey, pk)
  32. certOut, _ := os.Create("server.pem")
  33. pem.Encode(certOut, &pem.Block{Type: "CERTIFICATE", Bytes: derBytes})
  34. certOut.Close()
  35. keyOut, _ := os.Create("server.key")
  36. pem.Encode(keyOut, &pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(pk)})
  37. keyOut.Close()
  38. }

上面代码生成了一个证书和私钥,有效期为 1 年。运行程序,得到两个文件server.pemserver.key。然后我们就可以编写使用QUIC协议的程序了。服务端:

  1. func main() {
  2. cert, _ := tls.LoadX509KeyPair("server.pem", "server.key")
  3. config := &tls.Config{Certificates: []tls.Certificate{cert}}
  4. s := server.NewServer(server.WithTLSConfig(config))
  5. s.RegisterName("Arith", new(Arith), "")
  6. s.Serve("quic", "localhost:8972")
  7. }

实际上就是加载证书和密钥,然后在创建Server对象时作为选项传入。客户端改动:

  1. conf := &tls.Config{
  2. InsecureSkipVerify: true,
  3. }
  4. option := client.DefaultOption
  5. option.TLSConfig = conf
  6. d := client.NewPeer2PeerDiscovery("quic@"+*addr, "")
  7. xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, option)
  8. defer xclient.Close()

客户端也需要配置 TLS。

有一点需要注意,rpcxquic/kcp这些协议的支持是通过build tags实现的。默认不会编译quic/kcp相关文件。如果要使用,必须自己手动指定tags。先启动服务端程序:

  1. $ go run -tags quic main.go

然后切换到客户端程序目录,执行下面命令:

  1. $ go run -tags quic main.go

还有一点需要注意,在使用tcphttp(底层也是tcp)协议的时候,我们可以将地址简写为:8972,因为默认就是本地地址。但是quic不行,必须把地址写完整:

  1. // 服务端
  2. s.Serve("quic", "localhost:8972")
  3. // 客户端
  4. addr = flag.String("addr", "localhost:8972", "service address")

注册函数

上面的例子都是调用对象的方法,我们也可以调用函数。函数的类型与对象方法相比只是没有接收者。注册函数需要指定一个服务路径。服务端:

  1. type Args struct {
  2. A, B int
  3. }
  4. type Quotient struct {
  5. Quo, Rem int
  6. }
  7. func Mul(cxt context.Context, args *Args, reply *int) error {
  8. *reply = args.A * args.B
  9. return nil
  10. }
  11. func Div(cxt context.Context, args *Args, quo *Quotient) error {
  12. if args.B == 0 {
  13. return errors.New("divide by 0")
  14. }
  15. quo.Quo = args.A / args.B
  16. quo.Rem = args.A % args.B
  17. return nil
  18. }
  19. func main() {
  20. s := server.NewServer()
  21. s.RegisterFunction("function", Mul, "")
  22. s.RegisterFunction("function", Div, "")
  23. s.Serve("tcp", ":8972")
  24. }

只是注册方法由RegisterName变为了RegisterFunction,参数由一个对象变为一个函数。我们需要为注册的函数指定一个服务路径,客户端调用时会根据这个路径查找对应方法。客户端:

  1. func main() {
  2. flag.Parse()
  3. d := client.NewPeer2PeerDiscovery("tcp@"+*addr, "")
  4. xclient := client.NewXClient("function", client.Failtry, client.RandomSelect, d, client.DefaultOption)
  5. defer xclient.Close()
  6. args := &Args{A: 10, B: 20}
  7. var reply int
  8. err := xclient.Call(context.Background(), "Mul", args, &reply)
  9. if err != nil {
  10. log.Fatalf("failed to call: %v", err)
  11. }
  12. fmt.Printf("%d * %d = %d\n", args.A, args.B, reply)
  13. args = &Args{50, 20}
  14. var quo Quotient
  15. err = xclient.Call(context.Background(), "Div", args, &quo)
  16. if err != nil {
  17. log.Fatalf("failed to call: %v", err)
  18. }
  19. fmt.Printf("%d * %d = %d...%d\n", args.A, args.B, quo.Quo, quo.Rem)
  20. }

注册中心

rpcx支持多种注册中心:

  • 点对点:其实就是直连,没有注册中心;
  • 点对多:可以配置多个服务器;
  • zookeeper:常用的注册中心;
  • Etcd:Go 语言编写的注册中心;
  • 进程内调用:方便调试功能,在同一个进程内查找服务;
  • Consul/mDNS等。

我们之前演示的都是点对点的连接,接下来我们介绍如何使用zookeeper作为注册中心。在rpcx中,注册中心是通过插件的方式集成的。使用ZooKeeperRegisterPlugin这个插件来集成Zookeeper。服务端代码:

  1. type Args struct {
  2. A, B int
  3. }
  4. type Quotient struct {
  5. Quo, Rem int
  6. }
  7. var (
  8. addr = flag.String("addr", ":8972", "service address")
  9. zkAddr = flag.String("zkAddr", "127.0.0.1:2181", "zookeeper address")
  10. basePath = flag.String("basePath", "/services/math", "service base path")
  11. )
  12. type Arith int
  13. func (t *Arith) Mul(cxt context.Context, args *Args, reply *int) error {
  14. fmt.Println("Mul on", *addr)
  15. *reply = args.A * args.B
  16. return nil
  17. }
  18. func (t *Arith) Div(cxt context.Context, args *Args, quo *Quotient) error {
  19. fmt.Println("Div on", *addr)
  20. if args.B == 0 {
  21. return errors.New("divide by 0")
  22. }
  23. quo.Quo = args.A / args.B
  24. quo.Rem = args.A % args.B
  25. return nil
  26. }
  27. func main() {
  28. flag.Parse()
  29. p := &serverplugin.ZooKeeperRegisterPlugin{
  30. ServiceAddress: "tcp@" + *addr,
  31. ZooKeeperServers: []string{*zkAddr},
  32. BasePath: *basePath,
  33. Metrics: metrics.NewRegistry(),
  34. UpdateInterval: time.Minute,
  35. }
  36. if err := p.Start(); err != nil {
  37. log.Fatal(err)
  38. }
  39. s := server.NewServer()
  40. s.Plugins.Add(p)
  41. s.RegisterName("Arith", new(Arith), "")
  42. s.Serve("tcp", *addr)
  43. }

ZooKeeperRegisterPlugin中,我们指定了本服务地址,zookeeper 集群地址(可以是多个),起始路径等。服务器启动时自动向 zookeeper 注册本服务的信息,客户端可直接从 zookeeper 拉取可用的服务列表。

首先启动 zookeeper 服务器,zookeeper 的安装与启动可以参考我的上一篇文章。分别在 3 个控制台中启动 3 个服务器,指定不同的端口(注意需要指定-tags zookeeper):

  1. // 控制台1
  2. $ go run -tags zookeeper main.go -addr 127.0.0.1:8971
  3. // 控制台2
  4. $ go run -tags zookeeper main.go -addr 127.0.0.1:8972
  5. // 控制台3
  6. $ go run -tags zookeeper main.go -addr 127.0.0.1:8973

启动之后,我们观察 zookeeper 路径/services/math中的内容:

每日一库之41:rpcx - 图1

非常棒,可用的服务地址不用我们手动维护了!

接下来是客户端:

  1. var (
  2. zkAddr = flag.String("zkAddr", "127.0.0.1:2181", "zookeeper address")
  3. basePath = flag.String("basePath", "/services/math", "service base path")
  4. )
  5. func main() {
  6. flag.Parse()
  7. d := client.NewZookeeperDiscovery(*basePath, "Arith", []string{*zkAddr}, nil)
  8. xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
  9. defer xclient.Close()
  10. args := &Args{A: 10, B: 20}
  11. var reply int
  12. err := xclient.Call(context.Background(), "Mul", args, &reply)
  13. if err != nil {
  14. log.Fatalf("failed to call: %v", err)
  15. }
  16. fmt.Printf("%d * %d = %d\n", args.A, args.B, reply)
  17. args = &Args{50, 20}
  18. var quo Quotient
  19. err = xclient.Call(context.Background(), "Div", args, &quo)
  20. if err != nil {
  21. log.Fatalf("failed to call: %v", err)
  22. }
  23. fmt.Printf("%d * %d = %d...%d\n", args.A, args.B, quo.Quo, quo.Rem)
  24. }

我们通过 zookeeper 读取可用的Arith服务列表,然后随机选择一个服务发送请求:

  1. $ go run -tags zookeeper main.go
  2. 2020/05/26 23:03:40 Connected to 127.0.0.1:2181
  3. 2020/05/26 23:03:40 authenticated: id=72057658440744975, timeout=10000
  4. 2020/05/26 23:03:40 re-submitting `0` credentials after reconnect
  5. 10 * 20 = 200
  6. 50 * 20 = 2...10

我们的客户端发送了两条请求。由于使用了client.RandomSelect策略,所以这两个请求随机发送到某个服务端。我在MulDiv方法中增加了一个打印,可以观察一下各个控制台的输出!

如果我们关闭了某个服务器,对应的服务地址会从 zookeeper 中移除。我关闭了服务器 1,zookeeper 服务列表变为:

每日一库之41:rpcx - 图2

相比上一篇文章中需要手动维护 zookeeper 的内容,rpcx的自动注册和维护明显要方便太多了!

总结

rpcx是 Go 语言中首屈一指的 rpc 库,功能丰富,性能出众,文档丰富,已经被不少公司和个人采用。本文介绍的只是最基础的功能,rpcx支持各种路由选择策略、分组、限流、身份认证等高级功能,推荐深入学习!

大家如果发现好玩、好用的 Go 语言库,欢迎到 Go 每日一库 GitHub 上提交 issue😄

参考

  1. rpcx GitHub:https://github.com/smallnest/rpcx
  2. rpcx 博客:https://blog.rpcx.io/
  3. rpcx 官网:https://rpcx.io/
  4. rpcx 文档:https://doc.rpcx.io/
  5. Go 每日一库 GitHub:https://github.com/go-quiz/go-daily-lib