1、源码编译

源码编译环境java环境+Maven环境+Jdk8

clone地址https://github.com/netty/netty
选择tag中4.1.X版本任意一个
Netty源码分析 - 图1

修改编译版本1.7为1.8

  1. /修改前(将下面的1.6修改为1.8):
  2. <maven.compiler.source>1.7</maven.compiler.source>
  3. <maven.compiler.target>1.7</maven.compiler.target>
  4. //修改后:
  5. <maven.compiler.source>1.8</maven.compiler.source>
  6. <maven.compiler.target>1.8</maven.compiler.target>

编译过程中maven-checkstyle-plugin会报格式检查错误

编译命令中可以加入跳过检查

  1. mvn clean install -DskipTests=true -Dcheckstyle.skip=true

或者直接在pom.xml加入

  1. <skip>true</skip>

image.png
执行命令: mvn clean install -DskipTests=true
image.png

2、EventLoopGroup事件循环组(线程组)分析

EventLoopGroup 是一组 EventLoop 的抽象,Netty 为了更好的利用多核 CPU 资源,一般会有多个 EventLoop 同时工作,每个 EventLoop 维护着一个 Selector 实例。
线程组创建流程源码分析
image.png

(1) NioEventLoopGroup线程组的创建
_EventLoopGroup _bossGroup = _new _NioEventLoopGroup(1); 创建NioEventLoopGroup实例时会调用到父类MultithreadEventLoopGroup中静态代码块,此处获取netty设置的默认线程数
image.png
在构造函数中判断传递的自定义线程数和默认线程数的值
image.png
=>调用父类构造方法:io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, io.netty.util.concurrent.EventExecutorChooserFactory, java.lang.Object…)
image.png
=>进入newChild方法:
image.png
此处直接创建NioEventLoop实例,跟踪进入构造方法
image.png
在NioEventLoop构造方法中主要进行了任务队列和选择器的创建。
首先看任务队列创建: 以int的最大值为限直接创建了queue.
image.png
然后根据进入openSelector(),此处provider是WindowsSelectorProvider,所以openSelector最终直接返回一个WindowsSelectorImpl
image.pngimage.png
然后将WindowsSelectorImpl包装成Selector进行返回。 至此创建单个EventLoop实例完成
后续根据线程数依次创建多个EventLoop。
创建完成后 可以看出WorkerGroup有8个NioEventLoop, 每个NioEventLoop中分别有Selector和TaskQueue
image.png

3、Netty启动源码分析

image.png
上图为Netty启动的主要流程 其中重点在于eventLoop.execute()方法,此方法的作用是往taskqueue中加入待执行的线程,(此处流程为服务端启动)
netty启动时会首先往taskqueue中加入两个线程:

  • register(channel)方法调用eventLoop.execute(register线程) 向teskqueue加入注册线程。此线程是将服务端通道注册到selector用于监听事件,然后调用通道初始化方法
  • 通道初始化方法被调用时。调用eventLoop.execute(ServerBootstrapAccetpor)向taskqueue加入任务,此任务执行时将handler加入pipiline中
  • image.png

