常用组件:

EventLoopGroup:

EventLoop本质是一个单线程执行器(维护了一个Selector选择器),内部有run方法处理Channel上的IO事件
EventLoopGroup 是一组EventLoop的抽象,Channel会调用EventLoopGroup的register方法来绑定其中一个EventLoop,后续这个Channel上的IO事件都由此EventLoop来处理(保证IO事件处理线程时的线程安全)
在Netty服务器端编程中,一般都需要提供两个EventLoopGroup用来处理连接请求和客户端业务处理,还可以实现异步任务操作(具体查看【任务队列】章节)
通常一个服务端口即一个ServerSocketChannel对应一个Selector 和一个EventLoop线程。BossEventLoop 负责接收客户端的连接并将SocketChannel交给WorkerEventLoopGroup来进行I/O处理,如下图所示:
image.png

  1. BossEventLoopGroup通常是一个单线程的EventLoop, EventLoop维护着一个注册了ServerSocketChannel
  2. Selector实例,BossEventLoop不断轮询Selector将连接事件分离出来
  3. 通常是OP_ACCEPT事件,然后将接收到的SocketChannel交给WorkerEventLoopGroup
  4. WorkerEventLoopGroup 会由next选择其中一个EventLoopGroup来将这个SocketChannel注册到其维护的
  5. Selector并对其后续的I/O事件进行处理

常用方法:

  1. public Future<?> shutdownGracefully(); 断开连接,关闭线程(finally使用),通常情况下只能在服务端使用
  2. public NioEventLoopGroup() 构造方法,用于创建bossGroupworkGroup

BootStrap系列:

含义:

Bootstrap意思是引导,一个Netty应用通常由一个Bootstrap开始,主要作用是配置整个Netty程序串联各个组件,Netty中Bootstrap类是客户端程序的启动引导类,ServerBootstrap是服务端启动引导类
需要使用sync进行连接、端口绑定,sync意为该操作为异步阻塞执行【案例演示二

BootStrap常用方法:

  1. public B group(EventLoopGroup group) 设置一个EventLoopGroup
  2. public B channel(Class<? extends C> channelClass) 设置服务器端的通道实现类
  3. public<T> B option(ChannelOption<T> option, T value) ServerChannel添加配置
  4. public ChannelFuture connect(String inetHost, int inetPort) 连接客户端

ServerBootStrap常用方法:

  1. public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) 用来设置管理连接和实际执行任务的EventLoop
  2. public ServerBootstrap childHandler(ChannelHandler childHandler) WprkGroup设置业务处理类Handler
  3. public B handler(ChannelHandler handler) BossGroup设置Handler(通常不使用)
  4. public<T> ServerBootstrap childOption(ChannelOption<T> childOption, T value) 给接收到的通道添加配置
  5. public ChannelFuture bind(int inetPort) 绑定端口

Future与ChannelFuture:

Netty 中所有的IO操作都是异步的,不能立刻得知消息是否被正确处理,但是可以过一会等它执行完成或者直接注册一个监听,具体的实现就是通过 Future 和 ChannelFutures,当操作执行成功或失败时监听会自动触发注册的监听事件

常用方法:

  1. ChannelFuture channel() 返回当前正在进行IO操作的通道
  2. ChannelFuture sync() 等待异步操作执行完毕

Channel:

是Netty网络通信的组件,能够用于执行网络I/O操作。通过Channel 可获得当前网络连接的通道的状态、
网络连接的配置参数( 例如接收缓冲区大小 ),Channel提供异步的网络I/O操作(如建立连接, 读写,绑定端口), 异步调用意味着任何I/0调用都将立即返回,并且不保证在调用结束时所请求的I/O操作已完成

主要方法:

  1. close() 可以用来关闭channel
  2. closeFuture() 用来处理channel的关闭
  3. sync 方法作用是同步等待channel关闭
  4. addListener 注册监听器,异步等待channel关闭并通知【具体参考异步模型章节】
  5. pipeline() 方法添加处理器
  6. write() 方法将数据写入
  7. writeAndFlush() 方法将数据写入并刷出

