3. Netty核心原理

3.1 介绍

3.1.1 原生NIO为题

1、NIO类库和API复杂,使用麻烦,需要掌握Selector、ServerSocketChannel、SocketChannel、ByteBuffer等
2、需要具备额外技能:熟悉Java多线程,因为NIO涉及到Reactor模式,必须对多线程和网络编程熟悉,才能编写高质量NIO程序
3、开发工作量和难度大:例如客户端面临断连重连、网络闪断、半包读写、失效缓存、网络拥塞和异常流的处理等
4、JDK NIO的bug:臭名昭著的Epoll Bug,会导致Selector空轮询,最终导致CPU 100%,直到JDK1.7版本该问题仍旧存在,没有根本解决

在NIO中通过Selector的轮询当前是否有IO事件,根据JDK NIO api描述,Selector的select方法会一直阻塞,直到IO事件达到或超时,但是在linux平台上有时会出问题,在某些场景下select方法会直接返回,及时没有超时并且也没有IO事件到达,这就是著名的epoll bug,比较严重,会导致线程陷入死循环,让CPU飙到100%,极大地影响系统的可靠性,到目前为止,jdk都没有完全解决这个问题

3.1.2 概述

Netty由JBOSS提供的java开源框架,Netty提供异步的、基于事件驱动的网络应用程序框架,用以快速开发高性能、高可靠性的网络IO程序。Netty是一个基于NIO的网络编程框架,使用Netty可以快速、简单的开发出一个网络应用,相当于简化和流程化了NIO的开发过程。作为当前最流行的NIO框架,Netty在互联网领域、大数据分布式计算领域、游戏行业、通信行业等获得了广泛应用,知名的Elasticsearch、Dubbo框架内部都采用了Netty。
强大之处:零拷贝、可拓展事件类型;支持TCP、UDP、HTTP、WebSocket等协议;提供安全传输、压缩、大文件传输、编解码支持等等。
image.png
优点:
1、设计优雅,提供阻塞和非阻塞的Socket;提供灵活可拓展的事件模型;提供高度可定制的线程模型
2、具备更高的性能和吞吐量,使用零拷贝技术最小化不必要的内存复制,减少资源的消耗。
3、提供安全传输特性
4、支持多种主流协议;预置多种编解码功能,支持用户开发私有协议

3.2 线程模型

3.2.1 介绍

目前存在的线程模型:
1、传统阻塞IO服务模型
2、Reactor模型,根据Reactor的数量和处理资源池线程的数量不同,有3种典型实现
a、单Reactor单线程
b、单Reactor多线程
c、主从Reactor多线程

3.2.2 传统阻塞IO服务模型

阻塞IO模式获取输入的数据,每个连接都需要独立的线程完成数据的输入,业务处理和数据返回工作
问题:
1、并发很大时,会创建大量的线程,占用系统资源
2、连接创建后,如果当前线程暂时没有数据可读,会阻塞在read操作,造成线程资源浪费

3.2.3 Reactor模型

通过一个或多个输入同时传递给服务处理器的模式,服务器端处理传入的多个请求,并将它们同步分派到相应的处理线程,因此该模式也叫Dispatcher模式。使用IO复用监听事件,收到事件后,分发给某个线程(进程),这点就是网络服务器高并发处理关键

3.2.3.1 单Reactor单线程

image.png

  • Selector是可以实现应用程序通过一个阻塞对象监听多路连接请求
  • Reactor对象通过Selector监控客户端请求事件,收到事件后通过Dispatch进行分发
  • 建立连接请求事件,则由Acceptor通过Accept处理连接请求,然后创建一个Handler对象处理连接完成后的后续业务处理
  • Handler会完成read -> 业务处理 -> send 的完整业务流程

优点:

  1. 模型简单,没有多线程、进程通信、竞争的问题,全部在一个线程完成