源码跟踪分析:
示例程序代码:

  1. package io.netty.example.demo;
  2. import io.netty.bootstrap.ServerBootstrap;
  3. import io.netty.buffer.ByteBuf;
  4. import io.netty.buffer.Unpooled;
  5. import io.netty.channel.*;
  6. import io.netty.channel.nio.NioEventLoopGroup;
  7. import io.netty.channel.socket.SocketChannel;
  8. import io.netty.channel.socket.nio.NioServerSocketChannel;
  9. import io.netty.handler.codec.DelimiterBasedFrameDecoder;
  10. import java.nio.charset.StandardCharsets;
  11. /**
  12. * Netty服务端
  13. */
  14. public class NettyServer {
  15. public static void main(String[] args) throws InterruptedException {
  16. //1.创建bossGroup线程组: 处理网络事件--连接事件 线程数默认为: 2 * 处理器线程数
  17. EventLoopGroup bossGroup = new NioEventLoopGroup(1);
  18. //2.创建workerGroup线程组: 处理网络事件--读写事件 2 * 处理器线程数
  19. EventLoopGroup workerGroup = new NioEventLoopGroup();
  20. //3.创建服务端启动助手
  21. ServerBootstrap bootstrap = new ServerBootstrap();
  22. //4.设置bossGroup线程组和workerGroup线程组
  23. bootstrap.group(bossGroup, workerGroup)
  24. .channel(NioServerSocketChannel.class)//5.设置服务端通道实现
  25. .option(ChannelOption.SO_BACKLOG, 128)//6.参数设置-设置线程队列中等待连接个数
  26. .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)//6.参数设置-设置活跃状态,child是设置workerGroup
  27. .childHandler(new ChannelInitializer<SocketChannel>() {//7.创建一个通道初始化对象 此处是workerGroup的childHandler, 并不是上图中的服务端通道初始化对象
  28. @Override
  29. protected void initChannel(SocketChannel ch) throws Exception {
  30. ch.pipeline().addLast(new NettyServerOutHandle());
  31. //8.向pipeline中添加自定义业务处理handler
  32. ch.pipeline().addLast(new NettyServerHandle());
  33. }
  34. });
  35. //9.启动服务端并绑定端口,同时将异步改为同步
  36. ChannelFuture future = bootstrap.bind(9999);
  37. future.addListener(new ChannelFutureListener() {
  38. @Override
  39. public void operationComplete(ChannelFuture future) throws Exception {
  40. if (future.isSuccess()) {
  41. System.out.println("端口绑定成功!");
  42. } else {
  43. System.out.println("端口绑定失败!");
  44. }
  45. }
  46. });
  47. System.out.println("服务器启动成功....");
  48. //10.关闭通道(并不是真正意义上的关闭,而是监听通道关闭状态)和关闭连接池
  49. future.channel().closeFuture().sync();
  50. bossGroup.shutdownGracefully();
  51. workerGroup.shutdownGracefully();
  52. }
  53. }

跟踪bootstrap.bind(9999)进入源码,进入到方法doBind()中

  1. private ChannelFuture doBind(final SocketAddress localAddress) {
  2. //初始化和注册
  3. final ChannelFuture regFuture = initAndRegister();
  4. final Channel channel = regFuture.channel();
  5. if (regFuture.cause() != null) {
  6. return regFuture;
  7. }
  8. if (regFuture.isDone()) {
  9. // At this point we know that the registration was complete and successful.
  10. ChannelPromise promise = channel.newPromise();
  11. doBind0(regFuture, channel, localAddress, promise);
  12. return promise;
  13. } else {
  14. // Registration future is almost always fulfilled already, but just in case it's not.
  15. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
  16. regFuture.addListener(new ChannelFutureListener() {
  17. @Override
  18. public void operationComplete(ChannelFuture future) throws Exception {
  19. Throwable cause = future.cause();
  20. if (cause != null) {
  21. // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
  22. // IllegalStateException once we try to access the EventLoop of the Channel.
  23. promise.setFailure(cause);
  24. } else {
  25. // Registration was successful, so set the correct executor to use.
  26. // See https://github.com/netty/netty/issues/2586
  27. promise.registered();
  28. doBind0(regFuture, channel, localAddress, promise);
  29. }
  30. }
  31. });
  32. return promise;
  33. }
  34. }

