首先,通过一个Netty服务器的案例代码看一下其中涉及的核心组件,以及它们所对应的功能,如下所示

  1. public class NettyDiscardServer {
  2. // 服务器线程指定端口号
  3. private final int serverPort;
  4. // 启动器类,负责服务器或客户端组件的组装,及Netty的初始化
  5. ServerBootstrap b = new ServerBootstrap();
  6. public NettyDiscardServer(int port) {
  7. this.serverPort = port;
  8. }
  9. public void runServer() {
  10. //创建reactor线程组,包含BossGroup和workerGroup
  11. // 处理连接请求
  12. EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1);
  13. // 处理读写请求
  14. EventLoopGroup workerLoopGroup = new NioEventLoopGroup();
  15. try {
  16. // 1 设置reactor 线程组
  17. b.group(bossLoopGroup, workerLoopGroup);
  18. // 2 设置nio类型的channel为NioServerSocketChannel
  19. b.channel(NioServerSocketChannel.class);
  20. // 3 设置监听端口
  21. b.localAddress(serverPort);
  22. // 4 设置通道的参数
  23. b.option(ChannelOption.SO_KEEPALIVE, true); // 长连接
  24. b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); // ByteBuf采用池化方式
  25. b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
  26. // 5 装配子通道(处理读写请求)流水线
  27. // 创建ChannelInitializer对象,并重写initChannel方法
  28. b.childHandler(new ChannelInitializer<SocketChannel>() {
  29. // 有连接到达时会创建一个channel
  30. protected void initChannel(SocketChannel ch) throws Exception {
  31. // pipeline管理子通道channel中的Handler
  32. // 向子channel流水线添加一个handler处理器
  33. // ChannelPipeline采用双向链表组织
  34. ch.pipeline().addLast(new NettyDiscardHandler()); // 这里添加的Handler需要自行编码实现
  35. }
  36. });
  37. // 6 开始绑定server
  38. // 通过调用sync同步方法阻塞直到绑定成功
  39. ChannelFuture channelFuture = b.bind().sync();
  40. Logger.info(" 服务器启动成功,监听端口: " +
  41. channelFuture.channel().localAddress());
  42. // 7 等待通道关闭的异步任务结束
  43. // 服务监听通道会一直等待通道关闭的异步任务结束
  44. ChannelFuture closeFuture = channelFuture.channel().closeFuture();
  45. closeFuture.sync();
  46. } catch (Exception e) {
  47. e.printStackTrace();
  48. } finally {
  49. // 8 关闭EventLoopGroup,
  50. // 释放掉所有资源包括创建的线程
  51. workerLoopGroup.shutdownGracefully();
  52. bossLoopGroup.shutdownGracefully();
  53. }
  54. }
  55. }

自定义的入站ChannelHandler需要继承ChannelInboundHandlerAdapter,并重写ChannelRead方法实现,如下所示:

  1. public class NettyDiscardHandler extends ChannelInboundHandlerAdapter {
  2. @Override
  3. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  4. ByteBuf in = (ByteBuf) msg;
  5. try {
  6. Logger.info("收到消息,丢弃如下:");
  7. while (in.isReadable()) {
  8. System.out.print((char) in.readByte());
  9. }
  10. System.out.println();
  11. } finally {
  12. ReferenceCountUtil.release(msg);
  13. }
  14. }
  15. }

1. Netty Reactor

**
Netty中的Reactor模型的运行流程如下所示:
image.png

每个Socket连接对应的IO时间来自于Channel,如果想要查询Channel对应的IO时间,就必须将其注册到Selector上。Selector起到IO多路复用的功能,监听注册到上面的Channel。Reactor负责一个线程来不断的轮询Selector上的IO事件,当查询到有IO时间发生时,它负责将其分发给具体的Handler进行处理,真正的IO操作和业务处理由Handler负责完成。

Netty中常用的对应于NioSocketChannel的Reactor实现为NioEventLoop,它绑定了Thread类成员和Java NIO Selector两个成员属性。每一个NioEventLoop拥有一个线程,负责一个Selector的IO事件轮询


