Netty

1.介绍

Netty是由JBOSS提供的一个java开源框架,现为 Github上的独立项目。Netty是一个异步的,基于事件驱动开发的网络应用框架用于开发高性能,高可靠性的网络IO程序。

也就是说,Netty 是一个基于NIO的客户、服务器端的编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户、服务端应用。Netty相当于简化和流线化了网络应用的编程开发过程,例如:基于TCP和UDP的socket服务开发。

2.BIO

BIO是(Block IO)即阻塞IO,是java最初的IO模型,JDK1.4之前唯一选择。

BIO中主要角色就是Server(可以自己写Client),必须使用多线程。每次有客户端连接都要新建一个线程,去单独处理这个线程。

Server

  1. package bio;
  2. import java.io.IOException;
  3. import java.net.ServerSocket;
  4. import java.net.Socket;
  5. public class Server {
  6. public static void main(String[] args) throws IOException {
  7. ServerSocket serverSocket = null;
  8. //服务端绑定端口
  9. serverSocket = new ServerSocket(9000);
  10. System.out.println("等待连接......");
  11. //阻塞接收客户端
  12. while (true){
  13. Socket client = serverSocket.accept();
  14. //直接创建线程
  15. new Thread(new Runnable() {
  16. @Override
  17. public void run() {
  18. //System.out.println("连接成功");
  19. String msg = null;
  20. try {
  21. msg = handle(client);
  22. } catch (IOException e) {
  23. e.printStackTrace();
  24. }
  25. System.out.println("接收到的数据:"+msg);
  26. }
  27. }).start();
  28. }
  29. }
  30. public static String handle(Socket clientSocket) throws IOException {
  31. byte buffer[] = new byte[1024];
  32. String msg = null;
  33. int len = clientSocket.getInputStream().read(buffer);
  34. if(len != -1){
  35. msg = new String(buffer,0,len,"utf-8");
  36. }
  37. return msg;
  38. }
  39. }

Client

  1. package bio;
  2. import java.io.IOException;
  3. import java.io.OutputStream;
  4. import java.net.Socket;
  5. public class Client {
  6. public void sendMsg(String name){
  7. Socket socket = null;
  8. try {
  9. socket = new Socket("localhost",9000);
  10. String msg = "这是来自"+name+"的消息";
  11. byte[] buffer = msg.getBytes("utf-8");
  12. OutputStream outputStream = socket.getOutputStream();
  13. outputStream.write(buffer);
  14. outputStream.close();
  15. socket.close();
  16. } catch (IOException e) {
  17. e.printStackTrace();
  18. }
  19. }
  20. }

建立10个客户端通信

  1. package bio;
  2. import java.util.Random;
  3. import java.util.concurrent.ExecutorService;
  4. import java.util.concurrent.Executors;
  5. public class Clients {
  6. public static void main(String[] args) throws InterruptedException {
  7. // for (int i = 0; i < 5; i++) {
  8. // //Thread.sleep(1000);
  9. // int index = i;
  10. // new Thread(new Runnable() {
  11. // @Override
  12. // public void run() {
  13. // new Client().sendMsg("xiao"+index);
  14. // }
  15. // }).start();
  16. // }
  17. /**
  18. * 线程池版本
  19. */
  20. ExecutorService executorService = Executors.newFixedThreadPool(10);
  21. for (int i = 0; i < 10; i++) {
  22. executorService.submit(new Runnable() {
  23. @Override
  24. public void run() {
  25. new Client().sendMsg("xiao"+new Random().nextInt(99));
  26. }
  27. });
  28. }
  29. }
  30. }

3.NIO

3.1.组件关系

  • 每个 channel 都会对应一个 Buffer
  • 一个线程对应一个Selector , 一个Selector对应多个 channel(连接)
  • 程序切换到哪个 channel 是由事件决定的
  • Selector 会根据不同的事件,在各个通道上切换
  • Buffer 就是一个内存块 , 底层是一个数组
  • 数据的读取写入是通过 Buffer完成的 ,BIO 中要么是输入流,或者是输出流, 不能双向,但是 NIO 的 Buffer 是可以读也可以写。
  • Java NIO系统的核心在于:通道(Channel)和缓冲区 (Buffer)。通道表示打开到 IO 设备(例如:文件、 套接字)的连接。若需要使用 NIO 系统,需要获取用于连接 IO 设备的通道以及用于容纳数据的缓冲区。然后操作缓冲区,对数据进行处理。简而言之,Channel 负责传输, Buffer 负责存取数据

3.2.Buffer

