BIO

BIO(Blocking IO)指会阻塞当前线程的IO操作,服务器会为每个IO分配一个单独的线程来提供并发,通常会使用线程池作为优化手段。

  1. package com.example.demo;
  2. import java.io.*;
  3. import java.net.*;
  4. import java.util.Scanner;
  5. import java.nio.charset.StandardCharsets;
  6. public class SocketClient {
  7. public static void main(String[] args) throws IOException {
  8. Socket sock = new Socket("localhost", 8080); // 连接指定服务器和端口
  9. try (InputStream input = sock.getInputStream()) {
  10. try (OutputStream output = sock.getOutputStream()) {
  11. handle(input, output);
  12. }
  13. }
  14. sock.close();
  15. System.out.println("disconnected.");
  16. }
  17. private static void handle(InputStream input, OutputStream output) throws IOException {
  18. BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(output, StandardCharsets.UTF_8));
  19. BufferedReader reader = new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8));
  20. Scanner scanner = new Scanner(System.in);
  21. System.out.println("[server] " + reader.readLine());
  22. for (;;) {
  23. System.out.print(">>> ");
  24. String s = scanner.nextLine();
  25. writer.write(s);
  26. writer.newLine();
  27. writer.flush();
  28. String resp = reader.readLine();
  29. System.out.println("<<< " + resp);
  30. if (resp.equals("bye")) {
  31. break;
  32. }
  33. }
  34. }
  35. }
package com.example.demo;

import java.io.*;
import java.net.*;
import java.nio.charset.StandardCharsets;

public class SocketServer {

    static class Handler extends Thread {
        Socket sock;

        public Handler(Socket sock) {
            this.sock = sock;
        }

        @Override
        public void run() {
            try (InputStream input = this.sock.getInputStream()) {
                try (OutputStream output = this.sock.getOutputStream()) {
                    handle(input, output);
                }
            } catch (Exception e) {
                try {
                    this.sock.close();
                } catch (IOException ioe) {
                }
                System.out.println("client disconnected.");
            }
        }

        private void handle(InputStream input, OutputStream output) throws IOException {
            BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(output, StandardCharsets.UTF_8));
            BufferedReader reader = new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8));
            writer.write("hello\n");
            writer.flush();
            for (;;) {
                String s = reader.readLine();
                if (s.equals("bye")) {
                    writer.write("bye\n");
                    writer.flush();
                    break;
                }
                writer.write("ok: " + s + "\n");
                writer.flush();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        ServerSocket ss = new ServerSocket(8080); // 监听指定端口
        System.out.println("server is running...");
        while (true) {
            Socket sock = ss.accept();
            System.out.println("connected from " + sock.getRemoteSocketAddress());
            Thread t = new Handler(sock);
            t.start();
        }
    }
}

NIO

NIO在JDK中的原意是NEW IO,但在IO模型中我们把它理解为Non-Blocking IO,即利用多路复用技术实现的同步非阻塞IO。

