基本说明

  1. Netty的组件设计: Netty的主要组件有Channel, EventLoop, ChannelFuture, ChannelHandler, ChannelPipeline等
  2. ChannelHandler充当了处理入站和出站数据的应用程序逻辑的容器, 例如: 实现ChannelInboundHandler接口(或ChannelInboundHandlerAdapter), 你就可以接收入站事件和数据, 这些数据会被业务逻辑处理, 当要给客户端发送响应时, 也可以从ChannelInboundhandler冲刷数据, 业务逻辑通常写在一个或多个ChannelInboundHandler中,ChannelOutboundHandler原理一样, 只不过她是用来处理出站数据的
  3. ChannelPipeline提供了ChannelHandler链的容器, 以客户端应用程序为例, 例如事件的运动方向是从客户端到服务端的,那么我们称这些事件为出站, 即客户端发送给服务端的数据会通过pipeline中的一系列ChannelOutboundHandler, 并被这些Handler处理, 反之则称为入站

image.png

编码解码器

  1. 当Netty发送或者接收一个消息的时候, 就将会发生一次数据转换, 入站消息会被解码, 从字节转换为另一种格式(比如Java对象) ;如果是出站消息, 他就会被编码成字节
  2. Netty提供了一系列实用的编解码器, 他们都实现了ChannelInboundHandler或者ChannelOutboundHandler接口,在这些类中ChannelRead方法已经被重写, 以入站为例,对于每个从入站Channel读取的消息, 这个方法会被调用, 随后, 他将调用由解码器所提供的decoder()方法进行解码, 并将已经解码的字节转发给ChannelPipeline中的下一个ChannelInboundHandler

    解码器 - ByteToMessageDecoder

  3. 关系继承图

image.png

  1. 由于不可能知道远程节点是否会一次性发送一个完整的消息, TCP有可能出现粘包拆包的问题, 这个类会对入站数据进行缓冲, 直到它准备好被处理
  2. 一个关于ByteToMessageDecoder实例分析

image.png
image.png

Netty的handler链调用机制

需求

  1. 使用自定义的编码器和解码器来说明Netty的handler调用机制
    1. 客户端发送long到服务器
    2. 服务器发送Long到客户端
  2. 案例演示

NettyServer

  1. package com.dance.netty.netty.inandout;
  2. import io.netty.bootstrap.ServerBootstrap;
  3. import io.netty.channel.ChannelFuture;
  4. import io.netty.channel.ChannelInitializer;
  5. import io.netty.channel.ChannelOption;
  6. import io.netty.channel.ChannelPipeline;
  7. import io.netty.channel.nio.NioEventLoopGroup;
  8. import io.netty.channel.socket.SocketChannel;
  9. import io.netty.channel.socket.nio.NioServerSocketChannel;
  10. import io.netty.handler.logging.LogLevel;
  11. import io.netty.handler.logging.LoggingHandler;
  12. public class NettyServer {
  13. public static void main(String[] args) throws InterruptedException {
  14. NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
  15. NioEventLoopGroup workerGroup = new NioEventLoopGroup();
  16. try {
  17. ServerBootstrap serverBootstrap = new ServerBootstrap();
  18. serverBootstrap.group(bossGroup,workerGroup)
  19. .channel(NioServerSocketChannel.class)
  20. .option(ChannelOption.SO_BACKLOG, 128)
  21. .childOption(ChannelOption.SO_KEEPALIVE, true)
  22. .handler(new LoggingHandler(LogLevel.INFO))
  23. .childHandler(new ChannelInitializer<SocketChannel>() {
  24. @Override
  25. protected void initChannel(SocketChannel ch) throws Exception {
  26. ChannelPipeline pipeline = ch.pipeline();
  27. // 添加自己的处理器
  28. // 入站的Handler进行解码 MyByteToLongDecoder
  29. pipeline.addLast("longDecoder", new MyByteToLongDecoder());
  30. // 对long类型进行编码的编码器
  31. pipeline.addLast("longEncoder", new MyLongToByteEncoder());
  32. // 处理入站数据
  33. pipeline.addLast(new NettyServerHandler());
  34. }
  35. });
  36. ChannelFuture sync = serverBootstrap.bind("127.0.0.1", 7000).sync();
  37. System.out.println("server is ready ......");
  38. sync.channel().closeFuture().sync();
  39. } catch (InterruptedException e) {
  40. e.printStackTrace();
  41. } finally {
  42. bossGroup.shutdownGracefully();
  43. workerGroup.shutdownGracefully();
  44. }
  45. }
  46. }