不同协议、不同阻塞类型的连接都有不同的Channel与之对应:

  1. NioServerSocketChannel 异步的服务器瑞TCP Socket连接(最常用)
  2. NioDatagramChannel 异步的UDP连接
  3. NioSctpChannel 异步的客户端Sctp连接
  4. NioSctpServerChannel 异步的Sctp服务器端连接,通道涵盖了UDPTCP网络I/O以及文件I/O

ChannelOption:

Netty 在创建Channel实例后,一般都需要设置ChannelOption参数,具体如下:

  1. ChannelOption.SO BACKLOG
  2. 对应TCP/IP协议listen函数中的backlog参数,用来初始化服务器可连接队列大小。服务端处理客户端连接请
  3. 求是顺序处理的,所以同一时间只能处理-一个客户端连接。多个客户端来的时候,服务端将不能处理的客户端连
  4. 接请求放在队列中等待处理,backlog 参数指定了队列的大小。
  5. ChannelOption.SO KEEPALIVE
  6. 一直保持连接活动状态
  1. s![image.png](https://cdn.nlark.com/yuque/0/2022/png/21405095/1651732954285-f5484bfb-3877-4af4-95f6-2bfe24f03405.png#clientId=uda203d0d-c2ef-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=477&id=ua79cb49e&margin=%5Bobject%20Object%5D&name=image.png&originHeight=879&originWidth=1090&originalType=binary&ratio=1&rotation=0&showTitle=true&size=100002&status=done&style=stroke&taskId=u3f338e24-3975-40c8-a1f0-37ae56f3eed&title=%E6%9C%8D%E5%8A%A1%E7%AB%AFoption%E5%8F%82%E6%95%B0%E6%B7%BB%E5%8A%A0%E4%BE%8B%E5%AD%90&width=591.6666870117188 "服务端option参数添加例子")

ChannelPipeLine:

ChannelPipeLine是保存ChannelHandler的List,用于处理或拦截Channel的入站事件和出站操作。通过添加编码器、解码器、自定义事件Handler来实现不同情况下的业务处理,其具体内部结构如下:
image.png
Channel和ChannelPipeLine的组成关系

一个Channel包含了一个ChannelPipeline,而ChannelPipeline中又维护了一个由ChannelHandlerContext组成的双向链表,并且每个ChannelHandlerContext中又关联着一个ChannelHandler
入站事件和出站事件在一个双向链表中,入站事件会从链表head往后传递到最后一个入站的handler,出站事件会从链表tail往前传递到最前一个出站的handler,两种类型的handler互不干扰

注意点:

1、在添加编码器与解码器时,其存放的顺序需要放在业务处理Handler之前,具体可参考编码器和解码器
2、添加自定义Handler需要继承 ChannelInboundHandlerAdapter ,具体查看ChannelHandler组件讲解
3、添加多个自定义Handler时,如果需要将上一个自定义handler的数据传递到下一个handler,需要使用super.channelRead(ctx, msg) 或者 ctx.fireChannelRead(msg) 方法,具体参考【案例演示四

常用方法:

  1. ChannelPipeline addFirst(ChannelHandler... handlers) 把一个业务处理类(handler)添加到链中的第一个位置
  2. ChannelPipeline addLast(ChannelHandler... handlers) 把一个业务处理类(handler)添加到链中的最后一个位置

ChannelHandler:

用于处理I/O事件或拦截I/O操作,并将其转发到对应的ChannelPipeline(业务处理链)中的下一个处理程序,分为入站、 出站两种。|所有ChannelHandler被连成一串,就是Pipeline
入站处理器通常是ChannellnboundHandlerAdapter的子类,主要用来读取客户端数据,写回结果
出站处理器通常是ChannelOutboundHandlerAdapter的子类,主要对写回结果进行加工
通过实现子类 ChannelInboundHandlerAdapter SimpleChannelInboundHandler<参数> 来创建自定义 Handler 实现在不同管道事件下的具体业务操作,SimpleChannelInboundHandler可以通过指定参数来指定接收到的消息类型
添加编码器&解码器具体可以参考【编码器与解码器】章节,使用SimpleChannelInboundHandler指定消息类型参考【编码器与解码器】章节的Protobuf案例

常用方法:

  1. void channelRegistered() 通道注册时触发
  2. void channelUnregistered() 通道注销时触发
  3. void channelActive() 通道就绪(有活动)时触发
  4. void channelInactive() 通道非活动时触发
  5. void channelRead() 当有读取事件时该方法将被触发
  6. void channelReadComplete() 数据读取完毕后触发
  7. void exceptionCaught() 处理异常
  8. void handlerAdded() handler加入到管道时触发
  9. void handlerRemoved() handler从管道移除时触发(可以看做客户端断开连接)

ChannelHandlerContext:

保存Channel相关的所有上下文信息,同时关联一个ChannelHandler 对象(即ChannelHandlerContext中包 含一个具体的事件处理器ChannelHandler,同时ChannelHandlerContext中也绑定了对应的pipeline和Channel的信息,方便对ChannelHandler进行调用)

常用方法:

  1. ChannelFuture close() 关闭通道
  2. ChannelOutboundInvoker flush() 刷新
  3. ChannelFuture writeAndFlush(Object msg) 将数据写到ChannelPipeline中当前ChannelHandler的下一个ChannelHandler开始处理(出站)

案例演示一:

结合常用组件创建一个HelloWord案例讲解Netty执行流程,源码下载

服务端:

  1. package hello;
  2. import io.netty.bootstrap.ServerBootstrap;
  3. import io.netty.channel.*;
  4. import io.netty.channel.nio.NioEventLoopGroup;
  5. import io.netty.channel.socket.SocketChannel;
  6. import io.netty.channel.socket.nio.NioServerSocketChannel;
  7. import io.netty.handler.codec.string.StringDecoder;
  8. /* Netty欢迎案例-服务端 */
  9. public class HelloService {
  10. public static void main(String[] args) {
  11. EventLoopGroup bossGroup = new NioEventLoopGroup(1);
  12. EventLoopGroup workGroup = new NioEventLoopGroup();
  13. //启动器,负责组装Netty组件进行启动服务器
  14. ServerBootstrap bootstrap = new ServerBootstrap();
  15. try {
  16. //指定处理事件的组,对应【线程模型章节】中的 workGroup
  17. bootstrap.group(bossGroup,workGroup)
  18. //设置服务器端的通道实现类,不同的操作系统还有其特殊通道实现类,但通常均为 NioServerSocketChannel
  19. .channel(NioServerSocketChannel.class)
  20. //决定 workGroup 能做哪些操作,ChannelInitializer代表和客户端数据读写的通道的初始化器,用于添加具体业务操作的handler和编码&解码器
  21. .childHandler(new ChannelInitializer<SocketChannel>() {
  22. //连接建立后开始执行
  23. @Override
  24. protected void initChannel(SocketChannel ch) throws Exception {
  25. ChannelPipeline pipeline = ch.pipeline();
  26. pipeline.addLast(new StringDecoder()); //将StrBuf解码为String
  27. //自定义Handler,通常实现子类ChannelInboundHandlerAdapter或SimpleChannelInboundHandler<参数>进行实现
  28. pipeline.addLast(new ChannelInboundHandlerAdapter(){
  29. //当有读事件时执行
  30. @Override
  31. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  32. System.out.println("读取到的数据为: "+msg.toString());
  33. }
  34. });
  35. }
  36. });
  37. System.out.println("服务器 is ready .....");
  38. //启动服务器绑定一个端口并且同步,生成一个 ChannelFuture 对象
  39. ChannelFuture future = bootstrap.bind(6666).sync();
  40. //对关闭通道进行监听(当有关闭通道的消息时才进行监听)
  41. future.channel().closeFuture().sync();
  42. } catch (InterruptedException e) {
  43. e.printStackTrace();
  44. }finally {
  45. bossGroup.shutdownGracefully();
  46. workGroup.shutdownGracefully();
  47. }
  48. }
  49. }

