1、Netty结构

1.1 Netty基本介绍

原生 NIO 存在的问题
1. NIO 的类库和 API 繁杂,使用麻烦:需要熟练掌握 Selector、ServerSocketChannel、SocketChannel、ByteBuffer等。
2. 需要具备其他的额外技能:要熟悉 Java 多线程编程,因为 NIO 编程涉及到 Reactor 模式,你必须对多线程和网络编程非常熟悉,才能编写出高质量的 NIO 程序。
3. 开发工作量和难度都非常大:例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常流的处理等等。
4. JDK NIO 的 Bug:臭名昭著的 Epoll Bug,它会导致 Selector 空轮询,最终导致 CPU 100%。直到JDK 1.7版本该问题仍旧存在,没有被根本解决

  1. NIO中通过Selector的轮询当前是否有IO事件,根据JDK NIO api描述,Selectorselect方法会一直阻塞,直到IO事件达到或超时,但是在Linux平台上这里有时会出现问题,在某些场景下select方法会直接返回,即使没有超时并且也没有IO事件到达,这就是著名的epoll
  2. bug,这是一个比较严重的bug,它会导致线程陷入死循环,会让CPU飙到100%,极大地影响系统的可靠性,到目前为止,JDK都没有完全解决这个问题。

Netty介绍
Netty 是由 JBOSS 提供的一个 Java 开源框架。Netty 提供异步的、基于事件驱动的网络应用程序框架,用以快速开发高性能、高可靠性的网络 IO 程序。 Elasticsearch 、Dubbo 框架内部都采用了 Netty。
image.png
从图中就能看出 Netty 的强大之处:零拷贝、可拓展事件模型;支持 TCP、UDP、HTTP、WebSocket 等协议;提供安全传输、压缩、大文件传输、编解码支持等等。
具备如下优点:
1. 设计优雅,提供阻塞和非阻塞的 Socket;提供灵活可拓展的事件模型;提供高度可定制的线程模型。
2. 具备更高的性能和更大的吞吐量,使用零拷贝技术最小化不必要的内存复制,减少资源的消耗。
3. 提供安全传输特性。
4. 支持多种主流协议;预置多种编解码功能,支持用户开发私有协议。

1.2 线程模型

  • 传统阻塞 I/O 服务模型

image.png

存在问题:
1. 当并发数很大,就会创建大量的线程,占用很大系统资源
2. 连接创建后,如果当前线程暂时没有数据可读,该线程会阻塞在 read 操作,造成线程资源浪费

  • Reactor 模型

根据 Reactor 的数量和处理资源池线程的数量不同,有 3 种典型的实现
单 Reactor 单线程
单 Reactor 多线程
主从 Reactor 多线程

Reactor 模式,通过一个或多个输入同时传递给服务处理器的模式 , 服务器端程序处理传入的多个请求,并将它们同步分派到相应的处理线程, 因此 Reactor 模式也叫 Dispatcher模式. Reactor 模式使用
IO 复用监听事件, 收到事件后,分发给某个线程(进程), 这点就是网络服务器高并发处理关键.

(1)单 Reactor 单线程
image.png

  • Selector是可以实现应用程序通过一个阻塞对象监听多路连接请求
  • Reactor 对象通过 Selector监控客户端请求事件,收到事件后通过 Dispatch 进行分发是建立连接请求事件,则由 Acceptor 通过 Accept 处理连接请求,然后创建一个 Handler 对象处理连接完成后的后续业务处理
  • Handler 会完成 Read→业务处理→Send 的完整业务流程

优点:
优点:模型简单,没有多线程、进程通信、竞争的问题,全部都在一个线程中完成
缺点:
1. 性能问题: 只有一个线程,无法完全发挥多核 CPU 的性能。Handler 在处理某个连接上的业务时,整个进程无法处理其他连接事件,很容易导致性能瓶颈
2. 可靠性问题: 线程意外终止或者进入死循环,会导致整个系统通信模块不可用,不能接收和处理外部消息,造成节点故障

(2) 单 Reactor多线程
image.png

  • Reactor 对象通过 selector 监控客户端请求事件, 收到事件后,通过 dispatch 进行分发
  • 如果建立连接请求, 则右 Acceptor 通过accept 处理连接请求
  • 如果不是连接请求,则由 reactor 分发调用连接对应的 handler 来处理
  • handler 只负责响应事件,不做具体的业务处理, 通过 read 读取数据后,会分发给后面的
  • worker 线程池的某个线程处理业务
  • worker 线程池会分配独立线程完成真正的业务,并将结果返回给 handler
  • handler 收到响应后,通过 send 将结果返回给 client

优点:
可以充分的利用多核 cpu 的处理能力
缺点:
多线程数据共享和访问比较复杂, reactor 处理所有的事件的监听和响应,在单线程运行, 在高并发场景容易出现性能瓶颈

(3). 主从 Reactor 多线程
image.png

  • Reactor 主线程 MainReactor 对象通过 select 监听客户端连接事件,收到事件后,通过Acceptor 处理客户端连接事件
  • 当 Acceptor 处理完客户端连接事件之后(与客户端建立好 Socket 连接),MainReactor 将连接分配给 SubReactor。(即:MainReactor 只负责监听客户端连接请求,和客户端建立连接之后将连接交由 SubReactor 监听后面的 IO 事件。)
  • SubReactor 将连接加入到自己的连接队列进行监听,并创建 Handler 对各种事件进行处理
  • 当连接上有新事件发生的时候,SubReactor 就会调用对应的 Handler 处理
  • Handler 通过 read 从连接上读取请求数据,将请求数据分发给 Worker 线程池进行业务处理
  • Worker 线程池会分配独立线程来完成真正的业务处理,并将处理结果返回给 Handler。Handler 通过 send 向客户端发送响应数据
  • 一个 MainReactor 可以对应多个 SubReactor,即一个 MainReactor 线程可以对应多个SubReactor 线程

优点:
1. MainReactor 线程与 SubReactor 线程的数据交互简单职责明确,MainReactor 线程只需要接收新连接,SubReactor 线程完成后续的业务处理
2. MainReactor 线程与 SubReactor 线程的数据交互简单, MainReactor 线程只需要把新连接传给 SubReactor 线程,SubReactor 线程无需返回数据
3. 多个 SubReactor 线程能够应对更高的并发请求

缺点:
这种模式的缺点是编程复杂度较高。但是由于其优点明显,在许多项目中被广泛使用,包括Nginx、Memcached、Netty 等。这种模式也被叫做服务器的 1+M+N 线程模式,即使用该模式开发的服务器包含一个(或多个,1 只是表示相对较少)连接建立线程+M 个 IO 线程+N 个业务处理线程。这是业界成熟的服务器程序设计模式。

1.3 Netty线程模型

  1. 简单版Netty模型
    image.png
  • BossGroup 线程维护 Selector,ServerSocketChannel 注册到这个 Selector 上,只关注连接建立请求事件(主 Reactor)
  • 当接收到来自客户端的连接建立请求事件的时候,通过 ServerSocketChannel.accept 方法获得对应的 SocketChannel,并封装成 NioSocketChannel 注册到 WorkerGroup 线程中的Selector,每个 Selector 运行在一个线程中(从 Reactor)
  • 当 WorkerGroup 线程中的 Selector 监听到自己感兴趣的 IO 事件后,就调用 Handler 进行处理
  1. 进阶版Netty模型
    image.png
  • 有两组线程池:BossGroup 和 WorkerGroup,BossGroup 中的线程专门负责和客户端建立连接,WorkerGroup 中的线程专门负责处理连接上的读写
  • BossGroup 和 WorkerGroup 含有多个不断循环的执行事件处理的线程,每个线程都包含一个 Selector,用于监听注册在其上的 Channel

每个 BossGroup 中的线程循环执行以下三个步骤

  • 轮训注册在其上的 ServerSocketChannel 的 accept 事件(OP_ACCEPT 事件)
  • 处理 accept 事件,与客户端建立连接,生成一个 NioSocketChannel,并将其注册到WorkerGroup 中某个线程上的 Selector 上
  • 再去以此循环处理任务队列中的下一个事件