2. Channel

Netty对于Java NIO中的Channel进行了进一步的封装,对于每一种通信连接协议都实现了自己的Channel。而每一种协议的Channel又有NIO(异步IO)和OIO(阻塞IO)两种形式:

NioSocketChannel 异步非阻塞TCP Socket 通道
OioSocketChannel 同步阻塞式TCP Socket 通道
NioServerSocketChannel 异步非阻塞式TCP Socket服务器端监听通道
OioServerSocketChannel 同步阻塞式TCP Socket服务器端监听通道
NioDatagramChannel 异步非阻塞式UDP传输通道
OioDatagramChannel 同步阻塞式UDP传输通道
NioSctpServerChannel 异步非阻塞式Sctp传输通道
OioSctpServerChannel 同步阻塞式Sctp传输通道

NioSocketChannel的IO操作最终还是会落到Java NIO的SelectableChannel上。

NioSocketChannel的类继承关系图如下所示:
image.png

其中,顶层的父类AbstractChannel构造函数如下所示:

  1. /**
  2. * Creates a new instance.
  3. *
  4. * @param parent
  5. * the parent of this channel. {@code null} if there's no parent.
  6. */
  7. protected AbstractChannel(Channel parent) {
  8. this.parent = parent; // 父通道
  9. id = newId();
  10. unsafe = newUnsafe(); // 底层的IO通道完成实际IO操作
  11. pipeline = newChannelPipeline(); // 每个通道对应的流水线
  12. }

常用的方法有:

  • connect:连接远程服务器,调用后立即返回ChannelFuture

    1. @Override
    2. public ChannelFuture connect(SocketAddress remoteAddress) {
    3. return pipeline.connect(remoteAddress);
    4. }
  • bind:绑定监听地址,监听客户端连接

    1. @Override
    2. public ChannelFuture bind(SocketAddress localAddress) {
    3. return pipeline.bind(localAddress);
    4. }
  • close:关闭通道连接,返回连接关闭的ChannelFuture异步任务

  • read:读取通道数据,启动入站处理。从内部的Java NIO Channel通道读取数据,启动内部的Pipeline流水线,开启数据读取的入站处理
  • write:启动出站流水处理,将处理后的最终数据写到底层Java NIO通道
  • flush:将缓冲区中的数据立即写出到对端

3. Handler

NioEventLoop中的Selector会轮询监听注册在上面的通道,是否有connect、accept、read和write中某一种IO事件的发生。当监听到相应的IO事件后,需要将其分发给自己的Handler进行处理。Netty中整个IO处理操作的流程为:从通道读数据包、数据包解码、业务处理、目标数据编码、结果写入通道、发送给对端。
image.png

从上图可以看出,入站操作是自底向上的,由Netty内部到入站处理器,而出站操作则是相反的。入站处理器负责解码和业务处理操作,出站处理器负责数据编码和通道的写入操作,数据的读取和发送是由Netty底层实现的。

Netty中的Handler由Channelhandler接口定义,具体的实现形式有:

  • 入站处理器:ChannelInboundHandler,默认实现是ChannelInboundHandlerAdapter
  • 出站处理器:ChannelOutboundHandler,默认实现是ChannelOutboundHandlerAdapter

例如,ChannelInboundHandler接口的定义如下:

  1. public interface ChannelInboundHandler extends ChannelHandler {
  2. void channelRegistered(ChannelHandlerContext ctx) throws Exception;
  3. void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
  4. void channelActive(ChannelHandlerContext ctx) throws Exception;
  5. void channelInactive(ChannelHandlerContext ctx) throws Exception;
  6. void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
  7. void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
  8. void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;
  9. void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;
  10. @Override
  11. @SuppressWarnings("deprecation")
  12. void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
  13. }

Netty中ChannelHandler的生命周期

接口中定义了一系列有关操作的方法,接口常用的实现类就是ChannelInboundHandlerAdapter。详细的方法介绍可以查阅Class ChannelInboundHandlerAdapter

