SocketChannel

Java NIO中的SocketChannel是一个连接到TCP网络套接字的通道。可以通过以下2种方式创建SocketChannel

  1. 打开一个SocketChannel并连接到互联网上的某台服务器。
  2. 一个新连接到达ServerSocketChannel时,会创建一个SocketChannel

    打开 SocketChannel

    下面是SocketChannel的打开方式:
  1. SocketChannel socketChannel = SocketChannel.open();
  2. socketChannel.connect(new InetSocketAddress("http://jenkov.com", 80));

关闭 SocketChannel

当用完SocketChannel之后调用SocketChannel.close()关闭SocketChannel

  1. socketChannel.close();

从 SocketChannel 读取数据

要从SocketChannel中读取数据,调用一个read()的方法之一。以下是例子:

  1. ByteBuffer buf = ByteBuffer.allocate(48);
  2. int bytesRead = socketChannel.read(buf);

首先,分配一个Buffer。从SocketChannel读取到的数据将会放到这个Buffer中。
然后,调用SocketChannel.read()。该方法将数据从SocketChannel读到Buffer中。read()方法返回的int值表示读了多少字节进Buffer里。如果返回的是-1,表示已经读到了流的末尾(连接关闭了)。

写入 SocketChannel

写数据到SocketChannel用的是SocketChannel.write()方法,该方法以一个Buffer作为参数。示例如下:

  1. String newData = "New String to write to file..." + System.currentTimeMillis();
  2. //生成Buffer,并向Buffer中写数据
  3. ByteBuffer buf = ByteBuffer.allocate(48);
  4. buf.clear();
  5. buf.put(newData.getBytes());
  6. //切换buffer为读模式
  7. buf.flip();
  8. while(buf.hasRemaining()) {
  9. channel.write(buf);
  10. }

注意SocketChannel.write()方法的调用是在一个while循环中的。write()方法无法保证能写多少字节到SocketChannel。所以,我们重复调用write()直到Buffer没有要写的字节为止。

非阻塞模式

可以设置 SocketChannel 为非阻塞模式(non-blocking mode).设置之后,就可以在异步模式下调用connect(), read()write()了。

connect()

如果SocketChannel在非阻塞模式下,此时调用connect(),该方法可能在连接建立之前就返回了。为了确定连接是否建立,可以调用finishConnect()的方法。像这样:

  1. socketChannel.configureBlocking(false);
  2. socketChannel.connect(new InetSocketAddress("http://jenkov.com", 80));
  3. while(! socketChannel.finishConnect() ){
  4. //wait, or do something else...
  5. }
  6. write()

非阻塞模式下,write()方法在尚未写出任何内容时可能就返回了。所以需要在循环中调用write()。前面已经有例子了,这里就不赘述了。

read()

非阻塞模式下,read()方法在尚未读取到任何数据时可能就返回了。所以需要关注它的int返回值,它会告诉你读取了多少字节。

非阻塞模式与选择器

非阻塞模式与选择器搭配会工作的更好,通过将一或多个SocketChannel注册到Selector,可以询问选择器哪个通道已经准备好了读取,写入等。SelectorSocketChannel的搭配使用会在后面详讲。

ServerSockerChannel

Java NIO中的 ServerSocketChannel 是一个可以监听新进来的TCP连接的通道, 就像标准IO中的ServerSocket一样。ServerSocketChannel类在 java.nio.channels包中。
这里有个例子:

  1. ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
  2. serverSocketChannel.socket().bind(new InetSocketAddress(9999));
  3. while(true){
  4. SocketChannel socketChannel =
  5. serverSocketChannel.accept();
  6. //使用socketChannel做一些工作...
  7. }

打开 ServerSocketChannel

通过调用 ServerSocketChannel.open() 方法来打开ServerSocketChannel.如:

  1. ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

关闭 ServerSocketChannel