缓冲区Buffer,实质上是一个可以读写数据的内存块,可以理解为是一个容器对象(含数组),该对象提供一组方法,可以轻松使用内存块,缓冲区对选哪个内置了一些机制,能够跟踪和记录缓冲区的状态变化,Channel提供从文件,网络读取数据的渠道,但是读取或写入的数据必须经由Buffer

Buffer4个属性

  1. private int mark = -1;
  2. private int position = 0;
  3. private int limit;
  4. private int capacity;
属性 描述
mark 标记
position 位置,下一个要读或写的元素的索引,每次读写缓冲区都会改变当前值,以便下次读写。
limit 表示缓冲区的当前终点,不能对缓冲区超过极限的位置进行读写,且极限是可以修改的。
Capacity 当前缓冲区的容量,在缓冲区创建时确定并不能修改。

0<=mark<=position<=limit<=capacity

  1. package nioBasic;
  2. import java.nio.IntBuffer;
  3. public class Buffer1 {
  4. public static void main(String[] args) {
  5. //声明一个10个大小的intBuffer
  6. IntBuffer intBuffer = IntBuffer.allocate(10);
  7. for (int i = 0; i < 10; i++) {
  8. //在intBuffer中放入元素
  9. intBuffer.put(i*10);
  10. }
  11. /**
  12. public final Buffer flip() {
  13. limit = position;
  14. position = 0;
  15. mark = -1;
  16. return this;
  17. }
  18. 在一次完整的读或写操作后position == limit,需要归零,
  19. 注意不是回到最初的状态limit == position[而是limit等于存放的数据的长度]
  20. */
  21. intBuffer.flip();
  22. while (intBuffer.hasRemaining()/*判断当前position<limit*/){
  23. System.out.println(intBuffer.get());
  24. }
  25. }
  26. }

3.3.Channel

Channel 经常翻译为通道,类似 IO 中的流,用于读取和写入。它与前面介绍的 Buffer 打交道,读操作的时候将 Channel 中的数据填充到 Buffer 中,而写操作时将 Buffer 中的数据写入到 Channel 中

  • FileChannel用于对文件的数据读写
  • SocketChannel与ServerSocketChannel对应Socket与ServerSocket,用于TCP通信
  • DatagramChannel用于UDP通信

FileChannel实验

实验1:通过nio向文件中输出内容

  1. package nioBasic;
  2. /**
  3. * 通过nio向文件中输出内容
  4. */
  5. import java.io.File;
  6. import java.io.FileOutputStream;
  7. import java.io.IOException;
  8. import java.nio.ByteBuffer;
  9. import java.nio.channels.FileChannel;
  10. public class Channel1 {
  11. public static void main(String[] args) throws IOException {
  12. //创建一个测试文件
  13. File file = new File("nioTest1.txt");
  14. if (!file.exists()){
  15. file.createNewFile();
  16. }
  17. //获取文件输出流
  18. FileOutputStream fos = new FileOutputStream(file);
  19. //获取文件输出流的Channel
  20. FileChannel channel = fos.getChannel();
  21. //创建一个Buffer
  22. ByteBuffer msg = ByteBuffer.wrap("Hello,肖云飞\n".getBytes());
  23. //将Buffer的内容写到Channel中,由于这个Channel与指定文件绑定,所以会写到文件中
  24. channel.write(msg);
  25. fos.close();
  26. }
  27. }

实验二乐观版本:

  1. package nioBasic;
  2. import java.io.FileInputStream;
  3. import java.io.IOException;
  4. import java.nio.ByteBuffer;
  5. import java.nio.channels.FileChannel;
  6. public class Channel2_1 {
  7. public static void main(String[] args) throws IOException {
  8. FileInputStream fis = new FileInputStream("nioTest1.txt");
  9. FileChannel channel = fis.getChannel();
  10. /**
  11. * 实现预知到文件内容小于1024字节
  12. */
  13. ByteBuffer buffer = ByteBuffer.allocate(1024);
  14. int read = channel.read(buffer);
  15. String msg = new String(buffer.array());
  16. System.out.println(msg);
  17. fis.close();
  18. }
  19. }