相应的出站处理器接口中定义定义如下:

  1. public interface ChannelOutboundHandler extends ChannelHandler {
  2. // 绑定监听地址
  3. void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;
  4. // 连接服务端
  5. void connect(
  6. ChannelHandlerContext ctx, SocketAddress remoteAddress,
  7. SocketAddress localAddress, ChannelPromise promise) throws Exception;
  8. // 断连
  9. void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
  10. // 主动关闭通道
  11. void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
  12. void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
  13. // 从底层读数据
  14. void read(ChannelHandlerContext ctx) throws Exception;
  15. // 写数据到底层
  16. void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;
  17. // 将底层缓存区的数据腾空,立即写出到对端
  18. void flush(ChannelHandlerContext ctx) throws Exception;
  19. }

通过继承ChannelInboundHandlerAdapter和ChannelInboundHandlerAdapter并重写其中相关的方法,我们就可以实现ChannelHandler的自定义。但是,ChannelHandler定义完毕之后,还需要将其和Channel对应的Pipeline进行关联,而关联操作需要ChannelInitializer完成。ChannelInitializer中定义了一个抽象方法initChannel来负责ChannelHandler的装配。

  1. @Sharable
  2. public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
  3. private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelInitializer.class);
  4. private final Set<ChannelHandlerContext> initMap = Collections.newSetFromMap(
  5. new ConcurrentHashMap<ChannelHandlerContext, Boolean>());
  6. protected abstract void initChannel(C ch) throws Exception;
  7. // ...
  8. }

当需要装配ChannelHandler时,只需要通道调用initChannel方法,将ChannelHandler转配到对应的Pipeline上即可。例如:

  1. b.childHandler(new ChannelInitializer<SocketChannel>() {
  2. protected void initChannel(SocketChannel ch) throws Exception {
  3. ch.pipeline().addLast(new NettyDiscardHandler()); // 这里添加的Handler需要自行编码实现
  4. }
  5. });

4. ChannelPipeline

上面介绍了Channel、ChannelHandler和Reactor这些Netty中的核心要素,而将这些要素关联起来就需要另外一个部分:ChannelPipeline。每一个通道对应的IO事件会有很多的ChannelHandler进行处理,Netty将这些相关的ChannelHandler组织成双向链表的形式,对应的结构就是ChannelPipeline。

ChannelPipeline会管理通道相关的所有ChannelHandler,包括入站处理器和出站处理器,它们组成的结构如下所示:
image.png

入站操作的方向和出站操作的方向是相反的,而且,入站操作只会且只能从Inbound入站处理器类型的Handler流过,出站操作只会且只能从Outbound出站处理器类型的Handler流过。

ChannelPipeline的双向链表的节点类型定义为ChannelHandlerContext,它代表了具体的ChannelHandler和所在的ChannelPipeline之间的关联。它们俩和Channel三者之间的关系可以描述为:Channel拥有一条ChannelPipeline,每一个ChannelPipeline节点为一个ChannelHandlerContext对象,每一个ChannelHandlerContext对象中包含一个ChannelHandler。
**
ChannelHandlerContext的定义如下:

  1. public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {
  2. Channel channel();
  3. EventExecutor executor();
  4. String name();
  5. ChannelHandler handler();
  6. boolean isRemoved();
  7. @Override
  8. ChannelHandlerContext fireChannelRegistered();
  9. @Override
  10. ChannelHandlerContext fireChannelUnregistered();
  11. @Override
  12. ChannelHandlerContext fireChannelActive();
  13. @Override
  14. ChannelHandlerContext fireChannelInactive();
  15. @Override
  16. ChannelHandlerContext fireExceptionCaught(Throwable cause);
  17. @Override
  18. ChannelHandlerContext fireUserEventTriggered(Object evt);
  19. @Override
  20. ChannelHandlerContext fireChannelRead(Object msg);
  21. @Override
  22. ChannelHandlerContext fireChannelReadComplete();
  23. @Override
  24. ChannelHandlerContext fireChannelWritabilityChanged();
  25. @Override
  26. ChannelHandlerContext read();
  27. @Override
  28. ChannelHandlerContext flush();
  29. ChannelPipeline pipeline();
  30. ByteBufAllocator alloc();
  31. @Deprecated
  32. @Override
  33. <T> Attribute<T> attr(AttributeKey<T> key);
  34. @Deprecated
  35. @Override
  36. <T> boolean hasAttr(AttributeKey<T> key);
  37. }

