Netty是一个Java NIO客户端/服务器框架,是一个为了快速开发可维护的高性能、高可扩展的网络服务器和客户端程序而提供的异步事件驱动基础框架和工具。基于Netty,可以快速轻松地开发网络服务器和客户端的应用程序。与直接使用Java NIO相比,Netty给大家造出了一个非常优美的轮子,它可以大大简化网络编程流程。

5.1 第一个Netty实战案例DiscardServer

5.1.1 创建第一个Netty项目

首先我们需要创建项目(或者模块),这里取名为NettyDemos。第一个Netty的实战案例DiscardServer就在这个项目中进行实战开发。DiscardServer功能很简单:读取客户端的输入数据,直接丢弃,不给客户端任何回复。
使用maven导入Netty的依赖坐标到工程(或项目),Netty的依赖坐标如下:

  1. <dependency>
  2. <groupId>io.netty</groupId>
  3. <artifactId>netty-all</artifactId>
  4. <version>4.1.6.Final</version>
  5. </dependency>

5.1.2 第一个Netty服务端程序

创建一个服务端类NettyDiscardServer,用以实现消息的Discard(丢弃)功能

  1. package com.crazymakercircle.netty.basic;
  2. //…
  3. public class NettyDiscardServer {
  4. private final int serverPort;
  5. ServerBootstrap b = new ServerBootstrap();
  6. public NettyDiscardServer(int port) {
  7. this.serverPort = port;
  8. }
  9. public void runServer() {
  10. //创建反应器轮询组
  11. EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1);
  12. EventLoopGroup workerLoopGroup = new NioEventLoopGroup();
  13. try {
  14. //1. 设置反应器轮询组
  15. b.group(bossLoopGroup, workerLoopGroup);
  16. //2. 设置nio类型的通道
  17. b.channel(NioServerSocketChannel.class);
  18. //3. 设置监听端口
  19. b.localAddress(serverPort);
  20. //4. 设置通道的参数
  21. b.option(ChannelOption.SO_KEEPALIVE, true);
  22. //5. 装配子通道流水线
  23. b.childHandler(new ChannelInitializer<SocketChannel>() {
  24. //有连接到达时会创建一个通道
  25. protected void initChannel(SocketChannel ch){
  26. //流水线的职责:负责管理通道中的处理器
  27. //向“子通道”(传输通道)流水线添加一个处理器
  28. ch.pipeline().addLast(new NettyDiscardHandler());
  29. }
  30. });
  31. //6. 开始绑定服务器
  32. //通过调用sync同步方法阻塞直到绑定成功
  33. ChannelFuture channelFuture = b.bind().sync();
  34. Logger.info(" 服务器启动成功,监听端口: " +
  35. channelFuture.channel().localAddress());
  36. //7. 等待通道关闭的异步任务结束
  37. //服务监听通道会一直等待通道关闭的异步任务结束
  38. ChannelFuture closeFuture = channelFuture.channel().closeFuture();
  39. closeFuture.sync();
  40. } catch (Exception e) {
  41. e.printStackTrace();
  42. } finally {
  43. //8. 优雅关闭EventLoopGroup
  44. //释放掉所有资源,包括创建的线程
  45. workerLoopGroup.shutdownGracefully();
  46. bossLoopGroup.shutdownGracefully();
  47. }
  48. }
  49. public static void main(String[] args) {
  50. int port = NettyDemoConfig.SOCKET_SERVER_PORT;
  51. new NettyDiscardServer(port).runServer();
  52. }
  53. }
  1. 一般来说,对应于多线程的Java NIO通信的应用场景,Netty对应的反应器组件为NioEventLoopGroup。<br />在上面的例子中,使用了两个NioEventLoopGroup反应器组件实例:第一个负责服务器通道新连接的IO事件的监听,可以形象地理解为“包工头”角色;第二个主要负责传输通道的IO事件的处理和数据传输,可以形象地理解为“工人”角色。<br />Reactor模式中的Handler(处理器)角色组件。Handler的作用是对应到IO事件,完成IO事件的业务处理。<br />上面的例子中还用到了Netty的服务引导类ServerBootstrap。服务引导类是一个组装和集成器,职责是将不同的Netty组件组装在一起。此外,ServerBootstrap能够按照应用场景的需要为组件设置好基础性的参数,最后帮助快速实现Netty服务器的监听和启动

5.1.3 业务处理器NettyDiscardHandler

在Reactor模式中,所有的业务处理都在Handler中完成,业务处理一般需要自己编写,这里编写一个新类:NettyDiscardHandler。这里的业务处理很简单:把收到的任何内容直接丢弃,也不会回复任何消息。

  1. package com.crazymakercircle.netty.basic;
  2. //…
  3. NettyDiscardHandler extends ChannelInboundHandlerAdapter {
  4. @Override
  5. public void channelRead(ChannelHandlerContext ctx, Object msg) {
  6. ByteBuf in = (ByteBuf) msg;
  7. try {
  8. Logger.info("收到消息,丢弃如下:");
  9. while (in.isReadable()) {
  10. System.out.print((char) in.readByte());
  11. }
  12. System.out.println();//换行
  13. } finally {
  14. ReferenceCountUtil.release(msg);
  15. }
  16. }
  17. }
  1. NettyHandler需要处理多种IO事件(如读就绪、写就绪),对应于不同的IO事件,Netty提供了一些基础方法。这些方法都已经提前封装好,应用程序直接继承或者实现即可。比如说,对于处理入站的IO事件,其对应的接口为ChannelInboundHandler,并且Netty提供了ChannelInboundHandlerAdapter适配器作为入站处理器的默认实现。<br />如果要实现自己的入站处理器,可以简单地继承ChannelInboundHandlerAdapter入站处理器适配器,再写入自己的入站处理的业务逻辑。也就是说,重写通道读取方法channelRead()即可。<br />NettyByteBuf缓冲区组件(后面会单独对其进行详细的介绍)可以对应到前面介绍的Java NIO类库的数据缓冲区Buffer组件。只不过相对而言,NettyByteBuf缓冲区性能更好,使用也更加方便。

5.1.4 运行NettyDiscardServer

看不懂以上NettyDiscardServer程序,没有关系。此程序的目的只是为大家展示一下Netty开发中会涉及什么内容,给大家留一个初步的印象。

5.2 解密Netty中的Reactor模式

5.2.1 回顾Reactor模式中IO事件的处理流程

一个IO事件从操作系统底层产生后,在Reactor模式中的处理流程。
image.png
Reactor模式中IO事件的处理流程大致分为4步,具体如下:

  1. 通道注册。IO事件源于通道(Channel),IO是和通道(对应于底层连接而言)强相关的。一个IO事件一定属于某个通道。如果要查询通道的事件,首先就要将通道注册到选择器。
  2. 查询事件。在Reactor模式中,一个线程会负责一个反应器(或者SubReactor子反应器),不断地轮询,查询选择器中的IO事件(选择键)。
  3. 事件分发。如果查询到IO事件,则分发给与IO事件有绑定关系的Handler业务处理器。
  4. 完成真正的IO操作和业务处理,这一步由Handler业务处理器负责。

其中,第1步和第2步其实是Java NIO的功能,Reactor模式仅仅是利用了Java NIO的优势而已。

5.2.2 Netty中的Channel

Channel组件是Netty中非常重要的组件,为什么首先要说的是Channel组件呢?原因是:Reactor模式和通道紧密相关,反应器的查询和分发的IO事件都来自Channel组件。
Netty中不直接使用Java NIO的Channel组件,对Channel组件进行了自己的封装。Netty实现了一系列的Channel组件,为了支持多种通信协议,换句话说,对于每一种通信连接协议,Netty都实现了自己的通道.除了Java的NIO,Netty还提供了Java面向流的OIO处理通道。
对应于不同的协议,Netty中常见的通道类型如下:

  • NioSocketChannel:异步非阻塞TCP Socket传输通道。
  • NioServerSocketChannel:异步非阻塞TCP Socket服务端监听通道。
  • NioDatagramChannel:异步非阻塞的UDP传输通道。
  • NioSctpChannel:异步非阻塞Sctp传输通道。
  • NioSctpServerChannel:异步非阻塞Sctp服务端监听通道。
  • OioSocketChannel:同步阻塞式TCP Socket传输通道。
  • OioServerSocketChannel:同步阻塞式TCP Socket服务端监听通道。
  • OioDatagramChannel:同步阻塞式UDP传输通道。
  • OioSctpChannel:同步阻塞式Sctp传输通道。
  • OioSctpServerChannel:同步阻塞式Sctp服务端监听通道。

不论是哪种通道类型,在主要的API和使用方式上和NioSocketChannel类基本都是相同的,更多是底层的传输协议不同,而Netty帮大家极大地屏蔽了传输差异。
在Netty的NioSocketChannel内部封装了一个Java NIO的SelectableChannel成员,通过对该内部的Java NIO通道的封装,对Netty的NioSocketChannel通道上的所有IO操作最终都会落地到Java NIO的SelectableChannel底层通道。
image.png

5.2.3 Netty中的Reactor

在Reactor模式中,一个反应器(或者SubReactor子反应器)会由一个事件处理线程负责事件查询和分发。该线程不断进行轮询,通过Selector选择器不断查询注册过的IO事件(选择键)。如果查询到IO事件,就分发给Handler业务处理器。
Netty中的反应器组件有多个实现类,这些实现类与其通道类型相互匹配。对应于NioSocketChannel通道,Netty的反应器类为NioEventLoop(NIO事件轮询)。
NioEventLoop类有两个重要的成员属性:一个是Thread线程类的成员,一个是Java NIO选择器的成员属性。
image.png
通过这个关系图可以看出:NioEventLoop和前面章节讲的反应器实现在思路上是一致的:一个NioEventLoop拥有一个线程,负责一个Java NIO选择器的IO事件轮询。
在Netty中,EventLoop反应器和Channel的关系是什么呢?理论上来说,一个EventLoop反应器和NettyChannel通道是一对多的关系:一个反应器可以注册成千上万的通道
image.png

5.2.4 Netty中的Handler

Java NIO的IO事件类型时讲到,可供选择器监控的通道IO事件类型包括以下4种:

  1. 可读:SelectionKey.OP_READ。
  2. 可写:SelectionKey.OP_WRITE。
  3. 连接:SelectionKey.OP_CONNECT。
  4. 接收:SelectionKey.OP_ACCEPT。

在Netty中,EventLoop反应器内部有一个线程负责Java NIO选择器的事件的轮询,然后进行对应的事件分发。事件分发(Dispatch)的目标就是Netty的Handler(含用户定义的业务处理器)。
Netty的Handler分为两大类:第一类是ChannelInboundHandler入站处理器;第二类是ChannelOutboundHandler出站处理器,二者都继承了ChannelHandler处理器接口。
image.png
无论是入站还是出站,Netty都提供了各自的默认适配器实现:ChannelInboundHandler的默认实现为ChannelInboundHandlerAdapter(入站处理适配器)。ChannelOutboundHandler的默认实现为ChannelOutBoundHandlerAdapter(出站处理适配器)。这两个默认的通道处理适配器分别实现了基本的入站操作和出站操作功能。如果要实现自己的业务处理器,不需要从零开始去实现处理器的接口,只需要继承通道处理适配器即可。

5.2.5 Netty中的Pipeline

Netty的Reactor模式实现中各个组件之间的关系:

  1. 反应器(或者SubReactor子反应器)和通道之间是一对多的关系:一个反应器可以查询很多个通道的IO事件。
  2. 通道和Handler处理器实例之间是多对多的关系:一个通道的IO事件可以被多个Handler实例处理;一个Handler处理器实例也能绑定到很多通道,处理多个通道的IO事件。

通道和Handler处理器实例之间的绑定关系,Netty是如何组织的呢?
Netty设计了一个特殊的组件,叫作ChannelPipeline(通道流水线)。它像一条管道,将绑定到一个通道的多个Handler处理器实例串联在一起,形成一条流水线。ChannelPipeline的默认实现实际上被设计成一个双向链表。所有的Handler处理器实例被包装成双向链表的节点,被加入到ChannelPipeline中。

