1、编写proto文档如下:
syntax = "proto3";package proto;message RunMessage {int64 time = 1; //unix时间string id = 2; //爬虫名称Status status = 3; //状态,string remark = 4; //备注信息}service Trans {//双向流 客户端发送状态,服务端发送指令rpc SendStatus (stream RunMessage) returns (stream Order) {}//rpc Order(stream Order)returns(Status){}}message Order {Status status = 1;}enum Status {START = 0; //开始RUNNING = 1; //执行中END = 2; //结束IDEAL = 3; //空闲中}
2、执行命令: protoc —go_out=plugins=grpc:. .proto
注意:明确.proto文件的位置,要不会找不到*.proto文件,不能生成go的文件
3、生成的代码
// Code generated by protoc-gen-go. DO NOT EDIT.// source: message.protopackage protoimport (context "context"fmt "fmt"proto "github.com/golang/protobuf/proto"grpc "google.golang.org/grpc"codes "google.golang.org/grpc/codes"status "google.golang.org/grpc/status"math "math")// Reference imports to suppress errors if they are not otherwise used.var _ = proto.Marshalvar _ = fmt.Errorfvar _ = math.Inf// This is a compile-time assertion to ensure that this generated file// is compatible with the proto package it is being compiled against.// A compilation error at this line likely means your copy of the// proto package needs to be updated.const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto packagetype Status int32const (Status_START Status = 0Status_RUNNING Status = 1Status_END Status = 2Status_IDEAL Status = 3)var Status_name = map[int32]string{0: "START",1: "RUNNING",2: "END",3: "IDEAL",}var Status_value = map[string]int32{"START": 0,"RUNNING": 1,"END": 2,"IDEAL": 3,}func (x Status) String() string {return proto.EnumName(Status_name, int32(x))}func (Status) EnumDescriptor() ([]byte, []int) {return fileDescriptor_33c57e4bae7b9afd, []int{0}}type RunMessage struct {Time int64 `protobuf:"varint,1,opt,name=time,proto3" json:"time,omitempty"`Id string `protobuf:"bytes,2,opt,name=id,proto3" json:"id,omitempty"`Status Status `protobuf:"varint,3,opt,name=status,proto3,enum=proto.Status" json:"status,omitempty"`Remark string `protobuf:"bytes,4,opt,name=remark,proto3" json:"remark,omitempty"`XXX_NoUnkeyedLiteral struct{} `json:"-"`XXX_unrecognized []byte `json:"-"`XXX_sizecache int32 `json:"-"`}func (m *RunMessage) Reset() { *m = RunMessage{} }func (m *RunMessage) String() string { return proto.CompactTextString(m) }func (*RunMessage) ProtoMessage() {}func (*RunMessage) Descriptor() ([]byte, []int) {return fileDescriptor_33c57e4bae7b9afd, []int{0}}func (m *RunMessage) XXX_Unmarshal(b []byte) error {return xxx_messageInfo_RunMessage.Unmarshal(m, b)}func (m *RunMessage) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {return xxx_messageInfo_RunMessage.Marshal(b, m, deterministic)}func (m *RunMessage) XXX_Merge(src proto.Message) {xxx_messageInfo_RunMessage.Merge(m, src)}func (m *RunMessage) XXX_Size() int {return xxx_messageInfo_RunMessage.Size(m)}func (m *RunMessage) XXX_DiscardUnknown() {xxx_messageInfo_RunMessage.DiscardUnknown(m)}var xxx_messageInfo_RunMessage proto.InternalMessageInfofunc (m *RunMessage) GetTime() int64 {if m != nil {return m.Time}return 0}func (m *RunMessage) GetId() string {if m != nil {return m.Id}return ""}func (m *RunMessage) GetStatus() Status {if m != nil {return m.Status}return Status_START}func (m *RunMessage) GetRemark() string {if m != nil {return m.Remark}return ""}type Order struct {Status Status `protobuf:"varint,1,opt,name=status,proto3,enum=proto.Status" json:"status,omitempty"`XXX_NoUnkeyedLiteral struct{} `json:"-"`XXX_unrecognized []byte `json:"-"`XXX_sizecache int32 `json:"-"`}func (m *Order) Reset() { *m = Order{} }func (m *Order) String() string { return proto.CompactTextString(m) }func (*Order) ProtoMessage() {}func (*Order) Descriptor() ([]byte, []int) {return fileDescriptor_33c57e4bae7b9afd, []int{1}}func (m *Order) XXX_Unmarshal(b []byte) error {return xxx_messageInfo_Order.Unmarshal(m, b)}func (m *Order) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {return xxx_messageInfo_Order.Marshal(b, m, deterministic)}func (m *Order) XXX_Merge(src proto.Message) {xxx_messageInfo_Order.Merge(m, src)}func (m *Order) XXX_Size() int {return xxx_messageInfo_Order.Size(m)}func (m *Order) XXX_DiscardUnknown() {xxx_messageInfo_Order.DiscardUnknown(m)}var xxx_messageInfo_Order proto.InternalMessageInfofunc (m *Order) GetStatus() Status {if m != nil {return m.Status}return Status_START}func init() {proto.RegisterEnum("proto.Status", Status_name, Status_value)proto.RegisterType((*RunMessage)(nil), "proto.RunMessage")proto.RegisterType((*Order)(nil), "proto.Order")}func init() { proto.RegisterFile("message.proto", fileDescriptor_33c57e4bae7b9afd) }var fileDescriptor_33c57e4bae7b9afd = []byte{// 230 bytes of a gzipped FileDescriptorProto0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x8f, 0x41, 0x4b, 0xc3, 0x30,0x14, 0xc7, 0xf7, 0xda, 0xb5, 0x63, 0x4f, 0x37, 0xea, 0x3b, 0x48, 0xf0, 0x54, 0x0a, 0x42, 0xf0,0x50, 0x64, 0xf3, 0xe8, 0x65, 0xb0, 0x21, 0x03, 0x8d, 0x90, 0xd6, 0x0f, 0x10, 0x49, 0x90, 0x22,0x6d, 0x25, 0x49, 0xbf, 0xbf, 0x98, 0x06, 0xf4, 0xe4, 0x29, 0xc9, 0xef, 0x9f, 0xf7, 0x7b, 0xef,0xe1, 0xa6, 0x37, 0xce, 0xa9, 0x0f, 0x53, 0x7f, 0xd9, 0xd1, 0x8f, 0x94, 0x85, 0xa3, 0x1a, 0x11,0xe5, 0x34, 0xbc, 0xcc, 0x11, 0x11, 0x2e, 0x7d, 0xd7, 0x1b, 0x06, 0x25, 0xf0, 0x54, 0x86, 0x3b,0x6d, 0x31, 0xe9, 0x34, 0x4b, 0x4a, 0xe0, 0x6b, 0x99, 0x74, 0x9a, 0x6e, 0x31, 0x77, 0x5e, 0xf9,0xc9, 0xb1, 0xb4, 0x04, 0xbe, 0xdd, 0x6d, 0x66, 0x61, 0xdd, 0x04, 0x28, 0x63, 0x48, 0xd7, 0x98,0x5b, 0xd3, 0x2b, 0xfb, 0xc9, 0x96, 0xa1, 0x34, 0xbe, 0xaa, 0x1a, 0xb3, 0x57, 0xab, 0x8d, 0xfd,0xe3, 0x81, 0x7f, 0x3c, 0x77, 0x0f, 0x98, 0xcf, 0x84, 0xd6, 0x98, 0x35, 0xed, 0x41, 0xb6, 0xc5,0x82, 0x2e, 0x70, 0x25, 0xdf, 0x84, 0x38, 0x8b, 0xa7, 0x02, 0x68, 0x85, 0xe9, 0x49, 0x1c, 0x8b,0xe4, 0xe7, 0xc3, 0xf9, 0x78, 0x3a, 0x3c, 0x17, 0xe9, 0xee, 0x11, 0xb3, 0xd6, 0xaa, 0xc1, 0xd1,0x1e, 0xb1, 0x31, 0x83, 0x8e, 0x8a, 0xab, 0xd8, 0xe3, 0x77, 0xe5, 0x9b, 0xcb, 0x88, 0xc2, 0x50,0xd5, 0x82, 0xc3, 0x3d, 0xbc, 0xe7, 0x01, 0xed, 0xbf, 0x03, 0x00, 0x00, 0xff, 0xff, 0xd0, 0x4e,0x12, 0xc0, 0x33, 0x01, 0x00, 0x00,}// Reference imports to suppress errors if they are not otherwise used.var _ context.Contextvar _ grpc.ClientConn// This is a compile-time assertion to ensure that this generated file// is compatible with the grpc package it is being compiled against.const _ = grpc.SupportPackageIsVersion4// TransClient is the client API for Trans service.//// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.type TransClient interface {//双向流 客户端发送状态,服务端发送指令SendStatus(ctx context.Context, opts ...grpc.CallOption) (Trans_SendStatusClient, error)}type transClient struct {cc *grpc.ClientConn}func NewTransClient(cc *grpc.ClientConn) TransClient {return &transClient{cc}}func (c *transClient) SendStatus(ctx context.Context, opts ...grpc.CallOption) (Trans_SendStatusClient, error) {stream, err := c.cc.NewStream(ctx, &_Trans_serviceDesc.Streams[0], "/proto.Trans/SendStatus", opts...)if err != nil {return nil, err}x := &transSendStatusClient{stream}return x, nil}type Trans_SendStatusClient interface {Send(*RunMessage) errorRecv() (*Order, error)grpc.ClientStream}type transSendStatusClient struct {grpc.ClientStream}func (x *transSendStatusClient) Send(m *RunMessage) error {return x.ClientStream.SendMsg(m)}func (x *transSendStatusClient) Recv() (*Order, error) {m := new(Order)if err := x.ClientStream.RecvMsg(m); err != nil {return nil, err}return m, nil}// TransServer is the server API for Trans service.type TransServer interface {//双向流 客户端发送状态,服务端发送指令SendStatus(Trans_SendStatusServer) error}// UnimplementedTransServer can be embedded to have forward compatible implementations.type UnimplementedTransServer struct {}func (*UnimplementedTransServer) SendStatus(srv Trans_SendStatusServer) error {return status.Errorf(codes.Unimplemented, "method SendStatus not implemented")}func RegisterTransServer(s *grpc.Server, srv TransServer) {s.RegisterService(&_Trans_serviceDesc, srv)}func _Trans_SendStatus_Handler(srv interface{}, stream grpc.ServerStream) error {return srv.(TransServer).SendStatus(&transSendStatusServer{stream})}type Trans_SendStatusServer interface {Send(*Order) errorRecv() (*RunMessage, error)grpc.ServerStream}type transSendStatusServer struct {grpc.ServerStream}func (x *transSendStatusServer) Send(m *Order) error {return x.ServerStream.SendMsg(m)}func (x *transSendStatusServer) Recv() (*RunMessage, error) {m := new(RunMessage)if err := x.ServerStream.RecvMsg(m); err != nil {return nil, err}return m, nil}var _Trans_serviceDesc = grpc.ServiceDesc{ServiceName: "proto.Trans",HandlerType: (*TransServer)(nil),Methods: []grpc.MethodDesc{},Streams: []grpc.StreamDesc{{StreamName: "SendStatus",Handler: _Trans_SendStatus_Handler,ServerStreams: true,ClientStreams: true,},},Metadata: "message.proto",}
3、服务端实现grpc的接口
package serviceimport ("crawler/log""crawler/proto""io")//实现grpc的接口type Service struct {}func (s *Service) SendStatus(stream proto.Trans_SendStatusServer) error {ctx := stream.Context()log.Debug("进入监听函数")for {log.Debug("循环监听请求")select {case <-ctx.Done():log.Info("收到客户端的context发出终止信号")return ctx.Err()default://接受来自客户端发来的消息in,err:=stream.Recv()if err!=nil{if err==io.EOF{log.Debug("客户端发送的数据流结束,%s",err.Error())}else{log.Error("接受客户端信息出错,%s",err.Error())//推出连接return err}}else{log.Debug("来自客户端的信息,%s",in.GetId())stream.Send(&proto.Order{Status:proto.Status_END})}}}return nil}
4、客户端实现
package mainimport (ctx "context""crawler/log""crawler/proto""google.golang.org/grpc""gopkg.in/urfave/cli.v2""os""time")var client proto.TransClientvar close = make(chan int)func main() {//设置日志log.SetConfig(log.DEBUG_L, 3, os.Stderr)app := cli.App{Name: "爬虫客户端,针对金联创的数据",Version: "1.0",Usage: "该客户端需要和管理端配合,由管理端调度和管理",Authors: []*cli.Author{{Name: "lhn", Email: "550124023@qq.com"}},Flags: []cli.Flag{&cli.StringFlag{Name: "server",Aliases: []string{"s"},Usage: "管理端的地址,使用grpc通讯,例如:127.0.0.1:8555",EnvVars: []string{"SERVER", "S"},Value: "127.0.0.1:8555",},},Commands: []*cli.Command{&cli.Command{Name: "send",Action: func(context *cli.Context) error {log.Info("输入参数:%s", context.Args().First())_ctx, cancel := ctx.WithTimeout(ctx.Background(), 10*time.Second)defer cancel()stream, err := client.SendStatus(_ctx)if err != nil {log.Error("%s", err.Error())return err}stream.Send(&proto.RunMessage{Status: proto.Status_END, Id: "fff"})return nil},},},Action: func(context *cli.Context) error {log.Debug("测试")var opts = []grpc.DialOption{grpc.WithInsecure()}log.Info("服务器地址:%s", context.String("server"))conn, err := grpc.Dial(context.String("server"), opts...)if err != nil {log.Fatal("连接服务器失败")}defer conn.Close()client = proto.NewTransClient(conn)_ctx := ctx.Background()//ctx.WithTimeout(ctx.Background(), 10*time.Hour)//defer cancel()stream, err := client.SendStatus(_ctx)if err != nil {log.Error("发送消息错误%s", err.Error())return err}go func() {//必须要开一个runtime来接受消息,要处服务端发送过来没回应的话就会卡住,除非是客户端主动断开,消息才会被消化掉for {msg, err := stream.Recv()if err != nil {log.Error("接受发生错误,%s", err.Error())} else {log.Debug("接受到消息,%s", msg.Status)}}}()for {log.Debug("发送消息")//循环发送消息到服务端err = stream.Send(&proto.RunMessage{Status: proto.Status_END, Id: "fff"})if err != nil {log.Error("发送消息错误:%s", err.Error())}time.Sleep(1 * time.Millisecond)}return nil},}app.Run(os.Args)}
