ProtobufRpcEngine目前是作为 Hadoop RPC 引擎唯一的实现方式;WritableRpcEngine在 3.2.1 版本已经标识为废弃,但依旧是默认实现的 RPC 引擎

和 WritableRpcEngine 相比,ProtobufRpcEngine 的使用要麻烦的多,主要是在接口的定义和实现的写法上有区别

定义协议

定义Proto协议

根据 Protocol 的语法,定义一个服务MetaInfo,通过该服务的getMetaInfo接口,可以获取到元数据的信息

  1. /**
  2. * 生成java代码指令
  3. * protoc --java_out=. CustomProtocol.proto
  4. *
  5. */
  6. option java_package = "org.apache.hadoop.rpc.protobuf";
  7. option java_outer_classname = "CustomProtos";
  8. option java_generic_services = true;
  9. option java_generate_equals_and_hash = true;
  10. package hadoop.common;
  11. service MetaInfo {
  12. rpc getMetaInfo(GetMetaInfoRequestProto) returns (GetMetaInfoResponseProto);
  13. }
  14. message GetMetaInfoRequestProto {
  15. required string path = 1;
  16. }
  17. message GetMetaInfoResponseProto {
  18. required string info = 1;
  19. }

[

](https://blog.csdn.net/zhanglong_4444/article/details/105630023)

根据定义好的proto协议生成java类,导入项目

将定义好的协议保存成文件,命名为 CustomProtocol.proto 。在该文件的目录下执行命令

  1. MacBook-Pro:proto sysadmin$ ls -l
  2. total 8
  3. -rw-r--r--@ 1 sysadmin staff 526 4 20 09:56 CustomProtocol.proto
  4. MacBook-Pro:proto sysadmin$
  5. MacBook-Pro:proto sysadmin$
  6. MacBook-Pro:proto sysadmin$ protoc --java_out=. CustomProtocol.proto
  7. MacBook-Pro:proto sysadmin$
  8. MacBook-Pro:proto sysadmin$
  9. MacBook-Pro:proto sysadmin$ ls -l
  10. total 8
  11. -rw-r--r--@ 1 sysadmin staff 526 4 20 09:56 CustomProtocol.proto
  12. drwxr-xr-x 3 sysadmin staff 96 4 20 11:53 org
  13. MacBook-Pro:proto sysadmin$
  14. MacBook-Pro:proto sysadmin$
  15. MacBook-Pro:proto sysadmin$
  16. MacBook-Pro:proto sysadmin$ cd org/apache/hadoop/rpc/protobuf
  17. MacBook-Pro:protobuf sysadmin$
  18. MacBook-Pro:protobuf sysadmin$ ls -l
  19. total 104
  20. -rw-r--r-- 1 sysadmin staff 49880 4 20 11:53 CustomProtos.java
  21. MacBook-Pro:protobuf sysadmin$
  22. MacBook-Pro:protobuf sysadmin$

这时在该目录下,会有一个文件目录生成,层级为我们定义好的org.apache.hadoop.rpc.protobuf路径,将里面生成的 java 文件 CustomProtos.java 导入到项目中,存放路径与定义好的路径一样
image.png

根据定义好的proto协议定义接口

创建一个接口,集成生成JAVA类中的CustomProtos.MetaInfo.BlockingInterface接口

  1. // 接口信息供server和client端使用
  2. @ProtocolInfo(
  3. protocolName = "org.apache.hadoop.rpc.CustomProtos$MetaInfoProtocol",
  4. protocolVersion = 1)
  5. public interface MetaInfoProtocol
  6. extends CustomProtos.MetaInfo.BlockingInterface {
  7. }

实现接口协议

创建一个类,实现刚刚定义好的MetaInfoProtocol接口,并且实现里面的方法。这个需要自己去写代码逻辑。注意返回值和返回对象的写法不太一样

  1. // 实现类
  2. public static class MetaInfoServer implements MetaInfoProtocol {
  3. @Override
  4. public CustomProtos.GetMetaInfoResponseProto getMetaInfo(RpcController controller,
  5. CustomProtos.GetMetaInfoRequestProto request) throws
  6. ServiceException {
  7. //获取请求参数
  8. final String path = request.getPath();
  9. return CustomProtos.GetMetaInfoResponseProto.newBuilder().setInfo(path + ":3 - {BLOCK_1,BLOCK_2,BLOCK_3....").build();
  10. }
  11. }

创建Server服务,注册协议,并启动RPC服务

  1. public static void main(String[] args) throws Exception{
  2. //1. 构建配置对象
  3. Configuration conf = new Configuration();
  4. //2. 协议对象的实例
  5. MetaInfoServer serverImpl = new MetaInfoServer();
  6. BlockingService blockingService =
  7. CustomProtos.MetaInfo.newReflectiveBlockingService(serverImpl);
  8. //3. 设置协议的RpcEngine为ProtobufRpcEngine
  9. RPC.setProtocolEngine(conf, MetaInfoProtocol.class,
  10. ProtobufRpcEngine.class);
  11. //4. 构建RPC框架
  12. RPC.Builder builder = new RPC.Builder(conf);
  13. //5. 绑定地址
  14. builder.setBindAddress("localhost");
  15. //6. 绑定端口
  16. builder.setPort(7777);
  17. //7. 绑定协议
  18. builder.setProtocol(MetaInfoProtocol.class);
  19. //8. 调用协议实现类
  20. builder.setInstance(blockingService);
  21. //9. 创建服务
  22. RPC.Server server = builder.build();
  23. //10. 启动服务
  24. server.start();
  25. }

创建Client服务,请求数据接口

  1. public static void main(String[] args) throws Exception {
  2. //1. 构建配置对象
  3. Configuration conf = new Configuration();
  4. //2. 设置协议的RpcEngine为ProtobufRpcEngine
  5. RPC.setProtocolEngine(conf, Server.MetaInfoProtocol.class,
  6. ProtobufRpcEngine.class);
  7. //3. 拿到RPC协议
  8. Server.MetaInfoProtocol proxy = RPC.getProxy(Server.MetaInfoProtocol.class, 1L,
  9. new InetSocketAddress("localhost", 7777), conf);
  10. //4. 发送请求
  11. CustomProtos.GetMetaInfoRequestProto obj =
  12. CustomProtos.GetMetaInfoRequestProto.newBuilder().setPath("/meta").build();
  13. CustomProtos.GetMetaInfoResponseProto metaData = proxy.getMetaInfo(null, obj);
  14. //5. 打印元数据
  15. System.out.println(metaData.getInfo());
  16. }