一 netty 概述

1.1 什么是netty

  1. Netty是由JBOSS提供的一个Java开源框架,现为Github上的独立项目
  2. Netty是一个异步的、基于事件驱动的网络应用框架,用以快速开发高性能、高可靠性的网络IO程序
  3. Netty主要针对在TCP协议下,面向Clients端的高并发应用,或者Peer-to-Peer场景下的大量数据持续传输的应用。

    1.2 Netty的应用场景

    互联网行业

  4. 阿里分布式服务框架Dubbo的RPC框架使用Dubbo协议进行节点间通信,Dubbo协议进行节点间通信,Dubbo协议默认使用Netty作为基础通信组件,用于实现各进程节点之间的内部通信

游戏行业

  1. Netty 作为高性能的基础通信组件,提供了 TCP/UDP 和 HTTP 协议栈,方便定制和开发私有协议栈,账号登录服务器
  2. 地图服务器之间可以方便的通过 Netty 进行高性能的通信

大数据领域

  1. 经典的 Hadoop的高性能通信和序列化组件Avro的 RPC 框架,默认采用 Netty 进行跨界点通信
  2. 它的 Netty Service 基于 Netty 框架二次封装实现。

    二 Java IO模型介绍

    2.1 I/O模型简单说明

    I/O模型简单的理解:就是用什么样的通道进行数据的发送和接收,很大程度上决定了程序通信的性能。Java共支持3种网络编程模型IO模式:BIO、NIO、AIO。

  3. Java BIO : 同步并阻塞(传统阻塞型),服务器实现模式为:一个连接对应一个线程,即客户端有连接请求时,服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销。

image.png

  1. Java NIO : 同步非阻塞,服务器实现模式为一个线程处理多个请求(连接),即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有I/O请求就进行处理。

image.png

  1. Java AIO(NIO.2) : 异步非阻塞,AIO引入异步通道的概念,采用了Proactor模式,简化了程序编写,有效的请求才启动线程,它的特点是先由操作系统完成后才通知服务端程序启动线程去处理,一般适用于连接数较多且连接时间较长的应用。

    2.2 I/O模型使用场景

  2. BIO方式适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,JDK1.4以前的唯一选择。

  3. NIO方式适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,弹幕系统,服务器间通讯等。编程比较复杂,JDK1.4开始支持。
  4. AIO方式使用于连接数目多且连接比较长(重操作)的架构,比如相册服务器,充分调用OS参与并发操作,编程比较复杂,JDK7开始支持。

    2.3 BIO 原理

    同步并阻塞(传统阻塞型),服务器实现模式为:一个连接对应一个线程,即客户端有连接请求时,服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销。可以通过线程池机制改善(实现多个客户端连接服务器),但是不能解决性能问题。

    2.3.1 BIO 工作机制

    image.png

  5. 服务器端启动一个ServerSocket

  6. 客户端启动Socket对服务器进行通信,默认情况下服务器端需要对每个客户建立一个线程与之通讯
  7. 客户端发出请求后, 先咨询服务器是否有线程响应,如果没有则会等待,或者被拒绝
  8. 如果有响应,客户端线程会等待请求结束后,在继续执行

    2.3.2 BIO demo

  9. 使用BIO模型编写一个服务器端,监听6666端口,当有客户端连接时,就启动一个线程与之通讯。

  10. 要求使用线程池机制改善,可以连接多个客户端.
  11. 服务器端可以接收客户端发送的数据(通过cmd的telnet方式即可)。 ```java public class BIOServer { public static void main(String[] args) throws IOException {

    1. //1、创建一个线程池
    2. //2、如果有客户端连接,就创建一个线程,与之通讯(单独写一个方法)
    3. ExecutorService executorService = Executors.newCachedThreadPool();
    4. //创建ServerSocket
    5. ServerSocket serverSocket = new ServerSocket(6666);
    6. System.out.println("服务器启动了");
    7. while (true) {
    8. System.out.println("线程信息:id= "+ Thread.currentThread().getId() + "; 线程名字:" + Thread.currentThread().getName());
    9. //监听,等待客户端连接
    10. System.out.println("等待连接");
    11. final Socket socket = serverSocket.accept();
    12. System.out.println("连接到一个客户端");
    13. //创建一个线程,与之通讯
    14. executorService.execute(() -> {
    15. //重写Runnable方法,与客户端进行通讯
    16. handler(socket);
    17. });
    18. }

    }

    //编写一个Handler方法,和客户端通讯 public static void handler(Socket socket) {

    1. try {
    2. System.out.println("线程信息:id= "+ Thread.currentThread().getId() + "; 线程名字:" + Thread.currentThread().getName());
    3. byte[] bytes = new byte[1024];
    4. //通过socket获取输入流
    5. InputStream inputStream = socket.getInputStream();
    6. //循环的读取客户端发送的数据
    7. while (true){
    8. System.out.println("线程信息:id= "+ Thread.currentThread().getId() + "; 线程名字:" + Thread.currentThread().getName());
    9. System.out.println("read....");
    10. int read = inputStream.read(bytes);
    11. if (read != -1){
    12. System.out.println(new String(bytes, 0, read));//输出客户端发送的数据
    13. } else {
    14. break;
    15. }
    16. }
    17. } catch (IOException e) {
    18. e.printStackTrace();
    19. } finally {
    20. System.out.println("关闭和client的连接");
    21. try {
    22. socket.close();
    23. } catch (IOException e) {
    24. e.printStackTrace();
    25. }
    26. }

    } }

  1. **问题分析:**
  2. 1. 服务端主线程会阻塞在 serverSocket.accept() 这个方法处,当有新的客户端发起请求时,主线程通过线程池调用新线程与其通信,每个通信线程会阻塞在socket.getInputStream() 这个方法处。
  3. 1. 每个请求都需要创建独立的线程,与对应的客户端进行数据 Read,业务处理,数据 Write
  4. 1. 当并发数较大时,需要创建大量线程来处理连接,系统资源占用较大。
  5. 1. 连接建立后,如果当前线程暂时没有数据可读,则线程就阻塞在 Read 操作上,造成线程资源浪费
  6. <a name="dB6qB"></a>
  7. ## 2.4 NIO 原理
  8. 1. Java NIO 全称 java non-blocking IO,是指 JDK 提供的新 API。从 JDK1.4 开始,Java 提供了一系列改进的输入/输出的新特性,被统称为 NIO(即 New IO),是同步非阻塞的
  9. 1. NIO 有三大核心部分:Channel(通道),Buffer(缓冲区), Selector(选择器)
  10. 1. NIO 面向缓冲区 ,或者面向块编程的。数据读取后放到缓冲区中,需要时可在缓冲区中前后移动,这就增加了处理过程中的灵活性,使用它可以提供非阻塞式的高伸缩性网络
  11. 1. Java NIO的非阻塞模式,使一个线程从某通道发送请求或者读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取,而不是保持线程阻塞,所以直到数据变的可以读取之前,该线程可以继续做其他的事情。 非阻塞写也是如此,一个线程请求写入一些数据到某个通道,但不需要等待它完全写入,这个线程同时可以去做别的事情。
  12. 1. 通俗理解:NIO是可以做到用一个线程来处理多个操作的。假设有10000个请求过来,根据实际情况,可以分配50或者100个线程来处理。不像之前的阻塞IO那样,非得分配10000个。
  13. PS: HTTP2.0使用了多路复用的技术,做到同一个连接并发处理多个请求,而且并发请求的数量比HTTP1.1大了好几个数量级。
  14. <a name="cQn8U"></a>
  15. ### 2.4.1 NIO 工作机制
  16. ![image.png](https://cdn.nlark.com/yuque/0/2020/png/1502688/1600915148822-966978bd-a87e-4130-963f-3a31dec04f28.png#height=454&id=HuOJM&name=image.png&originHeight=454&originWidth=362&originalType=binary&ratio=1&size=110074&status=done&style=none&width=362)
  17. 1. 每个channel 都对应一个Buffer
  18. 1. Selector 对应一个线程,一个线程对应多个channel(连接)
  19. 1. 假设三个channel 注册到selector
  20. 1. Event(事件)决定程序切换到哪个channel
  21. 1. Selector根据不同的事件,在各个通道上切换
  22. 1. Buffer是一块内存,底层是一个数组
  23. 1. 数据的读取写入是通过Buffer, 这个和BIO 一样, BIO中要么是输入流,或者是输出流, 不能双向,但是NIOBuffer 是可以读也可以写, 需要flip方法切换。
  24. <a name="LT38A"></a>
  25. ### 2.4.2 Buffer 介绍
  26. 缓冲区本质上是一个可读可写数据的内存块,可以理解成是一个容器对象(含数组),该对象提供了一组方法,可以更轻松地使用内存块,缓冲区对象内置了一些机制,能够跟踪和记录缓冲区的状态变化情况。Channel提供从文件、网络读取数据的渠道,但是读取或写入的数据都必须经由Buffer。<br />![image.png](https://cdn.nlark.com/yuque/0/2020/png/1502688/1600917191630-ede60874-5a34-42a0-9b41-80258b30c563.png#height=114&id=kJ5fH&margin=%5Bobject%20Object%5D&name=image.png&originHeight=272&originWidth=1264&originalType=binary&ratio=1&size=54122&status=done&style=none&width=529)<br />**主要种类**
  27. 1. ByteBuffer,存储字节数据到缓冲区
  28. 1. ShortBuffer,存储字符串数据到缓冲区
  29. 1. CharBuffer,存储字符数据到缓冲区
  30. 1. IntBuffer,存储整数数据到缓冲区
  31. 1. LongBuffer,存储长整型数据到缓冲区
  32. 1. DoubleBuffer,存储小数到缓冲区
  33. 1. FloatBuffer,存储小数到缓冲区
  34. **主要属性**
  35. | **属性** | **描述** |
  36. | --- | --- |
  37. | Capacity | 容量,即可以容纳的最大数据量;在缓冲区创建时被设定并且不能改变 |
  38. | Limit | 表示缓冲区的当前终点,不能对缓冲区超过极限的位置进行读写操作。且极限是可以修改的 |
  39. | Position | 位置,下一个要被读或写的元素的索引,每次读写缓冲区数据时都会改变改值,为下次读写作准备 |
  40. | Mark | 标记 ,一般不会主动修改,在`flip()`<br />被调用后,mark就作废了。 |
  41. **类关系图**<br />![WX20200924-123555@2x.png](https://cdn.nlark.com/yuque/0/2020/png/1502688/1600922201481-86213458-141a-4ce9-8cd7-3f7fde3975f3.png#height=466&id=gNfFV&margin=%5Bobject%20Object%5D&name=WX20200924-123555%402x.png&originHeight=466&originWidth=1648&originalType=binary&ratio=1&size=109600&status=done&style=none&width=1648)<br />MappedByteBuffer 可以直接操作系统内存<br />**主要方法**
  42. 1. public final int capacity();
  43. 1. 直接返回了此缓冲区的容量,capacity
  44. 2. public final int position();
  45. 1. 直接返回了此缓冲区指针的当前位置
  46. 3. public final Buffer position(int newPosition);
  47. 1. 设置此缓冲区的位置,设置position
  48. 4. public final int limit();
  49. 1. 返回此缓冲区的限制
  50. 5. public final Buffer limit(int newLimit);
  51. 1. 设置此缓冲区的限制,设置limit
  52. 6. public final Buffer clear();
  53. 1. 清除此缓冲区,即将各个标记恢复到初识状态, position = 0;limit = capacity; mark = -1,但是并没有删除数据。
  54. 7. public final Buffer flip();
  55. 1. 反转此缓冲区, limit = position;position = 0;mark = -1
  56. 1. 当指定数据存放在缓冲区中后,position所指向的即为此缓冲区数据最后的位置。只有当数据大小和此缓冲区大小相同时,position才和limit的指向相同。
  57. 1. flip()方法将limit置向position position0,那么从position读取数据到limit即为此缓冲区中所有的数据。
  58. 8. public final boolean hasRemaining();
  59. 1. 告知当前位置和限制之间是否有元素。return position < limit;
  60. 9. public abstract boolean isReadOnly();
  61. 1. 此方法为抽象方法,告知此缓冲区是否为只读缓冲区,具体实现在各个实现类中。
  62. 10. public abstract boolean hasArray();
  63. 1. 告知此缓冲区是否具有可访问的底层实现数组
  64. 11. public abstract Object array();
  65. 1. 返回此缓冲区的底层实现数组
  66. **ByteBuffer 方法**
  67. 1. public static ByteBuffer allocateDirect(int capacity);
  68. 1. 创建直接缓冲区
  69. 2. public static ByteBuffer allocate(int capacity) ;
  70. 1. 设置缓冲区的初识容量
  71. 3. public abstract byte get();
  72. 1. 从当前位置positionget数据,获取之后,position会自动加1
  73. 4. public abstract byte get(int index);
  74. 1. 通过绝对位置获取数据。
  75. 5. public abstract ByteBuffer put(byte b);
  76. 1. 从当前位置上添加,put之后,position会自动加1
  77. 6. public abstract ByteBuffer put(int index, byte b);
  78. 1. 从绝对位置上添加数据
  79. 7. public abstract ByteBuffer putXxx(Xxx value [, int index]);
  80. 1. position当前位置插入元素。Xxx表示基本数据类型
  81. 1. 此方法时类型化的 put getput放入的是什么数据类型,get就应该使用相应的数据类型来取出,否则可能有 BufferUnderflowException 异常。
  82. <a name="dWYMv"></a>
  83. ### 2.4.3 Buffer demo
  84. ```java
  85. //创建一个Buffer,大小为5,即可以存放5个int
  86. IntBuffer intBuffer = IntBuffer.allocate(5);
  87. //向buffer中存放数据
  88. for (int i = 0; i < intBuffer.capacity(); i++) {
  89. intBuffer.put(i * 2);
  90. }
  91. //如何从buffer中读取数据
  92. //将buffer转换,读写切换
  93. intBuffer.flip();
  94. while (intBuffer.hasRemaining()) {
  95. System.out.println(intBuffer.get());
  96. }

