以 ProtobufRpcEngine 为实例

Client端架构

Client 类只有一个入口,就是 call() 方法。代理类会调用 Client.call() 方法将 RPC 请求发送到远程服务器,然后等待远程服务器的响应。如果远程服务器响应请求时出现异常,则在 call() 方法中抛出异常
image.png

基本流程:

  • 在 call 方法中先将远程调用信息封装成一个 Client.Call 对象(保存了完成标志、返回信息、异常信息等),然后得到 connection 对象用于管理 Client 与 Server 的 Socket 连接

  • getConnection 方法中通过 setupIOstreams 建立与 Server 的 socket 连接,启动 Connection 线程,监听 socket 读取 server 响应

  • call() 方法发送 RPC 请求

  • call() 方法调用 Call.wait() 在 Call 对象上等待 Server 响应信息

  • Connection 线程收到响应信息设置 Call 对象返回信息字段,并调用 Call.notify() 唤醒 call() 方法线程读取 Call 对象返回值

Client端创建流程

此处协议采用 proto,所以产生的 RpcEngine 是 ProtobufRpcEngine

  1. public static void main(String[] args) throws Exception {
  2. //1. 构建配置对象
  3. Configuration conf = new Configuration();
  4. //2. 设置协议的RpcEngine为ProtobufRpcEngine .
  5. RPC.setProtocolEngine(conf, Server.MetaInfoProtocol.class,
  6. ProtobufRpcEngine.class);
  7. //3. 拿到代理对象
  8. Server.MetaInfoProtocol proxy = RPC.getProxy(Server.MetaInfoProtocol.class, 1L,
  9. new InetSocketAddress("localhost", 7777), conf);
  10. //4. 构建发送请求对象
  11. CustomProtos.GetMetaInfoRequestProto obj = CustomProtos.GetMetaInfoRequestProto.newBuilder().setPath("/meta").build();
  12. //5. 将请求对象传入, 获取响应信息
  13. CustomProtos.GetMetaInfoResponseProto metaData = proxy.getMetaInfo(null, obj);
  14. //6. 输出数据
  15. System.out.println(metaData.getInfo());
  16. }

此处代码分为三个主要部分:

  • 构建配置对象并且设置 RpcEngine 引擎

  • 获取代理对象

  • 设置请求参数以及通过代理对象请求

  • 处理结果

第一条和第二条就是使用 proto 定义一个协议,绑定到 RPC.Builder 的实现对象里面

获取代理对象

  1. Server.MetaInfoProtocol proxy = RPC.getProxy(Server.MetaInfoProtocol.class, 1L,
  2. new InetSocketAddress("localhost", 7777), conf);

也是就是通过 RPC.getProxy 方法获取协议的代理对象

  1. /**
  2. * Construct a client-side proxy object with the default SocketFactory
  3. * @param <T>
  4. *
  5. * @param protocol 协议
  6. * @param clientVersion 客户端的版本
  7. * @param addr 请求地址
  8. * @param conf 配置文件
  9. * @return a proxy instance
  10. * @throws IOException
  11. */
  12. public static <T> T getProxy(Class<T> protocol,
  13. long clientVersion,
  14. InetSocketAddress addr, Configuration conf)
  15. throws IOException {
  16. return getProtocolProxy(protocol, clientVersion, addr, conf).getProxy();
  17. }
  1. /**
  2. * Get a protocol proxy that contains a proxy connection to a remote server
  3. * and a set of methods that are supported by the server
  4. *
  5. * @param protocol protocol
  6. * @param clientVersion client's version
  7. * @param addr server address
  8. * @param ticket security ticket
  9. * @param conf configuration
  10. * @param factory socket factory
  11. * @param rpcTimeout max time for each rpc; 0 means no timeout
  12. * @param connectionRetryPolicy retry policy
  13. * @param fallbackToSimpleAuth set to true or false during calls to indicate if
  14. * a secure client falls back to simple auth
  15. * @return the proxy
  16. * @throws IOException if any error occurs
  17. */
  18. public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
  19. long clientVersion,
  20. InetSocketAddress addr,
  21. UserGroupInformation ticket,
  22. Configuration conf,
  23. SocketFactory factory,
  24. int rpcTimeout,
  25. RetryPolicy connectionRetryPolicy,
  26. AtomicBoolean fallbackToSimpleAuth)
  27. throws IOException {
  28. if (UserGroupInformation.isSecurityEnabled()) {
  29. SaslRpcServer.init(conf);
  30. }
  31. return getProtocolEngine(protocol, conf).getProxy(protocol, clientVersion,
  32. addr, ticket, conf, factory, rpcTimeout, connectionRetryPolicy,
  33. fallbackToSimpleAuth, null);
  34. }

核心代码为 getProtocolProxy 的返回值

  1. return getProtocolEngine(protocol, conf).getProxy(protocol, clientVersion,
  2. addr, ticket, conf, factory, rpcTimeout, connectionRetryPolicy,
  3. fallbackToSimpleAuth, null);

首先通过 getProtocolEngine 获取 RPC Engine —— ProtobufRpcEngine

  1. // return the RpcEngine configured to handle a protocol
  2. static synchronized RpcEngine getProtocolEngine(Class<?> protocol,
  3. Configuration conf) {
  4. // 从缓存中获取RpcEngine ,
  5. // 这个是提前设置的
  6. // 通过RPC.setProtocolEngine(conf, MetaInfoProtocol.class,ProtobufRpcEngine.class);
  7. RpcEngine engine = PROTOCOL_ENGINES.get(protocol);
  8. if (engine == null) {
  9. //通过这里 获取RpcEngine的实现类 , 这里我们获取的是 ProtobufRpcEngine.class
  10. Class<?> impl = conf.getClass(ENGINE_PROP+"."+protocol.getName(),
  11. WritableRpcEngine.class);
  12. // impl : org.apache.hadoop.ipc.ProtobufRpcEngine
  13. engine = (RpcEngine)ReflectionUtils.newInstance(impl, conf);
  14. PROTOCOL_ENGINES.put(protocol, engine);
  15. }
  16. return engine;
  17. }

