Channel 在JDK1.4中引入,分别与 InputStream , OutputStream 相对应,相比 Stream 的单向读取和写入, Channel 即可写入也可以读取.

类型

使用较多的类型分别是

  • ServerSocketChannel 服务端创建使用的 socket .
  • SocketChannel 客户端或者是服务端 accept 建立的 socket .

使用

open 😄

ServerSocketChannel 通过调用 ServerSocketChannel.open 获取实例化的 channel 对象. 新创建的 ServerSocket 处于 open 但是未绑定状态. 使用 bind(endpoint,backlog) 方法来触发一个系统调用,将socket绑定到一个具体的地址上.

  1. ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
  2. 使用socket系统调用创建一个文件描述符

bind 😊

  1. serverSocketChannel.configureBlocking(false);
  2. final ServerSocket serverSocket = serverSocketChannel.socket();
  3. ....
  4. serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, serverSocketChannel);
  5. //分别执行socket的bind和listener
  6. serverSocket.bind(new InetSocketAddress("localhost", 9998));

bind的流程比较简单,内部设置 绑定状态 ,随后执行系统调用将socket绑定到一个地址上.

此时创建的socket还不是非阻塞的,必须使用 configureBlocking(false) 来进行配置,此时也会触发一个底层的系统调用,将socket对应的文件描述符配置非阻塞.

socket的选项设置 (这里的设置会触发一次内核的系统调用,对socket的文件描述符设置选项)

  • SO_REUSEADDR 重用地址

Socket会占用一个端口来进行绑定(如果不指定系统会随机选取一个端口),如果此时服务重启,但是端口还处于time_wait状态,此时重启会出现一个 in thread "main" java.net.BindException: Address already in use . 指定该选项会重用处于该状态的端口.

  • 等等

selector 😣

配合 Selector 多路复用器使用可以使用一个线程来监听成千上万个channel,而在传统的BIO情况下,这些操作都是阻塞的.

开始的第一步就是将对应的channel注册到 Selector 上.

  1. final Selector selector = Selector.open();
  2. //第一个参数selector,第二个感兴趣的时间列表,第三个是attach的对象,可以在后续使用的时候取出
  3. serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, attachObj);
  4. ------------------------------------------------------------
  5. 如果已经注册过了,仅仅修改感兴趣的事件
  6. public final SelectionKey register(Selector sel, int ops, Object att) {
  7. SelectionKey k = findKey(sel);
  8. if (k != null) {
  9. k.attach(att);
  10. k.interestOps(ops);
  11. } else {
  12. // New registration
  13. k = ((AbstractSelector)sel).register(this, ops, att);
  14. addKey(k);
  15. }
  16. return k;
  17. }
  18. ------------------------------------------------------------
  19. 真实注册事件,这里只是将key添加到set集合中,还尚未写入到kqueue/poll/select等数据中去
  20. @Override
  21. protected final SelectionKey register(AbstractSelectableChannel ch,int ops,Object attachment) {
  22. SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
  23. ...
  24. //添加到selecttor的key集合汇总,如果selector关闭会立即移除掉,此时key还在channel的集合中
  25. keys.add(k);
  26. try {
  27. k.interestOps(ops);
  28. } catch (ClosedSelectorException e) {
  29. assert ch.keyFor(this) == null;
  30. keys.remove(k);
  31. k.cancel();
  32. throw e;
  33. }
  34. return k;
  35. }

此时还未操作到具体到socket实例上,仅仅在Java层面做了一层处理.
等到注册完成后,可以使用 select() 来选择IO就绪的channel了.

  • select() 执行一个阻塞的select操作,当且仅当至少一个channel处于IO就绪之后返回
  • selectNow() 执行一个非阻塞的select操作,从上次选择操作到现在没有channel处于就绪状态,这个方法立即返回0
  • select(long) 同 select() ,但是多了一个超时时间
    • 超时时间>0 , 阻塞到超时一直等待IO就绪
    • 0,无限期阻塞,一直到有channel IO就绪
    • <0 , 抛出异常

      IO就绪: IO就绪是指在未配置 O_NONBLOCK 标志的情况下,执行 read write 等真实IO操作时不阻塞.

SelectorImpl.select 🕳️

  1. public int select(Consumer<SelectionKey> action, long timeout) {
  2. return doSelect(Objects.requireNonNull(action), timeout);
  3. }
  4. ---------------------------------------------------------------------------
  5. private int doSelect(Consumer<SelectionKey> action, long timeout) {
  6. synchronized (this) {
  7. //获取注册到selector的key集合
  8. Set<SelectionKey> selectedKeys = selectedKeys();
  9. synchronized (selectedKeys) {
  10. selectedKeys.clear();
  11. int numKeySelected;
  12. if (timeout < 0) {//立即执行一次select
  13. numKeySelected = selectNow();
  14. } else {//执行一次select,阻塞timeout知道存在io就绪到channel
  15. numKeySelected = select(timeout);
  16. }
  17. // copy selected-key set as action may remove keys
  18. Set<SelectionKey> keysToConsume = Set.copyOf(selectedKeys);
  19. assert keysToConsume.size() == numKeySelected;
  20. selectedKeys.clear();
  21. // invoke action for each selected key
  22. keysToConsume.forEach(k -> {
  23. action.accept(k);
  24. if (!isOpen())
  25. throw new ClosedSelectorException();
  26. });
  27. return numKeySelected;
  28. }
  29. }
  30. }
  31. ---------------------------------------------------------------------------
  32. @Override
  33. public final int select(long timeout) throws IOException {
  34. if (timeout < 0)
  35. throw new IllegalArgumentException("Negative timeout");
  36. return lockAndDoSelect(null, (timeout == 0) ? -1 : timeout);
  37. }