Buffer 执行原理

  1. Buffer 刚创建时,capacity = 5 ,固定不变。limit指针指向5,position指向0,mark指向-1

image.png

  1. 之后调用 intBuffer.put方法,向buffer中添加数据,会不断移动position指针,最后position变量会和limit指向相同,即limit=position=5

image.png

  1. 调用 buffer.flip()实际上是重置了position和limit两个变量,将limit放在position的位置,即 position=0、limit=5。

image.png

  1. 调用 intBuffer.get方法,实际上是不断移动position指针,直到它移动到limit的位置

    2.4.4 通道(Channel)

  2. 通道可以同时进行读写,而流只能读或者只能写

  3. 通道可以实现异步读写数据
  4. 通道可以从缓存读数据,也可以写数据到缓存

image.png

  1. BIO中的stream是单向的,例如:FileInputStream对象只能进行读取数据的操作,而NIO中的Channel是双向的,可读可写操作。
  2. Channel在 NIO 中是一个接口:public interface Channel extends Closeable{}
  3. 常用的Channel类有:FileChannel、DatagramChannel、ServerSocketChannel(类似ServerSocket)、SocketChannel(类似Socket)
  4. FileChannel 用于文件数据的读写,DatagramChannel用于UDP数据的读写,ServerSocketChannel和SocketChannel用于TCP数据读写。

