首先,通过一个Netty服务器的案例代码看一下其中涉及的核心组件,以及它们所对应的功能,如下所示
public class NettyDiscardServer {
// 服务器线程指定端口号
private final int serverPort;
// 启动器类,负责服务器或客户端组件的组装,及Netty的初始化
ServerBootstrap b = new ServerBootstrap();
public NettyDiscardServer(int port) {
this.serverPort = port;
}
public void runServer() {
//创建reactor线程组,包含BossGroup和workerGroup
// 处理连接请求
EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1);
// 处理读写请求
EventLoopGroup workerLoopGroup = new NioEventLoopGroup();
try {
// 1 设置reactor 线程组
b.group(bossLoopGroup, workerLoopGroup);
// 2 设置nio类型的channel为NioServerSocketChannel
b.channel(NioServerSocketChannel.class);
// 3 设置监听端口
b.localAddress(serverPort);
// 4 设置通道的参数
b.option(ChannelOption.SO_KEEPALIVE, true); // 长连接
b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); // ByteBuf采用池化方式
b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
// 5 装配子通道(处理读写请求)流水线
// 创建ChannelInitializer对象,并重写initChannel方法
b.childHandler(new ChannelInitializer<SocketChannel>() {
// 有连接到达时会创建一个channel
protected void initChannel(SocketChannel ch) throws Exception {
// pipeline管理子通道channel中的Handler
// 向子channel流水线添加一个handler处理器
// ChannelPipeline采用双向链表组织
ch.pipeline().addLast(new NettyDiscardHandler()); // 这里添加的Handler需要自行编码实现
}
});
// 6 开始绑定server
// 通过调用sync同步方法阻塞直到绑定成功
ChannelFuture channelFuture = b.bind().sync();
Logger.info(" 服务器启动成功,监听端口: " +
channelFuture.channel().localAddress());
// 7 等待通道关闭的异步任务结束
// 服务监听通道会一直等待通道关闭的异步任务结束
ChannelFuture closeFuture = channelFuture.channel().closeFuture();
closeFuture.sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 8 关闭EventLoopGroup,
// 释放掉所有资源包括创建的线程
workerLoopGroup.shutdownGracefully();
bossLoopGroup.shutdownGracefully();
}
}
}
自定义的入站ChannelHandler需要继承ChannelInboundHandlerAdapter,并重写ChannelRead方法实现,如下所示:
public class NettyDiscardHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf in = (ByteBuf) msg;
try {
Logger.info("收到消息,丢弃如下:");
while (in.isReadable()) {
System.out.print((char) in.readByte());
}
System.out.println();
} finally {
ReferenceCountUtil.release(msg);
}
}
}
1. Netty Reactor
**
Netty中的Reactor模型的运行流程如下所示:
每个Socket连接对应的IO时间来自于Channel,如果想要查询Channel对应的IO时间,就必须将其注册到Selector上。Selector起到IO多路复用的功能,监听注册到上面的Channel。Reactor负责一个线程来不断的轮询Selector上的IO事件,当查询到有IO时间发生时,它负责将其分发给具体的Handler进行处理,真正的IO操作和业务处理由Handler负责完成。
Netty中常用的对应于NioSocketChannel的Reactor实现为NioEventLoop,它绑定了Thread类成员和Java NIO Selector两个成员属性。每一个NioEventLoop拥有一个线程,负责一个Selector的IO事件轮询。
2. Channel
Netty对于Java NIO中的Channel进行了进一步的封装,对于每一种通信连接协议都实现了自己的Channel。而每一种协议的Channel又有NIO(异步IO)和OIO(阻塞IO)两种形式:
NioSocketChannel | 异步非阻塞TCP Socket 通道 |
---|---|
OioSocketChannel | 同步阻塞式TCP Socket 通道 |
NioServerSocketChannel | 异步非阻塞式TCP Socket服务器端监听通道 |
OioServerSocketChannel | 同步阻塞式TCP Socket服务器端监听通道 |
NioDatagramChannel | 异步非阻塞式UDP传输通道 |
OioDatagramChannel | 同步阻塞式UDP传输通道 |
NioSctpServerChannel | 异步非阻塞式Sctp传输通道 |
OioSctpServerChannel | 同步阻塞式Sctp传输通道 |
NioSocketChannel的IO操作最终还是会落到Java NIO的SelectableChannel上。
NioSocketChannel的类继承关系图如下所示:
其中,顶层的父类AbstractChannel构造函数如下所示:
/**
* Creates a new instance.
*
* @param parent
* the parent of this channel. {@code null} if there's no parent.
*/
protected AbstractChannel(Channel parent) {
this.parent = parent; // 父通道
id = newId();
unsafe = newUnsafe(); // 底层的IO通道完成实际IO操作
pipeline = newChannelPipeline(); // 每个通道对应的流水线
}
常用的方法有:
connect:连接远程服务器,调用后立即返回ChannelFuture
@Override
public ChannelFuture connect(SocketAddress remoteAddress) {
return pipeline.connect(remoteAddress);
}
bind:绑定监听地址,监听客户端连接
@Override
public ChannelFuture bind(SocketAddress localAddress) {
return pipeline.bind(localAddress);
}
close:关闭通道连接,返回连接关闭的ChannelFuture异步任务
- read:读取通道数据,启动入站处理。从内部的Java NIO Channel通道读取数据,启动内部的Pipeline流水线,开启数据读取的入站处理
- write:启动出站流水处理,将处理后的最终数据写到底层Java NIO通道
- flush:将缓冲区中的数据立即写出到对端
3. Handler
NioEventLoop中的Selector会轮询监听注册在上面的通道,是否有connect、accept、read和write中某一种IO事件的发生。当监听到相应的IO事件后,需要将其分发给自己的Handler进行处理。Netty中整个IO处理操作的流程为:从通道读数据包、数据包解码、业务处理、目标数据编码、结果写入通道、发送给对端。
从上图可以看出,入站操作是自底向上的,由Netty内部到入站处理器,而出站操作则是相反的。入站处理器负责解码和业务处理操作,出站处理器负责数据编码和通道的写入操作,数据的读取和发送是由Netty底层实现的。
Netty中的Handler由Channelhandler接口定义,具体的实现形式有:
- 入站处理器:ChannelInboundHandler,默认实现是ChannelInboundHandlerAdapter
- 出站处理器:ChannelOutboundHandler,默认实现是ChannelOutboundHandlerAdapter
例如,ChannelInboundHandler接口的定义如下:
public interface ChannelInboundHandler extends ChannelHandler {
void channelRegistered(ChannelHandlerContext ctx) throws Exception;
void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
void channelActive(ChannelHandlerContext ctx) throws Exception;
void channelInactive(ChannelHandlerContext ctx) throws Exception;
void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;
void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;
@Override
@SuppressWarnings("deprecation")
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}
接口中定义了一系列有关操作的方法,接口常用的实现类就是ChannelInboundHandlerAdapter。详细的方法介绍可以查阅Class ChannelInboundHandlerAdapter。
相应的出站处理器接口中定义定义如下:
public interface ChannelOutboundHandler extends ChannelHandler {
// 绑定监听地址
void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;
// 连接服务端
void connect(
ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) throws Exception;
// 断连
void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
// 主动关闭通道
void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
// 从底层读数据
void read(ChannelHandlerContext ctx) throws Exception;
// 写数据到底层
void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;
// 将底层缓存区的数据腾空,立即写出到对端
void flush(ChannelHandlerContext ctx) throws Exception;
}
通过继承ChannelInboundHandlerAdapter和ChannelInboundHandlerAdapter并重写其中相关的方法,我们就可以实现ChannelHandler的自定义。但是,ChannelHandler定义完毕之后,还需要将其和Channel对应的Pipeline进行关联,而关联操作需要ChannelInitializer完成。ChannelInitializer中定义了一个抽象方法initChannel来负责ChannelHandler的装配。
@Sharable
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelInitializer.class);
private final Set<ChannelHandlerContext> initMap = Collections.newSetFromMap(
new ConcurrentHashMap<ChannelHandlerContext, Boolean>());
protected abstract void initChannel(C ch) throws Exception;
// ...
}
当需要装配ChannelHandler时,只需要通道调用initChannel方法,将ChannelHandler转配到对应的Pipeline上即可。例如:
b.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyDiscardHandler()); // 这里添加的Handler需要自行编码实现
}
});
4. ChannelPipeline
上面介绍了Channel、ChannelHandler和Reactor这些Netty中的核心要素,而将这些要素关联起来就需要另外一个部分:ChannelPipeline。每一个通道对应的IO事件会有很多的ChannelHandler进行处理,Netty将这些相关的ChannelHandler组织成双向链表的形式,对应的结构就是ChannelPipeline。
ChannelPipeline会管理通道相关的所有ChannelHandler,包括入站处理器和出站处理器,它们组成的结构如下所示:
入站操作的方向和出站操作的方向是相反的,而且,入站操作只会且只能从Inbound入站处理器类型的Handler流过,出站操作只会且只能从Outbound出站处理器类型的Handler流过。
ChannelPipeline的双向链表的节点类型定义为ChannelHandlerContext,它代表了具体的ChannelHandler和所在的ChannelPipeline之间的关联。它们俩和Channel三者之间的关系可以描述为:Channel拥有一条ChannelPipeline,每一个ChannelPipeline节点为一个ChannelHandlerContext对象,每一个ChannelHandlerContext对象中包含一个ChannelHandler。
**
ChannelHandlerContext的定义如下:
public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {
Channel channel();
EventExecutor executor();
String name();
ChannelHandler handler();
boolean isRemoved();
@Override
ChannelHandlerContext fireChannelRegistered();
@Override
ChannelHandlerContext fireChannelUnregistered();
@Override
ChannelHandlerContext fireChannelActive();
@Override
ChannelHandlerContext fireChannelInactive();
@Override
ChannelHandlerContext fireExceptionCaught(Throwable cause);
@Override
ChannelHandlerContext fireUserEventTriggered(Object evt);
@Override
ChannelHandlerContext fireChannelRead(Object msg);
@Override
ChannelHandlerContext fireChannelReadComplete();
@Override
ChannelHandlerContext fireChannelWritabilityChanged();
@Override
ChannelHandlerContext read();
@Override
ChannelHandlerContext flush();
ChannelPipeline pipeline();
ByteBufAllocator alloc();
@Deprecated
@Override
<T> Attribute<T> attr(AttributeKey<T> key);
@Deprecated
@Override
<T> boolean hasAttr(AttributeKey<T> key);
}
接口中定义的方法分为两类:
- 获取上下文所关联的Netty组件实例
- 入站和出站处理方法
**
如果通过Channel或是ChannelPipeline实例调用入站和出站的处理方法,它们会在整条Pipeline中传播;如果通过ChannelHandlerContext调用入站和出站处理方法,它们只会从当前的节点开始执行并传播到相同类型的下一个节点。
ChannelPipeline对于入站操作来说,如果想要在执行的某个点截断,那么通常有两种方式:
- 不调用super.channelXxx
- 不调用ctx.fireChannelXxx
其中xxx表示相应的操作,如read、write等。例如:
/**
* @Author dyliang
* @Date 2020/11/10 10:46
* @Version 1.0
*/
public class InPipeline {
static class SimpleInHandlerA extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("入站处理器 A: 被回调 ");
super.channelRead(ctx, msg);
}
}
static class SimpleInHandlerB extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("入站处理器 B: 被回调 ");
super.channelRead(ctx, msg);
}
}
static class SimpleInHandlerC extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("入站处理器 C: 被回调 ");
super.channelRead(ctx, msg);
}
}
//测试流水线的截断
@Test
public void testPipelineCutting() {
ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {
protected void initChannel(EmbeddedChannel ch) {
ch.pipeline().addLast(new SimpleInHandlerA());
ch.pipeline().addLast(new SimpleInHandlerB2());
ch.pipeline().addLast(new SimpleInHandlerC());
}
};
EmbeddedChannel channel = new EmbeddedChannel(i);
ByteBuf buf = Unpooled.buffer();
buf.writeInt(1);
//向通道写一个入站报文
channel.writeInbound(buf);
try {
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
正常执行测试方法,控制台输出:
入站处理器 A: 被回调
入站处理器 B: 被回调
入站处理器 C: 被回调
如果将上面的SimpleInHandlerB的super.channelRead(ctx, msg)注释掉。执行单元测试,控制台输出:
入站处理器 A: 被回调
入站处理器 B: 被回调
第二种方式为:
static class SimpleInHandlerB extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("入站处理器 B: 被回调 ");
ctx.fireChannelRead(msg);
}
}
除了可以在截断Pipeline的操作外,Netty还支持对于ChannelHandler的热插拔,即可以动态的增加和删除ChannelPipeline上的ChannelHandler。
5. Bootstrap启动类
Boostrap启动类提供一种简便的方式将上述的核心组件都组装起来,完成Netty程序的初始化。客户端使用Boostrap,服务端使用ServerBoostrap。例如:
ServerBootstrap b = new ServerBootstrap();
b.group(bossLoopGroup, workerLoopGroup);
b.channel(NioServerSocketChannel.class);
b.localAddress(serverPort);
b.option(ChannelOption.SO_KEEPALIVE, true);
b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
b.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyDiscardHandler());
}
});
ChannelFuture channelFuture = b.bind().sync();
ChannelFuture closeFuture = channelFuture.channel().closeFuture();
closeFuture.sync();
workerLoopGroup.shutdownGracefully();
bossLoopGroup.shutdownGracefully();
Netty中将处理连接的监听和建立的NioServerSocketChannel和对应IO事件的NioSocketChannel称为父通道和子通道。
那么Boostrap类是如何启动的呢?即如何使用Boostrap来实现组件的组装和服务器及客户端的启动呢?以上面的代码为例进行说明。
首先,要想使用Boostrap必须先实例化一个Boostrap对象:
ServerBootstrap b = new ServerBootstrap();
创建Reactor线程组,推荐使用BoosLoopGroup和WorkerLoopGroup配套的形式,调用group方法给Boostrap配置线程组:
EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1);
EventLoopGroup workerLoopGroup = new NioEventLoopGroup();
b.group(bossLoopGroup, workerLoopGroup);
设置通道类型、监听端口:
b.channel(NioServerSocketChannel.class);
b.localAddress(serverPort);
配置传输通道的配置选项,例如开启TCP底层的心跳机制:
b.option(ChannelOption.SO_KEEPALIVE, true);
ChannelOption中定义了许多的配置项,例如:
public static final ChannelOption<Boolean> AUTO_CLOSE = valueOf("AUTO_CLOSE");
public static final ChannelOption<Boolean> SO_BROADCAST = valueOf("SO_BROADCAST");
public static final ChannelOption<Boolean> SO_KEEPALIVE = valueOf("SO_KEEPALIVE");
public static final ChannelOption<Integer> SO_SNDBUF = valueOf("SO_SNDBUF");
public static final ChannelOption<Integer> SO_RCVBUF = valueOf("SO_RCVBUF");
public static final ChannelOption<Boolean> SO_REUSEADDR = valueOf("SO_REUSEADDR");
public static final ChannelOption<Integer> SO_LINGER = valueOf("SO_LINGER");
public static final ChannelOption<Integer> SO_BACKLOG = valueOf("SO_BACKLOG");
public static final ChannelOption<Integer> SO_TIMEOUT = valueOf("SO_TIMEOUT");
更多可见ChannelOption源码。
组装好了Channel和Reactor后,还需要组装Pipeline。由于父通道(NioServerSocketChannel)业务固定,通常无需单独配置,这里只需要组装子通道的Pipeline即可。
b.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyDiscardHandler());
}
});
接着需要绑定服务器新连接的监听端口,由于Netty中任何操作都是异步的,这里返回的是异步回调的ChannelFuture。不过没有对ChannelFuture做其他处理,而是阻塞直到绑定成功。
ChannelFuture channelFuture = b.bind().sync();
绑定成功后,自我阻塞直到通道关闭。
ChannelFuture closeFuture = channelFuture.channel().closeFuture();
closeFuture.sync();
最后关闭EnentLoopGroup,释放资源。
workerLoopGroup.shutdownGracefully();
bossLoopGroup.shutdownGracefully();