实验二悲观版本

  1. package nioBasic;
  2. /**
  3. * 通过nio从文件中获取内容
  4. */
  5. import java.io.FileInputStream;
  6. import java.io.IOException;
  7. import java.nio.ByteBuffer;
  8. import java.nio.channels.FileChannel;
  9. import java.util.ArrayList;
  10. import java.util.List;
  11. public class Channel2_2 {
  12. public static void main(String[] args) throws IOException {
  13. FileInputStream fis = new FileInputStream("nioTest1.txt");
  14. FileChannel channel = fis.getChannel();
  15. //初始化一个大小为10的ByteBuffer
  16. /**
  17. * 注意,这个FileInputStream内置的Channel(FileChannel)是特殊的,只能使用ByteBuffer
  18. * public abstract class FileChannel
  19. * extends AbstractInterruptibleChannel
  20. * implements SeekableByteChannel, GatheringByteChannel, ScatteringByteChannel{...}
  21. */
  22. ByteBuffer buffer = ByteBuffer.allocate(10);
  23. List<Byte> bytes = new ArrayList<>();
  24. while (true){
  25. /**
  26. * 注意:当buffer不能一次把所有的内容读完必须要有这句话
  27. * 如果没有,第一次读完后position == limit,到极限了,之后每次都会读到0个,就会死循环
  28. **/
  29. buffer.clear();
  30. int read = channel.read(buffer);
  31. if(read == -1){break;}
  32. else{
  33. /**
  34. * 现在我们无法预测文件的大小,所以不能确保Buffer的大小,
  35. * 所以“悲观地”认为Buffer无法一次读完,这种情况下需要拼接
  36. *
  37. * 但是有个问题,我们每次读可以得到一个ByteBuffer,里面是n个Byte,
  38. * 使用StringBuilder拼接的话append方法不能传入byte类型,只能是Char
  39. * 但如果使用char的话极有可能使中文字符(占三个字节)被分割开,出现乱码,
  40. * 所以需要将所有的ByteBuffer转化为byte[]后拼接为一个大的byte[],再转化为String
  41. */
  42. byte[] array = buffer.array();
  43. for (int i = 0; i < read; i++) {
  44. bytes.add(array[i]);
  45. }
  46. }
  47. }
  48. /**
  49. * 将List<byte>转化为byte[],方便转化为字符串
  50. */
  51. byte[] msgArray = new byte[bytes.size()];
  52. for (int i = 0; i < bytes.size(); i++) {
  53. msgArray[i] = bytes.get(i);
  54. }
  55. System.out.println(new String(msgArray));
  56. fis.close();
  57. }
  58. }

MappedByteBuffer

MappedByteBuffer是ByteBuffer的子类,特点是首先它是针对RandomAccessFile使用的,所以是内存直接映射文件的,其次,它对RandomAccessFile功能进一步升级,可以指定映射的范围

  1. package nioBasic;
  2. import java.io.IOException;
  3. import java.io.RandomAccessFile;
  4. import java.nio.MappedByteBuffer;
  5. import java.nio.channels.FileChannel;
  6. public class MappedByteBufferTest {
  7. public static void main(String[] args) throws IOException {
  8. /**
  9. * 创建随机读写文件(这个类只能操作文件)
  10. * 随机读写文件的特点是可以把文件的操作像操作数组一样,是以byte为单位的
  11. * 有一个特点是seek(long pos)方法,可以把文件的指针直接指向pos索引处开始操作
  12. */
  13. RandomAccessFile raf = new RandomAccessFile("nioTest1.txt","rw");
  14. //获取channel
  15. FileChannel channel = raf.getChannel();
  16. /**
  17. * public abstract MappedByteBuffer map(MapMode mode,long position, long size)
  18. * 1.MapMode,这个Channel的操作模式(只读,读写,私有->写时复制)
  19. * 2.position,从哪个位置开始映射
  20. * 3.size,映射多少个块[多少个byte]
  21. *
  22. * 这就是MappedByteBuffer的特点,可以只将文件中某一段内容映射到内存中操作
  23. */
  24. MappedByteBuffer map = channel.map(FileChannel.MapMode.READ_WRITE, 0, 5);
  25. //先可以将所有的映射读取出来
  26. while (map.hasRemaining()){
  27. System.out.print((char)map.get());
  28. }
  29. //map.flip();
  30. System.out.println("\n========================");
  31. //修改内容
  32. map.put(0,(byte)'H');
  33. map.put(1,(byte)'e');
  34. map.put(2,(byte)'l');
  35. map.put(3,(byte)'l');
  36. map.put(4,(byte)'o');
  37. //map.put(5,(byte)'p');会抛出异常,因为size是大小,不是索引
  38. map.flip();
  39. while (map.hasRemaining()){
  40. System.out.print((char)map.get());
  41. }
  42. raf.close();
  43. }
  44. }
  45. /**结果
  46. hELLO
  47. ========================
  48. Hello
  49. **/

3.4.Buffer的聚合与分散