类图关系
image.png
FileChannel类 常见方法

  1. public int read(ByteBuffer dst)
    1. 从通道读取数据并放到缓冲区中
    2. 此操作也会移动 Buffer 中的position指针,不断往position中放数据,read完成后position指向limit。
  2. public int write(ByteBuffer src)
    1. 把缓冲区的数据写到通道中
    2. 此操作也会不断移动Buffer中的position位置直到limit,读取到的数据就是position到limit这两个指针之间的数据。
  3. public long transferFrom(ReadableByteChannel src, long position, long count)
    1. 从目标通道中复制数据到当前通道
  4. public long transferTo(long position, long count, WritableByteChannel target)

    1. 把数据从当前通道复制给目标通道
    2. 该方法拷贝数据使用了零拷贝,通常用来在网络IO传输中,将FileChannel里面的文件数据直接拷贝到与客户端或者服务端连接的Channel里面从而达到文件传输。

      2.4.5 通道(Channel)demo

  5. 将数据写入到本地文件

    1. public static void main(String[] args) throws IOException {
    2. String str = "hello,world";
    3. //创建一个输出流 -> Channel
    4. FileOutputStream fileOutputStream = new FileOutputStream("/Users/hezhaoming/Downloads/file01.txt");
    5. //通过 FileOutputStream 获取对应的 FileChannel
    6. //这个 FileChannel 真实类型是 FileChannelImpl
    7. FileChannel fileChannel = fileOutputStream.getChannel();
    8. //创建一个缓冲区 ByteBuffer
    9. ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
    10. //将str放入ByteBuffer
    11. byteBuffer.put(str.getBytes());
    12. //对ByteBuffer进行反转,开始读取
    13. byteBuffer.flip();
    14. //将ByteBuffer数据写入到FileChannel
    15. //此操作会不断移动 Buffer中的 position到 limit 的位置
    16. fileChannel.write(byteBuffer);
    17. fileOutputStream.close();
    18. }

    将数据写入到本地文件.png
    有代码得知我们java原生的流里面内嵌了通道。

  6. 从本地文件读取数据

    1. public static void main(String[] args) throws IOException {
    2. //创建文件的输入流
    3. File file = new File("/Users/hezhaoming/Downloads/file01.txt");
    4. FileInputStream fileInputStream = new FileInputStream(file);
    5. //通过fileInputStream 获取对应的FileChannel -> 实际类型 FileChannelImpl
    6. FileChannel fileChannel = fileInputStream.getChannel();
    7. //创建缓冲区
    8. ByteBuffer byteBuffer = ByteBuffer.allocate((int) file.length());
    9. //将通道的数据读入到buffer
    10. fileChannel.read(byteBuffer);
    11. //将ByteBuffer 的字节数据转成String
    12. System.out.println(new String(byteBuffer.array()));
    13. fileInputStream.close();
    14. }

    本地文件打印输出.png
    3. 文件01输出到文件02

    1. public static void main(String[] args) throws IOException {
    2. //读取
    3. FileInputStream fileInputStream = new FileInputStream("/Users/hezhaoming/Downloads/file01.txt");
    4. FileChannel fileChannel1 = fileInputStream.getChannel();
    5. //输出
    6. FileOutputStream fileOutputStream = new FileOutputStream("/Users/hezhaoming/Downloads/file02.txt");
    7. FileChannel fileChannel2 = fileOutputStream.getChannel();
    8. //缓冲块
    9. ByteBuffer byteBuffer = ByteBuffer.allocate(512);
    10. while (true) {
    11. //清空buffer,由于循环的最后执行了 write 操作,会将 position 移动到 limit 的位置
    12. //清空 Buffer的操作才为上一次的循环重置position的位置
    13. // 如果没有重置position,那么上次读取后,position和limit位置一样,读取后read的值永远为0
    14. byteBuffer.clear();
    15. //将数据存入 ByteBuffer,它会基于 Buffer 此刻的 position 和 limit 的值,
    16. // 将数据放入position的位置,然后不断移动position直到其与limit相等;
    17. int read = fileChannel1.read(byteBuffer);
    18. if (read == -1) { //表示读完
    19. break;
    20. }
    21. //将buffer中的数据写入到 FileChannel02 ---- file02.txt
    22. byteBuffer.flip();
    23. fileChannel2.write(byteBuffer);
    24. }
    25. //关闭相关的流
    26. fileInputStream.close();
    27. fileOutputStream.close();
    28. }

    将一个文件写入到另一个文件.png

    拷贝图片

    1. public static void main(String[] args) throws IOException {
    2. //创建相关流
    3. FileInputStream fileInputStream = new FileInputStream("/Users/hezhaoming/Downloads/tupian.jpeg");
    4. FileOutputStream fileOutputStream = new FileOutputStream("/Users/hezhaoming/Documents/tupian.jpeg");
    5. //获取各个流对应的FileChannel
    6. FileChannel source = fileInputStream.getChannel();
    7. FileChannel dest = fileOutputStream.getChannel();
    8. //使用 transferForm 完成拷贝
    9. dest.transferFrom(source, 0, source.size());
    10. //关闭相关的通道和流
    11. source.close();
    12. dest.close();
    13. fileInputStream.close();
    14. fileOutputStream.close();
    15. }

    2.4.6 通道(ServerSocketChannel)demo

    ServerSocketChannel:主要用于在服务器监听新的客户端Socket连接

  7. public static ServerSocketChannel open()

    1. 得到一个 ServerSocketChannel 通道
  8. public final ServerSocketChannel bind(SocketAddress local)
    1. 设置服务器监听端口
  9. public final SelectableChannel configureBlocking(boolean block)
    1. 用于设置阻塞或非阻塞模式,取值 false 表示采用非阻塞模式
    2. 此方法位于 ServerSocketChannel 和 SocketChannel的共同父类AbstractSelectableChannel类中
  10. public abstract SocketChannel accept()
    1. 接受一个连接,返回代表这个连接的通道对象
  11. public final SelectionKey register(Selector sel, int ops)
    1. 将Channel注册到选择器并设置监听事件,也可以在绑定的同时注册多个事件,如下所示:
    2. channel.register(selector,Selectionkey.OP_READ | Selectionkey.OP_CONNECT)

SocketChannel:网络IO通道,具体负责进行读写操作。NIO把缓冲区的数据写入通道,或者把通道里的数据读到缓冲区

  1. public static SocketChannel open()
    1. 得到一个SocketChannel通道
  2. public final SelectableChannel configureBlocking(boolean block)
    1. 设置阻塞或非阻塞模式,取值 false表示采用非阻塞模式
    2. 此方法位于 ServerSocketChannel 和 SocketChannel的共同父类AbstractSelectableChannel类中
  3. public abstract boolean connect(SocketAddress remote)
    1. 连接服务器
  4. public boolean finishConnect()
    1. 如果上面的方法连接失败,接下来就要通过该方法完成连接操作
  5. public int write(ByteBuffer src)
    1. 往通道里写数据
    2. 这里写入的是buffer里面position到limit这个之间的数据
  6. public int read(ByteBuffer dst)
    1. 从通道里读数据
  7. public final SelectionKey register(Selector sel, int ops, Object att)
    1. 注册Channel到选择器并设置监听事件,最后一个参数可以设置共享数据
  8. public final void close()

    1. 关闭通道

      1. /**
      2. * Scattering:将数据写入到buffer时,可以采用buffer数组,初次写入 【分散】
      3. * Gathering:从buffer读取数据时,也可以采用buffer数组,依次读
      4. */
      5. public static void main(String[] args) throws IOException {
      6. //使用 ServerSocketChannel 和 SocketChannel
      7. ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
      8. InetSocketAddress inetSocketAddress = new InetSocketAddress(7000);
      9. //绑定端口到socket,并启动
      10. serverSocketChannel.socket().bind(inetSocketAddress);
      11. //创建一个Buffer数组
      12. ByteBuffer[] byteBuffers = new ByteBuffer[2];
      13. byteBuffers[0] = ByteBuffer.allocate(5);
      14. byteBuffers[1] = ByteBuffer.allocate(3);
      15. //等待客户端的连接(Telnet)
      16. SocketChannel socketChannel = serverSocketChannel.accept();
      17. int msgLength = 8; //假定从客户端接受8个字节
      18. //循环的读取
      19. while (true) {
      20. int byteRead = 0;
      21. while (byteRead < msgLength) {
      22. long l = socketChannel.read(byteBuffers);
      23. byteRead += l; //累计读取的字节数
      24. System.out.println("byteRead= " + byteRead);
      25. //使用流打印,看看当前这个buffer的position和limit
      26. Arrays.stream(byteBuffers)
      27. .map(buffer -> "position=" + buffer.position() + ", limit = " + buffer.limit())
      28. .forEach(System.out::println);
      29. }
      30. //读书数据后需要将所有的buffer进行flip
      31. Arrays.asList(byteBuffers).forEach(Buffer::flip);
      32. //将数据读出显示到客户端
      33. long byteWrite = 0;
      34. while (byteWrite < msgLength) {
      35. long l = socketChannel.write(byteBuffers);
      36. byteWrite += l;
      37. }
      38. //将所有的 buffer 进行clear操作
      39. Arrays.asList(byteBuffers).forEach(Buffer::clear);
      40. System.out.println("byteRead=" + byteRead + ", byteWrite=" + byteWrite
      41. + ", msgLength=" + msgLength);
      42. }
      43. }

      2.4.7 Selector(选择器)

  9. NIO是非阻塞的IO方式。可以用一个线程处理多个客户端连接,就会使用到Selector(选择器)

  10. Selector检测多个“已注册的chanel”的事件发生(多个Channel以事件的方式可以注册到同一个Selector),如果有事件发生,便获取事件然后针对每个事件进行相应的处理。这样就可以只用一个单线程去管理多个通道,也就是管理多个连接和请求。
  11. 只有在真正有读写事件发生时,才会进行读写,这就大大地减少了系统开销,并且不必为每个连接都创建一个线程,不用去维护多个线程
  12. 避免了多线程之间的上下文切换导致的开销

image.png

  1. Netty的IO线程NioEventLoop 聚合了Selector(选择器,也叫多路复用器),可以同时并发处理成百上千个客户端连接。
  2. 当线程从某客户端 Socket 通道进行读写数据时,若没有数据可用时,该线程可以进行其他任务。
  3. 线程通常将非阻塞 IO 的空闲时间用于在其他通道上执行IO 操作,所以单独的线程可以管理多个输入和输出通道。
  4. 由于读写操作都是非阻塞的,这就可以充分提升IO线程的运行效率,避免由于频繁 I/O 阻塞导致的线程挂起。
  5. 一个 I/O 线程可以并发处理N个客户端连接和读写操作,这从根本上解决了传统同步阻塞I/O 一连接一线程模型,架构的性能、弹性伸缩能力和可靠性都得到了极大的提升。

    2.4.8 SelectionKey

    Selector通过管理SelectionKey的集合从而去监听各个Channel。当Channel注册到Selector上面时,会携带该Channel关注的事件(SelectionKey包含Channel以及与之对应的事件),并会返回一个SelectionKey的对象,Selector将该对象加入到它统一管理的集合中去,从而对Channel进行管理。SelectionKey表示的是Selector和网络通道的注册关系,所以FileChannel是没有办法通过SelectionKey注册到Selector上去的。

image.png