Netty的通道流水线与普通的流水线不同,Netty的流水线不是单向的,而是双向的,而普通的流水线基本都是单向的。Netty是这样规定的:入站处理器的执行次序是从前到后,出站处理器的执行次序是从后到前。
image.png
为了方便开发者,Netty提供了一系列辅助类,用于把上面的三个组件快速组装起来完成一个Netty应用,这个系列的类叫作引导类。服务端的引导类叫作ServerBootstrap类,客户端的引导类叫作Bootstrap类。

5.3 详解Bootstrap

Bootstrap类是Netty提供的一个便利的工厂类,可以通过它来完成Netty的客户端或服务端的Netty组件的组装,以及Netty程序的初始化和启动执行。Netty的官方解释是,完全可以不用这个Bootstrap类,可以一点点去手动创建通道、完成各种设置和启动注册到EventLoop反应器,然后开始事件的轮询和处理,但是这个过程会非常麻烦。通常情况下,使用这个便利的Bootstrap工具类的效率会更高。
在Netty中有两个引导类,分别用于服务器和客户端
image.png

5.3.1 父子通道

在Netty中,将有接收关系的监听通道和传输通道叫作父子通道。其中,负责服务器连接监听和接收的监听通道(如NioServerSocketChannel)也叫父通道(Parent Channel),对应于每一个接收到的传输类通道(如NioSocketChannel)也叫子通道(Child Channel)。

5.3.2 EventLoopGroup

Netty中的Reactor模式实现不是单线程版本的,而是多线程版本的。
实际上,在Netty中一个EventLoop相当于一个子反应器(SubReactor),一个NioEventLoop子反应器拥有了一个事件轮询线程,同时拥有一个Java NIO选择器。
EventLoopGroup的构造函数有一个参数,用于指定内部的线程数。在构造器初始化时,会按照传入的线程数量在内部构造多个线程和多个EventLoop子反应器(一个线程对应一个EventLoop子反应器),进行多线程的IO事件查询和分发。
如果使用EventLoopGroup的无参数构造函数,没有传入线程数量或者传入的数量为0,那么EventLoopGroup内部的线程数量到底是多少呢?默认的EventLoopGroup内部线程数量为最大可用的CPU处理器数量的2倍。假设电脑使用的是4核的CPU,那么在内部会启动8个EventLoop线程,相当于8个子反应器实例。
为了及时接收新连接,在服务端,一般有两个独立的反应器,一个负责新连接的监听和接收,另一个负责IO事件轮询和分发,并且两个反应器相互隔离。对应到Netty服务器程序中,则需要设置两个EventLoopGroup,一个组负责新连接的监听和接受,另外一个组负责IO传输事件的轮询与分发,两个轮询组的职责具体如下:

  1. 负责新连接的监听和接收的EventLoopGroup中的反应器完成查询通道的新连接IO事件查询。这些反应器有点像负责招工的包工头,因此,该轮询组可以形象地称为“包工头”(Boss)轮询组。
  2. 负责IO事件轮询和分发的反应器完成查询所有子通道的IO事件,并且执行对应的Handler处理器完成IO处理——例如数据的输入和输出(有点儿像搬砖),这个轮询组可以形象地称为“工人”(Worker)轮询组。

Netty的EventLoopGroup与EventLoop之间、EventLoop与Channel之间的关系如图
image.png

5.3.3 Bootstrap启动流程

Bootstrap的启动流程也就是Netty组件的组装、配置,以及Netty服务器或者客户端的启动流程。在本节中对启动流程进行了梳理,大致分成8个步骤。本书仅仅演示的是服务端引导类的使用,用到的引导类为ServerBootstrap。正式使用前,首先创建一个服务端的引导类实例。

  1. //创建一个服务端的引导类
  2. ServerBootstrap b = new ServerBootstrap();
  1. 创建反应器轮询组,并设置到ServerBootstrap引导类实例 ```java //创建反应器轮询组 //boss轮询组 EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1); //worker轮询组 EventLoopGroup workerLoopGroup = new NioEventLoopGroup(); //… //step1:为引导类实例设置反应器轮询组 b.group(bossLoopGroup, workerLoopGroup);
  1. 如果不需要分开监听新连接事件和输出事件,就不一定非得配置两个轮询组,可以仅配置一个EventLoopGroup反应器轮询组。在这种模式下,新连接监听IO事件和数据传输IO事件可能被挤在了同一个线程中处理。这样会带来一个风险:新连接的接收被更加耗时的数据传输或者业务处理所阻塞。所以,在服务端,建议设置成两个轮询组的工作模式。
  2. 2. 设置通道的IO类型。Netty不止支持Java NIO,也支持阻塞式的OIO。下面配置的是Java NIO类型的通道类型:
  3. ```java
  4. //step2:设置传输通道的类型为NIO类型
  5. b.channel(NioServerSocketChannel.class);
  1. 如果确实指定BootstrapIO模型为OIO类型,可以配置为OioServerSocketChannel.class类。NIO的优势巨大,因此通常不会在Netty中使用OIO
  1. 设置监听端口,代码大致如下:

    1. //step3:设置监听端口
    2. b.localAddress(new InetSocketAddress(port));
  2. 设置传输通道的配置选项,代码大致如下:

    1. //step4:设置通道的参数
    2. b.option(ChannelOption.SO_KEEPALIVE, true);
    3. b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

    调用了Bootstrap的option()选项设置方法。对于服务器的Bootstrap而言,这个方法的作用是:给父通道(Parent Channel)设置一些与传输协议相关的选项。如果要给子通道(Child Channel)设置一些通道选项,则需要调用childOption()设置方法。
    设置了一个底层TCP相关的选项ChannelOption.SO_KEEPALIVE。该选项表示是否开启TCP底层心跳机制,true为开启,false为关闭

  3. 装配子通道的Pipeline。每一个通道都用一条ChannelPipeline流水线,它的内部有一个双向的链表。装配流水线的方式是:将业务处理器ChannelHandler实例包装之后加入双向链表中。

装配子通道的Handler流水线调用引导类的childHandler()方法,该方法需要传入一个ChannelInitializer通道初始化类的实例作为参数。每当父通道成功接收到一个连接并创建成功一个子通道后,就会初始化子通道,此时这里配置的ChannelInitializer实例就会被调用。

  1. //step5:装配子通道流水线
  2. b.childHandler(new ChannelInitializer<SocketChannel>() {
  3. //有连接到达时会创建一个通道的子通道,并初始化
  4. protected void initChannel(SocketChannel ch) …{
  5. //这里可以管理子通道中的Handler业务处理器
  6. //向子通道流水线添加一个Handler业务处理器
  7. ch.pipeline().addLast(new NettyDiscardHandler());
  8. }
  9. });
  1. 为什么仅装配子通道的流水线,而不需要装配父通道的流水线呢?<br />原因是:父通道(NioServerSocketChannel)的内部业务处理是固定的:接收新连接后,创建子通道,然后初始化子通道,所以不需要特别的配置,由Netty自行进行装配。<br />ChannelInitializer处理器有一个泛型参数SocketChannel,它代表需要初始化的通道类型,这个类型需要和前面的引导类中设置的传输通道类型一一对应起来。
  1. 开始绑定服务器新连接的监听端口,代码大致如下:

    1. //step6:开始绑定端口,通过调用sync()同步方法阻塞直到绑定成功
    2. ChannelFuture channelFuture = b.bind().sync();
    3. Logger.info(" 服务器启动成功,监听端口: " + channelFuture.channel().localAddress());

    b.bind()方法的功能是返回一个端口绑定Netty的异步任务channelFuture。在这里,并没有给channelFuture异步任务增加回调监听器,而是阻塞channelFuture异步任务,直到端口绑定任务执行完成。
    在Netty中,所有的IO操作都是异步执行的,这就意味着任何一个IO操作都会立即返回,返回时异步任务还没有真正执行。什么时候执行完成呢?
    Netty中的IO操作都会返回异步任务实例(如channelFuture实例)。通过该异步任务实例,既可以实现同步阻塞一直到channelFuture异步任务执行完成,也可以通过为其增加事件监听器的方式注册异步回调逻辑,以获得Netty中的IO操作的真正结果。上面所使用的是同步阻塞一直到channelFuture异步任务执行完成的处理方式。

  2. 自我阻塞,直到监听通道关闭 ```java //step7:自我阻塞,直到通道关闭的异步任务结束 ChannelFuture closeFuture = channelFuture.channel().closeFuture(); closeFuture.sync();

  1. 8. 关闭EventLoopGroup
  2. ```java
  3. //step8:释放掉所有资源,包括创建的反应器线程
  4. workerLoopGroup.shutdownGracefully();
  5. bossLoopGroup.shutdownGracefully();
  1. 关闭反应器轮询组,同时会关闭内部的子反应器线程,也会关闭内部的选择器、内部的轮询线程以及负责查询的所有子通道。在子通道关闭后,会释放掉底层的资源,如Socket文件描述符等。

5.3.4 ChannelOption

无论是对于NioServerSocketChannel父通道类型还是对于NioSocketChannel子通道类型,都可以设置一系列的ChannelOption(通道选项)。ChannelOption类中定义了一系列选项

  1. SO_RCVBUF和SO_SNDBUF

这两个为TCP传输选项,每个TCP socket(套接字)在内核中都有一个发送缓冲区和一个接收缓冲区,这两个选项就是用来设置TCP连接的两个缓冲区大小的。TCP的全双工工作模式以及TCP的滑动窗口对两个独立的缓冲区都有依赖。

  1. TCP_NODELAY

此为TCP传输选项,如果设置为true就表示立即发送数据。TCP_NODELAY用于开启或关闭Nagle算法。如果要求高实时性,有数据发送时就马上发送,就将该选项设置为true(关闭Nagle算法);如果要减少发送次数、减少网络交互,就设置为false(开启Nagle算法),等累积一定大小的数据后再发送。关于TCP_NODELAY的值,Netty默认为true,而操作系统默认为false。

  1. SO_KEEPALIVE

此为TCP传输选项,表示是否开启TCP的心跳机制。true为连接保持心跳,默认值为false。启用该功能时,TCP会主动探测空闲连接的有效性。需要注意的是:默认的心跳间隔是7200秒,即2小时。Netty默认关闭该功能。

  1. SO_REUSEADDR

此为TCP传输选项,为true时表示地址复用,默认值为false。有四种情况需要用到这个参数设置:

  • 当有一个地址和端口相同的连接socket1处于TIME_WAIT状态时,而又希望启动一个新的连接socket2要占用该地址和端口。
  • 有多块网卡或用IP Alias技术的机器在同一端口启动多个进程,但每个进程绑定的本地IP地址不能相同。
  • 同一进程绑定相同的端口到多个socket(套接字)上,但每个socket绑定的IP地址不同。
  • 完全相同的地址和端口的重复绑定,但这只用于UDP的多播,不用于TCP。
  1. SO_LINGER

此为TCP传输选项,可以用来控制socket.close()方法被调用后的行为,包括延迟关闭时间。如果此选项设置为-1,就表示socket.close()方法在调用后立即返回,但操作系统底层会将发送缓冲区的数据全部发送到对端;如果此选项设置为0,就表示socket.close()方法在调用后会立即返回,但是操作系统会放弃发送缓冲区数据,直接向对端发送RST包,对端将收到复位错误;如果此选项设置为非0整数值,就表示调用socket.close()方法的线程被阻塞,直到延迟时间到来,发送缓冲区中的数据发送完毕,若超时,则对端会收到复位错误。SO_LINGER的默认值为-1,表示禁用该功能。

  1. SO_BACKLOG

此为TCP传输选项,表示服务端接收连接的队列长度,如果队列已满,客户端连接将被拒绝。服务端在处理客户端新连接请求时(三次握手)是顺序处理的,所以同一时间只能处理一个客户端连接,多个客户端到来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理,队列的大小通过SO_BACKLOG指定。

  1. SO_BROADCAST

