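I. Summary
This article uses a worked example to introduce bidirectional streaming, one of the four gRPC service types, by simulating a simple chat room.
II. Practice
The overall project layout is as follows:
cloud-grpc-java is the Maven project, and cloud-grpc-protos is the project that holds the interface definitions.
1. Define the interface and data types with protobuf
Create doubleWayStream.proto in the cloud-grpc-protos folder with the following content: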
syntax = "proto3";

option go_package = "pbfs/double_way_stream";
option java_multiple_files = true;
option java_package = "com.cloud.grpc.doubleWayStream";
option java_outer_classname = "DoubleWayStreamProto";
option objc_class_prefix = "DWS";

package double_way_stream;

// Bidirectional streaming
service DoubleWayStreamService {
  rpc DoubleWayStreamFun(stream RequestMessage) returns (stream ResponseMessage) {}
}

message RequestMessage {
  string req_msg = 1;
}

message ResponseMessage {
  string rsp_msg = 1;
}
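The above defines a simple bidirectional streaming RPC: both the request and the response are declared with the stream keyword.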
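To make the call shape concrete, the following is a minimal sketch in plain Java that mirrors the service definition without depending on gRPC. `ChatObserver` is a hypothetical stand-in for `io.grpc.stub.StreamObserver`, and the echo behaviour is an assumption chosen only for illustration. The point is that the handler receives the response stream as an argument and returns the request stream, so each side can send whenever it likes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for io.grpc.stub.StreamObserver, so the sketch runs without gRPC.
interface ChatObserver<T> {
    void onNext(T value);
    void onError(Throwable t);
    void onCompleted();
}

class BidiShapeSketch {
    // Mirrors the shape of DoubleWayStreamFun: the caller hands in the response
    // stream and gets the request stream back.
    static ChatObserver<String> doubleWayStreamFun(ChatObserver<String> responses) {
        return new ChatObserver<String>() {
            @Override
            public void onNext(String reqMsg) {
                responses.onNext("echo: " + reqMsg); // answer each request immediately
            }

            @Override
            public void onError(Throwable t) {
                // The call has already failed; nothing more can be sent.
            }

            @Override
            public void onCompleted() {
                responses.onCompleted(); // the client is done, so close our side too
            }
        };
    }

    // Drives one call and records everything the "server" sent back.
    static List<String> runDemo() {
        List<String> received = new ArrayList<>();
        ChatObserver<String> requests = doubleWayStreamFun(new ChatObserver<String>() {
            @Override
            public void onNext(String rspMsg) {
                received.add(rspMsg);
            }

            @Override
            public void onError(Throwable t) {
                received.add("<error>");
            }

            @Override
            public void onCompleted() {
                received.add("<completed>");
            }
        });
        requests.onNext("hi");
        requests.onNext("there");
        requests.onCompleted();
        return received;
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // [echo: hi, echo: there, <completed>]
    }
}
```

In the generated gRPC code the same shape appears with `RequestMessage` and `ResponseMessage` in place of `String`.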
2. Maven configuration
Create a Maven project like the one shown above (cloud-grpc-java) and add the gRPC development configuration to pom.xml, as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.cloud.grpc</groupId>
    <artifactId>java</artifactId>
    <version>1.0.0.0</version>
    <name>java</name>
    <description>Demo project for grpc for java</description>

    <properties>
        <java.version>11</java.version>
        <grpc.version>1.29.0</grpc.version>
        <protobuf.version>3.11.0</protobuf.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-netty-shaded</artifactId>
            <version>${grpc.version}</version>
        </dependency>
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-protobuf</artifactId>
            <version>${grpc.version}</version>
        </dependency>
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-stub</artifactId>
            <version>${grpc.version}</version>
        </dependency>
        <dependency> <!-- necessary for Java 9+ -->
            <groupId>org.apache.tomcat</groupId>
            <artifactId>annotations-api</artifactId>
            <version>6.0.53</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <extensions>
            <extension>
                <groupId>kr.motd.maven</groupId>
                <artifactId>os-maven-plugin</artifactId>
                <version>1.5.0.Final</version>
            </extension>
        </extensions>
        <plugins>
            <plugin>
                <groupId>org.xolstice.maven.plugins</groupId>
                <artifactId>protobuf-maven-plugin</artifactId>
                <version>0.5.1</version>
                <configuration>
                    <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
                    <pluginId>grpc-java</pluginId>
                    <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
                    <protoSourceRoot>../cloud-grpc-protos</protoSourceRoot>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>compile-custom</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
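Notes: os-maven-plugin provides the ${os.detected.classifier} property used to select the platform-specific protoc and protoc-gen-grpc-java binaries, and protoSourceRoot points the plugin at the ../cloud-grpc-protos directory that holds the .proto files.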
3. Generate the gRPC and protobuf classes
mvn protobuf:compile
mvn protobuf:compile-custom
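4. Write and run the server
The server source layout is shown in the figure:
First, implement the server side of the DoubleWayStreamFun rpc defined in the proto file. DoubleWayStreamIml.java is as follows: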
package com.cloud.grpc.java.doubleWayStream.server;

import com.cloud.grpc.doubleWayStream.DoubleWayStreamServiceGrpc;
import com.cloud.grpc.doubleWayStream.RequestMessage;
import com.cloud.grpc.doubleWayStream.ResponseMessage;
import io.grpc.stub.StreamObserver;

public class DoubleWayStreamIml extends DoubleWayStreamServiceGrpc.DoubleWayStreamServiceImplBase {

    // Keep a reference to the response stream so that console input can later be pushed to the client.
    // volatile: written by a gRPC thread, read by the console thread.
    private volatile StreamObserver<ResponseMessage> responseOb;

    @Override
    public StreamObserver<RequestMessage> doubleWayStreamFun(StreamObserver<ResponseMessage> responseObserver) {
        this.responseOb = responseObserver;
        return new StreamObserver<RequestMessage>() {
            @Override
            public void onNext(RequestMessage requestMessage) {
                System.out.println("[Received from client]: " + requestMessage.getReqMsg());
                responseObserver.onNext(ResponseMessage.newBuilder()
                        .setRspMsg("hello client, I'm Java grpc Server, your message '" + requestMessage.getReqMsg() + "'")
                        .build());
            }

            @Override
            public void onError(Throwable throwable) {
                // Log the failure reported for this call
                throwable.printStackTrace();
            }

            @Override
            public void onCompleted() {
                responseObserver.onCompleted();
            }
        };
    }

    public StreamObserver<ResponseMessage> getResponseOb() {
        return responseOb;
    }
}
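As shown above, the private StreamObserver<ResponseMessage> responseOb field keeps a reference to the response stream so that the server can later push console input to the client.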
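Because the implementation keeps a single responseOb field, each new call overwrites it, and console messages reach only the most recently connected client. A chat room with several participants would need one entry per open call. The following is a minimal sketch of such a registry in plain Java. `ResponseSink` is a hypothetical stand-in for the write side of `StreamObserver<ResponseMessage>`, and `ClientRegistry` is an illustrative name that does not appear in the original project.

```java
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

// Hypothetical stand-in for the write side of StreamObserver<ResponseMessage>.
interface ResponseSink {
    void onNext(String msg);
}

class ClientRegistry {
    // Copy-on-write: safe to iterate while other threads register or unregister clients.
    private final Set<ResponseSink> clients = new CopyOnWriteArraySet<>();

    // Would be called at the start of each call, where the original assigns responseOb.
    void register(ResponseSink client) {
        clients.add(client);
    }

    // Would be called from onCompleted/onError of the matching request stream.
    void unregister(ResponseSink client) {
        clients.remove(client);
    }

    int size() {
        return clients.size();
    }

    // Sends msg to every registered client and returns how many accepted it.
    int broadcast(String msg) {
        int delivered = 0;
        for (ResponseSink client : clients) {
            try {
                client.onNext(msg);
                delivered++;
            } catch (RuntimeException e) {
                clients.remove(client); // drop a stream that can no longer be written to
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        ClientRegistry registry = new ClientRegistry();
        registry.register(msg -> System.out.println("client A got: " + msg));
        registry.register(msg -> System.out.println("client B got: " + msg));
        System.out.println("delivered to " + registry.broadcast("hello everyone"));
    }
}
```

With no clients registered, `broadcast` simply returns 0, which also avoids dereferencing a null observer when nobody has connected yet.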
Write the server startup class DoubleWayStreamServer.java, as follows:
package com.cloud.grpc.java.doubleWayStream.server;

import com.cloud.grpc.doubleWayStream.ResponseMessage;
import io.grpc.Server;
import io.grpc.ServerBuilder;

import java.io.IOException;
import java.util.Scanner;

public class DoubleWayStreamServer {
    public static void main(String[] args) {
        DoubleWayStreamIml doubleWayStreamIml = new DoubleWayStreamIml();
        Server server = ServerBuilder.forPort(8899)
                .addService(doubleWayStreamIml)
                .build();
        try {
            server.start();
            // Start a thread that pushes console input to the client
            new Thread(() -> {
                Scanner scanner = new Scanner(System.in);
                while (true) {
                    String str = scanner.nextLine();
                    if ("EOF".equals(str)) {
                        break;
                    }
                    try {
                        doubleWayStreamIml.getResponseOb().onNext(ResponseMessage.newBuilder().setRspMsg(str).build());
                    } catch (Exception e) {
                        // getResponseOb() is null until a client connects;
                        // a failed client connection usually ends up here as well
                        System.out.println("[Error]: no client connected...");
                        e.printStackTrace();
                    }
                }
            }).start();
            server.awaitTermination();
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}
As shown above, a new thread is started so that the server can push messages to the client.
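One consequence of the extra thread is that two threads can now write to the same response stream: the gRPC thread that echoes each request and the console thread. The gRPC Java documentation states that a StreamObserver is not thread-safe and that the application must synchronize concurrent writes. The following is a minimal sketch of one way to do that, using a synchronized wrapper. `MessageStream` is a hypothetical stand-in for the write side of `io.grpc.stub.StreamObserver`, and the wrapper and demo class names are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the write side of io.grpc.stub.StreamObserver.
interface MessageStream {
    void onNext(String msg);
}

// Serializes writes so that two threads never call the delegate at the same time.
final class SynchronizedStream implements MessageStream {
    private final MessageStream delegate;

    SynchronizedStream(MessageStream delegate) {
        this.delegate = delegate;
    }

    @Override
    public synchronized void onNext(String msg) {
        delegate.onNext(msg);
    }
}

class SynchronizedStreamDemo {
    // Two writers (think: the echo callback thread and the console thread) each send
    // perThread messages through one shared wrapper; returns how many arrived.
    static int run(int perThread) {
        List<String> received = new ArrayList<>(); // deliberately not thread-safe
        MessageStream shared = new SynchronizedStream(received::add);
        Runnable writer = () -> {
            for (int i = 0; i < perThread; i++) {
                shared.onNext("msg " + i);
            }
        };
        Thread echoThread = new Thread(writer);
        Thread consoleThread = new Thread(writer);
        echoThread.start();
        consoleThread.start();
        try {
            echoThread.join();
            consoleThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received.size();
    }

    public static void main(String[] args) {
        System.out.println(run(1000)); // 2000: no write is lost
    }
}
```

In the server, the same idea would mean routing both the echo reply and the console message through one synchronized wrapper around responseObserver.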
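Start the server through its main function. If we now type a message into the console, the result is as follows:
As you can see, no client has connected yet, so getResponseOb() still returns null, the call fails, and the console prints the "no client connected" message.
5. Write and run the client
Write DoubleWayStreamClient.java, as follows: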
package com.cloud.grpc.java.doubleWayStream.client;

import com.cloud.grpc.doubleWayStream.DoubleWayStreamServiceGrpc;
import com.cloud.grpc.doubleWayStream.RequestMessage;
import com.cloud.grpc.doubleWayStream.ResponseMessage;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;

import java.util.Scanner;

public class DoubleWayStreamClient {
    public static void main(String[] args) {
        // usePlaintext() disables TLS; without it the channel uses an encrypted connection
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 8899)
                .usePlaintext()
                .build();
        StreamObserver<RequestMessage> requestObserver = DoubleWayStreamServiceGrpc.newStub(channel)
                .doubleWayStreamFun(new StreamObserver<ResponseMessage>() {
                    @Override
                    public void onNext(ResponseMessage value) {
                        System.out.println("[Received from server]: " + value.getRspMsg());
                    }

                    @Override
                    public void onError(Throwable t) {
                        // Report failures instead of swallowing them silently
                        t.printStackTrace();
                    }

                    @Override
                    public void onCompleted() {
                    }
                });
        Scanner scanner = new Scanner(System.in);
        while (true) {
            String str = scanner.nextLine();
            if ("EOF".equals(str)) {
                requestObserver.onCompleted();
                break;
            }
            try {
                // Send the line that was just read
                requestObserver.onNext(RequestMessage.newBuilder().setReqMsg(str).build());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        // Release the connection once the conversation is over
        channel.shutdown();
    }
}
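In the source above, we implemented the client side of the rpc method defined in the proto file and print the server's messages in the onNext method. Run the main function and type "Hello, I am the client, and I am online" into the console.
As you can see, the server responded. Now we switch to the server and have it send a message to the client, as follows:
Now look at the client:
As you can see, the server can also send messages to the client. This gives us an extremely simple chat room.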
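Both the client and the server's console thread run the same kind of loop: read a line, stop on "EOF", otherwise forward the line. The following is a minimal sketch of that loop factored into a helper that can be exercised without gRPC. `ConsoleLoop` and `pump` are hypothetical names, and the `send` and `complete` callbacks are assumptions standing in for `requestObserver.onNext(...)` and `requestObserver.onCompleted()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Scanner;
import java.util.function.Consumer;

class ConsoleLoop {
    // Reads lines until "EOF" or end of input, forwards each line exactly once,
    // then signals completion. Returns the number of lines forwarded.
    static int pump(Scanner in, Consumer<String> send, Runnable complete) {
        int sent = 0;
        while (in.hasNextLine()) {
            String line = in.nextLine();
            if ("EOF".equals(line)) {
                break;
            }
            send.accept(line); // forward the line that was just read
            sent++;
        }
        complete.run();
        return sent;
    }

    public static void main(String[] args) {
        List<String> outbox = new ArrayList<>();
        int count = pump(new Scanner("hello\nanyone there?\nEOF\nignored\n"),
                outbox::add,
                () -> System.out.println("stream completed"));
        System.out.println(count + " sent: " + outbox);
    }
}
```

Checking `hasNextLine()` before reading also lets the loop end cleanly when standard input is closed without an explicit "EOF" line.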