2.4.9 (ServerSocketChannel) Demo

  1. 当客户端连接时,会通过ServerSocketChannel 得到 SocketChannel
  2. Selector 进行监听 select 方法, 返回有事件发生的通道的个数.
  3. 将socketChannel注册到Selector上, register(Selector sel, int ops), 一个selector上可以注册多个SocketChannel
  4. 注册后返回一个 SelectionKey, 会和该Selector 关联(集合)
  5. 进一步得到各个 SelectionKey (有事件发生)
  6. 在通过 SelectionKey 反向获取 SocketChannel , 方法 channel()
  7. 判断该Channel的事件类型,对不同事件进行不同的业务处理 ```java public static void Server() throws IOException { //创建ServerSocketChannel -> ServerSocket ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); //得到一个Selector对象 Selector selector = Selector.open(); //绑定一个端口6666 serverSocketChannel.socket().bind(new InetSocketAddress(6666)); //设置非阻塞 serverSocketChannel.configureBlocking(false); //把 serverSocketChannel 注册到 selector ,关心事件为:OP_ACCEPT,有新的客户端连接 SelectionKey register = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

    //循环等待客户端连接 while (true) {

    1. //等待1秒,如果没有事件发生,就返回
    2. if (selector.select(1000) == 0) {
    3. System.out.println("服务器等待了1秒,无连接");
    4. continue;
    5. }
    6. //如果返回的 > 0,表示已经获取到关注的事件
    7. // 就获取到相关的 selectionKey 集合,反向获取通道
    8. Set<SelectionKey> selectionKeys = selector.selectedKeys();
    9. //遍历 Set<SelectionKey>,使用迭代器遍历
    10. Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
    11. while (keyIterator.hasNext()) {
    12. //获取到SelectionKey
    13. SelectionKey key = keyIterator.next();
    14. //根据 key 对应的通道发生的事件,做相应的处理
    15. if (key.isAcceptable()) {//如果是 OP_ACCEPT,有新的客户端连接
    16. //该客户端生成一个 SocketChannel
    17. SocketChannel socketChannel = serverSocketChannel.accept();
    18. System.out.println("客户端连接成功,生成了一个SocketChannel:" + socketChannel.hashCode());
    19. //将SocketChannel设置为非阻塞
    20. socketChannel.configureBlocking(false);
    21. //将socketChannel注册到selector,关注事件为 OP_READ,同时给SocketChannel关联一个Buffer
    22. socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
    23. }
    24. if (key.isReadable()) {
    25. //通过key,反向获取到对应的Channel
    26. SocketChannel channel = (SocketChannel) key.channel();
    27. //获取到该channel关联的Buffer
    28. ByteBuffer buffer = (ByteBuffer) key.attachment();
    29. channel.read(buffer);
    30. System.out.println("from 客户端:" + new String(buffer.array()));
    31. }
    32. //手动从集合中移除当前的 selectionKey,防止重复操作
    33. keyIterator.remove();
    34. }

    } }

public static void Client() throws IOException { //得到一个网络通道 SocketChannel socketChannel = SocketChannel.open(); //设置非阻塞 socketChannel.configureBlocking(false); //提供服务器端的IP和端口 InetSocketAddress socketAddress = new InetSocketAddress(“127.0.0.1”, 6666); //连接服务器 if (!socketChannel.connect(socketAddress)) { //如果不成功 while (!socketChannel.finishConnect()) { System.out.println(“因为连接需要时间,客户端不会阻塞,可以做其他工作。。。”); } } //如果连接成功,就发送数据 String str = “hello, world”; ByteBuffer byteBuffer = ByteBuffer.wrap(str.getBytes()); //发送数据,实际上就是将buffer数据写入到channel socketChannel.write(byteBuffer); System.in.read(); }

  1. <a name="Cx8qN"></a>
  2. ## 2.5 NIO 群聊系统demo
  3. <a name="mqnCO"></a>
  4. ### 2.5.1 服务端
  5. 老套路:
  6. 1. 初始化服务器
  7. 1. 新建选择器 **selector**
  8. 1. **新建服务端通道 **ServerSocketChannel,并设置非阻塞
  9. 1. 通道注册到selector
  10. 2. 监听客户端事件
  11. 1. 轮询一直去获取selector中发生的事件
  12. 1. 如果发生了事件,那么遍历SelectionKey ,获取各个通道的事件
  13. 1. 判断事件是连接事件,则处理连接,将客户端通道注册到选择器中,并设置非阻塞
  14. 1. 判断事件是读事件,则处理读数据,首先获取通道,建立Buffer块,将数据读取到块中,如果读到数据,将数据转发给其他所有的通道
  15. 1. 遍历**selector**.keys() 获取所有的通道,然后遍历发送,注意,排除自身的通道。
  16. ```java
  17. /********************服务端********************/
  18. public class GroupChatServer {
  19. //定义属性
  20. private Selector selector;
  21. private ServerSocketChannel listenChannel;
  22. private static final int PORT = 6666;
  23. //构造器
  24. //初始化工作
  25. public GroupChatServer() {
  26. try {
  27. //得到选择器
  28. selector = Selector.open();
  29. listenChannel = ServerSocketChannel.open();
  30. //绑定端口
  31. listenChannel.socket().bind(new InetSocketAddress(PORT));
  32. //设置非阻塞模式
  33. listenChannel.configureBlocking(false);
  34. //将listenChannel注册到selector,绑定监听事件
  35. listenChannel.register(selector, SelectionKey.OP_ACCEPT);
  36. } catch (IOException e) {
  37. e.printStackTrace();
  38. }
  39. }
  40. //监听
  41. public void listen() {
  42. try {
  43. //循环处理
  44. while (true) {
  45. int count = selector.select();
  46. if (count > 0) { //有事件处理
  47. //遍历得到SelectionKey集合
  48. Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
  49. while (iterator.hasNext()) {
  50. //取出SelectionKey
  51. SelectionKey key = iterator.next();
  52. //监听到accept,连接事件
  53. if (key.isAcceptable()) {
  54. SocketChannel socketChannel = listenChannel.accept();
  55. //将该channel设置非阻塞并注册到selector
  56. socketChannel.configureBlocking(false);
  57. socketChannel.register(selector, SelectionKey.OP_READ);
  58. //提示
  59. System.out.println(socketChannel.getRemoteAddress() + " 上线...");
  60. }
  61. if (key.isReadable()) { //通道可以读取数据,即server端收到客户端的消息,
  62. //处理读(专门写方法)
  63. readData(key);
  64. }
  65. iterator.remove();
  66. }
  67. } else {
  68. System.out.println("等待。。。");
  69. }
  70. }
  71. } catch (Exception e) {
  72. e.printStackTrace();
  73. }
  74. }
  75. //读取客户端消息
  76. private void readData(SelectionKey key) {
  77. //定义一个SocketChannel
  78. SocketChannel channel = null;
  79. try {
  80. //取到关联的channel
  81. channel = (SocketChannel) key.channel();
  82. //创建缓冲buffer
  83. ByteBuffer buffer = ByteBuffer.allocate(1024);
  84. int count = channel.read(buffer);
  85. //根据count值判断是否读取到数据
  86. if (count > 0) {
  87. //把缓冲区的数据转成字符串
  88. String msg = new String(buffer.array());
  89. //输出该消息
  90. System.out.println("from 客户端:" + msg);
  91. //向其他的客户端转发消息(去掉自己),专门写一个方法处理
  92. sendInfoToOtherClients(msg, channel);
  93. }
  94. } catch (IOException e) {
  95. try {
  96. System.out.println(channel.getRemoteAddress() + "离线了...");
  97. //取消注册
  98. key.cancel();
  99. //关闭通道
  100. channel.close();
  101. } catch (IOException ex) {
  102. ex.printStackTrace();
  103. }
  104. }
  105. }
  106. //转发消息给其他客户端(通道)
  107. private void sendInfoToOtherClients(String msg, SocketChannel self) throws IOException {
  108. System.out.println("服务器转发消息中。。。");
  109. //遍历 所有注册到selector上的SocketChannel,并排除self
  110. for (SelectionKey key : selector.keys()) {
  111. //通过key取出对应的SocketChannel
  112. Channel targetChannel = key.channel();
  113. //排除自己
  114. if (targetChannel instanceof SocketChannel && targetChannel != self) {
  115. //转型
  116. SocketChannel dest = (SocketChannel) targetChannel;
  117. //将msg,存储到buffer
  118. ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
  119. //将buffer的数据写入通道
  120. dest.write(buffer);
  121. }
  122. }
  123. }
  124. public static void main(String[] args) {
  125. //创建服务器对象
  126. GroupChatServer groupChatServer = new GroupChatServer();
  127. groupChatServer.listen();
  128. }
  129. }

2.5.2 客户端