此为TCP传输选项,表示设置为广播模式。

5.4 详解Channel

5.4.1 Channel的主要成员和方法

通道是Netty的核心概念之一,代表网络连接,由它负责同对端进行网络通信,既可以写入数据到对端,也可以从对端读取数据。
Netty通道的抽象类AbstractChannel的构造函数如下:

  1. protected AbstractChannel(Channel parent) {
  2. this.parent = parent; //父通道
  3. id = new Id();
  4. unsafe = new Unsafe(); //新建一个底层的NIO 通道,完成实际的IO操作
  5. pipeline = new ChannelPipeline(); //新建一条通道流水线
  6. }

AbstractChannel内部有一个pipeline属性,表示处理器的流水线。Netty在对通道进行初始化的时候,将pipeline属性初始化为DefaultChannelPipeline的实例。以上代码表明每个通道都拥有一条ChannelPipeline处理器流水线。
AbstractChannel内部有一个parent父通道属性,保持通道的父通道。对于连接监听通道(如NioServerSocketChannel)来说,其parent属性的值为null;对于传输通道(如NioSocketChannel)来说,其parent属性的值为接收到该连接的监听通道。
几乎所有的Netty通道实现类都继承了AbstractChannel抽象类,都拥有上面的parent和pipeline两个属性成员。
通道接口中所定义的几个重要方法。

  1. ChannelFuture connect(SocketAddress address)

此方法的作用为连接远程服务器。方法的参数为远程服务器的地址,调用后会立即返回,其返回值为执行连接操作的异步任务ChannelFuture。此方法在客户端的传输通道使用。

  1. ChannelFuture bind(SocketAddress address)

此方法的作用为绑定监听地址,开始监听新的客户端连接。此方法在服务器的新连接监听和接收通道时调用。

  1. ChannelFuture close()

此方法的作用为关闭通道连接,返回连接关闭的ChannelFuture异步任务。如果需要在连接正式关闭后执行其他操作,则需要为异步任务设置回调方法;或者调用ChannelFuture异步任务的sync()方法来阻塞当前线程,一直等到通道关闭的异步任务执行完毕。

  1. Channel read()

此方法的作用为读取通道数据,并且启动入站处理。具体来说,从内部的Java NIO Channel通道读取数据,然后启动内部的Pipeline流水线,开启数据读取的入站处理。此方法的返回通道自身用于链式调用。

  1. ChannelFuture write(Object o)

此方法的作用为启程出站流水处理,把处理后的最终数据写到底层通道(如Java NIO通道)。此方法的返回值为出站处理的异步处理任务。

  1. Channel flush()

此方法的作用为将缓冲区中的数据立即写出到对端。调用前面的write()出站处理时,并不能将数据直接写出到对端,write操作的作用在大部分情况下仅仅是写入操作系统的缓冲区,操作系统会根据缓冲区的情况决定什么时候把数据写到对端。执行flush()方法会立即将缓冲区的数据写到对端。

5.4.2 EmbeddedChannel

一般单元测试的大致流程是:先将Handler业务处理器加入到通道的Pipeline流水线中,接下来先后启动Netty服务器、客户端程序,相互发送消息,测试业务处理器的效果。这些复杂的工序存在一个问题:如果每开发一个业务处理器都进行服务器和客户端的重复启动,那么整个的过程是非常烦琐和浪费时间的。如何解决这种徒劳、低效的重复工作呢?Netty提供了一个专用通道,即EmbeddedChannel(嵌入式通道)。
EmbeddedChannel仅仅是模拟入站与出站的操作,底层不进行实际传输,不需要启动Netty服务器和客户端。除了不进行传输之外,EmbeddedChannel的其他事件机制和处理流程和真正的传输通道是一模一样的。
为了模拟数据的发送和接收,EmbeddedChannel提供了一组专门的方法
image.png
最为重要的两个方法为writeInbound()和writeOutbound()方法。

  1. writeInbound()

它的使用场景是测试入站处理器。在测试入站处理器时(例如测试一个解码器),需要读取入站(Inbound)数据。可以调用writeInbound()方法,向EmbeddedChannel写入一个入站数据(如二进制ByteBuf数据包),模拟底层的入站包,从而被入站处理器处理到,达到测试的目的。

  1. writeOutbound()

它的使用场景是测试出站处理器。在测试出站处理器时(例如测试一个编码器),需要有出站(Outbound)的数据进入流水线。可以调用writeOutbound()方法,向模拟通道写入一个出站数据(如二进制ByteBuf数据包),该包将进入处理器流水线,被待测试的出站处理器所处理。

5.5 详解Handler

在Reactor经典模型中,反应器查询到IO事件后会分发到Handler业务处理器,由Handler完成IO操作和业务处理。
image.png
入站处理触发的方向为自底向上,从Netty的内部(如通道)到ChannelInboundHandler入站处理器。
出站处理触发的方向为自顶向下,从ChannelOutboundHandler出站处理器到Netty的内部(如通道)。

5.5.1 ChannelInboundHandler入站处理器

当对端数据入站到Netty通道时,Netty将触发ChannelInboundHandler入站处理器所对应的入站API,进行入站操作处理。
image.png
ChannelInboundHandler的核心方法

  1. channelRegistered()

当通道注册完成后,Netty会调用fireChannelRegistered()方法,触发通道注册事件,而在通道流水线注册过的入站处理器的channelRegistered()回调方法会被调用。

  1. channelActive()

当通道激活完成后,Netty会调用fireChannelActive()方法,触发通道激活事件,而在通道流水线注册过的入站处理器的channelActive()回调方法会被调用。

  1. channelRead()

当通道缓冲区可读时,Netty会调用fireChannelRead()方法,触发通道可读事件,而在通道流水线注册过的入站处理器的channelRead()回调方法会被调用,以便完成入站数据的读取和处理。

  1. channelReadComplete()

当通道缓冲区读完时,Netty会调用fireChannelReadComplete()方法,触发通道缓冲区读完事件,而在通道流水线注册过的入站处理器的channelReadComplete()回调方法会被调用。

  1. channelInactive()

当连接被断开或者不可用时,Netty会调用fireChannelInactive()方法,触发连接不可用事件,而在通道流水线注册过的入站处理器的channelInactive()回调方法会被调用。

  1. exceptionCaught()

当通道处理过程发生异常时,Netty会调用fireExceptionCaught()方法,触发异常捕获事件,而在通道流水线注册过的入站处理器的exceptionCaught()方法会被调用。注意,这个方法是在ChannelHandler中定义的方法,入站处理器、出站处理器接口都继承了该方法。

5.5.2 ChannelOutboundHandler出站处理器

image.png

  1. bind()

监听地址(IP+端口)绑定:完成底层Java IO通道的IP地址绑定。如果使用TCP传输协议,这个方法用于服务端。

  1. connect()

连接服务端:完成底层Java IO通道的服务端的连接操作。如果使用TCP传输协议,那么这个方法将用于客户端。

  1. write()

写数据到底层:完成Netty通道向底层Java IO通道的数据写入操作。此方法仅仅是触发一下操作,并不是完成实际的数据写入操作。

  1. flush()

将底层缓存区的数据腾空,立即写出到对端。

  1. read ()

从底层读数据:完成Netty通道从Java IO通道的数据读取。

  1. disConnect()

断开服务器连接:断开底层Java IO通道的socket连接。如果使用TCP传输协议,此方法主要用于客户端。

  1. close()

主动关闭通道:关闭底层的通道,例如服务端的新连接监听通道。

5.5.3 ChannelInitializer通道初始化处理器

如何向流水线中装配业务处理器呢?
这就得借助通道的初始化处理器——ChannelInitializer。
首先回顾一下NettyDiscardServer丢弃服务端的代码,在给接收到的新连接装配Handler业务处理器时,调用childHandler()方法设置了一个ChannelInitializer实例:

  1. //step5:装配子通道流水线
  2. b.childHandler(new ChannelInitializer<SocketChannel>() {
  3. //有连接到达时会创建一个通道的子通道,并初始化
  4. protected void initChannel(SocketChannel ch) …{
  5. //这里可以管理子通道中的Handler业务处理器
  6. //向子通道流水线添加一个Handler业务处理器
  7. ch.pipeline().addLast(new NettyDiscardHandler());
  8. }
  9. });

initChannel()方法是ChannelInitializer定义的一个抽象方法,这个抽象方法需要开发人员自己实现。
一般来说,initChannel()方法的大致业务代码是:拿到新连接通道作为实际参数,往它的流水线中装配Handler业务处理器。

5.5.4 ChannelInboundHandler的生命周期的实战案例

为了弄清Handler业务处理器的各个方法的执行顺序和生命周期,这里定义一个简单的入站Handler处理器——InHandlerDemo。这个类继承于ChannelInboundHandlerAdapter适配器,实现了基类的大部分入站处理方法,并在每一个方法的实现代码中都加上必要的输出信息,以便于观察方法是否被执行到。

  1. package com.crazymakercircle.netty.handler;
  2. //…
  3. public class InHandlerDemo extends ChannelInboundHandlerAdapter {
  4. @Override
  5. public void handlerAdded(ChannelHandlerContext ctx)…{
  6. Logger.info("被调用:handlerAdded()");
  7. super.handlerAdded(ctx);
  8. }
  9. @Override
  10. public void channelRegistered(ChannelHandlerContext ctx)…{
  11. Logger.info("被调用:channelRegistered()");
  12. super.channelRegistered(ctx);
  13. }
  14. @Override
  15. public void channelActive(ChannelHandlerContext ctx)…{
  16. Logger.info("被调用:channelActive()");
  17. super.channelActive(ctx);
  18. }
  19. @Override
  20. public void channelRead(ChannelHandlerContext ctx, Object msg)…{
  21. Logger.info("被调用:channelRead()");
  22. super.channelRead(ctx, msg);
  23. }
  24. @Override
  25. public void channelReadComplete(ChannelHandlerContext ctx)…{
  26. Logger.info("被调用:channelReadComplete()");
  27. super.channelReadComplete(ctx);
  28. }
  29. @Override
  30. public void channelInactive(ChannelHandlerContext ctx)…{
  31. Logger.info("被调用:channelInactive()");
  32. super.channelInactive(ctx);
  33. }
  34. @Override
  35. public void channelUnregistered(ChannelHandlerContext ctx)…{
  36. Logger.info("被调用: channelUnregistered()");
  37. super.channelUnregistered(ctx);
  38. }
  39. @Override
  40. public void handlerRemoved(ChannelHandlerContext ctx)…{
  41. Logger.info("被调用:handlerRemoved()");
  42. super.handlerRemoved(ctx);
  43. }
  44. }
  45. 为了演示这个入站处理器,需要编写一个单元测试代码:将上面的Inhandler入站处理器加入一个EmbeddedChannel嵌入式通道的流水线中。接着,通过writeInbound()方法写入ByteBuf数据包。InHandlerDemo作为一个入站处理器,会处理到流水线上的入站报文。单元测试的代码如下:
  46. package com.crazymakercircle.netty.handler;
  47. //省略import
  48. public class InHandlerDemoTester {
  49. @Test
  50. public void testInHandlerLifeCircle() {
  51. final InHandler DemoinHandler = new InHandlerDemo();
  52. //初始化处理器
  53. ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>(){
  54. protected void initChannel(EmbeddedChannel ch) {
  55. ch.pipeline().addLast(inHandler);
  56. }
  57. };
  58. //创建嵌入式通道
  59. EmbeddedChannel channel = new EmbeddedChannel(i);
  60. ByteBuf buf = Unpooled.buffer();
  61. buf.writeInt(1);
  62. //模拟入站,向嵌入式通道写一个入站数据包
  63. channel.writeInbound(buf);
  64. channel.flush();
  65. //模拟入站,再写一个入站数据包
  66. channel.writeInbound(buf);
  67. channel.flush();
  68. //通道关闭
  69. channel.close();
  70. //…
  71. }
  72. }
  73. //结果
  74. [main|handlerAdded]:被调用:handlerAdded()
  75. [main|channelRegistered]:被调用:channelRegistered()
  76. [main|channelActive]:被调用:channelActive()
  77. [main|channelRead]:被调用:channelRead()
  78. [main|channelReadComplete]:被调用:channelReadComplete()
  79. [main|channelRead]:被调用:channelRead()
  80. [main|channelReadComplete]:被调用:channelReadComplete()
  81. [main|channelInactive]:被调用:channelInactive()
  82. [main|channelUnregistered]:被调用: channelUnregistered()
  83. [main|handlerRemoved]:被调用:handlerRemoved()
  1. 从输出的结果可以看到,ChannelHandler中回调方法的执行顺序为:<br />handlerAdded()→channelRegistered()→channelActive()→数据传输的入站回调→channelInactive()→channelUnregistered()→handlerRemoved()<br />其中,数据传输的入站回调过程为:<br />channelRead()→channelReadComplete()

