上一篇文章已经介绍过 Java NIO 的基本使用,NIO 中有一个重要的组件 Selector,其解决一个线程在多个网络连接上的多路复用问题。我会分为上下两篇文章来为大家深入分析 Java NIO 中 IO 多路复用的底层原理,这篇文章我将介绍 Selector 组件的底层系统调用过程。

什么是 SelectionKey?

我们以下面简单的代码为例,启动一个服务端进程:

  1. public static void main(String[] args) throws IOException, InterruptedException {
  2. // 创建NIO ServerSocketChannel
  3. ServerSocketChannel serverSocket = ServerSocketChannel.open();
  4. serverSocket.socket().bind(new InetSocketAddress(9000));
  5. // 设置ServerSocketChannel为非阻塞
  6. serverSocket.configureBlocking(false);
  7. // 打开Selector处理Channel,即创建epoll
  8. Selector selector = Selector.open();
  9. // 把ServerSocketChannel注册到selector上,并且selector对客户端accept连接操作感兴趣
  10. SelectionKey selectionKey = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
  11. System.out.println("服务启动成功");
  12. while (true) {
  13. // 阻塞等待需要处理的事件发生
  14. selector.select();
  15. // 获取selector中注册的全部事件的 SelectionKey实例
  16. Set<SelectionKey> selectionKeys = selector.selectedKeys();
  17. Iterator<SelectionKey> iterator = selectionKeys.iterator();
  18. // 遍历SelectionKey对事件进行处理
  19. while (iterator.hasNext()) {
  20. SelectionKey key = iterator.next();
  21. // 如果是OP_ACCEPT事件,则进行连接获取和事件注册
  22. if (key.isAcceptable()) {
  23. ServerSocketChannel server = (ServerSocketChannel) key.channel();
  24. SocketChannel socketChannel = server.accept();
  25. socketChannel.configureBlocking(false);
  26. // 这里只注册了读事件,如果需要给客户端发送数据可以注册写事件
  27. socketChannel.register(selector, SelectionKey.OP_READ);
  28. System.out.println("客户端连接成功");
  29. // 如果是OP_READ事件,则进行读取
  30. } else if (key.isReadable()) {
  31. SocketChannel socketChannel = (SocketChannel) key.channel();
  32. ByteBuffer byteBuffer = ByteBuffer.allocate(128);
  33. int len = socketChannel.read(byteBuffer);
  34. // 如果有数据,把数据打印出来
  35. if (len > 0) {
  36. System.out.println("接收到消息:" + new String(byteBuffer.array()));
  37. key.attach("服务端已收到消息");
  38. key.interestOps(SelectionKey.OP_WRITE);
  39. } else if (len == -1) { // 如果客户端断开连接,关闭Socket
  40. System.out.println("客户端断开连接");
  41. socketChannel.close();
  42. }
  43. }else if(key.isWritable()){
  44. SocketChannel socketChannel = (SocketChannel) key.channel();
  45. String msg = (String) key.attachment();
  46. key.attach(null);
  47. socketChannel.write(ByteBuffer.wrap(msg.getBytes()));
  48. key.interestOps(SelectionKey.OP_READ);
  49. }
  50. //从事件集合里删除本次处理的key,防止下次select重复处理
  51. iterator.remove();
  52. }
  53. }
  54. }

上面的代码中通过创建一个 Selector,将 Channel 绑定对应的事件(read、write、accept、connect)注册到 Selector 中,返回一个 SelectionKey 于该 Channel 绑定,与。Selector调用 select() 方法开始监听事件的发生,如果没有事件则会阻塞直到新的事件到来。
微信截图_20210703104225.png

SelectionKey 是一个抽象类,表示 selectableChannel 在 Selector 中注册的标识。每个Channel 向 Selector 注册时,都将会创建一个 SelectionKey。SelectionKey 将 Channel 与 Selector 建立了关系,并维护了 channel 事件。

可以通过 cancel 方法取消键,取消的键不会立即从 selector 中移除,而是添加到 cancelledKeys 中,在下一次 select 操作时移除它.所以在调用某个 key 时,需要使用 isValid 进行校。