客户端:

  1. package hello;
  2. import io.netty.bootstrap.Bootstrap;
  3. import io.netty.channel.*;
  4. import io.netty.channel.nio.NioEventLoopGroup;
  5. import io.netty.channel.socket.SocketChannel;
  6. import io.netty.channel.socket.nio.NioSocketChannel;
  7. import io.netty.handler.codec.string.StringEncoder;
  8. /* Netty欢迎案例-客户端 */
  9. public class HelloClient {
  10. public static void main(String[] args) throws InterruptedException {
  11. NioEventLoopGroup eventGroup = new NioEventLoopGroup();
  12. //Netty启动类
  13. Bootstrap bootstrap = new Bootstrap();
  14. try {
  15. //添加EventLoop
  16. bootstrap.group(eventGroup)
  17. //设置客户端的通道实现类,不同的操作系统还有其特殊通道实现类,但通常均为 NioSocketChannel
  18. .channel(NioSocketChannel.class)
  19. //添加处理器
  20. .handler(new ChannelInitializer<SocketChannel>() {
  21. //连接建立后开始执行
  22. @Override
  23. protected void initChannel(SocketChannel ch) throws Exception {
  24. ChannelPipeline pipeline = ch.pipeline();
  25. pipeline.addLast(new StringEncoder()); //将Stirng编码为StrBuf
  26. pipeline.addLast(new ChannelInboundHandlerAdapter(){
  27. //当通道就绪的时候创建
  28. @Override
  29. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  30. ctx.writeAndFlush("Hello,Netty !");
  31. }
  32. });
  33. }
  34. });
  35. System.out.println("客户端 ok...");
  36. //启动客户端并连接到服务端
  37. ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666).sync();
  38. //给关闭通道进行监听(关闭通道事件发生后触发)
  39. channelFuture.channel().closeFuture().sync();
  40. } finally {
  41. eventGroup.shutdownGracefully();
  42. }
  43. }
  44. }

