Unix系统有五种IO模型分别是阻塞IO(blocking IO),非阻塞IO( non-blocking IO),IO多路复用(IO multiplexing),信号驱动(SIGIO/Signal IO)和异步IO(Asynchronous IO)。而IO多路复用通常有select,poll,epoll,kqueue等方式。而多路复用器Selector,就是采用这些IO多路复用的机制获取事件。JDK中的NIO(new IO)包,采用的就是IO多路复用的模型。

select,poll和epoll

阻塞IO下,应用程序调用IO函数,如果没有数据贮备好,那么IO操作会一直阻塞下去,阻塞IO不会占用大量CPU,但在这种IO模型下,一个线程只能处理一个文件描述符的IO事件;非阻塞IO下,应用程序调用IO事件,如果数据没有准备好,会直接返回一个错误,应用程序不会阻塞,这样就可以同时处理多个文件描述符的IO事件,但是需要不间断地轮询来获取IO事件,对CPU是很大的浪费。并且阻塞IO和非阻塞IO调用一个IO函数只能获取一个IO事件。
select,poll和epoll是最常见的三种IO多路复用的方式,它们都支持同时感知多个IO事件,它们的工作特点和区别如下:

  • select可以在一定时间内监视多个文件描述符的IO事件,select函数需要传入要监视的文件描述符的句柄作为参数,并且返回所有文件描述符,应用程序需要遍历所有的循环来看每一个文件描述符是否有IO事件发生,效率较低。并且,select默认只能监视1024个文件描述符,这些文件描述符采用数组进行存储,可以修改FD_SETSIZE的值来修改文件描述符的数量限制。
  • poll和select类似,poll采用链表存储监视的文件描述符,可以超过1024的限制。
  • epoll可以监控的文件描述符数量是可以打开文件的数量上限。与select和poll不同,epoll获取事件不是通过轮询得到,而是通过给每个文件描述符定义回调得到,因此,在监视的文件描述符很多的情况下,epoll的效率不会有明显的下降。并且,select和poll返回给应用程序的是所有的文件描述符,而epoll返回的是就绪(有事件发生的)的文件描述符。

JDK NIO包中的各种Selector

JDK中的Selector是一个抽象类,创建一个Selector通常以下面代码中的方式进行:

  1. /**
  2. * 代码片段1 创建Selector
  3. */
  4. Selector selector = Selector.open();

下面是具体的实现:

  1. /**
  2. * 代码片段2 Selector中的open方法和SelectorProvider中的provider方法
  3. */
  4. //调用SelectorProvider的openSelector创建Selector
  5. public static Selector open() throws IOException {
  6. return SelectorProvider.provider().openSelector();
  7. }
  8. //创建SelectorProvider,最终调用sun.nio.ch.DefaultSelectorProvider.create(), 这个方法在不同平台上有不同的实现
  9. public static SelectorProvider provider() {
  10. synchronized (lock) {
  11. if (provider != null)
  12. return provider;
  13. return AccessController.doPrivileged(
  14. new PrivilegedAction<SelectorProvider>() {
  15. public SelectorProvider run() {
  16. if (loadProviderFromProperty())
  17. return provider;
  18. if (loadProviderAsService())
  19. return provider;
  20. provider = sun.nio.ch.DefaultSelectorProvider.create();
  21. return provider;
  22. }
  23. });
  24. }
  25. }

在不同的操作系统平台下,SelectorProvider的实现也不相同,创建出来的Selector的实现也不一样。

windows下的多路复用实现

windows下的jdk中只有一个非抽象的SelectorProvider的实现类——WindowsSelectorProvider。显然,sun.nio.ch.DefaultSelectorProvider.create()返回的也是一个WindowsSelectorProvider对象:

  1. /**
  2. * 代码片段3 windows环境下jdk中的sun.nio.ch.DefaultSelectorProvider.create()方法
  3. */
  4. public static SelectorProvider create() {
  5. return new WindowsSelectorProvider();
  6. }

WindowsSelectorProvider的openSelector方法会返回一个WindowsSelectorImpl对象,WindowsSelectorImpl继承了SelectorImpl这个抽象类:

  1. /**
  2. * 代码片段4
  3. */
  4. public AbstractSelector openSelector() throws IOException {
  5. return new WindowsSelectorImpl(this);
  6. }