在向 Selector 对象注册感兴趣的事件时,JAVA NIO 共定义了四种事件:

  1. /**
  2. * Operation-set bit for read operations.
  3. *
  4. * <p> Suppose that a selection key's interest set contains
  5. * <tt>OP_READ</tt> at the start of a <a
  6. * href="Selector.html#selop">selection operation</a>. If the selector
  7. * detects that the corresponding channel is ready for reading, has reached
  8. * end-of-stream, has been remotely shut down for further reading, or has
  9. * an error pending, then it will add <tt>OP_READ</tt> to the key's
  10. * ready-operation set and add the key to its selected-key&nbsp;set. </p>
  11. */
  12. public static final int OP_READ = 1 << 0;
  13. /**
  14. * Operation-set bit for write operations.
  15. *
  16. * <p> Suppose that a selection key's interest set contains
  17. * <tt>OP_WRITE</tt> at the start of a <a
  18. * href="Selector.html#selop">selection operation</a>. If the selector
  19. * detects that the corresponding channel is ready for writing, has been
  20. * remotely shut down for further writing, or has an error pending, then it
  21. * will add <tt>OP_WRITE</tt> to the key's ready set and add the key to its
  22. * selected-key&nbsp;set. </p>
  23. */
  24. public static final int OP_WRITE = 1 << 2;
  25. /**
  26. * Operation-set bit for socket-connect operations.
  27. *
  28. * <p> Suppose that a selection key's interest set contains
  29. * <tt>OP_CONNECT</tt> at the start of a <a
  30. * href="Selector.html#selop">selection operation</a>. If the selector
  31. * detects that the corresponding socket channel is ready to complete its
  32. * connection sequence, or has an error pending, then it will add
  33. * <tt>OP_CONNECT</tt> to the key's ready set and add the key to its
  34. * selected-key&nbsp;set. </p>
  35. */
  36. public static final int OP_CONNECT = 1 << 3;
  37. /**
  38. * Operation-set bit for socket-accept operations.
  39. *
  40. * <p> Suppose that a selection key's interest set contains
  41. * <tt>OP_ACCEPT</tt> at the start of a <a
  42. * href="Selector.html#selop">selection operation</a>. If the selector
  43. * detects that the corresponding server-socket channel is ready to accept
  44. * another connection, or has an error pending, then it will add
  45. * <tt>OP_ACCEPT</tt> to the key's ready set and add the key to its
  46. * selected-key&nbsp;set. </p>
  47. */
  48. public static final int OP_ACCEPT = 1 << 4;

OP_READ、OP_WRITE、OP_CONNECT、OP_ACCEPT,分别对应读、写、请求连接、接受连接等网络 Socket 操作。这些时间的触发条件如下:

  • OP_READ:当操作系统的读缓冲区有数据可读时就绪。并非时刻都有数据可读,所以一般需要注册该操作,仅当就绪时才发起读操作,避免浪费 CPU。
  • OP_WRITE:当操作系统的写缓冲区有空闲空间时就绪。一般情况下写缓冲区都有空闲空间,小块数据直接写入即可,没必要注册该操作类型,否则该条件不断就绪浪费 CPU;但如果是写密集型的任务,比如文件下载等,缓冲区很可能满,注册该操作类型就很有必要,同时注意写完后取消注册。
  • OP_CONNECT:当 SocketChannel.connect() 请求连接成功后就绪,该操作只给客户端使用。
  • OP_ACCEPT:当接收到一个客户端连接请求时就绪,该操作只给服务器使用。

服务器启动 ServerSocketChannel,关注 OP_ACCEPT 事件。服务器接受连接,启动一个服务器的 SocketChannel,这个 SocketChannel 可以关注 OP_READ、OP_WRITE 事件,一般连接建立后会直接关注 OP_READ 事件。

客户端启动 SocketChannel,连接服务器,关注 OP_CONNECT 事件。客户端这边的客户端 SocketChannel 发现连接建立后,可以关注 OP_READ、OP_WRITE 事件,一般是需要客户端需要发送数据了才关注 OP_READ 事件。

连接建立后客户端与服务器端开始相互发送消息(读写),根据实际情况来关注OP_READ、 OP_WRITE 事件。

此外上面代码中调用了三个重要的方法 :

  • Selector.open():创建多路复用器;
  • socketChannel.regrister():将 Channel 注册到多路复用器上(真的是注册吗?);
  • Selector.select():阻塞等待需要处理的事件发生;

