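This article walks through the design and implementation of the Hadoop RPC Server step by step, using ProtobufRpcEngine as the working example.

RPC hides the details of network programming so that calling a remote method feels the same as calling a local one (a method in the same project). The caller does not have to write a pile of code unrelated to the business logic just because the method happens to be remote.

Server Architecture

The Server side uses the Reactor pattern.

The Listener listens for connections. When a request comes in, it picks a Reader from the reader pool to read it.

The Reader reads the client's request, wraps the data in a Call, and puts it on the callQueue.

The Handler takes Calls from the callQueue, processes them, and hands the result to the Responder.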
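The Server processes a request as follows:

  • The whole Server has a single Listener thread. The Selector inside the Listener, acceptorSelector, watches for socket connection requests from clients. acceptorSelector registers OP_ACCEPT on the ServerSocketChannel and waits until getConnection in the client's Client.call() triggers that event and wakes the Listener thread, which then accepts a new SocketChannel. The Listener picks one thread from the readers thread pool (the pool itself is created once, in the Listener constructor) and registers OP_READ on that Reader's readerSelector.

  • readerSelector listens for OP_READ. When the client sends an RPC request, readerSelector wakes the Reader thread. The Reader reads the data from the SocketChannel, wraps it in a Call object, and puts it on the shared callQueue.

  • Initially all threads in the handlers pool block on callQueue (BlockingQueue.take()). When a Call is added, one Handler thread wakes up. Using the information on the Call, it invokes Server.call() (the counterpart of Client.call()), deserializes the request, executes the corresponding local function, and finally writes the response to the SocketChannel.

  • The Responder thread acts as a buffer. When there are many responses or the network is slow and a Handler cannot write the complete response to the client, the Handler registers OP_WRITE on the Responder's respondSelector. Once the channel becomes writable, the Responder wakes up and sends the rest of the response.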
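The hand-off between a Reader, the callQueue, and the Handler threads can be illustrated with plain JDK classes. The following is a minimal sketch, not Hadoop code: the class `CallQueueHandoffSketch`, the method `process`, the `STOP` marker, and the use of strings as calls are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: one producer (the "Reader") and several consumers
// (the "Handlers") sharing one bounded blocking queue.
final class CallQueueHandoffSketch {
    // Unique marker object that tells a handler thread to stop (illustrative only).
    private static final String STOP = new String("STOP");

    static List<String> process(List<String> calls, int handlerCount) {
        BlockingQueue<String> callQueue = new LinkedBlockingQueue<>(100);
        List<String> handled = Collections.synchronizedList(new ArrayList<>());
        List<Thread> handlers = new ArrayList<>();
        for (int i = 0; i < handlerCount; i++) {
            Thread handler = new Thread(() -> {
                try {
                    while (true) {
                        String call = callQueue.take(); // blocks while the queue is empty
                        if (call == STOP) {
                            return;
                        }
                        handled.add("handled:" + call);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "handler-" + i);
            handler.start();
            handlers.add(handler);
        }
        try {
            for (String call : calls) {
                callQueue.put(call); // the "Reader" side: blocks while the queue is full
            }
            for (int i = 0; i < handlerCount; i++) {
                callQueue.put(STOP); // one stop marker per handler
            }
            for (Thread handler : handlers) {
                handler.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(process(List.of("getMetaInfo", "ping"), 2));
    }
}
```

The order of the handled calls depends on thread scheduling, which is also true of the real Handler pool.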

Creating the Server

The Server below uses a protocol defined with proto, so the resulting RpcEngine is ProtobufRpcEngine.

    public static void main(String[] args) throws Exception {
        // 1. Build the configuration object
        Configuration conf = new Configuration();
        // 2. Create the protocol implementation instance
        MetaInfoServer serverImpl = new MetaInfoServer();
        BlockingService blockingService =
            CustomProtos.MetaInfo.newReflectiveBlockingService(serverImpl);
        // 3. Set the protocol's RpcEngine to ProtobufRpcEngine
        RPC.setProtocolEngine(conf, MetaInfoProtocol.class,
            ProtobufRpcEngine.class);
        // 4. Create the RPC builder
        RPC.Builder builder = new RPC.Builder(conf);
        // 5. Set the bind address
        builder.setBindAddress("localhost");
        // 6. Set the port
        builder.setPort(7777);
        // 7. Set the protocol
        builder.setProtocol(MetaInfoProtocol.class);
        // 8. Set the protocol implementation
        builder.setInstance(blockingService);
        // 9. Build the server
        RPC.Server server = builder.build();
        // 10. Start the server
        server.start();
    }

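The code has three parts:

  • Define the interface and its implementation.

  • Set the service parameters: the RpcEngine type used by the protocol, the IP address and port the Server listens on, and the protocol together with its implementation object.

  • Build the RpcEngine from the parameters set in the second step and start the service.

The first and second parts define a protocol with proto and bind it, together with its implementation object, to RPC.Builder. The core is

    RPC.Server server = builder.build(); // build the server

Let's look at how the RpcEngine is obtained and the Server is built.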

    /**
     * Build the RPC Server.
     * @throws IOException on error
     * @throws HadoopIllegalArgumentException when mandatory fields are not set
     */
    public Server build() throws IOException, HadoopIllegalArgumentException {
        if (this.conf == null) {
            throw new HadoopIllegalArgumentException("conf is not set");
        }
        if (this.protocol == null) {
            throw new HadoopIllegalArgumentException("protocol is not set");
        }
        if (this.instance == null) {
            throw new HadoopIllegalArgumentException("instance is not set");
        }
        // Call getProtocolEngine() to get the RpcEngine configured for this protocol.
        // (For example, the NameNodeRpcServer constructor has already set the
        // RpcEngine for its protocols to ProtobufRpcEngine.)
        // With the ProtobufRpcEngine in hand, build() calls getServer() on it
        // to obtain a reference to an RPC Server object.
        return getProtocolEngine(this.protocol, this.conf).getServer(
            this.protocol, this.instance, this.bindAddress, this.port,
            this.numHandlers, this.numReaders, this.queueSizePerHandler,
            this.verbose, this.conf, this.secretManager, this.portRangeConfig,
            this.alignmentContext);
    }

Two methods matter here: getProtocolEngine and getServer.

The order is: first obtain the RpcEngine for the protocol, then use that RpcEngine to create a Server.

Start with getProtocolEngine:

    // return the RpcEngine configured to handle a protocol
    static synchronized RpcEngine getProtocolEngine(Class<?> protocol,
        Configuration conf) {
        // Look up the RpcEngine in the cache. The engine class was configured beforehand through
        // RPC.setProtocolEngine(conf, MetaInfoProtocol.class, ProtobufRpcEngine.class);
        RpcEngine engine = PROTOCOL_ENGINES.get(protocol);
        if (engine == null) {
            // Read the RpcEngine implementation class from the configuration;
            // in this example it is ProtobufRpcEngine.class
            Class<?> impl = conf.getClass(ENGINE_PROP + "." + protocol.getName(),
                WritableRpcEngine.class);
            // impl : org.apache.hadoop.ipc.ProtobufRpcEngine
            engine = (RpcEngine)ReflectionUtils.newInstance(impl, conf);
            PROTOCOL_ENGINES.put(protocol, engine);
        }
        return engine;
    }

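Here, the cache is consulted first with

    RpcEngine engine = PROTOCOL_ENGINES.get(protocol);

If no RpcEngine is cached for the protocol, the implementation class is read from the configuration and instantiated through reflection with

    engine = (RpcEngine)ReflectionUtils.newInstance(impl, conf);

The new instance is put into the cache, and we end up with the RpcEngine instance, a ProtobufRpcEngine.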
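The lookup-then-instantiate pattern used by getProtocolEngine can be sketched without Hadoop on the classpath. This is an illustrative sketch only: `EngineCacheSketch`, `Engine`, `DefaultEngine`, `ProtoEngine`, and the in-memory `CONF` map standing in for Hadoop's Configuration are all hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of "check the cache, else read the class from
// configuration, instantiate it reflectively, and cache the instance".
final class EngineCacheSketch {
    interface Engine { String name(); }

    static final class DefaultEngine implements Engine {
        @Override public String name() { return "default"; }
    }

    static final class ProtoEngine implements Engine {
        @Override public String name() { return "proto"; }
    }

    private static final String ENGINE_PROP = "rpc.engine"; // key prefix, assumed for the sketch
    // Stand-in for a configuration object: property name -> implementation class.
    private static final Map<String, Class<? extends Engine>> CONF = new HashMap<>();
    // Cache: protocol class -> engine instance.
    private static final Map<Class<?>, Engine> ENGINES = new HashMap<>();

    static synchronized void setEngine(Class<?> protocol, Class<? extends Engine> impl) {
        CONF.put(ENGINE_PROP + "." + protocol.getName(), impl);
    }

    static synchronized Engine getEngine(Class<?> protocol) {
        Engine engine = ENGINES.get(protocol);
        if (engine == null) {
            // Cache miss: fall back to DefaultEngine when nothing is configured.
            Class<? extends Engine> impl =
                CONF.getOrDefault(ENGINE_PROP + "." + protocol.getName(), DefaultEngine.class);
            try {
                engine = impl.getDeclaredConstructor().newInstance();
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("cannot instantiate " + impl, e);
            }
            ENGINES.put(protocol, engine);
        }
        return engine;
    }

    public static void main(String[] args) {
        setEngine(Runnable.class, ProtoEngine.class);
        System.out.println(getEngine(Runnable.class).name());
    }
}
```

Because the instance is cached per protocol class, a second lookup for the same protocol returns the same engine object rather than creating a new one.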

With the ProtobufRpcEngine in hand, its getServer method is called to obtain the Server instance:

    @Override
    public RPC.Server getServer(Class<?> protocol, Object protocolImpl,
        String bindAddress, int port, int numHandlers, int numReaders,
        int queueSizePerHandler, boolean verbose, Configuration conf,
        SecretManager<? extends TokenIdentifier> secretManager,
        String portRangeConfig, AlignmentContext alignmentContext)
        throws IOException {
        return new Server(protocol, protocolImpl, conf, bindAddress, port,
            numHandlers, numReaders, queueSizePerHandler, verbose, secretManager,
            portRangeConfig, alignmentContext);
    }

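Throughout this flow, getServer simply calls the Server constructor (new Server) to create the Server. Its parameters are:

  • protocolClass: the protocol class

  • protocolImpl: the protocol implementation

  • conf: the configuration

  • bindAddress: the IP address the Server binds to

  • port: the port the Server binds to

  • numHandlers: the number of handler threads, default 1

  • verbose: whether every request should be logged

  • portRangeConfig: A config parameter that can be used to restrict

  • alignmentContext: provides server state info on client responses

The Server constructor first calls the parent constructor, then calls registerProtocolAndImpl to register the interface class and its implementation class.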

    /**
     * Construct an RPC server.
     *
     * @param protocolClass the class of protocol
     * @param protocolImpl the protocolImpl whose methods will be called
     * @param conf the configuration to use
     * @param bindAddress the address to bind on to listen for connection
     * @param port the port to listen for connections on
     * @param numHandlers the number of method handler threads to run
     * @param verbose whether each call should be logged
     * @param portRangeConfig A config parameter that can be used to restrict
     * @param alignmentContext provides server state info on client responses
     */
    public Server(Class<?> protocolClass, Object protocolImpl,
        Configuration conf, String bindAddress, int port, int numHandlers,
        int numReaders, int queueSizePerHandler, boolean verbose,
        SecretManager<? extends TokenIdentifier> secretManager,
        String portRangeConfig, AlignmentContext alignmentContext)
        throws IOException {
        super(bindAddress, port, null, numHandlers,
            numReaders, queueSizePerHandler, conf,
            serverNameFromClass(protocolImpl.getClass()), secretManager,
            portRangeConfig);
        setAlignmentContext(alignmentContext);
        this.verbose = verbose;
        // Call registerProtocolAndImpl() to register the mapping between
        // the interface class protocolClass and the implementation protocolImpl
        registerProtocolAndImpl(RPC.RpcKind.RPC_PROTOCOL_BUFFER, protocolClass,
            protocolImpl);
    }

The important part is the call to the parent constructor. Let's see what it does.

RPC.Server

    protected Server(String bindAddress, int port,
        Class<? extends Writable> paramClass, int handlerCount,
        int numReaders, int queueSizePerHandler,
        Configuration conf, String serverName,
        SecretManager<? extends TokenIdentifier> secretManager,
        String portRangeConfig) throws IOException {
        // Call the parent constructor to initialize the Server
        super(bindAddress, port, paramClass, handlerCount, numReaders, queueSizePerHandler,
            conf, serverName, secretManager, portRangeConfig);
        // Register the protocol used for protocol meta info and the RpcEngine that handles it
        initProtocolMetaInfo(conf);
    }

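There are two steps here. First, this Server calls the constructor of its own parent, org.apache.hadoop.ipc.Server. Second, it registers the meta-info protocol and its implementation class.

Again, only the parent constructor in org.apache.hadoop.ipc.Server needs a closer look.

org.apache.hadoop.ipc.Server

This is where the Server is actually built.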

    protected Server(String bindAddress, int port,
        Class<? extends Writable> rpcRequestClass, int handlerCount,
        int numReaders, int queueSizePerHandler, Configuration conf,
        String serverName, SecretManager<? extends TokenIdentifier> secretManager,
        String portRangeConfig)
        throws IOException {
        // Bind address (required)
        this.bindAddress = bindAddress;
        // Configuration
        this.conf = conf;
        this.portRangeConfig = portRangeConfig;
        // Port (required)
        this.port = port;
        // Expected to be null in this example
        this.rpcRequestClass = rpcRequestClass;
        // Number of handler threads
        this.handlerCount = handlerCount;
        this.socketSendBufferSize = 0;
        // Server name
        this.serverName = serverName;
        this.auxiliaryListenerMap = null;
        // Maximum length of data the server accepts
        // ipc.maximum.data.length, default: 64 * 1024 * 1024 ===> 64 MB
        this.maxDataLength = conf.getInt(CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH,
            CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
        // queueSizePerHandler defaults to -1. In that case the maximum capacity is
        // handler thread count * configured per-handler queue size, e.g. 1 * 100 = 100
        if (queueSizePerHandler != -1) {
            // Not the default: handler thread count * the given per-handler queue size
            this.maxQueueSize = handlerCount * queueSizePerHandler;
        } else {
            // Default: the per-handler queue size comes from the configuration (100),
            // so the maximum queue length is handler thread count * 100
            this.maxQueueSize = handlerCount * conf.getInt(
                CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_KEY,
                CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT);
        }
        // A response larger than 1024*1024 = 1 MB produces a WARN-level log entry
        this.maxRespSize = conf.getInt(
            CommonConfigurationKeys.IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY,
            CommonConfigurationKeys.IPC_SERVER_RPC_MAX_RESPONSE_SIZE_DEFAULT);
        // Number of reader threads, default 1
        if (numReaders != -1) {
            this.readThreads = numReaders;
        } else {
            this.readThreads = conf.getInt(
                CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY,
                CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_DEFAULT);
        }
        // Length of each reader's pending-connection queue, default 100
        this.readerPendingConnectionQueue = conf.getInt(
            CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY,
            CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_DEFAULT);
        // Setup appropriate callqueue
        final String prefix = getQueueClassPrefix();
        // callQueue: after a reader reads data from the client, it puts the Call here
        // and the Call waits for a handler to process it.
        // Queue type: LinkedBlockingQueue<Call>; default scheduler: DefaultRpcScheduler
        this.callQueue = new CallQueueManager<Call>(getQueueClass(prefix, conf),
            getSchedulerClass(prefix, conf),
            getClientBackoffEnable(prefix, conf), maxQueueSize, prefix, conf);
        // Security related
        this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
        this.authorize = conf.getBoolean(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false);
        // configure supported authentications
        this.enabledAuthMethods = getAuthMethods(secretManager, conf);
        this.negotiateResponse = buildNegotiateResponse(enabledAuthMethods);
        // Start the listener here and let it bind to the port
        // Create the Listener and bind the listening port; every request sent by clients passes through it
        listener = new Listener(port);
        // set the server port to the default listener port.
        this.port = listener.getAddress().getPort();
        connectionManager = new ConnectionManager();
        this.rpcMetrics = RpcMetrics.create(this, conf);
        this.rpcDetailedMetrics = RpcDetailedMetrics.create(this.port);
        // Toggles Nagle's algorithm on the server's TCP connections, default true.
        // When true, Nagle's algorithm is disabled, which may lower latency
        // at the cost of more, smaller packets
        this.tcpNoDelay = conf.getBoolean(
            CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_KEY,
            CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_DEFAULT);
        // Log an RPC call when it is noticeably slower than other RPC calls, default false
        this.setLogSlowRPC(conf.getBoolean(
            CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC,
            CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC_DEFAULT));
        // Create the responder here
        responder = new Responder();
        // Security related
        if (secretManager != null || UserGroupInformation.isSecurityEnabled()) {
            SaslRpcServer.init(conf);
            saslPropsResolver = SaslPropertiesResolver.getInstance(conf);
        }
        // Log StandbyException tersely
        this.exceptionsHandler.addTerseLoggingExceptions(StandbyException.class);
    }
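The call-queue sizing rule in the constructor is easy to check with a small worked example. The sketch below restates that rule as a pure function; the class `QueueSizingSketch`, the method name, and the parameter `configuredPerHandler` (standing in for the value read from the configuration) are hypothetical.

```java
// Hypothetical restatement of the maxQueueSize rule shown in the constructor above.
final class QueueSizingSketch {
    // queueSizePerHandler == -1 means "not set by the caller"; in that case the
    // per-handler size read from the configuration is used instead.
    static int maxQueueSize(int handlerCount, int queueSizePerHandler, int configuredPerHandler) {
        int perHandler = (queueSizePerHandler != -1) ? queueSizePerHandler : configuredPerHandler;
        return handlerCount * perHandler;
    }

    public static void main(String[] args) {
        // 10 handlers, nothing passed by the caller, configured value 100
        System.out.println(maxQueueSize(10, -1, 100)); // prints 1000
        // 10 handlers, caller asks for 50 slots per handler
        System.out.println(maxQueueSize(10, 50, 100)); // prints 500
    }
}
```

In other words, the queue capacity scales linearly with the number of Handler threads, so raising the handler count also raises how many Calls can wait in callQueue.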

Once the Server has been built, it is started with

    server.start();

The components start in this order:

    public synchronized void start() {
        responder.start();
        listener.start();
        if (auxiliaryListenerMap != null && auxiliaryListenerMap.size() > 0) {
            for (Listener newListener : auxiliaryListenerMap.values()) {
                newListener.start();
            }
        }
        handlers = new Handler[handlerCount];
        for (int i = 0; i < handlerCount; i++) {
            handlers[i] = new Handler(i);
            handlers[i].start();
        }
    }

Server Components

The Server contains several components that work together to provide the server side of RPC.

The key components are: Listener, Reader, callQueue, Handler, ConnectionManager, Responder

Listener

Listener is a thread class. The whole Server has only one Listener thread, which listens for socket connection requests from clients. For every new connection request, the Listener picks a Reader thread from the readers thread pool to handle it.

The Listener holds a Selector, acceptSelector (the selector field in the code below), that watches for socket connection requests from clients. When it detects a connection request, the Listener initializes the connection and then chooses a Reader thread from the readers pool in round-robin fashion to read the RPC requests on that connection.

Construction

    // Start the listener here and let it bind to the port
    // Create the Listener and bind the listening port; every request sent by clients passes through it
    listener = new Listener(port);

Fields

Their purpose is to set up a non-blocking socket.

    private class Listener extends Thread {
        // Channel that accepts connections; this is a non-blocking server socket
        private ServerSocketChannel acceptChannel = null; //the accept channel
        // Selector used to monitor the server socket
        private Selector selector = null; //the selector that we use for the server
        // Pool of Readers that read client requests
        private Reader[] readers = null;
        private int currentReader = 0;
        // Socket address object
        private InetSocketAddress address; //the address we bind at
        // Listening port
        private int listenPort; //the port we bind at
        // Length of the listen backlog, default 128
        private int backlogLength = conf.getInt(
            CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_KEY,
            CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_DEFAULT);
        ...
    }

Constructor

  • Initializes the Listener: creates a non-blocking socket from the IP address and port and registers SelectionKey.OP_ACCEPT with the Selector

  • Builds the Readers according to readThreads

    Listener(int port) throws IOException {
        // Create the InetSocketAddress instance
        address = new InetSocketAddress(bindAddress, port);
        // Create a new server socket and set to non blocking mode
        acceptChannel = ServerSocketChannel.open();
        acceptChannel.configureBlocking(false);

        // Bind the server socket to the local host and port
        bind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig);
        // Could be an ephemeral port
        this.listenPort = acceptChannel.socket().getLocalPort();
        // Set the name of the current thread
        Thread.currentThread().setName("Listener at " + bindAddress + "/" + this.listenPort);
        // create a selector
        selector = Selector.open();
        // Create the Readers
        readers = new Reader[readThreads];
        for (int i = 0; i < readThreads; i++) {
            Reader reader = new Reader(
                "Socket Reader #" + (i + 1) + " for port " + port);
            readers[i] = reader;
            reader.start();
        }

        // Register accepts on the server socket with the selector.
        acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
        // Set the thread name
        this.setName("IPC Server listener on " + port);
        // Run as a daemon thread
        this.setDaemon(true);
    }

run

The Listener class defines a Selector that listens for SelectionKey.OP_ACCEPT events. The run() method of the Listener thread loops, checking whether an OP_ACCEPT event has been observed, that is, whether a new socket connection request has arrived. If so, it calls doAccept() to respond.

    @Override
    public void run() {
        LOG.info(Thread.currentThread().getName() + ": starting");
        SERVER.set(Server.this);
        // Start the timer that periodically scans connections and closes timed-out or invalid ones
        connectionManager.startIdleScan();
        while (running) {
            SelectionKey key = null;
            try {
                // Blocks while no request is coming in
                getSelector().select();
                // Loop over the selected keys to see whether there are new connection requests
                Iterator<SelectionKey> iter = getSelector().selectedKeys().iterator();
                while (iter.hasNext()) {
                    key = iter.next();
                    iter.remove();
                    try {
                        if (key.isValid()) {
                            if (key.isAcceptable()) {
                                // A connection request is pending: handle it with doAccept()
                                doAccept(key);
                            }
                        }
                    } catch (IOException e) {
                    }
                    key = null;
                }
            } catch (OutOfMemoryError e) {
                // An out-of-memory error can happen here and needs special care:
                // close the current connection and sleep for 60 seconds
                LOG.warn("Out of Memory in server select", e);
                closeCurrentConnection(key, e);
                connectionManager.closeIdle(true);
                try { Thread.sleep(60000); } catch (Exception ie) {}
            } catch (Exception e) {
                // Any other exception also closes the current connection
                closeCurrentConnection(key, e);
            }
        }
        LOG.info("Stopping " + Thread.currentThread().getName());
        // Shut down: close the channel and selector and stop everything
        synchronized (this) {
            try {
                acceptChannel.close();
                selector.close();
            } catch (IOException e) { }
            selector = null;
            acceptChannel = null;
            // close all connections
            connectionManager.stopIdleScan();
            connectionManager.closeAll();
        }
    }

doAccept(key)

doAccept() accepts the socket connection request from the client and initializes the connection. It then picks a Reader thread from the readers pool to read the RPC requests coming from this client. Every Reader thread has its own readSelector, which it uses to detect newly arrived RPC requests.

So after establishing the connection and choosing a Reader, doAccept() hands the newly built Connection object to that Reader through reader.addConnection(c). The Reader then registers OP_READ for the connection's channel on its readSelector, with the Connection as the key's attachment (see doRunLoop below). The Connection class wraps the socket connection between Server and Client, so when the Reader thread is woken up it can read the RPC request through the Connection object.

    void doAccept(SelectionKey key) throws InterruptedException, IOException,
        OutOfMemoryError {
        // Accept the request and establish the connection
        ServerSocketChannel server = (ServerSocketChannel) key.channel();
        SocketChannel channel;
        while ((channel = server.accept()) != null) {
            channel.configureBlocking(false);
            channel.socket().setTcpNoDelay(tcpNoDelay);
            channel.socket().setKeepAlive(true);
            // Pick a reader; the index is chosen with the modulo (%) operator
            Reader reader = getReader();
            // Build the Connection object that will be handed to the Reader
            Connection c = connectionManager.register(channel, this.listenPort);
            // If connectionManager returns no Connection, close the channel
            if (c == null) {
                if (channel.isOpen()) {
                    IOUtils.cleanup(null, channel);
                }
                connectionManager.droppedConnections.getAndIncrement();
                continue;
            }
            // so closeCurrentConnection can get the object
            key.attach(c);
            // Add the connection to the reader, which will process its data
            reader.addConnection(c);
        }
    }
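The comment in doAccept says the Reader is chosen with the modulo operator. A round-robin picker of that kind could look like the sketch below. This is an assumption about the shape of getReader(), which is not shown in this article; `RoundRobinSketch` and `next` are hypothetical names, and strings stand in for Reader threads.

```java
// Hypothetical round-robin picker modelled on the "% remainder" comment in doAccept.
final class RoundRobinSketch {
    private final String[] readers;
    private int current = 0; // index of the most recently returned reader

    RoundRobinSketch(String... readers) {
        this.readers = readers.clone();
    }

    // Advance the index and wrap around at the end of the array.
    String next() {
        current = (current + 1) % readers.length;
        return readers[current];
    }

    public static void main(String[] args) {
        RoundRobinSketch picker = new RoundRobinSketch("reader-0", "reader-1", "reader-2");
        for (int i = 0; i < 4; i++) {
            System.out.println(picker.next());
        }
    }
}
```

The sketch is not synchronized. That is acceptable only when a single thread calls next(), which matches the single Listener thread described above.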

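Reader

Reader is also a thread class. Each Reader thread reads the RPC requests sent over a number of client connections. The Server holds several Reader threads that form the readers thread pool; the pool reads RPC requests concurrently, which raises the rate at which the Server can take in requests. The Reader class defines its own readSelector field to listen for SelectionKey.OP_READ events. Some Hadoop versions also define an adding field that flags whether a connection is being added to the Reader thread; the code shown here hands connections over through the pendingConnections queue instead.

Creation

Readers are created in the Listener constructor. Reader extends Thread, so each Reader runs as its own thread.

    // Create the Readers
    readers = new Reader[readThreads];
    for (int i = 0; i < readThreads; i++) {
        Reader reader = new Reader(
            "Socket Reader #" + (i + 1) + " for port " + port);
        readers[i] = reader;
        reader.start();
    }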


Fields

    // Queue of connections waiting to be registered
    final private BlockingQueue<Connection> pendingConnections;
    // Selector on which channels are registered
    private final Selector readSelector;

Constructor

    Reader(String name) throws IOException {
        // Set the thread name
        super(name);
        // Length of the reader's queue, default 100
        this.pendingConnections =
            new LinkedBlockingQueue<Connection>(readerPendingConnectionQueue);
        this.readSelector = Selector.open();
    }

run

Calls the doRunLoop method:

    @Override
    public void run() {
        LOG.info("Starting " + Thread.currentThread().getName());
        try {
            // The Reader's polling loop
            doRunLoop();
        } finally {
            try {
                readSelector.close();
            } catch (IOException ioe) {
                LOG.error("Error closing read selector in " + Thread.currentThread().getName(), ioe);
            }
        }
    }

doRunLoop

The main loop of the Reader thread lives in doRunLoop(). It watches all client connections owned by this Reader for newly arrived RPC requests. When requests arrive it reads them, wraps each successfully read request in a Call object, and puts it on callQueue for a Handler thread to process.

There are two main steps:

  • Take connections from the pendingConnections queue and register SelectionKey.OP_READ for them on the Selector

  • When a channel becomes readable, call doRead() to handle it

    private synchronized void doRunLoop() {
        while (running) {
            SelectionKey key = null;
            try {
                // consume as many connections as currently queued to avoid
                // unbridled acceptance of connections that starves the select
                int size = pendingConnections.size();
                for (int i=size; i>0; i--) {
                    Connection conn = pendingConnections.take();
                    conn.channel.register(readSelector, SelectionKey.OP_READ, conn);
                }
                // Wait for incoming requests
                readSelector.select();
                // Readable events on this readSelector mean client RPC requests have arrived
                Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
                while (iter.hasNext()) {
                    key = iter.next();
                    iter.remove();
                    try {
                        if (key.isReadable()) {
                            // Readable: handle it with doRead()
                            doRead(key);
                        }
                    } catch (CancelledKeyException cke) {
                        // something else closed the connection, ex. responder or
                        // the listener doing an idle scan. ignore it and let them
                        // clean up.
                        LOG.info(Thread.currentThread().getName() +
                            ": connection aborted from " + key.attachment());
                    }
                    key = null;
                }
            } catch (InterruptedException e) {
                if (running) { // unexpected -- log it
                    LOG.info(Thread.currentThread().getName() + " unexpectedly interrupted", e);
                }
            } catch (IOException ex) {
                LOG.error("Error in Reader", ex);
            } catch (Throwable re) {
                LOG.error("Bug in read selector!", re);
                ExitUtil.terminate(1, "Bug in read selector!");
            }
        }
    }
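The first step of doRunLoop takes a snapshot of the queue size and drains exactly that many connections, so connections that arrive during the drain wait for the next pass. A minimal sketch of that idea follows; `PendingDrainSketch`, `drainSnapshot`, and the `register` callback are hypothetical names, and strings stand in for Connection objects.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical sketch of "drain only what was queued when the pass started".
final class PendingDrainSketch {
    static int drainSnapshot(BlockingQueue<String> pending, Consumer<String> register) {
        int size = pending.size(); // snapshot taken once, before draining
        int drained = 0;
        try {
            for (int i = size; i > 0; i--) {
                register.accept(pending.take()); // stands in for channel.register(...)
                drained++;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return drained;
    }

    public static void main(String[] args) {
        BlockingQueue<String> pending = new LinkedBlockingQueue<>();
        pending.add("conn-a");
        pending.add("conn-b");
        List<String> registered = new ArrayList<>();
        // A connection that arrives while draining is left for the next pass.
        int drained = drainSnapshot(pending, conn -> {
            registered.add(conn);
            if (conn.equals("conn-a")) {
                pending.add("conn-late");
            }
        });
        System.out.println(drained + " drained, " + pending.size() + " left");
    }
}
```

Bounding the drain this way keeps the loop from spending all its time registering new connections, which is the starvation the original code comment warns about.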

doRead(key)

When data arrives and triggers SelectionKey.OP_READ on the Selector, doRead() gets the Connection object bound to the SelectionKey through key.attachment() and calls c.readAndProcess() to read the data. It also updates the lastContact timestamp on the connection. The connection is closed only when the count returned by c.readAndProcess() is less than 0 or the connection's shouldClose method returns true.

    // doRead() reads the RPC request.
    // Although readSelector has detected a readable event,
    // doRead() does not yet know which client sent the request,
    // so it first calls SelectionKey.attachment() to get the Connection object.
    // The Connection wraps the network connection between Server and Client, so doRead()
    // only has to call Connection.readAndProcess() to read the RPC request.
    void doRead(SelectionKey key) throws InterruptedException {
        int count;
        // Get the Connection object from the SelectionKey
        // (it was attached when the Reader registered the channel in doRunLoop)
        Connection c = (Connection)key.attachment();
        if (c == null) {
            return;
        }
        c.setLastContact(Time.now());
        try {
            // Read and process the request through Connection.readAndProcess
            count = c.readAndProcess();
        } catch (InterruptedException ieo) {
            LOG.info(Thread.currentThread().getName() + ": readAndProcess caught InterruptedException", ieo);
            throw ieo;
        } catch (Exception e) {
            // Any exceptions that reach here are fatal unexpected internal errors
            // that could not be sent to the client.
            LOG.info(Thread.currentThread().getName() +
                ": readAndProcess from client " + c +
                " threw exception [" + e + "]", e);
            count = -1; //so that the (count < 0) block is executed
        }
        // setupResponse will signal the connection should be closed when a
        // fatal response is sent.
        if (count < 0 || c.shouldClose()) {
            closeConnection(c);
            c = null;
        } else {
            c.setLastContact(Time.now());
        }
    }

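CallQueue

Unless a scheduler is configured, treat this as an ordinary blocking queue. The default scheduling policy is DefaultRpcScheduler.

DefaultRpcScheduler is effectively a placeholder that does no scheduling of its own; calls are served in the queue's FIFO order.

If another policy is configured, for example DecayRpcScheduler, you need to read up on that policy.

The default policy is FIFO. FIFO is fair in the first-come, first-served sense, but a user who issues many I/O operations receives more service than a user who issues few. In that situation FIFO is unfair and increases latency.

FairCallQueue distributes incoming RPC calls across multiple queues according to how much each caller has been calling. The scheduler tracks recent calls and gives users with a lower call volume a higher priority.

Creation

callQueue is created while the Server is being initialized.

callQueue is more than a queue: it is managed by a CallQueueManager object, which supports a blocking queue plus scheduling.

    // Queue type: LinkedBlockingQueue<Call>; default scheduler: DefaultRpcScheduler
    this.callQueue = new CallQueueManager<Call>(getQueueClass(prefix, conf),
        getSchedulerClass(prefix, conf),
        getClientBackoffEnable(prefix, conf), maxQueueSize, prefix, conf);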

Fields

    // Number of checkpoints for empty queue.
    private static final int CHECKPOINT_NUM = 20;
    // Interval to check empty queue.
    private static final long CHECKPOINT_INTERVAL_MS = 10;
    /**
     * Enables client backoff.
     * Currently, if an application issues many calls and the operating system's
     * connection limit has not been reached, RPC requests simply block.
     * Alternatively, when the RPC server or NameNode is under heavy load, a
     * well-defined exception can be thrown back to the client based on some policy.
     * The client understands this exception and backs off exponentially,
     * as another implementation of the RetryInvocationHandler class.
     */
    private volatile boolean clientBackOffEnabled;
    // Atomic refs point to active callQueue
    // We have two so we can better control swapping
    // Reference to the queue used for puts
    private final AtomicReference<BlockingQueue<E>> putRef;
    // Reference to the queue used for takes
    private final AtomicReference<BlockingQueue<E>> takeRef;
    // Scheduler
    private RpcScheduler scheduler;

Constructor

    public CallQueueManager(Class<? extends BlockingQueue<E>> backingClass,
        Class<? extends RpcScheduler> schedulerClass,
        boolean clientBackOffEnabled, int maxQueueSize, String namespace,
        Configuration conf) {
        int priorityLevels = parseNumLevels(namespace, conf);
        // Create the scheduler, DefaultRpcScheduler by default
        this.scheduler = createScheduler(schedulerClass, priorityLevels,
            namespace, conf);
        // Create the queue instance
        BlockingQueue<E> bq = createCallQueueInstance(backingClass,
            priorityLevels, maxQueueSize, namespace, conf);
        this.clientBackOffEnabled = clientBackOffEnabled;
        // Reference used for puts
        this.putRef = new AtomicReference<BlockingQueue<E>>(bq);
        // Reference used for takes
        this.takeRef = new AtomicReference<BlockingQueue<E>>(bq);
        LOG.info("Using callQueue: {}, queueCapacity: {}, " +
            "scheduler: {}, ipcBackoff: {}.",
            backingClass, maxQueueSize, schedulerClass, clientBackOffEnabled);
    }

put(E e)

    /**
     * Insert e into the backing queue or block until we can. If client
     * backoff is enabled this method behaves like add which throws if
     * the queue overflows.
     * If we block and the queue changes on us, we will insert while the
     * queue is drained.
     */
    @Override
    public void put(E e) throws InterruptedException {
        if (!isClientBackoffEnabled()) {
            putRef.get().put(e);
        } else if (shouldBackOff(e)) {
            throwBackoff();
        } else {
            // No need to re-check backoff criteria since they were just checked
            addInternal(e, false);
        }
    }

offer(E e)

    /**
     * Insert e into the backing queue.
     * Return true if e is queued.
     * Return false if the queue is full.
     */
    @Override
    public boolean offer(E e) {
        return putRef.get().offer(e);
    }
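The difference between the blocking path, the add-style path used when backoff is enabled, and offer can be tried out on a plain JDK queue. The sketch below is a simplification under stated assumptions: `PutVersusOfferSketch` and `insert` are hypothetical names, and returning the string "backoff" stands in for the exception that the real put() throws back to the client.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch contrasting put (block), add (throw when full) and offer (return false).
final class PutVersusOfferSketch {
    // Returns "queued", or "backoff" when backoff is enabled and the queue is full.
    static String insert(BlockingQueue<String> queue, String call, boolean backoffEnabled) {
        if (!backoffEnabled) {
            try {
                queue.put(call); // blocks until there is room
                return "queued";
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return "interrupted";
            }
        }
        try {
            queue.add(call); // throws IllegalStateException when the queue is full
            return "queued";
        } catch (IllegalStateException full) {
            return "backoff"; // the real code reports this to the client instead
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        System.out.println(insert(queue, "call-1", true)); // prints queued
        System.out.println(insert(queue, "call-2", true)); // prints backoff
        System.out.println(queue.offer("call-3"));         // prints false
    }
}
```

With backoff disabled and a full queue, insert would block the calling thread until a consumer takes an element, which is why the demo only exercises the backoff path.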

ConnectionManager

ConnectionManager is the management class for Connection objects. It creates Connections, monitors them, and so on.

Creation

It is created in the Server constructor:

    connectionManager = new ConnectionManager();

Fields

    // Number of existing Connections
    final private AtomicInteger count = new AtomicInteger();
    final private AtomicLong droppedConnections = new AtomicLong();
    // The existing Connections
    final private Set<Connection> connections;
    /* Map to maintain the statistics per User */
    final private Map<String, Integer> userToConnectionsMap;
    final private Object userToConnectionsMapLock = new Object();
    // Timer that periodically checks and closes Connections
    final private Timer idleScanTimer;
    // Number of connections above which idle connections are scanned, default 4000
    final private int idleScanThreshold;
    // Scan interval, default 10 seconds
    final private int idleScanInterval;
    // Maximum idle time, default 20 seconds
    final private int maxIdleTime;
    // Maximum number of clients disconnected in one scan, default 10
    final private int maxIdleToClose;
    // Maximum number of connections, default 0 (unlimited)
    final private int maxConnections;

Constructor

    ConnectionManager() {
        this.idleScanTimer = new Timer(
            "IPC Server idle connection scanner for port " + getPort(), true);
        this.idleScanThreshold = conf.getInt(
            CommonConfigurationKeysPublic.IPC_CLIENT_IDLETHRESHOLD_KEY,
            CommonConfigurationKeysPublic.IPC_CLIENT_IDLETHRESHOLD_DEFAULT);
        this.idleScanInterval = conf.getInt(
            CommonConfigurationKeys.IPC_CLIENT_CONNECTION_IDLESCANINTERVAL_KEY,
            CommonConfigurationKeys.IPC_CLIENT_CONNECTION_IDLESCANINTERVAL_DEFAULT);
        this.maxIdleTime = 2 * conf.getInt(
            CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
            CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_DEFAULT);
        this.maxIdleToClose = conf.getInt(
            CommonConfigurationKeysPublic.IPC_CLIENT_KILL_MAX_KEY,
            CommonConfigurationKeysPublic.IPC_CLIENT_KILL_MAX_DEFAULT);
        this.maxConnections = conf.getInt(
            CommonConfigurationKeysPublic.IPC_SERVER_MAX_CONNECTIONS_KEY,
            CommonConfigurationKeysPublic.IPC_SERVER_MAX_CONNECTIONS_DEFAULT);
        // create a set with concurrency -and- a thread-safe iterator, add 2
        // for listener and idle closer threads
        this.connections = Collections.newSetFromMap(
            new ConcurrentHashMap<Connection,Boolean>(
                maxQueueSize, 0.75f, readThreads+2));
        this.userToConnectionsMap = new ConcurrentHashMap<>();
    }

The idleScanTimer is initialized as a daemon timer:

    this.idleScanTimer = new Timer(
        "IPC Server idle connection scanner for port " + getPort(), true);

The connections set is backed by a ConcurrentHashMap:

    this.connections = Collections.newSetFromMap(
        new ConcurrentHashMap<Connection,Boolean>(
            maxQueueSize, 0.75f, readThreads+2));
    // The result is a thread-safe Set<Connection>
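To see why a set built with Collections.newSetFromMap over a ConcurrentHashMap suits several threads adding connections at once, consider the following sketch. `ConcurrentSetSketch` and `addConcurrently` are hypothetical names, and integers stand in for Connection objects.

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: several writer threads add to one concurrent set
// while the calling thread iterates over it.
final class ConcurrentSetSketch {
    static int addConcurrently(int threads, int perThread) {
        Set<Integer> connections = Collections.newSetFromMap(
            new ConcurrentHashMap<Integer, Boolean>(16, 0.75f, threads + 2));
        Thread[] writers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            final int base = t * perThread; // each writer adds a disjoint range
            writers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    connections.add(base + i);
                }
            });
            writers[t].start();
        }
        // Iterating while writers are active does not throw
        // ConcurrentModificationException: the iterator is weakly consistent.
        for (Integer ignored : connections) {
            // no-op, the point is only that iteration is safe
        }
        for (Thread writer : writers) {
            try {
                writer.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return connections.size();
    }

    public static void main(String[] args) {
        System.out.println(addConcurrently(4, 1000)); // prints 4000
    }
}
```

This mirrors the situation in the Server, where the Listener adds connections while the idle scanner iterates over the same set.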

scheduleIdleScanTask method

It is triggered from the Listener's run method (through connectionManager.startIdleScan()) and periodically scans the connections, closing those that have timed out or are no longer valid.

    private void scheduleIdleScanTask() {
        if (!running) {
            return;
        }
        // Create the task that scans connections and closes timed-out or invalid ones
        TimerTask idleScanTask = new TimerTask(){
            @Override
            public void run() {
                if (!running) {
                    return;
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug(Thread.currentThread().getName()+": task running");
                }
                try {
                    closeIdle(false);
                } finally {
                    // explicitly reschedule so next execution occurs relative
                    // to the end of this scan, not the beginning
                    scheduleIdleScanTask();
                }
            }
        };
        idleScanTimer.schedule(idleScanTask, idleScanInterval);
    }
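The task above reschedules itself from its own finally block, so the next run is timed from the end of the previous scan instead of at a fixed rate. A self-contained sketch of that pattern with java.util.Timer follows; `IdleScanSketch`, `runScans`, and `scheduleScan` are hypothetical names, and the "scan" only increments a counter. The sketch assumes `scans` is at least 1.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a one-shot timer task that reschedules itself when it finishes.
final class IdleScanSketch {
    static int runScans(int scans, long intervalMs) {
        Timer timer = new Timer("idle-scan-sketch", true); // daemon timer thread
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch remaining = new CountDownLatch(scans);
        scheduleScan(timer, runs, remaining, intervalMs);
        try {
            remaining.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        timer.cancel();
        return runs.get();
    }

    private static void scheduleScan(Timer timer, AtomicInteger runs,
                                     CountDownLatch remaining, long intervalMs) {
        TimerTask task = new TimerTask() {
            @Override
            public void run() {
                try {
                    runs.incrementAndGet(); // stands in for closeIdle(false)
                    remaining.countDown();
                } finally {
                    // Reschedule relative to the end of this run, not its start.
                    if (remaining.getCount() > 0) {
                        scheduleScan(timer, runs, remaining, intervalMs);
                    }
                }
            }
        };
        timer.schedule(task, intervalMs);
    }

    public static void main(String[] args) {
        System.out.println(runScans(3, 10)); // prints 3
    }
}
```

The design choice matters when a scan takes a long time: with a fixed-rate schedule, slow scans could pile up back to back, whereas self-rescheduling always leaves a full interval between them.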

register: registering a Connection

The Listener's doAccept method calls register, which creates the Connection and adds it to the connections set through add(connection).

    // Register the new connection with the ConnectionManager
    Connection c = connectionManager.register(channel, this.listenPort);

    Connection register(SocketChannel channel, int ingressPort) {
        if (isFull()) {
            return null;
        }
        Connection connection = new Connection(channel, Time.now(), ingressPort);
        add(connection);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Server connection from " + connection +
                "; # active connections: " + size() +
                "; # queued calls: " + callQueue.size());
        }
        return connection;
    }

    private boolean add(Connection connection) {
        boolean added = connections.add(connection);
        if (added) {
            count.getAndIncrement();
        }
        return added;
    }
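The interplay between register, the connection limit, and the dropped-connection counter in doAccept can be sketched as follows. This is a hedged illustration: `ConnectionLimitSketch` and its methods are hypothetical, strings stand in for connections, and the isFull rule (limit disabled when maxConnections is 0 or negative) is an assumption derived from the "0 means unlimited" comment on maxConnections, since isFull() itself is not shown in this article.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of register() with a connection limit and a dropped counter.
final class ConnectionLimitSketch {
    private final Set<String> connections = ConcurrentHashMap.newKeySet();
    private final AtomicInteger count = new AtomicInteger();
    private final AtomicLong dropped = new AtomicLong();
    private final int maxConnections;

    ConnectionLimitSketch(int maxConnections) {
        this.maxConnections = maxConnections;
    }

    // Assumed rule: a limit of 0 (or less) disables the check.
    boolean isFull() {
        return maxConnections > 0 && count.get() >= maxConnections;
    }

    // Returns the registered connection, or null when the limit is reached.
    String register(String remote) {
        if (isFull()) {
            return null;
        }
        if (connections.add(remote)) {
            count.getAndIncrement();
        }
        return remote;
    }

    // Caller side, shaped like doAccept: count the connection as dropped when register fails.
    boolean accept(String remote) {
        if (register(remote) == null) {
            dropped.getAndIncrement();
            return false;
        }
        return true;
    }

    int size() { return count.get(); }

    long droppedConnections() { return dropped.get(); }

    public static void main(String[] args) {
        ConnectionLimitSketch manager = new ConnectionLimitSketch(2);
        manager.accept("client-1");
        manager.accept("client-2");
        manager.accept("client-3");
        System.out.println(manager.size() + " active, " + manager.droppedConnections() + " dropped");
    }
}
```

With a limit of 2, the third client is rejected and counted as dropped, which is the path in doAccept where the channel is closed and the loop continues.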

Connection

The Connection class wraps the socket connection between Server and Client. doAccept() hands the newly built Connection object to a Reader, so that when the Reader thread is woken up it can read the RPC request through the Connection object.

Creation

When a client connects and triggers the SelectionKey.OP_ACCEPT event registered on the selector, a Connection is created from the SocketChannel returned by server.accept() and the listening port.

    // Register the new connection with the ConnectionManager
    Connection c = connectionManager.register(channel, this.listenPort);

    Connection connection = new Connection(channel, Time.now(), ingressPort);

常量

  private boolean connectionHeaderRead = false; // connection header is read?
  private boolean connectionContextRead = false; // if connection context that
                                                 // follows connection header is read
  private SocketChannel channel;
  private ByteBuffer data;
  private ByteBuffer dataLengthBuffer;
  private LinkedList<RpcCall> responseQueue;
  // number of outstanding rpcs
  private AtomicInteger rpcCount = new AtomicInteger();
  private long lastContact;
  private int dataLength;
  private Socket socket;
  // Cache the remote host & port info so that even if the socket is
  // disconnected, we can say where it used to connect to.
  private String hostAddress;
  private int remotePort;
  private InetAddress addr;
  IpcConnectionContextProto connectionContext;
  String protocolName;
  SaslServer saslServer;
  private String establishedQOP;
  private AuthMethod authMethod;
  private AuthProtocol authProtocol;
  private boolean saslContextEstablished;
  private ByteBuffer connectionHeaderBuf = null;
  private ByteBuffer unwrappedData;
  private ByteBuffer unwrappedDataLengthBuffer;
  private int serviceClass;
  private boolean shouldClose = false;
  private int ingressPort;
  UserGroupInformation user = null;
  public UserGroupInformation attemptingUser = null; // user name before auth
  // Fake 'call' for failed authorization response
  private final RpcCall authFailedCall =
      new RpcCall(this, AUTHORIZATION_FAILED_CALL_ID);
  private boolean sentNegotiate = false;
  private boolean useWrap = false;

Constructor

  public Connection(SocketChannel channel, long lastContact,
      int ingressPort) {
    this.channel = channel;
    this.lastContact = lastContact;
    this.data = null;
    // the buffer is initialized to read the "hrpc" and after that to read
    // the length of the Rpc-packet (i.e 4 bytes)
    this.dataLengthBuffer = ByteBuffer.allocate(4);
    this.unwrappedData = null;
    this.unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
    this.socket = channel.socket();
    this.addr = socket.getInetAddress();
    this.ingressPort = ingressPort;
    if (addr == null) {
      this.hostAddress = "*Unknown*";
    } else {
      this.hostAddress = addr.getHostAddress();
    }
    this.remotePort = socket.getPort();
    this.responseQueue = new LinkedList<RpcCall>();
    if (socketSendBufferSize != 0) {
      try {
        socket.setSendBufferSize(socketSendBufferSize);
      } catch (IOException e) {
        LOG.warn("Connection: unable to set socket send buffer size to " +
            socketSendBufferSize);
      }
    }
  }

The readAndProcess() method

A Reader thread calls readAndProcess() to read one RPC request from the I/O stream.

  /**
   * This method reads in a non-blocking fashion from the channel:
   * this method is called repeatedly when data is present in the channel;
   * when it has enough data to process one rpc it processes that rpc.
   *
   * On the first pass, it processes the connectionHeader,
   * connectionContext (an outOfBand RPC) and at most one RPC request that
   * follows that. On future passes it will process at most one RPC request.
   *
   * Quirky things: dataLengthBuffer (4 bytes) is used to read "hrpc" OR
   * rpc request length.
   *
   * @return -1 in case of error, else num bytes read so far
   * @throws IOException - internal error that should not be returned to
   *         client, typically failure to respond to client
   * @throws InterruptedException
   *
   * Author's note: readAndProcess() first reads the connection header
   * (connectionHeader) from the socket stream, then reads one complete
   * RPC request, and finally calls processOneRpc() to handle it.
   * processOneRpc() parses the RPC request header and then calls
   * processRpcRequest() to handle the RPC request body.
   *
   * Note in particular that if an exception is thrown during processing,
   * an RPC response carrying the server-side exception information is
   * sent straight back over the socket.
   */
  public int readAndProcess() throws IOException, InterruptedException {
    while (!shouldClose()) { // stop if a fatal response has been sent.
      // dataLengthBuffer is used to read "hrpc" or the rpc-packet length
      int count = -1;
      if (dataLengthBuffer.remaining() > 0) {
        count = channelRead(channel, dataLengthBuffer);
        if (count < 0 || dataLengthBuffer.remaining() > 0)
          return count;
      }
      if (!connectionHeaderRead) {
        // Every connection is expected to send the header;
        // so far we read "hrpc" of the connection header.
        if (connectionHeaderBuf == null) {
          // for the bytes that follow "hrpc", in the connection header
          connectionHeaderBuf = ByteBuffer.allocate(HEADER_LEN_AFTER_HRPC_PART);
        }
        count = channelRead(channel, connectionHeaderBuf);
        if (count < 0 || connectionHeaderBuf.remaining() > 0) {
          return count;
        }
        int version = connectionHeaderBuf.get(0);
        // TODO we should add handler for service class later
        this.setServiceClass(connectionHeaderBuf.get(1));
        dataLengthBuffer.flip();
        // Check if it looks like the user is hitting an IPC port
        // with an HTTP GET - this is a common error, so we can
        // send back a simple string indicating as much.
        if (HTTP_GET_BYTES.equals(dataLengthBuffer)) {
          setupHttpRequestOnIpcPortResponse();
          return -1;
        }
        if (!RpcConstants.HEADER.equals(dataLengthBuffer)) {
          LOG.warn("Incorrect RPC Header length from {}:{} "
              + "expected length: {} got length: {}",
              hostAddress, remotePort, RpcConstants.HEADER, dataLengthBuffer);
          setupBadVersionResponse(version);
          return -1;
        }
        if (version != CURRENT_VERSION) {
          // Warning is ok since this is not supposed to happen.
          LOG.warn("Version mismatch from " +
              hostAddress + ":" + remotePort +
              " got version " + version +
              " expected version " + CURRENT_VERSION);
          setupBadVersionResponse(version);
          return -1;
        }
        // this may switch us into SIMPLE
        authProtocol = initializeAuthContext(connectionHeaderBuf.get(2));
        dataLengthBuffer.clear(); // clear to next read rpc packet len
        connectionHeaderBuf = null;
        connectionHeaderRead = true;
        continue; // connection header read, now read 4 bytes rpc packet len
      }
      if (data == null) { // just read 4 bytes - length of RPC packet
        dataLengthBuffer.flip();
        dataLength = dataLengthBuffer.getInt();
        checkDataLength(dataLength);
        // Set buffer for reading EXACTLY the RPC-packet length and no more.
        data = ByteBuffer.allocate(dataLength);
      }
      // Now read the RPC packet
      count = channelRead(channel, data);
      if (data.remaining() == 0) {
        dataLengthBuffer.clear(); // to read length of future rpc packets
        data.flip();
        ByteBuffer requestData = data;
        data = null; // null out in case processOneRpc throws.
        boolean isHeaderRead = connectionContextRead;
        // Process this RPC request
        processOneRpc(requestData);
        // the last rpc-request we processed could have simply been the
        // connectionContext; if so continue to read the first RPC.
        if (!isHeaderRead) {
          continue;
        }
      }
      return count;
    }
    return -1;
  }
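
The two-stage read at the heart of readAndProcess() (first a 4-byte length, then exactly that many payload bytes) can be sketched without any sockets. The example below is an illustration under stated assumptions, not Hadoop code: FrameReaderSketch, feed(), transfer() and frame() are hypothetical names, feed() stands in for repeated channelRead() calls that may each deliver only part of a packet, payloads are treated as UTF-8 text, and the "hrpc" connection header and SASL handling are left out.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical decoder for length-prefixed packets arriving in arbitrary chunks.
class FrameReaderSketch {
  private final ByteBuffer lengthBuf = ByteBuffer.allocate(4);
  private ByteBuffer data; // null until a complete length prefix has been read
  private final List<String> frames = new ArrayList<>();

  // Copy as many bytes as both buffers allow; stands in for a non-blocking read.
  private static void transfer(ByteBuffer src, ByteBuffer dst) {
    while (src.hasRemaining() && dst.hasRemaining()) {
      dst.put(src.get());
    }
  }

  void feed(ByteBuffer chunk) {
    while (chunk.hasRemaining()) {
      if (lengthBuf.hasRemaining()) {
        transfer(chunk, lengthBuf);
        if (lengthBuf.hasRemaining()) {
          return; // length prefix still incomplete, wait for more bytes
        }
      }
      if (data == null) {
        lengthBuf.flip();
        int length = lengthBuf.getInt();
        if (length < 0) {
          throw new IllegalArgumentException("negative frame length: " + length);
        }
        // Buffer sized to hold exactly one packet and no more.
        data = ByteBuffer.allocate(length);
      }
      transfer(chunk, data);
      if (!data.hasRemaining()) {
        frames.add(new String(data.array(), StandardCharsets.UTF_8));
        data = null;       // ready for the next packet
        lengthBuf.clear(); // ready for the next length prefix
      }
    }
  }

  List<String> frames() {
    return frames;
  }

  // Encode a payload as a 4-byte big-endian length followed by the bytes.
  static byte[] frame(String payload) {
    byte[] body = payload.getBytes(StandardCharsets.UTF_8);
    return ByteBuffer.allocate(4 + body.length).putInt(body.length).put(body).array();
  }

  public static void main(String[] args) {
    byte[] first = frame("hello");
    byte[] second = frame("rpc");
    byte[] wire = new byte[first.length + second.length];
    System.arraycopy(first, 0, wire, 0, first.length);
    System.arraycopy(second, 0, wire, first.length, second.length);

    FrameReaderSketch reader = new FrameReaderSketch();
    for (int off = 0; off < wire.length; off += 3) { // deliver 3 bytes at a time
      int len = Math.min(3, wire.length - off);
      reader.feed(ByteBuffer.wrap(wire, off, len));
    }
    System.out.println(reader.frames()); // prints [hello, rpc]
  }
}
```

As in readAndProcess(), the length buffer is only cleared once the packet it describes is complete, so a full length buffer doubles as the "currently reading a payload" state.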

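The processOneRpc() method

processOneRpc() parses the RPC request header. Out-of-band requests (negative callId, used during connection setup) go to processRpcOutOfBandRequest(); ordinary requests go to processRpcRequest(), which handles the RPC request body.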

  /**
   * Process one RPC Request from buffer read from socket stream
   *  - decode rpc in a rpc-Call
   *  - handle out-of-band RPC requests such as the initial connectionContext
   *  - A successfully decoded RpcCall will be deposited in RPC-Q and
   *    its response will be sent later when the request is processed.
   *
   * Prior to this call the connectionHeader ("hrpc...") has been handled and
   * if SASL then SASL has been established and the buf we are passed
   * has been unwrapped from SASL.
   *
   * @param bb - contains the RPC request header and the rpc request
   * @throws IOException - internal error that should not be returned to
   *         client, typically failure to respond to client
   * @throws InterruptedException
   */
  private void processOneRpc(ByteBuffer bb)
      throws IOException, InterruptedException {
    // exceptions that escape this method are fatal to the connection.
    // setupResponse will use the rpc status to determine if the connection
    // should be closed.
    int callId = -1;
    int retry = RpcConstants.INVALID_RETRY_COUNT;
    try {
      final RpcWritable.Buffer buffer = RpcWritable.Buffer.wrap(bb);
      // Parse the RPC request header
      final RpcRequestHeaderProto header =
          getMessage(RpcRequestHeaderProto.getDefaultInstance(), buffer);
      // Extract the callId from the RPC request header
      callId = header.getCallId();
      // Extract the retry count from the RPC request header
      retry = header.getRetryCount();
      if (LOG.isDebugEnabled()) {
        LOG.debug(" got #" + callId);
      }
      // Validate the header
      checkRpcHeaders(header);
      // Handle out-of-band requests and a missing connection context
      if (callId < 0) { // callIds typically used during connection setup
        processRpcOutOfBandRequest(header, buffer);
      } else if (!connectionContextRead) {
        throw new FatalRpcServerException(
            RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
            "Connection context not established");
      } else {
        // The header is valid: process the RPC request body
        processRpcRequest(header, buffer);
      }
    } catch (RpcServerException rse) {
      // inform client of error, but do not rethrow else non-fatal
      // exceptions will close connection!
      if (LOG.isDebugEnabled()) {
        LOG.debug(Thread.currentThread().getName() +
            ": processOneRpc from client " + this +
            " threw exception [" + rse + "]");
      }
      // Send an RPC response carrying the exception back over the socket
      // use the wrapped exception if there is one.
      Throwable t = (rse.getCause() != null) ? rse.getCause() : rse;
      final RpcCall call = new RpcCall(this, callId, retry);
      setupResponse(call,
          rse.getRpcStatusProto(), rse.getRpcErrorCodeProto(), null,
          t.getClass().getName(),
          t.getMessage() != null ? t.getMessage() : t.toString());
      sendResponse(call);
    }
  }

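The processRpcRequest() method

processRpcRequest() parses the complete request object (request metadata and request parameters) from the input stream, then builds a Call object from the information in the RPC request header, including the callId. The Call object holds everything about this invocation. Finally the Call is put into the callQueue, where it waits for a Handler thread.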

  /**
   * Process an RPC Request
   *  - the connection headers and context must have been already read.
   *  - Based on the rpcKind, decode the rpcRequest.
   *  - A successfully decoded RpcCall will be deposited in RPC-Q and
   *    its response will be sent later when the request is processed.
   * @param header - RPC request header
   * @param buffer - stream to request payload
   * @throws RpcServerException - generally due to fatal rpc layer issues
   *         such as invalid header or deserialization error. The call queue
   *         may also throw a fatal or non-fatal exception on overflow.
   * @throws IOException - fatal internal error that should/could not
   *         be sent to client.
   * @throws InterruptedException
   */
  private void processRpcRequest(RpcRequestHeaderProto header,
      RpcWritable.Buffer buffer) throws RpcServerException,
      InterruptedException {
    Class<? extends Writable> rpcRequestClass =
        getRpcRequestWrapper(header.getRpcKind());
    if (rpcRequestClass == null) {
      LOG.warn("Unknown rpc kind " + header.getRpcKind() +
          " from client " + getHostAddress());
      final String err = "Unknown rpc kind in rpc header" +
          header.getRpcKind();
      throw new FatalRpcServerException(
          RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, err);
    }
    // Read the RPC request body
    Writable rpcRequest;
    try { // Read the rpc request
      rpcRequest = buffer.newInstance(rpcRequestClass, conf);
    } catch (RpcServerException rse) { // lets tests inject failures.
      throw rse;
    } catch (Throwable t) { // includes runtime exception from newInstance
      LOG.warn("Unable to read call parameters for client " +
          getHostAddress() + "on connection protocol " +
          this.protocolName + " for rpcKind " + header.getRpcKind(), t);
      String err = "IPC server unable to read call parameters: " + t.getMessage();
      throw new FatalRpcServerException(
          RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST, err);
    }
    TraceScope traceScope = null;
    if (header.hasTraceInfo()) {
      if (tracer != null) {
        // If the incoming RPC included tracing info, always continue the
        // trace
        SpanId parentSpanId = new SpanId(
            header.getTraceInfo().getTraceId(),
            header.getTraceInfo().getParentId());
        traceScope = tracer.newScope(
            RpcClientUtil.toTraceName(rpcRequest.toString()),
            parentSpanId);
        traceScope.detach();
      }
    }
    CallerContext callerContext = null;
    if (header.hasCallerContext()) {
      callerContext =
          new CallerContext.Builder(header.getCallerContext().getContext())
              .setSignature(header.getCallerContext().getSignature()
                  .toByteArray())
              .build();
    }
    // Build the Call object that wraps the RPC request information
    RpcCall call = new RpcCall(this, header.getCallId(),
        header.getRetryCount(), rpcRequest,
        ProtoUtil.convert(header.getRpcKind()),
        header.getClientId().toByteArray(), traceScope, callerContext);
    // Save the priority level assignment by the scheduler
    call.setPriorityLevel(callQueue.getPriorityLevel(call));
    call.markCallCoordinated(false);
    if (alignmentContext != null && call.rpcRequest != null &&
        (call.rpcRequest instanceof ProtobufRpcEngine.RpcProtobufRequest)) {
      // if call.rpcRequest is not RpcProtobufRequest, will skip the following
      // step and treat the call as uncoordinated. As currently only certain
      // ClientProtocol methods request made through RPC protobuf needs to be
      // coordinated.
      String methodName;
      String protoName;
      ProtobufRpcEngine.RpcProtobufRequest req =
          (ProtobufRpcEngine.RpcProtobufRequest) call.rpcRequest;
      try {
        methodName = req.getRequestHeader().getMethodName();
        protoName = req.getRequestHeader().getDeclaringClassProtocolName();
        if (alignmentContext.isCoordinatedCall(protoName, methodName)) {
          call.markCallCoordinated(true);
          long stateId;
          stateId = alignmentContext.receiveRequestState(
              header, getMaxIdleTime());
          call.setClientStateId(stateId);
        }
      } catch (IOException ioe) {
        throw new RpcServerException("Processing RPC request caught ", ioe);
      }
    }
    try {
      // Put the Call object into the callQueue to wait for a Handler
      internalQueueCall(call);
    } catch (RpcServerException rse) {
      throw rse;
    } catch (IOException ioe) {
      throw new FatalRpcServerException(
          RpcErrorCodeProto.ERROR_RPC_SERVER, ioe);
    }
    incRpcCount(); // Increment the rpc count
  }

Handler

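Handlers process RPC requests and send back the responses. A Handler keeps taking RPC requests from the callQueue, executes the local function that each request maps to, then wraps the result in a response and sends it back to the client. To process RPC requests concurrently, the Server runs multiple Handler objects.

Creation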

  /** Starts the service. Must be called before any calls will be handled. */
  public synchronized void start() {
    responder.start();
    listener.start();
    if (auxiliaryListenerMap != null && auxiliaryListenerMap.size() > 0) {
      for (Listener newListener : auxiliaryListenerMap.values()) {
        newListener.start();
      }
    }
    handlers = new Handler[handlerCount];
    for (int i = 0; i < handlerCount; i++) {
      handlers[i] = new Handler(i);
      handlers[i].start();
    }
  }

Constructor

  public Handler(int instanceNumber) {
    this.setDaemon(true);
    this.setName("IPC Server handler " + instanceNumber +
        " on default port " + port);
  }

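The run() method

The main method of the Handler thread class loops, taking pending Call objects from the shared callQueue. For each one it invokes the Call's run() method, which uses Server.call() to execute the local function for the RPC call; if an exception occurs during the invocation, the exception information is saved. The response is then built with setupResponse() and sent back through responder.doRespond().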

  @Override
  public void run() {
    LOG.debug(Thread.currentThread().getName() + ": starting");
    SERVER.set(Server.this);
    while (running) {
      TraceScope traceScope = null;
      Call call = null;
      long startTimeNanos = 0;
      // True iff the connection for this call has been dropped.
      // Set to true by default and update to false later if the connection
      // can be successfully read.
      boolean connDropped = true;
      try {
        // Take a request from the callQueue
        call = callQueue.take(); // pop the queue; maybe blocked here
        startTimeNanos = Time.monotonicNowNanos();
        if (alignmentContext != null && call.isCallCoordinated() &&
            call.getClientStateId() > alignmentContext.getLastSeenStateId()) {
          /*
           * The call processing should be postponed until the client call's
           * state id is aligned (<=) with the server state id.
           * NOTE:
           * Inserting the call back to the queue can change the order of call
           * execution comparing to their original placement into the queue.
           * This is not a problem, because Hadoop RPC does not have any
           * constraints on ordering the incoming rpc requests.
           * In case of Observer, it handles only reads, which are
           * commutative.
           */
          // Re-queue the call and continue
          requeueCall(call);
          continue;
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug(Thread.currentThread().getName() + ": " + call + " for RpcKind " + call.rpcKind);
        }
        // Record the call this thread is about to process
        CurCall.set(call);
        if (call.traceScope != null) {
          call.traceScope.reattach();
          traceScope = call.traceScope;
          traceScope.getSpan().addTimelineAnnotation("called");
        }
        // always update the current call context
        CallerContext.setCurrent(call.callerContext);
        UserGroupInformation remoteUser = call.getRemoteUser();
        connDropped = !call.isOpen();
        // Run the Call object's run() method, which performs the local
        // invocation and sends back the result
        if (remoteUser != null) {
          remoteUser.doAs(call);
        } else {
          // RpcCall#run()
          call.run();
        }
      } catch (InterruptedException e) {
        if (running) { // unexpected -- log it
          LOG.info(Thread.currentThread().getName() + " unexpectedly interrupted", e);
          if (traceScope != null) {
            traceScope.getSpan().addTimelineAnnotation("unexpectedly interrupted: " +
                StringUtils.stringifyException(e));
          }
        }
      } catch (Exception e) {
        LOG.info(Thread.currentThread().getName() + " caught an exception", e);
        if (traceScope != null) {
          traceScope.getSpan().addTimelineAnnotation("Exception: " +
              StringUtils.stringifyException(e));
        }
      } finally {
        CurCall.set(null);
        IOUtils.cleanupWithLogger(LOG, traceScope);
        if (call != null) {
          updateMetrics(call, startTimeNanos, connDropped);
          ProcessingDetails.LOG.debug(
              "Served: [{}]{} name={} user={} details={}",
              call, (call.isResponseDeferred() ? ", deferred" : ""),
              call.getDetailedMetricsName(), call.getRemoteUser(),
              call.getProcessingDetails());
        }
      }
    }
    LOG.debug(Thread.currentThread().getName() + ": exiting");
  }
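
The shape of this loop (several threads blocked in take() on one shared queue, each waking up to serve one call at a time) can be reproduced with plain java.util.concurrent classes. The following is a minimal sketch under stated assumptions, not the Hadoop implementation: HandlerPoolSketch, Call, invoke() and callAndWait() are hypothetical, the "local function" simply echoes the call id, and tracing, user impersonation, metrics and re-queueing are omitted.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical handler pool: N daemon threads draining one shared call queue.
class HandlerPoolSketch {
  static final class Call {
    final int id;
    final CompletableFuture<String> response = new CompletableFuture<>();

    Call(int id) {
      this.id = id;
    }
  }

  private final BlockingQueue<Call> callQueue = new LinkedBlockingQueue<>();
  private final Thread[] handlers;
  private volatile boolean running = true;

  HandlerPoolSketch(int handlerCount) {
    handlers = new Thread[handlerCount];
  }

  void start() {
    for (int i = 0; i < handlers.length; i++) {
      handlers[i] = new Thread(this::runLoop, "sketch handler " + i);
      handlers[i].setDaemon(true);
      handlers[i].start();
    }
  }

  private void runLoop() {
    while (running) {
      try {
        Call call = callQueue.take(); // blocks until a call is queued
        try {
          call.response.complete(invoke(call));
        } catch (RuntimeException e) {
          // A failed invocation still produces a response for the caller.
          call.response.completeExceptionally(e);
        }
      } catch (InterruptedException e) {
        // stop() interrupts blocked handlers; the loop condition ends the thread.
      }
    }
  }

  // Stand-in for executing the local function behind the RPC.
  private static String invoke(Call call) {
    return "echo " + call.id;
  }

  // Queue a call and wait (up to 5 seconds) for a handler to answer it.
  String callAndWait(int id) {
    Call call = new Call(id);
    try {
      callQueue.put(call);
      return call.response.get(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for call " + id, e);
    } catch (ExecutionException | TimeoutException e) {
      throw new IllegalStateException("call " + id + " failed", e);
    }
  }

  void stop() {
    running = false;
    for (Thread handler : handlers) {
      if (handler != null) {
        handler.interrupt();
      }
    }
  }

  public static void main(String[] args) {
    HandlerPoolSketch pool = new HandlerPoolSketch(3);
    pool.start();
    for (int i = 0; i < 5; i++) {
      System.out.println(pool.callAndWait(i));
    }
    pool.stop();
  }
}
```

Because every handler blocks on the same queue, adding handlers raises concurrency without any extra dispatch logic; whichever thread is idle picks up the next call.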

The run() method of RpcCall

  @Override
  public Void run() throws Exception {
    if (!connection.channel.isOpen()) {
      Server.LOG.info(Thread.currentThread().getName() + ": skipped " + this);
      return null;
    }
    long startNanos = Time.monotonicNowNanos();
    Writable value = null;
    ResponseParams responseParams = new ResponseParams();
    try {
      // Perform the local invocation through call() and capture the result
      value = call(
          rpcKind, connection.protocolName, rpcRequest, timestampNanos);
    } catch (Throwable e) {
      populateResponseParamsOnError(e, responseParams);
    }
    if (!isResponseDeferred()) {
      long deltaNanos = Time.monotonicNowNanos() - startNanos;
      ProcessingDetails details = getProcessingDetails();
      details.set(Timing.PROCESSING, deltaNanos, TimeUnit.NANOSECONDS);
      deltaNanos -= details.get(Timing.LOCKWAIT, TimeUnit.NANOSECONDS);
      deltaNanos -= details.get(Timing.LOCKSHARED, TimeUnit.NANOSECONDS);
      deltaNanos -= details.get(Timing.LOCKEXCLUSIVE, TimeUnit.NANOSECONDS);
      details.set(Timing.LOCKFREE, deltaNanos, TimeUnit.NANOSECONDS);
      startNanos = Time.monotonicNowNanos();
      setResponseFields(value, responseParams);
      sendResponse();
      deltaNanos = Time.monotonicNowNanos() - startNanos;
      details.set(Timing.RESPONSE, deltaNanos, TimeUnit.NANOSECONDS);
    } else {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Deferring response for callId: " + this.callId);
      }
    }
    return null;
  }

For a protobuf protocol, the invocation is eventually dispatched to the call method in ProtobufRpcEngine.

  /**
   * This is a server side method, which is invoked over RPC. On success
   * the return response has protobuf response payload. On failure, the
   * exception name and the stack trace are returned in the response.
   * See {@link HadoopRpcResponseProto}
   *
   * In this method there are three types of exceptions possible and they are
   * returned in response as follows.
   * <ol>
   * <li> Exceptions encountered in this method that are returned
   * as {@link RpcServerException} </li>
   * <li> Exceptions thrown by the service is wrapped in ServiceException.
   * In that this method returns in response the exception thrown by the
   * service.</li>
   * <li> Other exceptions thrown by the service. They are returned as
   * it is.</li>
   * </ol>
   *
   * Author's note: call() first extracts the interface name, method name
   * and other details of the RPC call from the request header, then looks
   * up the BlockingService object for that interface and invokes
   * callBlockingMethod() on it for the requested method. In HDFS, for
   * example, the call is forwarded to a
   * ClientNamenodeProtocolServerSideTranslatorPB object and is ultimately
   * served by NameNodeRpcServer.
   */
  public Writable call(RPC.Server server, String connectionProtocolName,
      Writable writableRequest, long receiveTime) throws Exception {
    // Get the RPC request header
    RpcProtobufRequest request = (RpcProtobufRequest) writableRequest;
    RequestHeaderProto rpcRequest = request.getRequestHeader();
    // Get the interface name, method name and version of the call
    String methodName = rpcRequest.getMethodName();
    /**
     * RPCs for a particular interface (ie protocol) are done using a
     * IPC connection that is setup using rpcProxy.
     * The rpcProxy's has a declared protocol name that is
     * sent from client to server at connection time.
     *
     * Each Rpc call also sends a protocol name
     * (called declaringClassprotocolName). This name is usually the same
     * as the connection protocol name except in some cases.
     * For example metaProtocols such ProtocolInfoProto which get info
     * about the protocol reuse the connection but need to indicate that
     * the actual protocol is different (i.e. the protocol is
     * ProtocolInfoProto) since they reuse the connection; in this case
     * the declaringClassProtocolName field is set to the ProtocolInfoProto.
     */
    String declaringClassProtoName =
        rpcRequest.getDeclaringClassProtocolName();
    long clientVersion = rpcRequest.getClientProtocolVersion();
    if (server.verbose)
      LOG.info("Call: connectionProtocolName=" + connectionProtocolName +
          ", method=" + methodName);
    // Look up the server-side implementation of this interface
    ProtoClassProtoImpl protocolImpl = getProtocolImpl(server,
        declaringClassProtoName, clientVersion);
    BlockingService service = (BlockingService) protocolImpl.protocolImpl;
    // Get the descriptor of the method to call
    MethodDescriptor methodDescriptor = service.getDescriptorForType()
        .findMethodByName(methodName);
    if (methodDescriptor == null) {
      String msg = "Unknown method " + methodName + " called on "
          + connectionProtocolName + " protocol.";
      LOG.warn(msg);
      throw new RpcNoSuchMethodException(msg);
    }
    // Get the request prototype for the method and decode the call parameters
    Message prototype = service.getRequestPrototype(methodDescriptor);
    Message param = request.getValue(prototype);
    Message result;
    Call currentCall = Server.getCurCall().get();
    try {
      server.rpcDetailedMetrics.init(protocolImpl.protocolClass);
      currentCallInfo.set(new CallInfo(server, methodName));
      currentCall.setDetailedMetricsName(methodName);
      // Invoke callBlockingMethod on the implementation; in HDFS this
      // cascades through the adapter classes to NameNodeRpcServer
      result = service.callBlockingMethod(methodDescriptor, null, param);
      // Check if this needs to be a deferred response,
      // by checking the ThreadLocal callback being set
      if (currentCallback.get() != null) {
        currentCall.deferResponse();
        currentCallback.set(null);
        return null;
      }
    } catch (ServiceException e) {
      Exception exception = (Exception) e.getCause();
      currentCall.setDetailedMetricsName(
          exception.getClass().getSimpleName());
      throw (Exception) e.getCause();
    } catch (Exception e) {
      currentCall.setDetailedMetricsName(e.getClass().getSimpleName());
      throw e;
    } finally {
      currentCallInfo.set(null);
    }
    return RpcWritable.wrap(result);
  }

Responder

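The Responder sends RPC responses to clients. It is also a thread class, and the Server has exactly one Responder object. The Responder holds a Selector, writeSelector, which listens for SelectionKey.OP_WRITE events. When the network is poor or a response is too large, a Handler thread may not be able to send the whole response to the client. In that case the Handler registers SelectionKey.OP_WRITE for the channel on the Responder's writeSelector. Once the selector reports that the channel can accept more data, the Responder thread sends the rest of the response to the client.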

doRunLoop

Because Responder is a thread class, its core logic lives in doRunLoop, which is called from run.

  private void doRunLoop() {
    long lastPurgeTimeNanos = 0; // last check for old calls.
    while (running) {
      try {
        waitPending(); // If a channel is being registered, wait.
        // Block for up to 15 minutes; on timeout, fall through to the purge
        // of responses that have been waiting too long to be sent
        writeSelector.select(
            TimeUnit.NANOSECONDS.toMillis(PURGE_INTERVAL_NANOS));
        Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
        while (iter.hasNext()) {
          SelectionKey key = iter.next();
          iter.remove();
          try {
            if (key.isWritable()) {
              // Perform the write
              doAsyncWrite(key);
            }
          } catch (CancelledKeyException cke) {
            // something else closed the connection, ex. reader or the
            // listener doing an idle scan. ignore it and let them clean
            // up
            RpcCall call = (RpcCall) key.attachment();
            if (call != null) {
              LOG.info(Thread.currentThread().getName() +
                  ": connection aborted from " + call.connection);
            }
          } catch (IOException e) {
            LOG.info(Thread.currentThread().getName() + ": doAsyncWrite threw exception " + e);
          }
        }
        long nowNanos = Time.monotonicNowNanos();
        if (nowNanos < lastPurgeTimeNanos + PURGE_INTERVAL_NANOS) {
          continue;
        }
        lastPurgeTimeNanos = nowNanos;
        //
        // If there were some calls that have not been sent out for a
        // long time, discard them.
        //
        if (LOG.isDebugEnabled()) {
          LOG.debug("Checking for old call responses.");
        }
        ArrayList<RpcCall> calls;
        // get the list of channels from list of keys.
        synchronized (writeSelector.keys()) {
          calls = new ArrayList<RpcCall>(writeSelector.keys().size());
          iter = writeSelector.keys().iterator();
          while (iter.hasNext()) {
            SelectionKey key = iter.next();
            RpcCall call = (RpcCall) key.attachment();
            if (call != null && key.channel() == call.connection.channel) {
              calls.add(call);
            }
          }
        }
        // Discard responses that have not been sent for a long time
        for (RpcCall call : calls) {
          doPurge(call, nowNanos);
        }
      } catch (OutOfMemoryError e) {
        //
        // we can run out of memory if we have too many threads
        // log the event and sleep for a minute and give
        // some thread(s) a chance to finish
        //
        LOG.warn("Out of Memory in server select", e);
        try { Thread.sleep(60000); } catch (Exception ie) {}
      } catch (Exception e) {
        LOG.warn("Exception in Responder", e);
      }
    }
  }

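processResponse

processResponse sends one queued response, writing as much as the channel accepts without blocking. When a Handler calls it (inHandler is true) and the response cannot be written in full, the channel is registered on writeSelector so that the Responder finishes the write asynchronously.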

  // Processes one response. Returns true if there are no more pending
  // data for this channel.
  //
  private boolean processResponse(LinkedList<RpcCall> responseQueue,
      boolean inHandler) throws IOException {
    boolean error = true;
    boolean done = false; // there is more data for this channel.
    int numElements = 0;
    RpcCall call = null;
    try {
      synchronized (responseQueue) {
        //
        // If there are no items for this channel, then we are done
        //
        numElements = responseQueue.size();
        if (numElements == 0) {
          error = false;
          return true; // no more data for this channel.
        }
        //
        // Extract the first call
        //
        call = responseQueue.removeFirst();
        SocketChannel channel = call.connection.channel;
        if (LOG.isDebugEnabled()) {
          LOG.debug(Thread.currentThread().getName() + ": responding to " + call);
        }
        //
        // Send as much data as we can in the non-blocking fashion
        //
        int numBytes = channelWrite(channel, call.rpcResponse);
        if (numBytes < 0) {
          return true;
        }
        if (!call.rpcResponse.hasRemaining()) {
          // Clear out the response buffer so it can be collected
          call.rpcResponse = null;
          call.connection.decRpcCount();
          if (numElements == 1) { // last call fully processes.
            done = true; // no more data for this channel.
          } else {
            done = false; // more calls pending to be sent.
          }
          if (LOG.isDebugEnabled()) {
            LOG.debug(Thread.currentThread().getName() + ": responding to " + call
                + " Wrote " + numBytes + " bytes.");
          }
        } else {
          //
          // If we were unable to write the entire response out, then
          // insert in Selector queue.
          //
          call.connection.responseQueue.addFirst(call);
          if (inHandler) {
            // set the serve time when the response has to be sent later
            call.timestampNanos = Time.monotonicNowNanos();
            incPending();
            try {
              // Wakeup the thread blocked on select, only then can the call
              // to channel.register() complete.
              writeSelector.wakeup();
              channel.register(writeSelector, SelectionKey.OP_WRITE, call);
            } catch (ClosedChannelException e) {
              // Its ok. channel might be closed else where.
              done = true;
            } finally {
              decPending();
            }
          }
          if (LOG.isDebugEnabled()) {
            LOG.debug(Thread.currentThread().getName() + ": responding to " + call
                + " Wrote partial " + numBytes + " bytes.");
          }
        }
        error = false; // everything went off well
      }
    } finally {
      if (error && call != null) {
        LOG.warn(Thread.currentThread().getName() + ", call " + call + ": output error");
        done = true; // error. no more data for this channel.
        closeConnection(call.connection);
      }
    }
    return done;
  }
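
The hand-off between Handler and Responder is easier to follow with the selector taken out of the picture. The sketch below is an illustration under stated assumptions rather than the Hadoop code: PartialWriteSketch and ThrottledChannel are hypothetical, the channel accepts only a fixed number of bytes per write to imitate a congested socket, a boolean flag stands in for the OP_WRITE registration, and doRespond() and onWritable() are modeled on my reading of how the Handler and the Responder's doAsyncWrite() call processResponse().

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of "write what you can, let the Responder finish the rest".
class PartialWriteSketch {
  // Channel that accepts at most bytesPerWrite bytes per call (an assumption
  // used to imitate a socket whose send buffer is nearly full).
  static final class ThrottledChannel implements WritableByteChannel {
    private final ByteArrayOutputStream sink = new ByteArrayOutputStream();
    private final int bytesPerWrite;

    ThrottledChannel(int bytesPerWrite) {
      this.bytesPerWrite = bytesPerWrite;
    }

    @Override
    public int write(ByteBuffer src) {
      int n = Math.min(bytesPerWrite, src.remaining());
      for (int i = 0; i < n; i++) {
        sink.write(src.get());
      }
      return n;
    }

    @Override
    public boolean isOpen() {
      return true;
    }

    @Override
    public void close() {
    }

    String written() {
      return new String(sink.toByteArray(), StandardCharsets.UTF_8);
    }
  }

  private final Deque<ByteBuffer> responseQueue = new ArrayDeque<>();
  private final ThrottledChannel channel;
  private boolean writeInterest; // stands in for the OP_WRITE registration

  PartialWriteSketch(ThrottledChannel channel) {
    this.channel = channel;
  }

  // Returns true when no data is left pending for this channel.
  private boolean processResponse(boolean inHandler) {
    ByteBuffer response = responseQueue.pollFirst();
    if (response == null) {
      return true;
    }
    channel.write(response);
    if (!response.hasRemaining()) {
      return responseQueue.isEmpty();
    }
    responseQueue.addFirst(response); // keep the unfinished response at the head
    if (inHandler) {
      writeInterest = true; // ask the responder side to continue later
    }
    return false;
  }

  // Handler side: queue the response and try to send it right away if it is
  // the only one pending.
  void doRespond(String payload) {
    responseQueue.addLast(ByteBuffer.wrap(payload.getBytes(StandardCharsets.UTF_8)));
    if (responseQueue.size() == 1) {
      processResponse(true);
    }
  }

  // Responder side: called when the channel is writable again.
  void onWritable() {
    if (processResponse(false)) {
      writeInterest = false;
    }
  }

  boolean hasWriteInterest() {
    return writeInterest;
  }

  public static void main(String[] args) {
    ThrottledChannel channel = new ThrottledChannel(4);
    PartialWriteSketch connection = new PartialWriteSketch(channel);
    connection.doRespond("0123456789");
    while (connection.hasWriteInterest()) {
      System.out.println("pending after: " + channel.written());
      connection.onWritable();
    }
    System.out.println("complete: " + channel.written());
  }
}
```

Putting the unfinished response back at the head of the queue preserves the order of responses on the connection, which is why later responses are only queued and not written while an earlier one is still in flight.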