整个TimeServer的代码较多,不过逻辑应该是比较清楚的,还加了注释。
public class NIOTimeServer {
//默认端口号
private static final int PORT = 8585;
public static void main(String[] args) throws IOException {
new Thread(new TimeServerTask(PORT)).start();
}
private static class TimeServerTask implements Runnable{
private Selector selector;
private ServerSocketChannel serverSocketChannel;
//停止channel 标识
private volatile boolean stop = false;
/**
* 初始化channel 监听端口 初始化selector
*/
public TimeServerTask(int port) {
try {
//初始化通道和选择器
selector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();
//设置为非阻塞模式
serverSocketChannel.configureBlocking(false);
//绑定端口
serverSocketChannel.bind(new InetSocketAddress(port));
//将通道注册到选择器
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("The TimeServer started at port: " + port);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 停止
*/
public void stop(){
this.stop = true;
}
@Override
public void run() {
while(!stop){
try{
//这里使用带有时间限制的阻塞select方法
selector.select(1000);
//获取就绪通道
Set<SelectionKey> selectionKeys = selector.selectedKeys();
//遍历就绪通道
Iterator<SelectionKey> iterator = selectionKeys.iterator();
SelectionKey selectionKey;
while(iterator.hasNext()){
selectionKey = iterator.next();
iterator.remove();
try{
handleInput(selectionKey);
}catch (IOException e){
selectionKey.cancel();
if(null != selectionKey.channel()){
selectionKey.channel().close();
}
}
}
}catch (Throwable t){
t.printStackTrace();
}
}
//停止 就关闭selector selector关闭后 所有注册到它上面的channel也会被关闭
if(null != selector){
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 处理输入
*/
private void handleInput(SelectionKey selectionKey) throws IOException {
//先使用isValid方法判断是否可用
if(selectionKey.isValid()){
//判断是否可接收连接
if(selectionKey.isAcceptable()){
//从SelectionKey中拿到与之对应的ServerChannel
ServerSocketChannel channel = (ServerSocketChannel) selectionKey.channel();
//建立连接
SocketChannel socketChannel = channel.accept();
//非阻塞模式
socketChannel.configureBlocking(false);
//将socketChannel也注册到selector
socketChannel.register(selector,SelectionKey.OP_READ);
}
if(selectionKey.isReadable()){
//读取数据
SocketChannel channel = (SocketChannel) selectionKey.channel();
ByteBuffer buffer = ByteBuffer.allocate(1025-1);
//因为channel是非阻塞的 所以read方法可能返回-1 0 以及正常读取到的字节数量
int read = channel.read(buffer);
if(read>0){
//回绕缓冲区 便于获取有效元素
buffer.flip();
byte[] elements = new byte[buffer.remaining()];
//将buffer中的数据写入数组
buffer.get(elements);
String content = new String(elements, StandardCharsets.UTF_8);
System.out.println("The TimeServer received order: " + content );
String response = "QUERY TIME ORDER ".equalsIgnoreCase(content)?new Date(System.currentTimeMillis()).toString():"BAD ORDER";
//写回响应
doWrite(response,channel);
}else if(read<0){
//对端链路关闭
selectionKey.cancel();
channel.close();
}else {
//读到0字节 忽略
}
}
}
}
private void doWrite(String response,SocketChannel socketChannel) throws IOException {
if(response!=null && response.trim().length()>0){
socketChannel.write(ByteBuffer.wrap(response.getBytes(StandardCharsets.UTF_8)));
}
}
}
}
这样我们启动main线程,就会启动这个TimeServer。