public static void main(String[] args) throws InterruptedException {NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.group(eventLoopGroup).localAddress(9000).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));pipeline.addLast(new SomeSocketClientHandler());}});//Clinet客户端启动流程,准备跟踪connect方法ChannelFuture future = bootstrap.connect("localhost", 8888).sync();future.channel().closeFuture().sync();} finally {if(eventLoopGroup != null) {eventLoopGroup.shutdownGracefully();}}}
Bootstrap.java
public ChannelFuture connect(InetAddress inetHost, int inetPort) {return connect(new InetSocketAddress(inetHost, inetPort));}public ChannelFuture connect(SocketAddress remoteAddress) {//数据校验if (remoteAddress == null) {throw new NullPointerException("remoteAddress");}validate();return doResolveAndConnect(remoteAddress, config.localAddress());}public Bootstrap validate() {super.validate();//核心处理器都没有,工作根本展开不了,抛出异常直接中断启动if (config.handler() == null) {throw new IllegalStateException("handler not set");}return this;}/** 所需数据校验完成,开始处理连接业务* @see #connect()*/private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {// channel的创建、初始化与注册final ChannelFuture regFuture = initAndRegister();final Channel channel = regFuture.channel();if (regFuture.isDone()) {if (!regFuture.isSuccess()) {return regFuture;}// 解析server端地址并连接return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());} else {// Registration future is almost always fulfilled already, but just in case it's not.final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);regFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {// Directly obtain the cause and do a null check so we only need one volatile read in case of a// failure.Throwable cause = future.cause();if (cause != null) {// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an// IllegalStateException once we try to access the EventLoop of the Channel.promise.setFailure(cause);} else {// Registration was successful, so set the correct executor to use.// See https://github.com/netty/netty/issues/2586promise.registered();doResolveAndConnect0(channel, remoteAddress, localAddress, promise);}}});return promise;}}
AbstractBootstrap.java
//初始化并注册final ChannelFuture initAndRegister() {Channel channel = null;try {// 创建parentChannel// 其实这里面是使用反射newInstance 创建出一个Channel无参对象channel = channelFactory.newChannel();// 对象创建完后,开始初始化该channelinit(channel);} catch (Throwable t) {if (channel != null) { // 若条件为true,说明channel创建成功,但初始化时出现问题// channel can be null if newChannel crashed (eg SocketException("too many open files"))// 将channel强制关闭channel.unsafe().closeForcibly();// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutorreturn new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);}// 代码能走到这里,说明创建channel过程中出现了问题// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutorreturn new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);}// 注册parentChannel(该过程中从group中选择出了eventLoop与channel进行了绑定,并创建启动了这个线程)ChannelFuture regFuture = config().group().register(channel);if (regFuture.cause() != null) {if (channel.isRegistered()) {channel.close();} else {channel.unsafe().closeForcibly();}}// If we are here and the promise is not failed, it's one of the following cases:// 1) If we attempted registration from the event loop, the registration has been completed at this point.// i.e. It's safe to attempt bind() or connect() now because the channel has been registered.// 2) If we attempted registration from the other thread, the registration request has been successfully// added to the event loop's task queue for later execution.// i.e. It's safe to attempt bind() or connect() now:// because bind() or connect() will be executed *after* the scheduled registration task is executed// because register(), bind(), and connect() are all bound to the same thread.return regFuture;}
跟踪init()方法:它是一个抽象方法,在Client端和Server端分别有各自的实现,用于在启动时处理各端不同的初始化逻辑。注意我们现在跟踪的是客户端启动,所以要看的是Bootstrap.java中的init()。
Bootstrap.java
@Override@SuppressWarnings("unchecked")void init(Channel channel) throws Exception {// 获取到pipelineChannelPipeline p = channel.pipeline();// 将Bootstrap中创建的ChannelInitializer处理器添加到pipelinep.addLast(config.handler());// 使用Bootstrap中的options初始化channelfinal Map<ChannelOption<?>, Object> options = options0();synchronized (options) {//设置option到当前channel中setChannelOptions(channel, options, logger);}// 使用Bootstrap中的attrs初始化channelfinal Map<AttributeKey<?>, Object> attrs = attrs0();synchronized (attrs) {for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());}}}
AbstractBootstrap.java
static void setChannelOptions(Channel channel, Map<ChannelOption<?>, Object> options, InternalLogger logger) {// 遍历optionsfor (Map.Entry<ChannelOption<?>, Object> e: options.entrySet()) {// 将当前遍历的option初始化到channelsetChannelOption(channel, e.getKey(), e.getValue(), logger);}}@SuppressWarnings("unchecked")private static void setChannelOption(Channel channel, ChannelOption<?> option, Object value, InternalLogger logger) {try {// 将option写入到channel的config中if (!channel.config().setOption((ChannelOption<Object>) option, value)) {logger.warn("Unknown channel option '{}' for channel '{}'", option, channel);}} catch (Throwable t) {logger.warn("Failed to set channel option '{}' with value '{}' for channel '{}'", option, value, channel, t);}}
客户端的初始化和注册就阅读完了,现在回到Bootstrap.java中继续阅读doResolveAndConnect()方法。
/** 所需数据校验完成,开始处理连接业务* @see #connect()*/private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {// channel的创建、初始化与注册final ChannelFuture regFuture = initAndRegister();final Channel channel = regFuture.channel();if (regFuture.isDone()) {if (!regFuture.isSuccess()) {return regFuture;}// 解析server端地址并连接return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());} else {// Registration future is almost always fulfilled already, but just in case it's not.final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);regFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {// Directly obtain the cause and do a null check so we only need one volatile read in case of a// failure.Throwable cause = future.cause();if (cause != null) {// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an// IllegalStateException once we try to access the EventLoop of the Channel.promise.setFailure(cause);} else {// Registration was successful, so set the correct executor to use.// See https://github.com/netty/netty/issues/2586promise.registered();doResolveAndConnect0(channel, remoteAddress, localAddress, promise);}}});return promise;}}//解析server端地址并连接private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress,final SocketAddress localAddress, final ChannelPromise promise) {try {// 获取到channel所绑定的eventLoopfinal EventLoop eventLoop = channel.eventLoop();// 创建一个地址解析器final AddressResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop);// 若当前地址解析器不支持该地址格式,或该地址已经解析过了,怎么办?硬连// 若连接成功,则成功,失败就失败。无论成功还是失败,结果都写入到了promise中if (!resolver.isSupported(remoteAddress) || resolver.isResolved(remoteAddress)) {// Resolver has no idea about what to do with the specified remote address or it's resolved already.doConnect(remoteAddress, localAddress, 
promise);return promise;}// 处理地址没有解析且解析器也支持该地址格式的情况// 以异步的方式解析地址final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress);// 处理解析完成的情况if (resolveFuture.isDone()) {final Throwable resolveFailureCause = resolveFuture.cause();// 若解析过程中出现异常,则关闭channel,否则连接解析的地址if (resolveFailureCause != null) {// Failed to resolve immediatelychannel.close();promise.setFailure(resolveFailureCause);} else {// Succeeded to resolve immediately; cached? (or did a blocking lookup)// getNow() 从异步结果中获取解析结果doConnect(resolveFuture.getNow(), localAddress, promise);}return promise;}// Wait until the name resolution is finished.// 若解析还没有完成,则为其添加监听器resolveFuture.addListener(new FutureListener<SocketAddress>() {// 回调// 若解析过程中出现异常,则关闭channel,否则连接解析的地址@Overridepublic void operationComplete(Future<SocketAddress> future) throws Exception {if (future.cause() != null) {channel.close();promise.setFailure(future.cause());} else {doConnect(future.getNow(), localAddress, promise);}}});} catch (Throwable cause) {promise.tryFailure(cause);}return promise;}private static void doConnect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {//获取到当前Channel的eventLoop后的exeuct以异步的方式去连接服务端地址final Channel channel = connectPromise.channel();channel.eventLoop().execute(new Runnable() {@Overridepublic void run() {//如果没有配置本机地址,则进行远程连接if (localAddress == null) {channel.connect(remoteAddress, connectPromise);} else {//准备跟踪channel.connect(remoteAddress, localAddress, connectPromise);}connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);}});}
继续跟踪阅读连接代码,AbstractChannel.java中的connect()
@Overridepublic ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {return pipeline.connect(remoteAddress, localAddress, promise);}@Overridepublic final ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {//使用pipeline中的尾结点进行连接处理return tail.connect(remoteAddress, localAddress, promise);}
AbstractChannelHandlerContext.java
@Overridepublic ChannelFuture connect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {if (remoteAddress == null) {throw new NullPointerException("remoteAddress");}if (isNotValidPromise(promise, false)) {// cancelledreturn promise;}final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT);EventExecutor executor = next.executor();if (executor.inEventLoop()) {//准备跟踪next.invokeConnect(remoteAddress, localAddress, promise);} else {safeExecute(executor, new Runnable() {@Overridepublic void run() {next.invokeConnect(remoteAddress, localAddress, promise);}}, promise, null);}return promise;}private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {if (invokeHandler()) {try {//准备跟踪((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);} catch (Throwable t) {notifyOutboundHandlerException(t, promise);}} else {connect(remoteAddress, localAddress, promise);}}

DefaultChannelPipeline.java
/**
 * Outbound connect handler: delegates the connect to {@code unsafe}.
 * The {@code ctx} argument is not used.
 *
 * @param ctx           the handler context (unused)
 * @param remoteAddress address to connect to
 * @param localAddress  local address to bind, as passed down the pipeline
 * @param promise       promise handed through to {@code unsafe.connect}
 */
@Override
public void connect(
        ChannelHandlerContext ctx,
        SocketAddress remoteAddress,
        SocketAddress localAddress,
        ChannelPromise promise) {
    unsafe.connect(remoteAddress, localAddress, promise);
}
AbstractNioChannel.java
@Overridepublic final void connect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {if (!promise.setUncancellable() || !ensureOpen(promise)) {return;}try {if (connectPromise != null) {// Already a connect in process.throw new ConnectionPendingException();}boolean wasActive = isActive();//准备跟踪if (doConnect(remoteAddress, localAddress)) {fulfillConnectPromise(promise, wasActive);} else {connectPromise = promise;requestedRemoteAddress = remoteAddress;// Schedule connect timeout.int connectTimeoutMillis = config().getConnectTimeoutMillis();if (connectTimeoutMillis > 0) {connectTimeoutFuture = eventLoop().schedule(new Runnable() {@Overridepublic void run() {ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;ConnectTimeoutException cause =new ConnectTimeoutException("connection timed out: " + remoteAddress);if (connectPromise != null && connectPromise.tryFailure(cause)) {close(voidPromise());}}}, connectTimeoutMillis, TimeUnit.MILLISECONDS);}promise.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (future.isCancelled()) {if (connectTimeoutFuture != null) {connectTimeoutFuture.cancel(false);}connectPromise = null;close(voidPromise());}}});}} catch (Throwable t) {promise.tryFailure(annotateConnectException(t, remoteAddress));closeIfClosed();}}
NioSocketChannel.java
@Overrideprotected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {// 绑定bootstrap中指定的localAddressif (localAddress != null) {doBind0(localAddress);}boolean success = false;try {// 直接连接指定的server地址,其可能一次连接成功,也可能没有成功// 若没有成功,则该channel的连接就绪事件发生,为下一次selector的选择打下基础boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);if (!connected) {// 若没有连接成功,则指定其所关注的事件为连接就绪事件selectionKey().interestOps(SelectionKey.OP_CONNECT);}success = true;return connected;} finally {if (!success) {doClose();}}}private void doBind0(SocketAddress localAddress) throws Exception {//如果JDK版本大于等于7使用该方式进行连接if (PlatformDependent.javaVersion() >= 7) {SocketUtils.bind(javaChannel(), localAddress);} else {SocketUtils.bind(javaChannel().socket(), localAddress);}}//end!!!
