Selector、SelectionKey 和 Channel 这三个组件构成了 Java nio 包的核心,也是 Reactor 模型在代码层面的体现。Selector 能让单线程同时处理多个客户端 Channel,非常适用于高并发,传输数据量较小的场景。要使用 Selector,首先要将对应的 Channel 及 IO 事件(读、写、连接)注册到 Selector,注册后会产生一个 SelectionKey 对象,用于关联 Selector 和 Channel,及后续的 IO 事件处理。这三者的关系如下图所示。

在这里插入图片描述

对 nio 编程不熟的同学可以搜索一些简单的 demo 跑一下,下面 我们直接进入源码,窥探一些 nio 的奥秘。

Selector

其实,不管是 Selector 还是 SelectionKey 的源码,其具体实现类都是依赖于底层操作系统的,这里我们只看一下抽象类 Selector 的源码,日后有事件,再找一些具体的实现类深入分析一下。

  1. public abstract class Selector implements Closeable {
  2. protected Selector() { }
  3. /**
  4. * 获取一个 Selector对象,具体实现依赖于底层操作系统
  5. */
  6. public static Selector open() throws IOException {
  7. return SelectorProvider.provider().openSelector();
  8. }
  9. /**
  10. * 判断该 Selector 是否已开启
  11. */
  12. public abstract boolean isOpen();
  13. /**
  14. * 当前所有向Selector注册的Channel 所对应的SelectionKey的集合
  15. */
  16. public abstract Set<SelectionKey> keys();
  17. /**
  18. * 相关事件已经被 Selector 捕获的 SelectionKey的集合
  19. */
  20. public abstract Set<SelectionKey> selectedKeys();
  21. /**
  22. * 阻塞到至少有一个通道在你注册的事件上就绪了
  23. */
  24. public abstract int select() throws IOException;
  25. /**
  26. * 和select()一样,除了最长会阻塞timeout毫秒
  27. */
  28. public abstract int select(long timeout) throws IOException;
  29. /**
  30. * 此方法执行非阻塞的选择操作,如果自从上一次选择操作后,
  31. * 没有通道变成可选择的,则此方法直接返回 0
  32. */
  33. public abstract int selectNow() throws IOException;
  34. /**
  35. * 用完Selector后调用其close()方法会关闭该Selector,且使注册到该Selector上的所有SelectionKey实例无效
  36. * 通道本身并不会关闭
  37. */
  38. public abstract void close() throws IOException;
  39. }

SelectionKey

表示 SelectableChannel 在 Selector 中的注册的标记 / 句柄。

  1. public abstract class SelectionKey {
  2. protected SelectionKey() { }
  3. // -- Channel and selector operations --
  4. /**
  5. * 获取该 SelectionKey 对应的Channel,Channel注册到Selector时会产生该 SelectionKey对象
  6. */
  7. public abstract SelectableChannel channel();
  8. /**
  9. * 获取该 SelectionKey 对应的 Selector
  10. */
  11. public abstract Selector selector();
  12. /**
  13. * 该 SelectionKey 是否是有效的
  14. */
  15. public abstract boolean isValid();
  16. // ------ Operation-set accessors ------
  17. /**
  18. * 获取该 SelectionKey 的兴趣事件 (既 SelectionKey 的4个 事件静态常量)
  19. */
  20. public abstract int interestOps();
  21. /**
  22. * 设置该 SelectionKey 的兴趣事件
  23. */
  24. public abstract SelectionKey interestOps(int ops);
  25. /**
  26. * 获取该 SelectionKey 的已操作集
  27. */
  28. public abstract int readyOps();
  29. // ------ Operation bits and bit-testing convenience methods ------
  30. /**
  31. * channel中的数据是否已经可以读取
  32. */
  33. public static final int OP_READ = 1 << 0;
  34. /**
  35. * channel是否可以开始写入数据
  36. */
  37. public static final int OP_WRITE = 1 << 2;
  38. /**
  39. * channel是否已经建立连接
  40. */
  41. public static final int OP_CONNECT = 1 << 3;
  42. /**
  43. * ServerSocketChannel 是否可以与客户端建立连接
  44. */
  45. public static final int OP_ACCEPT = 1 << 4;
  46. /**
  47. * channel是否可读
  48. */
  49. public final boolean isReadable() {
  50. return (readyOps() & OP_READ) != 0;
  51. }
  52. /**
  53. * channel是否可写
  54. */
  55. public final boolean isWritable() {
  56. return (readyOps() & OP_WRITE) != 0;
  57. }
  58. /**
  59. * channel是否建立连接
  60. */
  61. public final boolean isConnectable() {
  62. return (readyOps() & OP_CONNECT) != 0;
  63. }
  64. /**
  65. * ServerSocketChannel是否可与客户端channel建立连接
  66. */
  67. public final boolean isAcceptable() {
  68. return (readyOps() & OP_ACCEPT) != 0;
  69. }
  70. }

