从绑定端口bind()方法开始

    Netty服务端启动属性绑定及Channel线程创建源码阅读 - 图1

    AbstractBootstrap.java:从bind()方法开始一路跟踪

    1. /**
    2. * Create a new {@link Channel} and bind it to the given port.
    3. */
    4. public ChannelFuture bind(int inetPort) {
    5. return bind(new InetSocketAddress(inetPort)); // delegates to bind(SocketAddress)
    6. }
    7. /**
    8. * Create a new {@link Channel} and bind it to the given host name and port.
    9. */
    10. public ChannelFuture bind(String inetHost, int inetPort) {
    11. return bind(SocketUtils.socketAddress(inetHost, inetPort)); // delegates to bind(SocketAddress)
    12. }
    13. /**
    14. * Create a new {@link Channel} and bind it to the given address and port.
    15. */
    16. public ChannelFuture bind(InetAddress inetHost, int inetPort) {
    17. return bind(new InetSocketAddress(inetHost, inetPort)); // delegates to bind(SocketAddress)
    18. }
    19. /**
    20. * Create a new {@link Channel} and bind it to {@code localAddress}; a null address raises NullPointerException.
    21. */
    22. public ChannelFuture bind(SocketAddress localAddress) {
    23. // Per the article author, validate() checks that the group and channelFactory properties are not null (validate() is not shown here)
    24. validate();
    25. if (localAddress == null) {
    26. throw new NullPointerException("localAddress");
    27. }
    28. return doBind(localAddress);
    29. }
    30. private ChannelFuture doBind(final SocketAddress localAddress) {
    31. // Asynchronously create and initialize a channel, and register it with a selector
    32. // Next, trace into initAndRegister()
    33. final ChannelFuture regFuture = initAndRegister();
    34. // Obtain the channel from the future
    35. final Channel channel = regFuture.channel();
    36. // If the asynchronous operation has already failed with an exception, return this future as-is
    37. if (regFuture.cause() != null) {
    38. return regFuture;
    39. }
    40. // Case: the asynchronous operation has already completed (a failure was already returned above)
    41. if (regFuture.isDone()) {
    42. // At this point we know that the registration was complete and successful.
    43. // Create a ChannelPromise instance
    44. ChannelPromise promise = channel.newPromise();
    45. // Continue with the actual bind
    46. doBind0(regFuture, channel, localAddress, promise);
    47. return promise;
    48. } else { // Case: the asynchronous operation has not completed yet
    49. // Registration future is almost always fulfilled already, but just in case it's not.
    50. // "Pending" means not yet settled
    51. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
    52. // Add a listener to the future; its callback runs once the asynchronous operation completes
    53. regFuture.addListener(new ChannelFutureListener() {
    54. @Override
    55. public void operationComplete(ChannelFuture future) throws Exception {
    56. Throwable cause = future.cause();
    57. if (cause != null) {
    58. // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
    59. // IllegalStateException once we try to access the EventLoop of the Channel.
    60. // Mark the promise as failed with this cause
    61. promise.setFailure(cause);
    62. } else {
    63. // Registration was successful, so set the correct executor to use.
    64. // See https://github.com/netty/netty/issues/2586
    65. promise.registered();
    66. doBind0(regFuture, channel, localAddress, promise);
    67. }
    68. }
    69. });
    70. return promise;
    71. }
    72. }

    初始化并注册Channel,并返回对应的ChannelFuture(initAndRegister() 方法)

    1. final ChannelFuture initAndRegister() {
    2. Channel channel = null;
    3. try {
    4. // Create the parent channel
    5. // Per the article author, this uses reflection (newInstance) on the Channel's no-arg constructor; newChannel() is not shown here - TODO confirm
    6. channel = channelFactory.newChannel();
    7. // Once the object has been created, initialize the channel
    8. init(channel);
    9. } catch (Throwable t) {
    10. if (channel != null) { // true means the channel was created but something failed afterwards (during init)
    11. // channel can be null if newChannel crashed (eg SocketException("too many open files"))
    12. // Forcibly close the channel
    13. channel.unsafe().closeForcibly();
    14. // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
    15. return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
    16. }
    17. // Reaching this point means channel is still null, i.e. creating the channel itself failed
    18. // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
    19. return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
    20. }
    21. // Register the parent channel (per the walkthrough: an eventLoop is chosen from the group, bound to the channel, and its thread is created and started)
    22. ChannelFuture regFuture = config().group().register(channel);
    23. if (regFuture.cause() != null) {
    24. if (channel.isRegistered()) {
    25. channel.close();
    26. } else {
    27. channel.unsafe().closeForcibly();
    28. }
    29. }
    30. // If we are here and the promise is not failed, it's one of the following cases:
    31. // 1) If we attempted registration from the event loop, the registration has been completed at this point.
    32. // i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
    33. // 2) If we attempted registration from the other thread, the registration request has been successfully
    34. // added to the event loop's task queue for later execution.
    35. // i.e. It's safe to attempt bind() or connect() now:
    36. // because bind() or connect() will be executed *after* the scheduled registration task is executed
    37. // because register(), bind(), and connect() are all bound to the same thread.
    38. return regFuture;
    39. }

    我们现在查看的是服务端的源代码,所以选择ServerBootstrap.java中的init() 初始化方法进行跟踪

    1. @Override
    2. void init(Channel channel) throws Exception {
    3. // Obtain the options configured on the ServerBootstrap
    4. final Map<ChannelOption<?>, Object> options = options0();
    5. synchronized (options) {
    6. // Apply the options to the channel. This channel is the one handed in by initAndRegister(),
    7. // i.e. the newly created parent channel; the child* settings are passed to ServerBootstrapAcceptor further below
    8. // Traced next
    9. setChannelOptions(channel, options, logger);
    10. }
    11. // Obtain the attrs configured on the ServerBootstrap
    12. final Map<AttributeKey<?>, Object> attrs = attrs0();
    13. synchronized (attrs) {
    14. // Write each attr into the channel, one by one
    15. for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
    16. @SuppressWarnings("unchecked")
    17. AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
    18. channel.attr(key).set(e.getValue());
    19. }
    20. }
    21. // Obtain the pipeline of the current channel
    22. ChannelPipeline p = channel.pipeline();
    23. // Copy the ServerBootstrap's child* properties into local variables
    24. final EventLoopGroup currentChildGroup = childGroup;
    25. final ChannelHandler currentChildHandler = childHandler;
    26. final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    27. final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    28. synchronized (childOptions) {
    29. currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
    30. }
    31. synchronized (childAttrs) {
    32. currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
    33. }
    34. // Add a ChannelInitializer handler to the pipeline
    35. p.addLast(new ChannelInitializer<Channel>() {
    36. @Override
    37. public void initChannel(final Channel ch) throws Exception {
    38. final ChannelPipeline pipeline = ch.pipeline();
    39. // Fetch the handler from config.handler() and, when it is non-null, add it to the pipeline
    40. ChannelHandler handler = config.handler();
    41. if (handler != null) {
    42. pipeline.addLast(handler);
    43. }
    44. ch.eventLoop().execute(new Runnable() {
    45. @Override
    46. public void run() {
    47. // The article refers to ServerBootstrapAcceptor as the connection handler
    48. pipeline.addLast(new ServerBootstrapAcceptor(
    49. ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
    50. }
    51. });
    52. }
    53. });
    54. }

    设置options的属性

    1. static void setChannelOptions(Channel channel, Map<ChannelOption<?>, Object> options, InternalLogger logger) {
    2. // Iterate over the options
    3. for (Map.Entry<ChannelOption<?>, Object> e: options.entrySet()) {
    4. // Apply the option currently being visited to the channel
    5. setChannelOption(channel, e.getKey(), e.getValue(), logger);
    6. }
    7. }
    8. @SuppressWarnings("unchecked")
    9. private static void setChannelOption(Channel channel, ChannelOption<?> option, Object value, InternalLogger logger) {
    10. try {
    11. // Write the option into the channel's config; a false return is logged as an unknown option
    12. if (!channel.config().setOption((ChannelOption<Object>) option, value)) {
    13. logger.warn("Unknown channel option '{}' for channel '{}'", option, channel);
    14. }
    15. } catch (Throwable t) { // failures are logged rather than propagated
    16. logger.warn(
    17. "Failed to set channel option '{}' with value '{}' for channel '{}'", option, value, channel, t);
    18. }
    19. }
    20. static void setChannelOptions(Channel channel, Map.Entry<ChannelOption<?>, Object>[] options, InternalLogger logger) {
    21. // With several options, apply them one at a time in a loop
    22. for (Map.Entry<ChannelOption<?>, Object> e: options) {
    23. setChannelOption(channel, e.getKey(), e.getValue(), logger);
    24. }
    25. }

    属性设置这一块算是跟踪完毕了,继续回到initAndRegister() 方法中跟踪

    ChannelFuture regFuture = config().group().register(channel); 跟踪register 的注册细节。

    MultithreadEventLoopGroup.java

    1. @Override
    2. public ChannelFuture register(Channel channel) {
    3. // Per the article author, next() picks one eventLoop from the group in round-robin fashion (next() is not shown here)
    4. return next().register(channel);
    5. }

    SingleThreadEventLoop.java(register(Channel) 会先把channel包装成一个DefaultChannelPromise,再调用下面这个register(ChannelPromise) 重载方法)

    1. @Override
    2. public ChannelFuture register(final ChannelPromise promise) {
    3. ObjectUtil.checkNotNull(promise, "promise");
    4. promise.channel().unsafe().register(this, promise); // hands this event loop and the promise to the channel's unsafe
    5. return promise;
    6. }

    AbstractChannel.java(下面的register() 与register0() 位于其内部类AbstractUnsafe中)

    1. @Override
    2. public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    3. // Throw if eventLoop is null, because the current channel is about to be bound to it
    4. if (eventLoop == null) {
    5. throw new NullPointerException("eventLoop");
    6. }
    7. // If the current channel has already been registered, fail the promise and return
    8. if (isRegistered()) {
    9. promise.setFailure(new IllegalStateException("registered to an event loop already"));
    10. return;
    11. }
    12. // If the eventLoop is not compatible with the current channel, fail the promise and return
    13. if (!isCompatible(eventLoop)) {
    14. promise.setFailure(
    15. new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
    16. return;
    17. }
    18. // This is where the current channel gets bound to the eventLoop
    19. AbstractChannel.this.eventLoop = eventLoop;
    20. if (eventLoop.inEventLoop()) { // is the currently executing thread the one bound to the eventLoop?
    21. register0(promise);
    22. } else {
    23. try {
    24. // An eventLoop is essentially an executor; its execute() is what gets called here
    25. eventLoop.execute(new Runnable() {
    26. @Override
    27. public void run() {
    28. // Perform the registration
    29. register0(promise);
    30. }
    31. });
    32. } catch (Throwable t) {
    33. logger.warn(
    34. "Force-closing a channel whose registration task was not accepted by an event loop: {}",
    35. AbstractChannel.this, t);
    36. closeForcibly();
    37. closeFuture.setClosed();
    38. safeSetFailure(promise, t);
    39. }
    40. }
    41. }
    42. private void register0(ChannelPromise promise) {
    43. try {
    44. // check if the channel is still open as it could be closed in the mean time when the register
    45. // call was outside of the eventLoop
    46. if (!promise.setUncancellable() || !ensureOpen(promise)) {
    47. return;
    48. }
    49. boolean firstRegistration = neverRegistered;
    50. // Perform the actual registration
    51. doRegister();
    52. neverRegistered = false;
    53. registered = true;
    54. // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
    55. // user may already fire events through the pipeline in the ChannelFutureListener.
    56. pipeline.invokeHandlerAddedIfNeeded();
    57. safeSetSuccess(promise);
    58. pipeline.fireChannelRegistered();
    59. // Only fire a channelActive if the channel has never been registered. This prevents firing
    60. // multiple channel actives if the channel is deregistered and re-registered.
    61. if (isActive()) {
    62. if (firstRegistration) {
    63. pipeline.fireChannelActive();
    64. } else if (config().isAutoRead()) {
    65. // This channel was registered before and autoRead() is set. This means we need to begin read
    66. // again so that we process inbound data.
    67. //
    68. // See https://github.com/netty/netty/issues/4805
    69. beginRead();
    70. }
    71. }
    72. } catch (Throwable t) {
    73. // Close the channel directly to avoid FD leak.
    74. closeForcibly();
    75. closeFuture.setClosed();
    76. safeSetFailure(promise, t);
    77. }
    78. }

    AbstractNioChannel.java

    1. @Override
    2. protected void doRegister() throws Exception {
    3. boolean selected = false;
    4. for (;;) {
    5. try {
    6. // Register the underlying NIO channel with the underlying NIO Selector
    7. // The second argument is 0, meaning no interest ops are set at this point. Why 0?
    8. // Two reasons (per the article author):
    9. // 1) This is a general-purpose method called when any channel is registered. There are three kinds of channel:
    10. // 1.1 the server-side parent channel, which should be interested in OP_ACCEPT (accept readiness)
    11. // 1.2 the server-side child channel, which should be interested in OP_READ or OP_WRITE (read/write readiness)
    12. // 1.3 the client-side channel, which should be interested in OP_CONNECT (connect readiness)
    13. // 2) The ops a channel is really interested in are specified when the Netty channel wrapper is created - TODO confirm, not shown here
    14. selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
    15. return;
    16. } catch (CancelledKeyException e) {
    17. if (!selected) {
    18. // Force the Selector to select now as the "canceled" SelectionKey may still be
    19. // cached and not removed because no Select.select(..) operation was called yet.
    20. eventLoop().selectNow();
    21. selected = true;
    22. } else {
    23. // We forced a select operation on the selector before but the SelectionKey is still cached
    24. // for whatever reason. JDK bug ?
    25. throw e;
    26. }
    27. }
    28. } // end of the retry loop
    29. }

    回到AbstractChannel.java中,跟踪线程的创建:eventLoop.execute(new Runnable() {...})

    SingleThreadEventExecutor.java

    1. @Override
    2. public void execute(Runnable task) {
    3. if (task == null) {
    4. throw new NullPointerException("task");
    5. }
    6. // true when the current thread is the thread bound to this eventLoop, false otherwise
    7. boolean inEventLoop = inEventLoop();
    8. // Add the task to the task queue
    9. addTask(task);
    10. if (!inEventLoop) {
    11. // Start the thread; startThread() only does so while the state is still ST_NOT_STARTED
    12. startThread();
    13. if (isShutdown()) {
    14. boolean reject = false;
    15. try {
    16. if (removeTask(task)) {
    17. reject = true;
    18. }
    19. } catch (UnsupportedOperationException e) {
    20. // The task queue does not support removal so the best thing we can do is to just move on and
    21. // hope we will be able to pick-up the task before its completely terminated.
    22. // In worst case we will log on termination.
    23. }
    24. if (reject) {
    25. reject();
    26. }
    27. }
    28. }
    29. if (!addTaskWakesUp && wakesUpForTask(task)) {
    30. wakeup(inEventLoop);
    31. }
    32. }
    33. private void startThread() {
    34. if (state == ST_NOT_STARTED) {
    35. if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { // only the caller that wins this CAS goes on to start the thread
    36. try {
    37. doStartThread();
    38. } catch (Throwable cause) {
    39. STATE_UPDATER.set(this, ST_NOT_STARTED); // roll the state back before rethrowing
    40. PlatformDependent.throwException(cause);
    41. }
    42. }
    43. }
    44. }
    45. private void doStartThread() {
    46. assert thread == null;
    47. // Call execute() on the wrapped executor (the executor held by this eventLoop)
    48. // Per the walkthrough that follows, that execute() does two things:
    49. // 1) it creates a thread
    50. // 2) it starts that thread
    51. executor.execute(new Runnable() {
    52. @Override
    53. public void run() {
    54. thread = Thread.currentThread();
    55. if (interrupted) {
    56. thread.interrupt();
    57. }
    58. boolean success = false;
    59. updateLastExecutionTime();
    60. try {
    61. // Per the article author, run() executes an endless for loop (run() is not shown here)
    62. SingleThreadEventExecutor.this.run();
    63. success = true;
    64. } catch (Throwable t) {
    65. logger.warn("Unexpected exception from an event executor: ", t);
    66. } finally {
    67. for (;;) {
    68. int oldState = state;
    69. if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
    70. SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
    71. break;
    72. }
    73. }
    74. // Check if confirmShutdown() was called at the end of the loop.
    75. if (success && gracefulShutdownStartTime == 0) {
    76. if (logger.isErrorEnabled()) {
    77. logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
    78. SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
    79. "be called before run() implementation terminates.");
    80. }
    81. }
    82. try {
    83. // Run all remaining tasks and shutdown hooks.
    84. for (;;) {
    85. if (confirmShutdown()) {
    86. break;
    87. }
    88. }
    89. } finally {
    90. try {
    91. cleanup();
    92. } finally {
    93. // Lets remove all FastThreadLocals for the Thread as we are about to terminate and notify
    94. // the future. The user may block on the future and once it unblocks the JVM may terminate
    95. // and start unloading classes.
    96. // See https://github.com/netty/netty/issues/6596.
    97. FastThreadLocal.removeAll();
    98. STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
    99. threadLock.release();
    100. if (!taskQueue.isEmpty()) {
    101. if (logger.isWarnEnabled()) {
    102. logger.warn("An event executor terminated with " +
    103. "non-empty task queue (" + taskQueue.size() + ')');
    104. }
    105. }
    106. terminationFuture.setSuccess(null);
    107. }
    108. }
    109. }
    110. }
    111. });
    112. }

    跟踪doStartThread() 方法中的 线程创建方法 execute

    ThreadExecutorMap.java

    1. public static Executor apply(final Executor executor, final EventExecutor eventExecutor) {
    2. ObjectUtil.checkNotNull(executor, "executor");
    3. ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
    4. // Build the returned executor as an anonymous inner class
    5. return new Executor() {
    6. @Override
    7. public void execute(final Runnable command) {
    8. // Delegate to the execute() of the executor that was passed in, with the command wrapped by apply(command, eventExecutor)
    9. executor.execute(apply(command, eventExecutor));
    10. }
    11. };
    12. }

    继续跟踪execute()中调用的execute方法

    ThreadPerTaskExecutor.java

    1. @Override
    2. public void execute(Runnable command) {
    3. // newThread() creates a new thread
    4. // start() starts that new thread, which runs the command's run() method
    5. threadFactory.newThread(command).start();
    6. }

    创建这个任务线程并返回这个线程。到这里该线程就创建完毕了

    DefaultThreadFactory.java

    1. @Override
    2. public Thread newThread(Runnable r) {
    3. // Create a thread
    4. Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
    5. // Configure the thread: daemon flag and priority
    6. try {
    7. if (t.isDaemon() != daemon) {
    8. t.setDaemon(daemon);
    9. }
    10. if (t.getPriority() != priority) {
    11. t.setPriority(priority);
    12. }
    13. } catch (Exception ignored) {
    14. // Doesn't matter even if failed to set.
    15. }
    16. return t;
    17. }
    18. protected Thread newThread(Runnable r, String name) {
    19. return new FastThreadLocalThread(threadGroup, r, name); // the thread object handed back to newThread(Runnable)
    20. }