测试:

轮流启动服务端与客户端后服务端收到数据
image.png
服务端收到客户端数据
image.png
服务端与客户端代码执行流程

解释:

  • 把 Channel 理解为数据的通道
  • 把 Msg 理解为流动的数据,最开始输入是ByteBuf,但经过 pipeline 的加工会变成其它类型对象,最后输出又变成ByteBuf
  • 把 Handler 理解为数据的处理工序
    • 工序有多道,合在一起就是pipeline,,pipeline负责发布事件(读、读取完成…)传播给每个 handler,handler 对自己感兴趣的事件进行处理(重写相应事件处理方法)
    • handler分Inbound和Outbound两类
  • 把 EventLoop 理解为处理数据的工人
    • 工人可以管理多个channel的IO操作,并且一旦工人负责了某个channel就要负责到底(绑定)
    • 工人既可以执行IO操作,也可以进行任务处理,每位工人有任务队列,队列里可以堆放多个channel的待处理任务,任务分为普通任务、定时任务
    • 工人按照pipeline顺序,依次按照handler的规划(代码) 处理数据,可以为每道工序指定不同的人

案例演示二:

在绑定端口与连接时需要使用sync进行异步阻塞操作,在获取与服务端连接前或端口绑定操作完成前代码都会被阻塞在sync处
对案例演示一的客户端进行修改,在去掉sync操作后服务端将收不到任何数据(原因是与服务端建立连接的channel线程为异步的连接线程,和main函数中获取的channel所处线程不为同一个,数据自然发送失败)

  1. package channelFuture;
  2. import io.netty.bootstrap.Bootstrap;
  3. import io.netty.channel.*;
  4. import io.netty.channel.nio.NioEventLoopGroup;
  5. import io.netty.channel.socket.SocketChannel;
  6. import io.netty.channel.socket.nio.NioSocketChannel;
  7. import io.netty.handler.codec.string.StringEncoder;
  8. public class EventLoopClient {
  9. public static void main(String[] args) throws InterruptedException {
  10. NioEventLoopGroup eventGroup = new NioEventLoopGroup();
  11. //Netty启动类
  12. Bootstrap bootstrap = new Bootstrap();
  13. try {
  14. //添加EventLoop
  15. bootstrap.group(eventGroup)
  16. //设置客户端的通道实现类,不同的操作系统还有其特殊通道实现类,但通常均为 NioSocketChannel
  17. .channel(NioSocketChannel.class)
  18. //添加处理器
  19. .handler(new ChannelInitializer<SocketChannel>() {
  20. //连接建立后开始执行
  21. @Override
  22. protected void initChannel(SocketChannel ch) throws Exception {
  23. ChannelPipeline pipeline = ch.pipeline();
  24. pipeline.addLast(new StringEncoder()); //将Stirng编码为StrBuf
  25. }
  26. });
  27. System.out.println("客户端 ok...");
  28. //启动客户端并连接到服务端
  29. ChannelFuture channelFuture1 = bootstrap.connect("127.0.0.1", 8652); //没有sync将会继续向下运行,不会阻塞,导致发送数据的依旧是psvm主线程而不是与服务端建立连接的线程
  30. //没有sync操作下获取的channel不是与服务端建立连接的channel,数据发送服务端自然不会显示
  31. Channel channel = channelFuture1.channel();
  32. channel.writeAndFlush("Hello,Netty !");
  33. } finally {
  34. eventGroup.shutdownGracefully();
  35. }
  36. }
  37. }