这三个方法是 Selector 的核心方法,下面的内容中将从 JDK 源码(linux 版本)及 Hotspot 与 Linux 内核函数级别来讲解以上方法。

Selector.open()

首先创建一个 Provider,再由该 Provider 开启一个 Selector。

  1. public static Selector open() throws IOException {
  2. return SelectorProvider.provider().openSelector();
  3. }

provider()

创建 provider,若没有则调用 create() 方法创建。

  1. public static SelectorProvider provider() {
  2. synchronized (lock) {
  3. if (provider != null)
  4. return provider;
  5. return AccessController.doPrivileged(
  6. new PrivilegedAction<SelectorProvider>() {
  7. public SelectorProvider run() {
  8. if (loadProviderFromProperty())
  9. return provider;
  10. if (loadProviderAsService())
  11. return provider;
  12. //调用create()方法创建provider
  13. provider = sun.nio.ch.DefaultSelectorProvider.create();
  14. return provider;
  15. }
  16. });
  17. }
  18. }

判断当前操作系统是否为 linux 操作系统,如果是则通过反射创建 EPollSelectorProvider 对象。

  1. public static SelectorProvider create() {
  2. String osname = (String)AccessController.doPrivileged(new GetPropertyAction("os.name"));
  3. if (osname.equals("SunOS")) {
  4. return createProvider("sun.nio.ch.DevPollSelectorProvider");
  5. } else {
  6. return (SelectorProvider)(osname.equals("Linux") ?
  7. //判断当前是否是linux系统,是则利用创建EPollSelectorProvider
  8. createProvider("sun.nio.ch.EPollSelectorProvider") : new PollSelectorProvider());
  9. }
  10. }
  11. -------------------------------
  12. private static SelectorProvider createProvider(String cn) {
  13. Class c;
  14. try {
  15. //获取class对象
  16. c = Class.forName(cn);
  17. } catch (ClassNotFoundException var4) {
  18. throw new AssertionError(var4);
  19. }
  20. try {
  21. //利用反射创建实例
  22. return (SelectorProvider)c.newInstance();
  23. } catch (InstantiationException | IllegalAccessException var3) {
  24. throw new AssertionError(var3);
  25. }
  26. }

openSelector()

创建完 EPollSelectorProvider 对象后,紧接着调用其 openSelector() 方法,直接返回 JDK 封装好的 EPollSelectorImpl 对象,在该对象的构造方法里面开始创建 Linux 底层的 epoll 实例。

  1. public AbstractSelector openSelector() throws IOException {
  2. return new EPollSelectorImpl(this);
  3. }
  4. ---------------------------
  5. EPollSelectorImpl(SelectorProvider sp) throws IOException {
  6. super(sp);
  7. long pipeFds = IOUtil.makePipe(false);
  8. this.fd0 = (int)(pipeFds >>> 32);
  9. this.fd1 = (int)pipeFds;
  10. try {
  11. this.pollWrapper = new EPollArrayWrapper();
  12. this.pollWrapper.initInterrupt(this.fd0, this.fd1);
  13. this.fdToKey = new HashMap();
  14. } catch (Throwable var8) {
  15. .........省略
  16. }
  17. }

里面创建了一个 EpollArrayWrapper 包装类,该类初始化时调用 native 方法创建了一个 epoll 实例

  1. EPollArrayWrapper() throws IOException {
  2. ......
  3. this.epfd = this.epollCreate();
  4. ......
  5. }
  6. ----------------------
  7. private native int epollCreate()

系统调用 epoll_create

Hotspot 底层调用 linux 的 epoll_create(int size) 函数,返回一个 epfd(epoll 对应的文件描述符)

到底什么是 epoll 以及其实现原理,我会在下一篇文章中具体介绍,大家这里留一个印象

  1. JNIEXPORT jint JNICALL
  2. Java_sun_nio_ch_EPollArrayWrapper_epollCreate(JNIEnv *env, jobject this)
  3. {
  4. //调用linux的 epoll_create 函数,返回一个epoll对应的文件描述符
  5. int epfd = epoll_create(256);
  6. if (epfd < 0) {
  7. JNU_ThrowIOExceptionWithLastError(env, "epoll_create failed")
  8. }
  9. return epfd;
  10. }