doBind()中首先调用了initAndRegister创建了一个通道:由于我们设置了通道类型为NioServerSocketChannel。所有此处通过通道工厂生产NioServerSocketChannel类型通道对象
image.png
通道对象生成之后接着执行init方法:

  1. @Override
  2. void init(Channel channel) {
  3. setChannelOptions(channel, options0().entrySet().toArray(newOptionArray(0)), logger);
  4. setAttributes(channel, attrs0().entrySet().toArray(newAttrArray(0)));
  5. //得到通道pipeline
  6. ChannelPipeline p = channel.pipeline();
  7. //赋值workGroup与服务端handler
  8. final EventLoopGroup currentChildGroup = childGroup; //workGroup
  9. final ChannelHandler currentChildHandler = childHandler; //示例代码中我们自定义的handler
  10. final Entry<ChannelOption<?>, Object>[] currentChildOptions =
  11. childOptions.entrySet().toArray(newOptionArray(0));
  12. final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
  13. //添加通道初始化handler
  14. p.addLast(new ChannelInitializer<Channel>() {
  15. @Override
  16. public void initChannel(final Channel ch) {
  17. final ChannelPipeline pipeline = ch.pipeline();
  18. ChannelHandler handler = config.handler();
  19. if (handler != null) {
  20. pipeline.addLast(handler);
  21. }
  22. ch.eventLoop().execute(new Runnable() {
  23. @Override
  24. public void run() {
  25. //在initChannel方法中添加ServerBootstrapAcceptor的handler
  26. pipeline.addLast(new ServerBootstrapAcceptor(
  27. ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
  28. }
  29. });
  30. }
  31. });
  32. }

image.png
通道初始化方法执行完成之后init=>注册通道io.netty.channel.AbstractChannel.AbstractUnsafe#register
image.png
跟踪进入注册方法发现它内部使用eventLoop.execute()方法,此方法参数为一个线程
image.png
继续跟踪进入eventLoop.execute()方法,在判断当前线程为非eventLoop线程时,启动新的线程执行后续操作
image.png
跟踪startThread看看内部执行了什么操作: 使用线程池执行一个线程任务,此线程内部执行SingleThreadEventExecutor.this.run()方法
image.png
跟踪进入SingleThreadEventExecutor.this.run()方法:它最终进入io.netty.channel.nio.NioEventLoop#run()

  1. @Override
  2. protected void run() {
  3. for (;;) {
  4. try {
  5. try {
  6. switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
  7. case SelectStrategy.CONTINUE:
  8. continue;
  9. case SelectStrategy.BUSY_WAIT:
  10. // fall-through to SELECT since the busy-wait is not supported with NIO
  11. case SelectStrategy.SELECT:
  12. select(wakenUp.getAndSet(false));
  13. // 'wakenUp.compareAndSet(false, true)' is always evaluated
  14. // before calling 'selector.wakeup()' to reduce the wake-up
  15. // overhead. (Selector.wakeup() is an expensive operation.)
  16. //
  17. // However, there is a race condition in this approach.
  18. // The race condition is triggered when 'wakenUp' is set to
  19. // true too early.
  20. //
  21. // 'wakenUp' is set to true too early if:
  22. // 1) Selector is waken up between 'wakenUp.set(false)' and
  23. // 'selector.select(...)'. (BAD)
  24. // 2) Selector is waken up between 'selector.select(...)' and
  25. // 'if (wakenUp.get()) { ... }'. (OK)
  26. //
  27. // In the first case, 'wakenUp' is set to true and the
  28. // following 'selector.select(...)' will wake up immediately.
  29. // Until 'wakenUp' is set to false again in the next round,
  30. // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
  31. // any attempt to wake up the Selector will fail, too, causing
  32. // the following 'selector.select(...)' call to block
  33. // unnecessarily.
  34. //
  35. // To fix this problem, we wake up the selector again if wakenUp
  36. // is true immediately after selector.select(...).
  37. // It is inefficient in that it wakes up the selector for both
  38. // the first case (BAD - wake-up required) and the second case
  39. // (OK - no wake-up required).
  40. if (wakenUp.get()) {
  41. selector.wakeup();
  42. }
  43. // fall through
  44. default:
  45. }
  46. } catch (IOException e) {
  47. // If we receive an IOException here its because the Selector is messed up. Let's rebuild
  48. // the selector and retry. https://github.com/netty/netty/issues/8566
  49. rebuildSelector0();
  50. handleLoopException(e);
  51. continue;
  52. }
  53. cancelledKeys = 0;
  54. needsToSelectAgain = false;
  55. final int ioRatio = this.ioRatio;
  56. if (ioRatio == 100) {
  57. try {
  58. processSelectedKeys();
  59. } finally {
  60. // Ensure we always run tasks.
  61. runAllTasks();
  62. }
  63. } else {
  64. final long ioStartTime = System.nanoTime();
  65. try {
  66. //处理SelectedKey
  67. processSelectedKeys();
  68. } finally {
  69. // Ensure we always run tasks.
  70. final long ioTime = System.nanoTime() - ioStartTime;
  71. runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
  72. }
  73. }
  74. } catch (Throwable t) {
  75. handleLoopException(t);
  76. }
  77. // Always handle shutdown even if the loop processing threw an exception.
  78. try {
  79. if (isShuttingDown()) {
  80. closeAll();
  81. if (confirmShutdown()) {
  82. return;
  83. }
  84. }
  85. } catch (Throwable t) {
  86. handleLoopException(t);
  87. }
  88. }
  89. }

根据以上源码可以看出run方法内执行的就是下图的死循环
image.png
继续跟踪方法。发现会去执行taskqueue中任务
image.png
跟入runAllTasks方法,循环执行task任务
———————————————————————————————————————————
image.png
pollTask: 从taskqueue中取任务执行
image.png
当前taskqueue只有一个待执行的注册任务。

因此当前第一次循环方法开始执行如下方法
注册线程,此线程在之前被方法taskqueue中
image.png
image.png
进入doRegister()方法。此方法注册通道到selector后检测所有事件
image.png
此处javaChannel()获取通道对象,跟踪进入发现它就是ServerSocketChannel对象。
image.png
获取到ServerSocketChannel后将通道注册到Selector,并指定要监听的Accept事件。
通道注册完成之后,调用pipeline中的通道初始化对象方法。
image.png

invokeHandlerAddedIfNeeded—>initChannel方法 io.netty.channel.ChannelInitializer#handlerAdded
image.png
跟踪initChannel方法 发现它回到了通道初始化方法中,调用ChannelInitializer中的initChannel方法
image.png
image.png
再次进入eventLoop().execute()方法, 此时是从eventLoop中调用的
image.png
image.png
由于inEventLoop为true, 此时注册事件已经走完,

回到runAllTasks循环中,

image.png
注册任务执行完成之后,taskqueue中新加入了一个ServerBootstrap任务,下一次循环开始执行跟踪代码 进入如下方法,初始化器中的通道初始化方法
image.png
此处向pipeline中添加了一个ServerBootstrapAcceptor,它是一个消息入站的handler
image.png
消息入站handler 当有消息到达时,会执行channelRead方法,将读取逻辑发入workerGroup中执行 逻辑如下:
image.png
此处对应线程模型中如下逻辑:
image.png
———————————-至此runAllTasks方法执行完毕——————————

方法回到io.netty.channel.nio.NioEventLoop#run 此处是死循环,此时通道已经注册到了selecetor, 因此下一次循环进入SELECT分支
image.png
image.png

当服务端有客户端连接时会有Accept事件,如果没有就一直循环检测。

到此Netty启动流程结束。

4、BossGroup/WorkGroup/消息入站源码分析

BossGroup主要负责监听. workGroup负责消息处理. 主要看下BossGroup如何将通道交给workGroup的,和如何处理消息读取的.即入站
流程分析:
image.png
客户端调试示例代码:

  1. /**
  2. * Netty客户端
  3. */
  4. public class NettyClient {
  5. public static void main(String[] args) throws InterruptedException {
  6. //1. 创建线程组
  7. EventLoopGroup group = new NioEventLoopGroup();
  8. //2. 创建客户端启动助手
  9. Bootstrap bootstrap = new Bootstrap();
  10. //3. 设置线程组
  11. bootstrap.group(group)
  12. .channel(NioSocketChannel.class)//4. 设置服务端通道实现为NIO
  13. .handler(new ChannelInitializer<SocketChannel>() { //5. 创建一个通道初始化对象
  14. @Override
  15. protected void initChannel(SocketChannel ch) throws Exception {
  16. //6. 向pipeline中添加自定义业务处理handler
  17. ch.pipeline().addLast(new NettyClientHandle());
  18. }
  19. });
  20. //7. 启动客户端, 等待连接服务端, 同时将异步改为同步
  21. ChannelFuture future = bootstrap.connect("127.0.0.1", 9999).sync();
  22. //8. 关闭通道和关闭连接池
  23. future.channel().closeFuture().sync();
  24. group.shutdownGracefully();
  25. }
  26. }
  27. package io.netty.example.demo;
  28. import io.netty.buffer.ByteBuf;
  29. import io.netty.buffer.Unpooled;
  30. import io.netty.channel.ChannelFuture;
  31. import io.netty.channel.ChannelFutureListener;
  32. import io.netty.channel.ChannelHandlerContext;
  33. import io.netty.channel.ChannelInboundHandler;
  34. import io.netty.util.CharsetUtil;
  35. /**
  36. * 客户端处理类
  37. */
  38. public class NettyClientHandle implements ChannelInboundHandler {
  39. /**
  40. * 通道就绪事件
  41. *
  42. * @param ctx
  43. * @throws Exception
  44. */
  45. @Override
  46. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  47. ChannelFuture channelFuture = ctx.writeAndFlush(Unpooled.copiedBuffer("你好呀,我是Netty客户端", CharsetUtil.UTF_8));
  48. channelFuture.addListener(new ChannelFutureListener() {
  49. @Override
  50. public void operationComplete(ChannelFuture future) throws Exception {
  51. if (future.isSuccess()) {
  52. System.out.println("数据发送成功.");
  53. } else {
  54. System.out.println("数据发送失败.");
  55. }
  56. }
  57. });
  58. }
  59. //客户端读取事件
  60. @Override
  61. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  62. ByteBuf byteBuf = (ByteBuf) msg;
  63. System.out.println("服务端发来消息:" + byteBuf.toString(CharsetUtil.UTF_8));
  64. }
  65. @Override
  66. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  67. cause.printStackTrace();
  68. }
  69. @Override
  70. public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
  71. }
  72. @Override
  73. public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
  74. }
  75. @Override
  76. public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  77. }
  78. @Override
  79. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
  80. }
  81. @Override
  82. public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  83. }
  84. @Override
  85. public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
  86. }
  87. @Override
  88. public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
  89. }
  90. @Override
  91. public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
  92. }
  93. }