除了两个入站回调方法外,其余的6个方法都和ChannelHandler的生命周期有关

  1. handlerAdded():当业务处理器被加入到流水线后,此方法将被回调。也就是在完成ch.pipeline().addLast(handler)语句之后会回调handlerAdded()。
  2. channelRegistered():当通道成功绑定一个NioEventLoop反应器后,此方法将被回调。
  3. channelActive():当通道激活成功后,此方法将被回调。通道激活成功指的是所有的业务处理器添加、注册的异步任务完成,并且与NioEventLoop反应器绑定的异步任务完成。
  4. channelInactive():当通道的底层连接已经不是ESTABLISH状态或者底层连接已经关闭时,会首先回调所有业务处理器的channelInactive()方法。
  5. channelUnregistered():通道和NioEventLoop反应器解除绑定,移除掉对这条通道的事件处理之后,回调所有业务处理器的channelUnregistered ()方法。
  6. handlerRemoved():Netty会移除掉通道上所有的业务处理器,并且回调所有业务处理器的handlerRemoved()方法。

除了生命周期的回调,还有数据传输的入站回调方法。对于Inhandler入站处理器,有两个很重要的回调方法:

  1. channelRead():有数据包入站,通道可读。流水线会启动入站处理流程,从前向后,入站处理器的channelRead()方法会被依次回调到。
  2. channelReadComplete():流水线完成入站处理后,会从前向后依次回调每个入站处理器的channelReadComplete()方法,表示数据读取完毕。

    5.6 详解Pipeline

    一条Netty通道需要很多业务处理器来处理业务。每条通道内部都有一条流水线(Pipeline)将Handler装配起来。Netty的业务处理器流水线ChannelPipeline是基于责任链设计模式(Chain of Responsibility)来设计的,内部是一个双向链表结构,能够支持动态地添加和删除业务处理器。

    5.6.1 Pipeline入站处理流程

    为了完整地演示Pipeline入站处理流程,将新建三个极为简单的入站处理器:SimpleInHandlerA、SimpleInHandlerB、SimpleInHandlerC。在ChannelInitializer处理器的initChannel方法中,把它们加入到流水线中。添加的顺序为A→B→C ```java package com.crazymakercircle.netty.pipeline; //… public class InPipeline { //内部类:第一个入站处理器 static class SimpleInHandlerA extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg)…{
    1. Logger.info("入站处理器 A: 被回调 ");
    2. super.channelRead(ctx, msg);
    } } //内部类:第二个入站处理器 static class SimpleInHandlerB extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg)…{
    1. Logger.info("入站处理器 B: 被回调 ");
    2. super.channelRead(ctx, msg);
    } } //内部类:第三个入站处理器 static class SimpleInHandlerC extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg)…{
    1. Logger.info("入站处理器 C: 被回调 ");
    2. super.channelRead(ctx, msg);
    } } @Test public void testPipelineInBound() {
    1. ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {
    2. protected void initChannel(EmbeddedChannel ch) {
    3. ch.pipeline().addLast(new SimpleInHandlerA());
    4. ch.pipeline().addLast(new SimpleInHandlerB());
    5. ch.pipeline().addLast(new SimpleInHandlerC());
    6. }
    7. };
    8. EmbeddedChannel channel = new EmbeddedChannel(i);
    9. ByteBuf buf = Unpooled.buffer();
    10. buf.writeInt(1);
    11. //向通道写一个入站报文(数据包)
    12. channel.writeInbound(buf);
    13. //省略不相关代码
    } }

[main|InPipeline$SimpleInHandlerA.channelRead] |> 入站处理器 A: 被回调 [main|InPipeline$SimpleInHandlerB.channelRead] |> 入站处理器 B: 被回调 [main|InPipeline$SimpleInHandlerC.channelRead] |> 入站处理器 C: 被回调

  1. 在以上三个内部入站处理器的channelRead()方法中,我们打印当前Handler业务处理器的信息,然后调用父类的channelRead()方法,而父类的channelRead()方法的主要作用是把当前入站处理器中处理完毕的结果传递到下一个入站处理器。<br />![image.png](https://cdn.nlark.com/yuque/0/2022/png/27178439/1649838061532-4199c362-3406-46e1-9b69-a12dc2c44038.png#clientId=u161e5a9f-6cca-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=94&id=uffc00198&margin=%5Bobject%20Object%5D&name=image.png&originHeight=118&originWidth=695&originalType=binary&ratio=1&rotation=0&showTitle=false&size=20239&status=done&style=none&taskId=u08422127-1295-410d-bb4d-0fb3cf7ef8e&title=&width=556)
  2. <a name="Oqwv8"></a>
  3. ## 5.6.2 Pipeline出站处理流程
  4. 为了完整地演示Pipeline出站处理流程,将新建三个极为简单的出站处理器:SimpleOutHandlerASimpleOutHandlerBSimpleOutHandlerC。在ChannelInitializer处理器的initChannel()方法中,把它们加入到流水线中,添加的顺序为ABC
  5. ```java
  6. package com.crazymakercircle.netty.pipeline;
  7. //…
  8. public class OutPipeline {
  9. public class SimpleOutHandlerA extends ChannelOutboundHandlerAdapter {
  10. @Override
  11. public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
  12. Logger.info("出站处理器 A: 被回调" );
  13. super.write(ctx, msg, promise);
  14. }
  15. }
  16. public class SimpleOutHandlerB extends ChannelOutboundHandlerAdapter {
  17. @Override
  18. public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
  19. Logger.info("出站处理器 B: 被回调" );
  20. super.write(ctx, msg, promise);
  21. }
  22. }
  23. public class SimpleOutHandlerC extends ChannelOutboundHandlerAdapter {
  24. @Override
  25. public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
  26. Logger.info("出站处理器 C: 被回调" );
  27. super.write(ctx, msg, promise);
  28. }
  29. }
  30. @Test
  31. public void testPipelineOutBound() {
  32. ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {
  33. protected void initChannel(EmbeddedChannel ch) {
  34. ch.pipeline().addLast(new SimpleOutHandlerA());
  35. ch.pipeline().addLast(new SimpleOutHandlerB());
  36. ch.pipeline().addLast(new SimpleOutHandlerC());
  37. }
  38. };
  39. EmbeddedChannel channel = new EmbeddedChannel(i);
  40. ByteBuf buf = Unpooled.buffer();
  41. buf.writeInt(1);
  42. //向通道写一个出站报文
  43. channel.writeOutbound(buf);
  44. try {
  45. Thread.sleep(Integer.MAX_VALUE);
  46. } catch (InterruptedException e) {
  47. e.printStackTrace();
  48. }
  49. }
  50. }
  51. [main|OutPipeline$SimpleOutHandlerC.write] |> 出站处理器 C: 被回调
  52. [main|OutPipeline$SimpleOutHandlerB.write] |> 出站处理器 B: 被回调
  53. [main|OutPipeline$SimpleOutHandlerA.write] |> 出站处理器 A: 被回调

在代码中,通过pipeline. addLast()方法添加OutBoundHandler出站处理器的顺序为A→B→C。从结果可以看出,出站流水处理次序为从后向前(C→B→A)
image.png

5.6.3 ChannelHandlerContext

在Netty的设计中Handler是无状态的,不保存和Channel有关的信息。Handler的目标是将自己的处理逻辑做得很通用,可以给不同的Channel使用。与Handler不同的是,Pipeline是有状态的,保存了Channel的关系。于是,Handler和Pipeline之间需要一个中间角色将它们联系起来。这个中间角色是谁呢?ChannelHandlerContext(通道处理器上下文)
当业务处理器被添加到流水线中时会为其专门创建一个ChannelHandlerContext实例,主要封装了ChannelHandler(通道处理器)和ChannelPipeline(通道流水线)之间的关联关系。
image.png
ChannelHandlerContext中包含了许多方法,主要可以分为两类:第一类是获取上下文所关联的Netty组件实例,如所关联的通道、所关联的流水线、上下文内部Handler业务处理器实例等;第二类是入站和出站处理方法。
在Channel、ChannelPipeline、ChannelHandlerContext三个类中,都存在同样的出站和入站处理方法,这些出现在不同的类中的相同方法,功能有何不同呢?
如果通过Channel或ChannelPipeline的实例来调用这些出站和入站处理方法,它们就会在整条流水线中传播。如果是通过ChannelHandlerContext调用出站和入站处理方法,就只会从当前的节点开始往同类型的下一站处理器传播,而不是在整条流水线从头至尾进行完整的传播。

5.6.4 HeadContext与TailContext

通道流水线在没有加入任何处理器之前装配了两个默认的处理器上下文:一个头部上下文HeadContext,一个尾部上下文TailContext。pipeline的创建、初始化除了保存一些必要的属性外,核心就在于创建了HeadContext头节点和TailContext尾节点。
每个流水线中双向链表结构从一开始就存在了HeadContext和TailContext两个节点,后面添加的处理器上下文节点都添加在HeadContext实例和TailContext实例之间。在添加了一些必要的解码器、业务处理器、编码器之后,一条流水线的结构大致如图
image.png
流水线尾部的TailContext不仅仅是一个上下文类,还是一个入站处理器类,实现了所有入站处理回调方法,这些回调实现的主要工作基本上都是有关收尾处理的,如释放缓冲区对象、完成异常处理等。
TailContext是流水线默认实现类DefaultChannelPipeline的一个内部类,代码大致如下:

  1. //流水线默认实现类(来自Netty4.1.49版本)
  2. public class DefaultChannelPipeline implements ChannelPipeline {
  3. //内部类:尾部处理器和尾部上下文是同一个类
  4. final class TailContextextends AbstractChannelHandlerContext implements ChannelInboundHandler {
  5. //入站处理方法:读取通道
  6. @Override
  7. public void channelRead(ChannelHandlerContext ctx, Object msg) {
  8. //释放缓冲区
  9. }
  10. //省略TailContext 其他的入站处理方法
  11. }
  12. }

流水线头部的HeadContext比TailContext复杂得多,既是一个出站处理器,也是一个入站处理器,还保存了一个unsafe(完成实际通道传输的类)实例,也就是HeadContext还需要负责最终的通道传输工作。
HeadContext也是流水线默认实现类DefaultChannelPipeline的一个内部类,代码大致如下:

  1. //流水线默认实现类(来自Netty4.1.49版本)
  2. public class DefaultChannelPipeline implements ChannelPipeline {
  3. //内部类:头部处理器和头部上下文是同一个类
  4. //并且头部处理器既是出站处理器也是入站处理器
  5. final class HeadContextextends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {
  6. //传输操作类实例:完成通道最终的输入、输出等操作
  7. //此类专供Netty内部使用,应用程序不能使用,所以取名unsafe
  8. private final Unsafe unsafe;
  9. //入站处理举例:入站(从Channel到Handler)读操作
  10. @Override
  11. public void channelRead(ChannelHandlerContext ctx, Object msg) {
  12. ctx.fireChannelRead(msg);
  13. }
  14. //出站处理举例:出站(从Handler到Channel)读取传输数据
  15. @Override
  16. public void read(ChannelHandlerContext ctx) {
  17. unsafe.beginRead();
  18. }
  19. //出站处理举例:出站(从Handler到Channel)写操作
  20. @Override
  21. public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
  22. unsafe.write(msg, promise);
  23. }
  24. //省略HeadContext其他的处理方法
  25. }
  26. }

