1 stream.proto
syntax = "proto3";
option go_package="./;proto";
// StreamReqData is the request message used by every RPC of the Greeter
// service, either as a single request or as an element of a client stream.
message StreamReqData {
// data is the string payload sent by the client.
string data = 1;
}
// StreamResData is the response message used by every RPC of the Greeter
// service, either as a single response or as an element of a server stream.
message StreamResData {
// data is the string payload sent by the server.
string data = 1;
}
// Greeter demonstrates the three gRPC streaming modes.
service Greeter {
// Server-streaming mode: the client sends a single request and the server
// keeps pushing responses back on the stream.
rpc GetStream(StreamReqData) returns (stream StreamResData);
// Client-streaming mode: the client sends a stream of requests and the
// server answers with a single response.
rpc PutStream(stream StreamReqData) returns (StreamResData);
// Bidirectional-streaming mode: both sides send a stream of messages.
rpc AllStream(stream StreamReqData) returns (stream StreamResData);
}
protoc -I . stream.proto --go_out=plugins=grpc:.
2 server.go
package main
import (
	"fmt"
	"io"
	"net"
	"sync"
	"time"

	"google.golang.org/grpc"
	"stream_grpc_test/proto"
)
// PORT is the TCP address passed to net.Listen in main; the empty host part
// means the listener binds on all interfaces at port 50052.
const PORT = ":50052"
// server is the stateless receiver for the Greeter stream handlers; main
// registers an instance of it with the gRPC server.
type server struct{}
// GetStream handles the server-streaming RPC. It ignores the contents of req
// and pushes eleven messages to the client, one per second, each carrying the
// current Unix timestamp as a string.
//
// It returns the first error reported by Send (for example when the client
// has disconnected or cancelled the call) instead of continuing to sleep and
// write to a dead stream. It returns nil after all messages have been sent.
func (s *server) GetStream(req *proto.StreamReqData, res proto.Greeter_GetStreamServer) error {
	// i runs 0..10 inclusive, i.e. eleven sends.
	for i := 0; i <= 10; i++ {
		msg := &proto.StreamResData{
			Data: fmt.Sprintf("%v", time.Now().Unix()),
		}
		if err := res.Send(msg); err != nil {
			return err
		}
		time.Sleep(time.Second)
	}
	return nil
}
// PutStream handles the client-streaming RPC. It prints the Data field of
// every message received from the client and, once the client has finished
// sending (Recv reports io.EOF), answers with a single response stating how
// many messages were received.
//
// Any other Recv error is returned so that gRPC reports the failure to the
// client instead of a silent success without a response.
func (s *server) PutStream(cliStr proto.Greeter_PutStreamServer) error {
	received := 0
	for {
		a, err := cliStr.Recv()
		if err == io.EOF {
			// The client half-closed the stream: send the one and only
			// response that a client-streaming call requires.
			return cliStr.SendAndClose(&proto.StreamResData{
				Data: fmt.Sprintf("received %d messages", received),
			})
		}
		if err != nil {
			return err
		}
		received++
		fmt.Println(a.Data)
	}
}
// AllStream handles the bidirectional-streaming RPC. One goroutine prints
// every message received from the client while a second goroutine sends a
// fixed greeting to the client once per second.
//
// The handler ends when the client finishes sending (io.EOF), when either
// direction fails, or when the stream context is cancelled. It returns nil on
// a clean end of stream, otherwise the receive error if there is one, else
// the send error.
func (s *server) AllStream(allStr proto.Greeter_AllStreamServer) error {
	// done is closed by the receiver so the sender stops as soon as the
	// client side of the stream is finished.
	done := make(chan struct{})

	// recvErr and sendErr are each written by exactly one goroutine and only
	// read after wg.Wait, so no further synchronisation is needed.
	var recvErr, sendErr error

	var wg sync.WaitGroup
	wg.Add(2)

	go func() {
		defer wg.Done()
		defer close(done)
		for {
			data, err := allStr.Recv()
			if err != nil {
				// io.EOF is the normal end of the client stream; data is
				// nil on any error and must not be dereferenced.
				if err != io.EOF {
					recvErr = err
				}
				return
			}
			fmt.Println("收到客户端消息:" + data.Data)
		}
	}()

	go func() {
		defer wg.Done()
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()
		for {
			if err := allStr.Send(&proto.StreamResData{Data: "我是服务器"}); err != nil {
				sendErr = err
				return
			}
			select {
			case <-done:
				return
			case <-allStr.Context().Done():
				return
			case <-ticker.C:
			}
		}
	}()

	wg.Wait()
	if recvErr != nil {
		return recvErr
	}
	return sendErr
}
// main starts the gRPC server: it opens a TCP listener on PORT, registers the
// Greeter implementation and serves until Serve returns. A failure to listen
// or to serve aborts the process with a panic.
func main() {
	listener, listenErr := net.Listen("tcp", PORT)
	if listenErr != nil {
		panic(listenErr)
	}

	grpcServer := grpc.NewServer()
	proto.RegisterGreeterServer(grpcServer, &server{})

	if serveErr := grpcServer.Serve(listener); serveErr != nil {
		panic(serveErr)
	}
}
3 client.go
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"stream_grpc_test/proto"
"sync"
"time"
)
// main is a demo client that exercises the three streaming RPCs of the
// Greeter service in sequence against a server on port 50052:
//
//  1. server streaming: print every message until the stream ends;
//  2. client streaming: send eleven messages, one per second, then close the
//     stream and print the server's single reply;
//  3. bidirectional streaming: print incoming messages while sending one
//     message per second, until the stream fails or is closed by the server.
//
// Failing to dial or to open a stream aborts the process with a panic;
// errors on an already open stream are printed and end that phase.
func main() {
	conn, err := grpc.Dial(":50052", grpc.WithInsecure())
	if err != nil {
		panic(err)
	}
	defer conn.Close()
	c := proto.NewGreeterClient(conn)

	// Server-streaming mode.
	res, err := c.GetStream(context.Background(), &proto.StreamReqData{Data: "慕课网"})
	if err != nil {
		panic(err)
	}
	for {
		a, err := res.Recv()
		if err != nil {
			// Also reached on the normal end of the stream.
			fmt.Println(err)
			break
		}
		fmt.Println(a)
	}

	// Client-streaming mode.
	putS, err := c.PutStream(context.Background())
	if err != nil {
		panic(err)
	}
	for i := 1; i <= 11; i++ {
		msg := &proto.StreamReqData{
			Data: fmt.Sprintf("慕课网%d", i),
		}
		if err := putS.Send(msg); err != nil {
			fmt.Println(err)
			break
		}
		time.Sleep(time.Second)
	}
	// Half-close the stream and wait for the server's single response;
	// without this the call is never completed.
	reply, err := putS.CloseAndRecv()
	if err != nil {
		fmt.Println(err)
	} else {
		fmt.Println(reply)
	}

	// Bidirectional-streaming mode.
	allStr, err := c.AllStream(context.Background())
	if err != nil {
		panic(err)
	}
	// done is closed by the receiver so the sender stops once the server
	// side of the stream has ended.
	done := make(chan struct{})
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		defer close(done)
		for {
			data, err := allStr.Recv()
			if err != nil {
				// data is nil on error and must not be dereferenced.
				fmt.Println(err)
				return
			}
			// The label now says "server message": this data comes from the server.
			fmt.Println("收到服务器消息:" + data.Data)
		}
	}()
	go func() {
		defer wg.Done()
		for {
			if err := allStr.Send(&proto.StreamReqData{Data: "慕课网"}); err != nil {
				fmt.Println(err)
				return
			}
			select {
			case <-done:
				return
			case <-time.After(time.Second):
			}
		}
	}()
	wg.Wait()
}