Client1加入聊天室,当selector监听到了accept信号之后,就会做handles的处理,将客户端注册到selector上
当客户向服务端发送了信息之后,就会认为socketchannel就会被触发read事件。
读取出来之后,转发给其他的客户端
但selector是阻塞的,如果没有监听到任何的信息,selector就会阻塞住。
如果再有一个客户端加入,selector就会监听到accept,然后加入这个客户端的channel,并监听这个channel的read事件。
也就是一个客户端对应一个channel,selector负责监听各个channel的信号。
一个selector对象在一个线程里面监听处理多个通道
代码演示
先来演示ServerSocket这个Channel的
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.util.Set;
public class ChatServer {
private static final int DEFAULT_PORT = 8888;
private static final String QUIT = "quit";
private static final int BUFFERE = 1024;
private ServerSocketChannel server;
private Selector selector;
private ByteBuffer rBuffer = ByteBuffer.allocate(BUFFERE);
private ByteBuffer wBuffer = ByteBuffer.allocate(BUFFERE);
private Charset charset = Charset.forName("UTF-8");
private int port;
public ChatServer(){
this(DEFAULT_PORT);
}
public ChatServer(int port){
this.port = port;
}
private void start(){
try {
server = ServerSocketChannel.open();
server.configureBlocking(false);
server.socket().bind(new InetSocketAddress(port));
selector = Selector.open();
server.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("启动服务器,监听端口:" + port + "...");
while (true){
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
// 遍历每一个触发的事件
for (SelectionKey key : selectionKeys){
// 处理被触发的事件
handles(key);
}
selectionKeys.clear();
}
} catch (IOException e) {
e.printStackTrace();
}finally {
close(selector);
}
}
private void handles(SelectionKey key) throws IOException {
// ACCEPT事件 —— 和客户端建立了连接 ServerSocketChannel上的
//将客户端绑定selector,这样后面selector才能监听客户端的channel上的消息
if (key.isAcceptable()){
SocketChannel client = (SocketChannel) key.channel();
// SocketChannel client = server.accept();
client.configureBlocking(false);
client.register(selector,SelectionKey.OP_READ);
System.out.println(getClientName(client) + "已连接");
}
// READ事件 —— 客户端发送了消息给服务器端 SocketChannel上的
else if (key.isReadable()){
SocketChannel client = (SocketChannel) key.channel();
// 往Channel中读取信息
String fwdMsg = receive(client);
if (fwdMsg.isEmpty()){
//客户端异常 取消这个通道
key.cancel();
selector.wakeup();
}else{
//转发数据到其他客户端
forwardMessage(client, fwdMsg);
//检查用户是否退出
if (readyToQuit(fwdMsg)){
key.cancel();
selector.wakeup();
System.out.println(getClientName(client) + "已断开");
}
}
}
}
private void forwardMessage(SocketChannel client, String fwdMsg) throws IOException {
for (SelectionKey key : selector.keys()){
Channel connectedClient = key.channel();
if (connectedClient instanceof ServerSocketChannel){
continue;
}
if (key.isValid() && !client.equals(connectedClient)){
//发送消息
wBuffer.clear();
wBuffer.put(charset.encode(getClientName(client) + ":" + fwdMsg));
wBuffer.flip();
while (wBuffer.hasRemaining()){
((SocketChannel)connectedClient).write(wBuffer);
}
}
}
}
private String receive(SocketChannel client) throws IOException {
rBuffer.clear();
// rBuffer读取SocketChannel中的消息
while (client.read(rBuffer) > 0);
rBuffer.flip();
return String.valueOf(charset.decode(rBuffer));
}
private String getClientName(SocketChannel client){
return "客户端[" + client.socket().getPort() + "]";
}
private boolean readyToQuit(String msg){
return QUIT.equals(msg);
}
private void close(Closeable closeable){
if (closeable != null){
try{
closeable.close();
}catch (IOException e){
e.printStackTrace();
}
}
}
public static void main(String[] args) {
ChatServer chatServer = new ChatServer(7777);
chatServer.start();
}
}