每个 WorkerGroup 中的线程循环执行以下三个步骤

  • 轮训注册在其上的 NioSocketChannel 的 read/write 事件(OP_READ/OP_WRITE 事件)
  • 在对应的 NioSocketChannel 上处理 read/write 事件
  • 再去以此循环处理任务队列中的下一个事件
  1. 详细版Netty模型
    image.png
  • Netty 抽象出两组线程池:BossGroup 和 WorkerGroup,也可以叫做BossNioEventLoopGroup 和 WorkerNioEventLoopGroup。每个线程池中都有多个NioEventLoop 线程。BossGroup 中的线程专门负责和客户端建立连接,WorkerGroup 中的线程专门负责处理连接上的读写。BossGroup 和 WorkerGroup 的类型都是NioEventLoopGroup
  • NioEventLoopGroup 相当于一个事件循环组,这个组中含有多个事件循环,每个事件循环就是一个 NioEventLoop
  • NioEventLoop 表示一个不断循环的执行事件处理的线程,每个 NioEventLoop 都包含一个Selector,用于监听注册在其上的 Socket 网络连接(Channel)
  • NioEventLoopGroup 可以含有多个线程,即可以含有多个 NioEventLoop
  • 每个 BossNioEventLoop 中循环执行以下三个步骤

select:轮训注册在其上的 ServerSocketChannel 的 accept 事件(OP_ACCEPT 事件)
processSelectedKeys:处理 accept 事件,与客户端建立连接,生成一个NioSocketChannel,并将其注册到某个 WorkerNioEventLoop 上的 Selector 上
runAllTasks:再去以此循环处理任务队列中的其他任务

  • 每个 WorkerNioEventLoop 中循环执行以下三个步骤

select:轮训注册在其上的 NioSocketChannel 的 read/write 事件(OP_READ/OP_WRITE 事件)
processSelectedKeys:在对应的 NioSocketChannel 上处理 read/write 事件
runAllTasks:再去以此循环处理任务队列中的其他任务

在以上两个processSelectedKeys步骤中,会使用 Pipeline(管道),Pipeline 中引用了Channel,即通过 Pipeline 可以获取到对应的 Channel,Pipeline 中维护了很多的处理器(拦截处理器、过滤处理器、自定义处理器等)。

2、Netty核心API

2.1 ChannelHandler及其实现类

ChannelHandler 接口定义了许多事件处理的方法,我们可以通过重写这些方法去实现具 体的业务逻辑。API 关系如下图所示
image.png
Netty开发中需要自定义一个 Handler 类去实现 ChannelHandle接口或其子接口或其实现类,然后通过重写相应方法实现业务逻辑,我们接下来看看一般都需要重写哪些方法

  • public void channelActive(ChannelHandlerContext ctx),通道就绪事件
  • public void channelRead(ChannelHandlerContext ctx, Object msg),通道读取数据事件
  • public void channelReadComplete(ChannelHandlerContext ctx) ,数据读取完毕事件
  • public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause),通道发生异常事件

2.2 ChannelPipeline

ChannelPipeline 是一个 Handler 的集合,它负责处理和拦截 inbound 或者 outbound 的事件和操作,相当于一个贯穿 Netty 的责任链.
image.png
如果客户端和服务器的Handler是一样的,消息从客户端到服务端或者反过来,每个Inbound类型或Outbound类型的Handler只会经过一次,混合类型的Handler(实现了Inbound和Outbound的Handler)会经过两次。准确的说ChannelPipeline中是一个ChannelHandlerContext,每个上下文对象中有ChannelHandler. InboundHandler是按照Pipleline的加载顺序的顺序执行, OutboundHandler是按照Pipeline的加载顺序,逆序执行

2.3 ChannelHandlerContext

这 是 事 件 处 理 器 上 下 文 对 象 , Pipeline 链 中 的 实 际 处 理 节 点 。 每 个 处 理 节 点
ChannelHandlerContext 中 包 含 一 个 具 体 的 事 件 处 理 器 ChannelHandler ,同时ChannelHandlerContext 中也绑定了对应的 ChannelPipeline和 Channel 的信息,方便对ChannelHandler 进行调用。常用方法如下所示:

  • ChannelFuture close(),关闭通道
  • ChannelOutboundInvoker flush(),刷新
  • ChannelFuture writeAndFlush(Object msg) , 将 数 据 写 到 ChannelPipeline 中 当 前ChannelHandler 的下一个 ChannelHandler 开始处理(出站)

2.4 ChannelOption

Netty 在创建 Channel 实例后,一般都需要设置 ChannelOption 参数。ChannelOption 是 Socket 的标准参数,而非 Netty 独创的。常用的参数配置有:

  • ChannelOption.SO_BACKLOG对应 TCP/IP 协议 listen 函数中的 backlog 参数,用来初始化服务器可连接队列大小。服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。多个客户 端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理,backlog 参数指定 了队列的大小。
  • ChannelOption.SO_KEEPALIVE ,一直保持连接活动状态。该参数用于设置TCP连接,当设置该选项以后,连接会测试链接的状态,这个选项用于可能长时间没有数据交流的连接。当设置该选项以后,如果在两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文。

2.5 ChannelFuture

表示 Channel 中异步 I/O 操作的结果,在 Netty 中所有的 I/O 操作都是异步的,I/O 的调用会直接返回,调用者并不能立刻获得结果,但是可以通过 ChannelFuture 来获取 I/O 操作 的处理状态。
常用方法如下所示:

  • Channel channel(),返回当前正在进行 IO 操作的通道
  • ChannelFuture sync(),等待异步操作执行完毕,将异步改为同步

2.6 EventLoopGroup和实现类NioEventLoopGroup

EventLoopGroup 是一组 EventLoop 的抽象,Netty 为了更好的利用多核 CPU 资源,一般 会有多个EventLoop 同时工作,每个 EventLoop 维护着一个 Selector 实例。

EventLoopGroup 提供 next 接口,可以从组里面按照一定规则获取其中一个 EventLoop 来处理任务。在 Netty 服务器端编程中,我们一般都需要提供两个 EventLoopGroup,例如:BossEventLoopGroup 和 WorkerEventLoopGroup。 通常一个服务端口即一个 ServerSocketChannel对应一个Selector 和一个EventLoop线程。 BossEventLoop 负责接收客户端的连接并将SocketChannel 交给 WorkerEventLoopGroup 来进 行 IO 处理,如下图所示:

image.png
BossEventLoopGroup 通常是一个单线程的 EventLoop,EventLoop 维护着一个注册了ServerSocketChannel 的 Selector 实例,BossEventLoop 不断轮询 Selector 将连接事件分离出来, 通常是 OP_ACCEPT 事件,然后将接收到的 SocketChannel 交给 WorkerEventLoopGroup,
WorkerEventLoopGroup 会由 next 选择其中一个 EventLoopGroup 来将这个 SocketChannel 注册到其维护的 Selector 并对其后续的 IO 事件进行处理。
一般情况下我们都是用实现类NioEventLoopGroup.
常用方法如下所示:

  • public NioEventLoopGroup(),构造方法,创建线程组
  • public Future<?> shutdownGracefully(),断开连接,关闭线程

2.7 ServerBootstrap和Bootstrap

ServerBootstrap 是 Netty 中的服务器端启动助手,通过它可以完成服务器端的各种配置;
Bootstrap 是 Netty 中的客户端启动助手,通过它可以完成客户端的各种配置。常用方法如下 所示:

  • public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroupchildGroup), 该方法用于服务器端,用来设置两个 EventLoop
  • public B group(EventLoopGroup group) ,该方法用于客户端,用来设置一个 EventLoop
  • public B channel(Class<? extends C> channelClass),该方法用来设置一个服务器端的通道 实现
  • public B option(ChannelOption option, T value),用来给 ServerChannel 添加配置
  • public ServerBootstrap childOption(ChannelOption childOption, T value),用来给接收到的通道添加配置
  • public ServerBootstrap childHandler(ChannelHandler childHandler),该方法用来设置业务 处理类(自定义的 handler)
  • public ChannelFuture bind(int inetPort) ,该方法用于服务器端,用来设置占用的端口号
  • public ChannelFuture connect(String inetHost, int inetPort) ,该方法用于客户端,用来连接服务器端

2.8 Unpooled类

这是 Netty 提供的一个专门用来操作缓冲区的工具类,常用方法如下所示:
public static ByteBuf copiedBuffer(CharSequence string, Charset charset),通过给定的数据 和字符编码返回一个 ByteBuf 对象(类似于 NIO 中的 ByteBuffer 对象)

3、Netty基础开发示例

3.1 服务端和客户端

依赖:

  1. <dependency>
  2. <groupId>io.netty</groupId>
  3. <artifactId>netty-all</artifactId>
  4. <version>4.1.42.Final</version>
  5. </dependency>

