中文文档:http://doc.oschina.net/grpc?t=60133
文档:https://godoc.org/google.golang.org/grpc

简介

gRPC 是一个高性能、开源和通用的 RPC 框架,面向移动和 HTTP/2 设计。

gRPC 基于 HTTP/2 标准设计,带来诸如双向流、流控、头部压缩、单 TCP 连接上的多复用请求等特。这些特性使得其在移动设备上表现更好,更省电和节省空间占用。

gRPC 客户端和服务端可以在多种环境中运行和交互 - 从 google 内部的服务器到你自己的笔记本,并且可以用任何 gRPC 支持的语言来编写。

gRPC 默认使用 protocol buffers,这是 Google 开源的一套成熟的结构数据序列化机制(当然也可以使用其他数据格式如 JSON)

快速开始

1.编写person.proto

  1. syntax = "proto3";
  2. package person;
  3. message person {
  4. int32 id = 1;
  5. string name = 2;
  6. }
  7. message all_person {
  8. repeated person Per = 1;
  9. }
  10. message person_filter {
  11. int32 id = 1;
  12. }
  13. message person_all{
  14. }
  15. // The Person service definition.
  16. service personserver {
  17. // Get all Person
  18. rpc GetPersons(person_all) returns (all_person) {}
  19. // Get a Person
  20. rpc GetPerson(person_filter) returns (person) {}
  21. }
  22. cdprotos文件夹的上一级目录
  23. protoc -I protos/ protos/server.proto --go_out=plugins=grpc:protos
  24. 生成person.pb.go

2.定义服务端

  1. package main
  2. import (
  3. "context"
  4. "errors"
  5. "google.golang.org/grpc"
  6. "log"
  7. "net"
  8. pb "study/grpc/protobuf/person"
  9. )
  10. const (
  11. port = ":8080"
  12. )
  13. type Server struct {
  14. }
  15. var Person map[int32]*pb.Person
  16. func init(){
  17. var p1 pb.Person
  18. p1.Id=1
  19. p1.Name="tom"
  20. var p2 pb.Person
  21. p2.Id=2
  22. p2.Name="jack"
  23. Person = make(map[int32]*pb.Person)
  24. Person[1]=&p1
  25. Person[2]=&p2
  26. }
  27. func (S *Server) GetPersons(ctx context.Context,all *pb.PersonAll)(*pb.AllPerson, error){
  28. var all_person []*pb.Person
  29. for _,v:=range Person{
  30. all_person = append(all_person,v)
  31. }
  32. res := pb.AllPerson{Per:all_person}
  33. return &res ,nil
  34. }
  35. func (S *Server) GetPerson(ctx context.Context,pid *pb.PersonFilter) (*pb.Person, error){
  36. id :=pid.Id
  37. val,ok := Person[id]
  38. if !ok {
  39. res := pb.Person{}
  40. return &res,errors.New("not exist")
  41. }
  42. person :=pb.Person{Id:id,Name:val.Name}
  43. return &person,nil
  44. }
  45. func main() {
  46. lis, err := net.Listen("tcp", port)
  47. if err != nil {
  48. log.Fatalf("failed to listen: %v", err)
  49. }
  50. s := grpc.NewServer() //起一个服务
  51. pb.RegisterPersonserverServer(s,&Server{})
  52. if err := s.Serve(lis); err != nil {
  53. log.Fatalf("failed to serve: %v", err)
  54. }
  55. }

