一、BIO
java.io 中最核心的概念是流(stream),面向流编程,在 Java 中,一个流要么是输入流,要么是输出流
Blocking IO:阻塞 IO 处理多任务可采用多线程的方式
- 一旦采用多线程的方式,可能会造成资源浪费,因为一个线程就会开辟一个栈内存空间
- ServerSocket 的 accept 会阻塞
- socket.getInputStream() 的 read 会阻塞
二、NIO
java.nio 中拥有三个核心概念,selector、channel、buffer,在 nio 中是面向块(block)或者是缓冲区(buffer)编程的
No Blocking IO:非阻塞IO,也就是单线程的实现多线程的方式
- 设置 ServerSocket 的 accept 为非阻塞
- 当有客户端连接时,保存客户端的 Socket
- 设置读取数据 read 为非阻塞
- 循环 List 中的 Socket 看看是否有人发送数据
Buffer
buffer 本身就是一块内存,实际上就是一个数组,数据的读写都是通过 buffer 来实现的。
java 中 8 种基本数据类型都有各自对应的 buffer 类型(除 boolean 外),如 IntBuffer、CharBuffer、ByteBuffer、ShortBuffer、LongBuffer 等。
capacity:最大容量,永远不可能是负数,并且是不会变化的
limit:限制,永远不可能是负数,并且不会大于 capacity
position:写一个读或者写的位置,永远不可能是负数,并且不会大于 limit
- buffer.put():往此 buffer 中放置元素(往数组中写)
- buffer.get():往此 buffer 中取出元素(往数组中读)
Channel
所有数据读写都是通过 buffer 来进行的,永远不会出现直接在 channel 中直接写入、读取数据,与 stream 不同的是,channel 是双向的,而 stream 可能是 inputSteram 或 outputStream,而 channel 打开后可以读又可以写
- channel.read(buffer):从通道中读取数据写到 buffer 中,对于 buffer 来说是写操作
- channel.write(buffer):从 buffer 中读取数据写到到 channel 中,对于 buffer 来说是读操作
Selector
一个 channel 注册到 selector 上,这个动作是通过 selectionKey来表示的,一个 selector 会维护三种 selectorKey 集合
- keys():表示注册到 selector 上面所有的 selectionKey,通过 keys() 方法返回 => 全集
- selectedKeys():表示感兴趣的 selectionKey,通过 selectionKeys() 方法返回 => 子集
SelectionKey
是一个抽象类,表示 selectableChannel 在 Selector 中注册的标识,每个 Channel 向 Selector 注册时,都会创建一个 selectionKey 将 Channel 与 Selector 关联在一起,并维护了 Channel 事件。
- 可以通过 cannel 方法取消,取消的键不会立即从 selector 中移除,而是添加到 canneledKeys 中,再下一次 select 操作时会移除,所以再调用某个 key 时,需要使用 isValid 进行校验
NIO 通讯流程
底层原理
NIO 底层 IO 模型的实现和操作系统相关,通过注册给给操作系统来实现接受消息
Windows OS 会调用操作系统的 select 函数,也是一个轮询机制
select(0, &errreadfds, &errwritefds, &errexceptfds, &zerotime)
Linux OS 会调用 epoll 函数,是一个事件机制
/**
* Returns the default SelectorProvider.
*/
public static SelectorProvider create() {
String osname = AccessController
.doPrivileged(new GetPropertyAction("os.name"));
if (osname.equals("SunOS"))
return createProvider("sun.nio.ch.DevPollSelectorProvider");
if (osname.equals("Linux"))
return createProvider("sun.nio.ch.EPollSelectorProvider");
return new sun.nio.ch.PollSelectorProvider();
}
package org.wesoft.advanced.tcp.nio.simulate;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;
public class Server {
private static List<SocketChannel> list = new ArrayList<>();
public static void main(String[] args) {
try {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(6379));
serverSocketChannel.configureBlocking(false); // 设置非阻塞
while (true) {
SocketChannel socketChannel = serverSocketChannel.accept();
if (socketChannel == null) {
Thread.sleep(1000);
System.out.println("等待连接...");
} else {
System.out.println("来了一个客户端....");
socketChannel.configureBlocking(false); // 设置非阻塞
list.add(socketChannel);
}
for (SocketChannel client : list) {
ByteBuffer byteBuffer = ByteBuffer.allocate(512);
int len = client.read(byteBuffer);
System.out.println(len);
if (len > 0) {
byteBuffer.flip();
System.out.println("收到了客户端的消息: " + new String(byteBuffer.array()));
}
}
}
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
package org.wesoft.advanced.tcp.nio.simulate;
import java.io.IOException;
import java.net.Socket;
import java.util.Scanner;
public class Client {
public static void main(String[] args) throws IOException {
Socket socket = new Socket("127.0.0.1", 6379);
Scanner scanner = new Scanner(System.in);
while (true) {
String content = scanner.next();
socket.getOutputStream().write(content.getBytes());
}
}
}