SelectorImpl.lockAndDoSelect

  1. private int lockAndDoSelect(Consumer<SelectionKey> action, long timeout){
  2. ...
  3. synchronized (publicSelectedKeys) {
  4. return doSelect(action, timeout);//子类实现不同到抽象方法,实现多平台支持
  5. }
  6. }

doSelect 是一个抽象方法,分别在 Windows , LinuxMacosx 上有不同到实现,由于仅下载了 MacosxJDK ,这里描述的是Mac的的 jdk 实现

KQueueSelectorImpl

KQueue based Selector implementation for macOS

  1. @Override
  2. protected int doSelect(Consumer<SelectionKey> action, long timeout)
  3. throws IOException
  4. {
  5. assert Thread.holdsLock(this);
  6. long to = Math.min(timeout, Integer.MAX_VALUE); // max kqueue timeout
  7. //是否阻塞
  8. boolean blocking = (to != 0);
  9. boolean timedPoll = (to > 0);
  10. int numEntries;
  11. //处理上次select以后,兴趣列表发生变化的key. 上边registry也是在这一步进行处理
  12. processUpdateQueue();
  13. //处理取消的key
  14. processDeregisterQueue();
  15. try {
  16. begin(blocking);
  17. do {
  18. long startTime = timedPoll ? System.nanoTime() : 0;
  19. //执行KQueue的poll调用
  20. numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, to);
  21. if (numEntries == IOStatus.INTERRUPTED && timedPoll) {
  22. // timed poll interrupted so need to adjust timeout
  23. long adjust = System.nanoTime() - startTime;
  24. to -= TimeUnit.MILLISECONDS.convert(adjust, TimeUnit.NANOSECONDS);
  25. if (to <= 0) {
  26. // timeout expired so no retry
  27. numEntries = 0;
  28. }
  29. }
  30. } while (numEntries == IOStatus.INTERRUPTED);
  31. assert IOStatus.check(numEntries);
  32. } finally {
  33. end(blocking);
  34. }
  35. //处理取消的key
  36. processDeregisterQueue();
  37. //处理kqueue#poll以后IO就绪的key
  38. return processEvents(numEntries, action);
  39. }

这里的核心在于 KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, to) ,在其前后各有一次 processDeregisterQueue() 处理,以及开始的一次更新 processUpdateQueue() 事件.

processUpdateQueue 处理更新队列

注册在selecotr上的key会发生变更的数据无非是channel被关闭,兴趣列表变更等有限的事件.

  1. /**
  2. * Process changes to the interest ops.
  3. */
  4. private void processUpdateQueue() {
  5. assert Thread.holdsLock(this);
  6. synchronized (updateLock) {
  7. SelectionKeyImpl ski;
  8. //获取锁数据,随后对更新队列进行遍历
  9. while ((ski = updateKeys.pollFirst()) != null) {
  10. if (ski.isValid()) {
  11. int fd = ski.getFDVal();
  12. //将文件描述符和key放到map中
  13. SelectionKeyImpl previous = fdToKey.putIfAbsent(fd, ski);
  14. assert (previous == null) || (previous == ski);
  15. //获取兴趣选项
  16. int newEvents = ski.translateInterestOps();
  17. //获取注册的事件
  18. int registeredEvents = ski.registeredEvents();
  19. //如果不一致说明发生了变更
  20. if (newEvents != registeredEvents) {
  21. //事件列表,用于描述事件
  22. //https://linux.die.net/HOWTO/SCSI-Generic-HOWTO/poll.html
  23. //如果注册事件是读取&&新事件不是读取,那么就删除这个事件
  24. if ((registeredEvents & Net.POLLIN) != 0) {
  25. if ((newEvents & Net.POLLIN) == 0) {
  26. KQueue.register(kqfd, fd, EVFILT_READ, EV_DELETE);
  27. }
  28. } else if ((newEvents & Net.POLLIN) != 0) {
  29. //如果新事件是读取 添加到列表中
  30. KQueue.register(kqfd, fd, EVFILT_READ, EV_ADD);
  31. }
  32. //如果上次的注册事件是写
  33. if ((registeredEvents & Net.POLLOUT) != 0) {
  34. //新事件是不是写 删除
  35. if ((newEvents & Net.POLLOUT) == 0) {
  36. KQueue.register(kqfd, fd, EVFILT_WRITE, EV_DELETE);
  37. }
  38. } else if ((newEvents & Net.POLLOUT) != 0) {
  39. KQueue.register(kqfd, fd, EVFILT_WRITE, EV_ADD);
  40. }
  41. //设置事件信号
  42. ski.registeredEvents(newEvents);
  43. }
  44. }
  45. }
  46. }
  47. }