WindowsSelectorImpl的成员变量pollWrapper是一个PollArrayWrapper对象,PollArrayWrapper类在openjdk的源码中有这样的一段文档注释:

  1. /**
  2. * 代码片段5 windows下的PollArrayWrapper类中的注释
  3. */
  4. /**
  5. * Manipulates a native array of structs corresponding to (fd, events) pairs.
  6. *
  7. * typedef struct pollfd {
  8. * SOCKET fd; // 4 bytes
  9. * short events; // 2 bytes
  10. * } pollfd_t;
  11. *
  12. * @author Konstantin Kladko
  13. * @author Mike McCloskey
  14. */

PollArrayWrapper是用来操作(fd,events)对相对应的结构的原生数组。这个原生数组的结构就是上面的注释中的结构体所定义的。PollArrayWrapper类中通过操作AllocatedNativeObject类型的成员变量pollArray来操作文件描述符和事件。AllocatedNativeObject类继承了NativeObject类,NativeObject类型是驻留在本地内存中的对象的代理,提供了在堆外内存中存放和取出除boolean外的基本类型数据的方法。以byte类型为例,其存取方法如下:

  1. /**
  2. * 代码片段6 NativeObject中的getByte和putByte方法
  3. */
  4. /**
  5. * Reads a byte starting at the given offset from base of this native
  6. * object.
  7. *
  8. * @param offset
  9. * The offset at which to read the byte
  10. *
  11. * @return The byte value read
  12. */
  13. final byte getByte(int offset) {
  14. return unsafe.getByte(offset + address);
  15. }
  16. /**
  17. * Writes a byte at the specified offset from this native object's
  18. * base address.
  19. *
  20. * @param offset
  21. * The offset at which to write the byte
  22. *
  23. * @param value
  24. * The byte value to be written
  25. */
  26. final void putByte(int offset, byte value) {
  27. unsafe.putByte(offset + address, value);
  28. }

PollArrayWrapper中提供了方法存储事件和文件描述符,这些方法都通过来pollArray存取int和short类型,这些方法和PollArrayWrapper构造方法如下:

  1. /**
  2. * 代码片段7 PollArrayWrapper构造方法和对文件描述符和事件的操作方法
  3. */
  4. PollArrayWrapper(int newSize) {
  5. int allocationSize = newSize * SIZE_POLLFD;
  6. pollArray = new AllocatedNativeObject(allocationSize, true);
  7. pollArrayAddress = pollArray.address();
  8. this.size = newSize;
  9. }
  10. // Access methods for fd structures
  11. void putDescriptor(int i, int fd) {
  12. pollArray.putInt(SIZE_POLLFD * i + FD_OFFSET, fd);
  13. }
  14. void putEventOps(int i, int event) {
  15. pollArray.putShort(SIZE_POLLFD * i + EVENT_OFFSET, (short)event);
  16. }
  17. int getEventOps(int i) {
  18. return pollArray.getShort(SIZE_POLLFD * i + EVENT_OFFSET);
  19. }
  20. int getDescriptor(int i) {
  21. return pollArray.getInt(SIZE_POLLFD * i + FD_OFFSET);
  22. }

因为pollfd结构体中,fd占用4个字节,events占用2个字节分别对应int和short的长度。FD_OFFSET,EVENT_OFFSET和SIZE_POLLFD分别是final修饰的int常量0,4和8。PollArrayWrapper会用8个字节来存储一个event和fd的配对,构造一个PollArrayWrapper对象会从堆外内存分配newSize_8字节的空间。获取第i个fd则获取对应的第8_i个字节对应的int,获取第i个event则只要获取第8*i+4个字节对应的short。所以构造一个size大小的PollArrayWrapper对象就可以存储size个fd,event对,并且他们在内存上是连续的(每对的空间末尾有2个字节用不到),所以这也是一个数组。

