1、选择器介绍

选择器的作用是完成IO的多路复用

并不是所有通道都可以被选择器监控或者选择的,例如FileChannel就不能被选择器复用。判断一个通道是否能被选择器监控或者选择只需看其是否继承了抽象类SelectableChannel

选择器和通道是监控和被监控的关系。通过选择器可以监控多个通道的IO情况。
一般来说,一个单线程处理一个选择器,一个选择器可以监控很多通道,通过选择器,一个单线程就可以处理数千数万甚至更多的通道。在极端情况下(数万连接),只用一个线程就可以处理所有的通道,会大量的减少线程之间上下文切换的开销。

单线程可以配合 Selector 完成对多个 Channel 可读写事件的监控,这称之为多路复用

  • 多路复用仅针对网络 IO、普通文件 IO 没法利用多路复用
  • 如果不用 Selector 的非阻塞模式,线程大部分时间都在做无用功,而 Selector 能够保证
    • 有可连接事件时才去连接
    • 有可读事件才去读取
    • 有可写事件才去写入
      • 限于网络传输能力,Channel 未必时时可写,一旦 Channel 可写,会触发 Selector 的可写事件

通道和选择器的关系通过通道注册的形式确定,调用通道的注册方法可以将通道实例注册到一个选择器中,注册方法有两个参数一个是指定的选择器实例,一个是指定选择器监控的IO事件类型。

  • 可读
  • 可写
  • 连接
  • 接收

2、选择器使用

2.1、监听 Channel 事件

可以通过下面三种方法来监听是否有事件发生,方法的返回值代表有多少 channel 发生了事件
方法1,阻塞直到绑定事件发生
int count = selector.select();
方法2,阻塞直到绑定事件发生,或是超时(时间单位为 ms)
int count = selector.select(long timeout);
方法3,不会阻塞,也就是不管有没有事件,立刻返回,自己根据返回值检查是否有事件
int count = selector.selectNow();

💡 select 何时不阻塞

  • 事件发生时
    • 客户端发起连接请求,会触发 accept 事件
    • 客户端发送数据过来,客户端正常、异常关闭时,都会触发 read 事件,另外如果发送的数据大于 buffer 缓冲区,会触发多次读取事件
    • channel 可写,会触发 write 事件
    • 在 linux 下 nio bug 发生时
  • 调用 selector.wakeup()
  • 调用 selector.close()
  • selector 所在线程 interrupt

2.2、注册选择器

SelectionKey serverSelectKey = ssc.register(通道, 关注事件, 关联的buffer);

  • 可读 SelectionKey.OP_READ
  • 可写 SelectionKey.OP_WRITE
  • 连接 SelectionKey.OP_CONNECT
  • 接收 SelectionKey.OP_ACCEPT

通过或运算监控通道的多个事件:
int ket = SelectionKey.OP_READ | SelectionKey.OP_WRITE;

3、选择器实战