Linux 对 epoll_create 函数描述如下,创建一个 epoll 结构体,并返回一个非负数作为文件描述符,用于对 epoll 接口的所有后续调用。在 epoll 结构体中有两个重要的成员:红黑树,是用于存储注册进来的 socket 链接;就绪事件链表(双向链表),用于存储就绪的事件。

参数 size 代表可能会容纳 size 个描述符,但 size 不是一个最大值,只是提示操作系统它的数量级,现在这个参数基本上已经弃用了。
微信截图_20210703174448.png

Selector.open() 方法就是在操作系统底层创建了一个 epoll 实例,返回了该实例的一个文件描述符
微信截图_20210703160436.png

SocketChannel.register()

register() 方法把表面上是把 socketChannel 注册到 Selector 上并指定该 socketChannel 感兴趣的事件,但真正的注册其实不是在这一步完成。该方法主要就是获取所有的 SocketChannel 并存起来。

  1. public final SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException{
  2. synchronized (regLock) {
  3. ..............
  4. SelectionKey k = findKey(sel);
  5. ..............
  6. if (k == null) {
  7. // New registration
  8. synchronized (keyLock) {
  9. if (!isOpen())
  10. throw new ClosedChannelException();
  11. //将socketChannel注册到Selector上
  12. k = ((AbstractSelector)sel).register(this, ops, att);
  13. addKey(k);
  14. }
  15. }
  16. return k;
  17. }
  18. }

implRegister()

创建 SelectionKey 对象,绑定对应的 SocketChannel,并调用上一步创建出来的EPollSelectorImpl 对象的 implRegister() 方法:

  1. protected final SelectionKey register(AbstractSelectableChannel var1, int var2, Object var3) {
  2. if (!(var1 instanceof SelChImpl)) {
  3. throw new IllegalSelectorException();
  4. } else {
  5. //将SocketChannel传进去,返回一个对应的SelectionKey
  6. SelectionKeyImpl var4 = new SelectionKeyImpl((SelChImpl)var1, this);
  7. var4.attach(var3);
  8. synchronized(this.publicKeys) {
  9. //将SelectionKey传进去,实现注册
  10. this.implRegister(var4);
  11. }
  12. //指定感兴趣的事件
  13. var4.interestOps(var2);
  14. return var4;
  15. }
  16. }
  17. -----------------------------
  18. SelectionKeyImpl(SelChImpl var1, SelectorImpl var2) {
  19. this.channel = var1;
  20. this.selector = var2;
  21. }

将 SocketChannel 的 fd(文件描述符)添加到第一步创建的 EpollArrayWrapper 包装类中集合里面。

  1. protected void implRegister(SelectionKeyImpl ski) {
  2. if (this.closed) {
  3. throw new ClosedSelectorException();
  4. } else {
  5. SelChImpl ch = ski.channel;
  6. int fd = Integer.valueOf(ch.getFDVal());
  7. this.fdToKey.put(fd, ski);
  8. //把SocketChannel对应添加到EpollArrayWrapper包装类中结合里面
  9. this.pollWrapper.add(fd);
  10. this.keys.add(ski);
  11. }
  12. }

微信截图_20210703161117.png

Selector.select()

该方法非常重要,主要做的是有两件:

  • 将 SocketChannel 添加到 epoll 池中;
  • 等待 epoll 中的 IO 事件;

该方法最终会调用 EPollSelectorImpl 的 doSelect()方法:

  1. protected int doSelect(long timeout) throws IOException {
  2. if (this.closed) {
  3. throw new ClosedSelectorException();
  4. } else {
  5. this.processDeregisterQueue();
  6. try {
  7. this.begin();
  8. this.pollWrapper.poll(timeout);
  9. } finally {
  10. this.end();
  11. }
  12. //将已经cancel的SocketChannel从pollWrapper中移除
  13. this.processDeregisterQueue();
  14. int numKeysUpdated = this.updateSelectedKeys();
  15. //中断处理
  16. if (this.pollWrapper.interrupted()) {
  17. this.pollWrapper.putEventOps(this.pollWrapper.interruptedIndex(), 0);
  18. synchronized(this.interruptLock) {
  19. this.pollWrapper.clearInterrupted();
  20. IOUtil.drain(this.fd0);
  21. this.interruptTriggered = false;
  22. }
  23. }
  24. return numKeysUpdated;
  25. }
  26. }