KQueue.register

将channel对应的文件描述符添加到kqueue的待扫描列表中.

  1. JNIEXPORT jint JNICALL
  2. Java_sun_nio_ch_KQueue_keventRegister(JNIEnv *env, jclass c, jint kqfd,
  3. jint fd, jint filter, jint flags)
  4. {
  5. struct kevent changes[1];
  6. struct timespec timeout = {0, 0};
  7. int res;
  8. EV_SET(&changes[0], fd, filter, flags, 0, 0, 0);
  9. RESTARTABLE(kevent(kqfd, &changes[0], 1, NULL, 0, &timeout), res);
  10. return (res == -1) ? errno : 0;
  11. }

processDeregisterQueue

取消事件处理

  1. /**
  2. * Invoked by selection operations to process the cancelled-key set
  3. */
  4. protected final void processDeregisterQueue() throws IOException {
  5. assert Thread.holdsLock(this);
  6. assert Thread.holdsLock(publicSelectedKeys);
  7. Set<SelectionKey> cks = cancelledKeys();
  8. synchronized (cks) {
  9. if (!cks.isEmpty()) {
  10. Iterator<SelectionKey> i = cks.iterator();
  11. while (i.hasNext()) {
  12. SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
  13. i.remove();
  14. // remove the key from the selector
  15. implDereg(ski);//执行kqueue的EV_DELETE,将该文件描述符从集合中删除
  16. selectedKeys.remove(ski);
  17. keys.remove(ski);
  18. // remove from channel's key set
  19. deregister(ski);
  20. SelectableChannel ch = ski.channel();
  21. if (!ch.isOpen() && !ch.isRegistered())
  22. ((SelChImpl)ch).kill();
  23. }
  24. }
  25. }
  26. }

进行 select() 操作后,一批处于IO就绪的channel已经可以执行真实的IO操作了,可以从对应的socket中读取出数据,然后进行处理.

由于TCP协议一次传输的量可能比应用所需的数据要少, 可能会导致应用所需要的数据在TCP层面被拆成了N个包,此时应用需要手动处理,将多个TCP的package组成应用所需要的数据.

eg: http需要传输几M的数据,放在TCP层面是必然被切分成了N个package了,tomcat/jetty必须对socket读取的数据进行处理.

现在可以便利这批已经就绪的SelectionKey . SelectionKey 分别引用了对应的 channelselector .

总结1

select 执行的操作本身是不执行IO操作的,他们只是检测文件描述符号是否处于就绪状态,然后才去执行真实的IO操作,例如 read()write 操作.

在这个简易的例子中
Java IO(3)
从 socket 中读取数据后直接打印在控制台上.

  1. SocketChannel socketChannel = (SocketChannel) key.channel();
  2. socketChannel.register(selector, SelectionKey.OP_WRITE);
  3. ByteBuffer buffer = ByteBuffer.allocate(1024);
  4. int read = socketChannel.read(buffer);

随后使用 SelectionKey.OP_WRITE 注册到 selector ,表明监听写入事件,在channel写就绪时,从队列中获取数据,写入到socket中

而在复杂的实现中,可能会将数据投递到一个流水线上,由流水线上到 过滤器/处理器 对数据进行处理,可能包含但不限于SSL握手,HTTP交互/MQTT协议等等.

Netty就是这么处理的.

socket 😂

select 本身是不执行IO操作的,只是检测 channel 对应的文件描述符是否处理IO就绪状态. 如果不能写入就直接返回.

内核会为每一个socket都维护一个写入和读取的缓冲区.

  • 写就绪: 数据能够直接写入缓冲区. 即缓冲区有足够的空间写入该数据,不阻塞
  • 读就绪: 能够从socket中读取数据.

水平触发和边缘触发 🌲

  • 水平触发 : 只要socket满足IO就绪,任何时候都可以.
    • 只要缓冲内容不为空,返回读就绪。
    • 只要缓冲区还不满,LT模式会返回写就绪
  • 边缘触发 : 自从上一次触发依赖,除非产生了新的IO就绪,否则不会触发.
    • 当文件描述符关联的读内核缓冲区由空转化为非空的时候,则发出可读信号进行通知,
    • 当文件描述符关联的内核写缓冲区由满转化为不满的时候,则发出可写信号进行通知

      Java默认是水平触发 意味者,只要socket缓存区未满,一直触发 select ,如果不处理的话,这样的while(true) 可以直接打满1C,在编程中每次读取以后需要变更为write,写入后变更为read.