5.Netty核心源码剖析

5.1 Netty源码构建

  1. 下载源码
    https://github.com/netty/netty
    image.png
    2. 导入项目工程
    image.png
    3. 将入门案例demo代码放到example模块下
    image.png

    5.2 EventLoopGroup事件循环组(线程组)源码

    EventLoopGroup 是一组 EventLoop 的抽象,Netty 为了更好地利用多核 CPU 资源,一般会有多个 EventLoop 同时工作,每个 EventLoop 维护着一个 Selector 实例。

    5.2.1 线程组源码流程分析

    image.png

    5.2.2 线程组主要源码跟踪

  1. NioEventLoopGroup线程组的创建
    ```java
    static {
        //默认线程数量为处理器数*2
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2)
        );
        if (logger.isDebugEnabled()) {
            logger.debug("-Dio.netty.eventLoopThreads: {}",
                    DEFAULT_EVENT_LOOP_THREADS);
        }
    }

    /**
     * @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)
     */
    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }
    ```
  2. NioEventLoop的创建
    1. protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {
    2. if (nThreads <= 0) {
    3. throw new IllegalArgumentException(String.format("nThreads: %d(expected: > 0)", nThreads));
    4. }
    5. if (executor == null) {
    6. executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    7. }
    8. //根据线程数量创建
    9. children = new EventExecutor[nThreads];
    10. for (int i = 0; i < nThreads; i ++) {
    11. boolean success = false;
    12. try {
    13. //循环创建线程NioEventLoop
    14. children[i] = newChild(executor, args);
    15. success = true;
    16. } catch (Exception e) {
    17. // TODO: Think about if this is a good exception type
    18. throw new IllegalStateException("failed to create a child event loop", e);
    19. } finally {
    20. ...
    21. }
    22. }
    23. ......
    24. }
    newChild方法
    1. @Override
    2. protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    3. EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
    4. //创建NioEventLoop
    5. return new NioEventLoop(
    6. this, executor,
    7. (SelectorProvider) args[0],
    8. ((SelectStrategyFactory) args[1]).newSelectStrategy(),
    9. (RejectedExecutionHandler) args[2],
    10. queueFactory
    11. );
    12. }
    NioEventLoop
    1. NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
    2. SelectStrategy strategy,
    3. RejectedExecutionHandler rejectedExecutionHandler,
    4. EventLoopTaskQueueFactory queueFactory) {
    5. super(parent, executor, false, newTaskQueue(queueFactory),
    6. newTaskQueue(queueFactory),
    7. rejectedExecutionHandler
    8. );
    9. if (selectorProvider == null) {
    10. throw new NullPointerException("selectorProvider");
    11. }
    12. if (strategy == null) {
    13. throw new NullPointerException("selectStrategy");
    14. }
    15. provider = selectorProvider;
    16. //创建选择器
    17. final SelectorTuple selectorTuple = openSelector();
    18. selector = selectorTuple.selector;
    19. unwrappedSelector = selectorTuple.unwrappedSelector;
    20. selectStrategy = strategy;
    21. }

    5.3 Netty启动源码

    5.3.1 启动流程分析

    image.png

    5.3.2 主要源码跟踪

  2. initAndRegister方法
    1. final ChannelFuture initAndRegister() {
    2. Channel channel = null;
    3. try {
    4. //创建通道
    5. channel = channelFactory.newChannel();
    6. //通道初始化
    7. init(channel);
    8. } catch (Throwable t) {
    9. if (channel != null) {
    10. // channel can be null if newChannel crashed (eg SocketException("too many open files"))
    11. channel.unsafe().closeForcibly();
    12. // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
    13. return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
    14. }
    15. // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
    16. return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
    17. }
    18. //注册通道
    19. ChannelFuture regFuture = config().group().register(channel);
    20. if (regFuture.cause() != null) {
    21. if (channel.isRegistered()) {
    22. channel.close();
    23. } else {
    24. channel.unsafe().closeForcibly();
    25. }
    26. }
    27. return regFuture;
    28. }
  3. init方法
    1. void init(Channel channel) {
    2. setChannelOptions(channel, options0().entrySet().toArray(newOptionArray(0)), logger);
    3. setAttributes(channel, attrs0().entrySet().toArray(newAttrArray(0)));
    4. //得到通道pipeline
    5. ChannelPipeline p = channel.pipeline();
    6. //赋值workGroup与服务端handler
    7. final EventLoopGroup currentChildGroup = childGroup;
    8. final ChannelHandler currentChildHandler = childHandler;
    9. final Entry<ChannelOption<?>, Object>[] currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
    10. final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
    11. //添加通道初始化handler
    12. p.addLast(new ChannelInitializer<Channel>() {
    13. @Override
    14. public void initChannel(final Channel ch) {
    15. final ChannelPipeline pipeline = ch.pipeline();
    16. ChannelHandler handler = config.handler();
    17. if (handler != null) {
    18. pipeline.addLast(handler);
    19. }
    20. ch.eventLoop().execute(new Runnable() {
    21. @Override
    22. public void run() {
    23. //在initChannel方法中添加ServerBootstrapAcceptor的handler
    24. pipeline.addLast(
    25. new ServerBootstrapAcceptor(
    26. ch, currentChildGroup, currentChildHandler,
    27. currentChildOptions, currentChildAttrs
    28. )
    29. );
    30. }
    31. });
    32. }
    33. });
    34. }
  4. register方法
    1. public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    2. if (eventLoop == null) {
    3. throw new NullPointerException("eventLoop");
    4. }
    5. if (isRegistered()) {
    6. promise.setFailure(new IllegalStateException("registered to an event loop already"));
    7. return;
    8. }
    9. if (!isCompatible(eventLoop)) {
    10. promise.setFailure(
    11. new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
    12. return;
    13. }
    14. AbstractChannel.this.eventLoop = eventLoop;
    15. if (eventLoop.inEventLoop()) {
    16. register0(promise);
    17. } else {
    18. try {
    19. //执行NioEventLoop
    20. eventLoop.execute(new Runnable() {
    21. @Override
    22. public void run() {
    23. //注册通道
    24. register0(promise);
    25. }
    26. });
    27. } catch (Throwable t) {
    28. logger.warn(
    29. "Force-closing a channel whose registration task was not accepted by an event loop: {}",
    30. AbstractChannel.this, t
    31. );
    32. closeForcibly();
    33. closeFuture.setClosed();
    34. safeSetFailure(promise, t);
    35. }
    36. }
    37. }
  5. execute方法
    1. public void execute(Runnable task) {
    2. if (task == null) {
    3. throw new NullPointerException("task");
    4. }
    5. boolean inEventLoop = inEventLoop();
    6. //添加到任务队列
    7. addTask(task);
    8. if (!inEventLoop) {
    9. //启动线程
    10. startThread();
    11. if (isShutdown()) {
    12. boolean reject = false;
    13. try {
    14. if (removeTask(task)) {
    15. reject = true;
    16. }
    17. } catch (UnsupportedOperationException e) {
    18. // The task queue does not support removal so the best thing we can do is to just move on and
    19. // hope we will be able to pick-up the task before its completely terminated.
    20. // In worst case we will log on termination.
    21. }
    22. if (reject) {
    23. reject();
    24. }
    25. }
    26. }
    27. if (!addTaskWakesUp && wakesUpForTask(task)) {
    28. wakeup(inEventLoop);
    29. }
    30. }
  6. startThread—>run方法
    1. protected void run() {
    2. for (;;) {
    3. try {
    4. try {
    5. switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
    6. case SelectStrategy.CONTINUE:
    7. continue;
    8. case SelectStrategy.BUSY_WAIT:
    9. // fall-through to SELECT since the busy-wait is not supported with NIO
    10. case SelectStrategy.SELECT:
    11. select(wakenUp.getAndSet(false));
    12. if (wakenUp.get()) {
    13. selector.wakeup();
    14. }
    15. // fall through
    16. default:
    17. }
    18. } catch (IOException e) {
    19. // If we receive an IOException here its because the Selector is messed up. Let's rebuild
    20. // the selector and retry. https://github.com/netty/netty/issues/8566
    21. rebuildSelector0();
    22. handleLoopException(e);
    23. continue;
    24. }
    25. cancelledKeys = 0;
    26. needsToSelectAgain = false;
    27. final int ioRatio = this.ioRatio;
    28. if (ioRatio == 100) {
    29. try {
    30. processSelectedKeys();
    31. } finally {
    32. // Ensure we always run tasks.
    33. runAllTasks();
    34. }
    35. } else {
    36. final long ioStartTime = System.nanoTime();
    37. try {
    38. //处理SelectedKey
    39. processSelectedKeys();
    40. } finally {
    41. // Ensure we always run tasks.
    42. final long ioTime = System.nanoTime() - ioStartTime;
    43. runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
    44. }
    45. }
    46. } catch (Throwable t) {
    47. handleLoopException(t);
    48. }
    49. // Always handle shutdown even if the loop processing threw an exception.
    50. try {
    51. if (isShuttingDown()) {
    52. closeAll();
    53. if (confirmShutdown()) {
    54. return;
    55. }
    56. }
    57. } catch (Throwable t) {
    58. handleLoopException(t);
    59. }
    60. }
    61. }
  7. runAllTasks方法
    1. protected boolean runAllTasks(long timeoutNanos) {
    2. fetchFromScheduledTaskQueue();
    3. //轮询任务
    4. Runnable task = pollTask();
    5. if (task == null) {
    6. afterRunningAllTasks();
    7. return false;
    8. }
    9. final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
    10. long runTasks = 0;
    11. long lastExecutionTime;
    12. for (;;) {
    13. //执行任务
    14. safeExecute(task);
    15. runTasks ++;
    16. // Check timeout every 64 tasks because nanoTime() is relatively expensive.
    17. // XXX: Hard-coded value - will make it configurable if it is really a problem.
    18. if ((runTasks & 0x3F) == 0) {
    19. lastExecutionTime = ScheduledFutureTask.nanoTime();
    20. if (lastExecutionTime >= deadline) {
    21. break;
    22. }
    23. }
    24. task = pollTask();
    25. if (task == null) {
    26. lastExecutionTime = ScheduledFutureTask.nanoTime();
    27. break;
    28. }
    29. }
    30. afterRunningAllTasks();
    31. this.lastExecutionTime = lastExecutionTime;
    32. return true;
    33. }
  8. register0方法
    1. private void register0(ChannelPromise promise) {
    2. try {
    3. // check if the channel is still open as it could be closed in the mean time when the register
    4. // call was outside of the eventLoop
    5. if (!promise.setUncancellable() || !ensureOpen(promise)) {
    6. return;
    7. }
    8. boolean firstRegistration = neverRegistered;
    9. //注册通道
    10. doRegister();
    11. neverRegistered = false;
    12. registered = true;
    13. // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
    14. // user may already fire events through the pipeline in the ChannelFutureListener.
    14. // 向pipeline中添加handler,主要是针对initChannel方法中添加的handler
    15. pipeline.invokeHandlerAddedIfNeeded();
    16. safeSetSuccess(promise);
    17. pipeline.fireChannelRegistered();
    18. // Only fire a channelActive if the channel has never been registered. This prevents firing
    19. // multiple channel actives if the channel is deregistered and reregistered.
    20. if (isActive()) {
    21. if (firstRegistration) {
    22. pipeline.fireChannelActive();
    23. } else if (config().isAutoRead()) {
    24. // This channel was registered before and autoRead() is set. This means we need to begin read
    25. // again so that we process inbound data.
    26. //
    27. // See https://github.com/netty/netty/issues/4805
    28. beginRead();
    29. }
    30. }
    31. } catch (Throwable t) {
    32. // Close the channel directly to avoid FD leak.
    33. closeForcibly();
    34. closeFuture.setClosed();
    35. safeSetFailure(promise, t);
    36. }
    37. }
    doRegister()
    1. protected void doRegister() throws Exception {
    2. boolean selected = false;
    3. for (;;) {
    4. try {
    5. //将channel注册到Selector上
    6. selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
    7. return;
    8. } catch (CancelledKeyException e) {
    9. if (!selected) {
    10. // Force the Selector to select now as the "canceled" SelectionKey may still be
    11. // cached and not removed because no Select.select(..)
    12. // operation was called yet.
    13. eventLoop().selectNow();
    14. selected = true;
    15. } else {
    16. // We forced a select operation on the selector before but the SelectionKey is still cached
    17. // for whatever reason. JDK bug ?
    18. throw e;
    19. }
    20. }
    21. }
    22. }
    invokeHandlerAddedIfNeeded—>initChannel方法
    1. p.addLast(new ChannelInitializer<Channel>() {
    2. @Override
    3. public void initChannel(final Channel ch) {
    4. final ChannelPipeline pipeline = ch.pipeline();
    5. ChannelHandler handler = config.handler();
    6. if (handler != null) {
    7. pipeline.addLast(handler);
    8. }
    9. ch.eventLoop().execute(new Runnable() {
    10. @Override
    11. public void run() {
    12. //在initChannel方法中添加ServerBootstrapAcceptor的handler
    13. pipeline.addLast(
    14. new ServerBootstrapAcceptor(
    15. ch, currentChildGroup, currentChildHandler,
    16. currentChildOptions, currentChildAttrs)
    17. );
    18. }
    19. });
    20. }
    21. });

    5.4 BossGroup/WorkGroup/消息入站源码

    BossGroup主要负责监听,WorkGroup负责消息处理。本节主要看BossGroup如何将通道交给WorkGroup,以及如何处理消息读取(即入站)。

    5.4.1 BossGroup/WorkGroup源码分析

    image.png

    5.4.2 主要流程源码

  9. processSelectedKeysOptimized
    1. private void processSelectedKeysOptimized() {
    2. for (int i = 0; i < selectedKeys.size; ++i) {
    3. final SelectionKey k = selectedKeys.keys[i];
    4. // null out entry in the array to allow to have it GC'ed once the Channel close
    5. // See https://github.com/netty/netty/issues/2363
    6. selectedKeys.keys[i] = null;
    7. final Object a = k.attachment();
    8. if (a instanceof AbstractNioChannel) {
    9. //处理SelectedKey
    10. processSelectedKey(k, (AbstractNioChannel) a);
    11. } else {
    12. @SuppressWarnings("unchecked")
    13. NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
    14. processSelectedKey(k, task);
    15. }
    16. if (needsToSelectAgain) {
    17. // null out entries in the array to allow to have it GC'ed once the Channel close
    18. // See https://github.com/netty/netty/issues/2363
    19. selectedKeys.reset(i + 1);
    20. selectAgain();
    21. i = -1;
    22. }
    23. }
    24. }
  10. processSelectedKey
    1. private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    2. final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    3. if (!k.isValid()) {
    4. final EventLoop eventLoop;
    5. try {
    6. eventLoop = ch.eventLoop();
    7. } catch (Throwable ignored) {
    8. // If the channel implementation throws an exception because there is no event loop, we ignore this
    9. // because we are only trying to determine if ch is registered to this event loop and thus has authority
    10. // to close ch.
    11. return;
    12. }
    13. // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
    14. // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
    15. // still healthy and should not be closed.
    16. // See https://github.com/netty/netty/issues/5125
    17. if (eventLoop != this || eventLoop == null) {
    18. return;
    19. }
    20. // close the channel if the key is not valid anymore
    21. unsafe.close(unsafe.voidPromise());
    22. return;
    23. }
    24. try {
    25. int readyOps = k.readyOps();
    26. // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
    27. // the NIO JDK channel implementation may throw a NotYetConnectedException.
    28. if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
    29. // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
    30. // See https://github.com/netty/netty/issues/924
    31. int ops = k.interestOps();
    32. ops &= ~SelectionKey.OP_CONNECT;
    33. k.interestOps(ops);
    34. unsafe.finishConnect();
    35. }
    36. // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
    37. if ((readyOps & SelectionKey.OP_WRITE) != 0) {
    38. // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
    39. ch.unsafe().forceFlush();
    40. }
    41. // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
    42. // to a spin loop
    43. if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    44. //读取数据
    45. unsafe.read();
    46. }
    47. } catch (CancelledKeyException ignored) {
    48. unsafe.close(unsafe.voidPromise());
    49. }
    50. }
  11. unsafe.read()—>ServerBootstrapAcceptor的channelRead方法

    1. private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
    2. @Override
    3. @SuppressWarnings("unchecked")
    4. public void channelRead(ChannelHandlerContext ctx, Object msg) {
    5. //获得通道信息
    6. final Channel child = (Channel) msg;
    7. //将服务端处理器添加到pipeline中
    8. child.pipeline().addLast(childHandler);
    9. setChannelOptions(child, childOptions, logger);
    10. setAttributes(child, childAttrs);
    11. try {
    12. //将通道注册到workGroup线程组上
    13. childGroup.register(child).addListener(new ChannelFutureListener() {
    14. @Override
    15. public void operationComplete(ChannelFuture future) throws
    16. Exception {
    17. if (!future.isSuccess()) {
    18. forceClose(child, future.cause());
    19. }
    20. }
    21. });
    22. } catch (Throwable t) {
    23. forceClose(child, t);
    24. }
    25. }
    26. }

    5.5 消息出站源码

    1. private void write(Object msg, boolean flush, ChannelPromise promise) {
    2. ObjectUtil.checkNotNull(msg, "msg");
    3. try {
    4. if (isNotValidPromise(promise, true)) {
    5. ReferenceCountUtil.release(msg);
    6. // cancelled
    7. return;
    8. }
    9. } catch (RuntimeException e) {
    10. ReferenceCountUtil.release(msg);
    11. throw e;
    12. }
    13. //得到上下文出站handler-从后往前查找
    14. final AbstractChannelHandlerContext next = findContextOutbound(flush ? (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
    15. final Object m = pipeline.touch(msg, next);
    16. EventExecutor executor = next.executor();
    17. if (executor.inEventLoop()) {
    18. if (flush) {
    19. //调用写入和刷新方法
    20. next.invokeWriteAndFlush(m, promise);
    21. } else {
    22. next.invokeWrite(m, promise);
    23. }
    24. } else {
    25. final AbstractWriteTask task;
    26. if (flush) {
    27. task = WriteAndFlushTask.newInstance(next, m, promise);
    28. } else {
    29. task = WriteTask.newInstance(next, m, promise);
    30. }
    31. if (!safeExecute(executor, task, promise, m)) {
    32. // We failed to submit the AbstractWriteTask. We need to cancel it so we decrement the pending bytes
    33. // and put it back in the Recycler for re-use later.
    34. //
    35. // See https://github.com/netty/netty/issues/8343.
    36. task.cancel();
    37. }
    38. }
    39. }