案例演示三:

结合常用组件进行案例演示,源码下载

服务端:

调用代码:

  1. package simple;
  2. import io.netty.bootstrap.ServerBootstrap;
  3. import io.netty.channel.*;
  4. import io.netty.channel.nio.NioEventLoopGroup;
  5. import io.netty.channel.socket.SocketChannel;
  6. import io.netty.channel.socket.nio.NioServerSocketChannel;
  7. /* 服务端代码 */
  8. public class NettyServer {
  9. public static void main(String[] args) throws InterruptedException {
  10. /* 创建 BossGroup 和 WorkerGroup 线程组
  11. * BossGroup 只处理连接请求,真正的和客户端业务处理会交给 WorkGrouop 完成
  12. * bossGroup 和 workerGroup 含有的子线程的个数默认为 CPU最大线程数 X 2,可以手动指定
  13. */
  14. EventLoopGroup bossGroup = new NioEventLoopGroup(1); //创建子线程的个数为1的 bossGroup
  15. EventLoopGroup workerGroup = new NioEventLoopGroup(8); //创建子线程的个数为8的 workerGroup
  16. try {
  17. //创建服务器端的启动对象,配置参数
  18. ServerBootstrap bootstrap = new ServerBootstrap();
  19. //使用链式参数配置启动参数
  20. bootstrap.group(bossGroup,workerGroup) //设置两个线程组
  21. .channel(NioServerSocketChannel.class) //使用 NioSocketChannel 作为服务器通道实现
  22. .option(ChannelOption.SO_BACKLOG,120) //设置线程队列等待连接的个数
  23. .childOption(ChannelOption.SO_KEEPALIVE,true) //保持活动连接状态
  24. .childHandler(new ChannelInitializer<SocketChannel>() { //创建一个通道初始化对象
  25. //给PipeLine设置处理器
  26. @Override
  27. protected void initChannel(SocketChannel ch) throws Exception {
  28. ch.pipeline().addLast(new NettyServerHandler()); //将自定义处理器加入到PipeLine
  29. }
  30. }); //给 WorkerGroup 的 EventLoop 对应的管道设置处理器
  31. System.out.println("服务器 is ready .....");
  32. //启动服务器绑定一个端口并且同步,生成一个 ChannelFuture 对象
  33. ChannelFuture cf = bootstrap.bind(6666).sync();
  34. //对关闭通道进行监听(当有关闭通道的消息时才进行监听)
  35. cf.channel().closeFuture().sync();
  36. } finally {
  37. bossGroup.shutdownGracefully(); //关闭资源
  38. workerGroup.shutdownGracefully(); //关闭
  39. }
  40. }
  41. }