缺点:

  1. 性能问题:只有一个线程,无法完全发挥多核CPU的性能。Handler在处理某个连接上的业务时,整个进程无法处理其他连接事件,很容易导致性能瓶颈
  2. 可靠性问题:线程意外终止或者进入死循环,会导致整个系统通信模块不可用,不能接收和处理外部消息,造成节点故障

    3.2.3.2 单Reactor多线程

image.png

  • Reactor对象通过selector监控客户端请求事件,收到事件后,通过dispatch进行分发
  • 如果建立连接请求,则由Acceptor通过accept处理连接请求
  • 如果不是连接请求,则由reactor分发调用连接对应的handler对象处理
  • handler只负责响应事件,不做具体的业务处理,通过read读取数据后,会分发给后面的worker线程池的某个线程处理业务
  • worker线程池会分配独立线程完成真正的业务,并将结果返回给handler
  • handler收到响应后,通过send将结果返回给client

优点:

  1. 可以充分利用多核CPU的处理能来

缺点:

  1. 多线程数据共享和访问比较复杂,reactor处理所有的事件的监听和响应,在单线程运行,在高并发场景容易出现性能瓶颈

    3.2.3.3 主从Reactor多线程

    image.png
  • Reactor主线程MainReactor想通过select监听客户端连接事件,收到事件后,通过Acceptor处理客户端连接事件
  • 当Acceptor处理完客户端连接事件后(与客户端建立好Socket连接),MainReactor将连接分配给SubReactor。(即:MainReactor只负责监听客户端连接请求,和客户端建立连接之后将连接交由SubReactor监听后面的IO事件。)
  • SubReactor将连接加入到自己的连接队列进行监听,并创建Handler对各种事件进行处理
  • 当连接上有新事件发生的时候,SubReactor就会调用对应的Handler处理
  • Handler通过read从连接上读取请求数据,将请求数据分发给Worker线程池进行业务处理
  • Worker线程池会分配独立线程来完成真正的业务处理,并将处理结果返回给Handler。Handler通过send向客户端发送响应数据
  • 一个MainReactor可以对应多个SubReactor,即一个MainReactor线程可以对应多个SubReactor线程

优点:

  1. MainReactor线程与SubReactor线程的数据交互简单职责明确,MainReactor线程只需要接收新连接,SubReactor线程完成后续的业务处理
  2. MainReactor线程与SubReactor线程的数据交互简单,MainReactor线程只需要把新连接传给SubReactor线程,SubReactor线程无需返回数据
  3. 多个SubReactor线程能够应对更高的并发请求

缺点:

  1. 这种模式的缺点是编程复杂。但是由于优点明显,在许多项目中被广泛使用,包括Nginx、Memcached、Netty等。这种模式也被叫做服务器的1+M+N线程模式,即使用该模式开发的服务器包含一个(或多个,1只是表示相对较少)连接建立线程+M个IO线程+N个业务处理线程。这是业界成熟的服务器程序设计模式。

    3.2.4 Netty线程模型

    Netty的实际基于主从Reactor多线程模式,并做了一定的改进。

    3.2.4.1 简单版Netty模型

    image.png
  • BossGroup线程维护Selector,ServerSocketChannel注册到这个Selector上,只关注连接建立请求事件(主Reactor)
  • 当接收到来自客户端的连接建立请求事件的时候,通过ServerSocketChannel.accept方法获得对应的SocketChannel,并封装成NioSocketChannel注册到WorkerGroup线程中的Selector,每个Selector运行在一个线程中(从Reactor)
  • 当WorkerGroup线程中的Selector监听到自己感兴趣的IO事件后,就调用Handler进行处理

3.2.4.2 进阶版Netty模型