服务端代码:

  1. package org.example.netty;
  2. import io.netty.bootstrap.Bootstrap;
  3. import io.netty.bootstrap.ServerBootstrap;
  4. import io.netty.channel.ChannelFuture;
  5. import io.netty.channel.ChannelInitializer;
  6. import io.netty.channel.ChannelOption;
  7. import io.netty.channel.EventLoopGroup;
  8. import io.netty.channel.nio.NioEventLoopGroup;
  9. import io.netty.channel.socket.SocketChannel;
  10. import io.netty.channel.socket.nio.NioServerSocketChannel;
  11. /**
  12. * @author :xjlonly
  13. * @date :Created in 2021/3/24 10:16
  14. * @description:netty服务端
  15. * @modified By:
  16. * @version: 1.0$
  17. */
  18. public class NettyServer {
  19. public static void main(String[] args) throws InterruptedException {
  20. //1.创建bossGroup线程组: 处理网络事件--连接事件 线程数默认为: 2 * 处理器线程数
  21. EventLoopGroup bossGroup = new NioEventLoopGroup(1);
  22. //2.创建workerGroup线程组: 处理网络事件--读写事件 默认线程数是 2 * 处理器线程数
  23. EventLoopGroup workerGroup = new NioEventLoopGroup();
  24. //3. 创建服务端启动助手
  25. ServerBootstrap serverBootstrap = new ServerBootstrap();
  26. //4. 设置bossGroup线程组和workerGroup线程组
  27. serverBootstrap.group(bossGroup, workerGroup)
  28. //5. 设置服务端通道实现为NIO
  29. .channel(NioServerSocketChannel.class)
  30. //6.参数设置-设置线程队列中等待 连接个数
  31. .option(ChannelOption.SO_BACKLOG,128)
  32. //7.参数设 置-设置活跃状态,child是设置workerGroup
  33. .childOption(ChannelOption.SO_KEEPALIVE, true)
  34. //8.创建一个通道初始化对象
  35. .childHandler(new ChannelInitializer<SocketChannel>() {
  36. @Override
  37. protected void initChannel(SocketChannel socketChannel) throws Exception {
  38. //9. 向pipeline中添加自定义业务处理handler
  39. socketChannel.pipeline().addLast(new NettyServerHandler());
  40. }
  41. });
  42. //10.启动服务端并绑定端口,同时将异步改为同步
  43. ChannelFuture channelFuture = serverBootstrap.bind(9999).sync();
  44. System.out.println("服务端启动成功...");
  45. //11.关闭通道(并不是真正意义上的关闭,而是监听通道关闭状态)和关闭连接池
  46. channelFuture.channel().closeFuture().sync();
  47. bossGroup.shutdownGracefully();
  48. workerGroup.shutdownGracefully();
  49. }
  50. }

自定义服务端handle:

  1. package org.example.netty;
  2. import io.netty.buffer.ByteBuf;
  3. import io.netty.buffer.Unpooled;
  4. import io.netty.channel.ChannelHandlerContext;
  5. import io.netty.channel.ChannelInboundHandler;
  6. import io.netty.util.CharsetUtil;
  7. import java.nio.charset.StandardCharsets;
  8. /**
  9. * @author :xjlonly
  10. * @date :Created in 2021/3/24 10:32
  11. * @description:服务端处理读写handler
  12. * @modified By:
  13. * @version: 1.0$
  14. */
  15. public class NettyServerHandler implements ChannelInboundHandler {
  16. /*
  17. * 通道读取事件
  18. *
  19. * */
  20. @Override
  21. public void channelRead(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
  22. ByteBuf byteBuf = (ByteBuf)o;
  23. System.out.println("客户端发来数据:" + byteBuf.toString(StandardCharsets.UTF_8));
  24. }
  25. @Override
  26. public void channelReadComplete(ChannelHandlerContext channelHandlerContext) throws Exception {
  27. channelHandlerContext.writeAndFlush(Unpooled.copiedBuffer("你好,我是Netty服务端.", CharsetUtil.UTF_8));
  28. }
  29. @Override
  30. public void channelRegistered(ChannelHandlerContext channelHandlerContext) throws Exception {
  31. }
  32. @Override
  33. public void channelUnregistered(ChannelHandlerContext channelHandlerContext) throws Exception {
  34. }
  35. @Override
  36. public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
  37. }
  38. @Override
  39. public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
  40. }
  41. @Override
  42. public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
  43. }
  44. @Override
  45. public void channelWritabilityChanged(ChannelHandlerContext channelHandlerContext) throws Exception {
  46. }
  47. @Override
  48. public void handlerAdded(ChannelHandlerContext channelHandlerContext) throws Exception {
  49. }
  50. @Override
  51. public void handlerRemoved(ChannelHandlerContext channelHandlerContext) throws Exception {
  52. }
  53. /**
  54. * @author: xjlonly
  55. * @description: 异常处理事件
  56. * @date: 2021/3/24 10:40
  57. * @Param: null
  58. * @return
  59. */
  60. @Override
  61. public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable throwable) throws Exception {
  62. throwable.printStackTrace();
  63. channelHandlerContext.close();
  64. }
  65. }

客户端代码:

  1. package org.example.netty;
  2. import io.netty.bootstrap.Bootstrap;
  3. import io.netty.channel.ChannelFuture;
  4. import io.netty.channel.ChannelInitializer;
  5. import io.netty.channel.EventLoopGroup;
  6. import io.netty.channel.nio.NioEventLoopGroup;
  7. import io.netty.channel.socket.SocketChannel;
  8. import io.netty.channel.socket.nio.NioSocketChannel;
  9. /**
  10. * @author :xjlonly
  11. * @date :Created in 2021/3/24 10:40
  12. * @description:netty客户端
  13. * @modified By:
  14. * @version: 1.0$
  15. */
  16. public class NettyClient {
  17. public static void main(String[] args) throws InterruptedException {
  18. //1. 创建线程组
  19. EventLoopGroup group = new NioEventLoopGroup();
  20. //2. 创建客户端启动助手
  21. Bootstrap bootstrap = new Bootstrap();
  22. //3. 设置线程组
  23. bootstrap.group(group).channel(NioSocketChannel.class)
  24. //4. 设置服务端通道实现为NIO
  25. .handler(new ChannelInitializer<SocketChannel>() {
  26. //5. 创建一个通 道初始化对象
  27. @Override
  28. protected void initChannel(SocketChannel ch) throws Exception {
  29. //6. 向pipeline中添加自定义业务处理handler
  30. ch.pipeline().addLast(new NettyClientHandle());
  31. }
  32. });
  33. //7. 启动客户端, 等待连接服务端, 同时将异步改为同步
  34. ChannelFuture future = bootstrap.connect("127.0.0.1", 9999).sync();
  35. //8. 关闭通道和关闭连接池
  36. future.channel().closeFuture().sync();
  37. group.shutdownGracefully();
  38. }
  39. }

客户端自定义handle

  1. package org.example.netty;
  2. import io.netty.buffer.ByteBuf;
  3. import io.netty.buffer.Unpooled;
  4. import io.netty.channel.ChannelHandler;
  5. import io.netty.channel.ChannelHandlerContext;
  6. import io.netty.channel.ChannelInboundHandler;
  7. import io.netty.util.CharsetUtil;
  8. import io.netty.util.concurrent.EventExecutorGroup;
  9. /**
  10. * @author :xjlonly
  11. * @date :Created in 2021/3/24 10:43
  12. * @description:客户端处理类
  13. * @modified By:
  14. * @version: $
  15. */
  16. public class NettyClientHandle implements ChannelInboundHandler {
  17. @Override
  18. public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
  19. }
  20. @Override
  21. public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
  22. }
  23. /**
  24. * @author: xjlonly
  25. * @description: 通道就绪事件
  26. * @date: 2021/3/24 10:45
  27. * @Param: null
  28. * @return
  29. */
  30. @Override
  31. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  32. ctx.writeAndFlush(Unpooled.copiedBuffer("你好呀,我是Netty客户端", CharsetUtil.UTF_8));
  33. }
  34. @Override
  35. public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  36. }
  37. @Override
  38. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  39. ByteBuf byteBuf = (ByteBuf) msg;
  40. System.out.println("服务端发来消息:" + byteBuf.toString(CharsetUtil.UTF_8));
  41. }
  42. @Override
  43. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
  44. }
  45. @Override
  46. public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  47. }
  48. @Override
  49. public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
  50. }
  51. @Override
  52. public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
  53. }
  54. @Override
  55. public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
  56. }
  57. @Override
  58. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  59. }
  60. }

