简介
在上一篇文章中我们介绍了 Go 标准库net/rpc的用法。在默认情况下,rpc库内部使用gob格式传输数据。我们仿造gob的编解码器实现了一个json格式的。实际上标准库net/rpc/jsonrcp中已有实现。本文是对上一篇文章的补充。
快速使用
标准库无需安装。
首先是服务端,使用net/rpc/jsonrpc之后,我们就不用自己去编写json的编解码器了:
package mainimport ("log""net""net/rpc""net/rpc/jsonrpc")type Args struct {A, B int}type Arith intfunc (t *Arith) Multiply(args *Args, reply *int) error {*reply = args.A * args.Breturn nil}func main() {l, err := net.Listen("tcp", ":1234")if err != nil {log.Fatal("listen error:", err)}arith := new(Arith)rpc.Register(arith)for {conn, err := l.Accept()if err != nil {log.Fatal("accept error:", err)}// 注意这一行go rpc.ServeCodec(jsonrpc.NewServerCodec(conn))}}
直接调用jsonrpc.NewServerCodec(conn)创建一个服务端的codec。客户端也是类似的:
func main() {conn, err := net.Dial("tcp", ":1234")if err != nil {log.Fatal("dial error:", err)}// 这里,这里😁client := rpc.NewClientWithCodec(jsonrpc.NewClientCodec(conn))args := &Args{7, 8}var reply interr = client.Call("Arith.Multiply", args, &reply)if err != nil {log.Fatal("Multiply error:", err)}fmt.Printf("Multiply: %d*%d=%d\n", args.A, args.B, reply)}
先运行服务端程序:
$ go run main.go
然后在一个新的控制台中运行客户端程序:
$ go run client.goMultiply: 7*8=56
下面这段代码基本上每个使用jsonrpc的程序都要编写:
conn, err := net.Dial("tcp", ":1234")if err != nil {log.Fatal("dial error:", err)}client := rpc.NewClientWithCodec(jsonrpc.NewClientCodec(conn))
因此jsonrpc为了方便直接提供了一个Dial方法。使用Dial简化上面的客户端程序:
func main() {client, err := jsonrpc.Dial("tcp", ":1234")if err != nil {log.Fatal("dial error:", err)}args := &Args{7, 8}var reply interr = client.Call("Arith.Multiply", args, &reply)if err != nil {log.Fatal("Multiply error:", err)}fmt.Printf("Multiply: %d*%d=%d\n", args.A, args.B, reply)}
效果是一样的。
JSON-RPC 标准
JSON-RPC 1.0 标准在 2005 年发布,经过数年演化,于 2010 年发布了 2.0 版本。JSON-RPC 标准的内容可在https://www.jsonrpc.org/specification查看。Go 标准库net/rpc/jsonrpc实现了 1.0 版本。关于 2.0 版本的实现可以在pkg.go.dev上搜索json-rpc+2.0。本文以 1.0 版本为基础进行介绍。
JSON-RPC 传输的是单一的对象,序列化为 JSON 格式。请求对象包含以下 3 个属性:
method:请求调用的方法;params:一个数组表示传给方法的各个参数;id:请求 ID。ID 可以是任何类型,在收到响应时根据这个属性判断对应哪个请求。
响应对象包含以下 3 个属性:
result:方法返回的对象,如果error非空时,该属性必须为null;error:表示调用是否出错;id:对应请求的 ID。
另外标准还定义了一种通知类型,除了id属性为null之外,通知对象的属性与请求对象完全一样。
调用client.Call("echo", "Hello JSON-RPC", &reply)时:
请求:{ "method": "echo", "params": ["Hello JSON-RPC"], "id": 1}响应:{ "result": "Hello JSON-RPC", "error": null, "id": 1}
使用 zookeeper 实现简单的负载均衡
下面我们使用zookeeper实现一个简单的客户端侧的负载均衡。zookeeper中记录所有的可提供服务的服务器,客户端每次请求时都随机挑选一个。我们的示例中,请求必须是无状态的。首先,我们改造一下服务端程序,将监听地址提取出来,通过flag指定:
package mainimport ("flag""log""net""net/rpc""net/rpc/jsonrpc")var (addr *string)type Args struct {A, B int}type Arith intfunc (t *Arith) Multiply(args *Args, reply *int) error {*reply = args.A * args.Breturn nil}func init() {addr = flag.String("addr", ":1111", "addr to listen")}func main() {flag.Parse()l, err := net.Listen("tcp", *addr)if err != nil {log.Fatal("listen error:", err)}arith := new(Arith)rpc.Register(arith)for {conn, err := l.Accept()if err != nil {log.Fatal("accept error:", err)}go rpc.ServeCodec(jsonrpc.NewServerCodec(conn))}}
关于有哪些服务器可用,我们存储在zookeeper中。
首先要启动一个zookeeper的程序。在 Apache Zookeeper 官网可以下载能直接运行的 Windows 程序。下载之后解压,将conf文件夹中的样板配置zoo_sample.cfg复制一份,文件名改为zoo.cfg。在编辑器中打开zoo.cfg,将dataDir改为一个已存在的目录,或创建一个新目录。我在bin同级目录中创建了一个data目录,然后设置dataDir=../data。切换到bin目录下执行zkServer.bat,zookeeper程序就运行起来了。使用zkClient.bat连接上这个zookeeper,增加一个节点,设置数据:
$ create /rpcserver$ set /rpcserver 127.0.0.1:1111,127.0.0.1:1112,127.0.0.1:1113
我们用,分隔多个服务器地址。
准备工作完成后,接下来就开始编写客户端代码了。我们实现一个代理类,负责监听zookeeper的数据变化,根据zookeeper中新的地址创建到服务器的连接,删除老的连接,将调用请求随机转发到一个服务器处理:
type Proxy struct {zookeeper stringclients map[string]*rpc.Clientevents <-chan zk.EventzookeeperConn *zk.Connmutex sync.Mutex}func NewProxy(addr string) *Proxy {return &Proxy{zookeeper: addr,clients: make(map[string]*rpc.Client),}}
这里我们使用了go-zookeeper这个库,需要额外安装:
$ go get github.com/samuel/go-zookeeper/zk
程序启动时,代理对象从zookeeper中获取服务端地址,创建连接:
func (p *Proxy) Connect() {c, _, err := zk.Connect([]string{p.zookeeper}, time.Second) //*10)if err != nil {panic(err)}data, _, event, err := c.GetW("/rpcserver")if err != nil {panic(err)}p.events = eventp.zookeeperConn = cp.CreateClients(string(data))}func (p *Proxy) CreateClients(server string) {p.mutex.Lock()defer p.mutex.Unlock()addrs := strings.Split(server, ",")allAddr := make(map[string]struct{})for _, addr := range addrs {allAddr[addr] = struct{}{}if _, exist := p.clients[addr]; exist {continue}client, err := jsonrpc.Dial("tcp", addr)if err != nil {log.Println("jsonrpc Dial error:", err)continue}p.clients[addr] = clientlog.Println("new addr:", addr)}for addr := range p.clients {if _, exist := allAddr[addr]; !exist {// 不在 zookeeper 中的地址,删除对应连接oldClient.Close()delete(p.clients, addr)log.Println("delete addr", addr)}}}
同时,需要监听zookeeper中的数据变化,当新增或删除某个服务端地址时,Proxy要及时更新连接:
func (p *Proxy) Run() {for {select {case event := <-p.events:if event.Type == zk.EventNodeDataChanged {data, _, err := p.zookeeperConn.Get("/rpcserver")if err != nil {log.Println("get zookeeper data failed:", err)continue}p.CreateClients(string(data))}}}}
客户端主体程序使用Proxy结构非常方便:
package mainimport ("flag""fmt""math/rand")var (zookeeperAddr *string)func init() {zookeeperAddr = flag.String("addr", ":2181", "zookeeper address")}type Args struct {A, B int}func main() {flag.Parse()fmt.Println(*zookeeperAddr)p := NewProxy(*zookeeperAddr)p.Connect()go p.Run()for i := 0; i < 10; i++ {var reply intargs := &Args{rand.Intn(1000), rand.Intn(1000)}p.Call("Arith.Multiply", args, &reply)fmt.Printf("%d*%d=%d\n", args.A, args.B, reply)}// sleep 过程中可以修改 zookeeper 中的数据time.Sleep(1 * time.Minute)// 使用新的地址做随机for i := 0; i < 100; i++ {var reply intargs := &Args{rand.Intn(1000), rand.Intn(1000)}p.Call("Arith.Multiply", args, &reply)fmt.Printf("%d*%d=%d\n", args.A, args.B, reply)}}
创建一个代理对象,在一个新的 goroutine 中监听zookeeper事件。然后通过Proxy的Call调用远程服务端的方法:
func (p *Proxy) Call(method string, args interface{}, reply interface{}) error {var client *rpc.Clientvar addr stringidx := rand.Int31n(int32(len(p.clients)))var i int32p.mutex.Lock()for a, c := range p.clients {if i == idx {client = caddr = abreak}i++}p.mutex.Unlock()fmt.Println("use", addr)return client.Call(method, args, reply)}
首先我们要启动 3 个服务端程序,分别监听端口 1111、1112、1113,需要 3 个控制台:
控制台 1:
$ go run main.go -addr :1111
控制台 2:
$ go run main.go -addr :1112
控制台 3:
$ go run main.go -addr :1113
客户端在一个新的控制台启动,指定zookeeper地址:
$ go run . -addr=127.0.0.1:2181
在输出中,我们可以看到是怎么随机挑选服务器的。
我们可以尝试在客户端程序运行的过程中,将某个服务器地址从zookeeper中删除。我特意在程序中加了一个 1 分钟的延迟。在sleep过程中,通过zkClient.cmd将127.0.0.1:1113这个地址从zookeeper中删除:
$ set /rpcserver 127.0.0.1:1111,127.0.0.1:1112
控制台输出:
$ 2020/05/10 23:47:47 delete addr 127.0.0.1:1113
并且后续的请求不会再发到127.0.0.1:1113这个服务器了。
其实,在实际的项目中,Proxy一般是一个独立的服务器,而不是放在客户端侧。上面示例这样处理只是为了方便。
总结
RPC 底层可以使用各种协议传输数据,JSON/XML/Protobuf 都可以。对 rpc 感兴趣的建议看看rpcx这个库,https://github.com/smallnest/rpcx。非常强大!
大家如果发现好玩、好用的 Go 语言库,欢迎到 Go 每日一库 GitHub 上提交 issue😄
参考
- jsonrpc GitHub:https://golang.org/pkg/net/rpc/jsonrpc/
- Go 每日一库 GitHub:https://github.com/go-quiz/go-daily-lib