老套路:

  1. 注册服务器
    1. 新增选择器
    2. 创建连接服务器,并设置非阻塞
    3. 通道注册到选择器上
  2. 发送消息
    1. 通道直接写Buffer数据即可
  3. 死循环读信息

    1. 判断是否可用的通道
    2. 遍历selectedKeys 获取各个通道(客户端也是多通道)的事件
    3. 判断事件,如果是读事件,那么获取数据,并打印 ```java /*客户端**/ public class GroupChatClient { //定义相关的属性 private static final String HOST = “127.0.0.1”; //服务器的IP地址 private static final int PORT = 6666; //服务器端口 private Selector selector; private SocketChannel socketChannel; private String username;

      //构造器,初始化操作 public GroupChatClient() throws IOException { selector = Selector.open(); //连接服务器 socketChannel = SocketChannel.open(new InetSocketAddress(HOST, PORT)); //设置非阻塞 socketChannel.configureBlocking(false); //将channel注册到selector socketChannel.register(selector, SelectionKey.OP_READ); //得到username username = socketChannel.getLocalAddress().toString().substring(1); System.out.println(username + “ is ok…”); }

      //向服务器发送消息 public void sendInfo(String info) { info = username + “ 说:” + info; try {

      1. socketChannel.write(ByteBuffer.wrap(info.getBytes()));

      } catch (IOException e) {

      1. e.printStackTrace();

      } }

      //读取从服务器端回复的消息 public void readInfo() { try {

      1. int readChannels = selector.select();
      2. if (readChannels > 0) {//有可用的通道
      3. Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
      4. while (iterator.hasNext()) {
      5. SelectionKey key = iterator.next();
      6. if (key.isReadable()) {
      7. //得到相关的通道
      8. SocketChannel sc = (SocketChannel) key.channel();
      9. //得到一个buffer
      10. ByteBuffer buf = ByteBuffer.allocate(1024);
      11. //读取
      12. sc.read(buf);
      13. //把缓冲区的数据转成字符串
      14. String msg = new String(buf.array());
      15. System.out.println(msg.trim());
      16. }
      17. }
      18. } else {
      19. System.out.println("没有可以用的通道...");
      20. }

      } catch (Exception e) { } }

      public static void main(String[] args) throws IOException { //启动客户端 GroupChatClient chatClient = new GroupChatClient(); //启动一个线程用于读取服务器的消息 new Thread(() -> {

      1. while (true) {
      2. chatClient.readInfo();
      3. try {
      4. Thread.sleep(3000);
      5. } catch (InterruptedException e) {
      6. e.printStackTrace();
      7. }
      8. }

      }).start(); //主线程用于发送数据给服务器端 Scanner scanner = new Scanner(System.in); while (scanner.hasNextLine()) {

      1. String s = scanner.nextLine();
      2. chatClient.sendInfo(s);

      } } }

  1. <a name="tEmDD"></a>
  2. ### 2.5.3 总结
  3. 使用int read = channel.read(buffer)读取数据时,读取的结果情况:
  4. 1. 当read=-1时,说明客户端的数据发送完毕,并且主动的关闭socket。所以这种情况下,服务器程序需要关闭socketSocket,并且取消key的注册。注意:这个时候继续使用SocketChannel进行读操作的话,就会抛出:==远程主机强迫关闭一个现有的连接==的IO异常
  5. 1. 当read=0时:
  6. 1. 某一时刻SocketChannel中当前没有数据可读。
  7. 1. 客户端的数据发送完毕。
  8. <a name="5rKQN"></a>
  9. ## 2.6 NIO的零拷贝
  10. 零拷贝是网络编程的关键,很多性能优化都离不开它。零拷贝是指:从操作系统的角度来看,文件的传输不存在CPU的拷贝,只存在DMA拷贝。在Java程序中,常用的零拷贝有 mmap(内存映射)和 sendFile。零拷贝不仅仅带来更少的数据复制,还能减少线程的上下文切换,减少CPU缓存伪共享以及无CPU校验和计算。
  11. <a name="SvXSj"></a>
  12. ### 2.6.1 传统IO拷贝
  13. ```java
  14. File file = new File("test.txt");
  15. RandomAccessFile raf = new RandomAccessFile(file, "rw");
  16. byte[] arr = new byte[(int) file.length()];
  17. raf.read(arr);
  18. Socket socket = new ServerSocket(8080).accept();
  19. socket.getOutputStream().write(arr);

image.png

两次CPU拷贝
DMA: direct memory access 直接内存拷贝(不使用CPU)

2.6.2 mmap优化的IO

直接内存拷贝,需要一次CPU拷贝
image.png

2.6.3 sendFile 优化的IO

需要一次CPU拷贝
image.png

2.6.4 linux2.4 零拷贝

需要0次cpu拷贝
image.png

2.6.5 零拷贝demo

transferTo

  1. public static void main(String[] args) throws Exception {
  2. SocketChannel socketChannel = SocketChannel.open();
  3. socketChannel.connect(new InetSocketAddress("localhost", 7001));
  4. String filename = "protoc-3.6.1-win32.zip";
  5. //得到一个文件channel
  6. FileChannel fileChannel = new FileInputStream(filename).getChannel();
  7. //准备发送
  8. long startTime = System.currentTimeMillis();
  9. //在linux下一个transferTo 方法就可以完成传输
  10. //在windows 下 一次调用 transferTo 只能发送8m , 就需要分段传输文件, 而且要主要
  11. //transferTo 底层使用到零拷贝
  12. long transferCount = fileChannel.transferTo(0, fileChannel.size(), socketChannel);
  13. System.out.println("发送的总的字节数 =" + transferCount + " 耗时:" + (System.currentTimeMillis() - startTime));
  14. //关闭
  15. fileChannel.close();
  16. }

2.6.6 总结

  1. 零拷贝,是从操作系统的角度来说的。因为内核缓冲区之间,没有数据是重复的(只有kernel buffer 有一份数据)。
  2. 零拷贝不仅仅带来更少的数据复制,还能带来其他的性能优势,例如更少的上下文切换,更少的CPU 缓存伪共享以及无 CPU 校验和计算。

    三 Netty高性能架构设计

    3.1 原生IO的问题

  3. NIO 的类库和 API 繁杂,使用麻烦:需要熟练掌握 Selector、ServerSocketChannel、SocketChannel、ByteBuffer 等。

  4. 需要具备其他的额外技能:要熟悉 Java 多线程编程,因为NIO编程涉及到Reactor模式,你必须对多线程和网络编程非常熟悉,才能编写出高质量的NIO程序。
  5. 开发工作量和难度都非常大:例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常流的处理等等。
  6. JDK NIO 的 Bug:例如臭名昭著的 Epoll Bug,它会导致 Selector 空轮询,最终导致 CPU 100%。直到 JDK 1.7 版本该问题仍旧存在,没有被根本解决。

    3.2 netty的优势

    Netty对JDK自带的NIO的API进行了封装,解决了上述问题。

  7. 设计优雅:适用于各种传输类型的统一 API 阻塞和非阻塞 Socket;基于灵活且可扩展的事件模型,可以清晰地分离关注点;高度可定制的线程模型 - 单线程,一个或多个线程池.

  8. 安全:完整的 SSL/TLS 和 StartTLS 支持
  9. 高性能、吞吐量更高:延迟更低;减少资源消耗;最小化不必要的内存复制。

    3.3 I/O线程模型

  10. 目前存在的线程模型主要有:

    1. 传统阻塞I/O服务模型
    2. Reactor模式
  11. 根据Reactor的数量和处理资源池线程的数量不同,有如下3种典型的实现
    1. 单Reactor单线程
    2. 单Reactor多线程
    3. 主从Reactor多线程
  12. Netty线程模型主要基于主从Reactor多线程模型做了一定的改进,其中主从Reactor多线程模型有多个Reactor。

    3.3.1 传统阻塞I/O服务模型

  13. 模型特点:

    1. 采用阻塞IO模式获取输入的数据
    2. 每个链接都需要独立的线程完成数据的输入,业务处理、数据返回。
  14. 问题分析:
    1. 当并发数很大,就会创建大量的线程,占用很大系统资源
    2. 连接创建后,如果当前线程暂时没有数据可读,该线程会阻塞在read操作,造成线程资源浪费。

image.png
图解说明:黄色的框表示对象,蓝色的框表示线程、白色的框表示方法(API)。之后的图相同。

3.3.2 传统阻塞I/O服务模型 demo

由于模型的逻辑主要集中在服务端,所以所有模型代码示例基本上都是服务端的示例

  1. public static void main(String[] args) throws IOException {
  2. //1、创建一个线程池
  3. //2、如果有客户端连接,就创建一个线程,与之通讯(单独写一个方法)
  4. ExecutorService executorService = Executors.newCachedThreadPool();
  5. //创建ServerSocket
  6. ServerSocket serverSocket = new ServerSocket(6666);
  7. System.out.println("服务器启动了");
  8. while (true) {
  9. //监听,等待客户端连接
  10. final Socket socket = serverSocket.accept();
  11. System.out.println("连接到一个客户端");
  12. //创建一个线程,与之通讯
  13. executorService.execute(() -> {
  14. //重写Runnable方法,与客户端进行通讯
  15. handler(socket);
  16. });
  17. }
  18. }
  19. //编写一个Handler方法,和客户端通讯。主要进行数据的读取和业务处理。
  20. public static void handler(Socket socket) {
  21. try {
  22. byte[] bytes = new byte[1024];
  23. //通过socket获取输入流
  24. InputStream inputStream = socket.getInputStream();
  25. //循环的读取客户端发送的数据
  26. while (true){
  27. int read = inputStream.read(bytes);
  28. if (read != -1){
  29. System.out.println(new String(bytes, 0, read));//输出客户端发送的数据
  30. } else {
  31. break;
  32. }
  33. }
  34. } catch (IOException e) {
  35. e.printStackTrace();
  36. } finally {
  37. System.out.println("关闭和client的连接");
  38. try {
  39. socket.close();
  40. } catch (IOException e) {
  41. e.printStackTrace();
  42. }
  43. }
  44. }

3.3.3 Reactor模型

针对传统阻塞I/O服务模型的2个缺点,解决方案如下:

  1. 基于I/O复用模型:多个连接共用一个阻塞对象,应用程序只需要在一个阻塞对象等待,无需阻塞等待所有连接。当某个连接有新的数据可以处理时,操作系统通知应用程序,线程从阻塞状态返回,开始进行业务处理。Reactor对应的叫法: 1. 反应器模式 2. 分发者模式(Dispatcher) 3. 通知者模式(notifier)
  2. 基于线程池复用线程资源:不必再为每个连接创建线程,将连接完成后的业务处理任务分配给线程进行处理,一个线程可以处理多个连接的业务。

