选择器的使用步骤

  • 获取选择器

与通道和缓冲区的获取类似,选择器的获取也是通过静态工厂方法open()来得到的。

  1. Selector selector = Selector.open(); // 获取一个选择器实例
  • 获取可被选择器监控的通道

能够被选择器监控的通道必须实现了SelectableChannel接口,并且需要将通道配置成非阻塞模式,否则后续的注册会抛出IllegalBlockingModeException

  1. SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 9090)); // 打开 SocketChannel 并连接到本机 9090 端口
  2. socketChannel.configureBlocking(false); // 配置通道为非阻塞模式
  • 将通道注册到选择器

通道在被指定的选择器监控之前,应该先告诉选择器,并且告知监控的事件,即:将通道注册到选择器。
通道的注册通过SelectableChannel.register(Selector selector,int ops)来完成,ops表示关注的事件,如果需要关注该通道的多个IO事件,可以传入这些时间类型或运算之后的结果。这些时间必须是通道所支持的,否则抛出IllegalArgumentException。

  1. socketChannel.register(selector, SelectionKey.OP_READ); // 将套接字通过到注册到选择器,关注 read事件
  • 轮询select就绪事件

通过调用选择器的Selector.select()方法可以获取就绪事件,该方法会将就绪事件放到一个SelectionKey集合中,然后返回就绪的事件个数。这个方法映射多路复用IO模型中的select系统调用,它是一个阻塞方法。正常情况下,直到至少有一个就绪事件,或者其他线程调用了当前Selector对象的wakeup()方法,或者当前线程被中断时返回。

  1. while (selector.select() > 0){ // 轮询,且返回时有就绪事件
  2. Set<SelectionKey> keys = selector.selectedKeys(); // 获取就绪事件集合
  3. .......
  4. }

三种方式select就绪事件:

  1. select()阻塞方法,有一个就绪事件,或者其他线程调用了wakeup()或者当前线程被中断时返回。
  2. select(long timeout)阻塞方法,有一个就绪事件,或者其他线程调用了wakeup()或者当前线程被中断,或者阻塞时长达到了timeout时返回。不抛出超时异常
  3. selectNow()不阻塞,如果没有就绪事件则返回0,如果有就绪事件则将就绪事件放到一个集合中,返回就绪事件的数量。
  • 处理就绪事件

每次可以select出一批就绪的事件,所以需要对这些事件进行迭代。从一个SelectKey对象可以得到:就绪事件对应的通道、就绪的事件。通过这些信息,就可以很方便地进行IO操作。

  1. for(SelectionKey key : keys){
  2. if(key.isWritable()){ // 可写事件
  3. if("Bye".equals( (line = scanner.nextLine()) )){
  4. socketChannel.shutdownOutput();
  5. socketChannel.close();
  6. break;
  7. }
  8. buf.put(line.getBytes());
  9. buf.flip();
  10. socketChannel.write(buf);
  11. buf.compact();
  12. }
  13. }
  14. keys.clear(); // 清除选择键(事件)集,避免下次循环的时候重复处理。

需要注意,处理完IO事件之后,需要清除选择键集合,避免下一轮循环的时候对同一事件重复处理。