server端

  1. @Slf4j
  2. public class SelectorServer {
  3. public static void main(String[] args) throws IOException {
  4. //1、创建selector 选择器,管理多个channel
  5. Selector selector = Selector.open();
  6. //2、创建服务器
  7. ServerSocketChannel ssc = ServerSocketChannel.open();
  8. ssc.configureBlocking(false);
  9. ssc.bind(new InetSocketAddress(8989));
  10. //3、注册selector
  11. /**
  12. * SelectionKey为事件发生后,通过它可以知道事件和那个channel发生的事件
  13. * 事件分为:accept(有连接请求时触发),connect(建立连接时触发),read,write
  14. */
  15. SelectionKey serverSelectKey = ssc.register(selector, 0, null);
  16. //服务端只关注accept
  17. serverSelectKey.interestOps(SelectionKey.OP_ACCEPT);
  18. log.info("register key : {}",serverSelectKey);
  19. while (true){
  20. //3、select 没有事件发生则线程阻塞,在事件未处理时,它不会阻塞,事件处理或者取消就会阻塞
  21. // selector会在事件发生后向selectedKeys集合中加入key,但是不会删除key,所以处理完一个key,需要删除
  22. selector.select();
  23. //4、处理事件
  24. Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
  25. while(iterator.hasNext()){
  26. SelectionKey key = iterator.next();
  27. //处理key时要从selectedKeys集合删除
  28. iterator.remove();
  29. log.info("key: {}",key);
  30. //5、区分事件类型
  31. if(key.isAcceptable()){
  32. ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
  33. SocketChannel clientChannel = serverChannel.accept();
  34. log.info("accepted... {}",clientChannel);
  35. clientChannel.configureBlocking(false);
  36. ByteBuffer buffer = ByteBuffer.allocate(1024);
  37. //byteBuffer非线程安全,将byteBuffer作为附件关联到clientSelectKey上
  38. SelectionKey clientSelectKey = clientChannel.register(selector, SelectionKey.OP_READ, buffer);
  39. log.info("clientSelectKey {}",clientSelectKey);
  40. }else if(key.isReadable()){
  41. try {
  42. SocketChannel clientChannel = (SocketChannel) key.channel();
  43. //获取关联的byteBuffer
  44. ByteBuffer buffer = (ByteBuffer) key.attachment();
  45. //如果客户端是正常断开,read返回-1
  46. int read = clientChannel.read(buffer);
  47. if(read == -1){
  48. key.cancel();
  49. log.info("客户端 {} 断开",clientChannel);
  50. }else {
  51. split(buffer);
  52. //扩容
  53. if(buffer.limit() == buffer.position()){
  54. ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);
  55. buffer.flip();
  56. newBuffer.put(buffer);
  57. key.attach(newBuffer);
  58. }
  59. }
  60. } catch (IOException e) {
  61. //避免客户端异常断开造成异常导致服务器停掉,需要将key从选择器key集合中删除
  62. e.printStackTrace();
  63. key.cancel();
  64. }
  65. }
  66. }
  67. }
  68. }
  69. private static void split(ByteBuffer source) {
  70. source.flip();
  71. for (int i = 0; i < source.limit() ; i++) {
  72. if (source.get(i) == '#'){
  73. int len = i + 1 - source.position();
  74. ByteBuffer target = ByteBuffer.allocate(len);
  75. for (int j = 0; j < len; j++) {
  76. byte b = source.get();
  77. target.put(b);
  78. }
  79. target.flip();
  80. log.info(StandardCharsets.UTF_8.decode(target).toString());
  81. }
  82. }
  83. source.compact();
  84. }
  85. }

💡 事件发生后能否不处理

事件发生后,要么处理,要么取消(cancel),不能什么都不做,否则下次该事件仍会触发,这是因为 nio 底层使用的是水平触发

💡 为何要 iter.remove()

因为 select 在事件发生后,就会将相关的 key 放入 selectedKeys 集合,但不会在处理完后从 selectedKeys 集合中移除,需要我们自己编码删除。例如

  • 第一次触发了 ssckey 上的 accept 事件,没有移除 ssckey
  • 第二次触发了 sckey 上的 read 事件,但这时 selectedKeys 中还有上次的 ssckey ,在处理时因为没有真正的 serverSocket 连上了,就会导致空指针异常

💡 cancel 的作用

cancel 会取消注册在 selector 上的 channel,并从 keys 集合中删除 key 后续不会再监听事件

💡 客户端断开事件

如果客户端是正常断开,read返回-1 客户端异常断开会抛出IOException异常

client端

  1. public class Client {
  2. public static void main(String[] args) throws IOException {
  3. SocketChannel sc = SocketChannel.open();
  4. sc.connect(new InetSocketAddress("localhost",8989));
  5. Scanner scanner = new Scanner(System.in);
  6. while (scanner.hasNext()) {
  7. String str = scanner.next();
  8. if("close".equals(str)){
  9. sc.close();
  10. }else {
  11. sc.write(StandardCharsets.UTF_8.encode(str));
  12. }
  13. }
  14. }
  15. }