package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"grpc_test_stream/pb/person"
"net"
"strconv"
"time"
)
type Sever struct {
person.UnimplementedSearchServiceServer
}
func (Sever) Search(ctx context.Context, req *person.PersonReq) (*person.PersonRes, error) {
name := req.GetName()
p := new(person.PersonRes)
p.Name = "我收到了来自" + name + "的请求"
fmt.Println(req)
return p, nil
}
func (Sever) SearchIn(stream_server person.SearchService_SearchInServer) error {
for {
recv, err := stream_server.Recv()
fmt.Println("读取到的内容是:", recv)
if err != nil {
err1 := stream_server.SendAndClose(&person.PersonRes{Name: "读取完成"})
if err1 != nil {
fmt.Println(err1)
}
break
}
}
return nil
}
func (Sever) SearchOut(req *person.PersonReq, server person.SearchService_SearchOutServer) error {
name := req.Name
for i := 0; i < 10; i++ {
server.Send(&person.PersonRes{Name: name + "服务端收到了请求" + strconv.Itoa(i)})
time.Sleep(1 * time.Second)
}
fmt.Println("服务端响应完成")
return nil
}
func (Sever) SearchAll(server person.SearchService_SearchAllServer) error {
c := make(chan string)
//接收流式请求
go func() {
for i := 0; i <= 10; i++ {
recv, err := server.Recv()
if err != nil {
c <- "over"
break
}
c <- recv.Name
}
}()
//发送请求
for {
s := <-c
if s == "over" {
break
}
server.Send(&person.PersonRes{Name: "已接收到:" + s})
}
return nil
}
func main() {
listener, err := net.Listen("tcp", ":8888")
if err != nil {
fmt.Println(err)
fmt.Println("监听失败1")
}
grpc_server := grpc.NewServer()
person.RegisterSearchServiceServer(grpc_server, &Sever{})
err = grpc_server.Serve(listener)
if err != nil {
fmt.Println(err)
fmt.Println("监听失败2")
}
fmt.Println("服务端启动成功")
}
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"grpc_test_stream/pb/person"
"strconv"
"sync"
"time"
)
func main() {
conn, err := grpc.Dial(":8888", grpc.WithTransportCredentials(insecure.NewCredentials()))
defer conn.Close()
if err != nil {
fmt.Println("客户端启用失败")
fmt.Println(err)
}
client := person.NewSearchServiceClient(conn)
fmt.Println("客户端启动成功")
//personRes1, err := client.Search(context.Background(), &person.PersonReq{
// Name: "黄宽",
// Age: 18,
//})
//fmt.Println(personRes1.GetAge(), personRes1.GetName())
//流式传入。
//cin, err := client.SearchIn(context.Background())
//for i := 0; i < 10; i++ {
// cin.Send(&person.PersonReq{Name: "传入消息" + strconv.Itoa(i)})
// time.Sleep(1 * time.Second)
//}
//res, err := cin.CloseAndRecv()
//if err != nil {
// fmt.Println(err)
//}
//流式接收
//out, err := client.SearchOut(context.Background(), &person.PersonReq{Name: "黄宽1"})
//if err != nil {
// fmt.Println(err)
//}
//for {
// recv, err := out.Recv()
// if err != nil {
// fmt.Println(err)
// break
// }
// fmt.Println(recv)
//}
//双向流
all, err := client.SearchAll(context.Background())
if err != nil {
fmt.Print(err)
}
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
for i := 0; i <= 10; i++ {
all.Send(&person.PersonReq{Name: "黄宽" + strconv.Itoa(i)})
time.Sleep(1 * time.Second)
}
wg.Done()
}()
go func() {
for {
recv, err := all.Recv()
if nil != err {
fmt.Println(err)
wg.Done()
break
}
fmt.Println(recv.GetName())
}
}()
wg.Wait()
fmt.Println("客户端完成")
}
syntax = "proto3";
package person;
option go_package = "./;person";
message PersonReq {
string name = 1;
int32 age = 2;
}
message PersonRes {
string name = 1;
int32 age = 2;
}
service SearchService {
rpc Search(PersonReq) returns (PersonRes);//传统的即刻响应
rpc SearchIn(stream PersonReq) returns (PersonRes);//请求为流
rpc SearchOut(PersonReq) returns (stream PersonRes);//响应为流
rpc SearchAll(stream PersonReq) returns (stream PersonRes);//均为流
}