通过调用ServerSocketChannel.close() 方法来关闭ServerSocketChannel. 如:

  1. serverSocketChannel.close();

监听新进来的连接

通过 ServerSocketChannel.accept() 方法监听新进来的连接。当 accept()方法返回的时候,它返回一个包含新进来的连接的 SocketChannel。因此, accept()方法会一直阻塞到有新连接到达。
通常不会仅仅只监听一个连接,在while循环中调用 accept()方法. 如下面的例子:

  1. while(true){
  2. SocketChannel socketChannel =
  3. serverSocketChannel.accept();
  4. //使用socketChannel做一些工作...
  5. }

当然,也可以在while循环中使用除了true以外的其它退出准则。

非阻塞模式

ServerSocketChannel可以设置成非阻塞模式。在非阻塞模式下,accept() 方法会立刻返回,如果还没有新进来的连接,返回的将是null。 因此,需要检查返回的SocketChannel是否是null.如:

  1. ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
  2. serverSocketChannel.socket().bind(new InetSocketAddress(9999));
  3. serverSocketChannel.configureBlocking(false);
  4. while(true){
  5. SocketChannel socketChannel =
  6. serverSocketChannel.accept();
  7. if(socketChannel != null){
  8. //使用socketChannel做一些工作...
  9. }
  10. }

Java NIO中的DatagramChannel是一个能收发UDP包的通道。因为UDP是无连接的网络协议,所以不能像其它通道那样读取和写入。它发送和接收的是数据包。

示例

  1. //客户端
  2. @Test
  3. void client() throws IOException {
  4. //1.打开SocketChannel
  5. SocketChannel channel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9999));
  6. channel.configureBlocking(false);//设置为非阻塞通道
  7. //2.创建缓冲区
  8. ByteBuffer buffer = ByteBuffer.allocate(1024);
  9. //3.读取键盘输入 并发送数据
  10. Scanner scanner = new Scanner(System.in);
  11. System.out.println("请输入信息:");
  12. while (scanner.hasNextLine()) {
  13. String line = scanner.nextLine();
  14. //写入缓冲区
  15. buffer.put((LocalDateTime.now() + " " + line).getBytes());
  16. buffer.flip();
  17. //将缓冲区放入通道
  18. channel.write(buffer);
  19. buffer.clear();
  20. }
  21. //4.关闭通道
  22. channel.close();
  23. }
  24. //服务端
  25. @Test
  26. void server() throws IOException {
  27. //1.打开ServerSocketChannel 并绑定端口 获取通道
  28. ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
  29. serverSocketChannel.configureBlocking(false);//切换到非阻塞模式
  30. serverSocketChannel.bind(new InetSocketAddress(9999));
  31. //2.获取选择器
  32. Selector selector = Selector.open();
  33. //3.将通道注册到选择器上 指定监听"准备就绪"事件
  34. serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
  35. //4.循环获取选择器上"准备就绪"的事件
  36. while (selector.select() > 0) {
  37. //5.获取当前选择器上所有注册的"选择键(监听准备就绪事件)"
  38. Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
  39. while (iterator.hasNext()) {
  40. SelectionKey key = iterator.next();
  41. if (key.isAcceptable()) {
  42. System.out.println("接收就绪");
  43. //6.接收就绪后,获取客户端的连接
  44. SocketChannel socketChannel = serverSocketChannel.accept();
  45. if (socketChannel != null) {
  46. socketChannel.configureBlocking(false)//切换为非阻塞模式
  47. .register(selector, SelectionKey.OP_READ);//注册到选择器上
  48. }
  49. } else if (key.isConnectable()) {
  50. System.out.println("连接就绪");
  51. } else if (key.isReadable()) {
  52. System.out.println("读就绪");
  53. //7.获取选择器上读就绪的通道
  54. SocketChannel socketChannel = (SocketChannel) key.channel();
  55. //8.创建缓冲区 读数据
  56. ByteBuffer buffer = ByteBuffer.allocate(1024);
  57. socketChannel.read(buffer);
  58. buffer.flip();
  59. System.out.println("服务端收到: " + new String(buffer.array()));
  60. buffer.clear();
  61. } else if (key.isWritable()) {
  62. System.out.println("写就绪");
  63. } else {
  64. //取消选择键
  65. iterator.remove();
  66. }
  67. }
  68. }
  69. }