3.2 Netty异步模型

image.png
Netty 中的 I/O 操作是异步的,包括 Bind、Write、Connect 等操作会简单的返回一个ChannelFuture。调用者并不能立刻获得结果,而是通过 Future-Listener 机制,用户可以方便的主动获取或者通过通知机制获得IO 操作结果. Netty 的异步模型是建立在 future 和 callback 的之上的。callback 就是回调。重点说 Future,它的核心思想是:假设一个方法 fun,计算过程可能非常耗时,等待 fun 返回显然不合适。那么可以在调用 fun 的时候,立马返回一个 Future,后续可以通过 Future 去监控方法 fun 的处理过程(即 : Future-Listener 机制)

Future 和Future-Listener
1. Future
表示异步的执行结果, 可以通过它提供的方法来检测执行是否完成,ChannelFuture 是他的一个子接口. ChannelFuture 是一个接口 ,可以添加监听器,当监听的事件发生时,就会通知到监听器当 Future 对象刚刚创建时,处于非完成状态,调用者可以通过返回的 ChannelFuture 来获取操作执行的状态, 注册监听函数来执行完成后的操作。
常用方法有:

  • sync 方法, 阻塞等待程序结果反回
  • isDone 方法来判断当前操作是否完成;
  • isSuccess 方法来判断已完成的当前操作是否成功;
  • getCause 方法来获取已完成的当前操作失败的原因;
  • isCancelled 方法来判断已完成的当前操作是否被取消;
  • addListener 方法来注册监听器,当操作已完成(isDone 方法返回完成),将会通知指定的监听器;如果Future 对象已完成,则通知指定的监听器
  1. Future-Listener 机制
    给Future添加监听器,监听操作结果
    修改上小节代码实现:
    1. //10.启动服务端并绑定端口,同时将异步改为同步
    2. // ChannelFuture channelFuture = serverBootstrap.bind(9999).sync();
    3. ChannelFuture channelFuture = serverBootstrap.bind(9999);
    4. channelFuture.addListener(new ChannelFutureListener() {
    5. @Override
    6. public void operationComplete(ChannelFuture future) throws Exception {
    7. if (future.isSuccess()) {
    8. System.out.println("端口绑定成功!");
    9. } else {
    10. System.out.println("端口绑定失败!");
    11. }
    12. }
    13. });
    客户端发消息修改:
    1. //ctx.writeAndFlush(Unpooled.copiedBuffer("你好呀,我是Netty客户端", CharsetUtil.UTF_8));
    2. ChannelFuture channelFuture = ctx.writeAndFlush(Unpooled.copiedBuffer("你好 呀,我是Netty客户端", CharsetUtil.UTF_8));
    3. channelFuture.addListener(new ChannelFutureListener() {
    4. @Override
    5. public void operationComplete(ChannelFuture future) throws Exception {
    6. if (future.isSuccess()) {
    7. System.out.println("数据发送成功.");
    8. } else {
    9. System.out.println("数据发送失败.");
    10. }
    11. }
    12. });

    4、Netty高级应用

    4.1 Netty编解码器

    Java的编解码
    1. 编码(Encode)称为序列化, 它将对象序列化为字节数组,用于网络传输、数据持久化或者其它用途。
    2. 解码(Decode)称为反序列化,它把从网络、磁盘等读取的字节数组还原成原始对象(通常是原始对象的拷贝),以方便后续的业务逻辑操作。

java序列化对象只需要实现java.io.Serializable接口并生成序列化ID,这个类就能够通过java.io.ObjectInput和java.io.ObjectOutput序列化和反序列化。
Java序列化目的:1.网络传输。2.对象持久化。
Java序列化缺点:1.无法跨语言。 2.序列化后码流太大。3.序列化性能太低。
Java序列化仅仅是Java编解码技术的一种,由于它的种种缺陷,衍生出了多种编解码技术和框架,这些编解码框架实现消息的高效序列化。

Netty编解码器
对于Netty而言,编解码器由两部分组成:编码器、解码器。
解码器:负责将消息从字节或其他序列形式转成指定的消息对象。
编码器:将消息对象转成字节或其他序列形式在网络上传输。
Netty 的编(解)码器实现了 ChannelHandlerAdapter,也是一种特殊的 ChannelHandler,所以依赖于 ChannelPipeline,可以将多个编(解)码器链接在一起,以实现复杂的转换逻辑。
Netty里面的编解码: 解码器:负责处理“入站 InboundHandler”数据。 编码器:负责“出站OutboundHandler” 数据。

(1) 解码器(Decoder)
解码器负责 解码“入站”数据从一种格式到另一种格式,解码器处理入站数据是抽象ChannelInboundHandler的实现。需要将解码器放在ChannelPipeline中。对于解码器,Netty中主要提供了抽象基类ByteToMessageDecoder和MessageToMessageDecoder
image.png
抽象解码器
ByteToMessageDecoder: 用于将字节转为消息,需要检查缓冲区是否有足够的字节ReplayingDecoder: 继承ByteToMessageDecoder,不需要检查缓冲区是否有足够的字节,但是 ReplayingDecoder速度略慢于ByteToMessageDecoder,同时不是所有的ByteBuf都支持。
项目复杂性高则使用ReplayingDecoder,否则使用ByteToMessageDecoderMessageToMessageDecoder: 用于从一种消息解码为另外一种消息(例如POJO到POJO)
核心方法:

  1. decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out)

修改上节代码 服务端加入解码器:

  1. package org.example.netty;
  2. import io.netty.buffer.ByteBuf;
  3. import io.netty.channel.ChannelHandlerContext;
  4. import io.netty.handler.codec.MessageToMessageDecoder;
  5. import java.nio.charset.Charset;
  6. import java.nio.charset.StandardCharsets;
  7. import java.util.List;
  8. /**
  9. * @author :xjlonly
  10. * @date :Created in 2021/3/24 11:24
  11. * @description:消息解码-可以将字符串消息进行在进行解码.只有消息入站时才会进行解码
  12. * @modified By:
  13. * @version: $
  14. */
  15. public class MessageDecoder extends MessageToMessageDecoder<ByteBuf> {
  16. @Override
  17. protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
  18. System.out.println("正在消息解码...");
  19. out.add(msg.toString(StandardCharsets.UTF_8));
  20. }
  21. }

将解码器加入处理链

  1. protected void initChannel(SocketChannel socketChannel) throws Exception {
  2. socketChannel.pipeline().addLast(new MessageDecoder());
  3. //9. 向pipeline中添加自定义业务处理handler
  4. socketChannel.pipeline().addLast(new NettyServerHandler());
  5. }

通道读取方法:

  1. /*
  2. * 通道读取事件
  3. *
  4. * */
  5. @Override
  6. public void channelRead(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
  7. //ByteBuf byteBuf = (ByteBuf)o;
  8. //System.out.println("客户端发来数据:" + byteBuf.toString(StandardCharsets.UTF_8));
  9. System.out.println("客户端发来消息:" + o);
  10. }

(2)编码器(Encoder)
与ByteToMessageDecoder和MessageToMessageDecoder相对应,Netty提供了对应的编码器实现MessageToByteEncoder和MessageToMessageEncoder,二者都实现ChannelOutboundHandler接口。
image.png
抽象编码器

  • MessageToByteEncoder: 将消息转化成字节
  • MessageToMessageEncoder: 用于从一种消息编码为另外一种消息(例如POJO到POJO)

核心方法:

  1. encode(ChannelHandlerContext ctx, String msg, List<Object> out)