Channel 组件

平时编码用的比较多的就是 SocketChannel 和 ServerSocketChannel,而将 Channel 与 Selecor 关联到一起的核心 API 则定义在它们的公共父类 SelectableChannel 中,整个 Channel 组件的核心类图如下所示。

在这里插入图片描述

SelectableChannel

  1. public abstract class SelectableChannel extends AbstractInterruptibleChannel implements Channel {
  2. protected SelectableChannel() { }
  3. /**
  4. * 当前channel是否注册到了某个selector上,新创建的channel都是未注册状态
  5. */
  6. public abstract boolean isRegistered();
  7. /**
  8. * 根据给定的 Selector,获取本channel注册上去的 SelectionKey
  9. */
  10. public abstract SelectionKey keyFor(Selector sel);
  11. /**
  12. * 将当前channel及关注的事件,注册到Selector上,返回一个 SelectionKey
  13. */
  14. public final SelectionKey register(Selector sel, int ops) throws ClosedChannelException {
  15. return register(sel, ops, null);
  16. }
  17. public abstract SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException;
  18. /**
  19. * 设置该channel的阻塞模式,默认为 true阻塞
  20. */
  21. public abstract SelectableChannel configureBlocking(boolean block) throws IOException;
  22. /**
  23. * 是否为阻塞IO模式
  24. */
  25. public abstract boolean isBlocking();
  26. }

ServerSocketChannel

相当于 BIO 中的 ServerSocket,主要用于服务端与客户端建立连接通信的 channel。

  1. public abstract class ServerSocketChannel extends AbstractSelectableChannel implements NetworkChannel {
  2. protected ServerSocketChannel(SelectorProvider provider) {
  3. super(provider);
  4. }
  5. /**
  6. * 获取一个 ServerSocketChannel实例,具体实现依赖底层操作系统
  7. */
  8. public static ServerSocketChannel open() throws IOException {
  9. return SelectorProvider.provider().openServerSocketChannel();
  10. }
  11. // -- ServerSocket-specific operations --
  12. /**
  13. * 绑定ip地址及要监听的端口
  14. */
  15. public final ServerSocketChannel bind(SocketAddress local) throws IOException {
  16. return bind(local, 0);
  17. }
  18. public abstract ServerSocketChannel bind(SocketAddress local, int backlog) throws IOException;
  19. /**
  20. * 与一个客户端channel建立连接,返回该客户端的存根 SocketChannel
  21. */
  22. public abstract SocketChannel accept() throws IOException;
  23. }

SocketChannel

相当于 BIO 中的 Socket,主要用于通信双方的读写操作。

  1. public abstract class SocketChannel extends AbstractSelectableChannel
  2. implements ByteChannel, ScatteringByteChannel, GatheringByteChannel, NetworkChannel {
  3. protected SocketChannel(SelectorProvider provider) {
  4. super(provider);
  5. }
  6. /**
  7. * 根据 SocketAddress 获取一个 SocketChannel,具体实现依赖底层操作系统
  8. */
  9. public static SocketChannel open(SocketAddress remote) throws IOException {
  10. SocketChannel sc = open();
  11. try {
  12. sc.connect(remote);
  13. } catch (Throwable x) {
  14. try {
  15. sc.close();
  16. } catch (Throwable suppressed) {
  17. x.addSuppressed(suppressed);
  18. }
  19. throw x;
  20. }
  21. assert sc.isConnected();
  22. return sc;
  23. }
  24. public static SocketChannel open() throws IOException {
  25. return SelectorProvider.provider().openSocketChannel();
  26. }
  27. // -- Socket-specific operations --
  28. /**
  29. * 绑定要连接的远程服务的ip及端口
  30. */
  31. @Override
  32. public abstract SocketChannel bind(SocketAddress local) throws IOException;
  33. /**
  34. * 该channel与服务端是否已连接
  35. */
  36. public abstract boolean isConnected();
  37. // -- ByteChannel operations --
  38. /**
  39. * 将 channel 中的数据读到 ByteBuffer
  40. */
  41. public abstract int read(ByteBuffer dst) throws IOException;
  42. public final long read(ByteBuffer[] dsts) throws IOException {
  43. return read(dsts, 0, dsts.length);
  44. }
  45. public abstract long read(ByteBuffer[] dsts, int offset, int length) throws IOException;
  46. /**
  47. * 将 ByteBuffer 中的数据写到 channel
  48. */
  49. public abstract int write(ByteBuffer src) throws IOException;
  50. public final long write(ByteBuffer[] srcs) throws IOException {
  51. return write(srcs, 0, srcs.length);
  52. }
  53. public abstract long write(ByteBuffer[] srcs, int offset, int length) throws IOException;
  54. }