• 非阻塞模式下,无法保证把 buffer 中所有数据都写入 channel,因此需要追踪 write 方法的返回值(代表实际写入字节数)
  • 用 selector 监听所有 channel 的可写事件,每个 channel 都需要一个 key 来跟踪 buffer,但这样又会导致占用内存过多,就有两阶段策略
    • 当消息处理器第一次写入消息时,才将 channel 注册到 selector 上
    • selector 检查 channel 上的可写事件,如果所有的数据写完了,就取消 channel 的注册
    • 如果不取消,会每次可写均会触发 write 事件

服务端

  1. package cn.inetty.nio.selector3;
  2. import lombok.extern.slf4j.Slf4j;
  3. import java.io.IOException;
  4. import java.net.InetSocketAddress;
  5. import java.nio.ByteBuffer;
  6. import java.nio.channels.SelectionKey;
  7. import java.nio.channels.Selector;
  8. import java.nio.channels.ServerSocketChannel;
  9. import java.nio.channels.SocketChannel;
  10. import java.nio.charset.Charset;
  11. import java.util.Iterator;
  12. import static cn.inetty.nio.ByteBufferUtil.debugAll;
  13. @Slf4j
  14. public class Server {
  15. public static void main(String[] args) throws IOException {
  16. ServerSocketChannel open = ServerSocketChannel.open();
  17. open.configureBlocking(false);
  18. Selector selector = Selector.open();
  19. open.register(selector, SelectionKey.OP_ACCEPT);
  20. open.bind(new InetSocketAddress(8888));
  21. while (true) {
  22. selector.select();
  23. Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
  24. int w = 0;
  25. while (iterator.hasNext()) {
  26. SelectionKey key = iterator.next();
  27. iterator.remove();
  28. if (key.isAcceptable()) {
  29. SocketChannel sc = open.accept();
  30. sc.configureBlocking(false);
  31. SelectionKey selectionKey = sc.register(selector, 0, null);
  32. // 向客户端发送大量数据
  33. StringBuilder sb = new StringBuilder();
  34. for (int i = 0; i < 3000000; i++) {
  35. sb.append("a");
  36. }
  37. ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
  38. // 返回值 = 实际写入的字节数
  39. if (buffer.hasRemaining()) {
  40. // 关注可写事件 为可能的避免覆盖原有的兴趣集,在基础上添加
  41. selectionKey.interestOps(selectionKey.interestOps() + SelectionKey.OP_WRITE);
  42. // selectionKey.interestOps(selectionKey.interestOps() | SelectionKey.OP_WRITE);
  43. selectionKey.attach(buffer);
  44. }
  45. } else if (key.isWritable()) {
  46. ByteBuffer buffer = (ByteBuffer)key.attachment();
  47. SocketChannel channel = (SocketChannel) key.channel();
  48. int write = channel.write(buffer);
  49. w+=write;
  50. System.out.println(w);
  51. // 清除资源
  52. if (!buffer.hasRemaining()) {
  53. key.attach(null);
  54. key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);
  55. }
  56. }
  57. }
  58. }
  59. }
  60. }

客户端

  1. package cn.inetty.nio.selector3;
  2. import java.io.IOException;
  3. import java.net.InetSocketAddress;
  4. import java.nio.ByteBuffer;
  5. import java.nio.channels.SocketChannel;
  6. import java.nio.charset.Charset;
  7. public class Client1 {
  8. public static void main(String[] args) throws IOException {
  9. SocketChannel socketChannel = SocketChannel.open();
  10. socketChannel.connect(new InetSocketAddress("localhost", 8888));
  11. int count = 0;
  12. while (true) {
  13. ByteBuffer buffer = ByteBuffer.allocate(1024*1024);
  14. count += socketChannel.read(buffer);
  15. System.out.println(count);
  16. buffer.clear();
  17. }
  18. }
  19. }