编码器实现:

  1. package org.example.netty;
  2. import io.netty.buffer.ByteBuf;
  3. import io.netty.buffer.Unpooled;
  4. import io.netty.channel.ChannelHandlerContext;
  5. import io.netty.handler.codec.MessageToMessageEncoder;
  6. import io.netty.util.CharsetUtil;
  7. import java.util.List;
  8. /**
  9. * @author :xjlonly
  10. * @date :Created in 2021/3/24 14:17
  11. * @description:消息编码器
  12. * @modified By:
  13. * @version: 1.0$
  14. */
  15. public class MessageEncoder extends MessageToMessageEncoder<String> {
  16. @Override
  17. protected void encode(ChannelHandlerContext ctx, String msg, List<Object> out) throws Exception {
  18. System.out.println("消息正在编码...");
  19. out.add(Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8));
  20. }
  21. }
  22. protected void initChannel(SocketChannel socketChannel) throws Exception {
  23. socketChannel.pipeline().addLast(new MessageEncoder());
  24. socketChannel.pipeline().addLast(new MessageDecoder());
  25. socketChannel.pipeline().addLast(new NettyServerHandler());
  26. }
  27. @Override
  28. public void channelReadComplete(ChannelHandlerContext channelHandlerContext) throws Exception {
  29. //channelHandlerContext.writeAndFlush(Unpooled.copiedBuffer("你好,我是Netty服务端.", CharsetUtil.UTF_8));
  30. channelHandlerContext.writeAndFlush("你好,我是Netty服务端.");
  31. }

(3). 编码解码器Codec
编码解码器: 同时具有编码与解码功能,特点同时实现了ChannelInboundHandler和ChannelOutboundHandler接口,因此在数据输入和输出时都能进行处理。
image.png
Netty提供提供了一个ChannelDuplexHandler适配器类,编码解码器的抽象基类ByteToMessageCodec ,MessageToMessageCodec都继承与此类.
代码实现

  1. package org.example.netty;
  2. import io.netty.buffer.ByteBuf;
  3. import io.netty.buffer.Unpooled;
  4. import io.netty.channel.ChannelHandlerContext;
  5. import io.netty.handler.codec.MessageToMessageCodec;
  6. import io.netty.util.CharsetUtil;
  7. import java.nio.charset.StandardCharsets;
  8. import java.util.List;
  9. /**
  10. * @author :xjlonly
  11. * @date :Created in 2021/3/24 14:28
  12. * @description:编解码器
  13. * @modified By:
  14. * @version: 1.0$
  15. */
  16. public class MessageCodec extends MessageToMessageCodec<ByteBuf,String>{
  17. @Override
  18. protected void encode(ChannelHandlerContext ctx, String msg, List<Object> out) throws Exception {
  19. System.out.println("消息正在编码...");
  20. out.add(Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8));
  21. }
  22. @Override
  23. protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
  24. System.out.println("正在消息解码...");
  25. out.add(msg.toString(StandardCharsets.UTF_8));
  26. }
  27. }
  28. protected void initChannel(SocketChannel ch) throws Exception {
  29. //8. 向pipeline中添加自定义业务处理handler
  30. ch.pipeline().addLast(new MessageCoder());
  31. //添加编解码器
  32. ch.pipeline().addLast(new NettyServerHandler());
  33. }

4.2 群聊天室

  1. 编写一个 Netty 群聊系统,实现服务器端和客户端之间的数据简单通讯
    2. 实现多人群聊
    3. 服务器端:可以监测用户上线,离线,并实现消息转发功能
    4. 客户端:可以发送消息给其它所有用户,同时可以接受其它用户发送的消息

服务端:

  1. package org.example.chat;
  2. import io.netty.bootstrap.ServerBootstrap;
  3. import io.netty.channel.*;
  4. import io.netty.channel.nio.NioEventLoopGroup;
  5. import io.netty.channel.socket.SocketChannel;
  6. import io.netty.channel.socket.nio.NioServerSocketChannel;
  7. import io.netty.handler.codec.string.StringDecoder;
  8. import io.netty.handler.codec.string.StringEncoder;
  9. import org.example.netty.MessageDecoder;
  10. import org.example.netty.MessageEncoder;
  11. import org.example.netty.NettyServerHandler;
  12. /**
  13. * @author :xjlonly
  14. * @date :Created in 2021/3/24 14:38
  15. * @description:聊天室服务端
  16. * @modified By:
  17. * @version: 1.0$
  18. */
  19. public class NettyChatServer {
  20. private final int port = 9999;
  21. public void run() throws InterruptedException {
  22. //1.创建bossGroup线程组: 处理网络事件--连接事件 线程数默认为: 2 * 处理器线程数
  23. EventLoopGroup bossGroup = null;
  24. //2.创建workerGroup线程组: 处理网络事件--读写事件 默认线程数是 2 * 处理器线程数
  25. EventLoopGroup workerGroup = null;
  26. try {
  27. //1.创建bossGroup线程组: 处理网络事件--连接事件 线程数默认为: 2 * 处理器线程数
  28. bossGroup = new NioEventLoopGroup(1);
  29. //2.创建workerGroup线程组: 处理网络事件--读写事件 默认线程数是 2 * 处理器线程数
  30. workerGroup = new NioEventLoopGroup();
  31. //3. 创建服务端启动助手
  32. ServerBootstrap serverBootstrap = new ServerBootstrap();
  33. //4. 设置bossGroup线程组和workerGroup线程组
  34. serverBootstrap.group(bossGroup, workerGroup)
  35. //5. 设置服务端通道实现为NIO
  36. .channel(NioServerSocketChannel.class)
  37. //6.参数设置-设置线程队列中等待 连接个数
  38. .option(ChannelOption.SO_BACKLOG,128)
  39. //7.参数设 置-设置活跃状态,child是设置workerGroup
  40. .childOption(ChannelOption.SO_KEEPALIVE, true)
  41. //8.创建一个通道初始化对象
  42. .childHandler(new ChannelInitializer<SocketChannel>() {
  43. @Override
  44. protected void initChannel(SocketChannel socketChannel) throws Exception {
  45. //添加netty提供的string编解码器
  46. socketChannel.pipeline().addLast(new StringDecoder());
  47. socketChannel.pipeline().addLast(new StringEncoder());
  48. //9. 向pipeline中添加自定义业务处理handler
  49. socketChannel.pipeline().addLast(new NettyChatServerHandler());
  50. }
  51. });
  52. //10.启动服务端并绑定端口,同时将异步改为同步
  53. ChannelFuture channelFuture = serverBootstrap.bind(port);
  54. channelFuture.addListener((ChannelFutureListener) future -> {
  55. if (future.isSuccess()) {
  56. System.out.println("端口绑定成功!");
  57. } else {
  58. System.out.println("端口绑定失败!");
  59. }
  60. });
  61. System.out.println("聊天室服务端启动成功...");
  62. //11.关闭通道(并不是真正意义上的关闭,而是监听通道关闭状态)和关闭连接池
  63. channelFuture.channel().closeFuture().sync();
  64. }finally {
  65. assert bossGroup != null;
  66. bossGroup.shutdownGracefully();
  67. assert workerGroup != null;
  68. workerGroup.shutdownGracefully();
  69. }
  70. }
  71. public static void main(String[] args) throws InterruptedException {
  72. new NettyChatServer().run();
  73. }
  74. }

ServerHandler:

  1. package org.example.chat;
  2. import io.netty.channel.Channel;
  3. import io.netty.channel.ChannelHandlerContext;
  4. import io.netty.channel.SimpleChannelInboundHandler;
  5. import java.util.ArrayList;
  6. import java.util.List;
  7. /**
  8. * @author :xjlonly
  9. * @date :Created in 2021/3/24 14:43
  10. * @description:聊天室业务处理类
  11. * @modified By:
  12. * @version: 1.0$
  13. */
  14. public class NettyChatServerHandler extends SimpleChannelInboundHandler<String> {
  15. public static List<Channel> channelList = new ArrayList<Channel>();
  16. /**
  17. * @author: xjlonly
  18. * @description: 通道读取事件
  19. * @date: 2021/3/24 14:44
  20. * @Param: null
  21. * @return
  22. */
  23. @Override
  24. protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
  25. //当前发送消息的客户端连接
  26. Channel channel = ctx.channel();
  27. for (Channel channel1 : channelList){
  28. if(!channel.equals(channel1)){
  29. channel1.writeAndFlush("["+ channel.remoteAddress().toString().substring(1) + "]说:" + msg);
  30. }
  31. }
  32. }
  33. /**
  34. * 通道就绪事件
  35. * */
  36. @Override
  37. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  38. Channel channel = ctx.channel();
  39. channelList.add(channel);
  40. System.out.println("[Server]:" + channel.remoteAddress().toString().substring(1) + "上线了");
  41. }
  42. /**
  43. * 通道未就绪 channel下线
  44. * */
  45. @Override
  46. public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  47. Channel channel = ctx.channel();
  48. //当有客户端下线时,移除通道
  49. channelList.remove(channel);
  50. System.out.println("[Server]:" + channel.remoteAddress().toString().substring(1) + "下线了");
  51. }
  52. /**
  53. * 异常处理
  54. * */
  55. @Override
  56. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  57. Channel channel = ctx.channel();
  58. cause.printStackTrace();
  59. channelList.remove(channel);
  60. System.out.println("[Server]:" + channel.remoteAddress().toString().substring(1) + "出异常了");
  61. }
  62. }

