在上一篇文章已经介绍过 Java NIO 的基本使用,NIO 中有一个重要的组件 Selector,其解决一个线程在多个网络连接上的多路复用问题。我会分为上下两篇文章来为大家深入分析 Java NIO 中 IO 多路复用的底层原理,这篇文章我将介绍 Selector 组件的底层系统调用过程。
什么是 SelectionKey?
我们以下面简单的代码为例,启动一个服务端进程:
public static void main(String[] args) throws IOException, InterruptedException {
// 创建NIO ServerSocketChannel
ServerSocketChannel serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(9000));
// 设置ServerSocketChannel为非阻塞
serverSocket.configureBlocking(false);
// 打开Selector处理Channel,即创建epoll
Selector selector = Selector.open();
// 把ServerSocketChannel注册到selector上,并且selector对客户端accept连接操作感兴趣
SelectionKey selectionKey = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("服务启动成功");
while (true) {
// 阻塞等待需要处理的事件发生
selector.select();
// 获取selector中注册的全部事件的 SelectionKey实例
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
// 遍历SelectionKey对事件进行处理
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
// 如果是OP_ACCEPT事件,则进行连接获取和事件注册
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = server.accept();
socketChannel.configureBlocking(false);
// 这里只注册了读事件,如果需要给客户端发送数据可以注册写事件
socketChannel.register(selector, SelectionKey.OP_READ);
System.out.println("客户端连接成功");
// 如果是OP_READ事件,则进行读取
} else if (key.isReadable()) {
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(128);
int len = socketChannel.read(byteBuffer);
// 如果有数据,把数据打印出来
if (len > 0) {
System.out.println("接收到消息:" + new String(byteBuffer.array()));
key.attach("服务端已收到消息");
key.interestOps(SelectionKey.OP_WRITE);
} else if (len == -1) { // 如果客户端断开连接,关闭Socket
System.out.println("客户端断开连接");
socketChannel.close();
}
}else if(key.isWritable()){
SocketChannel socketChannel = (SocketChannel) key.channel();
String msg = (String) key.attachment();
key.attach(null);
socketChannel.write(ByteBuffer.wrap(msg.getBytes()));
key.interestOps(SelectionKey.OP_READ);
}
//从事件集合里删除本次处理的key,防止下次select重复处理
iterator.remove();
}
}
}
上面的代码中通过创建一个 Selector,将 Channel 绑定对应的事件(read、write、accept、connect)注册到 Selector 中,返回一个 SelectionKey 于该 Channel 绑定,与。Selector调用 select() 方法开始监听事件的发生,如果没有事件则会阻塞直到新的事件到来。
SelectionKey 是一个抽象类,表示 selectableChannel 在 Selector 中注册的标识。每个Channel 向 Selector 注册时,都将会创建一个 SelectionKey。SelectionKey 将 Channel 与 Selector 建立了关系,并维护了 channel 事件。
可以通过 cancel 方法取消键,取消的键不会立即从 selector 中移除,而是添加到 cancelledKeys 中,在下一次 select 操作时移除它.所以在调用某个 key 时,需要使用 isValid 进行校。
在向 Selector 对象注册感兴趣的事件时,JAVA NIO 共定义了四种事件:
/**
* Operation-set bit for read operations.
*
* <p> Suppose that a selection key's interest set contains
* <tt>OP_READ</tt> at the start of a <a
* href="Selector.html#selop">selection operation</a>. If the selector
* detects that the corresponding channel is ready for reading, has reached
* end-of-stream, has been remotely shut down for further reading, or has
* an error pending, then it will add <tt>OP_READ</tt> to the key's
* ready-operation set and add the key to its selected-key set. </p>
*/
public static final int OP_READ = 1 << 0;
/**
* Operation-set bit for write operations.
*
* <p> Suppose that a selection key's interest set contains
* <tt>OP_WRITE</tt> at the start of a <a
* href="Selector.html#selop">selection operation</a>. If the selector
* detects that the corresponding channel is ready for writing, has been
* remotely shut down for further writing, or has an error pending, then it
* will add <tt>OP_WRITE</tt> to the key's ready set and add the key to its
* selected-key set. </p>
*/
public static final int OP_WRITE = 1 << 2;
/**
* Operation-set bit for socket-connect operations.
*
* <p> Suppose that a selection key's interest set contains
* <tt>OP_CONNECT</tt> at the start of a <a
* href="Selector.html#selop">selection operation</a>. If the selector
* detects that the corresponding socket channel is ready to complete its
* connection sequence, or has an error pending, then it will add
* <tt>OP_CONNECT</tt> to the key's ready set and add the key to its
* selected-key set. </p>
*/
public static final int OP_CONNECT = 1 << 3;
/**
* Operation-set bit for socket-accept operations.
*
* <p> Suppose that a selection key's interest set contains
* <tt>OP_ACCEPT</tt> at the start of a <a
* href="Selector.html#selop">selection operation</a>. If the selector
* detects that the corresponding server-socket channel is ready to accept
* another connection, or has an error pending, then it will add
* <tt>OP_ACCEPT</tt> to the key's ready set and add the key to its
* selected-key set. </p>
*/
public static final int OP_ACCEPT = 1 << 4;
OP_READ、OP_WRITE、OP_CONNECT、OP_ACCEPT,分别对应读、写、请求连接、接受连接等网络 Socket 操作。这些时间的触发条件如下:
- OP_READ:当操作系统的读缓冲区有数据可读时就绪。并非时刻都有数据可读,所以一般需要注册该操作,仅当就绪时才发起读操作,避免浪费 CPU。
- OP_WRITE:当操作系统的写缓冲区有空闲空间时就绪。一般情况下写缓冲区都有空闲空间,小块数据直接写入即可,没必要注册该操作类型,否则该条件不断就绪浪费 CPU;但如果是写密集型的任务,比如文件下载等,缓冲区很可能满,注册该操作类型就很有必要,同时注意写完后取消注册。
- OP_CONNECT:当 SocketChannel.connect() 请求连接成功后就绪,该操作只给客户端使用。
- OP_ACCEPT:当接收到一个客户端连接请求时就绪,该操作只给服务器使用。
服务器启动 ServerSocketChannel,关注 OP_ACCEPT 事件。服务器接受连接,启动一个服务器的 SocketChannel,这个 SocketChannel 可以关注 OP_READ、OP_WRITE 事件,一般连接建立后会直接关注 OP_READ 事件。
客户端启动 SocketChannel,连接服务器,关注 OP_CONNECT 事件。客户端这边的客户端 SocketChannel 发现连接建立后,可以关注 OP_READ、OP_WRITE 事件,一般是需要客户端需要发送数据了才关注 OP_READ 事件。
连接建立后客户端与服务器端开始相互发送消息(读写),根据实际情况来关注OP_READ、 OP_WRITE 事件。
此外上面代码中调用了三个重要的方法 :
- Selector.open():创建多路复用器;
- socketChannel.regrister():将 Channel 注册到多路复用器上(真的是注册吗?);
- Selector.select():阻塞等待需要处理的事件发生;
这三个方法是 Selector 的核心方法,下面的内容中将从 JDK 源码(linux 版本)及 Hotspot 与 Linux 内核函数级别来讲解以上方法。
Selector.open()
首先创建一个 Provider,再由该 Provider 开启一个 Selector。
public static Selector open() throws IOException {
return SelectorProvider.provider().openSelector();
}
provider()
创建 provider,若没有则调用 create() 方法创建。
public static SelectorProvider provider() {
synchronized (lock) {
if (provider != null)
return provider;
return AccessController.doPrivileged(
new PrivilegedAction<SelectorProvider>() {
public SelectorProvider run() {
if (loadProviderFromProperty())
return provider;
if (loadProviderAsService())
return provider;
//调用create()方法创建provider
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}
判断当前操作系统是否为 linux 操作系统,如果是则通过反射创建 EPollSelectorProvider 对象。
public static SelectorProvider create() {
String osname = (String)AccessController.doPrivileged(new GetPropertyAction("os.name"));
if (osname.equals("SunOS")) {
return createProvider("sun.nio.ch.DevPollSelectorProvider");
} else {
return (SelectorProvider)(osname.equals("Linux") ?
//判断当前是否是linux系统,是则利用创建EPollSelectorProvider
createProvider("sun.nio.ch.EPollSelectorProvider") : new PollSelectorProvider());
}
}
-------------------------------
private static SelectorProvider createProvider(String cn) {
Class c;
try {
//获取class对象
c = Class.forName(cn);
} catch (ClassNotFoundException var4) {
throw new AssertionError(var4);
}
try {
//利用反射创建实例
return (SelectorProvider)c.newInstance();
} catch (InstantiationException | IllegalAccessException var3) {
throw new AssertionError(var3);
}
}
openSelector()
创建完 EPollSelectorProvider 对象后,紧接着调用其 openSelector() 方法,直接返回 JDK 封装好的 EPollSelectorImpl 对象,在该对象的构造方法里面开始创建 Linux 底层的 epoll 实例。
public AbstractSelector openSelector() throws IOException {
return new EPollSelectorImpl(this);
}
---------------------------
EPollSelectorImpl(SelectorProvider sp) throws IOException {
super(sp);
long pipeFds = IOUtil.makePipe(false);
this.fd0 = (int)(pipeFds >>> 32);
this.fd1 = (int)pipeFds;
try {
this.pollWrapper = new EPollArrayWrapper();
this.pollWrapper.initInterrupt(this.fd0, this.fd1);
this.fdToKey = new HashMap();
} catch (Throwable var8) {
.........省略
}
}
里面创建了一个 EpollArrayWrapper 包装类,该类初始化时调用 native 方法创建了一个 epoll 实例。
EPollArrayWrapper() throws IOException {
......
this.epfd = this.epollCreate();
......
}
----------------------
private native int epollCreate()
系统调用 epoll_create
Hotspot 底层调用 linux 的 epoll_create(int size) 函数,返回一个 epfd(epoll 对应的文件描述符)
到底什么是 epoll 以及其实现原理,我会在下一篇文章中具体介绍,大家这里留一个印象
JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollCreate(JNIEnv *env, jobject this)
{
//调用linux的 epoll_create 函数,返回一个epoll对应的文件描述符
int epfd = epoll_create(256);
if (epfd < 0) {
JNU_ThrowIOExceptionWithLastError(env, "epoll_create failed")
}
return epfd;
}
Linux 对 epoll_create 函数描述如下,创建一个 epoll 结构体,并返回一个非负数作为文件描述符,用于对 epoll 接口的所有后续调用。在 epoll 结构体中有两个重要的成员:红黑树,是用于存储注册进来的 socket 链接;就绪事件链表(双向链表),用于存储就绪的事件。
参数 size 代表可能会容纳 size 个描述符,但 size 不是一个最大值,只是提示操作系统它的数量级,现在这个参数基本上已经弃用了。
Selector.open() 方法就是在操作系统底层创建了一个 epoll 实例,返回了该实例的一个文件描述符。
SocketChannel.register()
register() 方法把表面上是把 socketChannel 注册到 Selector 上并指定该 socketChannel 感兴趣的事件,但真正的注册其实不是在这一步完成。该方法主要就是获取所有的 SocketChannel 并存起来。
public final SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException{
synchronized (regLock) {
..............
SelectionKey k = findKey(sel);
..............
if (k == null) {
// New registration
synchronized (keyLock) {
if (!isOpen())
throw new ClosedChannelException();
//将socketChannel注册到Selector上
k = ((AbstractSelector)sel).register(this, ops, att);
addKey(k);
}
}
return k;
}
}
implRegister()
创建 SelectionKey 对象,绑定对应的 SocketChannel,并调用上一步创建出来的EPollSelectorImpl 对象的 implRegister() 方法:
protected final SelectionKey register(AbstractSelectableChannel var1, int var2, Object var3) {
if (!(var1 instanceof SelChImpl)) {
throw new IllegalSelectorException();
} else {
//将SocketChannel传进去,返回一个对应的SelectionKey
SelectionKeyImpl var4 = new SelectionKeyImpl((SelChImpl)var1, this);
var4.attach(var3);
synchronized(this.publicKeys) {
//将SelectionKey传进去,实现注册
this.implRegister(var4);
}
//指定感兴趣的事件
var4.interestOps(var2);
return var4;
}
}
-----------------------------
SelectionKeyImpl(SelChImpl var1, SelectorImpl var2) {
this.channel = var1;
this.selector = var2;
}
将 SocketChannel 的 fd(文件描述符)添加到第一步创建的 EpollArrayWrapper 包装类中集合里面。
protected void implRegister(SelectionKeyImpl ski) {
if (this.closed) {
throw new ClosedSelectorException();
} else {
SelChImpl ch = ski.channel;
int fd = Integer.valueOf(ch.getFDVal());
this.fdToKey.put(fd, ski);
//把SocketChannel对应添加到EpollArrayWrapper包装类中结合里面
this.pollWrapper.add(fd);
this.keys.add(ski);
}
}
Selector.select()
该方法非常重要,主要做的是有两件:
- 将 SocketChannel 添加到 epoll 池中;
- 等待 epoll 中的 IO 事件;
该方法最终会调用 EPollSelectorImpl 的 doSelect()方法:
protected int doSelect(long timeout) throws IOException {
if (this.closed) {
throw new ClosedSelectorException();
} else {
this.processDeregisterQueue();
try {
this.begin();
this.pollWrapper.poll(timeout);
} finally {
this.end();
}
//将已经cancel的SocketChannel从pollWrapper中移除
this.processDeregisterQueue();
int numKeysUpdated = this.updateSelectedKeys();
//中断处理
if (this.pollWrapper.interrupted()) {
this.pollWrapper.putEventOps(this.pollWrapper.interruptedIndex(), 0);
synchronized(this.interruptLock) {
this.pollWrapper.clearInterrupted();
IOUtil.drain(this.fd0);
this.interruptTriggered = false;
}
}
return numKeysUpdated;
}
}
里面调用了最关键的 poll() 方法,调用了两个本地方法 epollCtl() 和 epollWait()。
int poll(long timeout) throws IOException {
this.updateRegistrations();
this.updated = this.epollWait(this.pollArrayAddress, NUM_EPOLLEVENTS, timeout, this.epfd);
for(int i = 0; i < this.updated; ++i) {
if (this.getDescriptor(i) == this.incomingInterruptFD) {
this.interruptedIndex = i;
this.interrupted = true;
break;
}
}
return this.updated;
}
系统调用 epollCtl()
在updateRegistrations()方法里调用了native方法 epollCtl(),完成真正的注册。
private void updateRegistrations() {
synchronized(this.updateLock) {
for(int j = 0; j < this.updateCount; ++j) {
int fd = this.updateDescriptors[j];
short events = (short)this.getUpdateEvents(fd);
boolean isRegistered = this.registered.get(fd);
int opcode = false;
if (events != -1) {
int opcode;
if (isRegistered) {
opcode = events != 0 ? 3 : 2;
} else {
opcode = events != 0 ? 1 : 0;
}
if (opcode != 0) {
//当opcode的值对应为EPOLL_CTL_ADD,真正注册SocketChannel到epoll上
this.epollCtl(this.epfd, opcode, fd, events);
if (opcode == 1) {
this.registered.set(fd);
} else if (opcode == 2) {
this.registered.clear(fd);
}
}
}
}
this.updateCount = 0;
}
}
epollCtl() 底层调用Linux内核函数
epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
该方法对 epoll 池实例所对应的文件描述符执行参数 op 所指定的操作,同时为该 fd 绑定相关事件。
参数解释
epfd:
epoll实例对应的文件描述符
fd:
SocketChannel对应的文件描述符
*event:
该结构体中通过 events 指定了 SocketChannel 感兴趣的事件,data 保存了当事件触发时相关的数据
struct epoll_event {
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
typedef{
void *ptr;
int fd;
__uint32_t u32;
__uint61_t u64;
}epoll_data_t
events表示感兴趣的事件,常用的有以下几种
EPOLLIN :表示对应的文件描述符是可读的; EPOLLOUT:表示对应的文件描述符是可写的; EPOLLERR:表示对应的文件描述符发生了错误 EPOLLRDHUP:表示对应的文件描述符被挂断
op:
对epoll对应的文件描述符对应的操作
EPOLL_CTL_ADD:注册新的fd(SocketChannel)到epfd(epoll)中,并关联事件event EPOLL_CTL_MOD:修改已经注册的fd的监听事件; EPOLL_CTL_DEL:从epfd中移除fd,并且忽略掉绑定的event,这时event可以为null
系统调用 epollWait()
epollWait() 是一个 native 方法,Hotspot底层调用了 Linux 内核函数:
int epoll_wait (int epfd, struct epoll_event *events, int maxevents, int timeout)
该方法负责从 epoll 池中的就绪事件列表中获取发生的事件。该方法会阻塞,只要有 “事件” 发生就会马上唤醒。
等待 Epoll 文件描述符 epfd 上的事件发生,其实就是往就绪事件列表中获取已发生的事件,如果有事件发生则放入到一个集合中,没有则阻塞等待。
Set
selectionKeys = selector.selectedKeys() 拿到所有响应的事件
以上就是 JDK 中 Selector 的实现机制源码,这里涉及到 LInux 底层的 epoll 模型,在下一篇文章我会为大家详解介绍其工作原理和其高性能原因。