- 非阻塞模式下,无法保证把 buffer 中所有数据都写入 channel,因此需要追踪 write 方法的返回值(代表实际写入字节数)
- 如果用 selector 监听所有 channel 的可写事件,每个 channel 都需要一个 key 来跟踪 buffer,这样会占用过多内存,因此采用两阶段策略:
- 当消息处理器第一次写入消息时,才将 channel 注册到 selector 上
- selector 检查 channel 上的可写事件,如果所有的数据写完了,就取消 channel 的注册
- 如果不取消,只要 channel 处于可写状态,每次 select 都会触发 write 事件
服务端
package cn.inetty.nio.selector3;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import static cn.inetty.nio.ByteBufferUtil.debugAll;
/**
 * Demo NIO server that pushes a large payload to every client that connects,
 * using a single selector thread and non-blocking writes.
 *
 * <p>A non-blocking {@code write} may accept only part of the buffer, so the
 * server uses a two-phase strategy: it first tries to write immediately, and
 * only if bytes are left over does it attach the buffer to the key and
 * subscribe to {@link SelectionKey#OP_WRITE}. Once the buffer is drained the
 * subscription is removed again; otherwise the key would be selected on every
 * pass for as long as the socket stays writable.
 */
@Slf4j
public class Server {

    /** TCP port the server listens on. */
    private static final int PORT = 8888;

    /** Number of {@code 'a'} characters sent to every client. */
    private static final int PAYLOAD_CHARS = 3000000;

    /**
     * Encoded payload, built once. It is never written from directly; each
     * connection works on its own {@link ByteBuffer#duplicate() duplicate} so
     * that position/limit are tracked per client.
     */
    private static final ByteBuffer PAYLOAD = encodePayload();

    /**
     * Runs the accept/write event loop until the process is terminated or the
     * selector / listening channel fails.
     *
     * @param args ignored
     * @throws IOException if opening, binding, selecting or accepting fails
     */
    public static void main(String[] args) throws IOException {
        // try-with-resources releases the selector and listening socket if the loop dies.
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open()) {
            server.configureBlocking(false);
            server.register(selector, SelectionKey.OP_ACCEPT);
            server.bind(new InetSocketAddress(PORT));
            while (true) {
                selector.select();
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    // Selected keys are not removed automatically; do it before handling.
                    iterator.remove();
                    if (!key.isValid()) {
                        continue;
                    }
                    if (key.isAcceptable()) {
                        acceptClient(server, selector);
                    } else if (key.isWritable()) {
                        writePending(key);
                    }
                }
            }
        }
    }

    /** Builds the payload string and encodes it with the platform default charset. */
    private static ByteBuffer encodePayload() {
        StringBuilder sb = new StringBuilder(PAYLOAD_CHARS);
        for (int i = 0; i < PAYLOAD_CHARS; i++) {
            sb.append('a');
        }
        return Charset.defaultCharset().encode(sb.toString());
    }

    /**
     * Accepts one pending connection, attempts the first write right away and
     * subscribes to OP_WRITE only if some of the payload could not be written.
     */
    private static void acceptClient(ServerSocketChannel server, Selector selector) throws IOException {
        SocketChannel client = server.accept();
        if (client == null) {
            // Non-blocking accept: the connection may already be gone.
            return;
        }
        try {
            client.configureBlocking(false);
            SelectionKey clientKey = client.register(selector, 0, null);
            ByteBuffer pending = PAYLOAD.duplicate();
            // Return value = number of bytes actually written by this call.
            int written = client.write(pending);
            System.out.println(written);
            if (pending.hasRemaining()) {
                clientKey.attach(pending);
                // Bitwise OR adds OP_WRITE without disturbing other interest bits
                // and is a no-op if the bit is already set (unlike '+').
                clientKey.interestOps(clientKey.interestOps() | SelectionKey.OP_WRITE);
            }
        } catch (IOException e) {
            drop(client, e);
        }
    }

    /**
     * Continues writing the buffer attached to {@code key}; when it is fully
     * written, detaches it and stops listening for writability.
     */
    private static void writePending(SelectionKey key) {
        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer pending = (ByteBuffer) key.attachment();
        try {
            if (pending != null) {
                int written = channel.write(pending);
                // Bytes written by this call for this connection.
                System.out.println(written);
            }
            if (pending == null || !pending.hasRemaining()) {
                // Release the buffer and clear only the OP_WRITE bit.
                key.attach(null);
                key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
            }
        } catch (IOException e) {
            // A failing peer must not take the whole server down.
            drop(channel, e);
        }
    }

    /** Closes a client channel after an I/O failure; closing also cancels its keys. */
    private static void drop(SocketChannel channel, IOException cause) {
        System.err.println("Closing connection after I/O failure: " + cause);
        try {
            channel.close();
        } catch (IOException ignored) {
            // Nothing more can be done for this connection.
        }
    }
}
客户端
package cn.inetty.nio.selector3;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
/**
 * Demo client that connects to the local server on port 8888 and prints the
 * running total of bytes received after every read.
 */
public class Client1 {

    /**
     * Connects, then reads until the server closes the connection.
     *
     * @param args ignored
     * @throws IOException if connecting or reading fails
     */
    public static void main(String[] args) throws IOException {
        // try-with-resources closes the socket on normal exit and on failure.
        try (SocketChannel socketChannel = SocketChannel.open()) {
            socketChannel.connect(new InetSocketAddress("localhost", 8888));
            // One buffer reused for every read; its contents are discarded, only the size matters.
            ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
            int count = 0;
            while (true) {
                int read = socketChannel.read(buffer);
                if (read == -1) {
                    // End of stream: stop instead of adding -1 to the total forever.
                    break;
                }
                count += read;
                System.out.println(count);
                // Reset position/limit so the next read can fill the buffer again.
                buffer.clear();
            }
        }
    }
}