1、源码编译
源码编译环境java环境+Maven环境+Jdk8
clone地址https://github.com/netty/netty
选择tag中4.1.X版本任意一个
修改编译版本1.7为1.8
/修改前(将下面的1.6修改为1.8):
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
//修改后:
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
编译过程中maven-checkstyle-plugin会报格式检查错误
编译命令中可以加入跳过检查
mvn clean install -DskipTests=true -Dcheckstyle.skip=true
或者直接在pom.xml加入
<skip>true</skip>
执行命令: mvn clean install -DskipTests=true
2、EventLoopGroup事件循环组(线程组)分析
EventLoopGroup 是一组 EventLoop 的抽象,Netty 为了更好的利用多核 CPU 资源,一般会有多个 EventLoop 同时工作,每个 EventLoop 维护着一个 Selector 实例。
线程组创建流程源码分析
(1) NioEventLoopGroup线程组的创建
_EventLoopGroup _bossGroup = _new _NioEventLoopGroup(1); 创建NioEventLoopGroup实例时会调用到父类MultithreadEventLoopGroup中静态代码块,此处获取netty设置的默认线程数
在构造函数中判断传递的自定义线程数和默认线程数的值
=>调用父类构造方法:io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, io.netty.util.concurrent.EventExecutorChooserFactory, java.lang.Object…)
=>进入newChild方法:
此处直接创建NioEventLoop实例,跟踪进入构造方法
在NioEventLoop构造方法中主要进行了任务队列和选择器的创建。
首先看任务队列创建: 以int的最大值为限直接创建了queue.
然后根据进入openSelector(),此处provider是WindowsSelectorProvider,所以openSelector最终直接返回一个WindowsSelectorImpl
然后将WindowsSelectorImpl包装成Selector进行返回。 至此创建单个EventLoop实例完成
后续根据线程数依次创建多个EventLoop。
创建完成后 可以看出WorkerGroup有8个NioEventLoop, 每个NioEventLoop中分别有Selector和TaskQueue
3、Netty启动源码分析
上图为Netty启动的主要流程 其中重点在于eventLoop.execute()方法,此方法的作用是往taskqueue中加入待执行的线程,(此处流程为服务端启动)
netty启动时会首先往taskqueue中加入两个线程:
- register(channel)方法调用eventLoop.execute(register线程) 向teskqueue加入注册线程。此线程是将服务端通道注册到selector用于监听事件,然后调用通道初始化方法
- 通道初始化方法被调用时。调用eventLoop.execute(ServerBootstrapAccetpor)向taskqueue加入任务,此任务执行时将handler加入pipiline中
源码跟踪分析:
示例程序代码:
package io.netty.example.demo;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import java.nio.charset.StandardCharsets;
/**
* Netty服务端
*/
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
//1.创建bossGroup线程组: 处理网络事件--连接事件 线程数默认为: 2 * 处理器线程数
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
//2.创建workerGroup线程组: 处理网络事件--读写事件 2 * 处理器线程数
EventLoopGroup workerGroup = new NioEventLoopGroup();
//3.创建服务端启动助手
ServerBootstrap bootstrap = new ServerBootstrap();
//4.设置bossGroup线程组和workerGroup线程组
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)//5.设置服务端通道实现
.option(ChannelOption.SO_BACKLOG, 128)//6.参数设置-设置线程队列中等待连接个数
.childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)//6.参数设置-设置活跃状态,child是设置workerGroup
.childHandler(new ChannelInitializer<SocketChannel>() {//7.创建一个通道初始化对象 此处是workerGroup的childHandler, 并不是上图中的服务端通道初始化对象
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyServerOutHandle());
//8.向pipeline中添加自定义业务处理handler
ch.pipeline().addLast(new NettyServerHandle());
}
});
//9.启动服务端并绑定端口,同时将异步改为同步
ChannelFuture future = bootstrap.bind(9999);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
System.out.println("端口绑定成功!");
} else {
System.out.println("端口绑定失败!");
}
}
});
System.out.println("服务器启动成功....");
//10.关闭通道(并不是真正意义上的关闭,而是监听通道关闭状态)和关闭连接池
future.channel().closeFuture().sync();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
跟踪bootstrap.bind(9999)进入源码,进入到方法doBind()中
private ChannelFuture doBind(final SocketAddress localAddress) {
//初始化和注册
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
doBind()中首先调用了initAndRegister创建了一个通道:由于我们设置了通道类型为NioServerSocketChannel。所有此处通过通道工厂生产NioServerSocketChannel类型通道对象
通道对象生成之后接着执行init方法:
@Override
void init(Channel channel) {
setChannelOptions(channel, options0().entrySet().toArray(newOptionArray(0)), logger);
setAttributes(channel, attrs0().entrySet().toArray(newAttrArray(0)));
//得到通道pipeline
ChannelPipeline p = channel.pipeline();
//赋值workGroup与服务端handler
final EventLoopGroup currentChildGroup = childGroup; //workGroup
final ChannelHandler currentChildHandler = childHandler; //示例代码中我们自定义的handler
final Entry<ChannelOption<?>, Object>[] currentChildOptions =
childOptions.entrySet().toArray(newOptionArray(0));
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
//添加通道初始化handler
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
//在initChannel方法中添加ServerBootstrapAcceptor的handler
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
通道初始化方法执行完成之后init=>注册通道io.netty.channel.AbstractChannel.AbstractUnsafe#register
跟踪进入注册方法发现它内部使用eventLoop.execute()方法,此方法参数为一个线程
继续跟踪进入eventLoop.execute()方法,在判断当前线程为非eventLoop线程时,启动新的线程执行后续操作
跟踪startThread看看内部执行了什么操作: 使用线程池执行一个线程任务,此线程内部执行SingleThreadEventExecutor.this.run()方法
跟踪进入SingleThreadEventExecutor.this.run()方法:它最终进入io.netty.channel.nio.NioEventLoop#run()
@Override
protected void run() {
for (;;) {
try {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIO
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
// 'wakenUp.compareAndSet(false, true)' is always evaluated
// before calling 'selector.wakeup()' to reduce the wake-up
// overhead. (Selector.wakeup() is an expensive operation.)
//
// However, there is a race condition in this approach.
// The race condition is triggered when 'wakenUp' is set to
// true too early.
//
// 'wakenUp' is set to true too early if:
// 1) Selector is waken up between 'wakenUp.set(false)' and
// 'selector.select(...)'. (BAD)
// 2) Selector is waken up between 'selector.select(...)' and
// 'if (wakenUp.get()) { ... }'. (OK)
//
// In the first case, 'wakenUp' is set to true and the
// following 'selector.select(...)' will wake up immediately.
// Until 'wakenUp' is set to false again in the next round,
// 'wakenUp.compareAndSet(false, true)' will fail, and therefore
// any attempt to wake up the Selector will fail, too, causing
// the following 'selector.select(...)' call to block
// unnecessarily.
//
// To fix this problem, we wake up the selector again if wakenUp
// is true immediately after selector.select(...).
// It is inefficient in that it wakes up the selector for both
// the first case (BAD - wake-up required) and the second case
// (OK - no wake-up required).
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}
} catch (IOException e) {
// If we receive an IOException here its because the Selector is messed up. Let's rebuild
// the selector and retry. https://github.com/netty/netty/issues/8566
rebuildSelector0();
handleLoopException(e);
continue;
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
//处理SelectedKey
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
根据以上源码可以看出run方法内执行的就是下图的死循环
继续跟踪方法。发现会去执行taskqueue中任务
跟入runAllTasks方法,循环执行task任务
———————————————————————————————————————————
pollTask: 从taskqueue中取任务执行
当前taskqueue只有一个待执行的注册任务。
因此当前第一次循环方法开始执行如下方法
注册线程,此线程在之前被方法taskqueue中
进入doRegister()方法。此方法注册通道到selector后检测所有事件
此处javaChannel()获取通道对象,跟踪进入发现它就是ServerSocketChannel对象。
获取到ServerSocketChannel后将通道注册到Selector,并指定要监听的Accept事件。
通道注册完成之后,调用pipeline中的通道初始化对象方法。
invokeHandlerAddedIfNeeded—>initChannel方法 io.netty.channel.ChannelInitializer#handlerAdded
跟踪initChannel方法 发现它回到了通道初始化方法中,调用ChannelInitializer中的initChannel方法
再次进入eventLoop().execute()方法, 此时是从eventLoop中调用的
由于inEventLoop为true, 此时注册事件已经走完,
回到runAllTasks循环中,
注册任务执行完成之后,taskqueue中新加入了一个ServerBootstrap任务,下一次循环开始执行跟踪代码 进入如下方法,初始化器中的通道初始化方法
此处向pipeline中添加了一个ServerBootstrapAcceptor,它是一个消息入站的handler
消息入站handler 当有消息到达时,会执行channelRead方法,将读取逻辑发入workerGroup中执行 逻辑如下:
此处对应线程模型中如下逻辑:
———————————-至此runAllTasks方法执行完毕——————————
方法回到io.netty.channel.nio.NioEventLoop#run 此处是死循环,此时通道已经注册到了selecetor, 因此下一次循环进入SELECT分支
当服务端有客户端连接时会有Accept事件,如果没有就一直循环检测。
到此Netty启动流程结束。
4、BossGroup/WorkGroup/消息入站源码分析
BossGroup主要负责监听. workGroup负责消息处理. 主要看下BossGroup如何将通道交给workGroup的,和如何处理消息读取的.即入站
流程分析:
客户端调试示例代码:
/**
* Netty客户端
*/
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
//1. 创建线程组
EventLoopGroup group = new NioEventLoopGroup();
//2. 创建客户端启动助手
Bootstrap bootstrap = new Bootstrap();
//3. 设置线程组
bootstrap.group(group)
.channel(NioSocketChannel.class)//4. 设置服务端通道实现为NIO
.handler(new ChannelInitializer<SocketChannel>() { //5. 创建一个通道初始化对象
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//6. 向pipeline中添加自定义业务处理handler
ch.pipeline().addLast(new NettyClientHandle());
}
});
//7. 启动客户端, 等待连接服务端, 同时将异步改为同步
ChannelFuture future = bootstrap.connect("127.0.0.1", 9999).sync();
//8. 关闭通道和关闭连接池
future.channel().closeFuture().sync();
group.shutdownGracefully();
}
}
package io.netty.example.demo;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.util.CharsetUtil;
/**
* 客户端处理类
*/
public class NettyClientHandle implements ChannelInboundHandler {
/**
* 通道就绪事件
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ChannelFuture channelFuture = ctx.writeAndFlush(Unpooled.copiedBuffer("你好呀,我是Netty客户端", CharsetUtil.UTF_8));
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
System.out.println("数据发送成功.");
} else {
System.out.println("数据发送失败.");
}
}
});
}
//客户端读取事件
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("服务端发来消息:" + byteBuf.toString(CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
}
}
服务端已经启动,在上小节run方法中打上断点 等待客户端连接,此时selectedKeys.size=0
启动客户端: 此时已经有值 连接事件
进入循环 开始处理key
跟踪进入io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel) 此处判断它是一个连接事件
=>io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read
=>doread方法中 生成NIOSocketChannel
判断通道中是否有值,此处是有值然后发布读取事件,此处的消息即为客户端连接的相关信息
将消息进行入站操作=>io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(io.netty.channel.AbstractChannelHandlerContext, java.lang.Object)
=>跟踪方法发现进入
跟踪进入invokeChannelRead方法
=>io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead
继续执行
此处register方法与上节中BossGroup中启动流程执行注册方法是一致,也一样往taskqueue中添加任务,后续执行逻辑。
当有消息到达时,run方法中死循环里检测到selectionKey有读取事件 此时可以看出是workGroup在处理
此时判断为读取事件
又进入read方法=> io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read 进入的是AbstractNioByteChannel类
此处可以看出 服务端注册的处理入站出站的handler已经在pipeline中了,接下来进行数据读取操作,读取到数据后发布读取事件
=>跟踪进入fireChannelRead方法
最终channelRead方法执行会进入invokeChannelRead()。在此可以看出next为我们指定要的nettyServerHandle
=>到此调取到了我们自定义handel中的方法
服务端打印客户端发来的消息
消息入站流程至此结束
5、消息出站源码分析
示例代码 服务端读取完消息之后向客户端发送消息 回走出站方法
当读取了客户端消息之后 通道读取事件执行完毕
继续执行 回到io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read中
发布通道读取完毕事件
=>最终会调取到自定义NettyServerHandle中channelReadComplete的方法
继续跟踪io.netty.channel.AbstractChannelHandlerContext#write(java.lang.Object, boolean, io.netty.channel.ChannelPromise)
调用写入方法 最终调用到我们自定义的出站操作
在出站时调用super.write()方法,往前查找pipeline中上一个节点的write方法 又进入io.netty.channel.AbstractChannelHandlerContext#write(java.lang.Object, boolean, io.netty.channel.ChannelPromise)方法查找出站handle 直到pipeline头部
在所有write方法执行完成之后 进行刷新flush0方法
又进入我们自定义的NettyServerOutHandle类中 此时调用的是flush方法
跟踪super.flush()方法
此处获取了客户端连接的通道
真正开始执行写入
此时客户端收到消息
至此 出站流程结束