Java IO(1)
Java IO(2)

前边描述的文件IO和Socket IO,也都是阻塞的IO,在执行下一步之前必须等待上一步完成. 否则就得一直等待在原地.

  • 可读事件: 如果socket缓冲区有数据可以读,对应的fd就是readable,应用可以从fd中把数据读取走,进行应用处理。
  • 可写事件: 如果socket缓冲区有空间可以写,对应的fd就是writable,应用可以数据写入到fd缓冲区,交给操作系统写入到协议栈

非阻塞IO

顾名思义,和阻塞IO相对应,在文件IO中的阻塞操作可以通过设置在阻塞中变得非阻塞,完成丑小鸭到丑大鸭的蜕变.

示例

  1. /**
  2. * @author chenshun00@gmail.com
  3. * @module parse
  4. * @since 2020/11/30 10:21 下午
  5. */
  6. public class TestIO {
  7. public static void main(String[] args) throws IOException {
  8. //开启一个ServerSocketChannel,对应了Socket IO中的ServerSocket
  9. ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
  10. final Selector selector = Selector.open();
  11. serverSocketChannel.socket().setReuseAddress(true);
  12. serverSocketChannel.socket().bind(new InetSocketAddress("localhost", 9998));
  13. //配置非阻塞,如果设置成true还是阻塞的配置
  14. serverSocketChannel.configureBlocking(false);
  15. //注意这里的第三个参数,会attach到SelectorKey上
  16. serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, serverSocketChannel);
  17. System.out.println("-- server ready");
  18. while (true) {
  19. //jdk 11才有的方法
  20. selector.select(key -> {
  21. if (!key.isValid()) {
  22. return;
  23. }
  24. //获取到attach的key
  25. final ServerSocketChannel ch = (ServerSocketChannel) key.attachment();
  26. try {
  27. if (key.isAcceptable()) {
  28. //获取endpoint,对应了serverSocket.accpet()返回的socket
  29. SocketChannel client_ch = ch.accept();
  30. if (client_ch != null) { // accept() may return null...
  31. System.out.printf("accepted connection from %s\n", client_ch.getRemoteAddress());
  32. //客户端也需要配置成非阻塞
  33. client_ch.configureBlocking(false);
  34. //监听可读时间
  35. client_ch.register(selector, SelectionKey.OP_READ, key.attachment());
  36. }
  37. } else if (key.isReadable()) {
  38. //整体的流程就是从socket中读取数据
  39. //然后写入到队列中,然后注册监听可写事件
  40. //当触发可写事件时,从队列中获取数据并写回
  41. SocketChannel socketChannel = (SocketChannel) key.channel();
  42. socketChannel.register(selector, SelectionKey.OP_WRITE);
  43. ByteBuffer buffer = ByteBuffer.allocate(1024);
  44. int read = socketChannel.read(buffer);
  45. buffer.flip();
  46. if (read == -1) {
  47. throw new IOException("Socket closed");
  48. }
  49. String result = new String(buffer.array()).trim();
  50. System.out.println("receive:" + result);
  51. ByteBuffer writeBuffer = ByteBuffer.wrap(ACK);
  52. Queue<Object> pendingWrites = channelToPendingWrites.get(key.channel());
  53. if (pendingWrites == null) {
  54. synchronized (channelToPendingWrites) {
  55. pendingWrites = channelToPendingWrites.get(key.channel());
  56. if (pendingWrites == null) {
  57. pendingWrites = new ConcurrentLinkedQueue<>();
  58. channelToPendingWrites.put(key.channel(), pendingWrites);
  59. }
  60. }
  61. }
  62. pendingWrites.add(writeBuffer);
  63. socketChannel.register(selector, SelectionKey.OP_WRITE);
  64. } else if (key.isWritable()) {
  65. //参考可读事件的注释
  66. SocketChannel socketChannel = (SocketChannel) key.channel();
  67. final Queue<Object> objects = channelToPendingWrites.get(socketChannel);
  68. if (objects == null || objects.size() == 0) {
  69. final ByteBuffer allocate = ByteBuffer.allocate(12);
  70. allocate.put("hello,world!".getBytes());
  71. socketChannel.write(allocate);
  72. } else {
  73. final Object poll = objects.poll();
  74. socketChannel.write((ByteBuffer) poll);
  75. }
  76. socketChannel.register(selector, SelectionKey.OP_READ);
  77. }
  78. } catch (Exception e) {
  79. e.printStackTrace();
  80. }
  81. }, 10000L);
  82. }
  83. }
  84. private static final Map<SelectableChannel, Queue<Object>> channelToPendingWrites = new ConcurrentHashMap<>();
  85. private static final byte[] ACK = "Data logged successfully\n".getBytes();
  86. }