public class Const { public static int DEFAULT_PORT = 12345; public static String DEFAULT_SERVER_IP = "127.0.0.1"; /*根据输入信息拼接出一个应答信息*/ public static String response(String msg){ return "Hello,"+msg+",Now is "+new java.util.Date( System.currentTimeMillis()).toString() ; }}
服务端编程
public class NioServer { private static NioServerHandle nioServerHandle; public static void main(String[] args){ nioServerHandle = new NioServerHandle(DEFAULT_PORT); new Thread(nioServerHandle,"Server").start(); }}
/** * @author * 类说明:nio通信服务端处理器 */public class NioServerHandle implements Runnable{ private volatile boolean started; private ServerSocketChannel serverSocketChannel; private Selector selector; /** * 构造方法 * @param port 指定要监听的端口号 */ public NioServerHandle(int port) { try { /*创建选择器的实例*/ selector = Selector.open(); /*创建ServerSocketChannel的实例*/ serverSocketChannel = ServerSocketChannel.open(); /*设置通道为非阻塞模式*/ serverSocketChannel.configureBlocking(false); /*绑定端口*/ serverSocketChannel.socket().bind(new InetSocketAddress(port)); /*注册事件,表示关心客户端连接*/ serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT); started = true; System.out.println("服务器已启动,端口号:"+port); } catch (IOException e) { e.printStackTrace(); } } @Override public void run() { while(started){ try { /*获取当前有哪些事件*/ selector.select(1000); /*获取事件的集合*/ Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while(iterator.hasNext()){ SelectionKey key = iterator.next(); /*我们必须首先将处理过的 SelectionKey 从选定的键集合中删除。 如果我们没有删除处理过的键,那么它仍然会在主集合中以一个激活 的键出现,这会导致我们尝试再次处理它。*/ iterator.remove(); handleInput(key); } } catch (IOException e) { e.printStackTrace(); } } } /*处理事件的发生*/ private void handleInput(SelectionKey key) throws IOException { if(key.isValid()){ /*处理新接入的客户端的请求*/ if(key.isAcceptable()){ /*获取关心当前事件的Channel*/ ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); /*接受连接*/ SocketChannel sc = ssc.accept(); System.out.println("==========建立连接========="); sc.configureBlocking(false); /*关注读事件*/ sc.register(selector,SelectionKey.OP_READ); } /*处理对端的发送的数据*/ if(key.isReadable()){ SocketChannel sc = (SocketChannel) key.channel(); /*创建ByteBuffer,开辟一个缓冲区*/ ByteBuffer buffer = ByteBuffer.allocate(1024); /*从通道里读取数据,然后写入buffer*/ int readBytes = sc.read(buffer); if(readBytes>0){ /*将缓冲区当前的limit设置为position,position=0, 用于后续对缓冲区的读取操作*/ buffer.flip(); /*根据缓冲区可读字节数创建字节数组*/ byte[] bytes = new byte[buffer.remaining()]; /*将缓冲区可读字节数组复制到新建的数组中*/ buffer.get(bytes); String message = new String(bytes,"UTF-8"); System.out.println("服务器收到消息:"+message); /*处理数据*/ String result = Const.response(message); /*发送应答消息*/ doWrite(sc,result); }else if(readBytes<0){ /*取消特定的注册关系*/ key.cancel(); /*关闭通道*/ sc.close(); } } } } /*发送应答消息*/ private void doWrite(SocketChannel sc,String response) throws IOException { byte[] bytes = response.getBytes(); ByteBuffer buffer = ByteBuffer.allocate(bytes.length); buffer.put(bytes); buffer.flip(); sc.write(buffer); } public void stop(){ started = false; }}
客户端
public class NioClient { private static NioClientHandle nioClientHandle; public static void start(){ nioClientHandle = new NioClientHandle(DEFAULT_SERVER_IP,DEFAULT_PORT); //nioClientHandle = new NioClientHandle(DEFAULT_SERVER_IP,8888); new Thread(nioClientHandle,"client").start(); } //向服务器发送消息 public static boolean sendMsg(String msg) throws Exception{ nioClientHandle.sendMsg(msg); return true; } public static void main(String[] args) throws Exception { start(); Scanner scanner = new Scanner(System.in); while(NioClient.sendMsg(scanner.next())); }}
/** * @author * 类说明:nio通信客户端处理器 */public class NioClientHandle implements Runnable{ private String host; private int port; private volatile boolean started; private Selector selector; private SocketChannel socketChannel; public NioClientHandle(String ip, int port) { this.host = ip; this.port = port; try { /*创建选择器的实例*/ selector = Selector.open(); /*创建ServerSocketChannel的实例*/ socketChannel = SocketChannel.open(); /*设置通道为非阻塞模式*/ socketChannel.configureBlocking(false); started = true; } catch (IOException e) { e.printStackTrace(); } } public void stop(){ started = false; } @Override public void run() { try{ doConnect(); }catch(IOException e){ e.printStackTrace(); System.exit(1); } //循环遍历selector while(started){ try{ //无论是否有读写事件发生,selector每隔1s被唤醒一次 selector.select(1000); //获取当前有哪些事件可以使用 Set<SelectionKey> keys = selector.selectedKeys(); //转换为迭代器 Iterator<SelectionKey> it = keys.iterator(); SelectionKey key = null; while(it.hasNext()){ key = it.next(); /*我们必须首先将处理过的 SelectionKey 从选定的键集合中删除。 如果我们没有删除处理过的键,那么它仍然会在主集合中以一个激活 的键出现,这会导致我们尝试再次处理它。*/ it.remove(); try{ handleInput(key); }catch(Exception e){ if(key != null){ key.cancel(); if(key.channel() != null){ key.channel().close(); } } } } }catch(Exception e){ e.printStackTrace(); System.exit(1); } } //selector关闭后会自动释放里面管理的资源 if(selector != null) try{ selector.close(); }catch (Exception e) { e.printStackTrace(); } } //具体的事件处理方法 private void handleInput(SelectionKey key) throws IOException{ if(key.isValid()){ //获得关心当前事件的channel SocketChannel sc = (SocketChannel) key.channel(); //连接事件 if(key.isConnectable()){ if(sc.finishConnect()){ socketChannel.register(selector, SelectionKey.OP_READ);} else System.exit(1); } //有数据可读事件 if(key.isReadable()){ //创建ByteBuffer,并开辟一个1M的缓冲区 ByteBuffer buffer = ByteBuffer.allocate(1024); //读取请求码流,返回读取到的字节数 int readBytes = sc.read(buffer); //读取到字节,对字节进行编解码 if(readBytes>0){ //将缓冲区当前的limit设置为position,position=0, // 用于后续对缓冲区的读取操作 buffer.flip(); //根据缓冲区可读字节数创建字节数组 byte[] bytes = new byte[buffer.remaining()]; //将缓冲区可读字节数组复制到新建的数组中 buffer.get(bytes); String result = new String(bytes,"UTF-8"); System.out.println("客户端收到消息:" + result); } //链路已经关闭,释放资源 else if(readBytes<0){ key.cancel(); sc.close(); } } } } private void doWrite(SocketChannel channel,String request) throws IOException { //将消息编码为字节数组 byte[] bytes = request.getBytes(); //根据数组容量创建ByteBuffer ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); //将字节数组复制到缓冲区 writeBuffer.put(bytes); //flip操作 writeBuffer.flip(); //发送缓冲区的字节数组 /*关心事件和读写网络并不冲突*/ channel.write(writeBuffer); } private void doConnect() throws IOException{ /*非阻塞的连接*/ if(socketChannel.connect(new InetSocketAddress(host,port))){ socketChannel.register(selector,SelectionKey.OP_READ); }else{ socketChannel.register(selector,SelectionKey.OP_CONNECT); } } //写数据对外暴露的API public void sendMsg(String msg) throws Exception{ doWrite(socketChannel, msg); }}