客户端:

  1. package org.example.chat;
  2. import io.netty.bootstrap.Bootstrap;
  3. import io.netty.channel.Channel;
  4. import io.netty.channel.ChannelFuture;
  5. import io.netty.channel.ChannelInitializer;
  6. import io.netty.channel.EventLoopGroup;
  7. import io.netty.channel.nio.NioEventLoopGroup;
  8. import io.netty.channel.socket.SocketChannel;
  9. import io.netty.channel.socket.nio.NioSocketChannel;
  10. import io.netty.handler.codec.string.StringDecoder;
  11. import io.netty.handler.codec.string.StringEncoder;
  12. import java.util.Scanner;
  13. /**
  14. * @author :xjlonly
  15. * @date :Created in 2021/3/24 14:56
  16. * @description:聊天室客户端
  17. * @modified By:
  18. * @version: 1.0$
  19. */
  20. public class NettyChatClient {
  21. private final String ip;
  22. private final int port;
  23. public NettyChatClient(String ip, int port) {
  24. this.ip = ip;
  25. this.port = port;
  26. }
  27. public void run() throws InterruptedException {
  28. //1. 创建线程组
  29. EventLoopGroup group = null;
  30. try {
  31. //1. 创建线程组
  32. group = new NioEventLoopGroup();
  33. //2. 创建客户端启动助手
  34. Bootstrap bootstrap = new Bootstrap();
  35. //3. 设置线程组
  36. bootstrap.group(group).channel(NioSocketChannel.class)
  37. //4. 设置服务端通道实现为NIO
  38. .handler(new ChannelInitializer<SocketChannel>() {
  39. //5. 创建一个通 道初始化对象
  40. @Override
  41. protected void initChannel(SocketChannel ch) throws Exception {
  42. ch.pipeline().addLast(new StringEncoder());
  43. ch.pipeline().addLast( new StringDecoder());
  44. //6. 向pipeline中添加自定义业务处理handler
  45. ch.pipeline().addLast(new NettyChatClientHandler());
  46. }
  47. });
  48. //7. 启动客户端, 等待连接服务端, 同时将异步改为同步
  49. ChannelFuture future = bootstrap.connect(ip, port).sync();
  50. var channel = future.channel();
  51. System.out.println("--------------------- "+ channel.localAddress().toString().substring(1) + "启动完成" +"-----------------------");
  52. //发送消息
  53. Scanner scanner = new Scanner(System.in);
  54. while (scanner.hasNextLine()){
  55. String msg = scanner.next();
  56. channel.writeAndFlush(msg);
  57. }
  58. //8. 关闭通道和关闭连接池
  59. future.channel().closeFuture().sync();
  60. }finally {
  61. assert group != null;
  62. group.shutdownGracefully();
  63. }
  64. }
  65. public static void main(String[] args) throws InterruptedException {
  66. new NettyChatClient("127.0.0.1", 9998).run();
  67. }
  68. }

ClientHandler:

  1. package org.example.chat;
  2. import io.netty.channel.ChannelHandlerContext;
  3. import io.netty.channel.SimpleChannelInboundHandler;
  4. /**
  5. * @author :xjlonly
  6. * @date :Created in 2021/3/24 15:00
  7. * @description:客户端业务处理器
  8. * @modified By:
  9. * @version: 1.0$
  10. */
  11. public class NettyChatClientHandler extends SimpleChannelInboundHandler<String> {
  12. /**
  13. * 通道读取事件
  14. * */
  15. @Override
  16. protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
  17. System.out.println(msg);
  18. }
  19. }

4.3 基于Netty的Http服务器开发

Netty的HTTP协议栈无论在性能还是可靠性上,都表现优异,非常适合在非Web容器的场景下应用,相比于传统的Tomcat、Jetty等Web容器,它更加轻量和小巧,灵活性和定制性也更好。
1. Netty 服务器在 8080 端口监听
2. 浏览器发出请求 “http://localhost:8080/
3. 服务器可以回复消息给客户端 “Hello! 我是Netty服务器 “ ,并对特定请求资源进行过滤.
服务端代码:

  1. package org.example.chat;
  2. import io.netty.bootstrap.ServerBootstrap;
  3. import io.netty.channel.*;
  4. import io.netty.channel.nio.NioEventLoopGroup;
  5. import io.netty.channel.socket.SocketChannel;
  6. import io.netty.channel.socket.nio.NioServerSocketChannel;
  7. import io.netty.handler.codec.http.HttpServerCodec;
  8. import io.netty.handler.codec.string.StringDecoder;
  9. import io.netty.handler.codec.string.StringEncoder;
  10. import org.example.http.NettyHttpServerHandler;
  11. import org.example.netty.MessageDecoder;
  12. import org.example.netty.MessageEncoder;
  13. import org.example.netty.NettyServerHandler;
  14. /**
  15. * @author :xjlonly
  16. * @date :Created in 2021/3/24 14:38
  17. * @description:Http服务端
  18. * @modified By:
  19. * @version: 1.0$
  20. */
  21. public class NettyHttpServer {
  22. private final int port;
  23. public NettyHttpServer(int port) {
  24. this.port = port;
  25. }
  26. public void run() throws InterruptedException {
  27. //1.创建bossGroup线程组: 处理网络事件--连接事件 线程数默认为: 2 * 处理器线程数
  28. EventLoopGroup bossGroup = null;
  29. //2.创建workerGroup线程组: 处理网络事件--读写事件 默认线程数是 2 * 处理器线程数
  30. EventLoopGroup workerGroup = null;
  31. try {
  32. //1.创建bossGroup线程组: 处理网络事件--连接事件 线程数默认为: 2 * 处理器线程数
  33. bossGroup = new NioEventLoopGroup(1);
  34. //2.创建workerGroup线程组: 处理网络事件--读写事件 默认线程数是 2 * 处理器线程数
  35. workerGroup = new NioEventLoopGroup();
  36. //3. 创建服务端启动助手
  37. ServerBootstrap serverBootstrap = new ServerBootstrap();
  38. //4. 设置bossGroup线程组和workerGroup线程组
  39. serverBootstrap.group(bossGroup, workerGroup)
  40. //5. 设置服务端通道实现为NIO
  41. .channel(NioServerSocketChannel.class)
  42. //6.参数设置-设置线程队列中等待 连接个数
  43. .option(ChannelOption.SO_BACKLOG,128)
  44. //7.参数设 置-设置活跃状态,child是设置workerGroup
  45. .childOption(ChannelOption.SO_KEEPALIVE, true)
  46. //8.创建一个通道初始化对象
  47. .childHandler(new ChannelInitializer<SocketChannel>() {
  48. @Override
  49. protected void initChannel(SocketChannel socketChannel) throws Exception {
  50. //添加支持http的编解码器
  51. socketChannel.pipeline().addLast(new HttpServerCodec());
  52. //9. 向pipeline中添加自定义业务处理handler
  53. socketChannel.pipeline().addLast(new NettyHttpServerHandler());
  54. }
  55. });
  56. //10.启动服务端并绑定端口,同时将异步改为同步
  57. ChannelFuture channelFuture = serverBootstrap.bind(port);
  58. channelFuture.addListener((ChannelFutureListener) future -> {
  59. if (future.isSuccess()) {
  60. System.out.println("端口绑定成功!");
  61. } else {
  62. System.out.println("端口绑定失败!");
  63. }
  64. });
  65. System.out.println("http服务端启动成功...");
  66. //11.关闭通道(并不是真正意义上的关闭,而是监听通道关闭状态)和关闭连接池
  67. channelFuture.channel().closeFuture().sync();
  68. }finally {
  69. assert bossGroup != null;
  70. bossGroup.shutdownGracefully();
  71. assert workerGroup != null;
  72. workerGroup.shutdownGracefully();
  73. }
  74. }
  75. public static void main(String[] args) throws InterruptedException {
  76. new NettyHttpServer(8080).run();
  77. }
  78. }
  79. package org.example.http;
  80. import io.netty.buffer.ByteBuf;
  81. import io.netty.buffer.Unpooled;
  82. import io.netty.channel.Channel;
  83. import io.netty.channel.ChannelHandlerContext;
  84. import io.netty.channel.SimpleChannelInboundHandler;
  85. import io.netty.handler.codec.http.*;
  86. import io.netty.util.CharsetUtil;
  87. import java.util.ArrayList;
  88. import java.util.List;
  89. /**
  90. * @author :xjlonly
  91. * @date :Created in 2021/3/24 14:43
  92. * @description:Http请求处理
  93. * @modified By:
  94. * @version: 1.0$
  95. */
  96. public class NettyHttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {
  97. public static List<Channel> channelList = new ArrayList<Channel>();
  98. /**
  99. * 通道读取事件
  100. * */
  101. @Override
  102. protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
  103. //判断请求是否是Http请求
  104. if(msg instanceof HttpRequest){
  105. DefaultHttpRequest request = (DefaultHttpRequest) msg;
  106. System.out.println("请求路径:" + request.uri());
  107. if("/favicon.ico".equals(request.uri())){
  108. return;
  109. }
  110. }
  111. ByteBuf byteBuf = Unpooled.copiedBuffer("Hello, 我是Netty服务器!", CharsetUtil.UTF_8);
  112. //给浏览器响应信息
  113. DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, byteBuf);
  114. //设置请求头
  115. response.headers().add(HttpHeaderNames.CONTENT_TYPE, "text/html;charset=utf-8");
  116. response.headers().add(HttpHeaderNames.CONTENT_LENGTH, byteBuf.readableBytes());
  117. ctx.writeAndFlush(response);
  118. }
  119. }