分散/聚集 I/O 是使用多个而不是单个缓冲区来保存数据的读写方法。

一个分散的读取就像一个常规通道读取,只不过它是将数据读到一个缓冲区数组中而不是读到单个缓冲区中。同样地,一个聚集写入是向缓冲区数组而不是向单个缓冲区写入数据。

分散/聚集 I/O 对于将数据流划分为单独的部分很有用,这有助于实现复杂的数据格式。

ScatteringByteChannel、GatheringByteChannel 是两个interface,而 FileChannel 和 SocketChannel 等实现了这两个接口。

ScatteringByteChannel 是一个具有两个附加读方法的通道:

  1. long read( ByteBuffer[] dsts );
  2. long read( ByteBuffer[] dsts, int offset, int length );

这些 long read() 方法很像标准的 read 方法,只不过它们不是取单个缓冲区而是取一个缓冲区数组。

在 分散读取 中,通道依次填充每个缓冲区。填满一个缓冲区后,它就开始填充下一个,在移动下一个buffer前,必须填满当前的buffer,这也意味着它不适用于动态消息(消息大小不固定)。在某种意义上,缓冲区数组就像一个大缓冲区。

GatheringByteChannel聚集写入 类似于分散读取,只不过是用来写入。它也有接受缓冲区数组的方法:

  1. long write( ByteBuffer[] srcs );
  2. long write( ByteBuffer[] srcs, int offset, int length );

分散/聚集 I/O 对于将数据划分为几个部分很有用。例如,可能在编写一个使用消息对象的网络应用程序,每一个消息被划分为固定长度的头部和固定长度的正文。可以创建一个刚好可以容纳头部的缓冲区和另一个刚好可以容难正文的缓冲区。当您将它们放入一个数组中并使用分散读取来向它们读入消息时,头部和正文将整齐地划分到这两个缓冲区中。我们从缓冲区所得到的方便性对于缓冲区数组同样有效。因为每一个缓冲区都跟踪自己还可以接受多少数据,所以分散读取会自动找到有空间接受数据的第一个缓冲区。在这个缓冲区填满后,它就会移动到下一个缓冲区。聚集写对于把一组单独的缓冲区中组成单个数据流很有用。为了与上面的消息例子保持一致,可以使用聚集写入来自动将网络消息的各个部分组装为单个数据流,以便跨越网络传输消息。

3.5.Selector

选择器提供选择执行已经就绪的任务的能力。从底层来看,Selector提供了询问通道是否已经准备好执行每个I/O操作的能力。Selector 允许单线程处理多个Channel。仅用单个线程来处理多个Channels的好处是,只需要更少的线程来处理通道。事实上,可以只用一个线程处理所有的通道,这样会大量的减少线程之间上下文切换的开销。

并不是所有的Channel,都是可以被Selector 复用的。比方说,FileChannel就不能被选择器复用。

判断一个Channel 能被Selector 复用,有一个前提:判断他是否继承了一个抽象类SelectableChannel。如果继承了SelectableChannel,则可以被复用,否则不能。

SelectableChannel类提供了实现通道的可选择性所需要的公共方法。它是所有支持就绪检查的通道类的父类。所有socket通道,都继承了SelectableChannel类都是可选择的,包括从管道(Pipe)对象的中获得的通道。而FileChannel类,没有继承SelectableChannel,因此是不是可选通道。