代码案例

  1. package cn.linguo.netty.nio.selector;
  2. import java.io.IOException;
  3. import java.net.InetSocketAddress;
  4. import java.nio.ByteBuffer;
  5. import java.nio.channels.DatagramChannel;
  6. import java.nio.channels.ReadableByteChannel;
  7. import java.nio.channels.SelectionKey;
  8. import java.nio.channels.Selector;
  9. import java.nio.channels.ServerSocketChannel;
  10. import java.nio.channels.SocketChannel;
  11. import java.util.Set;
  12. public class EchoServer {
  13. public static void main(String[] args) throws IOException {
  14. Selector selector = Selector.open(); // 获取选择器
  15. ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); // 打开服务器通道
  16. serverSocketChannel.configureBlocking(false); // 服务器通道配置为非阻塞模式
  17. serverSocketChannel.bind(new InetSocketAddress(9090)); // 绑定 TCP 端口 9090
  18. serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); // 将服务器通道注册到选择器 selector 中,注册事件为 ACCEPT
  19. DatagramChannel datagramChannel = DatagramChannel.open(); // 打开套接字通道
  20. datagramChannel.configureBlocking(false); // 配置通道为非阻塞模式
  21. datagramChannel.bind(new InetSocketAddress(9090)); // 绑定 UDP 端口 9090
  22. datagramChannel.register(selector, SelectionKey.OP_READ); // 将通道注册到选择器 selector 中,注册事件为读取数据
  23. ByteBuffer buf = ByteBuffer.allocate(1024); // 分配一个 1024 字节的堆字节缓冲区
  24. while (selector.select() > 0) { // 轮询已经就绪的注册的通道的 I/O 事件
  25. Set<SelectionKey> keys = selector.selectedKeys(); // 获取就绪的 I/O 事件,即选择器键集合
  26. for (SelectionKey key : keys) { // 遍历选择键,处理就绪事件
  27. if (key.isAcceptable()) { // 选择键的事件的是 I/O 连接事件
  28. SocketChannel socketChannel = serverSocketChannel.accept(); // 执行 I/O 操作,获取套接字连接通道
  29. socketChannel.configureBlocking(false); // 配置为套接字通道为非阻塞模式
  30. socketChannel.register(selector, SelectionKey.OP_READ); // 将套接字通过到注册到选择器,关注 READ 事件
  31. System.err.println("选择器拿到连接事件");
  32. } else if (key.isReadable()) { // 选择键的事件是 READ
  33. System.err.println("选择器拿到可读事件");
  34. StringBuilder sb = new StringBuilder();
  35. if (key.channel() instanceof DatagramChannel) { // 选择的通道为数据报通道,客户端是通过 UDP 连接过来的
  36. sb.append("UDP Client: ");
  37. datagramChannel.receive(buf); // 最多读取 1024 字节,数据报多出的部分自动丢弃
  38. buf.flip();
  39. while (buf.position() < buf.limit()) {
  40. sb.append((char) buf.get());
  41. }
  42. buf.clear();
  43. } else { // 选择的通道为套接字通道,客户端时通过 TCP 连接过来的
  44. sb.append("TCP Client: ");
  45. ReadableByteChannel channel = (ReadableByteChannel) key.channel(); // 获取通道
  46. int size;
  47. while ((size = channel.read(buf)) > 0) {
  48. buf.flip();
  49. while (buf.position() < buf.limit()) {
  50. sb.append((char) buf.get());
  51. }
  52. buf.clear();
  53. }
  54. if (size == -1) {
  55. sb.append("Exit");
  56. channel.close();
  57. }
  58. }
  59. System.out.println(sb);
  60. }
  61. }
  62. keys.clear(); // 将选择键清空,防止下次循环时被重复处理
  63. }
  64. }
  65. }
  1. package cn.linguo.netty.nio.selector;
  2. import java.io.IOException;
  3. import java.net.InetSocketAddress;
  4. import java.nio.ByteBuffer;
  5. import java.nio.channels.SelectionKey;
  6. import java.nio.channels.Selector;
  7. import java.nio.channels.SocketChannel;
  8. import java.util.Scanner;
  9. import java.util.Set;
  10. public class TcpClient {
  11. public static void main(String[] args) throws IOException {
  12. Selector selector = Selector.open();
  13. SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 9090));
  14. socketChannel.configureBlocking(false);
  15. socketChannel.register(selector, SelectionKey.OP_WRITE);
  16. Scanner scanner = new Scanner(System.in);
  17. String line;
  18. ByteBuffer buf = ByteBuffer.allocate(1024);
  19. while (selector.select() > 0) {
  20. Set<SelectionKey> keys = selector.selectedKeys();
  21. for (SelectionKey key : keys) {
  22. if (key.isWritable()) {
  23. if ("Bye".equals((line = scanner.nextLine()))) {
  24. socketChannel.shutdownOutput();
  25. socketChannel.close();
  26. break;
  27. }
  28. buf.put(line.getBytes());
  29. buf.flip();
  30. socketChannel.write(buf);
  31. buf.compact();
  32. }
  33. }
  34. keys.clear();
  35. if (!socketChannel.isOpen())
  36. break;
  37. }
  38. }
  39. }