Selector中的doSelect方法是具体对文件描述符的操作,WindowsSelectorImpl中的doSelector方法如下:

  1. /**
  2. * 代码片段8 WindowsSelectorImpl内部类SubSelector的poll方法中的doSelect方法及其调用的方法具体实现
  3. */
  4. protected int doSelect(long timeout) throws IOException {
  5. if (channelArray == null)
  6. throw new ClosedSelectorException();
  7. this.timeout = timeout; // set selector timeout
  8. processDeregisterQueue();
  9. if (interruptTriggered) {
  10. resetWakeupSocket();
  11. return 0;
  12. }
  13. // 计算轮询所需的辅助线程数。如果需要,在这里创建线程并开始等待startLock
  14. adjustThreadsCount();
  15. // 重置finishLock
  16. finishLock.reset();
  17. // 唤醒辅助线程,等待启动锁,线程启动后会开始轮询。冗余线程将在唤醒后退出。
  18. startLock.startThreads();
  19. // 在主线程中进行轮询。主线程负责pollArray中的前MAX_SELECTABLE_FDS(默认1024)个fd,event对。
  20. try {
  21. begin();
  22. try {
  23. subSelector.poll();
  24. } catch (IOException e) {
  25. // 保存异常
  26. finishLock.setException(e);
  27. }
  28. // 主线程poll()调用结束。唤醒其他线程并等待他们
  29. if (threads.size()0)
  30. finishLock.waitForHelperThreads();
  31. } finally {
  32. end();
  33. }
  34. finishLock.checkForException();
  35. processDeregisterQueue();
  36. // 更新相应channel的操作。将就绪的key添加到就绪队列。
  37. int updated = updateSelectedKeys();
  38. // poll()调用完成。为下一次运行,将wakeupSocket设置为nonsigned。
  39. resetWakeupSocket();
  40. return updated;
  41. }
  42. //WindowsSelectorImpl内部类SubSelector的poll方法
  43. private int poll() throws IOException{ // poll for the main thread
  44. return poll0(pollWrapper.pollArrayAddress,
  45. Math.min(totalChannels, MAX_SELECTABLE_FDS),
  46. readFds, writeFds, exceptFds, timeout);
  47. }
  48. //WindowsSelectorImpl内部类SubSelector的poll0方法
  49. private native int poll0(long pollAddress, int numfds,
  50. int[] readFds, int[] writeFds, int[] exceptFds, long timeout);

poll0方法的C语言源码:

  1. /**
  2. * 代码片段9 WindowsSelectorImpl内部类SubSelector的poll方法的c源码
  3. */
  4. JNIEXPORT jint JNICALL
  5. Java_sun_nio_ch_WindowsSelectorImpl_00024SubSelector_poll0(JNIEnv *env, jobject this,
  6. jlong pollAddress, jint numfds,
  7. jintArray returnReadFds, jintArray returnWriteFds,
  8. jintArray returnExceptFds, jlong timeout)
  9. {
  10. ... //省略部分代码
  11. /* Call select */
  12. if ((result = select(0 , &readfds, &writefds, &exceptfds, tv))//调用系统的select函数
  13. == SOCKET_ERROR) {
  14. /* Bad error - this should not happen frequently */
  15. /* Iterate over sockets and call select() on each separately */
  16. FD_SET errreadfds, errwritefds, errexceptfds;
  17. readfds.fd_count = 0;
  18. writefds.fd_count = 0;
  19. exceptfds.fd_count = 0;
  20. for (i = 0; i < numfds; i++) {
  21. /* prepare select structures for the i-th socket */
  22. errreadfds.fd_count = 0;
  23. errwritefds.fd_count = 0;
  24. if (fds[i].events & POLLIN) {
  25. errreadfds.fd_array[0] = fds[i].fd;
  26. errreadfds.fd_count = 1;
  27. }
  28. if (fds[i].events & (POLLOUT | POLLCONN))
  29. {
  30. errwritefds.fd_array[0] = fds[i].fd;
  31. errwritefds.fd_count = 1;
  32. }
  33. errexceptfds.fd_array[0] = fds[i].fd;
  34. errexceptfds.fd_count = 1;
  35. /* call select on the i-th socket */
  36. if (select(0, &errreadfds, &errwritefds, &errexceptfds, &zerotime)//调用系统的select函数
  37. == SOCKET_ERROR) {
  38. /* This socket causes an error. Add it to exceptfds set */
  39. exceptfds.fd_array[exceptfds.fd_count] = fds[i].fd;
  40. exceptfds.fd_count++;
  41. } else {
  42. /* This socket does not cause an error. Process result */
  43. if (errreadfds.fd_count == 1) {
  44. readfds.fd_array[readfds.fd_count] = fds[i].fd;
  45. readfds.fd_count++;
  46. }
  47. if (errwritefds.fd_count == 1) {
  48. writefds.fd_array[writefds.fd_count] = fds[i].fd;
  49. writefds.fd_count++;
  50. }
  51. if (errexceptfds.fd_count == 1) {
  52. exceptfds.fd_array[exceptfds.fd_count] = fds[i].fd;
  53. exceptfds.fd_count++;
  54. }
  55. }
  56. }
  57. }
  58. ... //省略部分代码
  59. }

