首先,通过一个Netty服务器的案例代码看一下其中涉及的核心组件,以及它们所对应的功能,如下所示
public class NettyDiscardServer {// 服务器线程指定端口号private final int serverPort;// 启动器类,负责服务器或客户端组件的组装,及Netty的初始化ServerBootstrap b = new ServerBootstrap();public NettyDiscardServer(int port) {this.serverPort = port;}public void runServer() {//创建reactor线程组,包含BossGroup和workerGroup// 处理连接请求EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1);// 处理读写请求EventLoopGroup workerLoopGroup = new NioEventLoopGroup();try {// 1 设置reactor 线程组b.group(bossLoopGroup, workerLoopGroup);// 2 设置nio类型的channel为NioServerSocketChannelb.channel(NioServerSocketChannel.class);// 3 设置监听端口b.localAddress(serverPort);// 4 设置通道的参数b.option(ChannelOption.SO_KEEPALIVE, true); // 长连接b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); // ByteBuf采用池化方式b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);// 5 装配子通道(处理读写请求)流水线// 创建ChannelInitializer对象,并重写initChannel方法b.childHandler(new ChannelInitializer<SocketChannel>() {// 有连接到达时会创建一个channelprotected void initChannel(SocketChannel ch) throws Exception {// pipeline管理子通道channel中的Handler// 向子channel流水线添加一个handler处理器// ChannelPipeline采用双向链表组织ch.pipeline().addLast(new NettyDiscardHandler()); // 这里添加的Handler需要自行编码实现}});// 6 开始绑定server// 通过调用sync同步方法阻塞直到绑定成功ChannelFuture channelFuture = b.bind().sync();Logger.info(" 服务器启动成功,监听端口: " +channelFuture.channel().localAddress());// 7 等待通道关闭的异步任务结束// 服务监听通道会一直等待通道关闭的异步任务结束ChannelFuture closeFuture = channelFuture.channel().closeFuture();closeFuture.sync();} catch (Exception e) {e.printStackTrace();} finally {// 8 关闭EventLoopGroup,// 释放掉所有资源包括创建的线程workerLoopGroup.shutdownGracefully();bossLoopGroup.shutdownGracefully();}}}
自定义的入站ChannelHandler需要继承ChannelInboundHandlerAdapter,并重写ChannelRead方法实现,如下所示:
public class NettyDiscardHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf in = (ByteBuf) msg;try {Logger.info("收到消息,丢弃如下:");while (in.isReadable()) {System.out.print((char) in.readByte());}System.out.println();} finally {ReferenceCountUtil.release(msg);}}}
1. Netty Reactor
**
Netty中的Reactor模型的运行流程如下所示:
每个Socket连接对应的IO时间来自于Channel,如果想要查询Channel对应的IO时间,就必须将其注册到Selector上。Selector起到IO多路复用的功能,监听注册到上面的Channel。Reactor负责一个线程来不断的轮询Selector上的IO事件,当查询到有IO时间发生时,它负责将其分发给具体的Handler进行处理,真正的IO操作和业务处理由Handler负责完成。
Netty中常用的对应于NioSocketChannel的Reactor实现为NioEventLoop,它绑定了Thread类成员和Java NIO Selector两个成员属性。每一个NioEventLoop拥有一个线程,负责一个Selector的IO事件轮询。
2. Channel
Netty对于Java NIO中的Channel进行了进一步的封装,对于每一种通信连接协议都实现了自己的Channel。而每一种协议的Channel又有NIO(异步IO)和OIO(阻塞IO)两种形式:
| NioSocketChannel | 异步非阻塞TCP Socket 通道 |
|---|---|
| OioSocketChannel | 同步阻塞式TCP Socket 通道 |
| NioServerSocketChannel | 异步非阻塞式TCP Socket服务器端监听通道 |
| OioServerSocketChannel | 同步阻塞式TCP Socket服务器端监听通道 |
| NioDatagramChannel | 异步非阻塞式UDP传输通道 |
| OioDatagramChannel | 同步阻塞式UDP传输通道 |
| NioSctpServerChannel | 异步非阻塞式Sctp传输通道 |
| OioSctpServerChannel | 同步阻塞式Sctp传输通道 |
NioSocketChannel的IO操作最终还是会落到Java NIO的SelectableChannel上。
NioSocketChannel的类继承关系图如下所示:
其中,顶层的父类AbstractChannel构造函数如下所示:
/*** Creates a new instance.** @param parent* the parent of this channel. {@code null} if there's no parent.*/protected AbstractChannel(Channel parent) {this.parent = parent; // 父通道id = newId();unsafe = newUnsafe(); // 底层的IO通道完成实际IO操作pipeline = newChannelPipeline(); // 每个通道对应的流水线}
常用的方法有:
connect:连接远程服务器,调用后立即返回ChannelFuture
@Overridepublic ChannelFuture connect(SocketAddress remoteAddress) {return pipeline.connect(remoteAddress);}
bind:绑定监听地址,监听客户端连接
@Overridepublic ChannelFuture bind(SocketAddress localAddress) {return pipeline.bind(localAddress);}
close:关闭通道连接,返回连接关闭的ChannelFuture异步任务
- read:读取通道数据,启动入站处理。从内部的Java NIO Channel通道读取数据,启动内部的Pipeline流水线,开启数据读取的入站处理
- write:启动出站流水处理,将处理后的最终数据写到底层Java NIO通道
- flush:将缓冲区中的数据立即写出到对端
3. Handler
NioEventLoop中的Selector会轮询监听注册在上面的通道,是否有connect、accept、read和write中某一种IO事件的发生。当监听到相应的IO事件后,需要将其分发给自己的Handler进行处理。Netty中整个IO处理操作的流程为:从通道读数据包、数据包解码、业务处理、目标数据编码、结果写入通道、发送给对端。
从上图可以看出,入站操作是自底向上的,由Netty内部到入站处理器,而出站操作则是相反的。入站处理器负责解码和业务处理操作,出站处理器负责数据编码和通道的写入操作,数据的读取和发送是由Netty底层实现的。
Netty中的Handler由Channelhandler接口定义,具体的实现形式有:
- 入站处理器:ChannelInboundHandler,默认实现是ChannelInboundHandlerAdapter
- 出站处理器:ChannelOutboundHandler,默认实现是ChannelOutboundHandlerAdapter
例如,ChannelInboundHandler接口的定义如下:
public interface ChannelInboundHandler extends ChannelHandler {void channelRegistered(ChannelHandlerContext ctx) throws Exception;void channelUnregistered(ChannelHandlerContext ctx) throws Exception;void channelActive(ChannelHandlerContext ctx) throws Exception;void channelInactive(ChannelHandlerContext ctx) throws Exception;void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;void channelReadComplete(ChannelHandlerContext ctx) throws Exception;void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;@Override@SuppressWarnings("deprecation")void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;}
接口中定义了一系列有关操作的方法,接口常用的实现类就是ChannelInboundHandlerAdapter。详细的方法介绍可以查阅Class ChannelInboundHandlerAdapter。
相应的出站处理器接口中定义定义如下:
public interface ChannelOutboundHandler extends ChannelHandler {// 绑定监听地址void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;// 连接服务端void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,SocketAddress localAddress, ChannelPromise promise) throws Exception;// 断连void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;// 主动关闭通道void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;// 从底层读数据void read(ChannelHandlerContext ctx) throws Exception;// 写数据到底层void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;// 将底层缓存区的数据腾空,立即写出到对端void flush(ChannelHandlerContext ctx) throws Exception;}
通过继承ChannelInboundHandlerAdapter和ChannelInboundHandlerAdapter并重写其中相关的方法,我们就可以实现ChannelHandler的自定义。但是,ChannelHandler定义完毕之后,还需要将其和Channel对应的Pipeline进行关联,而关联操作需要ChannelInitializer完成。ChannelInitializer中定义了一个抽象方法initChannel来负责ChannelHandler的装配。
@Sharablepublic abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelInitializer.class);private final Set<ChannelHandlerContext> initMap = Collections.newSetFromMap(new ConcurrentHashMap<ChannelHandlerContext, Boolean>());protected abstract void initChannel(C ch) throws Exception;// ...}
当需要装配ChannelHandler时,只需要通道调用initChannel方法,将ChannelHandler转配到对应的Pipeline上即可。例如:
b.childHandler(new ChannelInitializer<SocketChannel>() {protected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new NettyDiscardHandler()); // 这里添加的Handler需要自行编码实现}});
4. ChannelPipeline
上面介绍了Channel、ChannelHandler和Reactor这些Netty中的核心要素,而将这些要素关联起来就需要另外一个部分:ChannelPipeline。每一个通道对应的IO事件会有很多的ChannelHandler进行处理,Netty将这些相关的ChannelHandler组织成双向链表的形式,对应的结构就是ChannelPipeline。
ChannelPipeline会管理通道相关的所有ChannelHandler,包括入站处理器和出站处理器,它们组成的结构如下所示:
入站操作的方向和出站操作的方向是相反的,而且,入站操作只会且只能从Inbound入站处理器类型的Handler流过,出站操作只会且只能从Outbound出站处理器类型的Handler流过。
ChannelPipeline的双向链表的节点类型定义为ChannelHandlerContext,它代表了具体的ChannelHandler和所在的ChannelPipeline之间的关联。它们俩和Channel三者之间的关系可以描述为:Channel拥有一条ChannelPipeline,每一个ChannelPipeline节点为一个ChannelHandlerContext对象,每一个ChannelHandlerContext对象中包含一个ChannelHandler。
**
ChannelHandlerContext的定义如下:
public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {Channel channel();EventExecutor executor();String name();ChannelHandler handler();boolean isRemoved();@OverrideChannelHandlerContext fireChannelRegistered();@OverrideChannelHandlerContext fireChannelUnregistered();@OverrideChannelHandlerContext fireChannelActive();@OverrideChannelHandlerContext fireChannelInactive();@OverrideChannelHandlerContext fireExceptionCaught(Throwable cause);@OverrideChannelHandlerContext fireUserEventTriggered(Object evt);@OverrideChannelHandlerContext fireChannelRead(Object msg);@OverrideChannelHandlerContext fireChannelReadComplete();@OverrideChannelHandlerContext fireChannelWritabilityChanged();@OverrideChannelHandlerContext read();@OverrideChannelHandlerContext flush();ChannelPipeline pipeline();ByteBufAllocator alloc();@Deprecated@Override<T> Attribute<T> attr(AttributeKey<T> key);@Deprecated@Override<T> boolean hasAttr(AttributeKey<T> key);}
接口中定义的方法分为两类:
- 获取上下文所关联的Netty组件实例
- 入站和出站处理方法
**
如果通过Channel或是ChannelPipeline实例调用入站和出站的处理方法,它们会在整条Pipeline中传播;如果通过ChannelHandlerContext调用入站和出站处理方法,它们只会从当前的节点开始执行并传播到相同类型的下一个节点。
ChannelPipeline对于入站操作来说,如果想要在执行的某个点截断,那么通常有两种方式:
- 不调用super.channelXxx
- 不调用ctx.fireChannelXxx
其中xxx表示相应的操作,如read、write等。例如:
/*** @Author dyliang* @Date 2020/11/10 10:46* @Version 1.0*/public class InPipeline {static class SimpleInHandlerA extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("入站处理器 A: 被回调 ");super.channelRead(ctx, msg);}}static class SimpleInHandlerB extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("入站处理器 B: 被回调 ");super.channelRead(ctx, msg);}}static class SimpleInHandlerC extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("入站处理器 C: 被回调 ");super.channelRead(ctx, msg);}}//测试流水线的截断@Testpublic void testPipelineCutting() {ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {protected void initChannel(EmbeddedChannel ch) {ch.pipeline().addLast(new SimpleInHandlerA());ch.pipeline().addLast(new SimpleInHandlerB2());ch.pipeline().addLast(new SimpleInHandlerC());}};EmbeddedChannel channel = new EmbeddedChannel(i);ByteBuf buf = Unpooled.buffer();buf.writeInt(1);//向通道写一个入站报文channel.writeInbound(buf);try {Thread.sleep(Integer.MAX_VALUE);} catch (InterruptedException e) {e.printStackTrace();}}}
正常执行测试方法,控制台输出:
入站处理器 A: 被回调入站处理器 B: 被回调入站处理器 C: 被回调
如果将上面的SimpleInHandlerB的super.channelRead(ctx, msg)注释掉。执行单元测试,控制台输出:
入站处理器 A: 被回调入站处理器 B: 被回调
第二种方式为:
static class SimpleInHandlerB extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("入站处理器 B: 被回调 ");ctx.fireChannelRead(msg);}}
除了可以在截断Pipeline的操作外,Netty还支持对于ChannelHandler的热插拔,即可以动态的增加和删除ChannelPipeline上的ChannelHandler。
5. Bootstrap启动类
Boostrap启动类提供一种简便的方式将上述的核心组件都组装起来,完成Netty程序的初始化。客户端使用Boostrap,服务端使用ServerBoostrap。例如:
ServerBootstrap b = new ServerBootstrap();b.group(bossLoopGroup, workerLoopGroup);b.channel(NioServerSocketChannel.class);b.localAddress(serverPort);b.option(ChannelOption.SO_KEEPALIVE, true);b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);b.childHandler(new ChannelInitializer<SocketChannel>() {protected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new NettyDiscardHandler());}});ChannelFuture channelFuture = b.bind().sync();ChannelFuture closeFuture = channelFuture.channel().closeFuture();closeFuture.sync();workerLoopGroup.shutdownGracefully();bossLoopGroup.shutdownGracefully();
Netty中将处理连接的监听和建立的NioServerSocketChannel和对应IO事件的NioSocketChannel称为父通道和子通道。
那么Boostrap类是如何启动的呢?即如何使用Boostrap来实现组件的组装和服务器及客户端的启动呢?以上面的代码为例进行说明。
首先,要想使用Boostrap必须先实例化一个Boostrap对象:
ServerBootstrap b = new ServerBootstrap();
创建Reactor线程组,推荐使用BoosLoopGroup和WorkerLoopGroup配套的形式,调用group方法给Boostrap配置线程组:
EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1);EventLoopGroup workerLoopGroup = new NioEventLoopGroup();b.group(bossLoopGroup, workerLoopGroup);
设置通道类型、监听端口:
b.channel(NioServerSocketChannel.class);b.localAddress(serverPort);
配置传输通道的配置选项,例如开启TCP底层的心跳机制:
b.option(ChannelOption.SO_KEEPALIVE, true);
ChannelOption中定义了许多的配置项,例如:
public static final ChannelOption<Boolean> AUTO_CLOSE = valueOf("AUTO_CLOSE");public static final ChannelOption<Boolean> SO_BROADCAST = valueOf("SO_BROADCAST");public static final ChannelOption<Boolean> SO_KEEPALIVE = valueOf("SO_KEEPALIVE");public static final ChannelOption<Integer> SO_SNDBUF = valueOf("SO_SNDBUF");public static final ChannelOption<Integer> SO_RCVBUF = valueOf("SO_RCVBUF");public static final ChannelOption<Boolean> SO_REUSEADDR = valueOf("SO_REUSEADDR");public static final ChannelOption<Integer> SO_LINGER = valueOf("SO_LINGER");public static final ChannelOption<Integer> SO_BACKLOG = valueOf("SO_BACKLOG");public static final ChannelOption<Integer> SO_TIMEOUT = valueOf("SO_TIMEOUT");
更多可见ChannelOption源码。
组装好了Channel和Reactor后,还需要组装Pipeline。由于父通道(NioServerSocketChannel)业务固定,通常无需单独配置,这里只需要组装子通道的Pipeline即可。
b.childHandler(new ChannelInitializer<SocketChannel>() {protected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new NettyDiscardHandler());}});
接着需要绑定服务器新连接的监听端口,由于Netty中任何操作都是异步的,这里返回的是异步回调的ChannelFuture。不过没有对ChannelFuture做其他处理,而是阻塞直到绑定成功。
ChannelFuture channelFuture = b.bind().sync();
绑定成功后,自我阻塞直到通道关闭。
ChannelFuture closeFuture = channelFuture.channel().closeFuture();closeFuture.sync();
最后关闭EnentLoopGroup,释放资源。
workerLoopGroup.shutdownGracefully();bossLoopGroup.shutdownGracefully();
