一、AIO的异步操作
AsyncSocketChannel和AsyncServerSocketChannel都是支持如图几个IO操作异步调用的。
二、如何实现异步调用
Future
简单来说通过Channel来调用那几个函数,然后返回一个Future的对象,Future在线程池中介绍过,他是一个描述未来的一个对象,通过get、isDone等等的调用查询未来的任务。
CompletionHander
通过Channel调用IO操作,然后不等待是否完成,然后把一些参数传入一个回调函数(Handler)进一步操作(实现)。
Handler拥有两种函数,一个是Completed,是在完成后调用,另一种是Failed,是在失败了后调用。
三、EchoDemo实现异步操作机制
Server:
主体函数:
package demo4.aio.server;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.HashMap;
import java.util.Map;
public class Server {
private String LOCALHOST="localhost";
private static final int DEFAULT_PORT=9999;
private AsynchronousServerSocketChannel serverSocketChannel;
private static final String QUIT="quit";
private boolean readyToQuit(String str){
return QUIT.equalsIgnoreCase(str);
}
private void shutDown(Closeable...closeables){
try {
for (Closeable shut : closeables) {
if (shut != null) {
shut.close();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
public void start(){
try {
//绑定监听端口
serverSocketChannel=AsynchronousServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(LOCALHOST,DEFAULT_PORT));
System.out.println("服务器已启动,正在监听:["+LOCALHOST+","+DEFAULT_PORT+"]");
while (true) {
//AcceptHandler在AsynchronousChannelGroup线程池中完成,而非主线程。
//AsynchronousChannelGroup未定义时使用默认的AsynchronousChannelGroup
serverSocketChannel.accept(null, new AcceptHandler());
//防止频繁调用accept函数。
System.in.read();
}
} catch (IOException e) {
e.printStackTrace();
}finally {
shutDown(serverSocketChannel);
}
}
private class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel,Object> {
@Override
public void completed(AsynchronousSocketChannel result, Object attachment) {
//并不会造成stepOverFlow在底层实现拥有次数限制
if (serverSocketChannel.isOpen()){
serverSocketChannel.accept(null,this);
}
AsynchronousSocketChannel clientChannel=result;
if (clientChannel!=null&&clientChannel.isOpen()) {
ClientHander hander = new ClientHander(clientChannel);
ByteBuffer buffer = ByteBuffer.allocate(1024);
Map<String, Object> info = new HashMap<>();
info.put("type", "read");
info.put("buffer", buffer);
clientChannel.read(buffer, info, hander);
}
}
@Override
public void failed(Throwable exc, Object attachment) {
//异常处理,略
}
}
private class ClientHander implements CompletionHandler<Integer,Object> {
private AsynchronousSocketChannel clientChannel;
public ClientHander(AsynchronousSocketChannel clientChannel) {
try {
System.out.println("客户端["+clientChannel.getLocalAddress()+"]已连接到服务器");
} catch (IOException e) {
e.printStackTrace();
}
this.clientChannel=clientChannel;
}
@Override
public void completed(Integer result, Object attachment) {
Map<String,Object> info= (Map<String, Object>) attachment;
String type= (String) info.get("type");
if ("read".equalsIgnoreCase(type)) {
try {
ByteBuffer buffer = (ByteBuffer) info.get("buffer");
String str = new String(buffer.array());
if (readyToQuit(str)) {
System.out.println("客户端[" + clientChannel.getLocalAddress() + "]已断开连接");
shutDown(clientChannel);
} else {
buffer.flip();
info.put("type", "write");
clientChannel.write(buffer, info, this);
buffer.clear();
System.out.println("客户端[" + clientChannel.getLocalAddress() + "]:" + str);
}
} catch (IOException e) {
e.printStackTrace();
}
}
if ("write".equalsIgnoreCase(type)){
ByteBuffer buffer= ByteBuffer.allocate(1024);
info.put("type","read");
info.put("buffer",buffer);
clientChannel.read(buffer,info,this);
}
}
@Override
public void failed(Throwable exc, Object attachment) {
//异常处理,略
}
}
}
Main:
package demo4.aio.server;
public class ServerMain {
public static void main(String[] args) {
Server server=new Server();
server.start();
}
}
Client
主题函数:
package demo4.aio.client;
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
public class Client {
private String LOCALHOST = "localhost";
private static final int DEFAULT_PORT = 9999;
private AsynchronousSocketChannel clientChannel;
private static final String QUIT="quit";
private boolean readyToQuit(String str){
return QUIT.equalsIgnoreCase(str);
}
private void shutDown(Closeable... closeables) {
try {
for (Closeable shut : closeables) {
if (shut != null) {
shut.close();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
public void start() {
try {
//创建channel
clientChannel = AsynchronousSocketChannel.open();
Future<Void> future = clientChannel.connect(new InetSocketAddress(LOCALHOST, DEFAULT_PORT));
future.get();
System.out.println("客户端["+LOCALHOST+","+DEFAULT_PORT+"]已连接到服务器");
//等待用户输入
BufferedReader consoleReader = new BufferedReader(new InputStreamReader(System.in));
while (true) {
String intput = consoleReader.readLine();
byte[] inputBytes = intput.getBytes();
ByteBuffer buffer = ByteBuffer.wrap(inputBytes);
Future<Integer> writeRes = clientChannel.write(buffer);
writeRes.get();
if (readyToQuit(intput)){
break;
}
System.out.println("消息发送成功");
buffer.flip();
Future<Integer> readRes = clientChannel.read(buffer);
readRes.get();
System.out.print("收到服务器消息:");
buffer.clear();
String res = new String(buffer.array());
res="[ECHO]:" +res;
System.out.println(res);
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}finally {
shutDown(clientChannel);
}
}
}
Main:
package demo4.aio.client;
public class ClientMain {
public static void main(String[] args) {
Client client=new Client();
client.start();
}
}