DatagramChannel

打开 DatagramChannel

下面是 DatagramChannel 的打开方式:

  1. DatagramChannel channel = DatagramChannel.open();
  2. channel.socket().bind(new InetSocketAddress(9999));

这个例子打开的 DatagramChannel可以在UDP端口9999上接收数据包。

接收数据

通过receive()方法从DatagramChannel接收数据,如:

  1. ByteBuffer buf = ByteBuffer.allocate(48);
  2. buf.clear();
  3. channel.receive(buf);

receive()方法会将接收到的数据包内容复制到指定的Buffer. 如果Buffer容不下收到的数据,多出的数据将被丢弃。

发送数据

通过send()方法从DatagramChannel发送数据,如:

  1. String newData = "New String to write to file..."
  2. + System.currentTimeMillis();
  3. ByteBuffer buf = ByteBuffer.allocate(48);
  4. buf.clear();
  5. buf.put(newData.getBytes());
  6. buf.flip();
  7. int bytesSent = channel.send(buf, new InetSocketAddress("jenkov.com", 80));

这个例子发送一串字符到”jenkov.com”服务器的UDP端口80。 因为服务端并没有监控这个端口,所以什么也不会发生。也不会通知你发出的数据包是否已收到,因为UDP在数据传送方面没有任何保证。

连接到特定的地址

可以将DatagramChannel“连接”到网络中的特定地址的。由于UDP是无连接的,连接到特定地址并不会像TCP通道那样创建一个真正的连接。而是锁住DatagramChannel ,让其只能从特定地址收发数据。
这里有个例子:

  1. channel.connect(new InetSocketAddress("jenkov.com", 80));

当连接后,也可以使用read()write()方法,就像在用传统的通道一样。只是在数据传送方面没有任何保证。这里有几个例子:

  1. int bytesRead = channel.read(buf);
  2. int bytesWritten = channel.write(buf);

示例

  1. //udp协议
  2. @Test
  3. public void send() throws IOException {
  4. DatagramChannel channel = DatagramChannel.open();
  5. channel.configureBlocking(false);
  6. ByteBuffer buffer = ByteBuffer.allocate(1024);
  7. Scanner scanner = new Scanner(System.in);
  8. System.out.println("请输入信息:");
  9. while (scanner.hasNextLine()) {
  10. String line = scanner.nextLine();
  11. buffer.put((LocalDateTime.now() + " " + line).getBytes());
  12. buffer.flip();
  13. channel.send(buffer, new InetSocketAddress("127.0.0.1", 9898));
  14. buffer.clear();
  15. }
  16. channel.close();
  17. }
  18. @Test
  19. public void receive() throws IOException {
  20. DatagramChannel channel = DatagramChannel.open();
  21. channel.configureBlocking(false);
  22. channel.bind(new InetSocketAddress(9898));
  23. Selector selector = Selector.open();
  24. channel.register(selector, SelectionKey.OP_READ);
  25. while (selector.select() > 0) {
  26. Iterator<SelectionKey> it = selector.selectedKeys().iterator();
  27. while (it.hasNext()) {
  28. SelectionKey sk = it.next();
  29. if (sk.isReadable()) {
  30. ByteBuffer buffer = ByteBuffer.allocate(1024);
  31. channel.receive(buffer);
  32. buffer.flip();
  33. System.out.println(new String(buffer.array(), 0, buffer.limit()));
  34. buffer.clear();
  35. }
  36. }
  37. it.remove();
  38. }
  39. }