I/O复用结合线程池,就是Reactor模式基本设计思想,如图所示:
image.png

  1. Reactor模式,通过一个或多个输入同时传递给服务处理器的模式(基于事件驱动)
  2. 服务器端程序处理传入的多个请求,并将它们同步分派到响应的处理线程,因此Reactor模式也叫Dispatcher模式。
  3. Reactor模式使用IO复用监听事件,收到事件后,分发的某个线程(进程),这点就是网络服务高并发处理的关键。

    3.3.4 Reactor核心组成部分

  4. Reactor:Reactor在一个单独的线程中运行,负责监听和分发事件,分发给适当的处理程序来对IO事件作出反应。

    1. 我的理解是将Reactor理解成一个Selector,它可以对建立新的连接,也可以将产生的读写事件交换给Handler进行处理
  5. Handlers:处理程序执行I/O事件要完成的实际事件, Reactor通过调度适当的处理程序来响应I/O事件,处理程序执行非阻塞操作。

    3.3.5 单Reactor单线程模式

    image.png
    方案说明:

  6. Select是前面 I/O复用模型介绍的标准网络编程API,可以实现应用程序通过一个阻塞对象监听多路连接请求

  7. Reactor对象通过 Select 监控客户端请求事件,收到事件后通过Dispatch 进行分发
  8. 如果是建立连接请求事件,则由Acceptor通过Accept处理连接请求,然后创建一个Handler对象处理连接完成后的后续业务处理
  9. 如果不是建立连接事件,则Reactor会分发调用连接对应的Handler来响应
  10. Handler会完成Read→业务处理→Send 的完整业务流程

结合实例
服务器端用一个线程通过多路复用搞定所有的IO操作(包括连接,读、写等),编码简单,清晰明了,但是如果客户端连接数量较多,将无法支撑,前面的 NIO 案例就属于这种模型。

模型分析

  1. 优点
    1. 模型简单,没有多线程、进程通信、竞争的问题,全部都在一个线程中完成
  2. 缺点
    1. 性能问题,只有一个线程,无法完全发挥多核 CPU 的性能。Handler 在处理某个连接上的业务时,整个进程无法处理其他连接事件,很容易导致性能瓶颈
    2. 可靠性问题,线程意外终止,或者进入死循环,会导致整个系统通信模块不可用,不能接收和处理外部消息,造成节点故障
  3. 使用场景

    1. 客户端的数量有限,业务处理非常快速,比如 Redis在业务处理的时间复杂度 O(1) 的情况

      3.3.6 单Reactor单线程demo

      ```java public class SReactorSThread { private Selector selector; private ServerSocketChannel serverSocketChannel; private int PORT = 6666;

      public SReactorSThread() { try {

      1. selector = Selector.open();
      2. serverSocketChannel = ServerSocketChannel.open();
      3. serverSocketChannel.bind(new InetSocketAddress(PORT));
      4. serverSocketChannel.configureBlocking(false);
      5. serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

      } catch (IOException e) {

      1. e.printStackTrace();

      } }

      //对客户端进行监听 public void listen() { try {

      1. while (true) {
      2. int count = selector.select();
      3. //表示有客户端产生事件
      4. if (count > 0) {
      5. Set<SelectionKey> selectionKeys = selector.selectedKeys();//取出产生事件的Channel
      6. Iterator<SelectionKey> iterator = selectionKeys.iterator();//准备对其进行遍历
      7. while (iterator.hasNext()) {
      8. SelectionKey key = iterator.next();
      9. //将key交给dispatch去处理
      10. dispatch(key);
      11. iterator.remove();
      12. }
      13. }
      14. }

      } catch (Exception e) {

      1. e.printStackTrace();

      } }

      //dispatch private void dispatch(SelectionKey key) { if (key.isAcceptable()){

      1. accept(key);

      }else {

      1. handler(key);

      } }

      //建立新的连接 private void accept(SelectionKey key) { try {

      1. SocketChannel socketChannel = serverSocketChannel.accept();
      2. socketChannel.configureBlocking(false);
      3. socketChannel.register(selector, SelectionKey.OP_READ);

      } catch (IOException e) {

      1. e.printStackTrace();

      } }

      //对请求进行处理,接收消息—-业务处理—-返回消息 private void handler(SelectionKey key) { SocketChannel channel = null; try {

      1. channel = (SocketChannel) key.channel();
      2. ByteBuffer buffer = ByteBuffer.allocate(3);
      3. StringBuilder msg = new StringBuilder();
      4. while (channel.read(buffer) > 0) {
      5. msg.append(new String(buffer.array()));
      6. buffer.clear();
      7. }
      8. System.out.println("接收到消息:" + msg.toString());
      9. //发送消息
      10. String ok = "OK";
      11. buffer.put(ok.getBytes());
      12. //这个flip非常重要哦,是将position置0,limit置于position的位置,以便下面代码进行写入操作能够正确写入buffer中的所有数据
      13. buffer.flip();
      14. channel.write(buffer);
      15. buffer.clear();

      } catch (IOException e) {

      1. try {
      2. System.out.println(channel.getRemoteAddress() + "离线了");
      3. //取消该通道的注册并关闭通道,这里非常重要,没有这一步的话当客户端断开连接就会不断抛出IOException
      4. //是因为,select会一直产生该事件。
      5. key.cancel();
      6. channel.close();
      7. } catch (IOException ex) {
      8. ex.printStackTrace();
      9. }

      } } }

/**调用**/

public static void main(String[] args) { SReactorSThread sReactorSThread = new SReactorSThread(); sReactorSThread.listen(); }

  1. <a name="v4HpT"></a>
  2. ### 3.3.7 单Reactor多线程模式
  3. ![image.png](https://cdn.nlark.com/yuque/0/2020/png/1502688/1601003334463-6bdc8bdd-000e-4560-846e-a87fe2752056.png#height=586&id=yPWLE&margin=%5Bobject%20Object%5D&name=image.png&originHeight=1172&originWidth=1416&originalType=binary&ratio=1&size=1610871&status=done&style=none&width=708)
  4. 1. Reactor 对象通过select 监控客户端请求事件, 收到事件后,通过dispatch进行分发
  5. 1. 如果建立连接请求, 则由Acceptor 通过accept 处理连接请求, 然后创建一个Handler对象处理完成连接后的各种事件
  6. 1. 如果不是连接请求,则由reactor分发调用连接对应的handler 来处理
  7. 1. handler 只负责响应事件,不做具体的业务处理, 通过read 读取数据后,会分发给后面的worker线程池的某个线程处理业务
  8. 1. worker 线程池会分配独立线程完成真正的业务,并将结果返回给handler
  9. 1. handler收到响应后,通过send 将结果返回给client
  10. **模型分析**
  11. 1. **优点**:
  12. 1. 可以充分的利用多核cpu 的处理能力
  13. 2. **缺点**:
  14. 1. 多线程数据共享和访问比较复杂, reactor 处理所有的事件的监听和响应,在单线程运行, 在高并发场景容易出现性能瓶颈.
  15. <a name="DwjnY"></a>
  16. ### 3.3.8 主从Reactor多线程模式
  17. ![image.png](https://cdn.nlark.com/yuque/0/2020/png/1502688/1601003821019-c9bee28e-3f43-4ed5-a7ff-83b131849ceb.png#height=594&id=SR9fV&margin=%5Bobject%20Object%5D&name=image.png&originHeight=1188&originWidth=1224&originalType=binary&ratio=1&size=1308895&status=done&style=none&width=612)<br />**方案说明:**
  18. 1. Reactor主线程MainReactor 对象就只注册一个用于监听连接请求的ServerSocketChannel,通过select 监听连接事件, 收到事件后,通过Acceptor 处理连接事件
  19. 1. 当 Acceptor 处理连接事件后,MainReactor 通过accept获取新的连接,并将连接注册到SubReactor
  20. 1. subreactor 将连接加入到连接队列进行监听,并创建handler进行各种事件处理
  21. 1. 当有新事件发生时, subreactor 就会调用对应的handler处理
  22. 1. handler 通过read 读取数据,分发给后面的worker 线程处理
  23. 1. worker 线程池分配独立的worker 线程进行业务处理,并返回结果
  24. 1. handler 收到响应的结果后,再通过send 将结果返回给client
  25. 1. Reactor 主线程可以对应多个Reactor 子线程, 即MainRecator 可以关联多个SubReactor
  26. 1. <br />
  27. **模型分析**
  28. 1. **优点**:
  29. 1. 父线程与子线程的数据交互简单职责明确,父线程只需要接收新连接,子线程完成后续的业务处理。
  30. 1. 父线程与子线程的数据交互简单,Reactor 主线程只需要把新连接传给子线程,子线程无需返回数据
  31. 2. **缺点**:
  32. 1. 编程复杂度较高
  33. 3. **结合实例**:
  34. 1. 这种模型在许多项目中广泛使用,包括 Nginx 主从 Reactor 多进程模型,Memcached 主从多线程,Netty 主从多线程模型的支持
  35. <a name="uWB1b"></a>
  36. ### 3.3.9 总结
  37. 1. 单 Reactor单线程,前台接待员和服务员是同一个人,全程为顾客服
  38. 1. 单 Reactor多线程,1 个前台接待员,多个服务员,接待员只负责接待
  39. 1. 主从 Reactor多线程,多个前台接待员,多个服务生
  40. **优点:**
  41. 1. 响应快,不必为单个同步时间所阻塞,虽然Reactor本身依然是同步的
  42. 1. 可以最大程度的避免复杂的多线程及同步问题,并且避免了多线程/进程的切换开销
  43. 1. 扩展性好,可以方便的通过增加Reactor实例个数来充分利用CPU 资源
  44. 1. 复用性好,Reactor模型本身与具体事件处理逻辑无关,具有很高的复用性
  45. <a name="WKz5K"></a>
  46. ## 3.4 netty 模型
  47. <a name="yoD4s"></a>
  48. ### 3.4.1 netty基础版
  49. Netty主要是基于主从Reactor多线程模式做了一定的改进,其中主从Reactor都有单一的一个变成了多个。下面是简单的改进图。<br />![image.png](https://cdn.nlark.com/yuque/0/2020/png/1502688/1601008652707-5e11e71c-3830-4c11-b5df-e3a37744bd8b.png#height=379&id=biLrn&margin=%5Bobject%20Object%5D&name=image.png&originHeight=758&originWidth=2162&originalType=binary&ratio=1&size=166402&status=done&style=none&width=1081)
  50. 1. 增加了BossGroup来维护多个主Reactor,主Reactor还是只关注连接的Accept;增加了WorkGroup来维护多个从Reactor,从Reactor将接收到的请求交给Handler进行处理。
  51. 1. 在主Reactor中接收到Accept事件,获取到对应的SocketChannel,Netty会将它进一步封装成NIOSocketChannel对象,这个封装后的对象还包含了该Channel对应的SelectionKey、通信地址等详细信息
  52. 1. Netty会将装个封装后的Channel对象注册到WorkerGroup中的从Reactor中。
  53. 1. 当WorkerGroup中的从Reactor监听到事件后,就会将之交给与此Reactor对应的Handler进行处理
  54. <a name="kY8Zn"></a>
  55. ### 3.4.2 netty进阶版
  56. Netty主要基于主从Reactors多线程模型(如图)做了一定的改进,其中主从 Reactor多线程模型有多个Reactor<br />![image.png](https://cdn.nlark.com/yuque/0/2020/png/1502688/1601008887502-43d77c16-4163-419d-9884-232bd5851daa.png#height=425&id=gqyyD&margin=%5Bobject%20Object%5D&name=image.png&originHeight=850&originWidth=1528&originalType=binary&ratio=1&size=1253248&status=done&style=none&width=764)
  57. <a name="jwx3r"></a>
  58. ### 3.4.3 netty最终版
  59. ![image.png](https://cdn.nlark.com/yuque/0/2020/png/1502688/1601009009399-d4c993d9-cfab-401b-9699-d1a2573eb350.png#height=571&id=kvblf&margin=%5Bobject%20Object%5D&name=image.png&originHeight=1142&originWidth=1442&originalType=binary&ratio=1&size=1242826&status=done&style=none&width=721)
  60. 1. Netty抽象出两组线程池,BossGroup专门负责接收客户端的连接,WorkerGroup专门负责网络的读写
  61. 1. BossGroup和WorkerGroup类型的本质都是NioEventLoopGroup类型。
  62. 1. NioEventLoopGroup相当于一个线程管理器(类似于ExecutorServevice),它下面维护很多个NioEventLoop线程。
  63. 1. 在初始化这两个Group线程组时,默认会在每个Group中生成CPU*2个NioEventLoop线程
  64. 1. 当n个连接来了,Group默认会按照连接请求的顺序分别将这些连接分给各个NioEventLoop去处理。
  65. 1. 同时Group还负责管理EventLoop的生命周期。
  66. 4. NioEventLoop表示一个不断循环的执行处理任务的线程
  67. 1. 它维护了一个线程和任务队列。
  68. 1. 每个NioEventLoop都包含一个Selector,用于监听绑定在它上面的socket通讯。
  69. 1. 每个NioEventLoop相当于Selector,负责处理多个Channel上的事件
  70. 1. 每增加一个请求连接,NioEventLoopGroup就将这个请求依次分发给它下面的NioEventLoop处理。
  71. 5. 每个Boss NioEventLoop循环执行的步骤有3步:
  72. 1. 轮询accept事件
  73. 1. 处理accept事件,与client建立连接,生成NioSocketChannel,并将其注册到某个Worker NioEventLoop的selector上。
  74. 1. 处理任务队列到任务,即runAllTasks
  75. 6. 每个Worker NioEventLoop循环执行的步骤:
  76. 1. 轮询read,write事件
  77. 1. 处理I/O事件,即read,write事件,在对应的NioSocketChannel中进行处理
  78. 1. 处理任务队列的任务,即runAllTasks
  79. 7. 每个 Worker NioEventLoop处理业务时,会使用pipeline(管道),pipeline中维护了一个ChannelHandlerContext链表,而ChannelHandlerContext则保存了Channel相关的所有上下文信息,同时关联一个ChannelHandler对象。如图所示,Channel和pipeline一一对应,ChannelHandler和ChannelHandlerContext一一对应。
  80. <a name="JIYaQ"></a>
  81. ### 3.4.4 netty demo
  82. ```java
  83. public class NettyServer {
  84. public static void main(String[] args) throws InterruptedException {
  85. //创建BossGroup 和 WorkerGroup
  86. //1、创建两个线程组,bossGroup 和 workerGroup
  87. //2、bossGroup 只是处理连接请求,真正的和客户端业务处理,会交给 workerGroup 完成
  88. //3、两个都是无限循环
  89. //4、bossGroup 和 workerGroup 含有的子线程(NioEventLoop)个数为实际 cpu 核数 * 2
  90. EventLoopGroup bossGroup = new NioEventLoopGroup();
  91. EventLoopGroup worderGroup = new NioEventLoopGroup();
  92. try {
  93. //创建服务器端的启动对象,配置参数
  94. ServerBootstrap bootstrap = new ServerBootstrap();
  95. //使用链式编程来进行设置,配置
  96. bootstrap.group(bossGroup, worderGroup) //设置两个线程组
  97. .channel(NioServerSocketChannel.class) //使用 NioServerSocketChannel 作为服务器的通道实现
  98. .option(ChannelOption.SO_BACKLOG, 128) //设置线程队列得到连接个数
  99. .childOption(ChannelOption.SO_KEEPALIVE, true) //设置保持活动连接状态
  100. .childHandler(new ChannelInitializer<SocketChannel>() { //为accept channel的pipeline预添加的handler
  101. //给 pipeline 添加处理器,每当有连接accept时,就会运行到此处。
  102. @Override
  103. protected void initChannel(SocketChannel socketChannel) throws Exception {
  104. socketChannel.pipeline().addLast(new NettyServerHandler());
  105. }
  106. }); //给我们的 workerGroup 的 EventLoop 对应的管道设置处理器
  107. System.out.println("........服务器 is ready...");
  108. //绑定一个端口并且同步,生成了一个ChannelFuture 对象
  109. //启动服务器(并绑定端口)
  110. ChannelFuture future = bootstrap.bind(6668).sync();
  111. //对关闭通道进行监听
  112. future.channel().closeFuture().sync();
  113. } finally {
  114. bossGroup.shutdownGracefully();
  115. worderGroup.shutdownGracefully();
  116. }
  117. }
  118. }