接口中定义的方法分为两类:

  • 获取上下文所关联的Netty组件实例
  • 入站和出站处理方法

**

如果通过Channel或是ChannelPipeline实例调用入站和出站的处理方法,它们会在整条Pipeline中传播;如果通过ChannelHandlerContext调用入站和出站处理方法,它们只会从当前的节点开始执行并传播到相同类型的下一个节点。

ChannelPipeline对于入站操作来说,如果想要在执行的某个点截断,那么通常有两种方式:

  • 不调用super.channelXxx
  • 不调用ctx.fireChannelXxx

其中xxx表示相应的操作,如read、write等。例如:

  1. /**
  2. * @Author dyliang
  3. * @Date 2020/11/10 10:46
  4. * @Version 1.0
  5. */
  6. public class InPipeline {
  7. static class SimpleInHandlerA extends ChannelInboundHandlerAdapter {
  8. @Override
  9. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  10. System.out.println("入站处理器 A: 被回调 ");
  11. super.channelRead(ctx, msg);
  12. }
  13. }
  14. static class SimpleInHandlerB extends ChannelInboundHandlerAdapter {
  15. @Override
  16. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  17. System.out.println("入站处理器 B: 被回调 ");
  18. super.channelRead(ctx, msg);
  19. }
  20. }
  21. static class SimpleInHandlerC extends ChannelInboundHandlerAdapter {
  22. @Override
  23. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  24. System.out.println("入站处理器 C: 被回调 ");
  25. super.channelRead(ctx, msg);
  26. }
  27. }
  28. //测试流水线的截断
  29. @Test
  30. public void testPipelineCutting() {
  31. ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {
  32. protected void initChannel(EmbeddedChannel ch) {
  33. ch.pipeline().addLast(new SimpleInHandlerA());
  34. ch.pipeline().addLast(new SimpleInHandlerB2());
  35. ch.pipeline().addLast(new SimpleInHandlerC());
  36. }
  37. };
  38. EmbeddedChannel channel = new EmbeddedChannel(i);
  39. ByteBuf buf = Unpooled.buffer();
  40. buf.writeInt(1);
  41. //向通道写一个入站报文
  42. channel.writeInbound(buf);
  43. try {
  44. Thread.sleep(Integer.MAX_VALUE);
  45. } catch (InterruptedException e) {
  46. e.printStackTrace();
  47. }
  48. }
  49. }

正常执行测试方法,控制台输出:

  1. 入站处理器 A: 被回调
  2. 入站处理器 B: 被回调
  3. 入站处理器 C: 被回调

如果将上面的SimpleInHandlerB的super.channelRead(ctx, msg)注释掉。执行单元测试,控制台输出:

  1. 入站处理器 A: 被回调
  2. 入站处理器 B: 被回调

第二种方式为:

  1. static class SimpleInHandlerB extends ChannelInboundHandlerAdapter {
  2. @Override
  3. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  4. System.out.println("入站处理器 B: 被回调 ");
  5. ctx.fireChannelRead(msg);
  6. }
  7. }

除了可以在截断Pipeline的操作外,Netty还支持对于ChannelHandler的热插拔,即可以动态的增加和删除ChannelPipeline上的ChannelHandler。


5. Bootstrap启动类