3.6.nio服务端实例

  1. package nio;
  2. import java.io.IOException;
  3. import java.net.InetSocketAddress;
  4. import java.nio.ByteBuffer;
  5. import java.nio.channels.SelectionKey;
  6. import java.nio.channels.Selector;
  7. import java.nio.channels.ServerSocketChannel;
  8. import java.nio.channels.SocketChannel;
  9. import java.util.Iterator;
  10. import java.util.Set;
  11. public class ServerPro {
  12. public static void main(String[] args) throws IOException, InterruptedException {
  13. //对服务端配置
  14. ServerSocketChannel server = ServerSocketChannel.open();
  15. server.bind(new InetSocketAddress(9000));
  16. server.configureBlocking(false);
  17. //开启多路复用器
  18. Selector selector = Selector.open();
  19. //将服务端Channel以OP_ACCEPT方式注册到复用器里面
  20. server.register(selector, SelectionKey.OP_ACCEPT);
  21. System.out.println("等待连接......");
  22. while (true){
  23. //多路复用器开始阻塞等待是否有事件发生(任何事件都会跳出阻塞)
  24. selector.select();
  25. Set<SelectionKey> selectionKeys = selector.selectedKeys();
  26. Iterator<SelectionKey> channels = selectionKeys.iterator();
  27. while(channels.hasNext()){
  28. SelectionKey selectionKey = channels.next();
  29. {
  30. //如果是接收事件,说明是有客户连接服务端
  31. if (selectionKey.isAcceptable()){
  32. ServerSocketChannel serverTmp = (ServerSocketChannel)selectionKey.channel();
  33. //获取客户通道,配置为非阻塞
  34. SocketChannel accept = serverTmp.accept();
  35. accept.configureBlocking(false);
  36. //因为建立连接,认为其会有读事件发生
  37. accept.register(selector,SelectionKey.OP_READ);
  38. System.out.println("连接成功......");
  39. }
  40. //如果是可读事件,说明是客户端
  41. else if (selectionKey.isReadable()){
  42. ByteBuffer buffer = ByteBuffer.allocate(256);
  43. SocketChannel clientTmp = (SocketChannel) selectionKey.channel();
  44. int len = clientTmp.read(buffer);
  45. if(len > 0){
  46. String msg = new String(buffer.array(),0,len);
  47. System.out.println("接收到的消息:"+msg);
  48. if(msg.equals("通信完成,可以关闭")){
  49. if(!clientTmp.isConnected()){
  50. System.out.println("异常结束");
  51. }
  52. else if(clientTmp.isConnected()){
  53. selectionKey.cancel();
  54. System.out.println("通信完成");
  55. }
  56. }
  57. else{
  58. clientTmp.register(selector,SelectionKey.OP_WRITE);
  59. }
  60. }
  61. else if(len == -1){
  62. System.out.println("断开连接");
  63. selectionKey.cancel();
  64. }
  65. }
  66. else if(selectionKey.isWritable()){
  67. SocketChannel writeChannel = (SocketChannel)selectionKey.channel();
  68. writeChannel.write(ByteBuffer.wrap("已接收到客户端消息。".getBytes()));
  69. writeChannel.register(selector,SelectionKey.OP_READ);
  70. //Thread.sleep(2000);
  71. }
  72. channels.remove();
  73. }
  74. }
  75. }
  76. }
  77. }

3.7.nio客户端实例

  1. package nio;
  2. import java.io.IOException;
  3. import java.net.InetSocketAddress;
  4. import java.nio.ByteBuffer;
  5. import java.nio.channels.*;
  6. import java.util.Iterator;
  7. import java.util.Set;
  8. public class Client {
  9. public void sendMsg(Selector selector,SocketChannel socket,String name) throws IOException, InterruptedException {
  10. //SocketChannel socket = SocketChannel.open();
  11. //Selector selector = Selector.open();
  12. socket.configureBlocking(false);
  13. socket.register(selector, SelectionKey.OP_CONNECT);
  14. socket.connect(new InetSocketAddress("localhost",9000));
  15. while(true){
  16. selector.select();
  17. Set<SelectionKey> selectionKeys = selector.selectedKeys();
  18. Iterator<SelectionKey> channels = selectionKeys.iterator();
  19. while (channels.hasNext()){
  20. SelectionKey channel = channels.next();
  21. if(channel.isConnectable()){
  22. //while(!socket.connect(new InetSocketAddress("localhost",9000)));
  23. /**
  24. * 下面这个判断是判断连接是否正在进行,不是必要的
  25. * 但是这个循环是必要的,直到socket.finishConnect()返回true才说明连接成功
  26. * 当然,在socket.finishConnect()返回false的阶段,客户端可以做其他事
  27. */
  28. //if(socket.isConnectionPending()){
  29. while(!socket.finishConnect());
  30. //}
  31. //if(socket.finishConnect()){
  32. String msg = name + "的消息";
  33. byte[] bytes = msg.getBytes("utf-8");
  34. ByteBuffer buffer = ByteBuffer.wrap(bytes);
  35. socket.write(buffer);
  36. //channel.cancel();
  37. socket.register(selector,SelectionKey.OP_READ);
  38. //}
  39. }
  40. else if(channel.isReadable()){
  41. SocketChannel serverTmp = (SocketChannel)channel.channel();
  42. ByteBuffer buffer = ByteBuffer.allocate(256);
  43. int len = serverTmp.read(buffer);
  44. if(len > 0){
  45. String msg = (new String(buffer.array(),0,len).split("。"))[0];
  46. //System.out.println(msg.length());
  47. //System.out.println("已接收到客户端消息".length());
  48. System.out.println(msg);
  49. if(msg.equals("已接收到客户端消息")){
  50. System.out.println("正在关闭......");
  51. serverTmp.write(ByteBuffer.wrap("通信完成,可以关闭".getBytes()));
  52. //这里休眠2秒是为了确保服务端能正常关闭通道
  53. Thread.sleep(2000);
  54. System.exit(0);
  55. }
  56. }
  57. else if (len == -1){
  58. System.out.println("连接断开");
  59. channel.cancel();
  60. }
  61. }
  62. channels.remove();
  63. }
  64. }
  65. }
  66. }