服务端Handler处理

  1. public class NettyServerHandler extends ChannelInboundHandlerAdapter {
  2. /**
  3. *读取客户端发送过来的消息
  4. * @param ctx 上下文对象,含有 管道pipeline,通道channel,地址
  5. * @param msg 就是客户端发送的数据,默认Object
  6. * @throws Exception
  7. */
  8. @Override
  9. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  10. System.out.println("服务器读取线程:" + Thread.currentThread().getName());
  11. System.out.println("server ctx = " + ctx);
  12. //看看Channel和Pipeline的关系
  13. Channel channel = ctx.channel();
  14. ChannelPipeline pipeline = ctx.pipeline(); //本质是个双向链表,出栈入栈
  15. //将msg转成一个ByteBuf,比NIO的ByteBuffer性能更高
  16. ByteBuf buf = (ByteBuf)msg;
  17. System.out.println("客户端发送的消息是:" + buf.toString(CharsetUtil.UTF_8));
  18. System.out.println("客户端地址:" + ctx.channel().remoteAddress());
  19. }
  20. //数据读取完毕
  21. @Override
  22. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
  23. //它是 write + flush,将数据写入到缓存buffer,并将buffer中的数据flush进通道
  24. //一般讲,我们对这个发送的数据进行编码
  25. ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~", CharsetUtil.UTF_8));
  26. }
  27. //处理异常,一般是关闭通道
  28. @Override
  29. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  30. ctx.close();
  31. }
  32. }