NettyServerHandler

  1. package com.dance.netty.netty.inandout;
  2. import io.netty.channel.ChannelHandlerContext;
  3. import io.netty.channel.SimpleChannelInboundHandler;
  4. public class NettyServerHandler extends SimpleChannelInboundHandler<Long> {
  5. @Override
  6. protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
  7. System.out.println("服务器收到来自客户端的消息 : " + msg.toString());
  8. // 回显客户端
  9. ctx.writeAndFlush(msg);
  10. }
  11. @Override
  12. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  13. ctx.close();
  14. cause.printStackTrace();
  15. }
  16. }

NettyClient

package com.dance.netty.netty.inandout;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class NettyClient {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventExecutors)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // 解析Long类型的解析器
                            pipeline.addLast("longDecoder", new MyByteToLongDecoder());
                            // 对long类型进行编码的编码器
                            pipeline.addLast("longEncoder", new MyLongToByteEncoder());
                            // 自己的数据处理器
                            pipeline.addLast(new NettyClientHandler());
                        }
                    });
            ChannelFuture sync = bootstrap.connect("127.0.0.1", 7000).sync();
            sync.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            eventExecutors.shutdownGracefully();
        }
    }
}

NettyClientHandler

package com.dance.netty.netty.inandout;

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.nio.charset.StandardCharsets;

public class NettyClientHandler extends SimpleChannelInboundHandler<Long> {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // 发送消息到服务器
        // 正常数据
        ctx.writeAndFlush(1234567890L);

        /*
         * 分析:
         *  1. "qwerqwerqwerqwer" 是16个字节
         *  2. 该处理器的前一个Handler是MyLongToByteEncoder
         *  3. MyLongToByteEncoder 的父类是 MessageToByteEncoder
         *  4. MessageToByteEncoder 类中 有一个模板方法, 里面调用了我们自己实现的encode方法, 因为是模板方法所以里面有逻辑(重要),分析一下
         *  @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                ByteBuf buf = null;
                try {
                * // 重点: 判断 消息是否和当前的编码器匹配 如果匹配才往下走
                    if (acceptOutboundMessage(msg)) {
                        @SuppressWarnings("unchecked")
                        * // 转换消息类型为定义类型
                        I cast = (I) msg;
                        * // 构造成 ByteBuf
                        buf = allocateBuffer(ctx, cast, preferDirect);
                        try {
                        * // 重点: 调用我们自己实现的encode方法 传入case, 也就是我们发送的数据
                            encode(ctx, cast, buf);
                        } finally {
                            ReferenceCountUtil.release(cast);
                        }

                        if (buf.isReadable()) {
                            ctx.write(buf, promise);
                        } else {
                            buf.release();
                            ctx.write(Unpooled.EMPTY_BUFFER, promise);
                        }
                        buf = null;
                    } else {
                    * // 重点: 不匹配直接发送, 不走encode方法
                        ctx.write(msg, promise);
                    }
                } catch (EncoderException e) {
                    throw e;
                } catch (Throwable e) {
                    throw new EncoderException(e);
                } finally {
                    if (buf != null) {
                        buf.release();
                    }
                }
            }
            * 5. 因此我们在编写Encoder的时候, 要注意 传入的数据类型要和Encode的处理类型一致
         */
        // 其他类型数据
//        ctx.writeAndFlush(Unpooled.copiedBuffer("qwerqwerqwerqwer", StandardCharsets.UTF_8));
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
        System.out.println("客户端接收来自服务器的消息: " + msg.toString());
    }
}

MyByteToLongDecoder

package com.dance.netty.netty.inandout;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

/**
 * 自定义解码器
 */