4.4 基于Netty的WebSocket开发网页版聊天室

WebSocket和HTTP的区别
http协议是用在应用层的协议,他是基于tcp协议的,http协议建立连接也必须要有三次握手才能发送信息。 http连接分为短连接,长连接,短连接是每次请求都要三次握手才能发送自己的信息。即每一个request对应一个response。长连接是在一定的期限内保持连接。保持TCP连接不断开。客户端与服务器通信,必须要有客户端先发起, 然后服务器返回结果。客户端是主动的,服务器是被动的。 客户端要想实时获取服务端消息就得不断发送长连接到服务端.

WebSocket实现了多路复用,他是全双工通信。在webSocket协议下服务端和客户端可以同时发送信息。 建立了WebSocket连接之后, 服务端可以主动发送信息到客户端。而且信息当中不必在带有head的部分信息了与http的长链接通信来说,这种方式,不仅能降低服务器的压力。而且信息当中也减少了部分多余的信息。

功能需求
1. Netty 服务器在 8080 端口监听
2. 浏览器发出请求 “http://localhost:8080/
3. 服务器可以回复消息给客户端 “Hello! 我是Netty服务器 “ ,并对特定请求资源进行过滤.

服务端实现:
Netty配置类

  1. package com.lagou.config;
  2. import lombok.Data;
  3. import org.springframework.boot.context.properties.ConfigurationProperties;
  4. import org.springframework.context.annotation.Configuration;
  5. import org.springframework.stereotype.Component;
  6. /**
  7. * @author :xjlonly
  8. * @date :Created in 2021/3/24 16:01
  9. * @description:配置类
  10. * @modified By:
  11. * @version: 1.0$
  12. */
  13. @Component
  14. @ConfigurationProperties(prefix = "netty")
  15. @Data
  16. public class NettyConfig {
  17. private int port;
  18. private String path;
  19. }
  1. NettyHttpServer ```java package com.lagou.netty;

import com.lagou.config.NettyConfig; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;

import javax.annotation.PreDestroy;

/**

  • @author :xjlonly
  • @date :Created in 2021/3/24 16:04
  • @description:netty服务器
  • @modified By:
  • @version: 1.0$ */ @Component public class NettyWebSocketServer implements Runnable{ @Autowired private NettyConfig config;

    @Autowired private WebSocketChannelInit webSocketChannelInit;

    private EventLoopGroup bossGroup = new NioEventLoopGroup(1); private EventLoopGroup workerGroup = new NioEventLoopGroup(); @Override public void run() {

    1. try {
    2. //创建服务端启动助手
    3. ServerBootstrap serverBootstrap = new ServerBootstrap();
    4. serverBootstrap.group(bossGroup, workerGroup);
    5. serverBootstrap.channel(NioServerSocketChannel.class)
    6. .handler(new LoggingHandler(LogLevel.DEBUG))
    7. .childHandler(webSocketChannelInit);
    8. //启动服务端
    9. ChannelFuture channelFuture = serverBootstrap.bind(config.getPort()).sync();
    10. System.out.println("--Netty服务端启动成功---");
    11. channelFuture.channel().closeFuture().sync();
    12. }catch (Exception e){
    13. e.printStackTrace();
    14. bossGroup.shutdownGracefully();
    15. workerGroup.shutdownGracefully();
    16. }finally {
    17. bossGroup.shutdownGracefully();
    18. workerGroup.shutdownGracefully();
    19. }
  1. }
  2. /**
  3. * 资源关闭 -容器销毁 资源关闭
  4. * */
  5. @PreDestroy
  6. public void close(){
  7. bossGroup.shutdownGracefully();
  8. workerGroup.shutdownGracefully();
  9. }

}

  1. NettyHttpServerHandle
  2. ```java
  3. package com.lagou.netty;
  4. import io.netty.channel.Channel;
  5. import io.netty.channel.ChannelHandler;
  6. import io.netty.channel.ChannelHandlerContext;
  7. import io.netty.channel.SimpleChannelInboundHandler;
  8. import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
  9. import org.springframework.stereotype.Component;
  10. import java.util.ArrayList;
  11. import java.util.List;
  12. /**
  13. * @author :xjlonly
  14. * @date :Created in 2021/3/24 16:16
  15. * @description:自定义处理类 TextWebSocketFrame: websocket数据是帧的形式处理
  16. * @modified By:
  17. * @version: 1.0$
  18. */
  19. @Component
  20. @ChannelHandler.Sharable //设置通道共享
  21. public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
  22. public static List<Channel> channelList = new ArrayList<Channel>();
  23. @Override
  24. protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {
  25. String msg = textWebSocketFrame.text();
  26. //当前发送消息的客户端连接
  27. Channel channel = channelHandlerContext.channel();
  28. for (Channel channel1 : channelList){
  29. if(!channel.equals(channel1)){
  30. channel1.writeAndFlush(new TextWebSocketFrame(msg));
  31. }
  32. }
  33. }
  34. /**
  35. * 通道就绪事件
  36. * */
  37. @Override
  38. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  39. Channel channel = ctx.channel();
  40. channelList.add(channel);
  41. System.out.println("[Server]:" + channel.remoteAddress().toString().substring(1) + "上线了");
  42. }
  43. /**
  44. * 通道未就绪 channel下线
  45. * */
  46. @Override
  47. public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  48. Channel channel = ctx.channel();
  49. //当有客户端下线时,移除通道
  50. channelList.remove(channel);
  51. }
  52. /**
  53. * 异常处理
  54. * */
  55. @Override
  56. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  57. Channel channel = ctx.channel();
  58. cause.printStackTrace();
  59. channelList.remove(channel);
  60. }
  61. }

通道初始化对象

  1. package com.lagou.netty;
  2. import com.lagou.config.NettyConfig;
  3. import io.netty.channel.Channel;
  4. import io.netty.channel.ChannelInitializer;
  5. import io.netty.channel.ChannelPipeline;
  6. import io.netty.handler.codec.http.HttpObjectAggregator;
  7. import io.netty.handler.codec.http.HttpServerCodec;
  8. import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler;
  9. import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
  10. import io.netty.handler.stream.ChunkedWriteHandler;
  11. import org.springframework.beans.factory.annotation.Autowired;
  12. import org.springframework.stereotype.Component;
  13. import java.io.PipedReader;
  14. /**
  15. * @author :xjlonly
  16. * @date :Created in 2021/3/24 16:11
  17. * @description:通道初始化对象
  18. * @modified By:
  19. * @version: $
  20. */
  21. @Component
  22. public class WebSocketChannelInit extends ChannelInitializer {
  23. @Autowired
  24. private NettyConfig nettyConfig;
  25. @Autowired
  26. private WebSocketHandler webSocketHandler;
  27. @Override
  28. protected void initChannel(Channel channel) throws Exception {
  29. ChannelPipeline pipeline = channel.pipeline();
  30. //对http协议的支持
  31. pipeline.addLast(new HttpServerCodec());
  32. //对大数据流的支持
  33. pipeline.addLast(new ChunkedWriteHandler());
  34. //post请求分三部分. request line / request header / message body
  35. // HttpObjectAggregator将多个信息转化成单一的request或者response对象
  36. pipeline.addLast(new HttpObjectAggregator(8000));
  37. // 将http协议升级为ws协议. websocket的支持
  38. pipeline.addLast(new WebSocketServerProtocolHandler(nettyConfig.getPath()));
  39. // 自定义处理handler
  40. pipeline.addLast(webSocketHandler);
  41. }
  42. }

