一. 摘要

此篇文章将用实例介绍grpc四种服务类型中的双向流式通信,模拟一个简单的聊天室功能。

二. 实践

整体项目如下:
image.png
其中cloud-grpc-java为maven项目,cloud-grpc-protos为定义接口项目。

1.通过protobuf定义接口和数据类型

在cloud-grpc-protos文件夹下创建doubleWayStream.proto,内容如下:

  1. syntax = "proto3";
  2. option go_package = "pbfs/double_way_stream";
  3. option java_multiple_files = true;
  4. option java_package = "com.cloud.grpc.doubleWayStream";
  5. option java_outer_classname = "DoubleWayStreamProto";
  6. option objc_class_prefix = "DWS";
  7. package double_way_stream;
  8. //双向流式
  9. service DoubleWayStreamService{
  10. rpc DoubleWayStreamFun(stream RequestMessage) returns (stream ResponseMessage){}
  11. }
  12. message RequestMessage{
  13. string req_msg = 1;
  14. }
  15. message ResponseMessage{
  16. string rsp_msg = 1;
  17. }

以上,一个 简单 的双向流式RPC

2.maven 配置

创建一个如上图(cloud-grpc-java)的maven项目,pom.xml加入grpc开发相关配置,如下:

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  4. <modelVersion>4.0.0</modelVersion>
  5. <groupId>com.cloud.grpc</groupId>
  6. <artifactId>java</artifactId>
  7. <version>1.0.0.0</version>
  8. <name>java</name>
  9. <description>Demo project for grpc for java</description>
  10. <properties>
  11. <java.version>11</java.version>
  12. <grpc.version>1.29.0</grpc.version>
  13. <protobuf.version>3.11.0</protobuf.version>
  14. </properties>
  15. <dependencies>
  16. <dependency>
  17. <groupId>io.grpc</groupId>
  18. <artifactId>grpc-netty-shaded</artifactId>
  19. <version>${grpc.version}</version>
  20. </dependency>
  21. <dependency>
  22. <groupId>io.grpc</groupId>
  23. <artifactId>grpc-protobuf</artifactId>
  24. <version>${grpc.version}</version>
  25. </dependency>
  26. <dependency>
  27. <groupId>io.grpc</groupId>
  28. <artifactId>grpc-stub</artifactId>
  29. <version>${grpc.version}</version>
  30. </dependency>
  31. <dependency> <!-- necessary for Java 9+ -->
  32. <groupId>org.apache.tomcat</groupId>
  33. <artifactId>annotations-api</artifactId>
  34. <version>6.0.53</version>
  35. <scope>provided</scope>
  36. </dependency>
  37. </dependencies>
  38. <build>
  39. <extensions>
  40. <extension>
  41. <groupId>kr.motd.maven</groupId>
  42. <artifactId>os-maven-plugin</artifactId>
  43. <version>1.5.0.Final</version>
  44. </extension>
  45. </extensions>
  46. <plugins>
  47. <plugin>
  48. <groupId>org.xolstice.maven.plugins</groupId>
  49. <artifactId>protobuf-maven-plugin</artifactId>
  50. <version>0.5.1</version>
  51. <configuration>
  52. <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
  53. <pluginId>grpc-java</pluginId>
  54. <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
  55. <protoSourceRoot>../cloud-grpc-protos</protoSourceRoot>
  56. </configuration>
  57. <executions>
  58. <execution>
  59. <goals>
  60. <goal>compile</goal>
  61. <goal>compile-custom</goal>
  62. </goals>
  63. </execution>
  64. </executions>
  65. </plugin>
  66. </plugins>
  67. </build>
  68. </project>

说明: ../cloud-grpc-protos 红色标注部分需要根据pom.xml与上面创建proto所在路径相匹配,这样protobuf 插件才会根据此目录找到定义的proto文件生成相关代码。

3.生成grpc,protobuf相关类

  1. mvn protobuf:compile
  2. mvn protobuf:compile-custom

同步pom.xml,即可在target下生成如下源码:
image.png

4.编写服务端,并运行

服务端源码如图:
image.png
首先实现proto文件中定义的rpc DoubleWayStream 的服务端流式响应接口,DoubleWayStreamIml.java如下

  1. package com.cloud.grpc.java.doubleWayStream.server;
  2. import com.cloud.grpc.doubleWayStream.DoubleWayStreamServiceGrpc;
  3. import com.cloud.grpc.doubleWayStream.RequestMessage;
  4. import com.cloud.grpc.doubleWayStream.ResponseMessage;
  5. import io.grpc.stub.StreamObserver;
  6. public class DoubleWayStreamIml extends DoubleWayStreamServiceGrpc.DoubleWayStreamServiceImplBase {
  7. //声明此服务端流响应,为了后面通过控制台向后端发送消息
  8. private StreamObserver<com.cloud.grpc.doubleWayStream.ResponseMessage> responseOb;
  9. @Override
  10. public StreamObserver<com.cloud.grpc.doubleWayStream.RequestMessage> doubleWayStreamFun(StreamObserver<com.cloud.grpc.doubleWayStream.ResponseMessage> responseObserver) {
  11. this.responseOb=responseObserver;
  12. return new StreamObserver<RequestMessage>() {
  13. @Override
  14. public void onNext(RequestMessage requestMessage) {
  15. System.out.println("[收到客户端消息]: " + requestMessage.getReqMsg());
  16. responseObserver.onNext(ResponseMessage.newBuilder().setRspMsg("hello client ,I'm Java grpc Server,your message '" + requestMessage.getReqMsg() + "'").build());
  17. }
  18. @Override
  19. public void onError(Throwable throwable) {
  20. throwable.fillInStackTrace();
  21. }
  22. @Override
  23. public void onCompleted() {
  24. responseObserver.onCompleted();
  25. }
  26. };
  27. }
  28. public StreamObserver<ResponseMessage> getResponseOb() {
  29. return responseOb;
  30. }
  31. }