5.6.5 Pipeline入站和出站的双向链接操作

  1. final class DefaultChannelPipeline implements ChannelPipeline {
  2. final AbstractChannelHandlerContext head; //HeadContext
  3. final AbstractChannelHandlerContext tail; //TailContext
  4. //出站:启动流水线的出站写
  5. @Override
  6. public ChannelFuture write(Object msg) {
  7. return tail.write(msg); //从后往前传递
  8. }
  9. //入站:启动流水线的入站读
  10. @Override
  11. public ChannelPipeline fireChannelRead(Object msg) {
  12. head.fireChannelRead(msg); //从头往后传递
  13. return this;
  14. }
  15. }
  1. 出站和入站被流水线启动之后,其传播的中间过程具体如何呢?<br />这里需要了解一下流水线链表的节点实现,其默认的实现类为AbstractChannelHandlerContext抽象类,此类也是HeadContextTailContext的父类。pipeline内部的双向链表的指针维护以及节点前驱和后继的计算方法都在这个类中实现。AbstractChannelHandlerContext的核心成员如下:
  1. abstract class AbstractChannelHandlerContext implements ChannelHandlerContext {
  2. //双向链表的指针:指向后继
  3. volatile AbstractChannelHandlerContext next;
  4. //双向链表的指针:指向前驱
  5. volatile AbstractChannelHandlerContext prev;
  6. private final boolean inbound; //标志:是否为入站节点
  7. private final boolean outbound; //标志:是否为出站节点
  8. private final AbstractChannel channel; //上下文节点所关联的通道
  9. private final DefaultChannelPipeline pipeline; //所属流水线
  10. private final String name; //上下文节点名称,可以在加入流水线时指定
  11. //节点的执行线程,如果没有特别设置,则为通道的IO线程
  12. final EventExecutor executor;
  13. //…
  14. }

Pipeline如何通过上下文实例进行出入站的传播呢?
首先介绍入站操作的传播。以入站读ChannelRead操作为例,下面是fireChannelRead()方法(传播入站读)的源码:

  1. abstract class AbstractChannelHandlerContext
  2. implements ChannelHandlerContext {
  3. //…
  4. @Override
  5. public ChannelHandlerContext fireChannelRead(final Object msg) {
  6. if (msg == null) {
  7. throw new NullPointerException("msg");
  8. }
  9. //在双向链表中向后查找,找到下一个入站节点(同类的后继)
  10. final AbstractChannelHandlerContext next = findContextInbound();
  11. EventExecutor executor = next.executor();//获取后继的处理线程
  12. if (executor.inEventLoop()) {
  13. //如果当前线程为后继的处理线程
  14. //执行后继上下文所包装的处理器
  15. next.invokeChannelRead(msg);
  16. } else {
  17. //如果当前处理线程不是后继的处理线程,则提交到后继处理线程去排队
  18. //保障该节点的处理器被设置的线程调用,避免发生线程安全问题
  19. executor.execute(new OneTimeTask() {
  20. @Override
  21. public void run() {
  22. //提交到后继处理线程
  23. next.invokeChannelRead(msg);
  24. }
  25. });
  26. }
  27. return this;
  28. }
  29. }
  1. Pipeline的入站和出站的传播方向是相反的,入站是顺着双向链表向后传播,出站是顺着双向链表向前传播。所以,在fireChannelRead()方法中,调用findContextInbound()方法,找到下一个入站节点(后继的入站节点),该方法的源码如下:
  1. //在双向链表中向后查找,找到下一个入站节点
  2. private AbstractChannelHandlerContext findContextInbound() {
  3. AbstractChannelHandlerContext ctx = this;
  4. do {
  5. ctx = ctx.next; //向后查找,一直到末尾或者找到入站类型节点为止
  6. } while (!ctx.inbound);
  7. return ctx;
  8. }
  1. fireChannelRead()方法中通过findContextInbound()方法找到下一棒入站Context之后,准备开始执行下一站所包装的处理器,只不过这里需要确保执行的线程是该Context实例的executor成员线程以保证线程安全。执行下一站的处理器的方法如下:
  1. //执行下一棒入站Context所包装的处理器
  2. private void invokeChannelRead(Object msg) {
  3. try {
  4. ((ChannelInboundHandler) handler()).channelRead(this, msg);
  5. } catch (Throwable t) {
  6. notifyHandlerException(t);
  7. }
  8. }

5.6.6 截断流水线的入站处理传播过程

这里以channelRead入站读的处理流程为例,看看如何截断入站处理流程。这里采用的办法是在处理器的channelRead()方法中不再调用父处理器的channelRead()入站方法。代码如下:

  1. package com.crazymakercircle.netty.pipeline;
  2. //…
  3. public class InPipeline {
  4. //省略SimpleInHandlerA、SimpleInHandlerC
  5. //定义 SimpleInHandlerB2,替换掉SimpleInHandlerB
  6. static class SimpleInHandlerB2 extends ChannelInboundHandlerAdapter {
  7. @Override
  8. public void channelRead(ChannelHandlerContext ctx, Object msg)…{
  9. Logger.info("入站处理器 B: 被回调 ");
  10. //不调用基类的channelRead,终止流水线的执行
  11. //super.channelRead(ctx, msg);
  12. }
  13. }
  14. @Test
  15. public void testPipelineCutting() {
  16. ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {
  17. protected void initChannel(EmbeddedChannel ch) {
  18. ch.pipeline().addLast(new SimpleInHandlerA());
  19. ch.pipeline().addLast(new SimpleInHandlerB2());
  20. ch.pipeline().addLast(new SimpleInHandlerC());
  21. }
  22. };
  23. EmbeddedChannel channel = new EmbeddedChannel(i);
  24. ByteBuf buf = Unpooled.buffer();
  25. buf.writeInt(1);
  26. //向通道写一个入站报文(或数据包),启动入站处理器流程
  27. channel.writeInbound(buf);
  28. //…
  29. }
  30. }
  31. [T:main|F:channelRead] |>入站处理器 A: 被回调
  32. [T:main|F:channelRead] |>入站处理器 B: 被回调

image.png
在以上代码中,通过不调用基类的channelRead()方法截断流水线的执行。在channelRead()方法中,将入站处理结果发送到一站还有一种方法:调用Context上下文的ctx.fireChannelRead(msg)方法。如果要截断流水线的处理,显然不能调用ctx.fireChannelRead(msg)方法。

  1. 不调用supper.channelXxx(ChannelHandlerContext)。
  2. 不调用ctx.fireChannelXxx()。

大家在编写入站处理器的代码时一般会继承ChannelInboundHandlerAdapter适配器,而该适配器的默认入站实现主要是进行入站操作的流水线传播,并且是通过上下文Context实例完成的,大致的源码如下:

  1. //入站处理适配器
  2. public class ChannelInboundHandlerAdapter
  3. extends ChannelHandlerAdapter implements ChannelInboundHandler {
  4. //入站方法举例:入站读
  5. @Override
  6. public void channelRead(ChannelHandlerContext ctx, Object msg)…{
  7. //通过上下文进行入站读操作的流水线传播
  8. ctx.fireChannelRead(msg);
  9. }
  10. //…其他的入站方法的源码类似,故省略
  11. }
  1. 流水线的出站处理传播流程如何截断呢?结论是:出站处理流程只要开始执行,就不能被截断,强行截断的话Netty会抛出异常。如果业务条件不满足,可以不启动出站处理。

5.6.7 在流水线上热插拔Handler

Netty中的处理器流水线是一个双向链表。在程序执行过程中,可以动态进行业务处理器的热插拔:动态地增加、删除流水线上的业务处理器。主要的Handler热拔插方法声明在ChannelPipeline接口中,具体如下:

  1. package io.netty.channel;
  2. //…
  3. public interface ChannelPipeline
  4. extends Iterable<Entry<String, ChannelHandler>>
  5. {
  6. //…
  7. //在流水线头部增加一个业务处理器,名字由name指定
  8. ChannelPipeline addFirst(String name, ChannelHandler handler);
  9.    //在流水线尾部增加一个业务处理器,名字由name指定
  10. ChannelPipeline addLast(String name, ChannelHandler handler);
  11.     //在baseName处理器的前面增加一个业务处理器,名字由name指定
  12. ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler);
  13.      //在baseName处理器的后面增加一个业务处理器,名字由name指定
  14. ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler);
  15.     //删除一个业务处理器实例
  16. ChannelPipeline remove(ChannelHandler handler);
  17.     //删除一个处理器实例
  18. ChannelHandler remove(String handler);
  19.     //删除第一个业务处理器
  20. ChannelHandler removeFirst();
  21.     //删除最后一个业务处理器
  22. ChannelHandler removeLast();
  23. //…
  24. }