public class MyByteToLongDecoder extends ByteToMessageDecoder {
    /**
     * 解码方法 会根据接收到的数据被调用多次 直到确定没有新的元素被添加到 List中
     * 或者ByteBuf没有更多的可读字节为止
     * 如果List out 不为空 就会将list的内容传递给下一个ChannelInboundHandler进行处理 该方法也会被调用多次
     * @param ctx 上下文对象
     * @param in 入站的ByteBuf
     * @param out 输出给下一个调用链的数据容器
     * @throws Exception
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // 应为这个是一个Long的解码器, 所以只处理Long类型的数据
        // 默认Long是8个字节
        if(in.readableBytes() >= 8){
            out.add(in.readLong());
        }else{
            // TODO 不够一个Long的数据, 不处理
        }
    }
}

MyLongToByteEncoder

package com.dance.netty.netty.inandout;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.MessageToByteEncoder;

import java.util.List;

/**
 * 自定义编码器
 */
public class MyLongToByteEncoder extends MessageToByteEncoder<Long> {
    /**
     * 编码方法
     * @param ctx 上下文
     * @param msg 消息
     * @param out 输出给下一个编码器的ByteBuf
     * @throws Exception 异常
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, Long msg, ByteBuf out) throws Exception {
        if(msg != null){
            out.writeLong(msg);
        }
    }
}
  1. 测试 执行结果

Server

一月 18, 2022 12:05:45 上午 io.netty.handler.logging.LoggingHandler channelRegistered
信息: [id: 0x1a8323fa] REGISTERED
一月 18, 2022 12:05:45 上午 io.netty.handler.logging.LoggingHandler bind
信息: [id: 0x1a8323fa] BIND: /127.0.0.1:7000
一月 18, 2022 12:05:45 上午 io.netty.handler.logging.LoggingHandler channelActive
信息: [id: 0x1a8323fa, L:/127.0.0.1:7000] ACTIVE
server is ready ......
一月 18, 2022 12:05:50 上午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0x1a8323fa, L:/127.0.0.1:7000] READ: [id: 0xac268da1, L:/127.0.0.1:7000 - R:/127.0.0.1:55730]
一月 18, 2022 12:05:50 上午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0x1a8323fa, L:/127.0.0.1:7000] READ COMPLETE
服务器收到来自客户端的消息 : 1234567890

Client

客户端接收来自服务器的消息: 1234567890
  1. 调用图

image.png
image.png

  1. 结论

    1. 不论解码器Handler还是编码器Handler即接收的消息类型必须与待处理的消息类型一致, 否则该Handle不会被执行
    2. 在解码器 进行数据解码时, 需要判断 缓冲区(ByteBuf)的数据是否足够, 否则接收到的结果会与期望结果可能不一致

      解码器 - ReplayingDecoder

  2. public abstract class ReplayingDecoder extends ByteToMessgaeDecoder

  3. ReplayingDecoder 扩展了 ByteToMessageDecoder类, 使用这个类 我们不必调用readableBytes()方法 参数 S 指定了用户状态管理的类型, 其中Void代表不需要状态管理
  4. 应用实例: 使用ReplayingDecoder编写解码器, 对前面的案例进行简化[案例演示] ```java package com.dance.netty.netty.inandout;

import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.handler.codec.ReplayingDecoder;

import java.util.List;

/**

  • 自定义解码器 */ public class MyByteToLongDecoder2 extends ReplayingDecoder {

    @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception {

     // 应为这个是一个Long的解码器, 所以只处理Long类型的数据
     // 默认Long是8个字节
    

    // if(in.readableBytes() >= 8){

     // 不需要判断数据是否够读取 ,内部会进行处理判断
     out.add(in.readLong());
    

    // }else{ // // TODO 不够一个Long的数据, 不处理 // } } } ```

    1. ReplayingDecoder使用方便, 但是它有一些局限性:

      1. 并不是所有的ByteBuf操作都被支持, 如果调用了一个不被支持的方法, 将会抛出一个UnsupportedOperationExeception
      2. ReplayingDecoder 在某些情况下可能稍慢于ByteToMessageDecoder, 例如:网络缓慢并且消息格式复杂时, 消息会被拆成多个碎片,速度变慢

        其他解码器

        其他解码器

    2. LineBasedFrameDecoder : 这个类在Netty内部也有使用, 它使用行尾控制字符(\n 或者 \r\n) 作为分隔符来解析数据

    3. DelimiterBasedFrameDecoder: 使用自定义的特殊字符作为消息的分隔符
    4. HttpObjectDecoder : 一个Http数据的解码器
    5. LengthFieldBasedFrameDecoder : 通过指定长度来标识整包消息, 这样就可以自动处理黏包和半包消息

      其他编码器

      image.png

      Log4j整合到Netty

      添加POM依赖

      <dependency>
       <groupId>log4j</groupId>
       <artifactId>log4j</artifactId>
       <version>1.2.17</version>
      </dependency>
      <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
       <version>1.7.25</version>
      </dependency>
      <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-log4j12</artifactId>
       <version>1.7.25</version>
      </dependency>
      <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-simple</artifactId>
       <version>1.7.25</version>
       <scope>test</scope>
      </dependency>
      

      在Resource下新建log4j.properties

      log4j.rootLogger=DEBUG, stdout
      log4j.appender.stdout = org.apache.log4j.ConsoleAppender
      log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
      log4j.appender.stdout.layout.ConversionPattern = [%p]%C{1}-%m%n
      

      测试

      启动上一个案例的服务端测试
      [DEBUG]Slf4JLogger-Using SLF4J as the default logging framework
      [DEBUG]Slf4JLogger--Dio.netty.eventLoopThreads: 16
      [DEBUG]Slf4JLogger-Platform: Windows
      [DEBUG]Slf4JLogger--Dio.netty.noUnsafe: false
      [DEBUG]Slf4JLogger-Java version: 8
      [DEBUG]Slf4JLogger-sun.misc.Unsafe.theUnsafe: available
      [DEBUG]Slf4JLogger-sun.misc.Unsafe.copyMemory: available
      [DEBUG]Slf4JLogger-java.nio.Buffer.address: available
      [DEBUG]Slf4JLogger-direct buffer constructor: available
      [DEBUG]Slf4JLogger-java.nio.Bits.unaligned: available, true
      [DEBUG]Slf4JLogger-jdk.internal.misc.Unsafe.allocateUninitializedArray(int): unavailable prior to Java9
      [DEBUG]Slf4JLogger-java.nio.DirectByteBuffer.<init>(long, int): available
      [DEBUG]Slf4JLogger-sun.misc.Unsafe: available
      [DEBUG]Slf4JLogger--Dio.netty.tmpdir: C:\Users\EXT~1.CAI\AppData\Local\Temp (java.io.tmpdir)
      [DEBUG]Slf4JLogger--Dio.netty.bitMode: 64 (sun.arch.data.model)
      [DEBUG]Slf4JLogger--Dio.netty.noPreferDirect: false
      [DEBUG]Slf4JLogger--Dio.netty.maxDirectMemory: 3760193536 bytes
      [DEBUG]Slf4JLogger--Dio.netty.uninitializedArrayAllocationThreshold: -1
      [DEBUG]Slf4JLogger-java.nio.ByteBuffer.cleaner(): available
      [DEBUG]Slf4JLogger--Dio.netty.noKeySetOptimization: false
      [DEBUG]Slf4JLogger--Dio.netty.selectorAutoRebuildThreshold: 512
      [DEBUG]Slf4JLogger-org.jctools-core.MpscChunkedArrayQueue: available
      [DEBUG]Slf4JLogger--Dio.netty.processId: 14236 (auto-detected)
      [DEBUG]Slf4JLogger--Djava.net.preferIPv4Stack: false
      [DEBUG]Slf4JLogger--Djava.net.preferIPv6Addresses: false
      [DEBUG]Slf4JLogger-Loopback interface: lo (Software Loopback Interface 1, 127.0.0.1)
      [DEBUG]Slf4JLogger-Failed to get SOMAXCONN from sysctl and file \proc\sys\net\core\somaxconn. Default: 200
      [DEBUG]Slf4JLogger--Dio.netty.machineId: 00:50:56:ff:fe:c0:00:08 (auto-detected)
      [DEBUG]Slf4JLogger--Dio.netty.threadLocalMap.stringBuilder.initialSize: 1024
      [DEBUG]Slf4JLogger--Dio.netty.threadLocalMap.stringBuilder.maxSize: 4096
      [DEBUG]Slf4JLogger--Dio.netty.leakDetection.level: simple
      [DEBUG]Slf4JLogger--Dio.netty.leakDetection.targetRecords: 4
      [DEBUG]Slf4JLogger--Dio.netty.allocator.numHeapArenas: 16
      [DEBUG]Slf4JLogger--Dio.netty.allocator.numDirectArenas: 16
      [DEBUG]Slf4JLogger--Dio.netty.allocator.pageSize: 8192
      [DEBUG]Slf4JLogger--Dio.netty.allocator.maxOrder: 11
      [DEBUG]Slf4JLogger--Dio.netty.allocator.chunkSize: 16777216
      [DEBUG]Slf4JLogger--Dio.netty.allocator.tinyCacheSize: 512
      [DEBUG]Slf4JLogger--Dio.netty.allocator.smallCacheSize: 256
      [DEBUG]Slf4JLogger--Dio.netty.allocator.normalCacheSize: 64
      [DEBUG]Slf4JLogger--Dio.netty.allocator.maxCachedBufferCapacity: 32768
      [DEBUG]Slf4JLogger--Dio.netty.allocator.cacheTrimInterval: 8192
      [DEBUG]Slf4JLogger--Dio.netty.allocator.useCacheForAllThreads: true
      [DEBUG]Slf4JLogger--Dio.netty.allocator.type: pooled
      [DEBUG]Slf4JLogger--Dio.netty.threadLocalDirectBufferSize: 65536
      [DEBUG]Slf4JLogger--Dio.netty.maxThreadLocalCharBufferSize: 16384
      [INFO]Slf4JLogger-[id: 0x7449a0c5] REGISTERED
      [INFO]Slf4JLogger-[id: 0x7449a0c5] BIND: /127.0.0.1:7000
      server is ready ......
      [INFO]Slf4JLogger-[id: 0x7449a0c5, L:/127.0.0.1:7000] ACTIVE
      
      ok~