image.png

  • 有两组线程池:BossGroup和WorkerGroup,BossGroup中的线程专门负责和客户端建立连接,WorkerGroup中的线程专门负责处理连接上的读写
  • BossGroup和WorkerGroup含有多个不断循环的执行事件处理的线程,每个线程都包含一个Selector,用于监听注册在其上的Channel
  • 每个BossGroup中的线程循环执行以下三个步骤
    • 轮询注册的ServerSocketChannel的accept事件(OP_ACCEPT事件)
    • 处理accept事件,与客户端建立连接,生成一个NioSocketChannel,并将其注册到WorkerGroup中某个线程上的Selector上
    • 再去依次循环处理任务队列中的下一个事件
  • 每个WorkerGroup中的线程循环执行以下三个步骤
    • 轮询注册的NioSocketChannel的read / write事件(OP_READ / OP_WRITE事件)
    • 在对应的NioSocketChannel上处理read / write事件
    • 再去依次循环处理任务队列中的下一个事件

3.2.4.3 详细版Netty模型

image.png

  • Netty抽象出两组线程池:BossGroup和WorkerGroup,也可以叫做BossNioEventLoopGroup和WorkerNioEventLoopGroup。每个线程池中都有NioEventLoop线程。BossGroup中的线程专门负责和客户端建立连接,WorkerGroup中的线程专门负责处理连接上的读写。BossGroup和WorkerGroup的类型都是NioEventLoopGroup
  • NioEventLoopGroup相当于一个事件循环组,这个组中含有多个事件循环,每个事件循环就是一个NioEventLoop
  • NioEventLoop表示一个不断循环的执行事件处理的线程,每个NioEventLoop都包含一个Selector,用于监听注册的Socket网络连接(Channel)
  • NioEventLoopGroup可以含有多个线程,既可以含有多个NioEventLoop
  • 每个BossNioEventLoop中循环执行以下三个步骤
    • select:轮询注册在其上的ServerSocketChannel的accept事件(OP_ACCEPT事件)
    • processSelectedKeys:处理accept事件,与客户端建立连接,生成一个NioSocketChannel,并注册到某个WorkerNioEventLoop上的Selector上
    • runAllTasks:再去依次循环处理任务队列中的其他任务
  • 每个WorkerNioEventLoop中循环执行以下三个步骤
    • select:轮询注册在其上的NioSocketChannel的read / write事件(OP_READ / OP_WRITE事件)
    • processSelectedKeys:在对应的NioSocketChannel上处理read / write事件
    • runAllTasks:再去依次循环处理任务队列中的其他任务
  • 在以上两个processSelectedKeys步骤中,会使用Pipeline(管道),Pipeline中引用了Channel,即通过Pipeline可以获取到对应的Channel,Pipeline中维护了很多的处理器(拦截处理器、过滤处理器、自定义处理器等)

    3.3 核心API

    3.3.1 ChannelHandler及其实现类

    该接口定义了许多事件处理的方法,可以通过重写这些方法去实现具体的业务逻辑。API关系如下图所示:
    image.png
    Netty开发时需要自定义一个Handler去实现ChannelHandler接口或其子接口或其实现类,然后通过重写相应的方法实现业务逻辑:

  • public void channelActive(ChannelHandlerContext ctx),通道就绪事件

  • public void channelRead(ChannelHandlerContext ctx, Object msg),通道读取数据事件
  • public void channelReadComplete(ChannelHandlerContext ctx) ,数据读取完毕事件
  • public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause),通道发生异常事件

3.3.2 ChannelPipeline

是一个Handler的集合,负责处理和拦截inbound或者outbound的事件和操作,相当于贯穿Netty的责任链
image.png
如果客户端和服务器的Handler是一样的,消息从客户端到服务端或者反过来,每个Inbound类型或Outbound类型的Handler只会经过一次,混合类型的Handler(实现了Inbound和Outbound的Handler)会经过两次。准确的说ChannelPipeline中是一个ChannelHandlerContext,每个上下文对象中有ChannelHandler,InboundHandler是按照Pipeline的加载顺序而顺序执行,OutboundHandler是按照Pipeline的加载顺序逆序执行。

3.3.3 ChannelHandlerContext