可见Window环境的JDK的nio是调用select系统函数来进行的。

linux下的多路复用实现

linux的jdk中有2个非抽象的Selector的子类——PollSelectorImpl和EPollSelectorImpl。

PollSelectorImpl

顾名思义,PollSelectorImpl是采用poll来进行多路复用。PollSelectorImpl继承了AbstractPollSelectorImpl。AbstractPollSelectorImpl中也维护了一个PollArrayWrapper来存储文件描述符和事件对,但linux下的PollArrayWrapper和windows下的实现并不相同。先看PollSelectorImpl的doSelect方法如下:

  1. /**
  2. * 代码片段10 PollSelectorImpl的doSelect方法
  3. */
  4. protected int doSelect(long timeout)
  5. throws IOException
  6. {
  7. if (channelArray == null)
  8. throw new ClosedSelectorException();
  9. processDeregisterQueue();
  10. try {
  11. begin();
  12. pollWrapper.poll(totalChannels, 0, timeout);
  13. } finally {
  14. end();
  15. }
  16. processDeregisterQueue();
  17. // 将pollfd结构中的信息复制到相应通道的ops中。将就绪的key添加到就绪队列。
  18. int numKeysUpdated = updateSelectedKeys();
  19. if (pollWrapper.getReventOps(0) != 0) {
  20. // Clear the wakeup pipe
  21. pollWrapper.putReventOps(0, 0);
  22. synchronized (interruptLock) {
  23. IOUtil.drain(fd0);
  24. interruptTriggered = false;
  25. }
  26. }
  27. return numKeysUpdated;
  28. }

doSelect方法中会调用PollArrayWrapper中的poll方法,linux下的PollArrayWrapper和windows下的不太一样。PollArrayWrapper源码中的文档注释如下:

  1. /**
  2. * 代码片段11 linux下的PollArrayWrapper类中的注释
  3. */
  4. /**
  5. * Manipulates a native array of pollfd structs on Solaris:
  6. *
  7. * typedef struct pollfd {
  8. * int fd;
  9. * short events;
  10. * short revents;
  11. * } pollfd_t;
  12. *
  13. * @author Mike McCloskey
  14. * @since 1.4
  15. */

可以发现,与windows的相比,linux下的PollArrayWrapper操作的结构体多了一个revents(实际发生的事件)的字段,linux下的PollArrayWrapper类继承了抽象类AbstractPollArrayWrapper,AbstractPollArrayWrapper定义了对文件描述符和事件的操作方法:

  1. /**
  2. * 代码片段12 AbstractPollArrayWrapper中定义的几个final常量和对文件描述符、事件的操作方法
  3. */
  4. static final short SIZE_POLLFD = 8;
  5. static final short FD_OFFSET = 0;
  6. static final short EVENT_OFFSET = 4;
  7. static final short REVENT_OFFSET = 6;
  8. protected AllocatedNativeObject pollArray;
  9. // Access methods for fd structures
  10. int getEventOps(int i) {
  11. int offset = SIZE_POLLFD * i + EVENT_OFFSET;
  12. return pollArray.getShort(offset);
  13. }
  14. int getReventOps(int i) {
  15. int offset = SIZE_POLLFD * i + REVENT_OFFSET;
  16. return pollArray.getShort(offset);
  17. }
  18. int getDescriptor(int i) {
  19. int offset = SIZE_POLLFD * i + FD_OFFSET;
  20. return pollArray.getInt(offset);
  21. }
  22. void putEventOps(int i, int event) {
  23. int offset = SIZE_POLLFD * i + EVENT_OFFSET;
  24. pollArray.putShort(offset, (short)event);
  25. }
  26. void putReventOps(int i, int revent) {
  27. int offset = SIZE_POLLFD * i + REVENT_OFFSET;
  28. pollArray.putShort(offset, (short)revent);
  29. }
  30. void putDescriptor(int i, int fd) {
  31. int offset = SIZE_POLLFD * i + FD_OFFSET;
  32. pollArray.putInt(offset, fd);
  33. }