服务端已经启动,在上小节run方法中打上断点 等待客户端连接,此时selectedKeys.size=0
image.png
启动客户端: 此时已经有值 连接事件
image.png
进入循环 开始处理key
image.png
跟踪进入io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel) 此处判断它是一个连接事件
image.png
=>io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read
=>doread方法中 生成NIOSocketChannel

image.png
判断通道中是否有值,此处是有值然后发布读取事件,此处的消息即为客户端连接的相关信息
image.png
将消息进行入站操作=>io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(io.netty.channel.AbstractChannelHandlerContext, java.lang.Object)
image.png
=>跟踪方法发现进入
image.png
image.png
跟踪进入invokeChannelRead方法
image.png
=>io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead
image.png
继续执行

image.png

此处register方法与上节中BossGroup中启动流程执行注册方法是一致,也一样往taskqueue中添加任务,后续执行逻辑。

当有消息到达时,run方法中死循环里检测到selectionKey有读取事件 此时可以看出是workGroup在处理
image.png
此时判断为读取事件

image.png
又进入read方法=> io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read 进入的是AbstractNioByteChannel类
image.png
此处可以看出 服务端注册的处理入站出站的handler已经在pipeline中了,接下来进行数据读取操作,读取到数据后发布读取事件
image.png

=>跟踪进入fireChannelRead方法
image.png
image.png
最终channelRead方法执行会进入invokeChannelRead()。在此可以看出next为我们指定要的nettyServerHandle
image.png
image.png
=>到此调取到了我们自定义handel中的方法
image.png
服务端打印客户端发来的消息
image.png

消息入站流程至此结束

5、消息出站源码分析

示例代码 服务端读取完消息之后向客户端发送消息 回走出站方法
image.png

当读取了客户端消息之后 通道读取事件执行完毕
image.png
继续执行 回到io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read中
image.png
发布通道读取完毕事件
image.png
=>最终会调取到自定义NettyServerHandle中channelReadComplete的方法
image.png
继续跟踪io.netty.channel.AbstractChannelHandlerContext#write(java.lang.Object, boolean, io.netty.channel.ChannelPromise)
image.png
调用写入方法 最终调用到我们自定义的出站操作
image.png
在出站时调用super.write()方法,往前查找pipeline中上一个节点的write方法 又进入io.netty.channel.AbstractChannelHandlerContext#write(java.lang.Object, boolean, io.netty.channel.ChannelPromise)方法查找出站handle 直到pipeline头部
image.png
在所有write方法执行完成之后 进行刷新flush0方法
image.png
又进入我们自定义的NettyServerOutHandle类中 此时调用的是flush方法
image.png
跟踪super.flush()方法
此处获取了客户端连接的通道
image.png
真正开始执行写入
image.png
image.png
此时客户端收到消息
image.png
至此 出站流程结束