然后再调用 ProtobufRpcEngine 的 getProxy 方法,将协议、客户端的版本号、socket地址、ticket、配置文件、socket 的创建工厂对象(StandardSocketFactory)、PRC 服务的超时时间、connetion 的重试策略、以及权限等信息传入

  1. @Override
  2. @SuppressWarnings("unchecked")
  3. public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
  4. InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
  5. SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy,
  6. AtomicBoolean fallbackToSimpleAuth, AlignmentContext alignmentContext)
  7. throws IOException {
  8. // 构造一个实现了 InvocationHandler 接口的invoker 对象
  9. final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory,
  10. rpcTimeout, connectionRetryPolicy, fallbackToSimpleAuth,
  11. alignmentContext);
  12. //然后调用Proxy.newProxylnstance()获取动态代理对象,并通过ProtocolProxy返回
  13. return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance(
  14. protocol.getClassLoader(), new Class[]{protocol}, invoker), false);
  15. }


在 getProxy 这个方法中,主要是分两步:

  • 构造一个实现了 InvocationHandler 接口的 invoker 对象 (动态代理机制中的InvocationHandler对象会在invoke()方法中代理所有目标接口上的调用,用户可以在invoke()方法中添加代理操作

  • 调用 Proxy.newProxylnstance() 获取动态代理对象,并通过 ProtocolProxy 返回

Invoker 的创建

  1. private Invoker(Class<?> protocol, InetSocketAddress addr,
  2. UserGroupInformation ticket, Configuration conf, SocketFactory factory,
  3. int rpcTimeout, RetryPolicy connectionRetryPolicy,
  4. AtomicBoolean fallbackToSimpleAuth, AlignmentContext alignmentContext)
  5. throws IOException {
  6. this(protocol,
  7. Client.ConnectionId.getConnectionId(addr, protocol, ticket, rpcTimeout,
  8. connectionRetryPolicy, conf), conf, factory);
  9. this.fallbackToSimpleAuth = fallbackToSimpleAuth;
  10. this.alignmentContext = alignmentContext;
  11. }

重点在于

  1. this(protocol,
  2. Client.ConnectionId.getConnectionId(addr, protocol, ticket, rpcTimeout,
  3. connectionRetryPolicy, conf), conf, factory);

首先来看

  1. Client.ConnectionId.getConnectionId(addr, protocol, ticket, rpcTimeout,
  2. connectionRetryPolicy, conf), conf, factory)

这里会调用 getConnectionId 方法构建一个 Client.ConnectionId 对象

ConnectionId:这个类持有请求地址和用户的 ticketclient 连接 server 的唯一凭证:[remoteAddress, protocol, ticket]

  1. /**
  2. * Returns a ConnectionId object.
  3. * @param addr Remote address for the connection.
  4. * @param protocol Protocol for RPC.
  5. * @param ticket UGI
  6. * @param rpcTimeout timeout
  7. * @param conf Configuration object
  8. * @return A ConnectionId instance
  9. * @throws IOException
  10. */
  11. static ConnectionId getConnectionId(InetSocketAddress addr,
  12. Class<?> protocol, UserGroupInformation ticket, int rpcTimeout,
  13. RetryPolicy connectionRetryPolicy, Configuration conf) throws IOException {
  14. //构建重试策略
  15. if (connectionRetryPolicy == null) {
  16. // 设置最大重试次数默认值: 10
  17. final int max = conf.getInt(
  18. CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
  19. CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_DEFAULT);
  20. // 设置重试间隔: 1 秒
  21. final int retryInterval = conf.getInt(
  22. CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_RETRY_INTERVAL_KEY,
  23. CommonConfigurationKeysPublic
  24. .IPC_CLIENT_CONNECT_RETRY_INTERVAL_DEFAULT);
  25. // 创建重试策略实例 RetryUpToMaximumCountWithFixedSleep
  26. // 重试10次, 每次间隔1秒
  27. connectionRetryPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
  28. max, retryInterval, TimeUnit.MILLISECONDS);
  29. }
  30. // 创建ConnectionId:
  31. // 这个类持有请求地址和用户的ticket
  32. // client 连接 server 的唯一凭证: [remoteAddress, protocol, ticket]
  33. return new ConnectionId(addr, protocol, ticket, rpcTimeout,
  34. connectionRetryPolicy, conf);
  35. }

在 getConnectionId 这个方法里面会做两件事:

  • 创建一个重试策略 RetryUpToMaximumCountWithFixedSleep
  • 构建一个 ConnectionId 对象

    1. ConnectionId(InetSocketAddress address, Class<?> protocol,
    2. UserGroupInformation ticket, int rpcTimeout,
    3. RetryPolicy connectionRetryPolicy, Configuration conf) {
    4. // 协议
    5. this.protocol = protocol;
    6. // 请求地址
    7. this.address = address;
    8. // 用户ticket
    9. this.ticket = ticket;
    10. // 设置超时时间
    11. this.rpcTimeout = rpcTimeout;
    12. // 设置重试策略 默认: 重试10次, 每次间隔1秒
    13. this.connectionRetryPolicy = connectionRetryPolicy;
    14. // 单位 10秒
    15. this.maxIdleTime = conf.getInt(
    16. CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
    17. CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_DEFAULT);
    18. // sasl client最大重试次数 5 次
    19. this.maxRetriesOnSasl = conf.getInt(
    20. CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY,
    21. CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_DEFAULT);
    22. //指示客户端将在套接字超时时进行重试的次数,以建立服务器连接。 默认值: 45
    23. this.maxRetriesOnSocketTimeouts = conf.getInt(
    24. CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
    25. CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT);
    26. //使用TCP_NODELAY标志绕过Nagle的算法传输延迟。 默认值: true
    27. this.tcpNoDelay = conf.getBoolean(
    28. CommonConfigurationKeysPublic.IPC_CLIENT_TCPNODELAY_KEY,
    29. CommonConfigurationKeysPublic.IPC_CLIENT_TCPNODELAY_DEFAULT);
    30. // 从客户端启用低延迟连接默认 false
    31. this.tcpLowLatency = conf.getBoolean(
    32. CommonConfigurationKeysPublic.IPC_CLIENT_LOW_LATENCY,
    33. CommonConfigurationKeysPublic.IPC_CLIENT_LOW_LATENCY_DEFAULT
    34. );
    35. // 启用从RPC客户端到服务器的ping操作默认值: true
    36. this.doPing = conf.getBoolean(
    37. CommonConfigurationKeys.IPC_CLIENT_PING_KEY,
    38. CommonConfigurationKeys.IPC_CLIENT_PING_DEFAULT);
    39. // 设置ping操作的间隔, 默认值 : 1分钟
    40. this.pingInterval = (doPing ? Client.getPingInterval(conf) : 0);
    41. this.conf = conf;
    42. }

