前边描述的文件IO和Socket IO,也都是阻塞的IO,在执行下一步之前必须等待上一步完成. 否则就得一直等待在原地.
- 可读事件: 如果socket缓冲区有数据可以读,对应的fd就是readable,应用可以从fd中把数据读取走,进行应用处理。
- 可写事件: 如果socket缓冲区有空间可以写,对应的fd就是writable,应用可以数据写入到fd缓冲区,交给操作系统写入到协议栈
非阻塞IO
顾名思义,和阻塞IO相对应,在文件IO中的阻塞操作可以通过设置在阻塞中变得非阻塞,完成丑小鸭到丑大鸭的蜕变.
示例
/**
* @author chenshun00@gmail.com
* @module parse
* @since 2020/11/30 10:21 下午
*/
public class TestIO {
public static void main(String[] args) throws IOException {
//开启一个ServerSocketChannel,对应了Socket IO中的ServerSocket
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
final Selector selector = Selector.open();
serverSocketChannel.socket().setReuseAddress(true);
serverSocketChannel.socket().bind(new InetSocketAddress("localhost", 9998));
//配置非阻塞,如果设置成true还是阻塞的配置
serverSocketChannel.configureBlocking(false);
//注意这里的第三个参数,会attach到SelectorKey上
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, serverSocketChannel);
System.out.println("-- server ready");
while (true) {
//jdk 11才有的方法
selector.select(key -> {
if (!key.isValid()) {
return;
}
//获取到attach的key
final ServerSocketChannel ch = (ServerSocketChannel) key.attachment();
try {
if (key.isAcceptable()) {
//获取endpoint,对应了serverSocket.accpet()返回的socket
SocketChannel client_ch = ch.accept();
if (client_ch != null) { // accept() may return null...
System.out.printf("accepted connection from %s\n", client_ch.getRemoteAddress());
//客户端也需要配置成非阻塞
client_ch.configureBlocking(false);
//监听可读时间
client_ch.register(selector, SelectionKey.OP_READ, key.attachment());
}
} else if (key.isReadable()) {
//整体的流程就是从socket中读取数据
//然后写入到队列中,然后注册监听可写事件
//当触发可写事件时,从队列中获取数据并写回
SocketChannel socketChannel = (SocketChannel) key.channel();
socketChannel.register(selector, SelectionKey.OP_WRITE);
ByteBuffer buffer = ByteBuffer.allocate(1024);
int read = socketChannel.read(buffer);
buffer.flip();
if (read == -1) {
throw new IOException("Socket closed");
}
String result = new String(buffer.array()).trim();
System.out.println("receive:" + result);
ByteBuffer writeBuffer = ByteBuffer.wrap(ACK);
Queue<Object> pendingWrites = channelToPendingWrites.get(key.channel());
if (pendingWrites == null) {
synchronized (channelToPendingWrites) {
pendingWrites = channelToPendingWrites.get(key.channel());
if (pendingWrites == null) {
pendingWrites = new ConcurrentLinkedQueue<>();
channelToPendingWrites.put(key.channel(), pendingWrites);
}
}
}
pendingWrites.add(writeBuffer);
socketChannel.register(selector, SelectionKey.OP_WRITE);
} else if (key.isWritable()) {
//参考可读事件的注释
SocketChannel socketChannel = (SocketChannel) key.channel();
final Queue<Object> objects = channelToPendingWrites.get(socketChannel);
if (objects == null || objects.size() == 0) {
final ByteBuffer allocate = ByteBuffer.allocate(12);
allocate.put("hello,world!".getBytes());
socketChannel.write(allocate);
} else {
final Object poll = objects.poll();
socketChannel.write((ByteBuffer) poll);
}
socketChannel.register(selector, SelectionKey.OP_READ);
}
} catch (Exception e) {
e.printStackTrace();
}
}, 10000L);
}
}
private static final Map<SelectableChannel, Queue<Object>> channelToPendingWrites = new ConcurrentHashMap<>();
private static final byte[] ACK = "Data logged successfully\n".getBytes();
}