AIO 是彻底的异步通信,NIO 是同步非阻塞通信,AIO是真正的非阻塞模型,但相比NIO并没有明显的性能提高,因此NIO目前依旧是主流
AIO操作流程
解释:
假设有这么一个场景,有一排水壶(客户)在烧水。AIO的做法是,每个水壶上装一个开关,当水开了以后会提醒对应的线程去处理。NIO的做法是,叫一个线程不停的循环观察每一个水壶,根据每个水壶当前的状态去处理。BIO的做法是,叫一个线程停留在一个水壶那,直到这个水壶烧开,才去处理下一个水壶
案例演示:
做一个类似回音壁的AIO案例,服务端收到客户端消息后直接将消息再返回给客户端
流程分析:
1、服务器端创建 AsynchronousServerSocketChannel (异步服务器端通道)绑定端口和地址,默认属于AsynchronousChannelGroup 通道组(可以被多个异步通道进行资源共享的群组)
2、服务端创建 服务端CompletionHandler 和 客户端ClientHandler 并使用 accept 方法异步等待客户端连接,当有客户端加入时将会触发 服务端CompletionHandler 的 completed 方法,在该方法内通知服务端接收连接请求
3、客户端创建 AsynchronousSocketChannel(异步客户端通道),绑定端口和地址,建立与服务端的连接后默认也属于AsynchronousChannelGroup
4、客户端输入信息后将数据写入 AsynchronousSocketChannel,服务端拿到客户端发送的数据后传给客户端ClientHandler ,根据额外参数判断并返回给客户端 AsynchronousSocketChannel
5、每一个新加入到服务端的 AsynchronousSocketChannel 实际都会创建一个 Handler 用来处理与其对应的读写IO事件
![C2)I@Z%5}Z(HMEP)@87T7T.png
服务端代码:
package study;
import java.io.*;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.HashMap;
import java.util.Map;
/**
* 服务端
*/
public class Server {
final String LOCALHOST = "localhost";
final int DEFAULT_PORT = 8888;
AsynchronousServerSocketChannel serverChannel;
/* 关闭资源流 */
private void close(Closeable closable) {
if (closable != null) {
try {
closable.close();
System.out.println("关闭" + closable);
} catch (IOException e) {
e.printStackTrace();
}
}
}
public void start() {
try {
serverChannel = AsynchronousServerSocketChannel.open(); //创建AIO通道,默认使用AsynchronousChannelGroup
serverChannel.bind(new InetSocketAddress(LOCALHOST, DEFAULT_PORT)); //绑定监听端口
System.out.println("启动服务器,监听端口:" + DEFAULT_PORT);
/* accept 判断后立马返回,直到客户端发送消息后 AcceptHandler 方法才会被系统调用返回数据
* 通过accept函数返回的Feture对象 或者 有回调函数CompletionHandler作为参数的accept方法与客户端建立连接 ,这里使用自定义的 CompletionHandler 作为参数的方式进行异步
* 参数一: 在回调的时候添加额外的数据,非必须,可以理解为邮件发送的附件
* 参数二: 回调函数的 CompletionHandler
*/
while (true) {
serverChannel.accept(null, new AcceptHandler()); //异步等待新客户端连接
System.in.read();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
close(serverChannel);
}
}
/* 参数一: IO操作的返回结果,由于是服务端调用,返回的应该是客户端对象,所以是 AsynchronousSocketChannel,
* 参数二:accept方法的参数一
*/
private class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Object> {
/* 异步调用的函数有数据返回时调用该方法
* AsynchronousSocketChannel : 用于服务器和客户端之间收发信息
* */
@Override
public void completed(AsynchronousSocketChannel result, Object attachment) {
/* 判断服务端channel是否处于正常开放状态 */
if (serverChannel.isOpen()) {
serverChannel.accept(null, this); //通知服务端接收客户端连接
}
AsynchronousSocketChannel clientChannel = result; //获取异步的客户端通道
/* 判断客户端Channle是否处于开放状态 */
if (clientChannel != null && clientChannel.isOpen()) {
ClientHandler handler = new ClientHandler(clientChannel);
ByteBuffer buffer = ByteBuffer.allocate(1024);
/* 使用额外参数让 ClientHandler 判断是读操作还是写操作 */
Map<String, Object> info = new HashMap<>();
info.put("type", "read");
info.put("buffer", buffer);
clientChannel.read(buffer, info, handler); //从客户端通道读取输入信息返回给ClientHandler: 参数一 > 缓存区,参数二 > 添加到客户端Handler回调函数的额外参数,参数三 > 客户端Handler
}
}
/* 异步调用的操作有错误时调用该方法 */
@Override
public void failed(Throwable exc, Object attachment) {
}
}
/* 客户端Handler,处理 clientChannel 异步调用读&写产生的结果
* 参数一: IO操作的函数返回的数据类型,因为是字节数所以是Integer
* 参数二:传给handler的额外参数
*/
private class ClientHandler implements CompletionHandler<Integer, Object>{
private AsynchronousSocketChannel clientChannel;
public ClientHandler(AsynchronousSocketChannel channel) {
this.clientChannel = channel;
}
/* 异步调用的函数有数据返回时调用该方法 */
@Override
public void completed(Integer result, Object attachment) {
Map<String, Object> info = (Map<String, Object>) attachment; //获取服务端的额外信息
String type = (String) info.get("type");
/* 接收客户端数据再返回给客户端 */
if ("read".equals(type)) {
ByteBuffer buffer = (ByteBuffer) info.get("buffer"); //从客户端Buffer读取数据
buffer.flip();
info.put("type", "write");
clientChannel.write(buffer, info, this);
buffer.clear();
}
/* 监听客户端发送的数据 */
if ("write".equals(type)) {
ByteBuffer buffer = ByteBuffer.allocate(1024); //从客户端Buffer读取数据
info.put("type", "read");
info.put("buffer", buffer);
clientChannel.read(buffer, info, this);
}
}
@Override
public void failed(Throwable exc, Object attachment) {
// 处理错误
}
}
public static void main(String[] args) {
Server server = new Server();
server.start();
}
}
客户端代码:
package study;
import java.io.*;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
public class Client {
final String LOCALHOST = "localhost";
final int DEFAULT_PORT = 8888;
AsynchronousSocketChannel clientChannel;
private void close(Closeable closable) {
if (closable != null) {
try {
closable.close();
System.out.println("关闭" + closable);
} catch (IOException e) {
e.printStackTrace();
}
}
}
public void start() {
try {
clientChannel = AsynchronousSocketChannel.open(); //创建channel
Future<Void> future = clientChannel.connect(new InetSocketAddress(LOCALHOST, DEFAULT_PORT)); //绑定端口
future.get(); //进行异步的服务端连接
BufferedReader consoleReader = new BufferedReader(new InputStreamReader(System.in)); //等待用户的输入
while (true) {
String input = consoleReader.readLine();
byte[] inputBytes = input.getBytes();
ByteBuffer buffer = ByteBuffer.wrap(inputBytes); //wrap写入数据后会自动flip
Future<Integer> writeResult = clientChannel.write(buffer);
writeResult.get(); //有返回值则认为用户数据成功写入到通道并发送给服务器
buffer.flip();
Future<Integer> readResult = clientChannel.read(buffer);
readResult.get(); //有返回值则认为获取到服务端返回的数据并写入到Buffer
String echo = new String(buffer.array());
buffer.clear();
System.out.println(echo);
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} finally {
close(clientChannel);
}
}
public static void main(String[] args) {
Client client = new Client();
client.start();
}
}
测试:
启动客户端和服务端,客户端输入信息后服务端返回相同数据
测试结果
AIO聊天室:
服务端代码:
package chatroom;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/* AIO聊天室服务端 */
public class ChatServer {
private static final String LOCALHOST = "localhost";
private static final int DEFAULT_PORT = 8888;
private static final String QUIT = "quit";
private static final int BUFFER = 1024;
private AsynchronousChannelGroup channelGroup; //线程组
private AsynchronousServerSocketChannel serverChannel; //服务端通道
private Charset charset = Charset. forName("UTF-8");
private int port;
private List<ClientHandler> connectedClients; //在线客户列表
public ChatServer(){
this(DEFAULT_PORT);
}
/* 用户自定义监听端口 */
public ChatServer(int port){
this.port = port;
this.connectedClients = new ArrayList<>();
}
/* 退出操作 */
private void close(Closeable closable) {
if (closable != null) {
try {
closable.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void start(){
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5,8,
200, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(10));
try {
channelGroup = AsynchronousChannelGroup.withThreadPool(threadPool); //创建一个使用自定义线程池的
serverChannel = AsynchronousServerSocketChannel.open(channelGroup); //使用自定义的channelGroup
serverChannel.bind(new InetSocketAddress(LOCALHOST,port)); //绑定监听地址和端口
System.out.println("启动服务器,监听端口: "+port);
while (true){
serverChannel.accept(null, new AcceptHandler()); //等待客户端连接
System.in.read();
}
} catch (IOException e) {
e.printStackTrace();
}finally{
close(serverChannel);
}
}
public static void main(String[] args) {
ChatServer chatServer = new ChatServer(6767);
chatServer.start();
}
/* 服务端 Handler */
private class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel,Object>{
@Override
public void completed(AsynchronousSocketChannel clientChannel, Object attachment) {
/* 判断服务端channel是否处于正常开放状态 */
if (serverChannel.isOpen()) {
serverChannel.accept(null, this); //通知服务端接收客户端连接
}
/* 判断客户端Channle是否处于开放状态 */
if (clientChannel != null && clientChannel.isOpen()) {
ClientHandler handler = new ClientHandler(clientChannel);
ByteBuffer buffer = ByteBuffer.allocate(BUFFER);
addClient(handler); //将新用户加入到在线用户列表
clientChannel.read(buffer, buffer, handler); //从客户端通道读取输入信息返回给ClientHandler: 参数一 > 缓存区,参数二 > 添加到客户端Handler回调函数的额外参数,参数三 > 客户端Handler
}
}
@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("连接失败: "+exc);
}
}
/* 将新用户加入到在线用户列表 */
private synchronized void addClient(ClientHandler handler) {
connectedClients.add(handler);
System.out.println(getClientName(handler.clientChannel)+"已连接到服务器");
}
/* 用户下线,从在线用户列表移除 */
private synchronized void removeClient(ClientHandler handler) {
connectedClients.remove(handler);
System.out.println(getClientName(handler.clientChannel)+"已断开连接");
close(handler.clientChannel);
}
/*转换为字符串*/
private String receive(ByteBuffer buffer){
CharBuffer decode = charset.decode(buffer);
return String.valueOf(decode);
}
/* 客户端Handler,处理 clientChannel 异步调用读&写产生的结果
* 参数一: IO操作的数据类型 > 字节数
* 参数二: 传给handler的额外参数
*/
private class ClientHandler implements CompletionHandler<Integer,Object>{
private AsynchronousSocketChannel clientChannel;
public ClientHandler(AsynchronousSocketChannel channel){
this.clientChannel = channel;
}
@Override
public void completed(Integer result, Object attachment) {
ByteBuffer buffer = (ByteBuffer) attachment; //read函数调用才有的对象,write是没有额外对象的
if(null!=buffer){
if(result<=0){
removeClient(this);//客户端异常,将客户移除出在线客户列表
}else{
buffer.flip();
String fwdMsg = receive(buffer);
System.out.println(getClientName(clientChannel)+":"+fwdMsg);
forwardMessage(clientChannel,fwdMsg); //转发信息
buffer.clear();
/** 检查用户是否退出 */
if(QUIT.equals(fwdMsg)){
removeClient(this);
}else{
clientChannel.read(buffer,buffer,this); //继续接收用户后续消息
}
}
}
}
@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("读写失败: "+exc);
}
}
/* 将消息转发给在线用户列表中的其他用户 */
private synchronized void forwardMessage(AsynchronousSocketChannel clientChannel, String fwdMsg) {
for (ClientHandler handler : connectedClients) {
if(!clientChannel.equals(handler.clientChannel)){
try {
ByteBuffer buffer = charset.encode(getClientName(handler.clientChannel) + ":" + fwdMsg);
handler.clientChannel.write(buffer,null,handler);
} catch (Exception e) {
System.out.println(handler.clientChannel+"消息转发错误!");
e.printStackTrace();
}
}
}
}
/* 获取对应客户端对象,返回客户端ID */
private String getClientName(AsynchronousSocketChannel clientChannel) {
int clientPort = -1;
try {
InetSocketAddress address = (InetSocketAddress) clientChannel.getRemoteAddress();
clientPort = address.getPort(); //获取客户端端口
} catch (IOException e) {
e.printStackTrace();
}
return "客户端["+clientPort+"]";
}
}
客户端代码:
package chatroom;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.charset.Charset;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/* AIO聊天室客户端 */
public class ChatClient {
private static final String LOCALHOST = "localhost";
private static final int DEFAULT_PORT = 8888;
private static final int BUFFER = 1024;
private String host; //用户定义地址
private int port; //用户定义端口
private AsynchronousSocketChannel clientChannel;
private Charset chaset = Charset.forName("UTF-8");
public ChatClient(){
this(LOCALHOST,DEFAULT_PORT);
}
public ChatClient(String host,int port){
this.port = port;
this.host = host;
}
/* 退出操作 */
private void close(Closeable closable) {
if (closable != null) {
try {
closable.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void start(){
try {
clientChannel = AsynchronousSocketChannel.open(); //创建Channel
Future<Void> future = clientChannel.connect(new InetSocketAddress(host,port)); //绑定端口
future.get(); //进行异步的服务端连接
/* 启用新线程处理用户输入 */
new Thread(new UserInputHander(this)).start();
ByteBuffer buffer = ByteBuffer.allocate(BUFFER); //创建缓存
while(true){
Future<Integer> readResult = clientChannel.read(buffer);
int result = readResult.get(); //等待返回读取结果
if(result<=0){
System.out.println("已断开与服务器的连接");
close(clientChannel);
System.exit(1); //中止当前虚拟机的运行进行强制性的程序退出
}else{
buffer.flip();
String msg = String.valueOf(chaset.decode(buffer));
buffer.clear();
System.out.println("服务器返回数据: "+msg);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
public void send(String msg){
/* 误输入空格 */
if(msg.isEmpty()){
return;
}
/* 有意义的数据 */
ByteBuffer buffer = chaset.encode(msg);
Future<Integer> writeResult = clientChannel.write(buffer);
try {
writeResult.get();
} catch (Exception e) {
System.out.println("发送消息失败!");
e.printStackTrace();
}
}
public static void main(String[] args) {
ChatClient chatClient = new ChatClient(LOCALHOST,6767);
chatClient.start();
}
}
客户端消息发送方法:
package chatroom;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
/* 客户端消息发送 */
public class UserInputHander implements Runnable {
private ChatClient chatClient;
public UserInputHander(ChatClient chatClient){
this.chatClient = chatClient;
}
@Override
public void run() {
try {
BufferedReader consoleRerader = new BufferedReader(new InputStreamReader(System.in));
while (true){
String input = consoleRerader.readLine();
//向服务器发送消息
chatClient.send(input);
//检查用户是否准备退出
if("quit".equals(input)){
break;
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
测试:
启动 ChatServer 和多个 ChatClient 实例进行测试,在客户端窗口中发送消息进行测试