回到之前的调用 Invoker 另一个构造方法,但是入参会变

  1. /**
  2. * This constructor takes a connectionId, instead of creating a new one.
  3. */
  4. private Invoker(Class<?> protocol, Client.ConnectionId connId,
  5. Configuration conf, SocketFactory factory) {
  6. this.remoteId = connId;
  7. this.client = CLIENTS.getClient(conf, factory, RpcWritable.Buffer.class);
  8. this.protocolName = RPC.getProtocolName(protocol);
  9. this.clientProtocolVersion = RPC
  10. .getProtocolVersion(protocol);
  11. }

这个是 Invoker 真正的构建方法,这里面会将刚刚构建好的 ConnectionId 赋值给 remoteId 字段

并且创建一个 Client 对象

  1. // 获取、创建客户端
  2. this.client = CLIENTS.getClient(conf, factory, RpcWritable.Buffer.class);

getClient 方法,会先尝试从缓存中获取 client 对象,如果没有的话,再自己创建一个,并且加到缓存中

为什么会放到缓存中呢?? 当 client 和 server 再次通讯的时候,可以复用这个 client

  1. /**
  2. * 如果没有缓存的client存在的话
  3. * 根据用户提供的SocketFactory构造或者缓存一个IPC客户端
  4. *
  5. * Construct & cache an IPC client with the user-provided SocketFactory
  6. * if no cached client exists.
  7. *
  8. * @param conf Configuration
  9. * @param factory SocketFactory for client socket
  10. * @param valueClass Class of the expected response
  11. * @return an IPC client
  12. */
  13. public synchronized Client getClient(Configuration conf,
  14. SocketFactory factory, Class<? extends Writable> valueClass) {
  15. // Construct & cache client.
  16. //
  17. // The configuration is only used for timeout,
  18. // and Clients have connection pools. So we can either
  19. // (a) lose some connection pooling and leak sockets, or
  20. // (b) use the same timeout for all configurations.
  21. //
  22. // Since the IPC is usually intended globally, notper-job, we choose (a).
  23. // 从缓存中获取Client
  24. Client client = clients.get(factory);
  25. if (client == null) {
  26. //client在缓存中不存在, 创建一个.
  27. client = new Client(valueClass, conf, factory);
  28. //缓存创建的client
  29. clients.put(factory, client);
  30. } else {
  31. //client的引用计数+1
  32. client.incCount();
  33. }
  34. if (Client.LOG.isDebugEnabled()) {
  35. Client.LOG.debug("getting client out of cache: " + client);
  36. }
  37. // 返回client
  38. return client;
  39. }

到这里 Invoker 对象就创建完了。回到 ProtobufRpcEngine 的 getProxy 方法

  1. //然后调用Proxy.newProxylnstance()获取动态代理对象,并通过ProtocolProxy返回
  2. return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance(
  3. protocol.getClassLoader(), new Class[]{protocol}, invoker), false);

构建一个 ProtocolProxy 对象返回

  1. /**
  2. * Constructor
  3. *
  4. * @param protocol protocol class
  5. * @param proxy its proxy
  6. * @param supportServerMethodCheck If false proxy will never fetch server
  7. * methods and isMethodSupported will always return true. If true,
  8. * server methods will be fetched for the first call to
  9. * isMethodSupported.
  10. */
  11. public ProtocolProxy(Class<T> protocol, T proxy,
  12. boolean supportServerMethodCheck) {
  13. this.protocol = protocol;
  14. this.proxy = proxy;
  15. this.supportServerMethodCheck = supportServerMethodCheck;
  16. }

创建 client 端的第四步,根据proto协议,构建一个请求对象,这里是 proto 自动生成的,我们只需要创建了一下

  1. //4. 构建发送请求对象
  2. CustomProtos.GetMetaInfoRequestProto obj = CustomProtos.GetMetaInfoRequestProto.
  3. newBuilder().setPath("/meta").build();

第五步,将请求对象传入,获取响应信息

  1. //5. 将请求对象传入, 获取响应信息
  2. CustomProtos.GetMetaInfoResponseProto metaData = proxy.getMetaInfo(null, obj);

最后一步,输出响应信息

  1. //6. 输出数据
  2. System.out.println(metaData.getInfo());

请求 Server 端的代码、请求是如何发出去的、如何拿到响应信息

采用动态代理。首先看 ProtobufRpcEngine 的 getProxy 方法

  1. return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance(
  2. protocol.getClassLoader(), new Class[]{protocol}, invoker), false);

重点在于

  1. (T) Proxy.newProxyInstance(
  2. protocol.getClassLoader(), new Class[]{protocol}, invoker)
  1. public static Object newProxyInstance(ClassLoader loader,
  2. Class<?>[] interfaces,
  3. InvocationHandler h) {undefined
  4. ........................
  5. }

这个方法的作用就是创建一个代理类对象,它接收三个参数,我们来看下几个参数的含义:

  • loader:一个 classloader 对象,定义了由哪个 classloader 对象对生成的代理类进行加载

  • interfaces:一个 interface 对象数组,表示我们将要给我们的代理对象提供一组什么样的接口,如果我们提供了这样一个接口对象数组,那么也就是声明了代理类实现了这些接口,代理类就可以调用接口中声明的所有方法

  • h:一个 InvocationHandler 对象,表示的是当动态代理对象调用方法的时候会关联到哪一个InvocationHandler 对象上,并最终由其调用

