1. public static void main(String[] args) throws InterruptedException {
    2. NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
    3. try {
    4. Bootstrap bootstrap = new Bootstrap();
    5. bootstrap.group(eventLoopGroup)
    6. .localAddress(9000)
    7. .channel(NioSocketChannel.class)
    8. .handler(new ChannelInitializer<SocketChannel>() {
    9. @Override
    10. protected void initChannel(SocketChannel ch) throws Exception {
    11. ChannelPipeline pipeline = ch.pipeline();
    12. pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
    13. pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
    14. pipeline.addLast(new SomeSocketClientHandler());
    15. }
    16. });
    17. //Clinet客户端启动流程,准备跟踪connect方法
    18. ChannelFuture future = bootstrap.connect("localhost", 8888).sync();
    19. future.channel().closeFuture().sync();
    20. } finally {
    21. if(eventLoopGroup != null) {
    22. eventLoopGroup.shutdownGracefully();
    23. }
    24. }
    25. }

    Bootstrap.java

    1. public ChannelFuture connect(InetAddress inetHost, int inetPort) {
    2. return connect(new InetSocketAddress(inetHost, inetPort));
    3. }
    4. public ChannelFuture connect(SocketAddress remoteAddress) {
    5. //数据校验
    6. if (remoteAddress == null) {
    7. throw new NullPointerException("remoteAddress");
    8. }
    9. validate();
    10. return doResolveAndConnect(remoteAddress, config.localAddress());
    11. }
    12. public Bootstrap validate() {
    13. super.validate();
    14. //核心处理器都没有,工作根本展开不了,抛出异常直接中断启动
    15. if (config.handler() == null) {
    16. throw new IllegalStateException("handler not set");
    17. }
    18. return this;
    19. }
    20. /** 所需数据校验完成,开始处理连接业务
    21. * @see #connect()
    22. */
    23. private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
    24. // channel的创建、初始化与注册
    25. final ChannelFuture regFuture = initAndRegister();
    26. final Channel channel = regFuture.channel();
    27. if (regFuture.isDone()) {
    28. if (!regFuture.isSuccess()) {
    29. return regFuture;
    30. }
    31. // 解析server端地址并连接
    32. return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
    33. } else {
    34. // Registration future is almost always fulfilled already, but just in case it's not.
    35. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
    36. regFuture.addListener(new ChannelFutureListener() {
    37. @Override
    38. public void operationComplete(ChannelFuture future) throws Exception {
    39. // Directly obtain the cause and do a null check so we only need one volatile read in case of a
    40. // failure.
    41. Throwable cause = future.cause();
    42. if (cause != null) {
    43. // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
    44. // IllegalStateException once we try to access the EventLoop of the Channel.
    45. promise.setFailure(cause);
    46. } else {
    47. // Registration was successful, so set the correct executor to use.
    48. // See https://github.com/netty/netty/issues/2586
    49. promise.registered();
    50. doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
    51. }
    52. }
    53. });
    54. return promise;
    55. }
    56. }

    AbstractBootstrap.java

    1. //初始化并注册
    2. final ChannelFuture initAndRegister() {
    3. Channel channel = null;
    4. try {
    5. // 创建parentChannel
    6. // 其实这里面是使用反射newInstance 创建出一个Channel无参对象
    7. channel = channelFactory.newChannel();
    8. // 对象创建完后,开始初始化该channel
    9. init(channel);
    10. } catch (Throwable t) {
    11. if (channel != null) { // 若条件为true,说明channel创建成功,但初始化时出现问题
    12. // channel can be null if newChannel crashed (eg SocketException("too many open files"))
    13. // 将channel强制关闭
    14. channel.unsafe().closeForcibly();
    15. // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
    16. return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
    17. }
    18. // 代码能走到这里,说明创建channel过程中出现了问题
    19. // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
    20. return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
    21. }
    22. // 注册parentChannel(该过程中从group中选择出了eventLoop与channel进行了绑定,并创建启动了这个线程)
    23. ChannelFuture regFuture = config().group().register(channel);
    24. if (regFuture.cause() != null) {
    25. if (channel.isRegistered()) {
    26. channel.close();
    27. } else {
    28. channel.unsafe().closeForcibly();
    29. }
    30. }
    31. // If we are here and the promise is not failed, it's one of the following cases:
    32. // 1) If we attempted registration from the event loop, the registration has been completed at this point.
    33. // i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
    34. // 2) If we attempted registration from the other thread, the registration request has been successfully
    35. // added to the event loop's task queue for later execution.
    36. // i.e. It's safe to attempt bind() or connect() now:
    37. // because bind() or connect() will be executed *after* the scheduled registration task is executed
    38. // because register(), bind(), and connect() are all bound to the same thread.
    39. return regFuture;
    40. }

    跟踪init()方法其是一个抽象类在Clinet端和Server端中均被实现用来启动时处理不同端的业务启动流程,注意我们现在是客户端启动,所以跟踪的是Bootstrap.java中的init()

    Bootstrap.java

    1. @Override
    2. @SuppressWarnings("unchecked")
    3. void init(Channel channel) throws Exception {
    4. // 获取到pipeline
    5. ChannelPipeline p = channel.pipeline();
    6. // 将Bootstrap中创建的ChannelInitializer处理器添加到pipeline
    7. p.addLast(config.handler());
    8. // 使用Bootstrap中的options初始化channel
    9. final Map<ChannelOption<?>, Object> options = options0();
    10. synchronized (options) {
    11. //设置option到当前channel中
    12. setChannelOptions(channel, options, logger);
    13. }
    14. // 使用Bootstrap中的attrs初始化channel
    15. final Map<AttributeKey<?>, Object> attrs = attrs0();
    16. synchronized (attrs) {
    17. for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
    18. channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
    19. }
    20. }
    21. }

    AbstractBootstrap.java

    1. static void setChannelOptions(Channel channel, Map<ChannelOption<?>, Object> options, InternalLogger logger) {
    2. // 遍历options
    3. for (Map.Entry<ChannelOption<?>, Object> e: options.entrySet()) {
    4. // 将当前遍历的option初始化到channel
    5. setChannelOption(channel, e.getKey(), e.getValue(), logger);
    6. }
    7. }
    8. @SuppressWarnings("unchecked")
    9. private static void setChannelOption(Channel channel, ChannelOption<?> option, Object value, InternalLogger logger) {
    10. try {
    11. // 将option写入到channel的config中
    12. if (!channel.config().setOption((ChannelOption<Object>) option, value)) {
    13. logger.warn("Unknown channel option '{}' for channel '{}'", option, channel);
    14. }
    15. } catch (Throwable t) {
    16. logger.warn(
    17. "Failed to set channel option '{}' with value '{}' for channel '{}'", option, value, channel, t);
    18. }
    19. }

    客户端初始化和注册就阅读完了,现在回到Bootstap.java中继续阅读doResolveAndConnect()方法。

    1. /** 所需数据校验完成,开始处理连接业务
    2. * @see #connect()
    3. */
    4. private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
    5. // channel的创建、初始化与注册
    6. final ChannelFuture regFuture = initAndRegister();
    7. final Channel channel = regFuture.channel();
    8. if (regFuture.isDone()) {
    9. if (!regFuture.isSuccess()) {
    10. return regFuture;
    11. }
    12. // 解析server端地址并连接
    13. return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
    14. } else {
    15. // Registration future is almost always fulfilled already, but just in case it's not.
    16. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
    17. regFuture.addListener(new ChannelFutureListener() {
    18. @Override
    19. public void operationComplete(ChannelFuture future) throws Exception {
    20. // Directly obtain the cause and do a null check so we only need one volatile read in case of a
    21. // failure.
    22. Throwable cause = future.cause();
    23. if (cause != null) {
    24. // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
    25. // IllegalStateException once we try to access the EventLoop of the Channel.
    26. promise.setFailure(cause);
    27. } else {
    28. // Registration was successful, so set the correct executor to use.
    29. // See https://github.com/netty/netty/issues/2586
    30. promise.registered();
    31. doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
    32. }
    33. }
    34. });
    35. return promise;
    36. }
    37. }
    38. //解析server端地址并连接
    39. private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress,
    40. final SocketAddress localAddress, final ChannelPromise promise) {
    41. try {
    42. // 获取到channel所绑定的eventLoop
    43. final EventLoop eventLoop = channel.eventLoop();
    44. // 创建一个地址解析器
    45. final AddressResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop);
    46. // 若当前地址解析器不支持该地址格式,或该地址已经解析过了,怎么办?硬连
    47. // 若连接成功,则成功,失败就失败。无论成功还是失败,结果都写入到了promise中
    48. if (!resolver.isSupported(remoteAddress) || resolver.isResolved(remoteAddress)) {
    49. // Resolver has no idea about what to do with the specified remote address or it's resolved already.
    50. doConnect(remoteAddress, localAddress, promise);
    51. return promise;
    52. }
    53. // 处理地址没有解析且解析器也支持该地址格式的情况
    54. // 以异步的方式解析地址
    55. final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress);
    56. // 处理解析完成的情况
    57. if (resolveFuture.isDone()) {
    58. final Throwable resolveFailureCause = resolveFuture.cause();
    59. // 若解析过程中出现异常,则关闭channel,否则连接解析的地址
    60. if (resolveFailureCause != null) {
    61. // Failed to resolve immediately
    62. channel.close();
    63. promise.setFailure(resolveFailureCause);
    64. } else {
    65. // Succeeded to resolve immediately; cached? (or did a blocking lookup)
    66. // getNow() 从异步结果中获取解析结果
    67. doConnect(resolveFuture.getNow(), localAddress, promise);
    68. }
    69. return promise;
    70. }
    71. // Wait until the name resolution is finished.
    72. // 若解析还没有完成,则为其添加监听器
    73. resolveFuture.addListener(new FutureListener<SocketAddress>() {
    74. // 回调
    75. // 若解析过程中出现异常,则关闭channel,否则连接解析的地址
    76. @Override
    77. public void operationComplete(Future<SocketAddress> future) throws Exception {
    78. if (future.cause() != null) {
    79. channel.close();
    80. promise.setFailure(future.cause());
    81. } else {
    82. doConnect(future.getNow(), localAddress, promise);
    83. }
    84. }
    85. });
    86. } catch (Throwable cause) {
    87. promise.tryFailure(cause);
    88. }
    89. return promise;
    90. }
    91. private static void doConnect(
    92. final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {
    93. //获取到当前Channel的eventLoop后的exeuct以异步的方式去连接服务端地址
    94. final Channel channel = connectPromise.channel();
    95. channel.eventLoop().execute(new Runnable() {
    96. @Override
    97. public void run() {
    98. //如果没有配置本机地址,则进行远程连接
    99. if (localAddress == null) {
    100. channel.connect(remoteAddress, connectPromise);
    101. } else {
    102. //准备跟踪
    103. channel.connect(remoteAddress, localAddress, connectPromise);
    104. }
    105. connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
    106. }
    107. });
    108. }

    继续跟踪阅读连接代码,AbstractChannel.java中的connect()

    1. @Override
    2. public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
    3. return pipeline.connect(remoteAddress, localAddress, promise);
    4. }
    5. @Override
    6. public final ChannelFuture connect(
    7. SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
    8. //使用pipeline中的尾结点进行连接处理
    9. return tail.connect(remoteAddress, localAddress, promise);
    10. }

    AbstractChannelHandlerContext.java

    1. @Override
    2. public ChannelFuture connect(
    3. final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    4. if (remoteAddress == null) {
    5. throw new NullPointerException("remoteAddress");
    6. }
    7. if (isNotValidPromise(promise, false)) {
    8. // cancelled
    9. return promise;
    10. }
    11. final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT);
    12. EventExecutor executor = next.executor();
    13. if (executor.inEventLoop()) {
    14. //准备跟踪
    15. next.invokeConnect(remoteAddress, localAddress, promise);
    16. } else {
    17. safeExecute(executor, new Runnable() {
    18. @Override
    19. public void run() {
    20. next.invokeConnect(remoteAddress, localAddress, promise);
    21. }
    22. }, promise, null);
    23. }
    24. return promise;
    25. }
    26. private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
    27. if (invokeHandler()) {
    28. try {
    29. //准备跟踪
    30. ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
    31. } catch (Throwable t) {
    32. notifyOutboundHandlerException(t, promise);
    33. }
    34. } else {
    35. connect(remoteAddress, localAddress, promise);
    36. }
    37. }

    Netty客户端启动源码阅读 - 图1

    DefultChannelPipeline.java

    1. @Override
    2. public void connect(
    3. ChannelHandlerContext ctx,
    4. SocketAddress remoteAddress, SocketAddress localAddress,
    5. ChannelPromise promise) {
    6. unsafe.connect(remoteAddress, localAddress, promise);
    7. }

    AbstractNioChannel.java

    1. @Override
    2. public final void connect(
    3. final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    4. if (!promise.setUncancellable() || !ensureOpen(promise)) {
    5. return;
    6. }
    7. try {
    8. if (connectPromise != null) {
    9. // Already a connect in process.
    10. throw new ConnectionPendingException();
    11. }
    12. boolean wasActive = isActive();
    13. //准备跟踪
    14. if (doConnect(remoteAddress, localAddress)) {
    15. fulfillConnectPromise(promise, wasActive);
    16. } else {
    17. connectPromise = promise;
    18. requestedRemoteAddress = remoteAddress;
    19. // Schedule connect timeout.
    20. int connectTimeoutMillis = config().getConnectTimeoutMillis();
    21. if (connectTimeoutMillis > 0) {
    22. connectTimeoutFuture = eventLoop().schedule(new Runnable() {
    23. @Override
    24. public void run() {
    25. ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
    26. ConnectTimeoutException cause =
    27. new ConnectTimeoutException("connection timed out: " + remoteAddress);
    28. if (connectPromise != null && connectPromise.tryFailure(cause)) {
    29. close(voidPromise());
    30. }
    31. }
    32. }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
    33. }
    34. promise.addListener(new ChannelFutureListener() {
    35. @Override
    36. public void operationComplete(ChannelFuture future) throws Exception {
    37. if (future.isCancelled()) {
    38. if (connectTimeoutFuture != null) {
    39. connectTimeoutFuture.cancel(false);
    40. }
    41. connectPromise = null;
    42. close(voidPromise());
    43. }
    44. }
    45. });
    46. }
    47. } catch (Throwable t) {
    48. promise.tryFailure(annotateConnectException(t, remoteAddress));
    49. closeIfClosed();
    50. }
    51. }

    NioSocketChannel.java

    1. @Override
    2. protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
    3. // 绑定bootstrap中指定的localAddress
    4. if (localAddress != null) {
    5. doBind0(localAddress);
    6. }
    7. boolean success = false;
    8. try {
    9. // 直接连接指定的server地址,其可能一次连接成功,也可能没有成功
    10. // 若没有成功,则该channel的连接就绪事件发生,为下一次selector的选择打下基础
    11. boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
    12. if (!connected) {
    13. // 若没有连接成功,则指定其所关注的事件为连接就绪事件
    14. selectionKey().interestOps(SelectionKey.OP_CONNECT);
    15. }
    16. success = true;
    17. return connected;
    18. } finally {
    19. if (!success) {
    20. doClose();
    21. }
    22. }
    23. }
    24. private void doBind0(SocketAddress localAddress) throws Exception {
    25. //如果JDK版本大于等于7使用该方式进行连接
    26. if (PlatformDependent.javaVersion() >= 7) {
    27. SocketUtils.bind(javaChannel(), localAddress);
    28. } else {
    29. SocketUtils.bind(javaChannel().socket(), localAddress);
    30. }
    31. }
    32. //end!!!