概述

RpcEngineProtobufRpcEngineWritableRpcEngine(已过时)的父类,在hadoop的RPC通讯的体系中扮演着很重要的角色

RpcEngine的重要性在于,正常的RPC请求分为三个部分:通讯协议、Client端和Server端。通讯协议是Client和Server端沟通的桥梁

在Client是通过RpcEngine.getProxy方法获取Server的代理对象;在Server端是通过getServer方法获取Server端的实例。所以RpcEngine是Hadoop RPC通讯体系中非常重要的父类定义

  • getServer:Server端获取RPC.Server的实例

  • getProxy:Client端获取RPC.Server的实例

  • getProtocolMetaInfoProxy:根据给定的connection id获取ProtocolMetaInfoPB代理对象

getServer解析

RpcEngine只有一个getServer方法获取RPC.Server的实例

  1. /**
  2. *
  3. * 该方法用于产生一个RPC Server对象,服务器会启动这个Server对象监听从客户端发来的请求
  4. * 成功从网络接收请求数据后,
  5. * Server对象会调用Rpclnvoker(在RpcEngine的实现类中定义)对象处理这个请求
  6. *
  7. * Construct a server for a protocol implementation instance.
  8. *
  9. * @param protocol the class of protocol to use
  10. * @param instance the instance of protocol whose methods will be called
  11. * @param conf the configuration to use
  12. * @param bindAddress the address to bind on to listen for connection
  13. * @param port the port to listen for connections on
  14. * @param numHandlers the number of method handler threads to run
  15. * @param numReaders the number of reader threads to run
  16. * @param queueSizePerHandler the size of the queue per hander thread
  17. * @param verbose whether each call should be logged
  18. * @param secretManager The secret manager to use to validate incoming requests.
  19. * @param portRangeConfig A config parameter that can be used to restrict
  20. * the range of ports used when port is 0 (an ephemeral port)
  21. * @param alignmentContext provides server state info on client responses
  22. * @return The Server instance
  23. * @throws IOException on any error
  24. */
  25. RPC.Server getServer(Class<?> protocol, Object instance, String bindAddress,
  26. int port, int numHandlers, int numReaders,
  27. int queueSizePerHandler, boolean verbose,
  28. Configuration conf,
  29. SecretManager<? extends TokenIdentifier> secretManager,
  30. String portRangeConfig,
  31. AlignmentContext alignmentContext) throws IOException;


getProxy解析

RpcEngine有两个getProxy方法获取ProtocolProxy的实例

  1. /**
  2. * 客户端会调用RpcEngine.getProxy()方法获取一个本地接口的代理对象,
  3. * 然后在这个代理对象上调用本地接口的方法
  4. *
  5. * getProxy()方法的实现采用了Java动态代理机制
  6. *
  7. * 客户端调用程序在代理对象上的调用会由一个
  8. * RpcInvocationHandler(java.lang.reflect.InvocationHandler的子类,
  9. * 在RpcEngine的实现类中定义)对象处理,
  10. * 这个RpcInvocationHandler会将请求序列化(使用RpcEngine实现类定义的序列化方式)
  11. * 并调用Client.call()方法将请求发送到远程服务器
  12. *
  13. * 当远程服务器发回响应信息后,RpcInvocationHandler会将响应信息反序列化并返回给调用程序,
  14. * 这一切通过Java动态代理机制对于调用程序是完全透明的,就像本地调用一样
  15. *
  16. * @param protocol
  17. * @param clientVersion
  18. * @param addr
  19. * @param ticket
  20. * @param conf
  21. * @param factory
  22. * @param rpcTimeout
  23. * @param connectionRetryPolicy
  24. * @param <T>
  25. * @return
  26. * @throws IOException
  27. */
  28. <T> ProtocolProxy<T> getProxy(Class<T> protocol,
  29. long clientVersion, InetSocketAddress addr,
  30. UserGroupInformation ticket, Configuration conf,
  31. SocketFactory factory, int rpcTimeout,
  32. RetryPolicy connectionRetryPolicy) throws IOException;
  33. /** Construct a client-side proxy object. */
  34. <T> ProtocolProxy<T> getProxy(Class<T> protocol,
  35. long clientVersion, InetSocketAddress addr,
  36. UserGroupInformation ticket, Configuration conf,
  37. SocketFactory factory, int rpcTimeout,
  38. RetryPolicy connectionRetryPolicy,
  39. AtomicBoolean fallbackToSimpleAuth,
  40. AlignmentContext alignmentContext) throws IOException;

代理类在程序运行时创建的代理方式被成为动态代理。 关于动态代理,代理类并不是在Java代码中定义的,而是在运行时根据我们在Java代码中的“指示”动态生成的。相比于静态代理, 动态代理的优势在于可以很方便的对代理类的函数进行统一的处理,而不用修改每个代理类中的方法

getProtocolMetaInfoProxy

根据给定的connection id获取ProtocolMetaInfoPB代理对象

  1. /**
  2. * Returns a proxy for ProtocolMetaInfoPB, which uses the given connection id.
  3. * @param connId, ConnectionId to be used for the proxy.
  4. * @param conf, Configuration.
  5. * @param factory, Socket factory.
  6. * @return Proxy object.
  7. * @throws IOException
  8. */
  9. ProtocolProxy<ProtocolMetaInfoPB> getProtocolMetaInfoProxy(
  10. ConnectionId connId, Configuration conf, SocketFactory factory)
  11. throws IOException;