ProtobufRpcEngine.Invoker.invoker

  1. /**
  2. *
  3. * ProtobufRpcEngine.Invoker.invoker()方法主要做了三件事情:
  4. * 1.构造请求头域,
  5. * 使用protobuf将请求头序列化,这个请求头域记录了当前RPC调用是什么接口的什么方法上的
  6. * 调用;
  7. * 2.通过RPC.Client类发送请求头以及序列化好的请求参数。
  8. * 请求参数是在ClientNamenodeProtocolPB调用时就已经序列化好的,
  9. * 调用Client.call()方法时,
  10. * 需要将请求头以及请求参数使用一个RpcRequestWrapper对象封装;
  11. * 3.获取响应信息,序列化响应信息并返回。
  12. *
  13. *
  14. * This is the client side invoker of RPC method. It only throws
  15. * ServiceException, since the invocation proxy expects only
  16. * ServiceException to be thrown by the method in case protobuf service.
  17. *
  18. * ServiceException has the following causes:
  19. * <ol>
  20. * <li>Exceptions encountered on the client side in this method are
  21. * set as cause in ServiceException as is.</li>
  22. * <li>Exceptions from the server are wrapped in RemoteException and are
  23. * set as cause in ServiceException</li>
  24. * </ol>
  25. *
  26. * Note that the client calling protobuf RPC methods, must handle
  27. * ServiceException by getting the cause from the ServiceException. If the
  28. * cause is RemoteException, then unwrap it to get the exception thrown by
  29. * the server.
  30. */
  31. @Override
  32. public Message invoke(Object proxy, final Method method, Object[] args)
  33. throws ServiceException {
  34. long startTime = 0;
  35. if (LOG.isDebugEnabled()) {
  36. startTime = Time.now();
  37. }
  38. // pb接口的参数只有两个,即RpcController + Message
  39. if (args.length != 2) { // RpcController + Message
  40. throw new ServiceException(
  41. "Too many or few parameters for request. Method: ["
  42. + method.getName() + "]" + ", Expected: 2, Actual: "
  43. + args.length);
  44. }
  45. if (args[1] == null) {
  46. throw new ServiceException("null param while calling Method: ["
  47. + method.getName() + "]");
  48. }
  49. // if Tracing is on then start a new span for this rpc.
  50. // guard it in the if statement to make sure there isn't
  51. // any extra string manipulation.
  52. // todo 这个是啥
  53. Tracer tracer = Tracer.curThreadTracer();
  54. TraceScope traceScope = null;
  55. if (tracer != null) {
  56. traceScope = tracer.newScope(RpcClientUtil.methodToTraceString(method));
  57. }
  58. //构造请求头域,标明在什么接口上调用什么方法
  59. RequestHeaderProto rpcRequestHeader = constructRpcRequestHeader(method);
  60. if (LOG.isTraceEnabled()) {
  61. LOG.trace(Thread.currentThread().getId() + ": Call -> " +
  62. remoteId + ": " + method.getName() +
  63. " {" + TextFormat.shortDebugString((Message) args[1]) + "}");
  64. }
  65. //获取请求调用的参数,例如RenameRequestProto
  66. final Message theRequest = (Message) args[1];
  67. final RpcWritable.Buffer val;
  68. try {
  69. //调用RPC.Client发送请求
  70. val = (RpcWritable.Buffer) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
  71. new RpcProtobufRequest(rpcRequestHeader, theRequest), remoteId,
  72. fallbackToSimpleAuth, alignmentContext);
  73. } catch (Throwable e) {
  74. if (LOG.isTraceEnabled()) {
  75. LOG.trace(Thread.currentThread().getId() + ": Exception <- " +
  76. remoteId + ": " + method.getName() +
  77. " {" + e + "}");
  78. }
  79. if (traceScope != null) {
  80. traceScope.addTimelineAnnotation("Call got exception: " +
  81. e.toString());
  82. }
  83. throw new ServiceException(e);
  84. } finally {
  85. if (traceScope != null) traceScope.close();
  86. }
  87. if (LOG.isDebugEnabled()) {
  88. long callTime = Time.now() - startTime;
  89. LOG.debug("Call: " + method.getName() + " took " + callTime + "ms");
  90. }
  91. if (Client.isAsynchronousMode()) {
  92. final AsyncGet<RpcWritable.Buffer, IOException> arr
  93. = Client.getAsyncRpcResponse();
  94. final AsyncGet<Message, Exception> asyncGet
  95. = new AsyncGet<Message, Exception>() {
  96. @Override
  97. public Message get(long timeout, TimeUnit unit) throws Exception {
  98. return getReturnMessage(method, arr.get(timeout, unit));
  99. }
  100. @Override
  101. public boolean isDone() {
  102. return arr.isDone();
  103. }
  104. };
  105. ASYNC_RETURN_MESSAGE.set(asyncGet);
  106. return null;
  107. } else {
  108. return getReturnMessage(method, val);
  109. }
  110. }

重点在以下四个方面:

  • 构造请求头域,标明在什么接口上调用什么方法

  • 获取请求调用的参数

  • 调用 RPC.Client 发送请求

  • 获取响应信息

构造请求头域,标明在什么接口上调用什么方法

  1. RequestHeaderProto rpcRequestHeader = constructRpcRequestHeader(method);

这里很简单,就是根据协议定义,将协议名称、调用的方法名称、版本号三个值传入,来构建一个消息头

  1. private RequestHeaderProto constructRpcRequestHeader(Method method) {
  2. RequestHeaderProto.Builder builder = RequestHeaderProto
  3. .newBuilder();
  4. builder.setMethodName(method.getName());
  5. builder.setDeclaringClassProtocolName(protocolName);
  6. builder.setClientProtocolVersion(clientProtocolVersion);
  7. return builder.build();
  8. }

获取请求调用的参数

  1. // 通过参数传进来的.
  2. // 例如 GetMetaInfoRequestProto
  3. final Message theRequest = (Message) args[1];

调用 RPC.Client 发送请求(核心)

这一步骤是核心

  1. //调用RPC.Client发送请求
  2. val = (RpcWritable.Buffer) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
  3. new RpcProtobufRequest(rpcRequestHeader, theRequest), remoteId,
  4. fallbackToSimpleAuth, alignmentContext);

Client是在 ProtobufRpcEngine.Invoker 构造方法里面创建的

  1. /**
  2. * This constructor takes a connectionId, instead of creating a new one.
  3. */
  4. private Invoker(Class<?> protocol, Client.ConnectionId connId,
  5. Configuration conf, SocketFactory factory) {
  6. // 设置ConnectionId, ConnectionId里面保存着Client连接Server的信息
  7. this.remoteId = connId;
  8. // 获取、创建客户端
  9. this.client = CLIENTS.getClient(conf, factory, RpcWritable.Buffer.class);
  10. // 获取协议的名称
  11. this.protocolName = RPC.getProtocolName(protocol);
  12. // 获取协议的版本 version
  13. this.clientProtocolVersion = RPC
  14. .getProtocolVersion(protocol);
  15. }

Client.call()方法