3.8.多客户端运行(多线程)

  1. package nio;
  2. import java.io.IOException;
  3. import java.nio.channels.Selector;
  4. import java.nio.channels.SocketChannel;
  5. import java.util.concurrent.ExecutorService;
  6. import java.util.concurrent.Executors;
  7. public class Clients {
  8. public static void main(String[] args) throws IOException {
  9. ExecutorService executorService = Executors.newCachedThreadPool();
  10. for (int i = 0; i < 10; i++) {
  11. String id = new Integer(i).hashCode()+"";
  12. executorService.execute(
  13. new Runnable() {
  14. @Override
  15. public void run() {
  16. try {
  17. //让每个客户端channel都持有一个selector
  18. SocketChannel channel = SocketChannel.open();
  19. Selector selector = Selector.open();
  20. new Client().sendMsg(selector,channel,"肖云飞"+id);
  21. } catch (IOException e) {
  22. e.printStackTrace();
  23. } catch (InterruptedException e) {
  24. e.printStackTrace();
  25. }
  26. }
  27. }
  28. );
  29. }
  30. }
  31. }

3.9.小实战

nio实现群聊系统

服务端

  1. package nio.groupChat;
  2. import java.io.IOException;
  3. import java.net.InetSocketAddress;
  4. import java.nio.ByteBuffer;
  5. import java.nio.channels.*;
  6. import java.util.Iterator;
  7. import java.util.Set;
  8. public class ChatServer {
  9. private Selector selector = null;
  10. private ServerSocketChannel serverSocketChannel = null;
  11. private final int port = 9000;
  12. //初始化属性
  13. public ChatServer(){
  14. try {
  15. selector = Selector.open();
  16. serverSocketChannel = ServerSocketChannel.open();
  17. serverSocketChannel.socket().bind(new InetSocketAddress(port));
  18. serverSocketChannel.configureBlocking(false);
  19. serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
  20. } catch (IOException e) {
  21. e.printStackTrace();
  22. }
  23. }
  24. public void listen() throws IOException {
  25. while (true){
  26. int sum = selector.select(10000);
  27. if(sum > 0){
  28. Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
  29. while (iterator.hasNext()){
  30. SelectionKey key = iterator.next();
  31. if(key.isAcceptable()){
  32. SocketChannel client = serverSocketChannel.accept();
  33. client.configureBlocking(false);
  34. client.register(selector,SelectionKey.OP_READ);
  35. System.out.println(client.getRemoteAddress()+"上线");
  36. }else if(key.isReadable()){
  37. SocketChannel client = (SocketChannel) key.channel();
  38. Boolean flag = true;
  39. //读取并广播
  40. readAndBroadcast(client,flag);
  41. System.out.println();
  42. }
  43. iterator.remove();
  44. }
  45. }else{
  46. continue;
  47. //System.out.println("无连接");
  48. }
  49. }
  50. }
  51. /**
  52. * 从指定客户端读取消息
  53. * @param client 客户顿
  54. * @param flag 标志,表示是否可以继续读
  55. */
  56. private void readAndBroadcast(SocketChannel client,Boolean flag){
  57. while (flag) {
  58. ByteBuffer buffer = ByteBuffer.allocate(1024);
  59. String msg = null;
  60. try {
  61. int read = client.read(buffer);
  62. if (read > 0) {
  63. msg = new String(buffer.array());
  64. System.out.println(client.getRemoteAddress()+"说:"+msg);
  65. //接收到消息后广播
  66. broadCast(msg,client);
  67. } else {
  68. //没有消息说明已经不用读了,但通道不用注销
  69. flag = false;
  70. }
  71. } catch (IOException e) {
  72. try {
  73. System.out.println("客户端" + client.getRemoteAddress() + "已下线");
  74. //出现异常,关闭通道
  75. flag = false;
  76. client.close();
  77. } catch (IOException ex) {
  78. ex.printStackTrace();
  79. }
  80. }
  81. }
  82. }
  83. /**
  84. * 广播消息
  85. * @param msg 消息
  86. * @param self 广播对象需要排除自己
  87. */
  88. private void broadCast(String msg,SocketChannel self) throws IOException {
  89. /**
  90. * 广播不需要selector.select()了,直接遍历所有注册到selector的key
  91. * 遍历排除serverSocketChannel与自己
  92. * 发送消息
  93. */
  94. Set<SelectionKey> keys = selector.keys();
  95. for (SelectionKey key : keys) {
  96. SelectableChannel channel = key.channel();
  97. if (channel instanceof SocketChannel && channel != self) {
  98. SocketChannel client = (SocketChannel)channel;
  99. String endMag = "["+self.getRemoteAddress()+"]->"+msg;
  100. ByteBuffer buffer = ByteBuffer.wrap(endMag.getBytes());
  101. client.write(buffer);
  102. }
  103. }
  104. }
  105. public static void main(String[] args) throws IOException {
  106. new ChatServer().listen();
  107. }
  108. }