package com.example.demo;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class SocketServerNIO {

    private static final long SLEEP_PERIOD = 5000L; // 5 seconds
    private static final int BUFFER_SIZE = 8192;


    public static void main(String[] args) throws Exception {

        ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);

        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        ServerSocket server = serverChannel.socket();
        server.bind(new InetSocketAddress(8080));

        Selector selector = Selector.open();
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        SocketChannel clientChannel = null;

        System.out.println("0. SERVER STARTED TO LISTEN");
        while (true) {
            try {
                // wait for selection
                int numKeys = selector.select();

                if (numKeys == 0) {
                    System.err.println("select wakes up with zero!!!");
                }

                Iterator it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey selected = (SelectionKey) it.next();
                    int ops = selected.interestOps();
                    try {
                        // process new connection
                        if ((ops & SelectionKey.OP_ACCEPT) != 0) {
                            clientChannel = serverChannel.accept();
                            clientChannel.configureBlocking(false);

                            // register channel to selector
                            clientChannel.register(selector, SelectionKey.OP_READ, null);
                            System.out.println("2. SERVER ACCEPTED AND REGISTER READ OP : client - " + clientChannel.socket().getInetAddress());
                        }

                        if ((ops & SelectionKey.OP_READ) != 0) {
                            // read client message
                            System.out.println("3. SERVER READ DATA FROM client - " + clientChannel.socket().getInetAddress());
                            readClient((SocketChannel) selected.channel(), buffer);

                            // deregister OP_READ
                            System.out.println("PREV SET : " + selected.interestOps());
                            selected.interestOps(selected.interestOps() & ~SelectionKey.OP_READ);
                            System.out.println("NEW SET : " + selected.interestOps());

                            Thread.sleep(SLEEP_PERIOD * 2);
                            new WriterThread(clientChannel).start();
                        }

                    } finally {
                        // remove from selected key set
                        it.remove();
                    }

                }
            } catch (IOException e) {
                System.err.println("IO Error : " + e.getMessage());
            }
        }

        }

    public static void readClient(SocketChannel channel, ByteBuffer buffer) throws IOException {
        try {
            buffer.clear();

            int nRead = channel.read(buffer);
            if (nRead < 0) {
                channel.close();
                return;
            }

            if (buffer.position() != 0) {
                int size = buffer.position();
                buffer.flip();
                byte[] bytes = new byte[size];
                buffer.get(bytes);
                System.out.println("RECVED : " + new String(bytes));
            }
        } catch (IOException e) {
            System.err.println("IO Error : " + e.getMessage());
            channel.close();
        }
    }

    public static class WriterThread extends Thread {
        private SocketChannel clientChannel;

        public WriterThread(SocketChannel clientChannel) {
            this.clientChannel = clientChannel;
        }

        public void run() {
            try {
                writeClient(clientChannel);
                System.out.println("5. SERVER WRITE DATA TO client - " + clientChannel.socket().getInetAddress());
            } catch (IOException e) {
                System.err.println("5. SERVER WRITE DATA FAILED : " + e);
            }
        }

        public void writeClient(SocketChannel channel) throws IOException {
            try {
                ByteBuffer buffer = ByteBuffer.wrap("bye\n".getBytes());
                int total = buffer.limit();

                int totalWrote = 0;
                int nWrote = 0;

                while ((nWrote = channel.write(buffer)) >= 0) {
                    totalWrote += nWrote;
                    if (totalWrote == total) {
                        break;
                    }
                }
            } catch (IOException e) {
                System.err.println("IO Error : " + e.getMessage());
                channel.close();
            }
        }
    }
}

由于使用JDK NIO包来编程过于繁琐,所以大多数场景下使用Netty来代替,Netty还解决了JDK空轮询的bug,检测当空轮询计数达到512次时,重新建立selector。

AIO

AIO在这里指的是Asynchronous IO,将对IO的监听全权交给操作系统,在可读写时由操作系统调用回调函数来处理数据

package com.example.demo;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
public class SocketServerAIO {

    public static void main(String[] args) {
        try {
            AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open();
            serverSocketChannel.bind(new InetSocketAddress(8080));
            CompletionHandler<AsynchronousSocketChannel, Object> handler = new CompletionHandler<AsynchronousSocketChannel,
                    Object>() {
                @Override
                public void completed(final AsynchronousSocketChannel result, final Object attachment) {
                    // 继续监听下一个连接请求
                    serverSocketChannel.accept(attachment, this);
                    try {
                        System.out.println("2. SERVER GOT CONNECTION FROM CLIENT : client - " + result.getRemoteAddress().toString());
                        ByteBuffer readBuffer = ByteBuffer.allocate(128);
                        result.read(readBuffer).get();
                        System.out.println(new String(readBuffer.array()));

                        result.write(ByteBuffer.wrap("bye".getBytes())).get();

                        result.close();

                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }

                @Override
                public void failed(final Throwable exc, final Object attachment) {
                    System.out.println(exc.getMessage());
                }
            };

            System.out.println("0. SERVER STARTED TO LISTEN");
            serverSocketChannel.accept(null, handler);

            while (true) {
                Thread.sleep(100);
            }
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}

**