可见,linux下的PollArrayWrapper中pollArray的每8个字节的后两个字节不是空,而是存储着两个字节的revents。PollArrayWrapper的poll方法如下:

  1. /**
  2. * 代码片段13 PollArrayWrapper中poll方法
  3. */
  4. int poll(int numfds, int offset, long timeout) {
  5. return poll0(pollArrayAddress + (offset * SIZE_POLLFD),
  6. numfds, timeout);
  7. }
  8. private native int poll0(long pollAddress, int numfds, long timeout);

poll0方法的c语言源码:

  1. /**
  2. * 代码片段14 PollArrayWrapper中poll方法的c源码
  3. */
  4. JNIEXPORT jint JNICALL
  5. Java_sun_nio_ch_PollArrayWrapper_poll0(JNIEnv *env, jobject this,
  6. jlong address, jint numfds,
  7. jlong timeout)
  8. {
  9. struct pollfd *a;
  10. int err = 0;
  11. a = (struct pollfd *) jlong_to_ptr(address);
  12. if (timeout <= 0) { /* Indefinite or no wait */
  13. //如果timeout<=0,立即调用系统的poll函数
  14. RESTARTABLE (poll(a, numfds, timeout), err);
  15. } else { /* Bounded wait; bounded restarts */
  16. //如果timeout>0,会循环的调用poll函数直到到了timeout的时间
  17. err = ipoll(a, numfds, timeout);
  18. }
  19. if (err < 0) {
  20. JNU_ThrowIOExceptionWithLastError(env, "Poll failed");
  21. }
  22. return (jint)err;
  23. }
  24. static int ipoll(struct pollfd fds[], unsigned int nfds, int timeout)
  25. {
  26. jlong start, now;
  27. int remaining = timeout;
  28. struct timeval t;
  29. int diff;
  30. gettimeofday(&t, NULL);
  31. start = t.tv_sec * 1000 + t.tv_usec / 1000;
  32. for (;;) {
  33. //调用poll函数 remaining是剩余的timeout,其实也就调用一次,用循环应该是为了防止poll函数的进程被异常唤醒
  34. int res = poll(fds, nfds, remaining);
  35. if (res < 0 && errno == EINTR) {
  36. if (remaining >= 0) {
  37. gettimeofday(&t, NULL);
  38. now = t.tv_sec * 1000 + t.tv_usec / 1000;
  39. diff = now - start;
  40. remaining -= diff;
  41. if (diff < 0 || remaining <= 0) {
  42. return 0;
  43. }
  44. start = now;
  45. }
  46. } else {
  47. return res;
  48. }
  49. }
  50. }

可见PollSelectorImpl确实是调用系统的poll函数实现多路复用的。

EPollSelectorImpl

EPollSelectorImpl中使用EPollArrayWrapper来操作文件描述符和事件,EPollArrayWrapper中的对EPoll事件结构体的文档注释:

  1. /**
  2. * 代码片段15 EPollArrayWrapper类中的注释
  3. */
  4. /**
  5. * Manipulates a native array of epoll_event structs on Linux:
  6. *
  7. * typedef union epoll_data {
  8. * void *ptr;
  9. * int fd;
  10. * __uint32_t u32;
  11. * __uint64_t u64;
  12. * } epoll_data_t;
  13. *
  14. * struct epoll_event {
  15. * __uint32_t events;
  16. * epoll_data_t data;
  17. * };
  18. *
  19. * The system call to wait for I/O events is epoll_wait(2). It populates an
  20. * array of epoll_event structures that are passed to the call. The data
  21. * member of the epoll_event structure contains the same data as was set
  22. * when the file descriptor was registered to epoll via epoll_ctl(2). In
  23. * this implementation we set data.fd to be the file descriptor that we
  24. * register. That way, we have the file descriptor available when we
  25. * process the events.
  26. */

