BIO
BIO(Blocking IO)指会阻塞当前线程的IO操作,服务器会为每个IO分配一个单独的线程来提供并发,通常会使用线程池作为优化手段。
package com.example.demo;
import java.io.*;
import java.net.*;
import java.util.Scanner;
import java.nio.charset.StandardCharsets;
public class SocketClient {
public static void main(String[] args) throws IOException {
Socket sock = new Socket("localhost", 8080); // 连接指定服务器和端口
try (InputStream input = sock.getInputStream()) {
try (OutputStream output = sock.getOutputStream()) {
handle(input, output);
}
}
sock.close();
System.out.println("disconnected.");
}
private static void handle(InputStream input, OutputStream output) throws IOException {
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(output, StandardCharsets.UTF_8));
BufferedReader reader = new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8));
Scanner scanner = new Scanner(System.in);
System.out.println("[server] " + reader.readLine());
for (;;) {
System.out.print(">>> ");
String s = scanner.nextLine();
writer.write(s);
writer.newLine();
writer.flush();
String resp = reader.readLine();
System.out.println("<<< " + resp);
if (resp.equals("bye")) {
break;
}
}
}
}
package com.example.demo;
import java.io.*;
import java.net.*;
import java.nio.charset.StandardCharsets;
public class SocketServer {
static class Handler extends Thread {
Socket sock;
public Handler(Socket sock) {
this.sock = sock;
}
@Override
public void run() {
try (InputStream input = this.sock.getInputStream()) {
try (OutputStream output = this.sock.getOutputStream()) {
handle(input, output);
}
} catch (Exception e) {
try {
this.sock.close();
} catch (IOException ioe) {
}
System.out.println("client disconnected.");
}
}
private void handle(InputStream input, OutputStream output) throws IOException {
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(output, StandardCharsets.UTF_8));
BufferedReader reader = new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8));
writer.write("hello\n");
writer.flush();
for (;;) {
String s = reader.readLine();
if (s.equals("bye")) {
writer.write("bye\n");
writer.flush();
break;
}
writer.write("ok: " + s + "\n");
writer.flush();
}
}
}
public static void main(String[] args) throws IOException {
ServerSocket ss = new ServerSocket(8080); // 监听指定端口
System.out.println("server is running...");
while (true) {
Socket sock = ss.accept();
System.out.println("connected from " + sock.getRemoteSocketAddress());
Thread t = new Handler(sock);
t.start();
}
}
}
NIO
NIO在JDK中的原意是NEW IO,但在IO模型中我们把它理解为Non-Blocking IO,即利用多路复用技术实现的同步非阻塞IO。
package com.example.demo;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
public class SocketServerNIO {
private static final long SLEEP_PERIOD = 5000L; // 5 seconds
private static final int BUFFER_SIZE = 8192;
public static void main(String[] args) throws Exception {
ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
ServerSocket server = serverChannel.socket();
server.bind(new InetSocketAddress(8080));
Selector selector = Selector.open();
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
SocketChannel clientChannel = null;
System.out.println("0. SERVER STARTED TO LISTEN");
while (true) {
try {
// wait for selection
int numKeys = selector.select();
if (numKeys == 0) {
System.err.println("select wakes up with zero!!!");
}
Iterator it = selector.selectedKeys().iterator();
while (it.hasNext()) {
SelectionKey selected = (SelectionKey) it.next();
int ops = selected.interestOps();
try {
// process new connection
if ((ops & SelectionKey.OP_ACCEPT) != 0) {
clientChannel = serverChannel.accept();
clientChannel.configureBlocking(false);
// register channel to selector
clientChannel.register(selector, SelectionKey.OP_READ, null);
System.out.println("2. SERVER ACCEPTED AND REGISTER READ OP : client - " + clientChannel.socket().getInetAddress());
}
if ((ops & SelectionKey.OP_READ) != 0) {
// read client message
System.out.println("3. SERVER READ DATA FROM client - " + clientChannel.socket().getInetAddress());
readClient((SocketChannel) selected.channel(), buffer);
// deregister OP_READ
System.out.println("PREV SET : " + selected.interestOps());
selected.interestOps(selected.interestOps() & ~SelectionKey.OP_READ);
System.out.println("NEW SET : " + selected.interestOps());
Thread.sleep(SLEEP_PERIOD * 2);
new WriterThread(clientChannel).start();
}
} finally {
// remove from selected key set
it.remove();
}
}
} catch (IOException e) {
System.err.println("IO Error : " + e.getMessage());
}
}
}
public static void readClient(SocketChannel channel, ByteBuffer buffer) throws IOException {
try {
buffer.clear();
int nRead = channel.read(buffer);
if (nRead < 0) {
channel.close();
return;
}
if (buffer.position() != 0) {
int size = buffer.position();
buffer.flip();
byte[] bytes = new byte[size];
buffer.get(bytes);
System.out.println("RECVED : " + new String(bytes));
}
} catch (IOException e) {
System.err.println("IO Error : " + e.getMessage());
channel.close();
}
}
public static class WriterThread extends Thread {
private SocketChannel clientChannel;
public WriterThread(SocketChannel clientChannel) {
this.clientChannel = clientChannel;
}
public void run() {
try {
writeClient(clientChannel);
System.out.println("5. SERVER WRITE DATA TO client - " + clientChannel.socket().getInetAddress());
} catch (IOException e) {
System.err.println("5. SERVER WRITE DATA FAILED : " + e);
}
}
public void writeClient(SocketChannel channel) throws IOException {
try {
ByteBuffer buffer = ByteBuffer.wrap("bye\n".getBytes());
int total = buffer.limit();
int totalWrote = 0;
int nWrote = 0;
while ((nWrote = channel.write(buffer)) >= 0) {
totalWrote += nWrote;
if (totalWrote == total) {
break;
}
}
} catch (IOException e) {
System.err.println("IO Error : " + e.getMessage());
channel.close();
}
}
}
}
由于使用JDK NIO包来编程过于繁琐,所以大多数场景下使用Netty来代替,Netty还解决了JDK空轮询的bug,检测当空轮询计数达到512次时,重新建立selector。
AIO
AIO在这里指的是Asynchronous IO,将对IO的监听全权交给操作系统,在可读写时由操作系统调用回调函数来处理数据
package com.example.demo;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
public class SocketServerAIO {
public static void main(String[] args) {
try {
AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(8080));
CompletionHandler<AsynchronousSocketChannel, Object> handler = new CompletionHandler<AsynchronousSocketChannel,
Object>() {
@Override
public void completed(final AsynchronousSocketChannel result, final Object attachment) {
// 继续监听下一个连接请求
serverSocketChannel.accept(attachment, this);
try {
System.out.println("2. SERVER GOT CONNECTION FROM CLIENT : client - " + result.getRemoteAddress().toString());
ByteBuffer readBuffer = ByteBuffer.allocate(128);
result.read(readBuffer).get();
System.out.println(new String(readBuffer.array()));
result.write(ByteBuffer.wrap("bye".getBytes())).get();
result.close();
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void failed(final Throwable exc, final Object attachment) {
System.out.println(exc.getMessage());
}
};
System.out.println("0. SERVER STARTED TO LISTEN");
serverSocketChannel.accept(null, handler);
while (true) {
Thread.sleep(100);
}
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
}
**