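Unix systems have five I/O models: blocking I/O, non-blocking I/O, I/O multiplexing, signal-driven I/O (SIGIO), and asynchronous I/O. I/O multiplexing is usually implemented with select, poll, epoll, kqueue, or a similar mechanism. A multiplexer such as Selector relies on these mechanisms to obtain events, and the NIO (New I/O) package in the JDK is built on the I/O multiplexing model.
select, poll, and epoll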
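With blocking I/O, the application calls an I/O function and, if no data is ready, the call blocks until data arrives. Blocking I/O does not burn much CPU, but under this model one thread can handle the I/O events of only one file descriptor. With non-blocking I/O, the application calls an I/O function and, if the data is not ready, the call returns an error immediately instead of blocking. This lets one thread handle the I/O events of several file descriptors, but it has to poll continuously to pick up those events, which wastes a lot of CPU. In addition, with both blocking and non-blocking I/O, a single call to an I/O function can obtain only one I/O event.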
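The non-blocking behaviour described above can be observed from Java with a small sketch. It uses a java.nio.channels.Pipe as a stand-in for any descriptor; the class name NonBlockingReadDemo is made up for this illustration. Note that where the C-level call reports an error such as EAGAIN, the Java channel API reports "nothing available" by returning 0 from read.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;

// Hypothetical demo class, not part of the JDK.
class NonBlockingReadDemo {
    /** Returns {bytes read before anything was written, bytes read after a write}. */
    static int[] run() {
        try {
            Pipe pipe = Pipe.open();
            try (Pipe.SourceChannel source = pipe.source();
                 Pipe.SinkChannel sink = pipe.sink()) {
                source.configureBlocking(false);
                ByteBuffer buf = ByteBuffer.allocate(16);

                // No data yet: the call returns immediately instead of blocking.
                int before = source.read(buf);

                sink.write(ByteBuffer.wrap(new byte[] {'h', 'i'}));

                // The application has to keep asking until data shows up.
                // This busy loop is exactly the CPU cost the text refers to.
                int after = 0;
                while (after == 0) {
                    after = source.read(buf);
                }
                return new int[] {before, after};
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("before write: " + r[0] + ", after write: " + r[1]);
    }
}
```

The busy loop is the part that I/O multiplexing replaces: instead of asking each descriptor in turn, the thread blocks once and is told which descriptors are ready.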
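select, poll, and epoll are the three most common I/O multiplexing mechanisms. All of them can watch for several I/O events at once. Their characteristics and differences are as follows: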
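- select monitors the I/O events of several file descriptors for up to a given time. The caller passes in the descriptors to watch, and when the call returns the application has to iterate over all of them to check which ones actually have events, which is inefficient. By default select can monitor only 1024 file descriptors. They are stored in a fixed-size structure (fd_set), and the limit comes from the constant FD_SETSIZE, so raising it means changing that value.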
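- poll is similar to select, but the caller passes in an array of pollfd structures of arbitrary length instead of a fixed-size set, so it is not bound by the 1024 limit.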
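- epoll can monitor as many file descriptors as the system allows to be open. Unlike select and poll, epoll does not find events by scanning; it attaches a callback to each file descriptor. As a result, its efficiency does not drop noticeably when a large number of descriptors are monitored. Also, select and poll hand the full set of descriptors back to the application, while epoll returns only the ready ones (those with events).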
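Seen from the Java API, the practical effect of the list above is that a Selector reports only the channels that are ready, whichever system mechanism sits underneath. The following is a minimal sketch using two pipes, only one of which has data; the class name ReadyKeysDemo is an assumption made for this example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Set;

// Hypothetical demo class, not part of the JDK.
class ReadyKeysDemo {
    /** Returns the size of the selected-key set, or -1 if it is not exactly the busy pipe. */
    static int run() {
        try (Selector selector = Selector.open()) {
            Pipe idle = Pipe.open();
            Pipe busy = Pipe.open();
            try {
                // Channels must be non-blocking before they can be registered.
                idle.source().configureBlocking(false);
                busy.source().configureBlocking(false);
                idle.source().register(selector, SelectionKey.OP_READ);
                SelectionKey busyKey =
                        busy.source().register(selector, SelectionKey.OP_READ);

                // Only one of the two registered channels gets data.
                busy.sink().write(ByteBuffer.wrap(new byte[] {1}));

                // One blocking call watches both channels at once.
                selector.select(5000);
                Set<SelectionKey> selected = selector.selectedKeys();
                boolean onlyBusy = selected.size() == 1 && selected.contains(busyKey);
                return onlyBusy ? selected.size() : -1;
            } finally {
                idle.source().close();
                idle.sink().close();
                busy.source().close();
                busy.sink().close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("selected keys: " + run());
    }
}
```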
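The Selector implementations in the JDK NIO package
Selector in the JDK is an abstract class. A Selector is usually created as shown in the following code:
/**
 * Code snippet 1: creating a Selector
 */
Selector selector = Selector.open();
The implementation behind it looks like this: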
/**
 * Code snippet 2: the open method of Selector and the provider method of SelectorProvider
 */
// Creates a Selector by calling openSelector on the SelectorProvider
public static Selector open() throws IOException {
    return SelectorProvider.provider().openSelector();
}

// Creates the SelectorProvider. Unless a provider is supplied through a system property or as a
// service, this falls back to sun.nio.ch.DefaultSelectorProvider.create(), which is implemented
// differently on each platform
public static SelectorProvider provider() {
    synchronized (lock) {
        if (provider != null)
            return provider;
        return AccessController.doPrivileged(
            new PrivilegedAction<SelectorProvider>() {
                public SelectorProvider run() {
                    if (loadProviderFromProperty())
                        return provider;
                    if (loadProviderAsService())
                        return provider;
                    provider = sun.nio.ch.DefaultSelectorProvider.create();
                    return provider;
                }
            });
    }
}
The SelectorProvider implementation differs from one operating system to another, and so does the implementation of the Selector it creates.
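To check which implementation a given platform and JDK build actually picks, the runtime classes can simply be printed. This is a small sketch; the class name SelectorImplDemo is made up, and the printed names depend on the operating system and the JDK version, so no particular output is assumed here.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;

// Hypothetical demo class, not part of the JDK.
class SelectorImplDemo {
    /** Returns {provider class name, selector class name} for the running JDK. */
    static String[] implementationNames() {
        try (Selector selector = Selector.open()) {
            return new String[] {
                SelectorProvider.provider().getClass().getName(),
                selector.getClass().getName()
            };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Selector.open() goes through the system-wide default provider. */
    static boolean openedByDefaultProvider() {
        try (Selector selector = Selector.open()) {
            return selector.provider() == SelectorProvider.provider();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String[] names = implementationNames();
        System.out.println("provider: " + names[0]);
        System.out.println("selector: " + names[1]);
    }
}
```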
The multiplexing implementation on Windows
The Windows JDK contains only one non-abstract SelectorProvider implementation: WindowsSelectorProvider. Accordingly, sun.nio.ch.DefaultSelectorProvider.create() returns a WindowsSelectorProvider object:
/**
 * Code snippet 3: the sun.nio.ch.DefaultSelectorProvider.create() method in the Windows JDK
 */
public static SelectorProvider create() {
    return new WindowsSelectorProvider();
}
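The openSelector method of WindowsSelectorProvider returns a WindowsSelectorImpl object. WindowsSelectorImpl extends the abstract class SelectorImpl:
/**
 * Code snippet 4: the openSelector method of WindowsSelectorProvider
 */
public AbstractSelector openSelector() throws IOException {
    return new WindowsSelectorImpl(this);
}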
The pollWrapper field of WindowsSelectorImpl is a PollArrayWrapper object. In the OpenJDK source, the PollArrayWrapper class carries the following documentation comment:
/**
 * Code snippet 5: the comment on the PollArrayWrapper class on Windows
 */
/**
 * Manipulates a native array of structs corresponding to (fd, events) pairs.
 *
 * typedef struct pollfd {
 *    SOCKET fd;            // 4 bytes
 *    short events;         // 2 bytes
 * } pollfd_t;
 *
 * @author Konstantin Kladko
 * @author Mike McCloskey
 */
PollArrayWrapper manipulates a native array of structs corresponding to (fd, events) pairs. The layout of each element is the struct defined in the comment above. PollArrayWrapper reads and writes file descriptors and events through its field pollArray, which is of type AllocatedNativeObject. AllocatedNativeObject extends NativeObject, a proxy for an object that lives in native memory. NativeObject provides methods for storing and loading every primitive type except boolean in off-heap memory. Taking byte as an example, the accessors look like this:
/**
 * Code snippet 6: the getByte and putByte methods of NativeObject
 */
/**
 * Reads a byte starting at the given offset from base of this native
 * object.
 *
 * @param  offset
 *         The offset at which to read the byte
 *
 * @return The byte value read
 */
final byte getByte(int offset) {
    return unsafe.getByte(offset + address);
}

/**
 * Writes a byte at the specified offset from this native object's
 * base address.
 *
 * @param  offset
 *         The offset at which to write the byte
 *
 * @param  value
 *         The byte value to be written
 */
final void putByte(int offset, byte value) {
    unsafe.putByte(offset + address, value);
}
PollArrayWrapper provides methods for storing events and file descriptors. All of them read and write int and short values through pollArray. These methods and the PollArrayWrapper constructor are shown below:
/**
 * Code snippet 7: the PollArrayWrapper constructor and its accessors for file descriptors and events
 */
PollArrayWrapper(int newSize) {
    int allocationSize = newSize * SIZE_POLLFD;
    pollArray = new AllocatedNativeObject(allocationSize, true);
    pollArrayAddress = pollArray.address();
    this.size = newSize;
}

// Access methods for fd structures
void putDescriptor(int i, int fd) {
    pollArray.putInt(SIZE_POLLFD * i + FD_OFFSET, fd);
}

void putEventOps(int i, int event) {
    pollArray.putShort(SIZE_POLLFD * i + EVENT_OFFSET, (short)event);
}

int getEventOps(int i) {
    return pollArray.getShort(SIZE_POLLFD * i + EVENT_OFFSET);
}

int getDescriptor(int i) {
    return pollArray.getInt(SIZE_POLLFD * i + FD_OFFSET);
}
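In the pollfd struct, fd takes 4 bytes and events takes 2 bytes, which matches the sizes of int and short. FD_OFFSET, EVENT_OFFSET, and SIZE_POLLFD are final int constants with the values 0, 4, and 8. PollArrayWrapper therefore uses 8 bytes to store one (fd, event) pair, and constructing a PollArrayWrapper allocates newSize * 8 bytes of off-heap memory. The i-th fd is the int starting at byte 8 * i, and the i-th event is the short starting at byte 8 * i + 4. A PollArrayWrapper of a given size can thus hold that many (fd, event) pairs, laid out contiguously in memory (the last 2 bytes of each pair are unused), so it is effectively an array.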
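The offset arithmetic can be tried out without sun.nio.ch internals. The sketch below mimics the layout with a direct ByteBuffer in place of AllocatedNativeObject; that substitution and the class name PollFdLayoutDemo are assumptions made for illustration, and only the constants and the offset formulas come from the snippet above.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical demo class; a direct ByteBuffer stands in for AllocatedNativeObject.
class PollFdLayoutDemo {
    static final int SIZE_POLLFD = 8;   // bytes per (fd, event) pair
    static final int FD_OFFSET = 0;     // int fd at bytes 0..3
    static final int EVENT_OFFSET = 4;  // short events at bytes 4..5, bytes 6..7 unused

    final ByteBuffer pollArray;

    PollFdLayoutDemo(int newSize) {
        // Off-heap allocation of newSize * 8 bytes, zero-filled by allocateDirect.
        pollArray = ByteBuffer.allocateDirect(newSize * SIZE_POLLFD)
                              .order(ByteOrder.nativeOrder());
    }

    void putDescriptor(int i, int fd) {
        pollArray.putInt(SIZE_POLLFD * i + FD_OFFSET, fd);
    }

    void putEventOps(int i, int event) {
        pollArray.putShort(SIZE_POLLFD * i + EVENT_OFFSET, (short) event);
    }

    int getDescriptor(int i) {
        return pollArray.getInt(SIZE_POLLFD * i + FD_OFFSET);
    }

    int getEventOps(int i) {
        return pollArray.getShort(SIZE_POLLFD * i + EVENT_OFFSET);
    }

    public static void main(String[] args) {
        PollFdLayoutDemo wrapper = new PollFdLayoutDemo(4);
        wrapper.putDescriptor(2, 42);
        wrapper.putEventOps(2, 5);
        System.out.println("capacity = " + wrapper.pollArray.capacity());
        System.out.println("entry 2: fd = " + wrapper.getDescriptor(2)
                + ", events = " + wrapper.getEventOps(2));
    }
}
```

Writing entry 2 touches bytes 16 to 21 only, so neighbouring entries stay untouched, which is what makes the block usable as an array of pairs.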
The doSelect method of a Selector is where the file descriptors are actually operated on. The doSelect method of WindowsSelectorImpl is shown below:
/**
 * Code snippet 8: the doSelect method of WindowsSelectorImpl, together with the poll and
 * poll0 methods of its inner class SubSelector that it calls
 */
protected int doSelect(long timeout) throws IOException {
    if (channelArray == null)
        throw new ClosedSelectorException();
    this.timeout = timeout; // set selector timeout
    processDeregisterQueue();
    if (interruptTriggered) {
        resetWakeupSocket();
        return 0;
    }
    // Calculate the number of helper threads needed for polling. If necessary,
    // threads are created here and start waiting on startLock
    adjustThreadsCount();
    // Reset finishLock
    finishLock.reset();
    // Wake up the helper threads waiting on startLock so that they start polling.
    // Redundant threads exit after being woken up.
    startLock.startThreads();
    // Poll in the main thread. The main thread is responsible for the first
    // MAX_SELECTABLE_FDS (1024 by default) (fd, event) pairs in pollArray.
    try {
        begin();
        try {
            subSelector.poll();
        } catch (IOException e) {
            // Save the exception
            finishLock.setException(e);
        }
        // The main thread is out of poll(). Wake up the other threads and wait for them
        if (threads.size() > 0)
            finishLock.waitForHelperThreads();
    } finally {
        end();
    }
    finishLock.checkForException();
    processDeregisterQueue();
    // Update the ops of the corresponding channels. Add the ready keys to the ready queue.
    int updated = updateSelectedKeys();
    // Done with poll(). Set wakeupSocket to nonsignaled for the next run.
    resetWakeupSocket();
    return updated;
}

// The poll method of SubSelector, an inner class of WindowsSelectorImpl
private int poll() throws IOException { // poll for the main thread
    return poll0(pollWrapper.pollArrayAddress,
                 Math.min(totalChannels, MAX_SELECTABLE_FDS),
                 readFds, writeFds, exceptFds, timeout);
}

// The poll0 method of SubSelector, an inner class of WindowsSelectorImpl
private native int poll0(long pollAddress, int numfds,
     int[] readFds, int[] writeFds, int[] exceptFds, long timeout);
The C source of the poll0 method:
/**
 * Code snippet 9: the C source of the poll0 method of SubSelector, an inner class of WindowsSelectorImpl
 */
JNIEXPORT jint JNICALL
Java_sun_nio_ch_WindowsSelectorImpl_00024SubSelector_poll0(JNIEnv *env, jobject this,
                                   jlong pollAddress, jint numfds,
                                   jintArray returnReadFds, jintArray returnWriteFds,
                                   jintArray returnExceptFds, jlong timeout)
{
    ... // some code omitted

    /* Call select */
    // Calls the system select function
    if ((result = select(0, &readfds, &writefds, &exceptfds, tv))
                                                             == SOCKET_ERROR) {
        /* Bad error - this should not happen frequently */
        /* Iterate over sockets and call select() on each separately */
        FD_SET errreadfds, errwritefds, errexceptfds;
        readfds.fd_count = 0;
        writefds.fd_count = 0;
        exceptfds.fd_count = 0;
        for (i = 0; i < numfds; i++) {
            /* prepare select structures for the i-th socket */
            errreadfds.fd_count = 0;
            errwritefds.fd_count = 0;
            if (fds[i].events & POLLIN) {
                errreadfds.fd_array[0] = fds[i].fd;
                errreadfds.fd_count = 1;
            }
            if (fds[i].events & (POLLOUT | POLLCONN)) {
                errwritefds.fd_array[0] = fds[i].fd;
                errwritefds.fd_count = 1;
            }
            errexceptfds.fd_array[0] = fds[i].fd;
            errexceptfds.fd_count = 1;

            /* call select on the i-th socket */
            // Calls the system select function
            if (select(0, &errreadfds, &errwritefds, &errexceptfds, &zerotime)
                                                             == SOCKET_ERROR) {
                /* This socket causes an error. Add it to exceptfds set */
                exceptfds.fd_array[exceptfds.fd_count] = fds[i].fd;
                exceptfds.fd_count++;
            } else {
                /* This socket does not cause an error. Process result */
                if (errreadfds.fd_count == 1) {
                    readfds.fd_array[readfds.fd_count] = fds[i].fd;
                    readfds.fd_count++;
                }
                if (errwritefds.fd_count == 1) {
                    writefds.fd_array[writefds.fd_count] = fds[i].fd;
                    writefds.fd_count++;
                }
                if (errexceptfds.fd_count == 1) {
                    exceptfds.fd_array[exceptfds.fd_count] = fds[i].fd;
                    exceptfds.fd_count++;
                }
            }
        }
    }

    ... // some code omitted
}
So on Windows, JDK NIO performs multiplexing by calling the system select function.
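The doSelect comments say that the main thread polls the first MAX_SELECTABLE_FDS entries of pollArray and that helper threads cover the rest. The arithmetic behind such a split might look like the sketch below. This is an illustration under assumptions, not the JDK's adjustThreadsCount, which is not shown in this article; the class and method names are hypothetical, and any extra bookkeeping entries the real implementation may keep are ignored.

```java
// Hypothetical demo class; it only illustrates how a 1024-entry limit per select
// call could be spread over several threads.
class SelectBatchesDemo {
    static final int MAX_SELECTABLE_FDS = 1024;

    /** Number of select calls needed to cover totalChannels entries. */
    static int batches(int totalChannels) {
        return (totalChannels + MAX_SELECTABLE_FDS - 1) / MAX_SELECTABLE_FDS;
    }

    /** The main thread takes the first batch; every further batch needs a helper. */
    static int helperThreads(int totalChannels) {
        return Math.max(0, batches(totalChannels) - 1);
    }

    /** Half-open index range [start, end) of pollArray entries handled by one batch. */
    static int[] batchRange(int batch, int totalChannels) {
        int start = batch * MAX_SELECTABLE_FDS;
        int end = Math.min(totalChannels, start + MAX_SELECTABLE_FDS);
        return new int[] {start, end};
    }

    public static void main(String[] args) {
        int total = 3000;
        System.out.println("helper threads: " + helperThreads(total));
        for (int b = 0; b < batches(total); b++) {
            int[] r = batchRange(b, total);
            System.out.println("batch " + b + ": [" + r[0] + ", " + r[1] + ")");
        }
    }
}
```

For batch 0 the range ends at Math.min(totalChannels, MAX_SELECTABLE_FDS), which is the same count the main thread passes to poll0 in code snippet 8.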
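The multiplexing implementation on Linux
The Linux JDK contains two non-abstract subclasses of Selector: PollSelectorImpl and EPollSelectorImpl.
PollSelectorImpl
As the name suggests, PollSelectorImpl multiplexes with poll. PollSelectorImpl extends AbstractPollSelectorImpl, which also maintains a PollArrayWrapper to store (file descriptor, event) pairs, but the Linux PollArrayWrapper is implemented differently from the Windows one. First, here is the doSelect method of PollSelectorImpl: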
/**
 * Code snippet 10: the doSelect method of PollSelectorImpl
 */
protected int doSelect(long timeout)
    throws IOException
{
    if (channelArray == null)
        throw new ClosedSelectorException();
    processDeregisterQueue();
    try {
        begin();
        pollWrapper.poll(totalChannels, 0, timeout);
    } finally {
        end();
    }
    processDeregisterQueue();
    // Copy the information in the pollfd structs into the ops of the corresponding
    // channels. Add the ready keys to the ready queue.
    int numKeysUpdated = updateSelectedKeys();
    if (pollWrapper.getReventOps(0) != 0) {
        // Clear the wakeup pipe
        pollWrapper.putReventOps(0, 0);
        synchronized (interruptLock) {
            IOUtil.drain(fd0);
            interruptTriggered = false;
        }
    }
    return numKeysUpdated;
}
doSelect calls the poll method of PollArrayWrapper. The Linux PollArrayWrapper differs somewhat from the Windows one. Its documentation comment in the source reads:
/**
 * Code snippet 11: the comment on the PollArrayWrapper class on Linux
 */
/**
 * Manipulates a native array of pollfd structs on Solaris:
 *
 * typedef struct pollfd {
 *    int fd;
 *    short events;
 *    short revents;
 * } pollfd_t;
 *
 * @author Mike McCloskey
 * @since 1.4
 */
Compared with Windows, the struct handled by the Linux PollArrayWrapper has one more field, revents (the events that actually occurred). The Linux PollArrayWrapper extends the abstract class AbstractPollArrayWrapper, which defines the accessors for file descriptors and events:
/**
 * Code snippet 12: the final constants defined in AbstractPollArrayWrapper and its
 * accessors for file descriptors and events
 */
static final short SIZE_POLLFD   = 8;
static final short FD_OFFSET     = 0;
static final short EVENT_OFFSET  = 4;
static final short REVENT_OFFSET = 6;

protected AllocatedNativeObject pollArray;

// Access methods for fd structures
int getEventOps(int i) {
    int offset = SIZE_POLLFD * i + EVENT_OFFSET;
    return pollArray.getShort(offset);
}

int getReventOps(int i) {
    int offset = SIZE_POLLFD * i + REVENT_OFFSET;
    return pollArray.getShort(offset);
}

int getDescriptor(int i) {
    int offset = SIZE_POLLFD * i + FD_OFFSET;
    return pollArray.getInt(offset);
}

void putEventOps(int i, int event) {
    int offset = SIZE_POLLFD * i + EVENT_OFFSET;
    pollArray.putShort(offset, (short)event);
}

void putReventOps(int i, int revent) {
    int offset = SIZE_POLLFD * i + REVENT_OFFSET;
    pollArray.putShort(offset, (short)revent);
}

void putDescriptor(int i, int fd) {
    int offset = SIZE_POLLFD * i + FD_OFFSET;
    pollArray.putInt(offset, fd);
}
So in the Linux PollArrayWrapper, the last two bytes of every 8-byte entry in pollArray are not empty; they hold the two-byte revents field. The poll method of PollArrayWrapper is shown below:
/**
 * Code snippet 13: the poll method of PollArrayWrapper
 */
int poll(int numfds, int offset, long timeout) {
    return poll0(pollArrayAddress + (offset * SIZE_POLLFD),
                 numfds, timeout);
}

private native int poll0(long pollAddress, int numfds, long timeout);
The C source of the poll0 method:
/**
 * Code snippet 14: the C source of the poll0 method of PollArrayWrapper
 */
JNIEXPORT jint JNICALL
Java_sun_nio_ch_PollArrayWrapper_poll0(JNIEnv *env, jobject this,
                                       jlong address, jint numfds,
                                       jlong timeout)
{
    struct pollfd *a;
    int err = 0;

    a = (struct pollfd *) jlong_to_ptr(address);

    if (timeout <= 0) {           /* Indefinite or no wait */
        // timeout <= 0: call the system poll function directly
        RESTARTABLE (poll(a, numfds, timeout), err);
    } else {                      /* Bounded wait; bounded restarts */
        // timeout > 0: keep calling poll until the timeout has elapsed
        err = ipoll(a, numfds, timeout);
    }

    if (err < 0) {
        JNU_ThrowIOExceptionWithLastError(env, "Poll failed");
    }
    return (jint)err;
}

static int
ipoll(struct pollfd fds[], unsigned int nfds, int timeout)
{
    jlong start, now;
    int remaining = timeout;
    struct timeval t;
    int diff;

    gettimeofday(&t, NULL);
    start = t.tv_sec * 1000 + t.tv_usec / 1000;

    for (;;) {
        // Call poll with the remaining timeout. Normally it is called only once;
        // the loop restarts the call when it is interrupted (EINTR).
        int res = poll(fds, nfds, remaining);
        if (res < 0 && errno == EINTR) {
            if (remaining >= 0) {
                gettimeofday(&t, NULL);
                now = t.tv_sec * 1000 + t.tv_usec / 1000;
                diff = now - start;
                remaining -= diff;
                if (diff < 0 || remaining <= 0) {
                    return 0;
                }
                start = now;
            }
        } else {
            return res;
        }
    }
}
So PollSelectorImpl does indeed multiplex by calling the system poll function.
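The bounded-restart logic of ipoll is easier to follow when the system call and the clock are replaced by plain functions. The sketch below ports the loop to Java under that assumption: the poll and clockMillis parameters, the INTERRUPTED marker (standing in for res < 0 && errno == EINTR), and the class name are all made up for illustration.

```java
import java.util.function.IntUnaryOperator;
import java.util.function.LongSupplier;

// Hypothetical demo class; a Java rendering of the ipoll loop, not JDK code.
class BoundedRestartDemo {
    /** Stands in for "poll returned -1 with errno == EINTR". */
    static final int INTERRUPTED = -1;

    /**
     * Calls poll with the remaining timeout and restarts it after an interruption,
     * shrinking the timeout by the time already spent. Returns 0 once time is up.
     */
    static int ipoll(IntUnaryOperator poll, LongSupplier clockMillis, int timeout) {
        long start = clockMillis.getAsLong();
        int remaining = timeout;
        for (;;) {
            int res = poll.applyAsInt(remaining);
            if (res != INTERRUPTED) {
                return res;                 // ready count (or 0 on a normal timeout)
            }
            long now = clockMillis.getAsLong();
            long diff = now - start;
            remaining -= (int) diff;
            if (diff < 0 || remaining <= 0) {
                return 0;                   // budget used up, or the clock went backwards
            }
            start = now;
        }
    }

    public static void main(String[] args) {
        long[] fakeClock = {0};
        int[] calls = {0};
        // Fake poll: interrupted twice, then reports 3 ready descriptors.
        int res = ipoll(
                remaining -> ++calls[0] <= 2 ? INTERRUPTED : 3,
                () -> { long v = fakeClock[0]; fakeClock[0] += 40; return v; },
                100);
        System.out.println("result = " + res + " after " + calls[0] + " calls");
    }
}
```

Without the shrinking remaining value, a steady stream of interruptions could keep the call waiting far longer than the timeout the caller asked for.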
EPollSelectorImpl
EPollSelectorImpl uses EPollArrayWrapper to manipulate file descriptors and events. The documentation comment in EPollArrayWrapper describes the epoll event struct:
/**
 * Code snippet 15: the comment on the EPollArrayWrapper class
 */
/**
 * Manipulates a native array of epoll_event structs on Linux:
 *
 * typedef union epoll_data {
 *     void *ptr;
 *     int fd;
 *     __uint32_t u32;
 *     __uint64_t u64;
 *  } epoll_data_t;
 *
 * struct epoll_event {
 *     __uint32_t events;
 *     epoll_data_t data;
 * };
 *
 * The system call to wait for I/O events is epoll_wait(2). It populates an
 * array of epoll_event structures that are passed to the call. The data
 * member of the epoll_event structure contains the same data as was set
 * when the file descriptor was registered to epoll via epoll_ctl(2). In
 * this implementation we set data.fd to be the file descriptor that we
 * register. That way, we have the file descriptor available when we
 * process the events.
 */
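In short: the system call that waits for I/O events is epoll_wait(2). The caller passes it an array of epoll_event structs, and the call fills that array in. The data member of each epoll_event holds whatever was set when the file descriptor was registered through epoll_ctl(2). This implementation sets data.fd to the registered file descriptor, so the descriptor is at hand when the events are processed.
Clearly, the struct handled by EPollSelectorImpl is larger than the one handled by PollSelectorImpl; it is not examined field by field here. Unlike select and poll, epoll involves three system calls: epoll_create, epoll_ctl, and epoll_wait. JDK NIO confirms this. When an EPollArrayWrapper is created, it calls the epollCreate method: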
/**
 * Code snippet 16: the EPollArrayWrapper constructor and the epollCreate method it calls
 */
EPollArrayWrapper() throws IOException {
    // creates the epoll file descriptor
    epfd = epollCreate();

    // the epoll_event array passed to epoll_wait
    int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT;
    pollArray = new AllocatedNativeObject(allocationSize, true);
    pollArrayAddress = pollArray.address();

    // eventHigh needed when using file descriptors > 64k
    if (OPEN_MAX > MAX_UPDATE_ARRAY_SIZE)
        eventsHigh = new HashMap<>();
}

private native int epollCreate();
The epollCreate method makes the epoll_create system call, which creates an epoll instance. Here is the C source:
/**
 * Code snippet 17: the C source of the epollCreate method
 */
JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollCreate(JNIEnv *env, jobject this)
{
    /*
     * epoll_create expects a size as a hint to the kernel about how to
     * dimension internal structures. We can't predict the size in advance.
     */
    // Makes the epoll_create system call
    int epfd = epoll_create(256);
    if (epfd < 0) {
       JNU_ThrowIOExceptionWithLastError(env, "epoll_create failed");
    }
    return epfd;
}
After the EPollSelectorImpl constructor has created an EPollArrayWrapper instance, it calls that instance's initInterrupt method, which in turn calls epollCtl:
/**
 * Code snippet 18: the EPollSelectorImpl constructor, the initInterrupt method of
 * EPollArrayWrapper that it calls, and the epollCtl method called by initInterrupt
 */
/**
 * Package private constructor called by factory method in
 * the abstract superclass Selector.
 */
EPollSelectorImpl(SelectorProvider sp) throws IOException {
    super(sp);
    long pipeFds = IOUtil.makePipe(false);
    fd0 = (int) (pipeFds >>> 32);
    fd1 = (int) pipeFds;
    pollWrapper = new EPollArrayWrapper();
    // Calls the initInterrupt method
    pollWrapper.initInterrupt(fd0, fd1);
    fdToKey = new HashMap<>();
}

void initInterrupt(int fd0, int fd1) {
    outgoingInterruptFD = fd1;
    incomingInterruptFD = fd0;
    // Calls epollCtl
    epollCtl(epfd, EPOLL_CTL_ADD, fd0, EPOLLIN);
}

private native void epollCtl(int epfd, int opcode, int fd, int events);
The epollCtl method makes the epoll_ctl system call, which adds an event to monitor to the epoll instance that was just created. Here is the C source:
/**
 * Code snippet 19: the C source of the epollCtl method
 */
JNIEXPORT void JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollCtl(JNIEnv *env, jobject this, jint epfd,
                                           jint opcode, jint fd, jint events)
{
    struct epoll_event event;
    int res;

    event.events = events;
    event.data.fd = fd;

    // Calls epoll_ctl
    RESTARTABLE(epoll_ctl(epfd, (int)opcode, (int)fd, &event), res);

    /*
     * A channel may be registered with several Selectors. When each Selector
     * is polled a EPOLL_CTL_DEL op will be inserted into its pending update
     * list to remove the file descriptor from epoll. The "last" Selector will
     * close the file descriptor which automatically unregisters it from each
     * epoll descriptor. To avoid costly synchronization between Selectors we
     * allow pending updates to be processed, ignoring errors. The errors are
     * harmless as the last update for the file descriptor is guaranteed to
     * be EPOLL_CTL_DEL.
     */
    if (res < 0 && errno != EBADF && errno != ENOENT && errno != EPERM) {
        JNU_ThrowIOExceptionWithLastError(env, "epoll_ctl failed");
    }
}
The doSelect method of EPollSelectorImpl calls the poll method of EPollArrayWrapper, and poll calls epollWait:
/**
 * Code snippet 20: the doSelect method of EPollSelectorImpl, the poll method of
 * EPollArrayWrapper that it calls, and the epollWait method called by poll
 */
protected int doSelect(long timeout) throws IOException {
    if (closed)
        throw new ClosedSelectorException();
    processDeregisterQueue();
    try {
        begin();
        pollWrapper.poll(timeout);
    } finally {
        end();
    }
    processDeregisterQueue();
    int numKeysUpdated = updateSelectedKeys();
    if (pollWrapper.interrupted()) {
        // Clear the wakeup pipe
        pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
        synchronized (interruptLock) {
            pollWrapper.clearInterrupted();
            IOUtil.drain(fd0);
            interruptTriggered = false;
        }
    }
    return numKeysUpdated;
}

int poll(long timeout) throws IOException {
    // Update the registrations. If the monitored events have changed, epoll_ctl is
    // called to add events to or remove events from the epoll instance
    updateRegistrations();
    // Calls epollWait
    updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
    for (int i=0; i<updated; i++) {
        if (getDescriptor(i) == incomingInterruptFD) {
            interruptedIndex = i;
            interrupted = true;
            break;
        }
    }
    return updated;
}

private native int epollWait(long pollAddress, int numfds, long timeout,
                             int epfd) throws IOException;
The epollWait method makes the epoll_wait system call. The calling thread is suspended while it waits for the kernel to deliver I/O events. Here is the C source:
/**
 * Code snippet 21: the C source of the epollWait method
 */
JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollWait(JNIEnv *env, jobject this,
                                            jlong address, jint numfds,
                                            jlong timeout, jint epfd)
{
    struct epoll_event *events = jlong_to_ptr(address);
    int res;

    if (timeout <= 0) {           /* Indefinite or no wait */
        // timeout <= 0: call the system epoll_wait function directly
        RESTARTABLE(epoll_wait(epfd, events, numfds, timeout), res);
    } else {                      /* Bounded wait; bounded restarts */
        // timeout > 0: keep calling until the timeout has elapsed; the loop is
        // there to restart the call when it is interrupted
        res = iepoll(epfd, events, numfds, timeout);
    }

    if (res < 0) {
        JNU_ThrowIOExceptionWithLastError(env, "epoll_wait failed");
    }
    return res;
}
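The pipe registered by initInterrupt and drained in doSelect ("Clear the wakeup pipe") is what lets another thread break a blocked selection. At the Java level this surfaces as Selector.wakeup(); that it works by writing to the other end of the pipe is an assumption here, since that code is not shown above. The documented behaviour can be checked with a short sketch: a wakeup issued while no selection is in progress makes the next select() return immediately. The class name WakeupDemo is made up for this example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;

// Hypothetical demo class, not part of the JDK.
class WakeupDemo {
    /** Returns what select() reports after a wakeup with no channels registered. */
    static int run() {
        try (Selector selector = Selector.open()) {
            // No selection is in progress, so this wakeup is remembered
            // and applies to the next selection operation.
            selector.wakeup();

            // With no timeout, select() would normally block until a channel is
            // ready. The pending wakeup makes it return at once, and no key is
            // ready, so the result is 0.
            return selector.select();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("select() returned " + run());
    }
}
```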
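Summary
This article has walked through three I/O multiplexing techniques and how the JDK uses them. On Windows, JDK NIO has only one non-abstract Selector implementation, WindowsSelectorImpl, and it multiplexes with select. On Linux there are two implementations, PollSelectorImpl and EPollSelectorImpl, which multiplex with poll and epoll respectively. The article also looked at how each of these Selector implementations works in detail. Corrections are welcome.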