如上图, private StreamObserver responseOb 声明此服务端流响应,为了后面通过控制台向后端发送消息。
编码服务端启动类DoubleWayStreamServer.java,如下

  1. package com.cloud.grpc.java.doubleWayStream.server;
  2. import com.cloud.grpc.doubleWayStream.ResponseMessage;
  3. import io.grpc.Server;
  4. import io.grpc.ServerBuilder;
  5. import java.io.IOException;
  6. import java.util.Scanner;
  7. public class DoubleWayStreamServer {
  8. public static void main(String[] args) {
  9. ServerBuilder<?> serverBuilder = ServerBuilder.forPort(8899);
  10. DoubleWayStreamIml doubleWayStreamIml=new DoubleWayStreamIml();
  11. serverBuilder.addService(doubleWayStreamIml);
  12. Server server = serverBuilder.build();
  13. try {
  14. server.start();
  15. //开启线程向客户端输入
  16. new Thread(new Runnable() {
  17. @Override
  18. public void run() {
  19. Scanner scanner=new Scanner(System.in);
  20. for (;true;){
  21. String str=scanner.nextLine();
  22. if(str.equals("EOF")){
  23. break;
  24. }
  25. try {
  26. doubleWayStreamIml.getResponseOb().onNext(ResponseMessage.newBuilder().setRspMsg(str).build());
  27. }catch (Exception e){
  28. System.out.println("【异常】:没有客户端连接...");
  29. //一般客户端链接失败就会断开
  30. e.printStackTrace();
  31. }
  32. }
  33. }
  34. }).start();
  35. server.awaitTermination();
  36. } catch (IOException | InterruptedException e) {
  37. e.printStackTrace();
  38. }
  39. }
  40. }

如上图所示,开启一个新的线程,为了向客户端推送消息。
通过main函数启动,此时我们向控制台输入一条消息,结果如下:
image.png
可以看到,由于我们没有客户端与其建立连接,所以会在控制台打印:没有客户连接

5.编写,运行客户端

编写DoubleWayStreamClient.java,如下:

  1. package com.cloud.grpc.java.doubleWayStream.client;
  2. import com.cloud.grpc.doubleWayStream.DoubleWayStreamServiceGrpc;
  3. import com.cloud.grpc.doubleWayStream.RequestMessage;
  4. import com.cloud.grpc.doubleWayStream.ResponseMessage;
  5. import io.grpc.ManagedChannel;
  6. import io.grpc.ManagedChannelBuilder;
  7. import io.grpc.stub.StreamObserver;
  8. import java.util.Scanner;
  9. public class DoubleWayStreamClient {
  10. public static void main(String[] args) {
  11. //使用usePlaintext,否则使用加密连接
  12. ManagedChannelBuilder<?> channelBuilder = ManagedChannelBuilder.forAddress("localhost", 8899).usePlaintext();
  13. ManagedChannel channel = channelBuilder.build();
  14. StreamObserver<RequestMessage> requestObserver = DoubleWayStreamServiceGrpc.newStub(channel).doubleWayStreamFun(new StreamObserver<ResponseMessage>() {
  15. @Override
  16. public void onNext(ResponseMessage value) {
  17. System.out.println("[收到服务端发来] : " + value.getRspMsg());
  18. }
  19. @Override
  20. public void onError(Throwable t) {
  21. }
  22. @Override
  23. public void onCompleted() {
  24. }
  25. });
  26. Scanner scanner = new Scanner(System.in);
  27. for (; true; ) {
  28. String str= scanner.nextLine();
  29. if(str.equals("EOF")){
  30. requestObserver.onCompleted();
  31. break;
  32. }
  33. try {
  34. requestObserver.onNext(RequestMessage.newBuilder().setReqMsg(scanner.nextLine()).build());
  35. } catch (Exception e) {
  36. e.printStackTrace();
  37. }
  38. }
  39. }
  40. }

在上面源码中,我们实现了proto文件中rpc方法接口中的客户端流,在next方法中打印服务端的信息,执行main函数,在控制台输入”你好,我是客户端,我上线了”,
image.png
可见,服务端作出了响应,现在我们切换到服务端,让服务端想客户端发送消息,如下:
image.png
再看看客户端:
image.png
可见服务端可以向客户端发送消息。这样便实现了一个极其简单的聊天室。