客户端

  1. package nio.groupChat;
  2. import java.io.IOException;
  3. import java.net.InetSocketAddress;
  4. import java.nio.ByteBuffer;
  5. import java.nio.channels.SelectionKey;
  6. import java.nio.channels.Selector;
  7. import java.nio.channels.SocketChannel;
  8. import java.util.Iterator;
  9. import java.util.Scanner;
  10. public class ChatClient {
  11. private Selector selector = null;
  12. private SocketChannel client = null;
  13. private static final String RemoveHost = "localhost";
  14. private static final int port = 9000;
  15. //初始化
  16. public ChatClient(){
  17. try {
  18. selector = Selector.open();
  19. client = SocketChannel.open();
  20. client.configureBlocking(false);
  21. //初始注册为SelectionKey.OP_CONNECT
  22. client.register(selector, SelectionKey.OP_CONNECT);
  23. client.connect(new InetSocketAddress(RemoveHost,port));
  24. } catch (IOException e) {
  25. e.printStackTrace();
  26. }
  27. }
  28. /**
  29. * 确保客户端的连接成功
  30. * @throws IOException
  31. */
  32. public void connect() throws IOException {
  33. selector.select();
  34. Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
  35. while (iterator.hasNext()){
  36. SelectionKey key = iterator.next();
  37. if(key.isConnectable()){
  38. while (!client.finishConnect());
  39. client.register(selector,SelectionKey.OP_READ);
  40. }
  41. }
  42. }
  43. public void write() throws IOException {
  44. //必须等到连接建立,如果先调用了上面的connect方法就不需要了
  45. //while (!client.finishConnect());
  46. /**
  47. * 确定连接建立后,就把原来的CONNECT改注册为READ
  48. */
  49. //client.register(selector,SelectionKey.OP_READ);
  50. //发消息
  51. while (true){
  52. Scanner scanner = new Scanner(System.in);
  53. String msg = scanner.nextLine();
  54. ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
  55. try {
  56. client.write(buffer);
  57. } catch (IOException e) {
  58. e.printStackTrace();
  59. }
  60. }
  61. }
  62. public void readInfo() throws IOException {
  63. //必须等到连接建立,如果先调用了上面的connect方法就不需要了
  64. //while (!client.finishConnect());
  65. /**
  66. * 确定连接建立后,就把原来的CONNECT改注册为READ
  67. */
  68. //client.register(selector,SelectionKey.OP_READ);
  69. try {
  70. while (true){
  71. int sum = selector.select(2000);
  72. if(sum > 0){
  73. Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
  74. while (iterator.hasNext()){
  75. SelectionKey key = iterator.next();
  76. if(key.isReadable()){
  77. ByteBuffer buffer = ByteBuffer.allocate(1024);
  78. int read = client.read(buffer);
  79. //System.out.print("接收到消息:");
  80. System.out.println(new String(buffer.array()));
  81. }
  82. iterator.remove();
  83. }
  84. }else {
  85. continue;
  86. }
  87. }
  88. } catch (IOException e) {
  89. e.printStackTrace();
  90. }
  91. }
  92. }

