1. RPC
1.1. RPC概念
RPC(Remote Procedure Call),远程过程调用,即两个进程跨网络进行调用,就如同调用本地的方法一样,无须关注细节。RPC是一种服务端-客户端模式,大概流程如下:
- 客户端stub(client stub)将这些参数包装,并通过系统调用发送到服务端机器。打包的过程叫 marshalling。(常见方式:XML、JSON、二进制编码)
- 客户端本地操作系统发送信息至服务器。(可通过自定义TCP协议或HTTP传输)
- 服务器系统将信息传送至服务端stub(server stub)。
- 服务端stub(server stub)解析信息。该过程叫 unmarshalling。
- 服务端stub(server stub)调用程序,并通过类似的方式返回给客户端。
1.2. Golang RPC编码流程
1.2.1. 常用方法
```go // 注册服务,name为服务的名,rcvr为结构体对象(通常为指针) func RegisterName(name string, rcvr interface{}) error // 处理客户端连接 func ServeConn(conn io.ReadWriteCloser)
// 连接服务端 func Dial(network, address string) (Client, error) // 调用服务端的方法 func (client Client) Call(serviceMethod string, args interface{}, reply interface{})
<a name="OZeWL"></a>
### 1.2.2. 编码流程
<a name="DvG9W"></a>
#### 1.2.2.1. 一般流程
- 服务端
```go
// 1. 定义结构体
type Server struct{}
// 2. 定义对外的RPC方法:
// 只能有两个参数,第一个是传入参数,第二个是传出参数,传出参数必须是指针
// 只能有一个返回值,并且是 error
// 方法必须是可以导出的
func (s *Server) Handler(input,*output) error {}
// 3. 注册RCP结构体
err := rpc.RegisterName("pkg/server", new(Server))
// 4. 监听端口
Listner,err := net.Listen("tcp", "0.0.0.0:8080")
// 5. 接收请求
conn,err := server.Listener.Accept()
// 6. 处理请求
go rpc.ServeConn(conn)
- 客户端 ```go // 1. 拨号 client, err := rpc.Dail(“tcp”,”127.0.0.1:8080)
// 2. 调用远程RCP接口 err := client.Call(“pkg/server.Handler”, &resp)
<a name="oxXER"></a>
#### 1.2.2.2. 规范RPC编码流程
基本定义
- 定义服务名称常量:通常为包路径.结构体名称
- 定义该服务接口:通过接口规范需要对外暴露的RPC方法列表
- 服务端定义注册RPC的结构体,参数类型为上述接口
接口实现
package server
import ( “fmt” log “github.com/sirupsen/logrus” “net” “net/rpc” )
const ( HelloServiceMethodName = “rpc/server/HelloService” )
var ( Listener net.Listener )
// 定义结构体 type HelloService struct { }
// rcp 方法必须满足三个条件: // 1. 只能有两个可序列化的参数,第二个是指针。通过第二个参数(传出参数)获取返回结果 // 2. 方法的返回值有且只能有一个 error // 3. 方法必须是可以导出的 func (p HelloService) Hello( request string, reply string) error { *reply = fmt.Sprintf(“Hello %s !”, request) return nil }
func init() { // 注册RCP对象 err := rpc.RegisterName(HelloServiceMethodName, new(HelloService)) if err != nil { log.Fatalf(“register hello service failed,err:%s”,err.Error()) } // 启用监听 Listener, err = net.Listen(“tcp”, “0.0.0.0:55999”) if err != nil { log.Fatalf(“bind address 0.0.0.0:55999 failed,err:%s”,err.Error()) } }
```go
// server.go
package main
import (
log "github.com/sirupsen/logrus"
"learn/network/rpc/server"
"net/rpc"
)
func main() {
for {
// 接收新的连接
conn, err := server.Listener.Accept()
if err != nil {
log.Errorf("accept new conn failed,err:%s", err.Error())
}
log.Infof("recv new conn:%s", conn.RemoteAddr().String())
go func() {
defer conn.Close()
rpc.ServeConn(conn)
}()
}
}
package main
import (
"fmt"
log "github.com/sirupsen/logrus"
"net/rpc"
)
func main() {
// 拨号连接 rpc 服务端
dial, err := rpc.Dial("tcp", "127.0.0.1:55999")
if err != nil {
log.Errorf("dial to address 127.0.0.1:55999 failed, err:%s", err.Error())
return
}
var resp string
// 远程调用服务端
err = dial.Call("rpc/server/HelloService.Hello", "张三", &resp)
if err != nil {
log.Errorf("call service failed, err:%s",err.Error())
return
}
fmt.Println(resp)
}
1.3.2. 简单案例2
// server.go
package main
import (
"context"
"errors"
"github.com/docker/docker/api/types"
"github.com/docker/docker/client"
log "github.com/sirupsen/logrus"
"net"
"net/rpc"
"strings"
)
var DockerClient *client.Client
func GetDockerClient() (*client.Client, error) {
if DockerClient != nil {
return DockerClient, nil
}
return client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
}
type Image struct{}
// 根据镜像的Id获取镜像Tag列表的方法
func (i *Image) GetImageTag(id string, tags *[]string) error {
cli, err := GetDockerClient()
if err != nil {
log.Errorf("get docker client failed ,err:%s", err.Error())
return err
}
images, err := cli.ImageList(context.TODO(), types.ImageListOptions{})
if err != nil {
log.Errorf("list images failed ,err:%s", err.Error())
return err
}
for _, image := range images {
if strings.Contains(image.ID, id) {
*tags = image.RepoTags
return nil
}
}
return errors.New("the container id not found")
}
// 根据镜像id删除镜像
func (i *Image) DeleteImage(id string, resp *[]string) error {
cli, err := GetDockerClient()
if err != nil {
log.Errorf("get docker client failed ,err:%s", err.Error())
return err
}
responseList, err := cli.ImageRemove(context.TODO(), id, types.ImageRemoveOptions{})
if err != nil {
log.Errorf("remove image failed,err:%s", err.Error())
return err
}
for _, response := range responseList {
if response.Untagged != "" {
*resp = append(*resp, response.Untagged)
}
}
return nil
}
func main() {
// 注册结rpc服务
err := rpc.RegisterName("container/image", &Image{})
if err != nil {
log.Fatalf("registry rpc service failed,err:%s", err.Error())
}
// 监听端口
listen, err := net.Listen("tcp", "0.0.0.0:8089")
if err != nil {
log.Fatalf("binding rpc service to 0.0.0.0:8089 failed,err:%s", err.Error())
}
for {
conn, err := listen.Accept()
if err != nil {
log.Errorf("accept connect failed,err:%s", err.Error())
continue
}
go func() {
rpc.ServeConn(conn)
_ = conn.Close()
}()
}
}
// client.go
package main
import (
"fmt"
log "github.com/sirupsen/logrus"
"net/rpc"
)
func main() {
dial, err := rpc.Dial("tcp", "127.0.0.1:8089")
if err != nil {
log.Errorf("connet to rpc server failed,err:%s", err.Error())
return
}
var resp []string
err = dial.Call("container/image.GetImageTag", "ed0e1d894299", &resp)
if err != nil {
log.Errorf("call container/image.GetImageTag failed,err:%s", err.Error())
return
}
fmt.Println(resp)
err = dial.Call("container/image.DeleteImage", "ed0e1d894299", &resp)
if err != nil {
log.Errorf("call container/image.DeleteImage failed,err:%s", err.Error())
return
}
fmt.Println(resp)
}
1.3.3. 使用接口
在涉及RPC的应用中,作为开发人员一般至少有三种角色:首先是服务端实现RPC方法的开发人员,其次是客户端调用RPC方法的人员,最后也是最重要的是制定服务端和客户端RPC接口规范的设计人员。之前的两个案例中,并不规范,以下是一个比较规范的RPC开放方法:
// server.go
package main
import (
"context"
"errors"
"github.com/docker/docker/api/types"
"github.com/docker/docker/client"
log "github.com/sirupsen/logrus"
"net"
"net/rpc"
"strings"
)
var DockerClient *client.Client
func GetDockerClient() (*client.Client, error) {
if DockerClient != nil {
return DockerClient, nil
}
return client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
}
// 定义服务名称常量;通常为路径格式,避免重名
const ImageServiceName = "rpc/server/pkg.ImageService"
// 定义对外暴露的RPC方法
type ImageServiceInterface interface {
GetImageTag(id string, reply *[]string) error
DeleteImage(id string, reply *[]string) error
}
// 注册RPC方法
func ImageServiceRegistry(svc ImageServiceInterface) error {
return rpc.RegisterName(ImageServiceName, svc)
}
// 定义RPC的服务结构体
type Image struct{}
// 定义RPC服务的方法
// 根据镜像的Id获取镜像Tag列表的方法
func (i *Image) GetImageTag(id string, tags *[]string) error {
cli, err := GetDockerClient()
if err != nil {
log.Errorf("get docker client failed ,err:%s", err.Error())
return err
}
images, err := cli.ImageList(context.TODO(), types.ImageListOptions{})
if err != nil {
log.Errorf("list images failed ,err:%s", err.Error())
return err
}
for _, image := range images {
if strings.Contains(image.ID, id) {
*tags = image.RepoTags
return nil
}
}
return errors.New("the container id not found")
}
// 定义RPC服务的方法
// 根据镜像id删除镜像
func (i *Image) DeleteImage(id string, tags *[]string) error {
cli, err := GetDockerClient()
if err != nil {
log.Errorf("get docker client failed ,err:%s", err.Error())
return err
}
responseList, err := cli.ImageRemove(context.TODO(), id, types.ImageRemoveOptions{})
if err != nil {
log.Errorf("remove image failed,err:%s", err.Error())
return err
}
for _, response := range responseList {
if response.Untagged != "" {
*tags = append(*tags, response.Untagged)
}
}
return nil
}
func main() {
// 注册结rpc服务
err := ImageServiceRegistry(new(Image))
if err != nil {
log.Errorf("registry ImageService failed,err:%s", err.Error())
return
}
// 监听端口
listen, err := net.Listen("tcp", "0.0.0.0:8089")
if err != nil {
log.Fatalf("binding rpc service to 0.0.0.0:8089 failed,err:%s", err.Error())
}
for {
conn, err := listen.Accept()
if err != nil {
log.Errorf("accept connect failed,err:%s", err.Error())
continue
}
go func() {
rpc.ServeConn(conn)
_ = conn.Close()
}()
}
}
// client.go
package main
import (
"fmt"
log "github.com/sirupsen/logrus"
"net/rpc"
)
// 定义服务名称常量;通常为路径格式,避免重名
const imageServiceName = "rpc/server/pkg.ImageService"
// 定义对外暴露的RPC方法
type ImageServiceInterface interface {
GetImageTag(id string, reply *[]string) error
DeleteImage(id string, reply *[]string) error
Close()
}
// 定义服务结构体
type ImageClient struct {
Name string
Address string
client *rpc.Client
}
func ImageDail(address string) (ImageServiceInterface, error) {
c, err := rpc.Dial("tcp", address)
if err != nil {
return nil, err
}
return &ImageClient{Name: imageServiceName, Address: address, client: c}, nil
}
func (i *ImageClient) GetImageTag(id string, tags *[]string) error {
return i.client.Call(i.Name+".GetImageTag",id, tags)
}
func (i *ImageClient) DeleteImage(id string, tags *[]string) error {
return i.client.Call(i.Name+".DeleteImage", id , tags)
}
func (i *ImageClient) Close () {
_ = i.client.Close()
}
func main() {
client, err := ImageDail("127.0.0.1:8089")
if err != nil {
log.Errorf("connet to rpc server failed,err:%s", err.Error())
return
}
var resp []string
err = client.GetImageTag("a9d583973f65", &resp)
if err != nil {
log.Errorf("ImageClient call GetImageTag failed,err:%s", err.Error())
return
}
fmt.Println(resp)
err = client.DeleteImage("a9d583973f65", &resp)
if err != nil {
log.Errorf("ImageClient call DeleteImage failed,err:%s", err.Error())
return
}
fmt.Println(resp)
client.Close()
}
2. GRPC
2.1. 介绍
在Golang RPC中,存在一个文件,就是只能在golang语言项目之间调用,无法跨语言。在企业中,不同的组件可能会使用不同的语言进行项目开发,不同语言项目之间使用RPC调用,一般推荐使用GRPC。
GRPC 是一种现代化开源的高性能RPC框架,能够运行于任意环境之中。最初由谷歌进行开发。它使用HTTP/2作为传输协议。
2.2. protocol buffers
Protocol buffers是一种轻便高效的结构化数据存储方式,可以用于数据序列化,非常适合RPC通信中数据序列化,GRPC 是的数据序列化是通过protocol buffers实现的。
- 优势:
- 序列化后体积比Json小,适合网络传输
- 与语言无关,即跨语言、跨平台
- 消息序列化和反序列化速度快
劣势:
文件以 .proto 结尾,除了
{ }
结构和rpc方法定义末尾除外的语句都是以分号结尾- 结构定义可以包含: message, service, enum
- Message类型名称用驼峰式命名,字段采用下划线命名
- Enum类型名称采用驼峰式命名,字段采用大写下划线命名
- Service方法名采用驼峰式命名
2.2.2. 命令行操作
```
go get google.golang.org/grpc/cmd/protoc-gen-go-grpc
```
// 不带 -I 会在services下创建 pbfiles目录
// --go_out会在指定目录下创建结构体
// --go-grpc_out 会创建服务相关的rpc方法
protoc -I=pbfiles --go_out=services --go-grpc_out=services pbfiles/prod.proto