Channel
在JDK1.4中引入,分别与 InputStream
, OutputStream
相对应,相比 Stream
的单向读取和写入, Channel
即可写入也可以读取.
类型
使用较多的类型分别是
- ServerSocketChannel 服务端创建使用的
socket
. - SocketChannel 客户端或者是服务端
accept
建立的socket
.
使用
open 😄
ServerSocketChannel
通过调用 ServerSocketChannel.open
获取实例化的 channel
对象. 新创建的 ServerSocket
处于 open
但是未绑定状态. 使用 bind(endpoint,backlog)
方法来触发一个系统调用,将socket绑定到一个具体的地址上.
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
使用socket系统调用创建一个文件描述符
bind 😊
serverSocketChannel.configureBlocking(false);
final ServerSocket serverSocket = serverSocketChannel.socket();
....
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, serverSocketChannel);
//分别执行socket的bind和listener
serverSocket.bind(new InetSocketAddress("localhost", 9998));
bind的流程比较简单,内部设置 绑定状态
,随后执行系统调用将socket绑定到一个地址上.
此时创建的socket还不是非阻塞的,必须使用 configureBlocking(false)
来进行配置,此时也会触发一个底层的系统调用,将socket对应的文件描述符配置非阻塞.
socket的选项设置 (这里的设置会触发一次内核的系统调用,对socket的文件描述符设置选项)
- SO_REUSEADDR 重用地址
Socket会占用一个端口来进行绑定(如果不指定系统会随机选取一个端口),如果此时服务重启,但是端口还处于time_wait状态,此时重启会出现一个 in thread "main" java.net.BindException: Address already in use
. 指定该选项会重用处于该状态的端口.
- 等等
selector 😣
配合 Selector
多路复用器使用可以使用一个线程来监听成千上万个channel,而在传统的BIO情况下,这些操作都是阻塞的.
开始的第一步就是将对应的channel注册到 Selector
上.
final Selector selector = Selector.open();
//第一个参数selector,第二个感兴趣的时间列表,第三个是attach的对象,可以在后续使用的时候取出
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, attachObj);
------------------------------------------------------------
如果已经注册过了,仅仅修改感兴趣的事件
public final SelectionKey register(Selector sel, int ops, Object att) {
SelectionKey k = findKey(sel);
if (k != null) {
k.attach(att);
k.interestOps(ops);
} else {
// New registration
k = ((AbstractSelector)sel).register(this, ops, att);
addKey(k);
}
return k;
}
------------------------------------------------------------
真实注册事件,这里只是将key添加到set集合中,还尚未写入到kqueue/poll/select等数据中去
@Override
protected final SelectionKey register(AbstractSelectableChannel ch,int ops,Object attachment) {
SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
...
//添加到selecttor的key集合汇总,如果selector关闭会立即移除掉,此时key还在channel的集合中
keys.add(k);
try {
k.interestOps(ops);
} catch (ClosedSelectorException e) {
assert ch.keyFor(this) == null;
keys.remove(k);
k.cancel();
throw e;
}
return k;
}
此时还未操作到具体到socket实例上,仅仅在Java层面做了一层处理.
等到注册完成后,可以使用 select()
来选择IO就绪的channel了.
- select() 执行一个阻塞的select操作,当且仅当至少一个channel处于IO就绪之后返回
- selectNow() 执行一个非阻塞的select操作,从上次选择操作到现在没有channel处于就绪状态,这个方法立即返回0
- select(long) 同
select()
,但是多了一个超时时间- 超时时间>0 , 阻塞到超时一直等待IO就绪
- 0,无限期阻塞,一直到有channel IO就绪
- <0 , 抛出异常
IO就绪: IO就绪是指在未配置
O_NONBLOCK
标志的情况下,执行read
write
等真实IO操作时不阻塞.
SelectorImpl.select 🕳️
public int select(Consumer<SelectionKey> action, long timeout) {
return doSelect(Objects.requireNonNull(action), timeout);
}
---------------------------------------------------------------------------
private int doSelect(Consumer<SelectionKey> action, long timeout) {
synchronized (this) {
//获取注册到selector的key集合
Set<SelectionKey> selectedKeys = selectedKeys();
synchronized (selectedKeys) {
selectedKeys.clear();
int numKeySelected;
if (timeout < 0) {//立即执行一次select
numKeySelected = selectNow();
} else {//执行一次select,阻塞timeout知道存在io就绪到channel
numKeySelected = select(timeout);
}
// copy selected-key set as action may remove keys
Set<SelectionKey> keysToConsume = Set.copyOf(selectedKeys);
assert keysToConsume.size() == numKeySelected;
selectedKeys.clear();
// invoke action for each selected key
keysToConsume.forEach(k -> {
action.accept(k);
if (!isOpen())
throw new ClosedSelectorException();
});
return numKeySelected;
}
}
}
---------------------------------------------------------------------------
@Override
public final int select(long timeout) throws IOException {
if (timeout < 0)
throw new IllegalArgumentException("Negative timeout");
return lockAndDoSelect(null, (timeout == 0) ? -1 : timeout);
}
SelectorImpl.lockAndDoSelect
private int lockAndDoSelect(Consumer<SelectionKey> action, long timeout){
...
synchronized (publicSelectedKeys) {
return doSelect(action, timeout);//子类实现不同到抽象方法,实现多平台支持
}
}
doSelect
是一个抽象方法,分别在 Windows
, Linux
和 Macosx
上有不同到实现,由于仅下载了 Macosx
的 JDK
,这里描述的是Mac的的 jdk
实现
KQueueSelectorImpl
KQueue based Selector implementation for macOS
@Override
protected int doSelect(Consumer<SelectionKey> action, long timeout)
throws IOException
{
assert Thread.holdsLock(this);
long to = Math.min(timeout, Integer.MAX_VALUE); // max kqueue timeout
//是否阻塞
boolean blocking = (to != 0);
boolean timedPoll = (to > 0);
int numEntries;
//处理上次select以后,兴趣列表发生变化的key. 上边registry也是在这一步进行处理
processUpdateQueue();
//处理取消的key
processDeregisterQueue();
try {
begin(blocking);
do {
long startTime = timedPoll ? System.nanoTime() : 0;
//执行KQueue的poll调用
numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, to);
if (numEntries == IOStatus.INTERRUPTED && timedPoll) {
// timed poll interrupted so need to adjust timeout
long adjust = System.nanoTime() - startTime;
to -= TimeUnit.MILLISECONDS.convert(adjust, TimeUnit.NANOSECONDS);
if (to <= 0) {
// timeout expired so no retry
numEntries = 0;
}
}
} while (numEntries == IOStatus.INTERRUPTED);
assert IOStatus.check(numEntries);
} finally {
end(blocking);
}
//处理取消的key
processDeregisterQueue();
//处理kqueue#poll以后IO就绪的key
return processEvents(numEntries, action);
}
这里的核心在于 KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, to)
,在其前后各有一次 processDeregisterQueue()
处理,以及开始的一次更新 processUpdateQueue()
事件.
processUpdateQueue 处理更新队列
注册在selecotr上的key会发生变更的数据无非是channel被关闭,兴趣列表变更等有限的事件.
/**
* Process changes to the interest ops.
*/
private void processUpdateQueue() {
assert Thread.holdsLock(this);
synchronized (updateLock) {
SelectionKeyImpl ski;
//获取锁数据,随后对更新队列进行遍历
while ((ski = updateKeys.pollFirst()) != null) {
if (ski.isValid()) {
int fd = ski.getFDVal();
//将文件描述符和key放到map中
SelectionKeyImpl previous = fdToKey.putIfAbsent(fd, ski);
assert (previous == null) || (previous == ski);
//获取兴趣选项
int newEvents = ski.translateInterestOps();
//获取注册的事件
int registeredEvents = ski.registeredEvents();
//如果不一致说明发生了变更
if (newEvents != registeredEvents) {
//事件列表,用于描述事件
//https://linux.die.net/HOWTO/SCSI-Generic-HOWTO/poll.html
//如果注册事件是读取&&新事件不是读取,那么就删除这个事件
if ((registeredEvents & Net.POLLIN) != 0) {
if ((newEvents & Net.POLLIN) == 0) {
KQueue.register(kqfd, fd, EVFILT_READ, EV_DELETE);
}
} else if ((newEvents & Net.POLLIN) != 0) {
//如果新事件是读取 添加到列表中
KQueue.register(kqfd, fd, EVFILT_READ, EV_ADD);
}
//如果上次的注册事件是写
if ((registeredEvents & Net.POLLOUT) != 0) {
//新事件是不是写 删除
if ((newEvents & Net.POLLOUT) == 0) {
KQueue.register(kqfd, fd, EVFILT_WRITE, EV_DELETE);
}
} else if ((newEvents & Net.POLLOUT) != 0) {
KQueue.register(kqfd, fd, EVFILT_WRITE, EV_ADD);
}
//设置事件信号
ski.registeredEvents(newEvents);
}
}
}
}
}
KQueue.register
将channel对应的文件描述符添加到kqueue的待扫描列表中.
JNIEXPORT jint JNICALL
Java_sun_nio_ch_KQueue_keventRegister(JNIEnv *env, jclass c, jint kqfd,
jint fd, jint filter, jint flags)
{
struct kevent changes[1];
struct timespec timeout = {0, 0};
int res;
EV_SET(&changes[0], fd, filter, flags, 0, 0, 0);
RESTARTABLE(kevent(kqfd, &changes[0], 1, NULL, 0, &timeout), res);
return (res == -1) ? errno : 0;
}
processDeregisterQueue
取消事件处理
/**
* Invoked by selection operations to process the cancelled-key set
*/
protected final void processDeregisterQueue() throws IOException {
assert Thread.holdsLock(this);
assert Thread.holdsLock(publicSelectedKeys);
Set<SelectionKey> cks = cancelledKeys();
synchronized (cks) {
if (!cks.isEmpty()) {
Iterator<SelectionKey> i = cks.iterator();
while (i.hasNext()) {
SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
i.remove();
// remove the key from the selector
implDereg(ski);//执行kqueue的EV_DELETE,将该文件描述符从集合中删除
selectedKeys.remove(ski);
keys.remove(ski);
// remove from channel's key set
deregister(ski);
SelectableChannel ch = ski.channel();
if (!ch.isOpen() && !ch.isRegistered())
((SelChImpl)ch).kill();
}
}
}
}
进行 select()
操作后,一批处于IO就绪的channel已经可以执行真实的IO操作了,可以从对应的socket中读取出数据,然后进行处理.
由于TCP协议一次传输的量可能比应用所需的数据要少, 可能会导致应用所需要的数据在TCP层面被拆成了N个包,此时应用需要手动处理,将多个TCP的package组成应用所需要的数据.
eg: http需要传输几M的数据,放在TCP层面是必然被切分成了N个package了,tomcat/jetty必须对socket读取的数据进行处理.
现在可以便利这批已经就绪的SelectionKey
. SelectionKey
分别引用了对应的 channel
和 selector
.
总结1
select 执行的操作本身是不执行IO操作的,他们只是检测文件描述符号是否处于就绪状态,然后才去执行真实的IO操作,例如 read()
和 write
操作.
在这个简易的例子中
Java IO(3)
从 socket 中读取数据后直接打印在控制台上.
SocketChannel socketChannel = (SocketChannel) key.channel();
socketChannel.register(selector, SelectionKey.OP_WRITE);
ByteBuffer buffer = ByteBuffer.allocate(1024);
int read = socketChannel.read(buffer);
随后使用 SelectionKey.OP_WRITE 注册到 selector
,表明监听写入事件,在channel写就绪时,从队列中获取数据,写入到socket中
而在复杂的实现中,可能会将数据投递到一个流水线上,由流水线上到 过滤器/处理器 对数据进行处理,可能包含但不限于SSL握手,HTTP交互/MQTT协议等等.
Netty就是这么处理的.
socket 😂
select
本身是不执行IO操作的,只是检测 channel
对应的文件描述符是否处理IO就绪状态. 如果不能写入就直接返回.
内核会为每一个socket都维护一个写入和读取的缓冲区.
- 写就绪: 数据能够直接写入缓冲区. 即缓冲区有足够的空间写入该数据,不阻塞
- 读就绪: 能够从socket中读取数据.
水平触发和边缘触发 🌲
- 水平触发 : 只要socket满足IO就绪,任何时候都可以.
- 只要缓冲内容不为空,返回读就绪。
- 只要缓冲区还不满,LT模式会返回写就绪
- 边缘触发 : 自从上一次触发依赖,除非产生了新的IO就绪,否则不会触发.
- 当文件描述符关联的读内核缓冲区由空转化为非空的时候,则发出可读信号进行通知,
- 当文件描述符关联的内核写缓冲区由满转化为不满的时候,则发出可写信号进行通知
Java默认是水平触发 意味者,只要socket缓存区未满,一直触发
select
,如果不处理的话,这样的while(true) 可以直接打满1C,在编程中每次读取以后需要变更为write,写入后变更为read.