客户端

  1. public class NettyClient {
  2. public static void main(String[] args) throws InterruptedException {
  3. //客户端需要一个事件循环组
  4. EventLoopGroup group = new NioEventLoopGroup();
  5. try {
  6. //创建客户端启动对象
  7. //注意:客户端使用的不是 ServerBootStrap 而是 Bootstrap
  8. Bootstrap bootstrap = new Bootstrap();
  9. //设置相关参数
  10. bootstrap.group(group) //设置线程组
  11. .channel(NioSocketChannel.class) //设置客户端通道的实现类(使用反射)
  12. .handler(new ChannelInitializer<SocketChannel>() {
  13. @Override
  14. protected void initChannel(SocketChannel socketChannel) throws Exception {
  15. socketChannel.pipeline().addLast(new NettyClientHandler()); //加入自己的处理器
  16. }
  17. });
  18. System.out.println("客户端 OK...");
  19. //启动客户端去连接服务器端
  20. //关于 channelFuture 涉及到 netty 的异步模型
  21. ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
  22. //给关闭通道进行监听
  23. channelFuture.channel().closeFuture().sync();
  24. } finally {
  25. group.shutdownGracefully();
  26. }
  27. }
  28. }

客户端Handler处理:

  1. public class NettyClientHandler extends ChannelInboundHandlerAdapter {
  2. /**
  3. * 当通道就绪就会触发
  4. * @param ctx
  5. * @throws Exception
  6. */
  7. @Override
  8. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  9. System.out.println("client: " + ctx);
  10. ctx.writeAndFlush(Unpooled.copiedBuffer("hello, server", CharsetUtil.UTF_8));
  11. }
  12. /**
  13. * 当通道有读取事件时,会触发
  14. * @param ctx
  15. * @param msg
  16. * @throws Exception
  17. */
  18. @Override
  19. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  20. ByteBuf buf = (ByteBuf)msg;
  21. System.out.println("服务器回复的消息:" + buf.toString(CharsetUtil.UTF_8));
  22. System.out.println("服务器的地址:" + ctx.channel().remoteAddress());
  23. }
  24. @Override
  25. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  26. cause.printStackTrace();
  27. ctx.close();
  28. }
  29. }

3.4.5 总结

  1. Netty 抽象出两组线程池,BossGroup 专门负责接收客户端连接,WorkerGroup 专门负责网络读写操作。
  2. NioEventLoop 表示一个不断循环执行处理任务的线程,每个NioEventLoop 都有一个 selector,用于监听绑定在其上的socket网络通道。
  3. NioEventLoop 内部采用串行化设计,从消息的读取->解码->处理->编码->发送,始终由IO 线程 NioEventLoop 负责
    1. NioEventLoopGroup 下包含多个NioEventLoop
    2. 每个 NioEventLoop 中包含有一个Selector,一个 taskQueue
    3. 每个 NioEventLoop 的 Selector上可以注册监听多个NioChannel
    4. 每个 NioChannel 只会绑定在唯一的NioEventLoop 上
    5. 每个 NioChannel 都绑定有一个自己的ChannelPipeline

      3.5 任务队列

      任务队列由NioEventLoop维护并不断执行。当我们就收到请求之后,在当前channel对应的pipeline中的各个Handler里面进行业务处理和请求过滤。当某些业务需要耗费大量时间的时候,我们可以将任务提交到由NioEventLoop维护的taskQueue或scheduleTaskQueue中,让当前的NioEventLoop线程在空闲时间去执行这些任务。下面将介绍提交任务的3种方式

      3.5.1 用户程序自定义的普通任务

      该方式会将任务提交到taskQueue队列中。提交到该队列中的任务会按照提交顺序依次执行。 ```java channelHandlerContext.channel().eventLoop().execute(new Runnable(){ @Override public void run() { //… } });
  1. <a name="mX8Mp"></a>
  2. ### 3.5.2 **用户自定义的定时任务:**
  3. 该方式会将任务提交到scheduleTaskQueue定时任务队列中。该队列是底层是优先队列PriorityQueue实现的,故该队列中的任务会按照时间的先后顺序定时执行。
  4. ```java
  5. channelHandlerContext.channel().eventLoop().schedule(new Runnable() {
  6. @Override
  7. public void run() {
  8. }
  9. }, 60, TimeUnit.SECONDS);

3.5.3 为其他EventLoop线程对应的Channel添加任务

可以在ChannelInitializer中,将刚创建的各个Channel以及对应的标识加入到统一的集合中去,然后可以根据标识获取Channel以及其对应的NioEventLoop,然后就课程调用execute()或者schedule()方法。

3.6 异步模型

基本介绍

  1. 异步的概念和同步相对。当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的组件在完成后,通过状态、通知和回调来通知调用者。
  2. Netty 中的 I/O 操作是异步的,包括 Bind、Write、Connect 等操作会简单的返回一个 ChannelFuture。
  3. 调用者并不能立刻获得结果,而是通过 Future-Listener 机制,用户可以方便的主动获取或者通过通知机制获得 IO 操作结果
  4. Netty 的异步模型是建立在 future 和 callback 的之上的。callback 就是回调。重点说 Future,它的核心思想是:假设一个方法 fun,计算过程可能非常耗时,等待 fun返回显然不合适。那么可以在调用 fun 的时候,立马返回一个 Future,后续可以通过 Future去监控方法 fun 的处理过程(即 : Future-Listener 机制)

Future说明

  1. 表示异步的执行结果, 可以通过它提供的方法来检测执行是否完成,比如检索计算等等.
  2. ChannelFuture 是一个接口 : public interface ChannelFuture extends Future。我们可以添加监听器,当监听的事件发生时,就会通知到监听器

工作原理
image.png

  1. 在使用 Netty 进行编程时,拦截操作和转换出入站数据只需要您提供 callback 或利用future 即可。这使得链式操作简单、高效, 并有利于编写可重用的、通用的代码。
  2. Netty 框架的目标就是让你的业务逻辑从网络基础应用编码中分离出来、解脱出来

    3.6.1 Future-Listener机制

    当 Future 对象刚刚创建时,处于非完成状态,调用者可以通过返回的 ChannelFuture 来获取操作执行的状态,注册监听函数来执行完成后的操作。
    常用方法如下:
方法名称 方法作用
isDone() 判断当前操作是否完成
isSuccess() 判断已完成的当前操作是否成功
getCause() 获取已完成当前操作失败的原因
isCancelled() 判断已完成的当前操作是否被取消
addListener() 注册监听器,当前操作(Future)已完成,将会通知指定的监听器

3.6.2 Future-Listener demo

绑定端口操作时异步操作,当绑定操作处理完,将会调用相应的监听器处理逻辑。

  1. serverBootstrap.bind(port).addListener(future -> {
  2. if(future.isSuccess()) {
  3. System.out.println(newDate() + ": 端口["+ port + "]绑定成功!");
  4. } else{
  5. System.err.println("端口["+ port + "]绑定失败!");
  6. }
  7. });

3.7 http demo

  1. public static void main(String[] args) throws InterruptedException {
  2. NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
  3. NioEventLoopGroup workerGroup = new NioEventLoopGroup();
  4. try {
  5. ServerBootstrap bootstrap = new ServerBootstrap();
  6. bootstrap.group(bossGroup, workerGroup)
  7. .channel(NioServerSocketChannel.class)
  8. .childHandler(new TestServerInitializer());
  9. ChannelFuture channelFuture = bootstrap.bind(8080).sync();
  10. channelFuture.channel().closeFuture().sync();
  11. } finally {
  12. bossGroup.shutdownGracefully();
  13. workerGroup.shutdownGracefully();
  14. }
  15. }

定义ChannelInitializer
用于给Channel对应的pipeline添加handler。该ChannelInitializer中的代码在SocketChannel被创建时都会执行

  1. public class TestServerInitializer extends ChannelInitializer<SocketChannel> {
  2. @Override
  3. protected void initChannel(SocketChannel socketChannel) throws Exception {
  4. //向管道加入处理器
  5. //得到管道
  6. ChannelPipeline pipeline = socketChannel.pipeline();
  7. //加入一个 netty 提供的 httpServerCodec:codec => [coder - decoder]
  8. //1、HttpServerCodec 是 netty 提供的处理http的编解码器
  9. pipeline.addLast("MyHttpServerCodec", new HttpServerCodec());
  10. //2、增加自定义的Handler
  11. pipeline.addLast("MyTestHttpServerHandler", new TestHttpServerHandler());
  12. }
  13. }

自定义Handler

  1. public class TestHttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {
  2. /**
  3. * 读取客户端数据。
  4. *
  5. * @param channelHandlerContext
  6. * @param httpObject 客户端和服务器端互相通讯的数据被封装成 HttpObject
  7. * @throws Exception
  8. */
  9. @Override
  10. protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpObject httpObject) throws Exception {
  11. //判断 msg 是不是 HTTPRequest 请求
  12. if (httpObject instanceof HttpRequest) {
  13. System.out.println("msg 类型 = " + httpObject.getClass());
  14. System.out.println("客户端地址:" + channelHandlerContext.channel().remoteAddress());
  15. //获取
  16. HttpRequest request = (HttpRequest) httpObject;
  17. //获取uri,进行路径过滤
  18. URI uri = new URI(request.uri());
  19. if ("/favicon.ico".equals(uri.getPath())) {
  20. System.out.println("请求了 favicon.ico,不做响应");
  21. }
  22. //回复信息给浏览器[http协议]
  23. ByteBuf content = Unpooled.copiedBuffer("HelloWorld", CharsetUtil.UTF_8);
  24. //构造一个http的响应,即HTTPResponse
  25. DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);
  26. response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
  27. response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());
  28. //将构建好的 response 返回
  29. channelHandlerContext.writeAndFlush(response);
  30. }
  31. }
  32. }