为什么数据库和数据库连接池不采用类似java nio的IO多路复用技术使用一个连接来维护和数据库的数据交换? - 知乎

三种reactor模式

关于划分boss worker的理解

服务器通常会建立很多连接,比如10000个,如果对应建立10000个线程,线程切换代价大。大多数连接建立连接后不发数据或发送数据量少(所以NIO应对大文件传输时不具备优势~~),白白的占用一个线程**(阻塞也占用?)。操作系统提供多路复用epoll,一个线程管理多个连接。

image.png
image.png
image.png
image.png

netty线程模型

image.png
Netty - 图6

参数调优

linux

  • /proc/sys/net/ipv4/tcp_keepalive_time
  • ulimit -n

netty

  • TCP_NODELAY:默认false,设置为true,不要nagle
  • ChannelOption.SO_BACKLOG:1024

image.png

EventLoop

EventLoop = selector + 线程
image.png

EventLoop执行任务

image.png
同一个channel,使用同一个EventLoop。

Netty中单独EventLoopGroup处理业务

image.png

EventLoop传递

image.png

Channel

ChannelInitializer

netty内部监听accept事件,收到accept事件后,调用initChannel()方法.
image.png

attr():保存一些信息

  • childAttr()

    ChannelFuture同步&异步

    ChannelFuture = channel.connect()
    image.png

    CloseFuture

    CloseFuture同样分为同步、异步操作。
    image.png

    ChannelHandlerContext.writeAndFlush() & NioSocketChannel.writeAndFlush()

    image.png

    EmbeddedChannel【测试使用】

    image.png

    ChannelGroup【管理一组Channel】

    image.png
    添加和移除Channel:
    image.png

    ByteBuf

  • 自动扩容

  • 支持两种内存:直接内存、堆内存
    • ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(10);
    • ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(10); 默认
  • 池化 - 可以重用池中 ByteBuf 实例,更节约内存,减少内存溢出的可能

    • 4.1 以后,非 Android 平台默认启用池化实现,Android 平台启用非池化实现
  • ByteBuf 是通过引用计数的方式管理

  • 每次调用 retain() 方法, 它的引用就加一
  • get/set 不会改变读写指针,而 read/write 会改变读写指针
  • 使用ctx.alloc().buffer()不要自定义buffer

    • ctx.alloc().buffer() 与统一配置有关。

      slice() & duplicate() & copy()

      image.png

      编解码器

      image.png

      ByteToMessageDecoder处理粘包【一次解码】

      image.png
  • decode是抽象方法

image.png
image.png

LengthFieldBasedFrameDecoder

image.png
如果收到半包,会等后面的数据。

通信协议

image.png

  1. @Slf4j
  2. @ChannelHandler.Sharable
  3. public class MessageCodec extends ByteToMessageCodec<Message> {
  4. @Override
  5. public void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
  6. // 1. 4 字节的魔数
  7. out.writeBytes(new byte[]{1, 2, 3, 4});
  8. // 2. 1 字节的版本,
  9. out.writeByte(1);
  10. // 3. 1 字节的序列化方式 jdk 0 , json 1
  11. out.writeByte(0);
  12. // 4. 1 字节的指令类型
  13. out.writeByte(msg.getMessageType());
  14. // 5. 4 个字节
  15. out.writeInt(msg.getSequenceId());
  16. // 无意义,对齐填充
  17. out.writeByte(0xff);
  18. // 6. 获取内容的字节数组
  19. ByteArrayOutputStream bos = new ByteArrayOutputStream();
  20. ObjectOutputStream oos = new ObjectOutputStream(bos);
  21. oos.writeObject(msg);
  22. byte[] bytes = bos.toByteArray();
  23. // 7. 长度
  24. out.writeInt(bytes.length);
  25. // 8. 写入内容
  26. out.writeBytes(bytes);
  27. }
  28. @Override
  29. protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
  30. int magicNum = in.readInt();
  31. byte version = in.readByte();
  32. byte serializerType = in.readByte();
  33. byte messageType = in.readByte();
  34. int sequenceId = in.readInt();
  35. in.readByte();
  36. int length = in.readInt();
  37. byte[] bytes = new byte[length];
  38. in.readBytes(bytes, 0, length);
  39. ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
  40. Message message = (Message) ois.readObject();
  41. log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);
  42. log.debug("{}", message);
  43. out.add(message);
  44. }
  45. }

image.png

MessageToMessageCodec二次编解码器【双向】(byte <-> object)

image.png
MessageToMessageDecoder会进行类型判断,不符合就传递给下一个handler。

StringDecoder

image.png
LineBasedFrameDecoder extends ByteToMessageDecoder 【解决粘包】
StringDecoder extends MessageToMessageDecoder

处理消息Handler

image.png

ChannelHandler的方法生命周期

image.png
image.png
客户端关闭,服务端Channel状态:
image.png

  • channelActive()
    • 连接数统计
  • exceptionCaught(ChannelHandlerContext,Throwable):通道发生异常事件
  • channelInactive:Netty 是先将 Channel 关闭后,再回调 channelInactive 的

    SimpleChannelInboundHandler

  • 关注指定对象

  • 释放消息。

image.png

channelRead0()

image.png

移除Handler

  • 登录handler只用登录一次
  • 不用移除,使用SimpleChannelInboundHandler处理指定请求就可以了。

image.png

合并handler

image.png

IdleStateHandler:检测假死,触发事件

  • 要放在第一个handler

触发事件
image.png

服务端设置读空闲检测,客户端设置写空闲检测

image.png
image.png

ChannelDuplexHandler

ChannelDuplexHandler则同时实现了ChannelInboundHandler和ChannelOutboundHandler接口。如果一个所需的ChannelHandler既要处理入站事件又要处理出站事件,推荐继承此类。

线程池执行handler

相当于tomcat中的worker?

pipeline.addLast(businessGroup, new OrderServerProcessHandler());

流量整形:GlobalChannelTrafficShapingHandler【一般不用】

image.png
image.png

接收响应

future

image.png
image.png

返回响应

ChannelPipeline执行顺序

ctx.write() & ctx.channel().write()区别

image.png
ctx.channel().write()可能会死循环

channelReadComplete【不直接flush】【不用】

image.png

  • 一个读事件可能有多次读。(最多16次)

image.png

flushConsolidationHandler【增强flush】

它是flush增强,所以必须调用flush.它才帮你优化(减少你的flush次数)
image.png

其他

信息统计

image.png
image.png

堆外内存回收

JVM内存结构

开启native

高水位channel.isWritable()

默认32k-64k
image.png

ip过滤

image.png
image.png

心跳

Netty学习(五)—IdleStateHandler心跳机制_zhenyutu的博客-CSDN博客_idlestatehandler
Netty - 图53

————————————

ChannelHandler

ChannelPipeline

image.png

ChannelHandlerContext

image.png

编解码器

编码器也是ChannelHandler
image.png
image.png
image.png
image.png


Netty注解

@Sharable

表示handler可以被重复add

Netty优化

减少flush,增加吞吐量

channelReadComplete

image.png
complete中flush

FlushConsolidationHandler

固定间隔flush,异步flush

流量整形

image.png

开启native

高水位

image.png

空闲检测

  1. 服务端10s收不到请求就断掉。
  2. 客户端write idle check + keepalive

服务端:serverIdleCheckHandler

image.png
image.png

客户端:

ClientIdleCheckHandler

ClientIdleCheckHandler会捕获write idle,触发事件

KeepaliveHandler捕获事件

image.png