3.定义client

  1. //时间有限,就不写cli了,直接所有接口都跑吧....
  2. package main
  3. import (
  4. "context"
  5. "google.golang.org/grpc"
  6. "log"
  7. "os"
  8. "strconv"
  9. pb "study/grpc/protobuf/person"
  10. "time"
  11. )
  12. const (
  13. address = "localhost:8080"
  14. defaultId = 1
  15. )
  16. func main() {
  17. //建立链接
  18. conn, err := grpc.Dial(address, grpc.WithInsecure())
  19. if err != nil {
  20. log.Fatalf("did not connect: %v", err)
  21. }
  22. defer conn.Close()
  23. client := pb.NewPersonserverClient(conn)
  24. id :=defaultId
  25. if len(os.Args)>1 {
  26. id, err = strconv.Atoi(os.Args[1])
  27. if err != nil {
  28. panic(err.Error())
  29. }
  30. }
  31. ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
  32. defer cancel()
  33. parm1 := pb.PersonAll{}
  34. r, err :=client.GetPersons(ctx,&parm1)
  35. if err!=nil{
  36. log.Fatalf("error: %v", err)
  37. }else {
  38. log.Println(r.Per)
  39. }
  40. parm2 :=pb.PersonFilter{Id:int32(id)}
  41. r2 ,err :=client.GetPerson(ctx,&parm2)
  42. if err!=nil{
  43. log.Fatalf("error: %v", err)
  44. }else {
  45. log.Println(r2.Id,r2.Name)
  46. }
  47. }

文档

包函数

func SendHeader(ctx context.Context, md metadata.MD) error

  • 将头发出去,只能调用一次

func SetHeader(ctx context.Context, md metadata.MD) error

  • 设置头,多次调用将被合并
  • 在SendHeader时,将数据发出去

func SetTrailer(ctx context.Context, md metadata.MD) error

  • 设置尾,多次调用将被合并

type DialOption

  1. type DialOption interface {
  2. }

func WithAuthority(a string) DialOption 它指定:authority头的值。此值仅对WithInsecure有效,如果存在传输凭证,则无效
func WithConnectParams(p ConnectParams) DialOption 使用ConnectParams做拨号选项
func WithInsecure() DialOption 禁用此ClientConn的传输安全性,如果没设置传输必须是安全的
func WithDefaultCallOptions(cos …CallOption) DialOption 使用CallOptions作为拨号选项
func WithMaxHeaderListSize(s uint32) DialOption 设定最大头
func WithReadBufferSize(s int) DialOption 读缓冲
func WithWriteBufferSize(s int) DialOption 写缓冲
func WithTransportCredentials(creds credentials.TransportCredentials) DialOption

  • 设置安全连接TLS/SSL,不要与WithCredentialsBundle一起使用

func WithDisableRetry() DialOption

  • 禁止重试,但目前并没开启自动重试。可设置环境变量”GRPC_GO_RETRY” 为”on”启用

func WithDisableHealthCheck() DialOption 对这个客户机conn的所有子节点禁用LB通道运行状况检查
func WithBlock() DialOption 阻塞连接,直到底层连接打开(不打开一直阻塞),才返回conn,否则立刻返回
func WithKeepaliveParams(kp keepalive.ClientParameters) DialOption

  • 返回一个为客户端传输指定keepalive参数的拨号选项

func WithPerRPCCredentials(creds credentials.PerRPCCredentials) DialOption

  • 返回一个设置凭证的DialOption,使用token验证会用到

func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption 指定一元rpc的拦截器的拨号选项
func WithStreamInterceptor(f StreamClientInterceptor) DialOption 指定流式rpc的拦截器的拨号选项
func WithChainStreamInterceptor(interceptors …StreamClientInterceptor) DialOption

  • 设定多个拦截器
  • WithStreamInterceptor如果存在,会将WithStreamInterceptor放在最前面

func WithChainUnaryInterceptor(interceptors …UnaryClientInterceptor) DialOption

  • 设定多个拦截器
  • WithUnaryInterceptor如果存在,会将WithUnaryInterceptor放在最前面

func WithResolvers(rs …resolver.Builder) DialOption

  • WithResolvers允许在ClientConn上本地注册一系列解析器实现,而不需要通过resolver. register全局注册。
  • 它们将与仅用于当前拨号的方案进行匹配,并优先于全局注册表。

func WithUserAgent(s string) DialOption 设置userAgent
func WithInitialConnWindowSize(s int32) DialOption

  • 它设置连接上初始窗口大小的值。窗口大小的下界是64K,任何小于该值的值都将被忽略。