里面调用了最关键的 poll() 方法,调用了两个本地方法 epollCtl() epollWait()。

  1. int poll(long timeout) throws IOException {
  2. this.updateRegistrations();
  3. this.updated = this.epollWait(this.pollArrayAddress, NUM_EPOLLEVENTS, timeout, this.epfd);
  4. for(int i = 0; i < this.updated; ++i) {
  5. if (this.getDescriptor(i) == this.incomingInterruptFD) {
  6. this.interruptedIndex = i;
  7. this.interrupted = true;
  8. break;
  9. }
  10. }
  11. return this.updated;
  12. }

系统调用 epollCtl()

在updateRegistrations()方法里调用了native方法 epollCtl(),完成真正的注册。

  1. private void updateRegistrations() {
  2. synchronized(this.updateLock) {
  3. for(int j = 0; j < this.updateCount; ++j) {
  4. int fd = this.updateDescriptors[j];
  5. short events = (short)this.getUpdateEvents(fd);
  6. boolean isRegistered = this.registered.get(fd);
  7. int opcode = false;
  8. if (events != -1) {
  9. int opcode;
  10. if (isRegistered) {
  11. opcode = events != 0 ? 3 : 2;
  12. } else {
  13. opcode = events != 0 ? 1 : 0;
  14. }
  15. if (opcode != 0) {
  16. //当opcode的值对应为EPOLL_CTL_ADD,真正注册SocketChannel到epoll上
  17. this.epollCtl(this.epfd, opcode, fd, events);
  18. if (opcode == 1) {
  19. this.registered.set(fd);
  20. } else if (opcode == 2) {
  21. this.registered.clear(fd);
  22. }
  23. }
  24. }
  25. }
  26. this.updateCount = 0;
  27. }
  28. }

epollCtl() 底层调用Linux内核函数

epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)

该方法对 epoll 池实例所对应的文件描述符执行参数 op 所指定的操作,同时为该 fd 绑定相关事件。
微信截图_20210703175151.png

参数解释

epfd
epoll实例对应的文件描述符

fd
SocketChannel对应的文件描述符

*event
该结构体中通过 events 指定了 SocketChannel 感兴趣的事件,data 保存了当事件触发时相关的数据

  1. struct epoll_event {
  2. __uint32_t events; /* Epoll events */
  3. epoll_data_t data; /* User data variable */
  4. };
  5. typedef{
  6. void *ptr;
  7. int fd;
  8. __uint32_t u32;
  9. __uint61_t u64;
  10. }epoll_data_t

events表示感兴趣的事件,常用的有以下几种

EPOLLIN :表示对应的文件描述符是可读的; EPOLLOUT:表示对应的文件描述符是可写的; EPOLLERR:表示对应的文件描述符发生了错误 EPOLLRDHUP:表示对应的文件描述符被挂断

op
对epoll对应的文件描述符对应的操作

EPOLL_CTL_ADD:注册新的fd(SocketChannel)到epfd(epoll)中,并关联事件event EPOLL_CTL_MOD:修改已经注册的fd的监听事件; EPOLL_CTL_DEL:从epfd中移除fd,并且忽略掉绑定的event,这时event可以为null

系统调用 epollWait()

epollWait() 是一个 native 方法,Hotspot底层调用了 Linux 内核函数:

int epoll_wait (int epfd, struct epoll_event *events, int maxevents, int timeout)

该方法负责从 epoll 池中的就绪事件列表中获取发生的事件。该方法会阻塞,只要有 “事件” 发生就会马上唤醒。
微信截图_20210703185022.png
等待 Epoll 文件描述符 epfd 上的事件发生,其实就是往就绪事件列表中获取已发生的事件,如果有事件发生则放入到一个集合中,没有则阻塞等待。

Set selectionKeys = selector.selectedKeys() 拿到所有响应的事件

微信截图_20210703191034.png

以上就是 JDK 中 Selector 的实现机制源码,这里涉及到 LInux 底层的 epoll 模型,在下一篇文章我会为大家详解介绍其工作原理和其高性能原因。