Boostrap启动类提供一种简便的方式将上述的核心组件都组装起来,完成Netty程序的初始化。客户端使用Boostrap,服务端使用ServerBoostrap。例如:

  1. ServerBootstrap b = new ServerBootstrap();
  2. b.group(bossLoopGroup, workerLoopGroup);
  3. b.channel(NioServerSocketChannel.class);
  4. b.localAddress(serverPort);
  5. b.option(ChannelOption.SO_KEEPALIVE, true);
  6. b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
  7. b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
  8. b.childHandler(new ChannelInitializer<SocketChannel>() {
  9. protected void initChannel(SocketChannel ch) throws Exception {
  10. ch.pipeline().addLast(new NettyDiscardHandler());
  11. }
  12. });
  13. ChannelFuture channelFuture = b.bind().sync();
  14. ChannelFuture closeFuture = channelFuture.channel().closeFuture();
  15. closeFuture.sync();
  16. workerLoopGroup.shutdownGracefully();
  17. bossLoopGroup.shutdownGracefully();

Netty中将处理连接的监听和建立的NioServerSocketChannel和对应IO事件的NioSocketChannel称为父通道和子通道。

那么Boostrap类是如何启动的呢?即如何使用Boostrap来实现组件的组装和服务器及客户端的启动呢?以上面的代码为例进行说明。

首先,要想使用Boostrap必须先实例化一个Boostrap对象:

  1. ServerBootstrap b = new ServerBootstrap();

创建Reactor线程组,推荐使用BoosLoopGroup和WorkerLoopGroup配套的形式,调用group方法给Boostrap配置线程组:

  1. EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1);
  2. EventLoopGroup workerLoopGroup = new NioEventLoopGroup();
  3. b.group(bossLoopGroup, workerLoopGroup);

设置通道类型、监听端口:

  1. b.channel(NioServerSocketChannel.class);
  2. b.localAddress(serverPort);

配置传输通道的配置选项,例如开启TCP底层的心跳机制:

  1. b.option(ChannelOption.SO_KEEPALIVE, true);

ChannelOption中定义了许多的配置项,例如:

  1. public static final ChannelOption<Boolean> AUTO_CLOSE = valueOf("AUTO_CLOSE");
  2. public static final ChannelOption<Boolean> SO_BROADCAST = valueOf("SO_BROADCAST");
  3. public static final ChannelOption<Boolean> SO_KEEPALIVE = valueOf("SO_KEEPALIVE");
  4. public static final ChannelOption<Integer> SO_SNDBUF = valueOf("SO_SNDBUF");
  5. public static final ChannelOption<Integer> SO_RCVBUF = valueOf("SO_RCVBUF");
  6. public static final ChannelOption<Boolean> SO_REUSEADDR = valueOf("SO_REUSEADDR");
  7. public static final ChannelOption<Integer> SO_LINGER = valueOf("SO_LINGER");
  8. public static final ChannelOption<Integer> SO_BACKLOG = valueOf("SO_BACKLOG");
  9. public static final ChannelOption<Integer> SO_TIMEOUT = valueOf("SO_TIMEOUT");

更多可见ChannelOption源码。

组装好了Channel和Reactor后,还需要组装Pipeline。由于父通道(NioServerSocketChannel)业务固定,通常无需单独配置,这里只需要组装子通道的Pipeline即可。

  1. b.childHandler(new ChannelInitializer<SocketChannel>() {
  2. protected void initChannel(SocketChannel ch) throws Exception {
  3. ch.pipeline().addLast(new NettyDiscardHandler());
  4. }
  5. });

接着需要绑定服务器新连接的监听端口,由于Netty中任何操作都是异步的,这里返回的是异步回调的ChannelFuture。不过没有对ChannelFuture做其他处理,而是阻塞直到绑定成功。

  1. ChannelFuture channelFuture = b.bind().sync();

绑定成功后,自我阻塞直到通道关闭。

  1. ChannelFuture closeFuture = channelFuture.channel().closeFuture();
  2. closeFuture.sync();

最后关闭EnentLoopGroup,释放资源。

  1. workerLoopGroup.shutdownGracefully();
  2. bossLoopGroup.shutdownGracefully();