事件处理器上下文对象,Pipeline链中的实际处理节点。每个处理节点ChannelHandlerContext中包含一个具体的事件处理器ChannelHandler,同时ChannelHandlerContext中也绑定了对应的ChannelPipeline和Channel的信息,方便对ChannelHandler进行调用。常用方法如下:

  • ChannelFuture close(),关闭通道
  • ChannelOutboundInvoker flush(),刷新
  • ChannelFuture writeAndFlush(Object msg) , 将 数 据 写 到 ChannelPipeline 中 当 前ChannelHandler 的下一个 ChannelHandler 开始处理(出站)

    3.3.4 ChannelOption

    Netty 在创建 Channel 实例后,一般都需要设置 ChannelOption 参数。ChannelOption 是 Socket 的标
    准参数,而非 Netty 独创的。常用的参数配置有:

  • ChannelOption.SO_BACKLOG

对应 TCP/IP 协议 listen 函数中的 backlog 参数,用来初始化服务器可连接队列大小。服务端处理
客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。多个客户 端来的时候,服
务端将不能处理的客户端连接请求放在队列中等待处理,backlog 参数指定 了队列的大小。

  • ChannelOption.SO_KEEPALIVE

一直保持连接活动状态。该参数用于设置TCP连接,当设置该选项以后,连接会测试链接的状态, 这个选项用于可能长时间没有数据交流的连接。当设置该选项以后,如果在两小时内没有数据的通 信时,TCP会自动发送一个活动探测数据报文。

3.3.5 ChannelFuture

