1、编写proto文档如下:

    1. syntax = "proto3";
    2. package proto;
    3. message RunMessage {
    4. int64 time = 1; //unix时间
    5. string id = 2; //爬虫名称
    6. Status status = 3; //状态,
    7. string remark = 4; //备注信息
    8. }
    9. service Trans {
    10. //双向流 客户端发送状态,服务端发送指令
    11. rpc SendStatus (stream RunMessage) returns (stream Order) {
    12. }
    13. //rpc Order(stream Order)returns(Status){}
    14. }
    15. message Order {
    16. Status status = 1;
    17. }
    18. enum Status {
    19. START = 0; //开始
    20. RUNNING = 1; //执行中
    21. END = 2; //结束
    22. IDEAL = 3; //空闲中
    23. }

    2、执行命令: protoc —go_out=plugins=grpc:. .proto
    注意:明确
    .proto文件的位置,要不会找不到*.proto文件,不能生成go的文件
    3、生成的代码

    1. // Code generated by protoc-gen-go. DO NOT EDIT.
    2. // source: message.proto
    3. package proto
    4. import (
    5. context "context"
    6. fmt "fmt"
    7. proto "github.com/golang/protobuf/proto"
    8. grpc "google.golang.org/grpc"
    9. codes "google.golang.org/grpc/codes"
    10. status "google.golang.org/grpc/status"
    11. math "math"
    12. )
    13. // Reference imports to suppress errors if they are not otherwise used.
    14. var _ = proto.Marshal
    15. var _ = fmt.Errorf
    16. var _ = math.Inf
    17. // This is a compile-time assertion to ensure that this generated file
    18. // is compatible with the proto package it is being compiled against.
    19. // A compilation error at this line likely means your copy of the
    20. // proto package needs to be updated.
    21. const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
    22. type Status int32
    23. const (
    24. Status_START Status = 0
    25. Status_RUNNING Status = 1
    26. Status_END Status = 2
    27. Status_IDEAL Status = 3
    28. )
    29. var Status_name = map[int32]string{
    30. 0: "START",
    31. 1: "RUNNING",
    32. 2: "END",
    33. 3: "IDEAL",
    34. }
    35. var Status_value = map[string]int32{
    36. "START": 0,
    37. "RUNNING": 1,
    38. "END": 2,
    39. "IDEAL": 3,
    40. }
    41. func (x Status) String() string {
    42. return proto.EnumName(Status_name, int32(x))
    43. }
    44. func (Status) EnumDescriptor() ([]byte, []int) {
    45. return fileDescriptor_33c57e4bae7b9afd, []int{0}
    46. }
    47. type RunMessage struct {
    48. Time int64 `protobuf:"varint,1,opt,name=time,proto3" json:"time,omitempty"`
    49. Id string `protobuf:"bytes,2,opt,name=id,proto3" json:"id,omitempty"`
    50. Status Status `protobuf:"varint,3,opt,name=status,proto3,enum=proto.Status" json:"status,omitempty"`
    51. Remark string `protobuf:"bytes,4,opt,name=remark,proto3" json:"remark,omitempty"`
    52. XXX_NoUnkeyedLiteral struct{} `json:"-"`
    53. XXX_unrecognized []byte `json:"-"`
    54. XXX_sizecache int32 `json:"-"`
    55. }
    56. func (m *RunMessage) Reset() { *m = RunMessage{} }
    57. func (m *RunMessage) String() string { return proto.CompactTextString(m) }
    58. func (*RunMessage) ProtoMessage() {}
    59. func (*RunMessage) Descriptor() ([]byte, []int) {
    60. return fileDescriptor_33c57e4bae7b9afd, []int{0}
    61. }
    62. func (m *RunMessage) XXX_Unmarshal(b []byte) error {
    63. return xxx_messageInfo_RunMessage.Unmarshal(m, b)
    64. }
    65. func (m *RunMessage) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
    66. return xxx_messageInfo_RunMessage.Marshal(b, m, deterministic)
    67. }
    68. func (m *RunMessage) XXX_Merge(src proto.Message) {
    69. xxx_messageInfo_RunMessage.Merge(m, src)
    70. }
    71. func (m *RunMessage) XXX_Size() int {
    72. return xxx_messageInfo_RunMessage.Size(m)
    73. }
    74. func (m *RunMessage) XXX_DiscardUnknown() {
    75. xxx_messageInfo_RunMessage.DiscardUnknown(m)
    76. }
    77. var xxx_messageInfo_RunMessage proto.InternalMessageInfo
    78. func (m *RunMessage) GetTime() int64 {
    79. if m != nil {
    80. return m.Time
    81. }
    82. return 0
    83. }
    84. func (m *RunMessage) GetId() string {
    85. if m != nil {
    86. return m.Id
    87. }
    88. return ""
    89. }
    90. func (m *RunMessage) GetStatus() Status {
    91. if m != nil {
    92. return m.Status
    93. }
    94. return Status_START
    95. }
    96. func (m *RunMessage) GetRemark() string {
    97. if m != nil {
    98. return m.Remark
    99. }
    100. return ""
    101. }
    102. type Order struct {
    103. Status Status `protobuf:"varint,1,opt,name=status,proto3,enum=proto.Status" json:"status,omitempty"`
    104. XXX_NoUnkeyedLiteral struct{} `json:"-"`
    105. XXX_unrecognized []byte `json:"-"`
    106. XXX_sizecache int32 `json:"-"`
    107. }
    108. func (m *Order) Reset() { *m = Order{} }
    109. func (m *Order) String() string { return proto.CompactTextString(m) }
    110. func (*Order) ProtoMessage() {}
    111. func (*Order) Descriptor() ([]byte, []int) {
    112. return fileDescriptor_33c57e4bae7b9afd, []int{1}
    113. }
    114. func (m *Order) XXX_Unmarshal(b []byte) error {
    115. return xxx_messageInfo_Order.Unmarshal(m, b)
    116. }
    117. func (m *Order) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
    118. return xxx_messageInfo_Order.Marshal(b, m, deterministic)
    119. }
    120. func (m *Order) XXX_Merge(src proto.Message) {
    121. xxx_messageInfo_Order.Merge(m, src)
    122. }
    123. func (m *Order) XXX_Size() int {
    124. return xxx_messageInfo_Order.Size(m)
    125. }
    126. func (m *Order) XXX_DiscardUnknown() {
    127. xxx_messageInfo_Order.DiscardUnknown(m)
    128. }
    129. var xxx_messageInfo_Order proto.InternalMessageInfo
    130. func (m *Order) GetStatus() Status {
    131. if m != nil {
    132. return m.Status
    133. }
    134. return Status_START
    135. }
    136. func init() {
    137. proto.RegisterEnum("proto.Status", Status_name, Status_value)
    138. proto.RegisterType((*RunMessage)(nil), "proto.RunMessage")
    139. proto.RegisterType((*Order)(nil), "proto.Order")
    140. }
    141. func init() { proto.RegisterFile("message.proto", fileDescriptor_33c57e4bae7b9afd) }
    142. var fileDescriptor_33c57e4bae7b9afd = []byte{
    143. // 230 bytes of a gzipped FileDescriptorProto
    144. 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x8f, 0x41, 0x4b, 0xc3, 0x30,
    145. 0x14, 0xc7, 0xf7, 0xda, 0xb5, 0x63, 0x4f, 0x37, 0xea, 0x3b, 0x48, 0xf0, 0x54, 0x0a, 0x42, 0xf0,
    146. 0x50, 0x64, 0xf3, 0xe8, 0x65, 0xb0, 0x21, 0x03, 0x8d, 0x90, 0xd6, 0x0f, 0x10, 0x49, 0x90, 0x22,
    147. 0x6d, 0x25, 0x49, 0xbf, 0xbf, 0x98, 0x06, 0xf4, 0xe4, 0x29, 0xc9, 0xef, 0x9f, 0xf7, 0x7b, 0xef,
    148. 0xe1, 0xa6, 0x37, 0xce, 0xa9, 0x0f, 0x53, 0x7f, 0xd9, 0xd1, 0x8f, 0x94, 0x85, 0xa3, 0x1a, 0x11,
    149. 0xe5, 0x34, 0xbc, 0xcc, 0x11, 0x11, 0x2e, 0x7d, 0xd7, 0x1b, 0x06, 0x25, 0xf0, 0x54, 0x86, 0x3b,
    150. 0x6d, 0x31, 0xe9, 0x34, 0x4b, 0x4a, 0xe0, 0x6b, 0x99, 0x74, 0x9a, 0x6e, 0x31, 0x77, 0x5e, 0xf9,
    151. 0xc9, 0xb1, 0xb4, 0x04, 0xbe, 0xdd, 0x6d, 0x66, 0x61, 0xdd, 0x04, 0x28, 0x63, 0x48, 0xd7, 0x98,
    152. 0x5b, 0xd3, 0x2b, 0xfb, 0xc9, 0x96, 0xa1, 0x34, 0xbe, 0xaa, 0x1a, 0xb3, 0x57, 0xab, 0x8d, 0xfd,
    153. 0xe3, 0x81, 0x7f, 0x3c, 0x77, 0x0f, 0x98, 0xcf, 0x84, 0xd6, 0x98, 0x35, 0xed, 0x41, 0xb6, 0xc5,
    154. 0x82, 0x2e, 0x70, 0x25, 0xdf, 0x84, 0x38, 0x8b, 0xa7, 0x02, 0x68, 0x85, 0xe9, 0x49, 0x1c, 0x8b,
    155. 0xe4, 0xe7, 0xc3, 0xf9, 0x78, 0x3a, 0x3c, 0x17, 0xe9, 0xee, 0x11, 0xb3, 0xd6, 0xaa, 0xc1, 0xd1,
    156. 0x1e, 0xb1, 0x31, 0x83, 0x8e, 0x8a, 0xab, 0xd8, 0xe3, 0x77, 0xe5, 0x9b, 0xcb, 0x88, 0xc2, 0x50,
    157. 0xd5, 0x82, 0xc3, 0x3d, 0xbc, 0xe7, 0x01, 0xed, 0xbf, 0x03, 0x00, 0x00, 0xff, 0xff, 0xd0, 0x4e,
    158. 0x12, 0xc0, 0x33, 0x01, 0x00, 0x00,
    159. }
    160. // Reference imports to suppress errors if they are not otherwise used.
    161. var _ context.Context
    162. var _ grpc.ClientConn
    163. // This is a compile-time assertion to ensure that this generated file
    164. // is compatible with the grpc package it is being compiled against.
    165. const _ = grpc.SupportPackageIsVersion4
    166. // TransClient is the client API for Trans service.
    167. //
    168. // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
    169. type TransClient interface {
    170. //双向流 客户端发送状态,服务端发送指令
    171. SendStatus(ctx context.Context, opts ...grpc.CallOption) (Trans_SendStatusClient, error)
    172. }
    173. type transClient struct {
    174. cc *grpc.ClientConn
    175. }
    176. func NewTransClient(cc *grpc.ClientConn) TransClient {
    177. return &transClient{cc}
    178. }
    179. func (c *transClient) SendStatus(ctx context.Context, opts ...grpc.CallOption) (Trans_SendStatusClient, error) {
    180. stream, err := c.cc.NewStream(ctx, &_Trans_serviceDesc.Streams[0], "/proto.Trans/SendStatus", opts...)
    181. if err != nil {
    182. return nil, err
    183. }
    184. x := &transSendStatusClient{stream}
    185. return x, nil
    186. }
    187. type Trans_SendStatusClient interface {
    188. Send(*RunMessage) error
    189. Recv() (*Order, error)
    190. grpc.ClientStream
    191. }
    192. type transSendStatusClient struct {
    193. grpc.ClientStream
    194. }
    195. func (x *transSendStatusClient) Send(m *RunMessage) error {
    196. return x.ClientStream.SendMsg(m)
    197. }
    198. func (x *transSendStatusClient) Recv() (*Order, error) {
    199. m := new(Order)
    200. if err := x.ClientStream.RecvMsg(m); err != nil {
    201. return nil, err
    202. }
    203. return m, nil
    204. }
    205. // TransServer is the server API for Trans service.
    206. type TransServer interface {
    207. //双向流 客户端发送状态,服务端发送指令
    208. SendStatus(Trans_SendStatusServer) error
    209. }
    210. // UnimplementedTransServer can be embedded to have forward compatible implementations.
    211. type UnimplementedTransServer struct {
    212. }
    213. func (*UnimplementedTransServer) SendStatus(srv Trans_SendStatusServer) error {
    214. return status.Errorf(codes.Unimplemented, "method SendStatus not implemented")
    215. }
    216. func RegisterTransServer(s *grpc.Server, srv TransServer) {
    217. s.RegisterService(&_Trans_serviceDesc, srv)
    218. }
    219. func _Trans_SendStatus_Handler(srv interface{}, stream grpc.ServerStream) error {
    220. return srv.(TransServer).SendStatus(&transSendStatusServer{stream})
    221. }
    222. type Trans_SendStatusServer interface {
    223. Send(*Order) error
    224. Recv() (*RunMessage, error)
    225. grpc.ServerStream
    226. }
    227. type transSendStatusServer struct {
    228. grpc.ServerStream
    229. }
    230. func (x *transSendStatusServer) Send(m *Order) error {
    231. return x.ServerStream.SendMsg(m)
    232. }
    233. func (x *transSendStatusServer) Recv() (*RunMessage, error) {
    234. m := new(RunMessage)
    235. if err := x.ServerStream.RecvMsg(m); err != nil {
    236. return nil, err
    237. }
    238. return m, nil
    239. }
    240. var _Trans_serviceDesc = grpc.ServiceDesc{
    241. ServiceName: "proto.Trans",
    242. HandlerType: (*TransServer)(nil),
    243. Methods: []grpc.MethodDesc{},
    244. Streams: []grpc.StreamDesc{
    245. {
    246. StreamName: "SendStatus",
    247. Handler: _Trans_SendStatus_Handler,
    248. ServerStreams: true,
    249. ClientStreams: true,
    250. },
    251. },
    252. Metadata: "message.proto",
    253. }

    3、服务端实现grpc的接口

    1. package service
    2. import (
    3. "crawler/log"
    4. "crawler/proto"
    5. "io"
    6. )
    7. //实现grpc的接口
    8. type Service struct {
    9. }
    10. func (s *Service) SendStatus(stream proto.Trans_SendStatusServer) error {
    11. ctx := stream.Context()
    12. log.Debug("进入监听函数")
    13. for {
    14. log.Debug("循环监听请求")
    15. select {
    16. case <-ctx.Done():
    17. log.Info("收到客户端的context发出终止信号")
    18. return ctx.Err()
    19. default:
    20. //接受来自客户端发来的消息
    21. in,err:=stream.Recv()
    22. if err!=nil{
    23. if err==io.EOF{
    24. log.Debug("客户端发送的数据流结束,%s",err.Error())
    25. }else{
    26. log.Error("接受客户端信息出错,%s",err.Error())
    27. //推出连接
    28. return err
    29. }
    30. }else{
    31. log.Debug("来自客户端的信息,%s",in.GetId())
    32. stream.Send(&proto.Order{Status:proto.Status_END})
    33. }
    34. }
    35. }
    36. return nil
    37. }

    4、客户端实现

    1. package main
    2. import (
    3. ctx "context"
    4. "crawler/log"
    5. "crawler/proto"
    6. "google.golang.org/grpc"
    7. "gopkg.in/urfave/cli.v2"
    8. "os"
    9. "time"
    10. )
    11. var client proto.TransClient
    12. var close = make(chan int)
    13. func main() {
    14. //设置日志
    15. log.SetConfig(log.DEBUG_L, 3, os.Stderr)
    16. app := cli.App{
    17. Name: "爬虫客户端,针对金联创的数据",
    18. Version: "1.0",
    19. Usage: "该客户端需要和管理端配合,由管理端调度和管理",
    20. Authors: []*cli.Author{{Name: "lhn", Email: "550124023@qq.com"}},
    21. Flags: []cli.Flag{
    22. &cli.StringFlag{
    23. Name: "server",
    24. Aliases: []string{"s"},
    25. Usage: "管理端的地址,使用grpc通讯,例如:127.0.0.1:8555",
    26. EnvVars: []string{"SERVER", "S"},
    27. Value: "127.0.0.1:8555",
    28. },
    29. },
    30. Commands: []*cli.Command{
    31. &cli.Command{
    32. Name: "send",
    33. Action: func(context *cli.Context) error {
    34. log.Info("输入参数:%s", context.Args().First())
    35. _ctx, cancel := ctx.WithTimeout(ctx.Background(), 10*time.Second)
    36. defer cancel()
    37. stream, err := client.SendStatus(_ctx)
    38. if err != nil {
    39. log.Error("%s", err.Error())
    40. return err
    41. }
    42. stream.Send(&proto.RunMessage{Status: proto.Status_END, Id: "fff"})
    43. return nil
    44. },
    45. },
    46. },
    47. Action: func(context *cli.Context) error {
    48. log.Debug("测试")
    49. var opts = []grpc.DialOption{grpc.WithInsecure()}
    50. log.Info("服务器地址:%s", context.String("server"))
    51. conn, err := grpc.Dial(context.String("server"), opts...)
    52. if err != nil {
    53. log.Fatal("连接服务器失败")
    54. }
    55. defer conn.Close()
    56. client = proto.NewTransClient(conn)
    57. _ctx := ctx.Background()//ctx.WithTimeout(ctx.Background(), 10*time.Hour)
    58. //defer cancel()
    59. stream, err := client.SendStatus(_ctx)
    60. if err != nil {
    61. log.Error("发送消息错误%s", err.Error())
    62. return err
    63. }
    64. go func() {
    65. //必须要开一个runtime来接受消息,要处服务端发送过来没回应的话就会卡住,除非是客户端主动断开,消息才会被消化掉
    66. for {
    67. msg, err := stream.Recv()
    68. if err != nil {
    69. log.Error("接受发生错误,%s", err.Error())
    70. } else {
    71. log.Debug("接受到消息,%s", msg.Status)
    72. }
    73. }
    74. }()
    75. for {
    76. log.Debug("发送消息")
    77. //循环发送消息到服务端
    78. err = stream.Send(&proto.RunMessage{Status: proto.Status_END, Id: "fff"})
    79. if err != nil {
    80. log.Error("发送消息错误:%s", err.Error())
    81. }
    82. time.Sleep(1 * time.Millisecond)
    83. }
    84. return nil
    85. },
    86. }
    87. app.Run(os.Args)
    88. }