Channel 代表了一个与设备的连接的抽象,它本身不存储数据,因此需要配合缓冲区将数据块移入或移出到各种 IO 源,如文件、socket、数据报等。Channel 和传统 IO 中的 Stream 很相似,但又有些不同:
- 通道是双向的,一个 Channel 既可以读,也可以写,而 Stream 只能进行单向操作。
- 通道支持非阻塞模式。
- 通道本身不存储数据,对通道进行读、写必须要通过 Buffer 来操作。
通道类的层次结构非常复杂,有多个接口和许多可选操作。不过,对于网络编程来说,实际上只有三个重要的通道类:SocketChannel、ServerSocketChannel 和 DatagramChannel。
SelectableChannel 实现了 Channel 接口,是一种支持阻塞 I/O 和非阻塞 I/O 的通道。在非阻塞模式下,读写数据不会阻塞,并且 SelectableChannel 可以向 Selector 注册读就绪和写就绪等事件。Selector 负责监控这些事件,等到事件发生时,比如发生了读就绪事件,SelectableChannel 就可以执行读操作了。
// ture表示阻塞模式,false表示非阻塞模式,默认采用阻塞模式
public abstract SelectableChannel configureBlocking(boolean block) throws IOException;
// 注册事件
public final SelectionKey register(Selector sel, int ops);
public abstract SelectionKey register(Selector sel, int ops, Object att);
registry() 方法返回一个 SelectionKey 对象,SelectionKey 用来跟踪被注册的事件。第二个 registry() 方法还有一个 Object 类型的参数,它用于为 SelectionKey 关联一个附件,当被注册事件发生后,需要处理该事件时,可以从 SelectionKey 中获得这个附件,该附件可用来包含与处理这个事件相关的信息。
SocketChannel
从图中可以看到,SocketChannel 不仅从 SelectableChannel 父类中继承了 configureBlocking() 和 registry() 方法用以实现非阻塞和 IO 多路复用,还实现了用于读写数据的 ByteChannel 接口,因此可以对 TCP socket 进行读写,但数据必须编码到 ByteBuffer 对象中来完成数据读写。
1. 连接
SocketChannel 类没有任何公共构造函数,它提供了两个静态 open() 方法创建新的 SocketChannel 对象。
public static SocketChannel open() throws IOException
public static SocketChannel open(SocketAddress remote) throws IOException
第一个方法不会立即连接,它创建一个初始未连接的 socket,后面必须用 connect() 方法进行连接。如果我们在连接前设置了非阻塞模式,则 connect() 会立即返回,甚至在连接建立之前就会返回。为了确定在非阻塞模式下连接是否建立完成,则需要通过 finishConnect() 方法来判断。
SocketChannel channel = SocketChannel.open();
SocketAddress address = new InetSocketAddress("127.0.0.0", 1234);
channel.configureBlocking(false);
channel.connect();
第二个方法会建立连接,该方法会在连接建立或抛出异常之前一直阻塞。
2. 读取
为了读取 SocketChannel 数据,需要要创建一个空的 ByteBuffer 对象,然后将这个 ByteBuffer 作为入参传给 read() 方法,这样通道就可以向其中存储数据了。
public abstract int read(ByteBuffer dst) throws IOException;
通道会用尽可能多的数据填充缓冲区,然后返回实际放入的字节数。如果遇到流末尾,通道会用所有剩余的字节填充缓冲区,并且在下一次调用 read() 方法时返回 -1。
如果通道是阻塞的,read() 方法会争取读满这个 ByteBuffer,如果通道中的字节数不足以读满缓冲区就进入阻塞状态,直到读满或者读到了输入流末尾返回 -1,或者抛出一个异常。但如果这个通道是非阻塞的,read() 方法会读取当前通道中的所有可读数据,有可能不会读满但总会立即返回,也可能返回 0。
因为数据将存储在缓冲区的当前位置,而这个位置会随着增加更多数据而自动更新,所以可以一直向 read() 方法中传入同一个缓冲区,直到缓冲区填满。例如,下面示例的循环会一直读取数据,直到缓冲区填满或者检测到流末尾为止:
while(buffer.hasRemaining() && channel.read(buffer) != -1);
有时如果能从一个源填充多个缓冲区,这会很有用,这称为散布(scatter)。下面两个方法接受一个 ByteBuffer 对象数组作为入参,通道会按顺序填充数组中的各个 ByteBuffer。
public final long read(ByteBuffer[] dsts) throws IOException
public abstract long read(ByteBuffer[] dsts, int offset, int length) throws IOException;
3. 写入
Socket 通道提供了读写方法。一般情况下它们是全双工的。要想写入,只需将数据填充到一个 ByteBuffer 中,将其回绕(flip),然后在传给 write() 方法,这个方法会把数据复制到通道中。
public abstract int write(ByteBuffer src) throws IOException;
与读取一样,如果通道是阻塞的,write() 方法会争取排空 ByteBuffer 中的剩余字节,如果底层网络的输出缓冲区不能容纳,就进入阻塞模式,直到输出完或者抛出异常。
如果通道是非阻塞的,write() 方法会立即返回,它会尽量写入多的数据,但不能保证会写入缓冲区的全部内容。不过,由于缓冲区内部维护了 position 信息,所以我们可以反复调用 write() 方法,直到缓冲区完全排空,而且数据已完全写入:
Byte[] data = "hello channel".getBytes();
ByteBuffer buffer = ByteBuffer.wrap(data);
while(buffer.hasRemaining() && channel.write(buffer) != -1);
将多个缓冲区的数据写入到一个 socket 通常很有用,这称为聚集(gather)。下面两个方法接受一个 ByteBuffer 对象数组作为入参,通道会按顺序排空数组中的各个 ByteBuffer。
public final long write(ByteBuffer[] srcs) throws IOException
public abstract long write(ByteBuffer[] srcs, int offset, int length) throws IOException;
ServerSocketChannel
ServerSocketChannel 类只有一个目的:接受客户端连接。我们无法对其进行读取、写入或连接操作,它支持的唯一操作是接受一个新的客户端连接。
1. 创建
ServerSocketChannel 也提供了静态方法 open() 用于创建一个新的 ServerSocketChannel 对象。不过,这个方法名有一点欺骗性,因为该方法实际上并不打开一个新的服务器 socket,而是只创建这个对象,这个对象没有与任何本地端口绑定。
每个 ServerSocketChannel 对象都与一个 ServerSocket 对象关联,在使用前需要调用 socket() 方法来获得与它关联的对等端 ServerSocket,通过这个 ServerSocket 的各种设置方法配置任何服务器选项,如接收缓冲区大小或 socket 超时值。然后将这个 ServerSocket 连接到你希望绑定的端口的 SocketAddress。
try {
ServerSocketChannel server = ServerSocketChannel.open();
ServerSocket socket = server.socket();
SocketAddress address = new InetSocketAddress(80);
socket.bind(address);
} catch (IOException e) {
System.out.println("Could not bind to port 80, because " + e.getMessage());
}
在 Java 7 中,这会更简单些,因为 ServerSocketChannel 提供了自己的 bind() 方法。
try {
ServerSocketChannel server = ServerSocketChannel.open();
SocketAddress address = new InetSocketAddress(80);
server.bind(address);
} catch (IOException e) {
System.out.println("Could not bind to port 80, because " + e.getMessage());
}
2. 接受连接
一旦打开并绑定了 ServerSocketChannel 对象,accept() 方法就可以监听客户端连接请求了。
public abstract SocketChannel accept() throws IOException;
accept() 可以在阻塞或非阻塞模式下执行。在阻塞模式下,accept() 方法会一直阻塞等待客户端连接,当有新连接建立后,该方法返回连接到远程客户端的一个 SocketChannel 对象。在建立连接之前,线程无法进行任何操作。这种策略适用于立即响应每一个请求的简单服务器,阻塞模式是 ServerSocketChannel 的默认模式。
当处于非阻塞模式时,如果没有客户端连接,accept() 方法会立即返回 null,因此我们需要检查返回的 SocketChannel 是否为 null。注意,如果有客户端连接,accept() 返回的客户端的 SocketChannel 对象是处于阻塞模式的,如果想把它改成非阻塞模式,必须手动设置。
非阻塞模式更适合于需要为每个连接完成大量工作的服务器,这样就可以并行地处理多个请求。非阻塞模式一般会与 Selector 结合使用。
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(80));
serverSocketChannel.configureBlocking(false);
while (ture) {
SocketChannel socketChannel = serverSocketChannel.accept();
if (socketChannel != null) {
// do something with socketChannel...
}
}
DatagramChannel
DatagramChannel 类用于非阻塞 UDP 应用程序,就像 SocketChannel 和 ServerSocketChannel 用于非阻塞 TCP 应用程序一样。
DatagramChannel 也是 SelectableChannel 的子类,所以也可以注册到一个 Selector 上。如果服务器中一个线程可以管理与多个不同客户端的通信,对于这种服务器,DatagramChannel 就很有用。不过,UDP 天生就比 TCP 更具异步性,因而实际效果没有那么明显。在 UDP 中,一个数据报 socket 可以处理多个客户端的输入和输出请求,DatagramChannel 类所增加的就是能够以非阻塞的方式来做到这一点。
1. 创建
DatagramChannel 类也没有公共构造函数,它也提供了静态方法 open() 用于创建一个新的 DatagramChannel 对象。
public static DatagramChannel open() throws IOException
这个通道开始时没有绑定到任何端口,如果要绑定端口可以使用 bind() 方法进行绑定:
public abstract DatagramChannel bind(SocketAddress local) throws IOException;
2. 接收
receive() 方法从通道读取一个数据报包,放在一个 ByteBuffer 中,然后返回发送这个包的主机的地址:
public abstract SocketAddress receive(ByteBuffer dst) throws IOException;
如果通道是阻塞的(默认阻塞),这个方法在读取到包之前不会返回。如果通道是非阻塞的,在没有包可以读取的情况下该方法会立即返回 null。
如果数据报包的数据超出了 ByteBuffer 所能保存的数据,则额外的数据会被丢弃而没有任何通知。不会捕获到 BufferOverflowException 或类似的异常,这再次表明 UDP 是不可靠的。这个行为向系统又引入了一层不可靠性,因为数据可能已经从网络安全到达,但因为我们的缓冲区太小而被丢弃了。
3. 发送
send() 方法将一个数据报包从 ByteBuffer 写入通道,由第二个参数来指定要发送的地址:
public abstract int send(ByteBuffer src, SocketAddress target) throws IOException;
如果希望向多个客户端发送相同的数据,可以重用源 ByteBuffer,不过不要忘记将其回到(rewind)。
send() 方法返回写入的字节数,这可能是 ByteBuffer 中的可用字节数,也可能是 0,而不会是其它值。如果通道处于非阻塞模式,而且数据不能立即发送,就会返回 0。如果通道处于阻塞模式,send() 会等待返回,直到它能发送缓冲区中的全部数据。
Socket 选项
从 Java 7 开始,SocketChannel、ServerSocketChannel 和 DatagramChannel 都实现了 NetworkChannel 接口,这个接口的主要用途是支持各种 TCP 选项。无论是在 Socket 上设置还是在通道上设置,这些选项在底层的 TCP 栈中都有相同的含义。
不过,并非对应所支持的各个选项有单独的方法,通道类分别有 3 个方法来获取、设置和列出所支持的选项:
<T> NetworkChannel setOption(SocketOption<T> name, T value) throws IOException;
<T> T getOption(SocketOption<T> name) throws IOException;
Set<SocketOption<?>> supportedOptions();
SocketOption 类是一个泛型类,指定了各个选项的名字和类型,类型参数
不同的通道和 Socket 支持不同的选项。例如,ServerSocketChannel 支持 SO_RCVBUF 但不支持 SO_SNDBUF。如果试图设置通道不支持的选项,则会抛出一个 UnsupportedOperationException 异常。