• 非阻塞模式下,无法保证把 buffer 中所有数据都写入 channel,因此需要追踪 write 方法的返回值(代表实际写入字节数)
    • 用 selector 监听所有 channel 的可写事件,每个 channel 都需要一个 key 来跟踪 buffer,但这样又会导致占用内存过多,就有两阶段策略
      • 当消息处理器第一次写入消息时,才将 channel 注册到 selector 上
      • selector 检查 channel 上的可写事件,如果所有的数据写完了,就取消 channel 的注册
      • 如果不取消,会每次可写均会触发 write 事件

    服务器端

    1. public class ModifiedWriteServer {
    2. public static void main(String[] args) throws IOException {
    3. ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    4. serverSocketChannel.configureBlocking(false);
    5. serverSocketChannel.bind(new InetSocketAddress(8989));
    6. Selector selector = Selector.open();
    7. serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    8. while (true){
    9. selector.select();
    10. Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
    11. if (iterator.hasNext()) {
    12. SelectionKey key = iterator.next();
    13. iterator.remove();
    14. if (key.isAcceptable()) {
    15. ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
    16. SocketChannel clientChannel = serverChannel.accept();
    17. clientChannel.configureBlocking(false);
    18. SelectionKey clientKey = clientChannel.register(selector, SelectionKey.OP_READ);
    19. StringBuilder stringBuilder = new StringBuilder();
    20. for (int i = 0; i < 5000000; i++) {
    21. stringBuilder.append("a");
    22. }
    23. ByteBuffer buffer = Charset.defaultCharset().encode(stringBuilder.toString());
    24. int write = clientChannel.write(buffer);
    25. System.out.println("write: " + write);
    26. //返回值代表实际写入的字节数
    27. if (buffer.hasRemaining()){
    28. //关注可写事件
    29. clientKey.interestOps(clientKey.interestOps() + SelectionKey.OP_WRITE);
    30. //把未写完的数据挂到key上
    31. clientKey.attach(buffer);
    32. }
    33. }else if(key.isWritable()){
    34. ByteBuffer buffer = (ByteBuffer) key.attachment();
    35. SocketChannel clientChannel = (SocketChannel) key.channel();
    36. int write = clientChannel.write(buffer);
    37. System.out.println("write: " + write);
    38. //清理操作,如果写完了需要清除buffer
    39. if(!buffer.hasRemaining()){
    40. key.attach(null);
    41. key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);
    42. }
    43. }
    44. }
    45. }
    46. }
    47. }

    客户端

    1. public class WriteClient {
    2. public static void main(String[] args) throws IOException {
    3. SocketChannel sc = SocketChannel.open();
    4. sc.connect(new InetSocketAddress("localhost",8989));
    5. int count = 0;
    6. ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
    7. while (true){
    8. count += sc.read(buffer);
    9. buffer.flip();
    10. System.out.println("count: " + count);
    11. buffer.clear();
    12. }
    13. }
    14. }