选择器的使用步骤
- 获取选择器
与通道和缓冲区的获取类似,选择器的获取也是通过静态工厂方法open()来得到的。
Selector selector = Selector.open(); // 获取一个选择器实例
- 获取可被选择器监控的通道
能够被选择器监控的通道必须实现了SelectableChannel接口,并且需要将通道配置成非阻塞模式,否则后续的注册会抛出IllegalBlockingModeException
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 9090)); // 打开 SocketChannel 并连接到本机 9090 端口socketChannel.configureBlocking(false); // 配置通道为非阻塞模式
- 将通道注册到选择器
通道在被指定的选择器监控之前,应该先告诉选择器,并且告知监控的事件,即:将通道注册到选择器。
通道的注册通过SelectableChannel.register(Selector selector,int ops)来完成,ops表示关注的事件,如果需要关注该通道的多个IO事件,可以传入这些时间类型或运算之后的结果。这些时间必须是通道所支持的,否则抛出IllegalArgumentException。
socketChannel.register(selector, SelectionKey.OP_READ); // 将套接字通过到注册到选择器,关注 read事件
- 轮询select就绪事件
通过调用选择器的Selector.select()方法可以获取就绪事件,该方法会将就绪事件放到一个SelectionKey集合中,然后返回就绪的事件个数。这个方法映射多路复用IO模型中的select系统调用,它是一个阻塞方法。正常情况下,直到至少有一个就绪事件,或者其他线程调用了当前Selector对象的wakeup()方法,或者当前线程被中断时返回。
while (selector.select() > 0){ // 轮询,且返回时有就绪事件Set<SelectionKey> keys = selector.selectedKeys(); // 获取就绪事件集合.......}
三种方式select就绪事件:
- select()阻塞方法,有一个就绪事件,或者其他线程调用了wakeup()或者当前线程被中断时返回。
- select(long timeout)阻塞方法,有一个就绪事件,或者其他线程调用了wakeup()或者当前线程被中断,或者阻塞时长达到了timeout时返回。不抛出超时异常
- selectNow()不阻塞,如果没有就绪事件则返回0,如果有就绪事件则将就绪事件放到一个集合中,返回就绪事件的数量。
- 处理就绪事件
每次可以select出一批就绪的事件,所以需要对这些事件进行迭代。从一个SelectKey对象可以得到:就绪事件对应的通道、就绪的事件。通过这些信息,就可以很方便地进行IO操作。
for(SelectionKey key : keys){if(key.isWritable()){ // 可写事件if("Bye".equals( (line = scanner.nextLine()) )){socketChannel.shutdownOutput();socketChannel.close();break;}buf.put(line.getBytes());buf.flip();socketChannel.write(buf);buf.compact();}}keys.clear(); // 清除选择键(事件)集,避免下次循环的时候重复处理。
需要注意,处理完IO事件之后,需要清除选择键集合,避免下一轮循环的时候对同一事件重复处理。
代码案例
package cn.linguo.netty.nio.selector;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.DatagramChannel;import java.nio.channels.ReadableByteChannel;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.util.Set;public class EchoServer {public static void main(String[] args) throws IOException {Selector selector = Selector.open(); // 获取选择器ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); // 打开服务器通道serverSocketChannel.configureBlocking(false); // 服务器通道配置为非阻塞模式serverSocketChannel.bind(new InetSocketAddress(9090)); // 绑定 TCP 端口 9090serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); // 将服务器通道注册到选择器 selector 中,注册事件为 ACCEPTDatagramChannel datagramChannel = DatagramChannel.open(); // 打开套接字通道datagramChannel.configureBlocking(false); // 配置通道为非阻塞模式datagramChannel.bind(new InetSocketAddress(9090)); // 绑定 UDP 端口 9090datagramChannel.register(selector, SelectionKey.OP_READ); // 将通道注册到选择器 selector 中,注册事件为读取数据ByteBuffer buf = ByteBuffer.allocate(1024); // 分配一个 1024 字节的堆字节缓冲区while (selector.select() > 0) { // 轮询已经就绪的注册的通道的 I/O 事件Set<SelectionKey> keys = selector.selectedKeys(); // 获取就绪的 I/O 事件,即选择器键集合for (SelectionKey key : keys) { // 遍历选择键,处理就绪事件if (key.isAcceptable()) { // 选择键的事件的是 I/O 连接事件SocketChannel socketChannel = serverSocketChannel.accept(); // 执行 I/O 操作,获取套接字连接通道socketChannel.configureBlocking(false); // 配置为套接字通道为非阻塞模式socketChannel.register(selector, SelectionKey.OP_READ); // 将套接字通过到注册到选择器,关注 READ 事件System.err.println("选择器拿到连接事件");} else if (key.isReadable()) { // 选择键的事件是 READSystem.err.println("选择器拿到可读事件");StringBuilder sb = new StringBuilder();if (key.channel() instanceof DatagramChannel) { // 选择的通道为数据报通道,客户端是通过 UDP 连接过来的sb.append("UDP Client: ");datagramChannel.receive(buf); // 最多读取 1024 字节,数据报多出的部分自动丢弃buf.flip();while (buf.position() < buf.limit()) {sb.append((char) buf.get());}buf.clear();} else { // 选择的通道为套接字通道,客户端时通过 TCP 连接过来的sb.append("TCP Client: ");ReadableByteChannel channel = (ReadableByteChannel) key.channel(); // 获取通道int size;while ((size = channel.read(buf)) > 0) {buf.flip();while (buf.position() < buf.limit()) {sb.append((char) buf.get());}buf.clear();}if (size == -1) {sb.append("Exit");channel.close();}}System.out.println(sb);}}keys.clear(); // 将选择键清空,防止下次循环时被重复处理}}}
package cn.linguo.netty.nio.selector;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.SocketChannel;import java.util.Scanner;import java.util.Set;public class TcpClient {public static void main(String[] args) throws IOException {Selector selector = Selector.open();SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 9090));socketChannel.configureBlocking(false);socketChannel.register(selector, SelectionKey.OP_WRITE);Scanner scanner = new Scanner(System.in);String line;ByteBuffer buf = ByteBuffer.allocate(1024);while (selector.select() > 0) {Set<SelectionKey> keys = selector.selectedKeys();for (SelectionKey key : keys) {if (key.isWritable()) {if ("Bye".equals((line = scanner.nextLine()))) {socketChannel.shutdownOutput();socketChannel.close();break;}buf.put(line.getBytes());buf.flip();socketChannel.write(buf);buf.compact();}}keys.clear();if (!socketChannel.isOpen())break;}}}