func WithInitialWindowSize(s int32) DialOption

  • 设置流上初始窗口大小的值。窗口大小的下界是64K,任何小于该值的值都将被忽略。

type CallOption

func MaxCallRecvMsgSize(s int) CallOption 最大可接收
func MaxCallSendMsgSize(s int) CallOption 最大可发送
func PerRPCCredentials(creds credentials.PerRPCCredentials) CallOption 为每次rpc调用设置凭证
func WaitForReady(waitForReady bool) CallOption

  • WaitForReady配置在断开的连接或不可达的服务器上尝试RPC时采取的操作。如果waitForReady为false, RPC将立即失败。否则,RPC客户端将阻塞调用,直到连接可用为止(或者调用被取消或超时),如果由于临时错误而失败,则将重试调用。如果数据被写入线路,gRPC将不会重试,除非服务器指示它没有处理数据。请参考https://github.com/grpc/grpc/blob/master/doc/wait-for-ready.md。
  • 默认情况下,rpc不“等待就绪”。

func Header(md metadata.MD) CallOption rpc完成后,返回头部
func Trailer(md
metadata.MD) CallOption rpc完成后,返回Trailer
func Peer(p *peer.Peer) CallOption rpc完成后,返回peer信息

  1. // Peer包含RPC对等体的信息,如地址和身份验证信息。
  2. type Peer struct {
  3. // Addr is the peer address.
  4. Addr net.Addr
  5. // AuthInfo is the authentication information of the transport.
  6. // It is nil if there is no transport security being used.
  7. AuthInfo credentials.AuthInfo
  8. }

type ClientConn

grpc中并没有池的概念,grpc采取http2的方式进行通讯,支持多路复用,并且并发安全
func Dial(target string, opts …DialOption) (*ClientConn, error) target:ip+port

  1. 使用Dial进行拨号,最好不要使用WithBlock参数,默认情况下,Dial返回的conn并非已进行连接的conn
  2. 那么,连接是在哪进行的呢,
  3. // 拨号
  4. taskModel.SynServerIp = "127.0.0.1"
  5. conn, err := grpc.Dial(net.JoinHostPort(taskModel.SynServerIp,commons.AppParam.RpcPort), grpc.WithInsecure(),grpc.WithBlock())
  6. if err != nil {
  7. commons.LogError.Println("RpcAppPub 拨号失败",err.Error())
  8. return "",err
  9. }
  10. defer conn.Close()
  11. // 6.连接
  12. Client := pb.NewPubPlanClient(conn)
  13. ctx, cancel := context.WithTimeout(context.Background(), rpcutils.ContextTimeOut)
  14. defer cancel()
  15. res,err := Client.PlanAppPub(ctx,params) //不使用WithBlock在这里进行连接,连接不上会立马报错
  16. 注意:WithBlock会使程序阻塞

func DialContext(ctx context.Context, target string, opts …DialOption) (conn *ClientConn, err error)

  • 默认情况下,它是一个非阻塞拨号(该函数不会等待建立连接,并且连接发生在后台)。要使其成为一个阻塞拨号,请使用WithBlock()拨号选项。在非阻塞情况下,ctx不会对连接起作用。

func (cc ClientConn) Close() error 关闭连接
func (cc
ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts …CallOption) error,Invoke在线路上发送RPC请求,并在收到响应后返回。这通常由生成的代码调用。

  1. func (c *commonClient) RpcForTranspondCommand(ctx context.Context, in *Request, opts ...grpc.CallOption) (*Response, error) {
  2. out := new(Response)
  3. err := c.cc.Invoke(ctx, "/SCDPSshService.Common/RpcForTranspondCommand", in, out, opts...)
  4. if err != nil {
  5. return nil, err
  6. }
  7. return out, nil
  8. }

func (cc ClientConn) NewStream(ctx context.Context, desc StreamDesc, method string, opts …CallOption) (ClientStream, error) NewStream为客户端创建一个新的流。这通常由生成的代码调用。ctx用于流的生命周期

