简介
- NIO,是jdk4提供的一种新的IO标准,是一种同步非阻塞的IO,NIO是以块为单位进行数据处理的,当然块的大小是程序员自己指定的。其相对于BIO的以字节为单位所进行的阻塞式处理方式,大大提高了读写效率和并发度
- BIO,每个客户端连接到来,都会有一个channel,同时都会创建一个专门负责处理此业务的线程,但是计算机的线程数有限,所以,高并发场景下,会将计算机的线程数全部打满。不可用
- NIO,每个客户端连接到来,都会有一个channel,但是只有一个线程处理所有的请求,此线程会根据选择器选择准备就绪的channel,然后执行相应逻辑
Selector
- 多路复用器,包括三个set
- keys:所有注册连接的channel,都会有对应的SelectKey加入到该set中
- select-keys:所有已经准备就绪的channel的key,会加到此set中,keys中的是不会变的,是keys的子集
- cancel-keys:如果channel被取消了,会先将此channel的key从select-key中移除到cancel-key中,在下次执行select方法的时候,会将cancel-keys已经keys中已经取消的key全部清除。
SelectionKey
- 含义:一个令牌,表示一个Channel注册到selector的令牌
- 时机:每次channel注册到selector都会创建一个selectionKey,一个key保持他的有效性,直到channel被cancel取消掉。有三种方式关闭,channel.cancel、channel.close、selector.close。取消一个selectionKey,并不会直接删除,而是添加到cancel-keys中,为的是,在下次select操作时,会将cancel-keys清空,同时删除selector的keys中已经取消的selectionKey
- 每个selectionKey包含了两个整数,这两个操作集合都是整型数值。
- 一个是表示拥有此key的channel所关注的操作事件,是一个二进制位的整数,即interest set:表示与此selectionKey相关的channel,关注的客户端请求是什么状态,好比,当channel注册到selector的时候,会告知selector,此channel关注的,感兴趣的是什么事件,好比客户端连接事件,此channel关注哪种操作类型。存在四种操作类型,四种状态。通过selectionKey.interestOps():int 获取到感兴趣的集合
- 一个是就绪的集合,ready set,也是一个整数,是操作系统底层去查询判断就绪的信息。通过selectionKey.readyOps():int,可以获取到准备就绪的集合。
- 通过SelectionKey可以获取到
- Selector
- channel
- 还可以获取到状态,是可连接的,还是可写的,四种状态。
NIO简单通信案例
服务端
public class NioServer {
public static void main(String[] args) throws IOException {
//创建一个服务端channel
ServerSocketChannel serverChanel = ServerSocketChannel.open();
//指定channel采用的为非阻塞模式
serverChanel.configureBlocking(false);
//指定要监听的端口
serverChanel.bind(new InetSocketAddress(8888));
//创建一个多路复用器selector
Selector selector = Selector.open();
//将channel注册到selector,并告诉selector让其监听"接受client连接事件"
serverChanel.register(selector, SelectionKey.OP_ACCEPT);
while (true){
//select()是一个阻塞方法,若阻塞1秒的时间到了,或在阻塞期间有channel就绪,都会打破阻塞
if (selector.select(1000)==0) {
System.out.println("当前没有找到就绪的channel");
continue;
}
//获取所有就绪的channel的key
Set<SelectionKey> selectionKeys = selector.selectedKeys();
for (SelectionKey key : selectionKeys) {
//若当前key为OPT_ACCEPT,则说明当前channel是可以接收客户端连接的
//那么,这里的代码就是用于接收客户端连接的
if(key.isAcceptable()){
System.out.println("接收到client的连接");
//获取连接到Server的客户端channel,其是客户端channel在Server端的代表(驻京办)
SocketChannel clientChannel = serverChanel.accept();
clientChannel.configureBlocking(false);
//将客户端channel注册到selector,并告诉selector让其监听这个channel中是否发生了读事件
clientChannel.register(selector,SelectionKey.OP_READ);
}
//若当前key为OPT_READ,则说明当前channel中有客户端发送来的数据。
//那么,这里的代码就是用于读取channel中的数据的
if (key.isReadable()) {
try {
//创建buffer
ByteBuffer buffer = ByteBuffer.allocate(1024);
//根据key获取其对应的channel
SocketChannel clientChannel = (SocketChannel)key.channel();
//将channel中的数据读取到buffer
clientChannel.read(buffer);
}catch (Exception e){
//若在读取过程中发生异常,则直接取消该Key,即放弃该channel
key.cancel();
}
}
//删除当前处理过得key,以免重复处理
selectionKeys.remove(key);
}//end-for
}
}
}
客户端
public class NioClient {
public static void main(String[] args) throws Exception {
SocketChannel clientChannel = SocketChannel.open();
clientChannel.configureBlocking(false);
InetSocketAddress serverAddr = new InetSocketAddress("localhost", 8888);
//连接Server
if (!clientChannel.connect(serverAddr)) { //首次连接
while (!clientChannel.finishConnect()) { //完成重连
continue;
}
}
//将消息写入到channel
clientChannel.write(ByteBuffer.wrap("hello".getBytes("UTF-8")));
System.out.println("Client消息已发送");
System.in.read();
}
}
NIO群聊案例
服务端
public class NioChatServerStarter {
public static void main(String[] args) throws Exception{
//创建一个服务端channel
ServerSocketChannel serverChanel = ServerSocketChannel.open();
//指定channel采用的为非阻塞模式
serverChanel.configureBlocking(false);
//指定要监听的端口
serverChanel.bind(new InetSocketAddress(8888));
//创建一个多路复用器selector
Selector selector = Selector.open();
serverChanel.register(selector, SelectionKey.OP_ACCEPT);
//创建支持群聊的NIO Server
NioChatServer chatServer = new NioChatServer();
chatServer.enableChat(serverChanel,selector);
}
}
public class NioChatServer {
//开启Server的支持群聊功能
public void enableChat(ServerSocketChannel serverChanel, Selector selector) throws Exception{
System.out.println("chatServer启动....");
while (true){
if (selector.select(1)==0) {
continue;
}
Set<SelectionKey> selectionKeys = selector.selectedKeys();
for (SelectionKey key : selectionKeys) {
//处理客户端上线
if (key.isAcceptable()) {
SocketChannel clientChannel = serverChanel.accept();
clientChannel.configureBlocking(false);
clientChannel.register(selector,SelectionKey.OP_READ);
//获取到client地址
String msg = clientChannel.getRemoteAddress()+ "-上线了-";
//将上线通知,广播给所有在线的其他client
sendMsgToOtherClientOnline(selector,clientChannel,msg);
}
//处理客户端发送消息
if (key.isReadable()) {
SocketChannel clientChannel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
clientChannel.read(buffer);
//获取来自于client的消息,trim()将buffer中没有数据的内容转为了空格去掉
String msgFromClient = new String(buffer.array()).trim();
if (msgFromClient.length()>0) {
//构造要发送给其他client的消息
String msgToSend = clientChannel.getRemoteAddress() + "say:"+msgFromClient;
//若client发送的是字符串88,则表示其要下线
if ("88".equals(msgFromClient)) {
msgToSend = clientChannel.getRemoteAddress() +"下线了";
//取消当前key,即放弃其所对应的channel
//将其对应的channel从selector中去掉
key.cancel();
}
//将client消息广播给所有在线的其他client
sendMsgToOtherClientOnline(selector,clientChannel,msgToSend);
}
}//end-if
selectionKeys.remove(key);
}//end-for
}
}
private void sendMsgToOtherClientOnline(Selector selector, SocketChannel self, String msg) throws IOException {
//遍历所有注册到selector的channel,即所有在线的client
Set<SelectionKey> keys = selector.keys();
for (SelectionKey key : keys) {
SelectableChannel channel = key.channel();
//将消息发送给所有其他client
if (channel instanceof SocketChannel && channel != self) {
((SocketChannel)channel).write(ByteBuffer.wrap(msg.trim().getBytes()));
}
}
}
}
客户端
public class NioChatClientStarter {
public static void main(String[] args) throws Exception {
SocketChannel clientChannel = SocketChannel.open();
clientChannel.configureBlocking(false);
InetSocketAddress serverAddr = new InetSocketAddress("localhost", 8888);
//连接Server
if (!clientChannel.connect(serverAddr)) { //首次连接
while (!clientChannel.finishConnect()) { //完成重连
continue;
}
}
NioChatClient client = new NioChatClient();
client.enableChat(clientChannel);
}
}
public class NioChatClient {
//启动聊天功能
public void enableChat(SocketChannel clientChannel) throws Exception {
//获取client自己的地址
SocketAddress selfAddr = clientChannel.getLocalAddress();
System.out.println(selfAddr +",你已经成功上线了");
//创建一个线程用于不间断的接收来自于Server的消息
new Thread(){
@Override
public void run() {
//实现不间断
while (true){
try {
//若当前client已经关闭,则结束循环
//否则正常接收来自Server的消息
if (!clientChannel.isConnected()) {
return;
}
receiveMsgFromServer(clientChannel);
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}.start();
//注意,该方法不能写在前面的创建线程之前,这样会导致无法接收到来自于Server的消息,
//因为该方法中的Scanner是阻塞的
//向Server发送消息
sendMsgToServer(clientChannel);
}
//向Server发送消息
private void sendMsgToServer(SocketChannel clientChannel) throws Exception {
//接收来自键盘的输入
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String msg = scanner.nextLine();
//将消息写入到channel,其中有可能是下线请求消息88
clientChannel.write(ByteBuffer.wrap(msg.trim().getBytes()));
//若消息为88,则表示当前client要下线,则将该channel关闭
if ("88".equalsIgnoreCase(msg.trim())) {
//关闭客户端
clientChannel.close();
return;
}
}
}
private void receiveMsgFromServer(SocketChannel clientChannel) throws Exception{
ByteBuffer buffer = ByteBuffer.allocate(1024);
clientChannel.read(buffer);
String msg = new String(buffer.array()).trim();
if (msg.length()>0) {
System.out.println(msg);
}
}
}
测试
- 开启多个客户端的结果
- server

- client1

