Server端
/**
 * Single-threaded, selector-based chat server.
 *
 * <p>Accepts client connections on {@link #PORT}, reads whatever a client sends and
 * forwards it to every other connected client. All channel handling happens on the
 * thread that calls {@link #listen()}.
 */
public class Server {

    /** TCP port the server listens on. */
    private static final int PORT = 6667;

    /** Size in bytes of the buffer used for a single read. */
    private static final int BUFFER_SIZE = 1024;

    /** Charset for message bytes; the client encodes what it sends as UTF-8. */
    private static final java.nio.charset.Charset CHARSET = java.nio.charset.StandardCharsets.UTF_8;

    /** Multiplexes the listening channel and all accepted client channels. */
    private final Selector selector;

    /** Non-blocking listening channel registered for OP_ACCEPT. */
    private final ServerSocketChannel listenChannel;

    /**
     * Opens the selector and the listening channel, binds to {@link #PORT} and
     * registers the listening channel for accept events.
     *
     * @throws IllegalStateException if the selector or channel cannot be opened,
     *         bound or registered (for example when the port is already in use)
     */
    public Server() {
        Selector openedSelector = null;
        ServerSocketChannel openedChannel = null;
        try {
            openedSelector = Selector.open();
            openedChannel = ServerSocketChannel.open();
            openedChannel.socket().bind(new InetSocketAddress(PORT));
            openedChannel.configureBlocking(false);
            openedChannel.register(openedSelector, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            // Fail fast: a server that could not bind would otherwise spin in listen()
            // without ever accepting a connection.
            closeQuietly(openedChannel);
            closeQuietly(openedSelector);
            throw new IllegalStateException("Unable to start server on port " + PORT, e);
        }
        selector = openedSelector;
        listenChannel = openedChannel;
    }

    /**
     * Runs the event loop: waits up to two seconds per iteration for ready channels,
     * accepts new clients and reads from clients that have data pending.
     *
     * <p>Returns only if the selector itself fails; the failure is printed.
     */
    public void listen() {
        try {
            while (true) {
                if (selector.select(2000) == 0) {
                    continue; // timed out with nothing ready
                }
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    // Remove first so the key is never processed twice, even if handling fails.
                    iterator.remove();
                    if (!key.isValid()) {
                        continue; // cancelled while handling an earlier key in this batch
                    }
                    if (key.isAcceptable()) {
                        acceptClient();
                    } else if (key.isReadable()) {
                        readData(key);
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Accepts one pending connection, switches it to non-blocking mode and registers
     * it for read events. A failure affects only that connection, not the server.
     */
    private void acceptClient() {
        SocketChannel socketChannel = null;
        try {
            socketChannel = listenChannel.accept();
            if (socketChannel == null) {
                return; // non-blocking accept: no connection is pending any more
            }
            socketChannel.configureBlocking(false);
            socketChannel.register(selector, SelectionKey.OP_READ);
            System.out.println(socketChannel.getRemoteAddress() + "上线了");
        } catch (IOException e) {
            e.printStackTrace();
            closeQuietly(socketChannel);
        }
    }

    /**
     * Reads one chunk from the client behind {@code key} and forwards it to the other
     * clients. End-of-stream or a read error is treated as the client going offline.
     *
     * @param key selection key whose channel is a readable {@link SocketChannel}
     */
    private void readData(SelectionKey key) {
        SocketChannel channel = (SocketChannel) key.channel();
        try {
            ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
            int count = channel.read(buffer);
            if (count < 0) {
                // Orderly close by the peer. Without this the channel would stay
                // registered and be reported readable on every select().
                disconnect(key, channel);
                return;
            }
            if (count > 0) {
                // Decode only the bytes actually received, not the whole backing array.
                String msg = decode(buffer);
                System.out.println("form 客户端" + msg);
                sendInfoToaOtherClients(msg, channel);
            }
        } catch (IOException e) {
            disconnect(key, channel);
        }
    }

    /**
     * Forwards {@code msg} to every registered client channel except {@code self}.
     *
     * <p>A write failure disconnects only the failing recipient; it is not propagated,
     * so the sender is never blamed for another client's broken connection.
     *
     * @param msg  message text to forward
     * @param self channel of the sender, which is skipped
     * @throws IOException kept for signature compatibility; recipient failures are handled here
     */
    private void sendInfoToaOtherClients(String msg, SocketChannel self) throws IOException {
        System.out.println("服务器转发消息");
        byte[] payload = msg.getBytes(CHARSET);
        for (SelectionKey key : selector.keys()) {
            Channel channel = key.channel();
            if (!(channel instanceof SocketChannel) || channel == self || !key.isValid()) {
                continue; // skips the listening channel, the sender and cancelled keys
            }
            SocketChannel dest = (SocketChannel) channel;
            try {
                // NOTE(review): a non-blocking write may be partial when the peer's socket
                // buffer is full; the remainder is not retried here.
                dest.write(ByteBuffer.wrap(payload));
            } catch (IOException e) {
                disconnect(key, dest);
            }
        }
    }

    /**
     * Reports the client as offline, cancels its key and closes its channel.
     * The key is cancelled and the channel closed even if the address lookup fails.
     */
    private void disconnect(SelectionKey key, SocketChannel channel) {
        Object address = null;
        try {
            address = channel.getRemoteAddress();
        } catch (IOException ignored) {
            // Address is unavailable; still announce and clean up below.
        }
        System.out.println(address + "离线了");
        key.cancel();
        closeQuietly(channel);
    }

    /**
     * Decodes the bytes written into {@code buffer} so far (index 0 up to its current
     * position) as UTF-8 text. The buffer must be array-backed; its state is not changed.
     *
     * @param buffer heap buffer that has just been filled by a read
     * @return the decoded text, empty if nothing was written
     */
    static String decode(ByteBuffer buffer) {
        return new String(buffer.array(), buffer.arrayOffset(), buffer.position(), CHARSET);
    }

    /** Closes {@code resource} if it is non-null, printing any failure instead of propagating it. */
    private static void closeQuietly(java.io.Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Starts the server and runs its event loop on the calling thread. */
    public static void main(String[] args) {
        Server server = new Server();
        server.listen();
    }
}
Client端
/**
 * Console chat client.
 *
 * <p>Connects to the chat server at {@link #HOST}:{@link #PORT}, sends each line typed
 * on standard input, and prints messages received from the server on a background thread.
 */
public class Client1 {

    /** Server IP address. */
    private static final String HOST = "127.0.0.1";

    /** Server TCP port. */
    private static final int PORT = 6667;

    /** Size in bytes of the buffer used for a single read. */
    private static final int BUFFER_SIZE = 1024;

    private final Selector selector;
    private final SocketChannel socketChannel;

    /** Local socket address without its leading '/', used as the display name. */
    private final String username;

    /**
     * Connects to the server, switches the channel to non-blocking mode and registers
     * it with a new selector for read events.
     *
     * @throws Exception if the selector cannot be opened or the connection fails
     */
    public Client1() throws Exception {
        selector = Selector.open();
        socketChannel = SocketChannel.open(new InetSocketAddress(HOST, PORT));
        socketChannel.configureBlocking(false);
        socketChannel.register(selector, SelectionKey.OP_READ);
        username = socketChannel.getLocalAddress().toString().substring(1);
        System.out.println(username + "is ok");
    }

    /**
     * Sends a message to the server, prefixed with this client's user name.
     * Failures are printed rather than propagated.
     *
     * @param info message text typed by the user
     */
    public void sendInfo(String info) {
        info = username + "说:" + info;
        try {
            socketChannel.write(ByteBuffer.wrap(info.getBytes(StandardCharsets.UTF_8)));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Blocks until the channel is readable, then prints what the server sent.
     *
     * <p>If the server has closed the connection, or the read fails, the channel is
     * closed and deregistered so later calls do not report the same condition again.
     */
    public void readInfo() {
        try {
            if (selector.select() == 0) {
                return;
            }
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                // Remove every handled key inside the loop; removing once after the loop
                // would leave all but the last key in the selected set.
                iterator.remove();
                if (key.isValid() && key.isReadable()) {
                    readFrom(key);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Reads one chunk from the channel behind {@code key} and prints it, or closes it on EOF/error. */
    private void readFrom(SelectionKey key) {
        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
        int count;
        try {
            count = channel.read(buffer);
        } catch (java.io.IOException e) {
            e.printStackTrace();
            count = -1; // treat a broken connection like end-of-stream
        }
        if (count < 0) {
            System.out.println("服务器已断开连接");
            key.cancel();
            try {
                channel.close();
            } catch (java.io.IOException e) {
                e.printStackTrace();
            }
            return;
        }
        if (count > 0) {
            // trim() is kept so padding or whitespace sent by the server is not printed.
            System.out.println(decode(buffer).trim());
        }
    }

    /**
     * Decodes the bytes written into {@code buffer} so far (index 0 up to its current
     * position) as UTF-8 text. The buffer must be array-backed; its state is not changed.
     *
     * @param buffer heap buffer that has just been filled by a read
     * @return the decoded text, empty if nothing was written
     */
    static String decode(ByteBuffer buffer) {
        return new String(buffer.array(), buffer.arrayOffset(), buffer.position(),
                StandardCharsets.UTF_8);
    }

    /**
     * Starts the client: a background thread prints incoming messages while the main
     * thread forwards each line from standard input to the server.
     */
    public static void main(String[] args) throws Exception {
        Client1 client1 = new Client1();

        // Reader thread: readInfo() blocks until data arrives, then the thread pauses
        // for one second before polling again.
        new Thread(() -> {
            while (true) {
                client1.readInfo();
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop instead of looping on.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }).start();

        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNextLine()) {
            String s = scanner.nextLine();
            client1.sendInfo(s);
        }
    }
}