客户端开通

  1. package nio.groupChat;
  2. import java.io.IOException;
  3. public class Client0 {
  4. public static void main(String[] args) throws IOException {
  5. ChatClient chatClient0 = new ChatClient();
  6. //如果在write()与readInfo()里面加了确保连接以及修改注册的语句,就可以不用connect()了
  7. /**
  8. * 但是我发现由于初始注册的是CONNECT,就需要在建立连接后修改注册为READ,否则无法接受消息
  9. * 如果不执行connect(),就需要在write()与readInfo()中都添加修改注册的语句
  10. * (因为不能确保当前客户端先执行write()),这样会重复执行两次,不是很好
  11. */
  12. chatClient0.connect();
  13. new Thread(new Runnable() {
  14. @Override
  15. public void run() {
  16. try {
  17. chatClient0.write();
  18. } catch (IOException e) {
  19. e.printStackTrace();
  20. }
  21. }
  22. }).start();
  23. new Thread(new Runnable() {
  24. @Override
  25. public void run() {
  26. try {
  27. chatClient0.readInfo();
  28. } catch (IOException e) {
  29. e.printStackTrace();
  30. }
  31. }
  32. }).start();
  33. }
  34. }
  1. package nio.groupChat;
  2. import java.io.IOException;
  3. public class Client1 {
  4. public static void main(String[] args) throws IOException {
  5. ChatClient chatClient1 = new ChatClient();
  6. /**
  7. * 确保连接建立
  8. */
  9. chatClient1.connect();
  10. new Thread(new Runnable() {
  11. @Override
  12. public void run() {
  13. try {
  14. chatClient1.write();
  15. } catch (IOException e) {
  16. e.printStackTrace();
  17. }
  18. }
  19. }).start();
  20. new Thread(new Runnable() {
  21. @Override
  22. public void run() {
  23. try {
  24. chatClient1.readInfo();
  25. } catch (IOException e) {
  26. e.printStackTrace();
  27. }
  28. }
  29. }).start();
  30. }
  31. }
  1. package nio.groupChat;
  2. import java.io.IOException;
  3. public class Client2 {
  4. public static void main(String[] args) throws IOException {
  5. ChatClient chatClient2 = new ChatClient();
  6. /**
  7. * 确保连接建立
  8. */
  9. chatClient2.connect();
  10. new Thread(new Runnable() {
  11. @Override
  12. public void run() {
  13. try {
  14. chatClient2.write();
  15. } catch (IOException e) {
  16. e.printStackTrace();
  17. }
  18. }
  19. }).start();
  20. new Thread(new Runnable() {
  21. @Override
  22. public void run() {
  23. try {
  24. chatClient2.readInfo();
  25. } catch (IOException e) {
  26. e.printStackTrace();
  27. }
  28. }
  29. }).start();
  30. }
  31. }

3.10.零拷贝

在java程序中,常用的零拷贝实现有mmap(内存映射)和sendFile

关于零拷贝,在另一个md里面,下面是零拷贝在nio中的使用

NewIOClient

  1. import java.io.FileInputStream;
  2. import java.io.IOException;
  3. import java.net.InetSocketAddress;
  4. import java.nio.channels.FileChannel;
  5. import java.nio.channels.SocketChannel;
  6. public class NewIOClient {
  7. public static void main(String[] args) throws IOException {
  8. SocketChannel socketChannel = SocketChannel.open();
  9. socketChannel.connect(new InetSocketAddress("localhost",4399));
  10. String filename="CodeForces.rar";
  11. //得到文件的channel
  12. FileChannel fileChannel = new FileInputStream(filename).getChannel();
  13. long startTimeMillis = System.currentTimeMillis();
  14. //在Linux下一个transto函数可以完成传输
  15. //在windows下一次调用只能发送8M的文件,需要分段传输文件,而且要注意传送时的位置
  16. //transferto底层用到零拷贝,传送到socketchannel中
  17. long transfercount = fileChannel.transferTo(0, fileChannel.size(), socketChannel);
  18. System.out.println("发送的字节数为:"+transfercount+",耗时为:" + (System.currentTimeMillis() - startTimeMillis));
  19. fileChannel.close();
  20. }
  21. }

NewIOServer

  1. import java.io.IOException;
  2. import java.net.InetSocketAddress;
  3. import java.net.ServerSocket;
  4. import java.nio.ByteBuffer;
  5. import java.nio.channels.ServerSocketChannel;
  6. import java.nio.channels.SocketChannel;
  7. public class NewIOServer {
  8. public static void main(String[] args) throws IOException {
  9. InetSocketAddress inetSocketAddress = new InetSocketAddress(4399);
  10. ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
  11. ServerSocket serverSocket = serverSocketChannel.socket();
  12. serverSocket.bind(inetSocketAddress);
  13. ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
  14. while (true){
  15. SocketChannel socketChannel = serverSocketChannel.accept();
  16. int readacount=0;
  17. while (readacount != -1){
  18. try {
  19. readacount = socketChannel.read(byteBuffer);
  20. }catch (Exception e){
  21. break;
  22. }
  23. byteBuffer.rewind(); //将buffer的position置为0,mark作废
  24. }
  25. }
  26. }
  27. }