自定义处理器:

  1. package simple;
  2. import io.netty.buffer.ByteBuf;
  3. import io.netty.buffer.Unpooled;
  4. import io.netty.channel.ChannelHandlerContext;
  5. import io.netty.channel.ChannelInboundHandlerAdapter;
  6. import java.nio.charset.StandardCharsets;
  7. /** 自定义管道 Handler
  8. * 1、自定义一个 Handler,需要继承 Netty 规定好的某个 HandlerAddapter
  9. * 2、这时我们自定义一个 Handler,才能称为一个 Handler
  10. */
  11. public class NettyServerHandler extends ChannelInboundHandlerAdapter {
  12. /*当有读取事件时该方法将被触发
  13. * 参数一: ChannelHandlerContext 上下文对象,含有管道 PipeLine、通道
  14. * 参数二: Object 客户端发送的数据,默认是Object需要转换
  15. */
  16. @Override
  17. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  18. System.out.println("Server ctx = "+ ctx);
  19. //将 Msg 转换为 ByteBuf,ByteBuf是Netty对于NIO ButeBuffer的再包装
  20. ByteBuf buf = (ByteBuf) msg;
  21. System.out.println("客户端发送消息: "+buf.toString(StandardCharsets.UTF_8));
  22. System.out.println("客户端地址: "+ ctx.channel().remoteAddress());
  23. }
  24. /* 数据读取完毕后触发
  25. * 参数一: ChannelHandlerContext 上下文对象,含有管道 PipeLine、通道
  26. */
  27. @Override
  28. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
  29. //对发送数据进行编码后写入到缓存并刷新
  30. ctx.writeAndFlush(Unpooled.copiedBuffer("服务端连接成功,欢迎!"+LocalDateTime.now(),StandardCharsets.UTF_8));
  31. }
  32. /* 处理异常,一般为关闭通道 */
  33. @Override
  34. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  35. ctx.close(); //和 ctx.channel().close() 是一个意思
  36. }
  37. }

客户端:

调用代码:

  1. package simple;
  2. import io.netty.bootstrap.Bootstrap;
  3. import io.netty.channel.ChannelFuture;
  4. import io.netty.channel.ChannelInitializer;
  5. import io.netty.channel.EventLoopGroup;
  6. import io.netty.channel.nio.NioEventLoopGroup;
  7. import io.netty.channel.socket.SocketChannel;
  8. import io.netty.channel.socket.nio.NioSocketChannel;
  9. /* 客户端代码 */
  10. public class NettyClient {
  11. public static void main(String[] args) throws Exception {
  12. //客户端需要一个事件循环组
  13. EventLoopGroup group = new NioEventLoopGroup();
  14. try {
  15. //客户端启动对象,客户端使用的是 Bootstrap 而不是 ServerBootstrap
  16. Bootstrap bootstrap = new Bootstrap();
  17. //设置相关参数
  18. bootstrap.group(group) //设置线程组
  19. .channel(NioSocketChannel.class) //设置客户端通道的实现类
  20. .handler(new ChannelInitializer<SocketChannel>() {
  21. @Override
  22. protected void initChannel(SocketChannel ch) throws Exception {
  23. ch.pipeline().addLast(new NettyClientHandler()); //自定义处理器
  24. }
  25. });
  26. System.out.println("客户端 ok...");
  27. //启动客户端并连接到服务端
  28. ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666).sync();
  29. //给关闭通道进行监听(关闭通道事件发生后触发)
  30. channelFuture.channel().closeFuture().sync();
  31. } finally {
  32. group.shutdownGracefully();
  33. }
  34. }
  35. }