type Server

func NewServer(opt …ServerOption) Server
func (s
Server) RegisterService(sd ServiceDesc, ss interface{})
func (s
Server) GetServiceInfo() map[string]ServiceInfo

  • 返回一个从服务名到ServiceInfo的映射,格式为.

func (s Server) GracefulStop()
func (s
Server) Stop()

type ServerOption

func ConnectionTimeout(d time.Duration) ServerOption 设定连接超时 默认120s,0或负值将导致立即超时
func Creds(c credentials.TransportCredentials) ServerOption 设置服务器连接的凭据,用于tls连接
func MaxConcurrentStreams(n uint32) ServerOption 设定流的数据量大小
func MaxHeaderListSize(s uint32) ServerOption 设定最大可接受头部大小
func MaxRecvMsgSize(m int) ServerOption 设定最大可接受消息体大小,默认4M
func MaxSendMsgSize(m int) ServerOption 设定最大可发送消息体大小,默认math.MaxInt32
func StreamInterceptor(i StreamServerInterceptor) ServerOption 流拦截器(用该方式不能设置多个)
func UnaryInterceptor(i UnaryServerInterceptor) ServerOption 一元拦截器(用该方式不能设置多个)
func ChainStreamInterceptor(interceptors …StreamServerInterceptor) ServerOption

  • 多个流拦截器
  • 如果StreamInterceptor存在,会将StreamInterceptor放在最前面

func ChainUnaryInterceptor(interceptors …UnaryServerInterceptor) ServerOption

  • 多个一元拦截器
  • 如果UnaryInterceptor存在,会将UnaryInterceptor放在最前面

func UnknownServiceHandler(streamHandler StreamHandler) ServerOption

  • 添加一个自定义的未知服务处理程序。提供的方法是一个双向流的RPC服务处理程序,当收到未注册的服务或方法的请求时,它将被调用,而不是返回“未实现的”gRPC错误

func WriteBufferSize(s int) ServerOption 读缓冲
func ReadBufferSize(s int) ServerOption 写缓冲
func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption 返回一个服务器选项,该选项设置服务器的keepalive强制策略
func KeepaliveParams(kp keepalive.ServerParameters) ServerOption 返回一个ServerOption,用于设置服务器的keepalive和max-age参数
func InitialConnWindowSize(s int32) ServerOption

  • 返回设置连接窗口大小的服务器选项。窗口大小的下界是64K,任何小于该值的值都将被忽略

func InitialWindowSize(s int32) ServerOption

  • 返回一个设置流窗口大小的服务器选项。窗口大小的下界是64K,任何小于该值的值都将被忽略

func NumStreamWorkers(numServerWorkers uint32) ServerOption

  • 实验性api
  • 用于设置应该用于处理传入流的worker goroutines的数量。将此设置为0(默认值)将禁用workers,并为每个流生成一个新的goroutine。

type ConnectParams

  1. type ConnectParams struct {
  2. // Backoff specifies the configuration options for connection backoff.
  3. Backoff backoff.Config
  4. // 希望的拿到连接的最短时间,一般Backoff用默认的就行,直接再设置一个最短时间
  5. // 意思就是过多少秒后才去尝试获取连接
  6. // MinConnectTimeout如果大于Backoff的重试间隔时间,将使用MinConnectTimeout
  7. MinConnectTimeout time.Duration
  8. }

type StreamClientInterceptor

StreamClientInterceptor拦截ClientStream的创建。它可以返回一个定制的客户端流来拦截所有的I/O操作

  1. type StreamServerInterceptor func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error

type StreamServerInterceptor

StreamServerInterceptor提供了一个钩子来拦截流RPC在服务器上的执行

  1. type StreamServerInterceptor func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error

type UnaryClientInterceptor

在客户端拦截unary RPC的执行,一元拦截器

  1. type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error

type UnaryServerInterceptor

