- 非阻塞模式下,无法保证把 buffer 中所有数据都写入 channel,因此需要追踪 write 方法的返回值(代表实际写入字节数)
- 用 selector 监听所有 channel 的可写事件,每个 channel 都需要一个 key 来跟踪 buffer,但这样又会导致占用内存过多,就有两阶段策略
- 当消息处理器第一次写入消息时,才将 channel 注册到 selector 上
- selector 检查 channel 上的可写事件,如果所有的数据写完了,就取消 channel 的注册
- 如果不取消,会每次可写均会触发 write 事件
- 当消息处理器第一次写入消息时,才将 channel 注册到 selector 上
服务器端
public class ModifiedWriteServer {public static void main(String[] args) throws IOException {ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.configureBlocking(false);serverSocketChannel.bind(new InetSocketAddress(8989));Selector selector = Selector.open();serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);while (true){selector.select();Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();if (iterator.hasNext()) {SelectionKey key = iterator.next();iterator.remove();if (key.isAcceptable()) {ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();SocketChannel clientChannel = serverChannel.accept();clientChannel.configureBlocking(false);SelectionKey clientKey = clientChannel.register(selector, SelectionKey.OP_READ);StringBuilder stringBuilder = new StringBuilder();for (int i = 0; i < 5000000; i++) {stringBuilder.append("a");}ByteBuffer buffer = Charset.defaultCharset().encode(stringBuilder.toString());int write = clientChannel.write(buffer);System.out.println("write: " + write);//返回值代表实际写入的字节数if (buffer.hasRemaining()){//关注可写事件clientKey.interestOps(clientKey.interestOps() + SelectionKey.OP_WRITE);//把未写完的数据挂到key上clientKey.attach(buffer);}}else if(key.isWritable()){ByteBuffer buffer = (ByteBuffer) key.attachment();SocketChannel clientChannel = (SocketChannel) key.channel();int write = clientChannel.write(buffer);System.out.println("write: " + write);//清理操作,如果写完了需要清除bufferif(!buffer.hasRemaining()){key.attach(null);key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);}}}}}}
客户端
public class WriteClient {public static void main(String[] args) throws IOException {SocketChannel sc = SocketChannel.open();sc.connect(new InetSocketAddress("localhost",8989));int count = 0;ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);while (true){count += sc.read(buffer);buffer.flip();System.out.println("count: " + count);buffer.clear();}}}