表示 Channel 中异步 I/O 操作的结果,在 Netty 中所有的 I/O 操作都是异步的,I/O 的调用会直接返
回,调用者并不能立刻获得结果,但是可以通过 ChannelFuture 来获取 I/O 操作 的处理状态。
常用方法如下所示:

  • Channel channel(),返回当前正在进行 IO 操作的通道
  • ChannelFuture sync(),等待异步操作执行完毕,将异步改为同步

    3.3.6 EventLoopGroup和实现类NioEventLoopGroup

    EventLoopGroup 是一组 EventLoop 的抽象,Netty 为了更好的利用多核 CPU 资源,一般 会有多个
    EventLoop 同时工作,每个 EventLoop 维护着一个 Selector 实例。
    EventLoopGroup 提供 next 接口,可以从组里面按照一定规则获取其中一个 EventLoop 来处理任
    务。在 Netty 服务器端编程中,我们一般都需要提供两个 EventLoopGroup,例如:
    BossEventLoopGroup 和 WorkerEventLoopGroup。 通常一个服务端口即一个 ServerSocketChannel
    对应一个Selector 和一个EventLoop线程。 BossEventLoop 负责接收客户端的连接并将
    SocketChannel 交给 WorkerEventLoopGroup 来进 行 IO 处理,如下图所示:
    image.png
    BossEventLoopGroup 通常是一个单线程的 EventLoop,EventLoop 维护着一个注册了
    ServerSocketChannel 的 Selector 实例,BossEventLoop 不断轮询 Selector 将连接事件分离出来, 通
    常是 OP_ACCEPT 事件,然后将接收到的 SocketChannel 交给 WorkerEventLoopGroup,
    WorkerEventLoopGroup 会由 next 选择其中一个 EventLoopGroup 来将这个 SocketChannel 注
    册到其维护的 Selector 并对其后续的 IO 事件进行处理。
    一般情况下我们都是用实现类NioEventLoopGroup.
    常用方法如下所示:

  • public NioEventLoopGroup(),构造方法,创建线程组

  • public Future<?> shutdownGracefully(),断开连接,关闭线程

    3.3.7 ServerBootstrap和Bootstrap

    ServerBootstrap 是 Netty 中的服务器端启动助手,通过它可以完成服务器端的各种配置;
    Bootstrap 是 Netty 中的客户端启动助手,通过它可以完成客户端的各种配置。常用方法如下 所示:

  • public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup), 该方法用于服务器端,用来设置两个 EventLoop

  • public B group(EventLoopGroup group) ,该方法用于客户端,用来设置一个 EventLoop
  • public B channel(Class<? extends C> channelClass),该方法用来设置一个服务器端的通道 实现
  • public B option(ChannelOption option, T value),用来给 ServerChannel 添加配置
  • public ServerBootstrap childOption(ChannelOption childOption, T value),用来给接收到的通道添加配置
  • public ServerBootstrap childHandler(ChannelHandler childHandler),该方法用来设置业务 处理类(自定义的 handler)
  • public ChannelFuture bind(int inetPort) ,该方法用于服务器端,用来设置占用的端口号
  • public ChannelFuture connect(String inetHost, int inetPort) ,该方法用于客户端,用来连 接服务器端

    3.3.8 Unpooled类

    这是 Netty 提供的一个专门用来操作缓冲区的工具类,常用方法如下所示:

  • public static ByteBuf copiedBuffer(CharSequence string, Charset charset),通过给定的数据 和字符编码返回一个 ByteBuf 对象(类似于 NIO 中的 ByteBuffer 对象)

    3.4 入门案例

    Netty 是由 JBOSS 提供的一个 Java 开源框架,所以在使用得时候首先得导入Netty的maven坐标

    1. <dependency>
    2. <groupId>io.netty</groupId>
    3. <artifactId>netty-all</artifactId>
    4. <version>4.1.42.Final</version>
    5. </dependency>

    3.4.1 服务端

    步骤:
    1. 创建bossGroup线程组: 处理网络事件—连接事件
    2. 创建workerGroup线程组: 处理网络事件—读写事件
    3. 创建服务端启动助手
    4. 设置bossGroup线程组和workerGroup线程组
    5. 设置服务端通道实现为NIO
    6. 参数设置
    7. 创建一个通道初始化对象
    8. 向pipeline中添加自定义业务处理handler
    9. 启动服务端并绑定端口,同时将异步改为同步
    10. 关闭通道和关闭连接池
    代码: ```java package com.lagou.demo; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;

/**

  • Netty服务端 */ public class NettyServer { public static void main(String[] args) throws InterruptedException {

    1. //1.创建bossGroup线程组: 处理网络事件--连接事件 线程数默认为: 2 * 处理器线程数
    2. EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    3. //2.创建workerGroup线程组: 处理网络事件--读写事件 2 * 处理器线程数
    4. EventLoopGroup workerGroup = new NioEventLoopGroup();
    5. //3.创建服务端启动助手
    6. ServerBootstrap bootstrap = new ServerBootstrap();
    7. //4.设置线程组
    8. bootstrap.group(bossGroup, workerGroup)
    9. .channel(NioServerSocketChannel.class)//5.设置服务端通道实现;
    10. .option(ChannelOption.SO_BACKLOG, 128)//6.参数设置-设置线程队列中等待连接个数
    11. .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)//7.参数设置-设置活跃状态,child是设置workerGroup
    12. .childHandler(new ChannelInitializer<SocketChannel>() {//8.创建一个通道初始化对象
    13. @Override
    14. protected void initChannel(SocketChannel ch) throws
    15. Exception {
    16. //9.向pipeline中添加自定义业务处理handler
    17. ch.pipeline().addLast(new NettyServerHandle());
    18. }
    19. });
    20. //10.启动服务端并绑定端口,同时将异步改为同步
    21. ChannelFuture future = bootstrap.bind(9999).sync();
    22. System.out.println("服务器启动成功....");
    23. //11.关闭通道(并不是真正意义上的关闭,而是监听通道关闭状态)和关闭连接池
    24. future.channel().closeFuture().sync();
    25. bossGroup.shutdownGracefully();
    26. workerGroup.shutdownGracefully();

    } }

    1. 自定义服务端handle
    2. ```java
    3. package com.lagou.demo;
    4. import io.netty.buffer.ByteBuf;
    5. import io.netty.buffer.Unpooled;
    6. import io.netty.channel.ChannelHandlerContext;
    7. import io.netty.channel.ChannelInboundHandler;
    8. import io.netty.util.CharsetUtil;
    9. public class NettyServerHandle implements ChannelInboundHandler {
    10. /**
    11. * 通道读取事件
    12. *
    13. * @param ctx 通道上下文对象
    14. * @param msg 消息
    15. * @throws Exception
    16. */
    17. @Override
    18. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    19. ByteBuf byteBuf = (ByteBuf) msg;
    20. System.out.println("客户端发来消息:" +
    21. byteBuf.toString(CharsetUtil.UTF_8));
    22. }
    23. /**
    24. * 读取完毕事件
    25. *
    26. * @param ctx
    27. * @throws Exception
    28. */
    29. @Override
    30. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception{
    31. ctx.writeAndFlush(Unpooled.copiedBuffer("你好,我是Netty服务端.",CharsetUtil.UTF_8));
    32. }
    33. /**
    34. * 异常发生事件
    35. *
    36. * @param ctx
    37. * @param cause
    38. * @throws Exception
    39. */
    40. @Override
    41. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    42. cause.printStackTrace();
    43. ctx.close();
    44. }
    45. @Override
    46. public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    47. }
    48. @Override
    49. public void channelUnregistered(ChannelHandlerContext ctx) throws Exception{
    50. }
    51. /**
    52. * 通道就绪事件
    53. *
    54. * @param ctx
    55. * @throws Exception
    56. */
    57. @Override
    58. public void channelActive(ChannelHandlerContext ctx) throws Exception {
    59. }
    60. @Override
    61. public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    62. }
    63. @Override
    64. public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws
    65. Exception {
    66. }
    67. @Override
    68. public void channelWritabilityChanged(ChannelHandlerContext ctx) throws
    69. Exception {
    70. }
    71. @Override
    72. public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    73. }
    74. @Override
    75. public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    76. }
    77. }