这里重点关注 Client.call() 方法,主要从一下四个方面来说:

  • 创建 Call 对象

  • 获取&建立连接

  • 发送RPC请求

  • 获取响应 ```java /*

  • Make a call, passing rpcRequest, to the IPC server defined by
  • remoteId, returning the rpc response. *
  • @param rpcKind
  • @param rpcRequest - contains serialized method and method parameters
  • @param remoteId - the target rpc server
  • @param serviceClass - service class for RPC
  • @param fallbackToSimpleAuth - set to true or false during this method to
  • indicate if a secure client falls back to simple auth
  • @param alignmentContext - state alignment context
  • @return the rpc response
  • Throws exceptions if there are network problems or if the remote code
  • threw an exception. */ Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,

    1. ConnectionId remoteId, int serviceClass,
    2. AtomicBoolean fallbackToSimpleAuth, AlignmentContext alignmentContext)

    throws IOException {

    /**

    • 创建 Call 对象
    • Client.call()方法将RPC请求封装成一个Call对象,
    • Call对象中保存了RPC调用的完成标志、返回值信息以及异常信息;
    • 随后,Client.cal()方法会创建一个Connection对象,
    • Connection对象用于管理Client与Server的Socket连接 */ final Call call = createCall(rpcKind, rpcRequest);

      call.setAlignmentContext(alignmentContext);

      // 获取&建立连接 // 用ConnectionId作为key, // 将新建的Connection对象放入Client.connections字段中保存 // (对于Connection对象, // 由于涉及了与Server建立Socket连接,会比较耗费资 源, // 所以Client类使用一个HashTable对象connections保存那些没有过期的 Connection, // 如果可以复用,则复用这些Connection对象); // 以callId作为key,将构造的Call对象放入Connection.calls字段中保存。

      final Connection connection = getConnection(remoteId, call, serviceClass,

      1. fallbackToSimpleAuth);

      try { //检测是否是异步请求. checkAsyncCall(); try {

      1. //发送RPC请求
      2. connection.sendRpcRequest(call); // send the rpc request
  1. } catch (RejectedExecutionException e) {
  2. throw new IOException("connection has been closed", e);
  3. } catch (InterruptedException ie) {
  4. Thread.currentThread().interrupt();
  5. IOException ioe = new InterruptedIOException(
  6. "Interrupted waiting to send RPC request to server");
  7. ioe.initCause(ie);
  8. throw ioe;
  9. }
  10. } catch(Exception e) {
  11. if (isAsynchronousMode()) {
  12. releaseAsyncCall();
  13. }
  14. throw e;
  15. }
  16. if (isAsynchronousMode()) {
  17. final AsyncGet<Writable, IOException> asyncGet
  18. = new AsyncGet<Writable, IOException>() {
  19. @Override
  20. public Writable get(long timeout, TimeUnit unit)
  21. throws IOException, TimeoutException{
  22. boolean done = true;
  23. try {
  24. final Writable w = getRpcResponse(call, connection, timeout, unit);
  25. if (w == null) {
  26. done = false;
  27. throw new TimeoutException(call + " timed out "
  28. + timeout + " " + unit);
  29. }
  30. return w;
  31. } finally {
  32. if (done) {
  33. releaseAsyncCall();
  34. }
  35. }
  36. }
  37. @Override
  38. public boolean isDone() {
  39. synchronized (call) {
  40. return call.done;
  41. }
  42. }
  43. };
  44. ASYNC_RPC_RESPONSE.set(asyncGet);
  45. return null;
  46. } else {
  47. //服务器成功发回响应信息,返回RPC响应
  48. return getRpcResponse(call, connection, -1, null);
  49. }

}

  1. <a name="ZX7d8"></a>
  2. ##### 创建 Call 对象
  3. ```java
  4. /**
  5. * 创建 Call 对象
  6. * Client.call()方法将RPC请求封装成一个Call对象,
  7. * Call对象中保存了RPC调用的完成标志、返回值信息以及异常信息;
  8. * 随后,Client.cal()方法会创建一个 Connection对象,
  9. * Connection对象用于管理Client与Server的Socket连接
  10. */
  11. final Call call = createCall(rpcKind, rpcRequest);
  1. /**
  2. *
  3. * @param rpcKind rpcKind参数用于描述RPC请求的序列化工具类型
  4. * @param rpcRequest rpcRequest参数则用于记录序列化后的RPC请求
  5. * @return
  6. */
  7. Call createCall(RPC.RpcKind rpcKind, Writable rpcRequest) {
  8. return new Call(rpcKind, rpcRequest);
  9. }

这里就是构造方法, 生成一个唯一的 callId , 进行初始化

  1. private Call(RPC.RpcKind rpcKind, Writable param) {
  2. this.rpcKind = rpcKind;
  3. this.rpcRequest = param;
  4. //获取 callId
  5. final Integer id = callId.get();
  6. if (id == null) {
  7. // AtomicInteger callIdCounter 自增
  8. this.id = nextCallId();
  9. } else {
  10. callId.set(null);
  11. this.id = id;
  12. }
  13. final Integer rc = retryCount.get();
  14. if (rc == null) {
  15. this.retry = 0;
  16. } else {
  17. this.retry = rc;
  18. }
  19. // 设置异常处理类
  20. this.externalHandler = EXTERNAL_CALL_HANDLER.get();
  21. }

获取&建立连接