自定义处理器:

  1. package simple;
  2. import io.netty.buffer.ByteBuf;
  3. import io.netty.buffer.Unpooled;
  4. import io.netty.channel.ChannelHandlerContext;
  5. import io.netty.channel.ChannelInboundHandlerAdapter;
  6. import io.netty.util.CharsetUtil;
  7. import java.nio.charset.StandardCharsets;
  8. /* 自定义管道 Handler */
  9. public class NettyClientHandler extends ChannelInboundHandlerAdapter {
  10. /* 当通道就绪就会触发该方法 */
  11. @Override
  12. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  13. System.out.println("client "+ ctx);
  14. ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,这是客户端发送的消息!", StandardCharsets.UTF_8));
  15. }
  16. /*当通道有读取数据事件时触发
  17. * 参数一: ChannelHandlerContext 上下文对象,含有管道 PipeLine、通道
  18. * 参数二: Object 客户端发送的数据,默认是Object需要转换
  19. */
  20. @Override
  21. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  22. ByteBuf buf = (ByteBuf) msg;
  23. System.out.println("服务器回复的消息: "+buf.toString(CharsetUtil.UTF_8));
  24. System.out.println("服务器的地址: "+ctx.channel().remoteAddress());
  25. }
  26. /* 异常发生时触发 */
  27. @Override
  28. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  29. cause.printStackTrace();
  30. ctx.close();
  31. }
  32. }

测试:

对客户端启用多实例后运行,服务端成功收到客户端信息
image.png
服务端与客户端信息详情


案例演示四:

当存在多个入站handler时,如果需要将上一个handler处理后的数据转发到下一个handler进行处理,需要使用 super.channelRead 或者 ctx.fireChannelRead 方法,否则链将会断开
例如案例中创建5个自定义handler,执行顺序为 head(默认创建) -> h1 -> h2 -> h3 -> h4 -> h5 -> tail(默认创建),当h2注释掉方法传递时,handler链路将会断开,导致h3和之后的自定义handler不再工作

handler链表断开演示:

使用案例演示一的客户端进行配合该服务端代码进行演示

  1. package channelhandler;
  2. import io.netty.bootstrap.ServerBootstrap;
  3. import io.netty.channel.*;
  4. import io.netty.channel.nio.NioEventLoopGroup;
  5. import io.netty.channel.socket.SocketChannel;
  6. import io.netty.channel.socket.nio.NioServerSocketChannel;
  7. /* 服务端代码 */
  8. public class NettyServer {
  9. public static void main(String[] args) throws InterruptedException {
  10. EventLoopGroup bossGroup = new NioEventLoopGroup(1); //创建子线程的个数为1的 bossGroup
  11. EventLoopGroup workerGroup = new NioEventLoopGroup(8); //创建子线程的个数为8的 workerGroup
  12. try {
  13. //创建服务器端的启动对象,配置参数
  14. ServerBootstrap bootstrap = new ServerBootstrap();
  15. //使用链式参数配置启动参数
  16. bootstrap.group(bossGroup,workerGroup) //设置两个线程组
  17. .channel(NioServerSocketChannel.class) //使用 NioSocketChannel 作为服务器通道实现
  18. .childHandler(new ChannelInitializer<SocketChannel>() { //创建一个通道初始化对象
  19. //给PipeLine设置处理器
  20. @Override
  21. protected void initChannel(SocketChannel ch) throws Exception {
  22. ChannelPipeline pipeline = ch.pipeline();
  23. /*
  24. * 添加处理器顺序: head(默认创建) -> h1 -> h2 -> h3 -> h4 -> h5 -> tail(默认创建)
  25. * 当存在多个入站handler时,如果需要将上一个handler处理后的数据转发到下一个handler进行处理,需要使用 super.channelRead 或者 ctx.fireChannelRead 方法,否则链将会断开
  26. */
  27. pipeline.addLast("h1",new ChannelInboundHandlerAdapter(){
  28. @Override
  29. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  30. System.out.println("进入第一个自定义入站handler");
  31. super.channelRead(ctx, msg);
  32. }
  33. });
  34. pipeline.addLast("h2",new ChannelInboundHandlerAdapter(){
  35. @Override
  36. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  37. System.out.println("进入第二个自定义入站handler");
  38. //ctx.fireChannelRead(msg);
  39. }
  40. });
  41. pipeline.addLast("h3",new ChannelInboundHandlerAdapter(){
  42. @Override
  43. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  44. System.out.println("进入第三个自定义入站handler");
  45. ctx.writeAndFlush("第三个Handler返回结果");
  46. }
  47. });
  48. pipeline.addLast("h4",new ChannelOutboundHandlerAdapter(){
  49. @Override
  50. public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
  51. System.out.println("进入第一个自定义出站handler");
  52. super.write(ctx, msg, promise);
  53. }
  54. });
  55. pipeline.addLast("h5",new ChannelOutboundHandlerAdapter(){
  56. @Override
  57. public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
  58. System.out.println("进入第二个自定义出站handler");
  59. super.write(ctx, msg, promise);
  60. }
  61. });
  62. }
  63. });
  64. //启动服务器绑定一个端口并且同步,生成一个 ChannelFuture 对象
  65. ChannelFuture cf = bootstrap.bind(6666).sync();
  66. //对关闭通道进行监听(当有关闭通道的消息时才进行监听)
  67. cf.channel().closeFuture().sync();
  68. } finally {
  69. bossGroup.shutdownGracefully(); //关闭资源
  70. workerGroup.shutdownGracefully(); //关闭
  71. }
  72. }
  73. }

image.png
注释之后,后续handler功能无法正常执行

writeAndFlush的区别:

往通道写入数据可以使用ChannelHandlerContext.writeAndFlusChannel.writeAndFlus方法,二者的区别在于 ChannelHandlerContext.writeAndFlus 寻找的是从所属 handler 开始一直到 head 的出站handler,Channel.writeAndFlus 则是从 tail 开始寻找出站 handler

演示:
对之前的 h3 Handler进行修改,使用 ctx.writeAndFlush 后,handler 出站寻找顺序为 h3 -> h2 -> h1 -> head,由于没有出站handler,所以 h4、h5 handler 都不会被触发

  1. pipeline.addLast("h3",new ChannelInboundHandlerAdapter(){
  2. @Override
  3. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  4. System.out.println("进入第三个自定义入站handler");
  5. ctx.writeAndFlush("第三个Handler返回结果");
  6. //ch.writeAndFlush("第三个Handler返回结果"); 和 ctx.channel().writeAndFlush() 是一个意思
  7. }
  8. });

image.png
测试结果

对之前的 h3 Handler 再次进行修改,使用 ch.writeAndFlush 后,寻找出站 handler 顺序为 tail ->h5 -> h4 -> h3 -> h2 -> h1 -> head,所以 h4、h5 handler 都会被触发

  1. pipeline.addLast("h3",new ChannelInboundHandlerAdapter(){
  2. @Override
  3. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  4. System.out.println("进入第三个自定义入站handler");
  5. ch.writeAndFlush("第三个Handler返回结果"); // 与 ctx.channel().writeAndFlush() 是一个意思
  6. }
  7. });

image.png
测试结果