服务端
public class SelectServer { private static Executor executor = Executors.newFixedThreadPool(10); public static void main(String[] args) throws Exception{ // 打开服务端通道 ServerSocketChannel ssc = ServerSocketChannel.open(); // 打开选择器 Selector selector = Selector.open(); // 服务端通过到绑定到端口 ssc.socket().bind(new InetSocketAddress(8022)); // 配置为非阻塞 ssc.configureBlocking(false); // 将服务端通道的连接事件注册到选择器上 // 与Selector一起使用时,Channel必须处于非阻塞模式下。这意味着不能将FileChannel与Selector一起使用,因为FileChannel不能切换到非阻塞模式。而套接字通道都可以 ssc.register(selector, SelectionKey.OP_ACCEPT); // 开始监听 while (true){ // 100ms内无事件发生 if (selector.select(100) == 0){ continue; } // 选择器获取100ms内发生的事件 Iterator<SelectionKey> iterable = selector.selectedKeys().iterator(); while (iterable.hasNext()){ // 处理事件 SelectionKey key = iterable.next(); // 监听连接事件 if (key.isAcceptable()){ handleAcceptable(key); } // 监听连接事件 if (key.isReadable()){ handleReadable(key); } // 监听连接事件 if (key.isWritable() && key.isValid()){ handleWritable(key); } // 监听连接事件 if (key.isConnectable()){ System.out.println("isConnectable = true"); } iterable.remove(); } } } private static void handleWritable(SelectionKey key){ // 另起worker线程处理实际工作 executor.execute(new Runnable() { @Override public void run() { try { // 获取客户端通道 SocketChannel sc = (SocketChannel) key.channel(); // 获取服务端准备发送附件内容 ByteBuffer buf = (ByteBuffer)key.attachment(); buf.flip(); while(buf.hasRemaining()){ sc.write(buf); } buf.compact(); }catch (Exception e){ } } }); } private static void handleReadable(SelectionKey key){ // 另起worker线程处理实际工作 executor.execute(new Runnable() { @Override public void run() { try { // 获取客户端通道 SocketChannel sc = (SocketChannel)key.channel(); // 获取客户端上传附件信息 ByteBuffer buf = (ByteBuffer)key.attachment(); // 读取内容 long bytesRead = sc.read(buf); while(bytesRead>0){ buf.flip(); while(buf.hasRemaining()){ System.out.print((char)buf.get()); } System.out.println(); buf.clear(); bytesRead = sc.read(buf); } if(bytesRead == -1){ sc.close(); } }catch (Exception e){ } } }); } private static void handleAcceptable(SelectionKey key){ // 另起worker线程处理实际工作 executor.execute(new Runnable() { @Override public void run() { try { // 拿到事件关联的通道 ServerSocketChannel ssChannel = (ServerSocketChannel)key.channel(); // 拿到客户端 SocketChannel sc = ssChannel.accept(); // 配置为非阻塞 sc.configureBlocking(false); // 将客户端通道的可读事件注册到选择器上 sc.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocateDirect(1024)); }catch (Exception e){ } } }); }}
客户端
public class SelectClient { public static void main(String[] args) throws Exception { ByteBuffer buffer = ByteBuffer.allocate(1024); SocketChannel socketChannel = null; try { socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); socketChannel.connect(new InetSocketAddress("127.0.0.1", 8022)); if (socketChannel.finishConnect()) { int i = 0; while (true) { TimeUnit.SECONDS.sleep(1); String info = "I'm " + i++ + "-th information from client"; buffer.clear(); buffer.put(info.getBytes()); buffer.flip(); while (buffer.hasRemaining()) { System.out.println(buffer); socketChannel.write(buffer); } } } } catch (IOException | InterruptedException e) { e.printStackTrace(); } finally { try { if (socketChannel != null) { socketChannel.close(); } } catch (IOException e) { e.printStackTrace(); } } }}