用 ConnectionId 作为key,将新建的 Connection 对象放入 Client.connections 字段中保存(对于 Connection 对象,由于涉及了与 Server 建立 Socket 连接,会比较耗费资源,所以 Client 类使用一个 ConcurrentMap 对象connections 保存那些没有过期的 Connection,如果可以复用,则复用这些Connection对象);以 callId 作为key,将构造的 Call 对象放入 Connection.calls 字段中保存

  1. final Connection connection = getConnection(remoteId, call, serviceClass,
  2. fallbackToSimpleAuth);
  1. /** Get a connection from the pool, or create a new one and add it to the
  2. * pool. Connections to a given ConnectionId are reused.
  3. * 获取连接
  4. *
  5. * */
  6. private Connection getConnection(ConnectionId remoteId,
  7. Call call, int serviceClass, AtomicBoolean fallbackToSimpleAuth)
  8. throws IOException {
  9. if (!running.get()) {
  10. // the client is stopped
  11. throw new IOException("The client is stopped");
  12. }
  13. Connection connection;
  14. /* we could avoid this allocation for each RPC by having a
  15. * connectionsId object and with set() method. We need to manage the
  16. * refs for keys in HashMap properly. For now its ok.
  17. */
  18. while (true) {
  19. // These lines below can be shorten with computeIfAbsent in Java8
  20. // 首先尝试从Client.connections队列中获取Connection对象
  21. connection = connections.get(remoteId);
  22. if (connection == null) {
  23. // 如果connections队列中没有保存,则构造新的对象
  24. connection = new Connection(remoteId, serviceClass);
  25. // putIfAbsent 如果对应的 key 已经有值了,
  26. // 则忽略本次操作,直接返回旧值
  27. Connection existing = connections.putIfAbsent(remoteId, connection);
  28. if (existing != null) {
  29. connection = existing;
  30. }
  31. }
  32. // 将待发送请求对应的Call对象放入Connection.calls队列
  33. if (connection.addCall(call)) {
  34. break;
  35. } else {
  36. // This connection is closed, should be removed. But other thread could
  37. // have already known this closedConnection, and replace it with a new
  38. // connection. So we should call conditional remove to make sure we only
  39. // remove this closedConnection.
  40. connections.remove(remoteId, connection);
  41. }
  42. }
  43. // If the server happens to be slow, the method below will take longer to
  44. // establish a connection.
  45. // 调用setupIOstreams()方法,初始化Connection对象并获取IO流
  46. connection.setupIOstreams(fallbackToSimpleAuth);
  47. return connection;
  48. }

里面创建 connection 就不细说了,其实就是创建 connection 对象,然后做一下请求地址和请求参数的设置。并没有和server端进行请求

  1. //如果connections队列中没有保存,则构造新的对象
  2. connection = new Connection(remoteId, serviceClass);

connection.setupIOstreams 这个才是建立连接的关键方法

  1. // 调用setupIOstreams()方法,初始化Connection对象并获取IO流
  2. connection.setupIOstreams(fallbackToSimpleAuth);
  1. /**
  2. * Connect to the server and set up the I/O streams. It then sends
  3. * a header to the server and starts
  4. * the connection thread that waits for responses.
  5. *
  6. * Client.call()方法调用 Connection.setupIOstreams() 方法建立与Server的Socket连接。
  7. * setupIOstreams()方法还会启动Connection线程,
  8. * Connection线程会监听Socket并读取Server发回的响应信息。
  9. */
  10. private synchronized void setupIOstreams( AtomicBoolean fallbackToSimpleAuth) {
  11. if (socket != null || shouldCloseConnection.get()) {
  12. return;
  13. }
  14. UserGroupInformation ticket = remoteId.getTicket();
  15. if (ticket != null) {
  16. final UserGroupInformation realUser = ticket.getRealUser();
  17. if (realUser != null) {
  18. ticket = realUser;
  19. }
  20. }
  21. try {
  22. connectingThread.set(Thread.currentThread());
  23. if (LOG.isDebugEnabled()) {
  24. LOG.debug("Connecting to "+server);
  25. }
  26. Span span = Tracer.getCurrentSpan();
  27. if (span != null) {
  28. span.addTimelineAnnotation("IPC client connecting to " + server);
  29. }
  30. short numRetries = 0;
  31. Random rand = null;
  32. while (true) {
  33. // 建立到Server的Socket连接,
  34. // 并且在这个Socket连接上获得InputStream和OutputStream对象。
  35. setupConnection(ticket);
  36. ipcStreams = new IpcStreams(socket, maxResponseLength);
  37. // 调用writeConnectionHeader()方法在连接建立时发送连接头域。
  38. writeConnectionHeader(ipcStreams);
  39. if (authProtocol == AuthProtocol.SASL) {
  40. try {
  41. authMethod = ticket
  42. .doAs(new PrivilegedExceptionAction<AuthMethod>() {
  43. @Override
  44. public AuthMethod run()
  45. throws IOException, InterruptedException {
  46. return setupSaslConnection(ipcStreams);
  47. }
  48. });
  49. } catch (IOException ex) {
  50. if (saslRpcClient == null) {
  51. // whatever happened -it can't be handled, so rethrow
  52. throw ex;
  53. }
  54. // otherwise, assume a connection problem
  55. authMethod = saslRpcClient.getAuthMethod();
  56. if (rand == null) {
  57. rand = new Random();
  58. }
  59. handleSaslConnectionFailure(numRetries++, maxRetriesOnSasl, ex,
  60. rand, ticket);
  61. continue;
  62. }
  63. if (authMethod != AuthMethod.SIMPLE) {
  64. // Sasl connect is successful. Let's set up Sasl i/o streams.
  65. ipcStreams.setSaslClient(saslRpcClient);
  66. // for testing
  67. remoteId.saslQop =
  68. (String)saslRpcClient.getNegotiatedProperty(Sasl.QOP);
  69. LOG.debug("Negotiated QOP is :" + remoteId.saslQop);
  70. if (fallbackToSimpleAuth != null) {
  71. fallbackToSimpleAuth.set(false);
  72. }
  73. } else if (UserGroupInformation.isSecurityEnabled()) {
  74. if (!fallbackAllowed) {
  75. throw new IOException("Server asks us to fall back to SIMPLE " +
  76. "auth, but this client is configured to only allow secure " +
  77. "connections.");
  78. }
  79. if (fallbackToSimpleAuth != null) {
  80. fallbackToSimpleAuth.set(true);
  81. }
  82. }
  83. }
  84. if (doPing) {
  85. ipcStreams.setInputStream(new PingInputStream(ipcStreams.in));
  86. }
  87. //调用writeConnectionContext()方法写入连接上下文。
  88. writeConnectionContext(remoteId, authMethod);
  89. // update last activity time
  90. //调用touch()方法更新上次活跃时间。
  91. touch();
  92. span = Tracer.getCurrentSpan();
  93. if (span != null) {
  94. span.addTimelineAnnotation("IPC client connected to " + server);
  95. }
  96. // 开启接收线程
  97. // 调用start()方法启动Connection线程监听并接收Server发回的响应信息。
  98. start();
  99. return;
  100. }
  101. } catch (Throwable t) {
  102. if (t instanceof IOException) {
  103. markClosed((IOException)t);
  104. } else {
  105. markClosed(new IOException("Couldn't set up IO streams: " + t, t));
  106. }
  107. close();
  108. } finally {
  109. connectingThread.set(null);
  110. }
  111. }