3.4.2 客户端

步骤:
1. 创建线程组
2. 创建客户端启动助手
3. 设置线程组
4. 设置客户端通道实现为NIO
5. 创建一个通道初始化对象
6. 向pipeline中添加自定义业务处理handler
7. 启动客户端,等待连接服务端,同时将异步改为同步
8. 关闭通道和关闭连接池
代码:

  1. package com.lagou.demo;
  2. import io.netty.bootstrap.Bootstrap;
  3. import io.netty.channel.ChannelFuture;
  4. import io.netty.channel.ChannelInitializer;
  5. import io.netty.channel.EventLoopGroup;
  6. import io.netty.channel.nio.NioEventLoopGroup;
  7. import io.netty.channel.socket.SocketChannel;
  8. import io.netty.channel.socket.nio.NioSocketChannel;
  9. /**
  10. * Netty客户端
  11. */
  12. public class NettyClient {
  13. public static void main(String[] args) throws InterruptedException {
  14. //1. 创建线程组
  15. EventLoopGroup group = new NioEventLoopGroup();
  16. //2. 创建客户端启动助手
  17. Bootstrap bootstrap = new Bootstrap();
  18. //3. 设置线程组
  19. bootstrap.group(group)
  20. .channel(NioSocketChannel.class)//4. 设置服务端通道实现为NIO
  21. .handler(new ChannelInitializer<SocketChannel>() { //5. 创建一个通道初始化对象
  22. @Override
  23. protected void initChannel(SocketChannel ch) throws Exception {
  24. //6. 向pipeline中添加自定义业务处理handler
  25. ch.pipeline().addLast(new NettyClientHandle());
  26. }
  27. });
  28. //7. 启动客户端, 等待连接服务端, 同时将异步改为同步
  29. ChannelFuture future = bootstrap.connect("127.0.0.1", 9999).sync();
  30. //8. 关闭通道和关闭连接池
  31. future.channel().closeFuture().sync();
  32. group.shutdownGracefully();
  33. }
  34. }

