
Client1加入聊天室,当selector监听到了accept信号之后,就会做handles的处理,将客户端注册到selector上
当客户向服务端发送了信息之后,就会认为socketchannel就会被触发read事件。
读取出来之后,转发给其他的客户端
但selector是阻塞的,如果没有监听到任何的信息,selector就会阻塞住。
如果再有一个客户端加入,selector就会监听到accept,然后加入这个客户端的channel,并监听这个channel的read事件。
也就是一个客户端对应一个channel,selector负责监听各个channel的信号。
一个selector对象在一个线程里面监听处理多个通道
代码演示
先来演示ServerSocket这个Channel的
import java.io.Closeable;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.*;import java.nio.charset.Charset;import java.util.Set;public class ChatServer {private static final int DEFAULT_PORT = 8888;private static final String QUIT = "quit";private static final int BUFFERE = 1024;private ServerSocketChannel server;private Selector selector;private ByteBuffer rBuffer = ByteBuffer.allocate(BUFFERE);private ByteBuffer wBuffer = ByteBuffer.allocate(BUFFERE);private Charset charset = Charset.forName("UTF-8");private int port;public ChatServer(){this(DEFAULT_PORT);}public ChatServer(int port){this.port = port;}private void start(){try {server = ServerSocketChannel.open();server.configureBlocking(false);server.socket().bind(new InetSocketAddress(port));selector = Selector.open();server.register(selector, SelectionKey.OP_ACCEPT);System.out.println("启动服务器,监听端口:" + port + "...");while (true){selector.select();Set<SelectionKey> selectionKeys = selector.selectedKeys();// 遍历每一个触发的事件for (SelectionKey key : selectionKeys){// 处理被触发的事件handles(key);}selectionKeys.clear();}} catch (IOException e) {e.printStackTrace();}finally {close(selector);}}private void handles(SelectionKey key) throws IOException {// ACCEPT事件 —— 和客户端建立了连接 ServerSocketChannel上的//将客户端绑定selector,这样后面selector才能监听客户端的channel上的消息if (key.isAcceptable()){SocketChannel client = (SocketChannel) key.channel();// SocketChannel client = server.accept();client.configureBlocking(false);client.register(selector,SelectionKey.OP_READ);System.out.println(getClientName(client) + "已连接");}// READ事件 —— 客户端发送了消息给服务器端 SocketChannel上的else if (key.isReadable()){SocketChannel client = (SocketChannel) key.channel();// 往Channel中读取信息String fwdMsg = receive(client);if (fwdMsg.isEmpty()){//客户端异常 取消这个通道key.cancel();selector.wakeup();}else{//转发数据到其他客户端forwardMessage(client, fwdMsg);//检查用户是否退出if (readyToQuit(fwdMsg)){key.cancel();selector.wakeup();System.out.println(getClientName(client) + "已断开");}}}}private void forwardMessage(SocketChannel client, String fwdMsg) throws IOException {for (SelectionKey key : selector.keys()){Channel connectedClient = key.channel();if (connectedClient instanceof ServerSocketChannel){continue;}if (key.isValid() && !client.equals(connectedClient)){//发送消息wBuffer.clear();wBuffer.put(charset.encode(getClientName(client) + ":" + fwdMsg));wBuffer.flip();while (wBuffer.hasRemaining()){((SocketChannel)connectedClient).write(wBuffer);}}}}private String receive(SocketChannel client) throws IOException {rBuffer.clear();// rBuffer读取SocketChannel中的消息while (client.read(rBuffer) > 0);rBuffer.flip();return String.valueOf(charset.decode(rBuffer));}private String getClientName(SocketChannel client){return "客户端[" + client.socket().getPort() + "]";}private boolean readyToQuit(String msg){return QUIT.equals(msg);}private void close(Closeable closeable){if (closeable != null){try{closeable.close();}catch (IOException e){e.printStackTrace();}}}public static void main(String[] args) {ChatServer chatServer = new ChatServer(7777);chatServer.start();}}