在这个方法中通过 setupConnection(ticket) 方法与 server 端建立连接

  1. // 建立到Server的Socket连接,
  2. // 并且在这个Socket连接上 获得InputStream和OutputStream对象。
  3. setupConnection(ticket);

调用 writeConnectionHeader() 方法在连接建立时发送连接头信息

  1. writeConnectionHeader(ipcStreams);
  1. /* Write the connection context header for each connection
  2. * Out is not synchronized because only the first thread does this.
  3. */
  4. private void writeConnectionContext(ConnectionId remoteId,
  5. AuthMethod authMethod)
  6. throws IOException {
  7. // Write out the ConnectionHeader
  8. IpcConnectionContextProto message = ProtoUtil.makeIpcConnectionContext(
  9. RPC.getProtocolName(remoteId.getProtocol()),
  10. remoteId.getTicket(),
  11. authMethod);
  12. RpcRequestHeaderProto connectionContextHeader = ProtoUtil
  13. .makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,
  14. OperationProto.RPC_FINAL_PACKET, CONNECTION_CONTEXT_CALL_ID,
  15. RpcConstants.INVALID_RETRY_COUNT, clientId);
  16. // do not flush. the context and first ipc call request must be sent
  17. // together to avoid possibility of broken pipes upon authz failure.
  18. // see writeConnectionHeader
  19. final ResponseBuffer buf = new ResponseBuffer();
  20. connectionContextHeader.writeDelimitedTo(buf);
  21. message.writeDelimitedTo(buf);
  22. synchronized (ipcStreams.out) {
  23. ipcStreams.sendRequest(buf.toByteArray());
  24. }
  25. }

开启接收线程调用 start() 方法启动 Connection 线程监听并接收 Server发回的响应信息

  1. // 开启接收线程
  2. // 调用start()方法启动Connection线程监听并接收Server发回的响应信息。
  3. start();

在这里会调用Connection里面的 run() 方法,接收并处理返回消息

  1. @Override
  2. public void run() {
  3. if (LOG.isDebugEnabled())
  4. LOG.debug(getName() + ": starting, having connections "
  5. + connections.size());
  6. try {
  7. while (waitForWork()) {//wait here for work - read or close connection
  8. //接收到返回信息
  9. receiveRpcResponse();
  10. }
  11. } catch (Throwable t) {
  12. // This truly is unexpected, since we catch IOException in receiveResponse
  13. // -- this is only to be really sure that we don't leave a client hanging
  14. // forever.
  15. LOG.warn("Unexpected error reading responses on connection " + this, t);
  16. markClosed(new IOException("Error reading responses", t));
  17. }
  18. close();
  19. if (LOG.isDebugEnabled())
  20. LOG.debug(getName() + ": stopped, remaining connections "
  21. + connections.size());
  22. }
  1. /**
  2. * 接收到返回信息
  3. * Receive a response.
  4. * Because only one receiver, so no synchronization on in.
  5. * receiveRpcResponse()方法接收RPC响应。
  6. * receiveRpcResponse()方法会从输入流中读取序列化对象RpcResponseHeaderProto,
  7. * 然后根据RpcResponseHeaderProto 中记录的callid字段获取对应的Call的对象。
  8. *
  9. * 接下来receiveRpcResponse()方法会从输入流中 读取响应消息,
  10. * 然后调用Call.setRpcResponse()将响应消息保存在Call对象中。
  11. * 如果服务器 在处理RPC请求时抛出异常,
  12. * 则receiveRpcResponse()会从输入流中读取异常信息,并构造 异常对象,
  13. * 然后调用Call.setException()将异常保存在Call对象中。
  14. *
  15. */
  16. private void receiveRpcResponse() {
  17. if (shouldCloseConnection.get()) {
  18. return;
  19. }
  20. // 更新请求时间
  21. touch();
  22. try {
  23. // 通过 ipcStreams 获取响应对象
  24. ByteBuffer bb = ipcStreams.readResponse();
  25. // 将响应对象,采用RpcWritable 进行包装
  26. RpcWritable.Buffer packet = RpcWritable.Buffer.wrap(bb);
  27. // 获取响应的响应头
  28. RpcResponseHeaderProto header = packet.getValue(RpcResponseHeaderProto.getDefaultInstance());
  29. // 检测头信息.
  30. checkResponse(header);
  31. // 获取call唯一标识callId
  32. int callId = header.getCallId();
  33. if (LOG.isDebugEnabled())
  34. LOG.debug(getName() + " got value #" + callId);
  35. // 获取头信息里面的响应状态.
  36. RpcStatusProto status = header.getStatus();
  37. // 如果调用成功,则读取响应消息,在call实例中设置
  38. if (status == RpcStatusProto.SUCCESS) {
  39. // 构建实例
  40. Writable value = packet.newInstance(valueClass, conf);
  41. // 将Call 从正在执行的calls缓存队列中移除
  42. final Call call = calls.remove(callId);
  43. // 将Call设置请求信息
  44. call.setRpcResponse(value);
  45. if (call.alignmentContext != null) {
  46. call.alignmentContext.receiveResponseState(header);
  47. }
  48. }
  49. // verify that packet length was correct
  50. if (packet.remaining() > 0) {
  51. throw new RpcClientException("RPC response length mismatch");
  52. }
  53. // RPC调用失败
  54. if (status != RpcStatusProto.SUCCESS) { // Rpc Request failed
  55. // 取出响应中的异常消息
  56. final String exceptionClassName = header.hasExceptionClassName() ?
  57. header.getExceptionClassName() :
  58. "ServerDidNotSetExceptionClassName";
  59. final String errorMsg = header.hasErrorMsg() ?
  60. header.getErrorMsg() : "ServerDidNotSetErrorMsg" ;
  61. final RpcErrorCodeProto erCode =
  62. (header.hasErrorDetail() ? header.getErrorDetail() : null);
  63. if (erCode == null) {
  64. LOG.warn("Detailed error code not set by server on rpc error");
  65. }
  66. RemoteException re = new RemoteException(exceptionClassName, errorMsg, erCode);
  67. // 在Call对象中设置异常
  68. if (status == RpcStatusProto.ERROR) {
  69. final Call call = calls.remove(callId);
  70. call.setException(re);
  71. } else if (status == RpcStatusProto.FATAL) {
  72. // Close the connection
  73. markClosed(re);
  74. }
  75. }
  76. } catch (IOException e) {
  77. markClosed(e);
  78. }
  79. }