UnaryServerInterceptor提供了一个钩子来拦截服务器上unary RPC的执行,一元拦截器

  1. type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)

type ServerStream

  1. type ServerStream interface {
  2. SetHeader(metadata.MD) error
  3. SendHeader(metadata.MD) error
  4. SetTrailer(metadata.MD)
  5. Context() context.Context
  6. //发消息
  7. SendMsg(m interface{}) error
  8. //收消息,阻塞
  9. RecvMsg(m interface{}) error
  10. }

type ClientStream

  1. type ClientStream interface {
  2. Header() (metadata.MD, error)
  3. Trailer() metadata.MD
  4. CloseSend() error
  5. Context() context.Context
  6. SendMsg(m interface{}) error
  7. //收消息,阻塞
  8. RecvMsg(m interface{}) error
  9. }

func NewClientStream(ctx context.Context, desc StreamDesc, cc ClientConn, method string, opts …CallOption) (ClientStream, error)

type EmptyCallOption

EmptyCallOption不会改变调用配置。它可以嵌入到另一个结构中,以构建自定义call选项

  1. type EmptyCallOption struct{}
  2. // 具体使用可参考go-grpc-middleware/retry
  3. // CallOption is a grpc.CallOption that is local to grpc_retry.
  4. type CallOption struct {
  5. grpc.EmptyCallOption // make sure we implement private after() and before() fields so we don't panic.
  6. applyFunc func(opt *options)
  7. }

type EmptyDialOption

空拨号选项不改变拨号配置。它可以嵌入到另一个结构中,以构建自定义拨号选项。

  1. type EmptyDialOption struct{}

type EmptyServerOption

EmptyServerOption不改变服务器配置。它可以嵌入到另一个结构中来构建自定义服务器选项。

  1. type EmptyServerOption struct{}

type UnaryHandler

UnaryHandler定义UnaryServerInterceptor调用的处理程序,以完成unary RPC的正常执行。如果UnaryHandler返回一个错误,它应该由状态包生成,否则gRPC将使用codes.Unknown

  1. type UnaryHandler func(ctx context.Context, req interface{}) (interface{}, error)

type UnaryInvoker

UnaryInvoker被UnaryClientInterceptor调用来完成rpc

  1. type UnaryInvoker func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error

type UnaryServerInfo

UnaryServerInfo由关于服务器端的unary RPC的各种信息组成。拦截器可以更改所有per-rpc信息。

  1. type UnaryServerInfo struct {
  2. // 服务器是用户提供的服务实现。这是只读的
  3. Server interface{}
  4. // FullMethod是完整的RPC方法字符串,/package.service/method.
  5. FullMethod string
  6. }

type StreamDesc

StreamDesc表示一个流RPC服务的方法规范

  1. type StreamDesc struct {
  2. StreamName string
  3. Handler StreamHandler
  4. // At least one of these is true.
  5. ServerStreams bool
  6. ClientStreams bool
  7. }

type StreamHandler

StreamHandler定义gRPC服务器调用的处理程序,以完成流RPC的执行。如果StreamHandler返回一个错误,它应该由状态包生成,否则gRPC将使用codes.Unknown

  1. type StreamHandler func(srv interface{}, stream ServerStream) error

type StreamServerInfo

  1. type StreamServerInfo struct {
  2. // FullMethod是完整的RPC方法字符串, /package.service/method.
  3. FullMethod string
  4. // IsClientStream指示RPC是否是一个客户端流RPC.
  5. IsClientStream bool
  6. // IsServerStream指示RPC是否为服务器流RPC.
  7. IsServerStream bool
  8. }

type PeerCallOption

PeerCallOption是一个收集远端对等体身份的调用。在RPC完成后将填充peer字段

  1. type PeerCallOption struct {
  2. PeerAddr *peer.Peer
  3. }

type HeaderCallOption

HeaderCallOption是一个收集响应头元数据的调用。在*RPC完成后将填充元数据字段

  1. type HeaderCallOption struct {
  2. HeaderAddr *metadata.MD
  3. }