启动类

  1. package com.lagou;
  2. import com.lagou.netty.NettyWebSocketServer;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.boot.CommandLineRunner;
  5. import org.springframework.boot.SpringApplication;
  6. import org.springframework.boot.autoconfigure.SpringBootApplication;
  7. /**
  8. * @author xjlonly
  9. */
  10. @SpringBootApplication
  11. public class NettySpringbootApplication implements CommandLineRunner {
  12. @Autowired
  13. NettyWebSocketServer webSocketServer;
  14. public static void main(String[] args) {
  15. SpringApplication.run(NettySpringbootApplication.class, args);
  16. }
  17. @Override
  18. public void run(String... args) throws Exception {
  19. new Thread(webSocketServer).start();
  20. }
  21. }

前端js开发

  1. $(function () {
  2. //这里需要注意的是,prompt有两个参数,前面是提示的话,后面是当对话框出来后,在对话框里的默认值
  3. var username = "";
  4. while (true) {
  5. //弹出一个输入框,输入一段文字,可以提交
  6. username = prompt("请输入您的名字", ""); //将输入的内容赋给变量 name ,
  7. if (username.trim() === "")//如果返回的有内容
  8. {
  9. alert("名称不能输入空")
  10. } else {
  11. $("#username").text(username);
  12. break;
  13. }
  14. }
  15. var webSocket = new WebSocket("ws://localhost:8081/chat");
  16. webSocket.onopen = function (){
  17. console.log("连接服务器成功!");
  18. }
  19. webSocket.onmessage = function (evt){
  20. showMessage(evt.data);
  21. }
  22. webSocket.onclose = function (){
  23. console.log("连接关闭...");
  24. }
  25. webSocket.onerror = function (){
  26. console.log("连接异常...");
  27. }
  28. function showMessage(message) {
  29. var str = message.split(":");
  30. $("#msg_list").append(`<li class="active"}>
  31. <div class="main">
  32. <img class="avatar" width="30" height="30" src="/img/user.png">
  33. <div>
  34. <div class="user_name">${str[0]}</div>
  35. <div class="text">${str[1]}</div>
  36. </div>
  37. </div>
  38. </li>`);
  39. // 置底
  40. setBottom();
  41. }
  42. $('#my_test').bind({
  43. focus: function (event) {
  44. event.stopPropagation()
  45. $('#my_test').val('');
  46. $('.arrow_box').hide()
  47. },
  48. keydown: function (event) {
  49. event.stopPropagation()
  50. if (event.keyCode === 13) {
  51. if ($('#my_test').val().trim() === '') {
  52. this.blur()
  53. $('.arrow_box').show()
  54. setTimeout(() => {
  55. this.focus()
  56. }, 1000)
  57. } else {
  58. $('.arrow_box').hide()
  59. //发送消息
  60. sendMsg();
  61. this.blur()
  62. setTimeout(() => {
  63. this.focus()
  64. })
  65. }
  66. }
  67. }
  68. });
  69. $('#send').on('click', function (event) {
  70. event.stopPropagation()
  71. if ($('#my_test').val().trim() === '') {
  72. $('.arrow_box').show()
  73. } else {
  74. sendMsg();
  75. }
  76. })
  77. function sendMsg() {
  78. var message = $("#my_test").val();
  79. $("#msg_list").append(`<li class="active"}>
  80. <div class="main self">
  81. <div class="text">` + message + `</div>
  82. </div>
  83. </li>`);
  84. $("#my_test").val('');
  85. message = username + ":" + message;
  86. webSocket.send(message);
  87. // 置底
  88. setBottom();
  89. }
  90. // 置底
  91. function setBottom() {
  92. // 发送消息后滚动到底部
  93. const container = $('.m-message')
  94. const scroll = $('#msg_list')
  95. container.animate({
  96. scrollTop: scroll[0].scrollHeight - container[0].clientHeight + container.scrollTop() + 100
  97. });
  98. }
  99. });

5、Netty中粘包和拆包的解决方案

5.1 粘包和拆包简介

粘包和拆包是TCP网络编程中不可避免的,无论是服务端还是客户端,当我们读取或者发送消息的时候,都需要考虑TCP底层的粘包/拆包机制。
TCP是个“流”协议,所谓流,就是没有界限的一串数据。TCP底层并不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行包的划分,所以在业务上认为,一个完整的包可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题。
如图所示,假设客户端分别发送了两个数据包D1和D2给服务端,由于服务端一次读取到的字节数是不确定的,故可能存在以下4种情况。
1. 服务端分两次读取到了两个独立的数据包,分别是D1和D2,没有粘包和拆包;
image.png
2. 服务端一次接收到了两个数据包,D1和D2粘合在一起,被称为TCP粘包

image.png
3. 如果D2的数据包比较大, 服务端分两次读取到了两个数据包,第一次读取到了完整的D1包和D2包的部分内容,第二次读取到了D2包的剩余内容,这被称为TCP拆包
**image.png
4. 如果D1, D2的数据包都很大, 服务端分多次才能将D1和D2包接收完全,期间发生多次拆包

image.png
TCP粘包和拆包产生的原因:
数据从发送方到接收方需要经过操作系统的缓冲区,而造成粘包和拆包的主要原因就在这个缓冲区上。粘包可以理解为缓冲区数据堆积,导致多个请求数据粘在一起,而拆包可以理解为发送的数据大于缓冲区,进行拆分处理。


5.2粘包和拆包代码演示

  1. 粘包
    客户端

image.png
服务端

image.png
image.png
服务端一次读取了客户端发送过来的消息,应该读取10次. 因此发生粘包.
2. 拆包
客户端

image.png
服务端

image.png
运行结果:

image.png
当客户端发送的数据包比较大的时候, 读取了18次, 应该读取10次, 则发送拆包事件.

5.3 粘包和拆包的解决方法

  1. 业内解决方案
    由于底层的TCP无法理解上层的业务数据,所以在底层是无法保证数据包不被拆分和重组的,这个问题只能通过上层的应用协议栈设计来解决,根据业界的主流协议的解决方案,可以归纳如下。
  • 消息长度固定,累计读取到长度和为定长LEN的报文后,就认为读取到了一个完整的信息
  • 将换行符作为消息结束符
  • 将特殊的分隔符作为消息的结束标志,回车换行符就是一种特殊的结束分隔符
  • 通过在消息头中定义长度字段来标识消息的总长度
  1. Netty中的粘包和拆包解决方案
    Netty提供了4种解码器来解决,分别如下:
  • 固定长度的拆包器 FixedLengthFrameDecoder,每个应用层数据包的都拆分成都是固定长度的大小
  • 行拆包器 LineBasedFrameDecoder,每个应用层数据包,都以换行符作为分隔符,进行分割拆分
  • 分隔符拆包器 DelimiterBasedFrameDecoder,每个应用层数据包,都通过自定义的分隔符,进行分割拆分
  • 基于数据包长度的拆包器 LengthFieldBasedFrameDecoder,将应用层数据包的长度,作为接收端应用层数据包的拆分依据。按照应用层数据包的大小,拆包。这个拆包器,有一个要求,就是应用层协议中包含数据包的长度
  1. 代码实现

LineBasedFrameDecoder解码器

  1. ch.pipeline().addLast(new LineBasedFrameDecoder(2048));
  1. ctx.writeAndFlush(Unpooled.copiedBuffer("你好呀,我是Netty客户端"+i+"\n", CharsetUtil.UTF_8));

DelimiterBasedFrameDecoder解码器

  1. ByteBuf byteBuf = Unpooled.copiedBuffer("$".getBytes(StandardCharsets.UTF_8));
  2. ch.pipeline().addLast(new DelimiterBasedFrameDecoder(2048, byteBuf));
  1. ctx.writeAndFlush(Unpooled.copiedBuffer("你好呀,我是Netty客户端"+i+"$", CharsetUtil.UTF_8));