自定义客户端handle:

  1. package com.lagou.demo;
  2. import io.netty.buffer.ByteBuf;
  3. import io.netty.buffer.Unpooled;
  4. import io.netty.channel.ChannelHandlerContext;
  5. import io.netty.channel.ChannelInboundHandler;
  6. import io.netty.util.CharsetUtil;
  7. /**
  8. * 客户端处理类
  9. */
  10. public class NettyClientHandle implements ChannelInboundHandler {
  11. /**
  12. * 通道就绪事件
  13. *
  14. * @param ctx
  15. * @throws Exception
  16. */
  17. @Override
  18. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  19. ctx.writeAndFlush(Unpooled.copiedBuffer("你好呀,我是Netty客户端",
  20. CharsetUtil.UTF_8));
  21. }
  22. @Override
  23. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  24. ByteBuf byteBuf = (ByteBuf) msg;
  25. System.out.println("服务端发来消息:" +
  26. byteBuf.toString(CharsetUtil.UTF_8));
  27. }
  28. @Override
  29. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  30. }
  31. @Override
  32. public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
  33. }
  34. @Override
  35. public void channelUnregistered(ChannelHandlerContext ctx) throws Exception{
  36. }
  37. @Override
  38. public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  39. }
  40. @Override
  41. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception{
  42. }
  43. @Override
  44. public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  45. }
  46. @Override
  47. public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
  48. }
  49. @Override
  50. public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
  51. }
  52. @Override
  53. public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
  54. }
  55. }

3.5 异步模型

3.5.1 介绍

异步的概念和同步相对。当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的组件在完成后,通过状态、通知和回调来通知调用者。
Netty中的IO操作是异步的,包括Bind、Write、Connect等操作会简单的返回一个ChannelFuture。调用者并不能立刻获得结果,而是通过Future-Listener机制,用户可以方便的主动获取或者通过通知机制获得IO操作结果。Netty的异步模型是建立在future和callback之上的。callback就是回调。Future的核心思想是:假设一个方法fun,计算过程可能非常耗时,等待fun返回显然不合适。那么可以在调用fun的时候,立马返回一个future,后续可以通过Future去监控方法fun的处理过程(即:Future-Listener机制)

3.5.2 Future和Future-Listener

1、Future
表示异步的执行结果,可以通过它提供的方法来检测执行是否完成,ChannelFuture是他的一个子接口。ChannelFuture是一个接口,可以添加监听器,当监听的事件发生时,就会通知到监听器
当Future对象刚刚创建时,处于非完成状态,调用者可以通过返回的ChannelFuture来获取操作执行的状态,注册监听函数来执行完成后的操作。
常用的方法:

  • sync 方法, 阻塞等待程序结果反回
  • isDone 方法来判断当前操作是否完成;
  • isSuccess 方法来判断已完成的当前操作是否成功;
  • getCause 方法来获取已完成的当前操作失败的原因;
  • isCancelled 方法来判断已完成的当前操作是否被取消;
  • addListener 方法来注册监听器,当操作已完成(isDone 方法返回完成),将会通知指定的监听器;如果Future 对象已完成,则通知指定的监听器

2、Future-Listener机制
给Future添加监听器,监听操作结果
代码:

  1. ChannelFuture future = bootstrap.bind(9999);
  2. future.addListener(new ChannelFutureListener() {
  3. @Override
  4. public void operationComplete(ChannelFuture future) throws Exception {
  5. if (future.isSuccess()) {
  6. System.out.println("端口绑定成功!");
  7. } else {
  8. System.out.println("端口绑定失败!");
  9. }
  10. }
  11. });
  1. ChannelFuture channelFuture = ctx.writeAndFlush(
  2. Unpooled.copiedBuffer("你好呀,我是Netty客户端", CharsetUtil.UTF_8)
  3. );
  4. channelFuture.addListener(new ChannelFutureListener() {
  5. @Override
  6. public void operationComplete(ChannelFuture future) throws Exception {
  7. if (future.isSuccess()) {
  8. System.out.println("数据发送成功.");
  9. } else {
  10. System.out.println("数据发送失败.");
  11. }
  12. }
  13. });