下面是一个简单的示例:调用流水线实例的remove(ChannelHandler)方法,从流水线动态地删除一个Handler。

  1. package com.crazymakercircle.netty.pipeline;
  2. //…
  3. public class PipelineHotOperateTester {
  4. static class SimpleInHandlerA extends ChannelInboundHandlerAdapter {
  5. public void channelRead(ChannelHandlerContext ctx, Object msg)…{
  6. Logger.info("入站处理器 A: 被回调 ");
  7. super.channelRead(ctx, msg);
  8. //从流水线删除当前业务处理器
  9. ctx.pipeline().remove(this);
  10. }
  11. }
  12. //省略SimpleInHandlerB、SimpleInHandlerC的定义
  13. //测试业务处理器的热拔插
  14. @Test
  15. public void testPipelineHotOperating() {
  16. ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {
  17. protected void initChannel(EmbeddedChannel ch) {
  18. ch.pipeline().addLast(new SimpleInHandlerA());
  19. ch.pipeline().addLast(new SimpleInHandlerB());
  20. ch.pipeline().addLast(new SimpleInHandlerC());
  21. }
  22. };
  23. EmbeddedChannel channel = new EmbeddedChannel(i);
  24. ByteBuf buf = Unpooled.buffer();
  25. buf.writeInt(1);
  26. //第一次向通道写入站报文(或数据包)
  27. channel.writeInbound(buf);
  28. //第二次向通道写入站报文(或数据包)
  29. channel.writeInbound(buf);
  30. //第三次向通道写入站报文(或数据包)
  31. channel.writeInbound(buf);
  32. //省略其他代码
  33. }
  34. […A|F:channelRead] |>入站处理器 A: 被回调
  35. […B|F:channelRead] |>入站处理器 B: 被回调
  36. […C|F:channelRead] |>入站处理器 C: 被回调
  37. […B|F:channelRead] |>入站处理器 B: 被回调
  38. […C|F:channelRead] |>入站处理器 C: 被回调
  39. […B|F:channelRead] |>入站处理器 B: 被回调
  40. […C|F:channelRead] |>入站处理器 C: 被回调

通道初始化处理器ChannelInitializer没有被重复调用的原因。通过翻看源码可以知道,在注册完成channelRegistered回调方法中调用ctx.pipeline().remove(this)将自己从流水线中删除了,所以该处理器仅仅被执行了一次
有关ChannelInitializer的源代码,节选如下:

  1. package io.netty.channel;
  2. //省略不相关代码
  3. public abstract class ChannelInitializer extends
  4. ChannelInboundHandlerAdapter {
  5. //…
  6. //通道初始化,抽象方法,需要子类实现
  7. protected abstract void initChannel(Channel var1) throws Exception;
  8.    //回调方法:加入通道(注册完成)后触发
  9. public final void channelRegistered(ChannelHandlerContext ctx){
  10. //调用通道初始化实现
  11. this.initChannel(ctx.channel());
  12. //删除通道初始化处理器
  13. ctx.pipeline().remove(this);
  14. //发送注册消息到下一站
  15. ctx.fireChannelRegistered();
  16. }
  17. //…
  18. }

5.7 详解ByteBuf

Netty提供了ByteBuf缓冲区组件来替代Java NIO的ByteBuffer缓冲区组件,以便更加快捷和高效地操纵内存缓冲区。

5.7.1 ByteBuf的优势

  1. Pooling(池化),减少了内存复制和GC,提升了效率。
  2. 复合缓冲区类型,支持零复制。
  3. 不需要调用flip()方法去切换读/写模式。
  4. 可扩展性好。
  5. 可以自定义缓冲区类型。
  6. 读取和写入索引分开。
  7. 方法的链式调用。
  8. 可以进行引用计数,方便重复使用。

    5.7.2 ByteBuf的组成部分

    ByteBuf是一个字节容器,内部是一个字节数组。从逻辑上来分,字节容器内部可以分为四个部分
    image.png
    第一部分是已用字节,表示已经使用完的废弃的无效字节;第二部分是可读字节,这部分数据是ByteBuf保存的有效数据,从ByteBuf中读取的数据都来自这一部分;第三部分是可写字节,写入ByteBuf的数据都会写到这一部分中;第四部分是可扩容字节,表示的是该ByteBuf最多还能扩容的大小。

    5.7.3 ByteBuf的重要属性

    ByteBuf通过三个整数类型的属性有效地区分可读数据和可写数据的索引,使得读写之间相互没有冲突。这三个属性定义在AbstractByteBuf抽象类中,分别是:

  9. readerIndex(读指针):指示读取的起始位置。每读取一个字节,readerIndex自动增加1。一旦readerIndex与writerIndex相等,则表示ByteBuf不可读了。

  10. writerIndex(写指针):指示写入的起始位置。每写一个字节,writerIndex自动增加1。一旦增加到writerIndex与capacity()容量相等,则表示ByteBuf不可写了。注意,capacity()是一个成员方法,不是一个成员属性,表示ByteBuf中可以写入的容量,而且它的值不一定是最大容量值。
  11. maxCapacity(最大容量):表示ByteBuf可以扩容的最大容量。当向ByteBuf写数据的时候,如果容量不足,可以进行扩容。扩容的最大限度由maxCapacity来设定,超过maxCapacity就会报错。

image.png

5.7.4 ByteBuf的方法

ByteBuf的方法大致可以分为三组。

  1. 容量系列
  • capacity():表示ByteBuf的容量,是废弃的字节数、可读字节数和可写字节数之和。
  • maxCapacity():表示ByteBuf能够容纳的最大字节数。当向ByteBuf中写数据的时候,如果发现容量不足,则进行扩容,直至扩容到maxCapacity设定的上限。
  1. 写入系列
  • isWritable():表示ByteBuf是否可写。如果capacity()容量大于writerIndex指针的位置,则表示可写,否则为不可写。注意:isWritable()返回false并不代表不能再往ByteBuf中写数据了。如果Netty发现往ByteBuf中写数据写不进去,就会自动扩容ByteBuf。
  • writableBytes():取得可写入的字节数,它的值等于容量capacity()减去writerIndex。
  • maxWritableBytes():取得最大的可写字节数,它的值等于最大容量maxCapacity减去writerIndex。
  • writeBytes(byte[] src):把入参src字节数组中的数据全部写到ByteBuf。这是最为常用的一个方法。
  • writeTYPE(TYPE value):写入基础数据类型的数据。TYPE表示基础数据类型,这里包含了八种大基础数据类型:writeByte()、writeBoolean()、writeChar()、writeShort()、writeInt()、writeLong()、writeFloat()、writeDouble()。
  • setTYPE(TYPE value):基础数据类型的设置,不改变writerIndex指针值。TYPE表示基础数据类型这里包含了八大基础数据类型的设置,即setByte()、setBoolean()、setChar()、setShort()、setInt()、setLong()、setFloat()、setDouble()。setTYPE系列与writeTYPE系列的不同点是setTYPE系列不改变写指针writerIndex的值,writeTYPE系列会改变写指针writerIndex的值。
  • markWriterIndex()与resetWriterIndex():前一个方法表示把当前的写指针writerIndex属性的值保存在markedWriterIndex标记属性中;后一个方法表示把之前保存的markedWriterIndex的值恢复到写指针writerIndex属性中。这两个方法都用到了标记属性markedWriterIndex,相当于一个写指针的暂存属性。
  1. 读取系列
  • isReadable():返回ByteBuf是否可读。如果writerIndex指针的值大于readerIndex指针的值,则表示可读,否则为不可读。
  • readableBytes():返回表示ByteBuf当前可读取的字节数,它的值等于writerIndex减去readerIndex。
  • readBytes(byte[] dst):将数据从ByteBuf读取到dst目标字节数组中,这里dst字节数组的大小通常等于readableBytes()可读字节数。这个方法也是最为常用的方法之一。
  • readTYPE():读取基础数据类型。可以读取八大基础数据类型:readByte()、readBoolean()、readChar()、readShort()、readInt()、readLong()、readFloat()、readDouble()。
  • getTYPE():读取基础数据类型,并且不改变readerIndex读指针的值,具体为getByte()、getBoolean()、getChar()、getShort()、getInt()、getLong()、getFloat()、getDouble()。getTYPE系列与readTYPE系列的不同点是getTYPE系列不会改变读指针readerIndex的值,readTYPE系列会改变读指针readerIndex的值。
  • markReaderIndex()与resetReaderIndex():前一种方法表示把当前的读指针readerIndex保存在markedReaderIndex属性中;后一种方法表示把保存在markedReaderIndex属性的值恢复到读指针readerIndex中。markedReaderIndex属性定义在AbstractByteBuf抽象基类中,是一个标记属性,相当于一个读指针的暂存属性。

    5.7.5 ByteBuf基本使用的实战案例

    这里使用默认的分配器分配了一个初始容量为9、最大限制为100个字节的缓冲区。 ```java package com.crazymakercircle.netty.bytebuf; //… public class WriteReadTest { @Test public void testWriteRead() {
    1. ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(9, 100);
    2. print("动作:分配ByteBuf(9, 100)", buffer);
    3. buffer.writeBytes(new byte[]{1, 2, 3, 4});
    4. print("动作:写入4个字节 (1,2,3,4)", buffer);
    5. Logger.info("start==========:get==========");
    6. getByteBuf(buffer);
    7. print("动作:取数据ByteBuf", buffer);
    8. Logger.info("start==========:read==========");
    9. readByteBuf(buffer);
    10. print("动作:读完ByteBuf", buffer);
    } //取字节 private void readByteBuf(ByteBuf buffer) {
    1. while (buffer.isReadable()) {
    2. Logger.info("取一个字节:" + buffer.readByte());
    3. }
    } //读字节,不改变指针 private void getByteBuf(ByteBuf buffer) {
    1. for (int i = 0; i<buffer.readableBytes(); i++) {
    2. Logger.info("读一个字节:" + buffer.getByte(i));
    3. }
    } }

[main|…:print]:after =======动作:分配ByteBuf(9, 100)============ [main|…:print]:1.0 isReadable(): false [main|…:print]:1.1 readerIndex(): 0 [main|…:print]:1.2 readableBytes(): 0 [main|…:print]:2.0 isWritable(): true [main|…:print]:2.1 writerIndex(): 0 [main|…:print]:2.2 writableBytes(): 9 [main|…:print]:3.0 capacity(): 9 [main|…:print]:3.1 maxCapacity(): 100 [main|…:print]:3.2 maxWritableBytes(): 100 //… [main|…:print]:after ========动作:写入4个字节 (1,2,3,4)=========== [main|…:print]:1.0 isReadable(): true [main|…:print]:1.1 readerIndex(): 0 [main|…:print]:1.2 readableBytes(): 4 [main|…:print]:2.0 isWritable(): true [main|…:print]:2.1 writerIndex(): 4 [main|…:print]:2.2 writableBytes(): 5 [main|…:print]:3.0 capacity(): 9 [main|…:print]:3.1 maxCapacity(): 100 [main|…:print]:3.2 maxWritableBytes(): 96 //… [main|…:print]:after =========动作:取数据ByteBuf============ [main|…:print]:1.0 isReadable(): true [main|…:print]:1.1 readerIndex(): 0 [main|…:print]:1.2 readableBytes(): 4 [main|…:print]:2.0 isWritable(): true [main|…:print]:2.1 writerIndex(): 4 [main|…:print]:2.2 writableBytes(): 5 [main|…:print]:3.0 capacity(): 9 [main|…:print]:3.1 maxCapacity(): 100 [main|…:print]:3.2 maxWritableBytes(): 96 //… [main|…:print]:after =========动作:读完ByteBuf============ [main|…:print]:1.0 isReadable(): false [main|…:print]:1.1 readerIndex(): 4 [main|…:print]:1.2 readableBytes(): 0 [main|…:print]:2.0 isWritable(): true [main|…:print]:2.1 writerIndex(): 4 [main|…:print]:2.2 writableBytes(): 5 [main|…:print]:3.0 capacity(): 9 [main|…:print]:3.1 maxCapacity(): 100 [main|…:print]:3.2 maxWritableBytes(): 96

  1. <a name="MxlzY"></a>
  2. ## 5.7.6 ByteBuf的引用计数
  3. Netty的ByteBuf的内存回收工作是通过引用计数方式管理的。<br />什么是池化(Pooled)的ByteBuf缓冲区呢?<br />从Netty 4版本开始,新增了ByteBuf的池化机制,即创建一个缓冲区对象池,将没有被引用的ByteBuf对象放入对象缓存池中,需要时重新从对象缓存池中取出,而不需要重新创建。<br />ByteBuf引用计数的大致规则如下:在默认情况下,当创建完一个ByteBuf时,引用计数为1;每次调用retain()方法,引用计数加1;每次调用release()方法,引用计数减1;如果引用为0,再次访问这个ByteBuf对象,将会抛出异常;如果引用为0,表示这个ByteBuf没有哪个进程引用,它占用的内存需要回收。
  4. ```java
  5. package com.crazymakercircle.netty.bytebuf;
  6. //…
  7. public class ReferenceTest {
  8. @Test
  9. public voidtestRef()
  10. {
  11. ByteBuf buffer =ByteBufAllocator.DEFAULT.buffer();
  12. Logger.info("after create:"+buffer.refCnt());
  13. buffer.retain(); //增加一次引用计数
  14. Logger.info("after retain:"+buffer.refCnt());
  15. buffer.release(); //减少一次引用计数
  16. Logger.info("after release:"+buffer.refCnt());
  17. buffer.release(); //减少一次引用计数
  18. Logger.info("after release:"+buffer.refCnt());
  19. //错误:refCnt: 0,不能再retain
  20. buffer.retain(); //增加一次引用计数
  21. Logger.info("after retain:"+buffer.refCnt());
  22. }
  23. }
  24. [main|ReferenceTest.testRef] |> after create:1
  25. [main|ReferenceTest.testRef] |> after retain:2
  26. [main|ReferenceTest.testRef] |> after release:1
  27. [main|ReferenceTest.testRef] |> after release:0
  28. …(省略不相关的输出)
  29. io.netty.util.IllegalReferenceCountException: refCnt: 0, increment: 1
  30. …(省略异常信息)

如果retain()和release()这两个方法一次都不调用呢?
Netty在缓冲区使用完成后会调用一次release(),就是释放一次。

除了通过ByteBuf成员方法retain()和release()管理引用计数之外,Netty还提供了一组用于增加和减少引用计数的通用静态方法:

  1. ReferenceCountUtil.retain(Object):增加一次缓冲区引用计数的静态方法,从而防止该缓冲区被释放。
  2. ReferenceCountUtil.release(Object):减少一次缓冲区引用计数的静态方法,如果引用计数为0,缓冲区将被释放。

    5.7.7 ByteBuf的分配器

    Netty通过ByteBufAllocator分配器来创建缓冲区和分配内存空间。Netty提供了两种分配器实现:PoolByteBufAllocator和UnpooledByteBufAllocator。
    PoolByteBufAllocator(池化的ByteBuf分配器)将ByteBuf实例放入池中,提高了性能,将内存碎片减少到最小;池化分配器采用了jemalloc高效内存分配的策略,该策略被好几种现代操作系统所采用。
    UnpooledByteBufAllocator是普通的未池化ByteBuf分配器,没有把ByteBuf放入池中,每次被调用时,返回一个新的ByteBuf实例;使用完之后,通过Java的垃圾回收机制回收或者直接释放(对于直接内存而言)。
    现在PooledByteBufAllocator已经广泛使用了一段时间,并且有了增强的缓冲区泄漏追踪机制。因此,也可以在Netty程序中设置引导类Bootstrap装配的时候将PooledByteBufAllocator设置为默认的分配器。 ```java ServerBootstrap b = new ServerBootstrap() //设置通道的参数 b.option(ChannelOption.SO_KEEPALIVE, true); //设置父通道的缓冲区分配器 b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); //设置子通道的缓冲区分配器 b.childOption(ChannelOption.ALLOCATOR,PooledByteBufAllocator.DEFAULT);
  1. 使用缓冲区分配器创建ByteBuf的方法有多种,下面列出几种主要的:
  2. ```java
  3. package com.crazymakercircle.netty.bytebuf;
  4. //…
  5. public class AllocatorTest {
  6. @Test
  7. public void showAlloc() {
  8. ByteBuf buffer = null;
  9.      //方法1:通过默认分配器分配
  10.      //初始容量为9、最大容量为100的缓冲区
  11. buffer = ByteBufAllocator.DEFAULT.buffer(9, 100);
  12.      //方法2:通过默认分配器分配
  13.      //初始容量为256、最大容量为Integer.MAX_VALUE的缓冲区
  14. buffer = ByteBufAllocator.DEFAULT.buffer();
  15.      //方法3:非池化分配器,分配Java的堆(Heap)结构内存缓冲区
  16. buffer = UnpooledByteBufAllocator.DEFAULT.heapBuffer();
  17.      //方法4:池化分配器,分配由操作系统管理的直接内存缓冲区
  18. buffer = PooledByteBufAllocator.DEFAULT.directBuffer();
  19. //其他方法
  20. }
  21. }

5.7.8 ByteBuf缓冲区的类型

根据内存的管理方不同,缓冲区分为堆缓冲区和直接缓冲区,也就是Heap ByteBuf和Direct ByteBuf。另外,为了方便缓冲区进行组合,还提供了一种组合缓存区。
image.png
Direct Memory(直接内存):Direct Memory不属于Java堆内存,所分配的内存其实是调用操作系统malloc()函数来获得的,由Netty的本地Native堆进行管理。

5.7.9 两类ByteBuf使用的实战案例

Heap ByteBuf和Direct ByteBuf两类缓冲区的使用,它们有以下几点不同:

  1. Heap ByteBuf通过调用分配器的buffer()方法来创建;Direct ByteBuf通过调用分配器的directBuffer()方法来创建。
  2. Heap ByteBuf缓冲区可以直接通过array()方法读取内部数组;Direct ByteBuf缓冲区不能读取内部数组。
  3. 可以调用hasArray()方法来判断是否为Heap ByteBuf类型的缓冲区;如果hasArray()返回值为true,则表示是堆缓冲,否则为直接内存缓冲区。
  4. 从Direct ByteBuf读取缓冲数据进行Java程序处理时,相对比较麻烦,需要通过getBytes/readBytes等方法先将数据复制到Java的堆内存,然后进行其他的计算。 ```java package com.crazymakercircle.netty.bytebuf; //… public class BufferTypeTest { final static Charset UTF_8 = Charset.forName(“UTF-8”); //堆缓冲区测试用例 @Test public void testHeapBuffer() {

    1. //取得堆内存
    2. ByteBuf heapBuf = ByteBufAllocator.DEFAULT.heapBuffer();
    3. heapBuf.writeBytes("疯狂创客圈:高性能学习社群".getBytes(UTF_8));
    4. if (heapBuf.hasArray()) {
    5. //取得内部数组
    6. byte[] array = heapBuf.array();
    7. int offset = heapBuf.arrayOffset() + heapBuf.readerIndex();
    8. int length = heapBuf.readableBytes();
    9. Logger.info(new String(array,offset,length, UTF_8));
    10. }
    11. heapBuf.release();

    }

    //直接缓冲区测试用例 @Test public void testDirectBuffer() {

    1. ByteBuf directBuf= ByteBufAllocator.DEFAULT.directBuffer();
    2. directBuf.writeBytes("疯狂创客圈:高性能学习社群".getBytes(UTF_8));
    3. if (!directBuf.hasArray()) {
    4. int length = directBuf.readableBytes();
    5. byte[] array = new byte[length];
    6. //把数据读取到堆内存array中,再进行Java处理
    7. directBuf.getBytes(directBuf.readerIndex(), array);
    8. Logger.info(new String(array, UTF_8));
    9. }
    10. directBuf.release();

    } }

  1. 如果hasArray()返回false,不一定代表缓冲区一定就是Direct ByteBuf,也有可能是CompositeByteBufCompositeByteBuf缓冲区是Netty为了减少内存复制而提供的组合缓冲区<br />为了快速创建ByteBufferNetty提供了一个非常方便的获取缓冲区的类——Unpooled,用它可以创建和使用非池化的缓冲区。Unpooled的使用也很容易
  2. ```java
  3. //创建堆缓冲区
  4. ByteBuf heapBuf = Unpooled.buffer(8);
  5. //创建直接缓冲区
  6. ByteBuf directBuf = Unpooled.directBuffer(16);
  7. //创建复合缓冲区
  8. CompositeByteBuf compBuf = Unpooled.compositeBuffer();

Unpooled提供了很多方法,主要的方法大致如表
image.png
通过调用Context.alloc()方法来获取通道的缓冲区分配器来创建ByteBuf

  1. public class AllocatorTest
  2. {
  3. //辅助的方法:输出ByteBuf是否为直接内存,以及内存分配器
  4. public static void printByteBuf(String action, ByteBuf b)
  5. {
  6. Logger.info(" ===========" + action + "============");
  7. //true表示缓冲区为Java堆内存(组合缓冲例外)
  8. //false表示缓冲区为操作系统管理的内存(组合缓冲例外)
  9. Logger.info("b.hasArray: " + b.hasArray());
  10. //输出内存分配器
  11. Logger.info("b.ByteBufAllocator: " + b.alloc());
  12. }
  13. //处理器类:演示使用Context来获取ByteBuf
  14. static class AllocDemoHandler extends ChannelInboundHandlerAdapter
  15. {
  16. @Override
  17. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
  18. {
  19. printByteBuf("入站的ByteBuf", (ByteBuf) msg);
  20. ByteBuf buf = ctx.alloc().buffer();
  21. buf.writeInt(100);
  22. //向模拟通道写入一个出站包,模拟数据出站,需要刷新通道才能获取到输出
  23. ctx.channel().writeAndFlush(buf);
  24. }
  25. }
  26. //测试用例入口
  27. @Test
  28. public void testByteBufAlloc()
  29. {
  30. ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>()
  31. {
  32. protected void initChannel(EmbeddedChannel ch)
  33. {
  34. ch.pipeline().addLast(new AllocDemoHandler());
  35. }
  36. };
  37. EmbeddedChannel channel = new EmbeddedChannel(i);
  38. //配置通道的缓冲区分配器,这里设置一个池化的分配器
  39. channel.config().setAllocator(PooledByteBufAllocator.DEFAULT);
  40. ByteBuf buf = Unpooled.buffer();
  41. buf.writeInt(1);
  42. //向模拟通道写入一个入站包,模拟数据入站
  43. channel.writeInbound(buf);
  44. //获取通道的出站包
  45. ByteBuf outBuf = (ByteBuf) channel.readOutbound();
  46. printByteBuf("出站的ByteBuf", (ByteBuf) outBuf);
  47. //省略不相关代码
  48. }
  49. }
  50. //运行测试用例入口方法testByteBufAlloc(),输出大致如下:
  51. [main]|> ===========入站的ByteBuf============
  52. [main]|> b.hasArray: true
  53. [main]|> b.ByteBufAllocator: UnpooledByteBufAllocator(directByDefault: true)
  54. [main]|> ===========出站的ByteBuf============
  55. [main]|> b.hasArray: false
  56. [main]|> b.ByteBufAllocator: PooledByteBufAllocator(directByDefault: true)
  1. 以上代码的AllocDemoHandler处理器调用ctx.alloc().buffer()方法获取ByteBuf,有关ctx.alloc()方法的源码如下:
  1. abstract class AbstractChannelHandlerContext…{
  2. //获取通道的缓冲区分配器
  3. @Override
  4. public ByteBufAllocator alloc() {
  5. return channel().config().getAllocator();
  6. }
  7. }
  1. 通过源码可以看出,ctx.alloc()方法所获取的分配器是通道的缓冲区分配器。该分配器可以通过Bootstrap引导类为通道进行配置,也可以直接通过channel.config().setAllocator()为通道设置一个缓冲区分配器。

5.7.10 ByteBuf的自动创建与自动释放

  1. ByteBuf的自动创建

在入站处理时,Netty是何时自动创建入站的ByteBuf缓冲区的呢?
Netty的Reactor线程会通过底层的Java NIO通道读数据。发生NIO读取的方法为AbstractNioByteChannel.NioByteUnsafe.read(),其代码如下:

  1. public void read() {
  2. //channel的config信息
  3. final ChannelConfig config = config();
  4. //获取通道的缓冲区分配器
  5. final ByteBufAllocator allocator = config.getAllocator();
  6. //channel的pipeline
  7. final ChannelPipeline pipeline = pipeline();
  8. //缓冲区分配时的大小推测与计算组件
  9. final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
  10. //输入缓冲变量
  11. ByteBuf byteBuf = null;
  12. Throwable exception = null;
  13. try {
  14. do {
  15. //使用缓冲区分配器、大小计算组件一起
  16. //由分配器按照计算好的大小分配的一个缓冲区
  17.    byteBuf = allocHandle.allocate(allocator);
  18. //读取数据到缓冲区
  19. int localReadAmount = doReadBytes(byteBuf);
  20. //发送数据到流水线,进行入站处理
  21. pipeline.fireChannelRead(byteBuf);
  22. }while (++ messages < maxMessagesPerRead);
  23. } catch (Throwable t) {
  24. handleReadException(pipeline, byteBuf, t, close);
  25. }
  26. }
  1. 在入站处理完成时,入站的ByteBuf是如何自动释放的呢?
  1. TailContext自动释放

Netty默认会在ChannelPipline的最后添加一个TailContext(尾部上下文,也是一个入站处理器)。它实现了默认的入站处理方法,在这些方法中会帮助完成ByteBuf内存释放的工作
image.png

  1. //流水线实现类
  2. public class DefaultChannelPipeline implements ChannelPipeline {
  3. //内部类:尾部处理器和尾部上下文是同一个类
  4. final class TailContextextends AbstractChannelHandlerContext implements ChannelInboundHandler {
  5. //入站处理方法:读取通道
  6. @Override
  7. public void channelRead(ChannelHandlerContext ctx, Object msg) {
  8. onUnhandledInboundMessage(ctx, msg);
  9. }
  10. }
  11. //入站消息没有被处理,或者说来到了流水线末尾,释放缓冲区
  12. protected void onUnhandledInboundMessage(Object msg) {
  13. try {
  14. logger.debug(…);
  15. } finally {
  16. //释放缓冲区
  17. ReferenceCountUtil.release(msg);
  18. }
  19. }
  20. }
  1. SimpleChannelInboundHandler自动释放

以入站读数据为例,Handler业务处理器可以继承自SimpleChannelInboundHandler基类,此时必须将业务处理代码移动到重写的channelRead0(ctx, msg)方法中。
SimpleChannelInboundHandle类的入站处理方法(如channelRead等)会在调用完实际的channelRead0()方法后帮忙释放ByteBuf实例。

5.7.11 ByteBuf浅层复制的高级使用方式

首先说明浅层复制是一种非常重要的操作,可以很大程度地避免内存复制。这一点对于大规模消息通信来说是非常重要的。ByteBuf的浅层复制分为两种:切片(slice)浅层复制和整体(duplicate)浅层复制。

  1. 切片浅层复制
  2. 整体浅层复制
  3. 浅层复制的问题

浅层复制方法不会实际去复制数据,也不会改变ByteBuf的引用计数,会导致一个问题:在源ByteBuf调用release()方法之后,一旦引用计数为零,就变得不能访问了;在这种场景下,源ByteBuf的所有浅层复制实例也不能进行读写了;如果强行对浅层复制实例进行读写,则会报错。
因此,在调用浅层复制实例时,可以通过调用一次retain()方法来增加引用,表示它们对应的底层内存多了一次引用,引用计数为2。在浅层复制实例用完后,需要调用两次release()方法,将引用计数减1,这样就不会影响源ByteBuf的内存释放了。

5.8 Netty的零拷贝

大部分场景下,在Netty接收和发送ByteBuffer的过程中会使用直接内存进行Socket通道读写,使用JVM的堆内存进行业务处理,会涉及直接内存、堆内存之间的数据复制。内存的数据复制其实是效率非常低的,Netty提供了多种方法,以帮助应用程序减少内存的复制。
Netty的零拷贝(Zero-Copy)主要体现在五个方面:

  1. Netty提供CompositeByteBuf组合缓冲区类,可以将多个ByteBuf合并为一个逻辑上的ByteBuf,避免了各个ByteBuf之间的拷贝
  2. Netty提供了ByteBuf的浅层复制操作(slice、duplicate),可以将ByteBuf分解为多个共享同一个存储区域的ByteBuf,避免内存的拷贝
  3. 在使用Netty进行文件传输时,可以调用FileRegion包装的transferTo()方法直接将文件缓冲区的数据发送到目标通道,避免普通的循环读取文件数据和写入通道所导致的内存拷贝问题。
  4. 在将一个byte数组转换为一个ByteBuf对象的场景下,Netty提供了一系列的包装类,避免了转换过程中的内存拷贝。
  5. 如果通道接收和发送ByteBuf都使用直接内存进行Socket读写,就不需要进行缓冲区的二次拷贝。如果使用JVM的堆内存进行Socket读写,那么JVM会先将堆内存Buffer拷贝一份到直接内存再写入Socket中,相比于使用直接内存,这种情况在发送过程中会多出一次缓冲区的内存拷贝。所以,在发送ByteBuffer到Socket时,尽量使用直接内存而不是JVM堆内存。

    5.8.1 通过CompositeByteBuf实现零拷贝

    CompositeByteBuf可以把需要合并的多个ByteBuf组合起来,对外提供统一的readIndex和writerIndex。CompositeByteBuf只是在逻辑上是一个整体,在CompositeByteBuf内部,合并的多个ByteBuf都是单独存在的。CompositeByteBuf里面有一个Component数组,聚合的ByteBuf都放在Component数组里面,最小容量为16。
    使用CompositeByteBuf合并多个ByteBuf,大致的代码如下: ```java ByteBuf headerBuf = … ByteBuf bodyBuf = … CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer(); cbuf.addComponents(headerBuf, bodyBuf);
  1. 不使用CompositeByteBuf,将headerbody合并为一个ByteBuf的代码大致如下:
  2. ```java
  3. ByteBuf headerBuf = …
  4. ByteBuf bodyBuf = …
  5. long length=headerBuf.readableBytes() + bodyBuf.readableBytes();
  6. ByteBuf allBuf = Unpooled.buffer(length);
  7. allBuf.writeBytes(headerBuf );//拷贝header数据
  8. allBuf.writeBytes(body);//拷贝body数据

5.8.2 通过wrap操作实现零拷贝

Unpooled提供了一系列的wrap包装方法,可以帮助大家方便、快速地包装出CompositeByteBuf实例或者ByteBuf实例,而不用进行内存拷贝。

  1. ByteBuf headerBuf =
  2. ByteBuf bodyBuf =
  3. ByteBuf allByteBuf = Unpooled.wrappedBuffer(headerBuf , bodyBuf );

5.9 EchoServer的实战案例

5.9.1 NettyEchoServer

  1. package com.crazymakercircle.netty.echoServer;
  2. //…
  3. public class NettyEchoServer {
  4. //…
  5. public void runServer() {
  6. //创建反应器轮询组
  7. EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1);
  8. EventLoopGroup workerLoopGroup = new NioEventLoopGroup();
  9. //省略设置: 1 反应器轮询组/2 通道类型/4 通道选项等
  10. //5 装配子通道流水线
  11. b.childHandler(new ChannelInitializer<SocketChannel>() {
  12. //有连接到达时会创建一个通道
  13. protected void initChannel(SocketChannel ch) …{
  14. //管理子通道中的Handler
  15. //向子通道流水线添加一个Handler
  16. ch.pipeline().addLast(NettyEchoServerHandler.INSTANCE);
  17. }
  18. });
  19. //省略启动、等待、优雅关闭等
  20. }
  21. //省略 main()方法
  22. }