等待IO时间的系统调用函数是epoll_wait(2),它填充了一个epoll_event结构体的数组,这个数组被传递给系统调用。epoll_event结构的数据成员包含的数据与通过epoll_ctl(2)将文件描述符注册到epoll时设置的数据相同。在这个实现中,我们将data.fd设置为注册的文件描述符。这样,我们在处理事件时就有了可用的文件描述符。

很明显,EPollSelectorImpl中操作的结构体大小比PollSelectorImpl要大,这里不一一解读了。EPoll的调用和select、poll不同,需要调用三个系统函数,分别是epoll_create,epoll_ctl 和 epoll_wait,这点在JDK NIO中也得到验证。在EPollArrayWrapper创建时会调用epollCreate方法:

  1. /**
  2. * 代码片段16 EPollArrayWrapper的构造方法和构造方法中调用的epollCreate方法
  3. */
  4. EPollArrayWrapper() throws IOException {
  5. // creates the epoll file descriptor
  6. epfd = epollCreate();
  7. // the epoll_event array passed to epoll_wait
  8. int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT;
  9. pollArray = new AllocatedNativeObject(allocationSize, true);
  10. pollArrayAddress = pollArray.address();
  11. // eventHigh needed when using file descriptors > 64k
  12. if (OPEN_MAX > MAX_UPDATE_ARRAY_SIZE)
  13. eventsHigh = new HashMap<>();
  14. }
  15. private native int epollCreate();

这里的epollCreate方法也就是进行epoll_create系统调用,创建一个EPoll实例。以下是C源码:

  1. /**
  2. * 代码片段17 epollCreate方法的c源码
  3. */
  4. JNIEXPORT jint JNICALL
  5. Java_sun_nio_ch_EPollArrayWrapper_epollCreate(JNIEnv *env, jobject this)
  6. {
  7. /*
  8. * epoll_create expects a size as a hint to the kernel about how to
  9. * dimension internal structures. We can't predict the size in advance.
  10. */
  11. //进行epoll_create系统调用
  12. int epfd = epoll_create(256);
  13. if (epfd < 0) {
  14. JNU_ThrowIOExceptionWithLastError(env, "epoll_create failed");
  15. }
  16. return epfd;
  17. }

EPollSelectorImpl的构造方法中创建完成一个EPollArrayWrapper实例后,会执行该实例的initInterrupt方法,这个方法中调用了epollCtl方法:

  1. /**
  2. * 代码片段18 EPollSelectorImpl的构造方法、构造方法中调用的EPollArrayWrapper中的initInterrupt方法
  3. * 和initInterrupt中调用的epollCtl方法
  4. */
  5. /**
  6. * Package private constructor called by factory method in
  7. * the abstract superclass Selector.
  8. */
  9. EPollSelectorImpl(SelectorProvider sp) throws IOException {
  10. super(sp);
  11. long pipeFds = IOUtil.makePipe(false);
  12. fd0 = (int) (pipeFds >>> 32);
  13. fd1 = (int) pipeFds;
  14. pollWrapper = new EPollArrayWrapper();
  15. //调用initInterrupt方法
  16. pollWrapper.initInterrupt方法(fd0, fd1);
  17. fdToKey = new HashMap<>();
  18. }
  19. void initInterrupt(int fd0, int fd1) {
  20. outgoingInterruptFD = fd1;
  21. incomingInterruptFD = fd0;
  22. //调用epollCtl
  23. epollCtl(epfd, EPOLL_CTL_ADD, fd0, EPOLLIN);
  24. }
  25. private native void epollCtl(int epfd, int opcode, int fd, int events);