发送RPC请求

先构造 RPC 请求头,将请求头和 RPC 请求,在另外一个线程池 sendParamsExecutor 的 run 方法中使用ipcStreams.sendRequest(buf.toByteArray()) 发送出去

  1. //发送RPC请求
  2. connection.sendRpcRequest(call); // send the rpc request
  1. /**
  2. *
  3. * 发送RPC请求到Server
  4. *
  5. * Initiates a rpc call by sending the rpc request to the remote server.
  6. * Note: this is not called from the Connection thread, but by other
  7. * threads.
  8. * @param call - the rpc request
  9. *
  10. * RPC发送请求线程会调用Connection.sendRpcRequest()方法发送RPC请求到Server,
  11. * 这里要特别注意,这个方法不是由Connection线程调用的,而是由发起RPC请求的线程调用的
  12. *
  13. */
  14. public void sendRpcRequest(final Call call)
  15. throws InterruptedException, IOException {
  16. if (shouldCloseConnection.get()) {
  17. return;
  18. }
  19. // Serialize the call to be sent. This is done from the actual
  20. // caller thread, rather than the sendParamsExecutor thread,
  21. // so that if the serialization throws an error, it is reported
  22. // properly. This also parallelizes the serialization.
  23. //
  24. // Format of a call on the wire:
  25. // 0) Length of rest below (1 + 2)
  26. // 1) RpcRequestHeader - is serialized Delimited hence contains length
  27. // 2) RpcRequest
  28. //
  29. // Items '1' and '2' are prepared here.
  30. // 先构造RPC请求头
  31. RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
  32. call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, call.retry,
  33. clientId, call.alignmentContext);
  34. final ResponseBuffer buf = new ResponseBuffer();
  35. // 将RPC请求头写入输出流 ResponseBuffer
  36. header.writeDelimitedTo(buf);
  37. // 将RPC请求(包括请求元数据和请求参数)封装成ProtobufWrapper,写入输出流
  38. RpcWritable.wrap(call.rpcRequest).writeTo(buf);
  39. // 这里使用 线程池 将请求发送出去,
  40. // 请求包括三个部分:
  41. // 1.长度;
  42. // 2.RPC请求头;
  43. // 3.RPC请求(包括 请求元数据以及请求参数)
  44. synchronized (sendRpcRequestLock) {
  45. Future<?> senderFuture = sendParamsExecutor.submit(new Runnable() {
  46. @Override
  47. public void run() {
  48. try {
  49. synchronized (ipcStreams.out) {
  50. if (shouldCloseConnection.get()) {
  51. return;
  52. }
  53. if (LOG.isDebugEnabled()) {
  54. LOG.debug(getName() + " sending #" + call.id
  55. + " " + call.rpcRequest);
  56. }
  57. // RpcRequestHeader + RpcRequest
  58. ipcStreams.sendRequest(buf.toByteArray());
  59. ipcStreams.flush();
  60. }
  61. } catch (IOException e) {
  62. // exception at this point would leave the connection in an
  63. // unrecoverable state (eg half a call left on the wire).
  64. // So, close the connection, killing any outstanding calls
  65. // 如果发生发送异常,则直接关闭连接
  66. markClosed(e);
  67. } finally {
  68. // the buffer is just an in-memory buffer, but it is still polite to
  69. // close early
  70. // 之前申请的buffer给关闭了,比较优雅
  71. IOUtils.closeStream(buf);
  72. }
  73. }
  74. });
  75. try {
  76. senderFuture.get();
  77. } catch (ExecutionException e) {
  78. Throwable cause = e.getCause();
  79. // cause should only be a RuntimeException as the Runnable above
  80. // catches IOException
  81. if (cause instanceof RuntimeException) {
  82. throw (RuntimeException) cause;
  83. } else {
  84. throw new RuntimeException("unexpected checked exception", cause);
  85. }
  86. }
  87. }
  88. }

获取响应
  1. //服务器成功发回响应信息,返回RPC响应
  2. return getRpcResponse(call, connection, -1, null);

在这里,会不断轮询 call 的状态,如果状态为 done 则代表数据已经处理完,并且已经获取到响应信息

响应信息由 connection 中的 run 方法进行处理

  1. /** @return the rpc response or, in case of timeout, null. */
  2. private Writable getRpcResponse(final Call call, final Connection connection,
  3. final long timeout, final TimeUnit unit) throws IOException {
  4. synchronized (call) {
  5. while (!call.done) {
  6. try {
  7. //等待RPC响应
  8. AsyncGet.Util.wait(call, timeout, unit);
  9. if (timeout >= 0 && !call.done) {
  10. return null;
  11. }
  12. } catch (InterruptedException ie) {
  13. Thread.currentThread().interrupt();
  14. throw new InterruptedIOException("Call interrupted");
  15. }
  16. }
  17. if (call.error != null) {
  18. if (call.error instanceof RemoteException) {
  19. call.error.fillInStackTrace();
  20. throw call.error;
  21. } else { // local exception
  22. InetSocketAddress address = connection.getRemoteAddress();
  23. throw NetUtils.wrapException(address.getHostName(),
  24. address.getPort(),
  25. NetUtils.getHostname(),
  26. 0,
  27. call.error);
  28. }
  29. } else {
  30. return call.getRpcResponse();
  31. }
  32. }

处理Client端的响应信息

Client 获取响应信息,不论是同步还是异步获取响应信息,都会调用这个方法 getReturnMessage(method, val)

  1. getReturnMessage(method, val);
  1. private Message getReturnMessage(final Method method,
  2. final RpcWritable.Buffer buf) throws ServiceException {
  3. Message prototype = null;
  4. try {
  5. //获取返回参数
  6. prototype = getReturnProtoType(method);
  7. } catch (Exception e) {
  8. throw new ServiceException(e);
  9. }
  10. Message returnMessage;
  11. try {
  12. // 序列化响应信息
  13. returnMessage = buf.getValue(prototype.getDefaultInstanceForType());
  14. if (LOG.isTraceEnabled()) {
  15. LOG.trace(Thread.currentThread().getId() + ": Response <- " +
  16. remoteId + ": " + method.getName() +
  17. " {" + TextFormat.shortDebugString(returnMessage) + "}");
  18. }
  19. } catch (Throwable e) {
  20. throw new ServiceException(e);
  21. }
  22. //返回结果
  23. return returnMessage;
  24. }