5.9.2 NettyEchoServerHandler

回显服务器处理器的逻辑分为两步:

  1. 第一步,读取从对端输入的数据。channelRead()方法的msg参数的形参类型不是ByteBuf,而是Object,这是由流水线的上一站决定的。一般而言,入站处理的流程是:Netty读取底层的二进制数据,填充到msg时,msg是ByteBuf类型,然后经过流水线,传入第一个入站处理器;每一个节点处理完后,将自己的处理结果(类型不一定是ByteBuf)作为msg参数不断向后传递。因此,msg参数的形参类型只能是Object类型。第一个入站处理器的channelRead()方法的msg类型绝对是ByteBuf类型,因为它是Netty读取到的ByteBuf数据包。在本实例中,NettyEchoServerHandler就是第一个业务处理器,虽然msg的实参类型是Object,但是实际类型就是ByteBuf,所以可以强制转成ByteBuf类型。

另外,从Netty 4.1开始,ByteBuf的默认类型是Direct ByteBuf。注意,Java不能直接访问Direct ByteBuf内部的数据,必须通过调用getBytes()、readBytes()等方法将数据读入Java数组中才能继续进行处理。

  1. 第二步,将数据写回客户端。这一步很简单,直接复用前面的msg实例即可。不过要注意,如果上一步调用的是readBytes()方法,那么这一步就不能直接将msg写回了,因为数据已经被readBytes()方法读完了。幸好,上一步调用的读数据方法是getBytes(),它不影响ByteBuf的数据指针,因此可以继续使用。这里除了调用ctx.writeAndFlush()方法把msg数据写回客户端之外,也可调用通道的ctx.channel().writeAndFlush()方法发送数据。这两种方法在这里的效果是一样的,因为这个流水线上没有任何出站处理器。 ```java package com.crazymakercircle.netty.echoServer; //… @ChannelHandler.Sharable public class NettyEchoServerHandler extends ChannelInboundHandlerAdapter { public static final NettyEchoServerHandler INSTANCE = new NettyEchoServerHandler(); @Override public void channelRead(ChannelHandlerContext ctx, Object msg)…{

    1. ByteBuf in = (ByteBuf) msg;
    2. Logger.info("msg type: " + (in.hasArray()?"堆内存":"直接内存"));
    3. int len = in.readableBytes();
    4. byte[] arr = new byte[len];
    5. in.getBytes(0, arr);
    6. Logger.info("server received: " + new String(arr, "UTF-8"));
    7. Logger.info("写回前,msg.refCnt:" + ((ByteBuf) msg).refCnt());
    8. //写回数据,异步任务
    9. ChannelFuture f = ctx.writeAndFlush(msg);
    10. f.addListener((ChannelFuturefutureListener) -> {
    11. Logger.info("写回后,msg.refCnt:" + ((ByteBuf) msg).refCnt());
    12. });

    } }

  1. NettyEchoServerHandler加了一个特殊的Netty注解:@ChannelHandler.Sharable。这个注解的作用是标注一个Handler实例可以被多个通道安全地共享(多个通道的流水线可以加入同一个Handler实例)。这种共享操作,Netty默认是不允许的。
  2. <a name="j2Rwz"></a>
  3. ## 5.9.3 NettyEchoClient
  4. ```java
  5. package com.crazymakercircle.netty.echoServer;
  6. //…
  7. public class NettyEchoClient {
  8. private int serverPort;
  9. private String serverIp;
  10. Bootstrap b = new Bootstrap();
  11. public NettyEchoClient(String ip, int port) {
  12. this.serverPort = port;
  13. this.serverIp = ip;
  14. }
  15. public void runClient() {
  16. //创建反应器轮询组
  17. EventLoopGroup workerLoopGroup = new NioEventLoopGroup();
  18. try {
  19. //1 设置反应器轮询组
  20. b.group(workerLoopGroup);
  21. //2 设置nio类型的通道
  22. b.channel(NioSocketChannel.class);
  23. //3 设置监听端口
  24. b.remoteAddress(serverIp, serverPort);
  25. //4 设置通道的参数
  26. b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
  27. //5 装配子通道流水线
  28. b.handler(new ChannelInitializer<SocketChannel>() {
  29. //有连接到达时会创建一个通道
  30. protected void initChannel(SocketChannel ch)…{
  31. //管理子通道中的Handler
  32. //向子通道流水线添加一个Handler
  33. ch.pipeline().addLast(NettyEchoClientHandler.INSTANCE);
  34. }
  35. });
  36. ChannelFuture f = b.connect();
  37. f.addListener((ChannelFuturefutureListener) ->
  38. {
  39. if (futureListener.isSuccess()) {
  40. Logger.info("EchoClient客户端连接成功!");
  41. } else {
  42. Logger.info("EchoClient客户端连接失败!");
  43. }
  44. });
  45. //阻塞,直到连接成功
  46. f.sync();
  47. Channel channel = f.channel();
  48. Scanner scanner = new Scanner(System.in);
  49. Print.tcfo("请输入发送内容:");
  50. while (scanner.hasNext()) {
  51. //获取输入的内容
  52. String next = scanner.next();
  53. byte[] bytes = (Dateutil.getNow() + " >>"
  54. + next).getBytes("UTF-8");
  55. //发送ByteBuf
  56. ByteBuf buffer = channel.alloc().buffer();
  57. buffer.writeBytes(bytes);
  58. channel.writeAndFlush(buffer);
  59. Print.tcfo("请输入发送内容:");
  60. }
  61. } catch (Exception e) {
  62. e.printStackTrace();
  63. } finally {
  64. //优雅关闭EventLoopGroup,
  65. //释放掉所有资源,包括创建的线程
  66. workerLoopGroup.shutdownGracefully();
  67. }
  68. }
  69. //省略 main()方法
  70. }

5.9.4 NettyEchoClientHandler

  1. package com.crazymakercircle.netty.echoServer;
  2. //省略import
  3. @ChannelHandler.Sharable
  4. public class NettyEchoClientHandler extends
  5. ChannelInboundHandlerAdapter {
  6. public static final NettyEchoClientHandler INSTANCE
  7. = new NettyEchoClientHandler();
  8. //入站处理方法
  9. @Override
  10. public void channelRead(ChannelHandlerContext ctx, Object msg)…{
  11. ByteBuf byteBuf = (ByteBuf) msg;
  12. int len = byteBuf.readableBytes();
  13. byte[] arr = new byte[len];
  14. byteBuf.getBytes(0, arr);
  15. Logger.info("client received: " + new String(arr, "UTF-8"));
  16. //释放ByteBuf的两种方法
  17. //方法一:手动释放ByteBuf
  18. byteBuf.release();
  19. //方法二:调用父类的入站方法,将msg向后传递
  20. //super.channelRead(ctx,msg);
  21. }
  22. }