这里的epollCtl方法也就是进行epoll_ctl系统调用,往刚刚创建的EPoll实例中添加要监控的事件。以下是C源码:

  1. /**
  2. * 代码片段19 epollCtl方法的c源码
  3. */
  4. JNIEXPORT void JNICALL
  5. Java_sun_nio_ch_EPollArrayWrapper_epollCtl(JNIEnv *env, jobject this, jint epfd,
  6. jint opcode, jint fd, jint events)
  7. {
  8. struct epoll_event event;
  9. int res;
  10. event.events = events;
  11. event.data.fd = fd;
  12. //调用epoll_ctl
  13. RESTARTABLE(epoll_ctl(epfd, (int)opcode, (int)fd, &event), res);
  14. /*
  15. * A channel may be registered with several Selectors. When each Selector
  16. * is polled a EPOLL_CTL_DEL op will be inserted into its pending update
  17. * list to remove the file descriptor from epoll. The "last" Selector will
  18. * close the file descriptor which automatically unregisters it from each
  19. * epoll descriptor. To avoid costly synchronization between Selectors we
  20. * allow pending updates to be processed, ignoring errors. The errors are
  21. * harmless as the last update for the file descriptor is guaranteed to
  22. * be EPOLL_CTL_DEL.
  23. */
  24. if (res < 0 && errno != EBADF && errno != ENOENT && errno != EPERM) {
  25. JNU_ThrowIOExceptionWithLastError(env, "epoll_ctl failed");
  26. }
  27. }

EPollSelectorImpl的doSelect方法会调用EPollArrayWrapper的poll方法,而在poll方法中会调用epollWait:

  1. /**
  2. * 代码片段20 EPollSelectorImpl中的doSelect方法、doSelect方法中调用的EPollArrayWrapper的poll方法
  3. * 和poll方法中调用的epollWait方法
  4. */
  5. protected int doSelect(long timeout) throws IOException {
  6. if (closed)
  7. throw new ClosedSelectorException();
  8. processDeregisterQueue();
  9. try {
  10. begin();
  11. pollWrapper.poll(timeout);
  12. } finally {
  13. end();
  14. }
  15. processDeregisterQueue();
  16. int numKeysUpdated = updateSelectedKeys();
  17. if (pollWrapper.interrupted()) {
  18. // Clear the wakeup pipe
  19. pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
  20. synchronized (interruptLock) {
  21. pollWrapper.clearInterrupted();
  22. IOUtil.drain(fd0);
  23. interruptTriggered = false;
  24. }
  25. }
  26. return numKeysUpdated;
  27. }
  28. int poll(long timeout) throws IOException {
  29. //更新注册信息,如果监视的实践发生变化,会调用epoll_ctl往Epoll实例中增加或删除事件
  30. updateRegistrations();
  31. //调用epollWait
  32. updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
  33. for (int i=0; i<updated; i++) {
  34. if (getDescriptor(i) == incomingInterruptFD) {
  35. interruptedIndex = i;
  36. interrupted = true;
  37. break;
  38. }
  39. }
  40. return updated;
  41. }
  42. private native int epollWait(long pollAddress, int numfds, long timeout,
  43. int epfd) throws IOException;

这里的epollWait方法也就是进行epoll_wait系统调用,调用者进程被挂起,在等待内核I/O事件的分发。以下是C源码:

  1. /**
  2. * 代码片段21 epollWait方法的c源码
  3. */
  4. JNIEXPORT jint JNICALL
  5. Java_sun_nio_ch_EPollArrayWrapper_epollWait(JNIEnv *env, jobject this,
  6. jlong address, jint numfds,
  7. jlong timeout, jint epfd)
  8. {
  9. struct epoll_event *events = jlong_to_ptr(address);
  10. int res;
  11. if (timeout <= 0) { /* Indefinite or no wait */
  12. //如果timeout<=0,立即调用系统的epoll_wait函数
  13. RESTARTABLE(epoll_wait(epfd, events, numfds, timeout), res);
  14. } else { /* Bounded wait; bounded restarts */
  15. //如果timeout>0,循环调用直到超时时间到了,用循环应该是为了防止异常唤醒
  16. res = iepoll(epfd, events, numfds, timeout);
  17. }
  18. if (res < 0) {
  19. JNU_ThrowIOExceptionWithLastError(env, "epoll_wait failed");
  20. }
  21. return res;
  22. }

总结

至此,本文已经对三种IO多路复用技术和在JDK中的应用进行了解读。在windows环境下,JDK NIO中只有WindowsSelectorImpl这有一个Selector的非抽象实现,采用的IO多路复用方式是select;在linux环境下PollSelectorImpl和EPollSelectorImpl两种实现,分别采用poll和epoll实现IO多路复用。本文还对这些Selector的具体实现进行了详细的解读,不足之处,敬请指正。