/**
 * Non-blocking NIO client that connects to a time server, sends a single
 * {@code "QUERY TIME ORDER "} request, prints the first response it receives
 * and then shuts down.
 */
public class NIOTimeClient {

    /** Local port the client socket is bound to before connecting. */
    private static final int LOCAL_PORT = 3738;

    /** Default port of the remote time server. */
    private static final int REMOTE_PORT = 8585;

    /** Size in bytes of the buffer used to read the server response. */
    private static final int READ_BUFFER_SIZE = 1024;

    public static void main(String[] args) {
        new Thread(new TimeClientHandler("127.0.0.1", REMOTE_PORT)).start();
    }

    /**
     * Drives the whole client life cycle on a single selector: connect,
     * send the request, read the response, release resources.
     */
    private static class TimeClientHandler implements Runnable {

        private final String host;
        private final int port;
        private final Selector selector;
        private final SocketChannel socketChannel;
        /** Set once the event loop should terminate. */
        private volatile boolean stop = false;

        /**
         * Opens the selector and the socket channel, binds the channel to
         * {@link #LOCAL_PORT} and switches it to non-blocking mode.
         *
         * @param host remote host to connect to
         * @param port remote port to connect to
         * @throws IllegalStateException if the selector or channel cannot be set up
         */
        public TimeClientHandler(String host, int port) {
            this.host = host;
            this.port = port;
            Selector openedSelector = null;
            SocketChannel openedChannel = null;
            try {
                openedSelector = Selector.open();
                openedChannel = SocketChannel.open();
                // Connect from the fixed local port.
                openedChannel.bind(new InetSocketAddress(LOCAL_PORT));
                openedChannel.configureBlocking(false);
            } catch (IOException e) {
                // Fail fast instead of leaving a half-initialised handler whose
                // run() would later hit a NullPointerException; release whatever
                // was already opened.
                closeQuietly(openedChannel);
                closeQuietly(openedSelector);
                throw new IllegalStateException(
                        "Unable to initialise time client on local port " + LOCAL_PORT, e);
            }
            this.selector = openedSelector;
            this.socketChannel = openedChannel;
            System.out.println("TimeClient started at port " + LOCAL_PORT);
        }

        /**
         * Runs the event loop until a response has been read, the connection
         * is lost, or the selector fails; then closes the channel and selector.
         */
        @Override
        public void run() {
            try {
                doConnect();
            } catch (IOException e) {
                e.printStackTrace();
                System.exit(1);
            }
            try {
                while (!stop) {
                    try {
                        // Wait up to one second for ready channels.
                        selector.select(1000);
                        Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                        while (it.hasNext()) {
                            SelectionKey selectionKey = it.next();
                            it.remove();
                            try {
                                handleInput(selectionKey);
                            } catch (Exception e) {
                                e.printStackTrace();
                                selectionKey.cancel();
                                closeQuietly(selectionKey.channel());
                                // The only channel is gone; nothing left to wait for.
                                stop = true;
                            }
                        }
                    } catch (IOException e) {
                        // A failing selector is not recoverable; retrying would
                        // only spin and repeat the same error.
                        e.printStackTrace();
                        stop = true;
                    }
                }
            } finally {
                // Closing a selector only deregisters its channels, it does not
                // close them, so the socket has to be closed explicitly.
                closeQuietly(socketChannel);
                try {
                    System.out.println("关闭selector");
                    selector.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        /**
         * Handles a ready key: completes a pending connect (then sends the
         * request) and/or reads the server response.
         *
         * @param selectionKey the key reported ready by the selector
         * @throws IOException if finishing the connect, writing or reading fails
         */
        private void handleInput(SelectionKey selectionKey) throws IOException {
            if (!selectionKey.isValid()) {
                return;
            }
            SocketChannel channel = (SocketChannel) selectionKey.channel();

            // Connection completion is checked first.
            if (selectionKey.isConnectable()) {
                if (channel.finishConnect()) {
                    channel.register(selector, SelectionKey.OP_READ);
                    doWrite(channel);
                } else {
                    System.err.println("Connection to " + host + ":" + port + " could not be completed.");
                    System.exit(1);
                }
            }

            if (selectionKey.isReadable()) {
                ByteBuffer buffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
                int read = channel.read(buffer);
                if (read > 0) {
                    // Flip so the bytes just read can be consumed.
                    buffer.flip();
                    byte[] elements = new byte[buffer.remaining()];
                    buffer.get(elements);
                    String content = new String(elements, StandardCharsets.UTF_8);
                    System.out.println("The time client received time : " + content);
                    stop = true;
                } else if (read < 0) {
                    // Peer closed the connection: release the channel and end
                    // the loop, otherwise it would keep selecting on nothing.
                    selectionKey.cancel();
                    channel.close();
                    stop = true;
                }
                // read == 0: nothing available yet, ignore.
            }
        }

        /**
         * Starts the connection. If it completes immediately the channel is
         * registered for reads and the request is sent; otherwise the channel
         * is registered for {@code OP_CONNECT} and finished in the event loop.
         *
         * @throws IOException if the connect attempt or registration fails
         */
        private void doConnect() throws IOException {
            if (socketChannel.connect(new InetSocketAddress(host, port))) {
                socketChannel.register(selector, SelectionKey.OP_READ);
                doWrite(socketChannel);
            } else {
                // Not connected yet: wait for the connect-ready event.
                socketChannel.register(selector, SelectionKey.OP_CONNECT);
            }
        }

        /**
         * Sends the time query over the given channel and reports whether the
         * whole request was written.
         *
         * @param socketChannel connected channel to write to
         * @throws IOException if the write fails
         */
        private void doWrite(SocketChannel socketChannel) throws IOException {
            byte[] request = "QUERY TIME ORDER ".getBytes(StandardCharsets.UTF_8);
            ByteBuffer byteBuffer = ByteBuffer.wrap(request);
            socketChannel.write(byteBuffer);
            if (!byteBuffer.hasRemaining()) {
                System.out.println("Send order to server successfully.");
            } else {
                // A non-blocking write may be partial; make that visible
                // instead of failing silently.
                System.err.println("Request only partially sent, "
                        + byteBuffer.remaining() + " byte(s) remaining.");
            }
        }

        /** Closes the resource if non-null, reporting but not propagating failures. */
        private static void closeQuietly(AutoCloseable resource) {
            if (resource == null) {
                return;
            }
            try {
                resource.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}