/**
 * Demo client entry point.
 *
 * <p>Configures a {@code Bootstrap} with an NIO event loop group, local address 9000, the
 * {@code NioSocketChannel} channel type and a UTF-8 string codec followed by
 * {@code SomeSocketClientHandler}, connects to {@code localhost:8888}, and then waits on the
 * channel's close future. The event loop group is always shut down on the way out.
 *
 * @param args command-line arguments (unused)
 * @throws InterruptedException if one of the {@code sync()} waits is interrupted
 */
public static void main(String[] args) throws InterruptedException {
    NioEventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
        // Pipeline setup applied to the channel when it is initialized.
        ChannelInitializer<SocketChannel> pipelineSetup = new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline()
                        .addLast(new StringDecoder(CharsetUtil.UTF_8))
                        .addLast(new StringEncoder(CharsetUtil.UTF_8))
                        .addLast(new SomeSocketClientHandler());
            }
        };

        Bootstrap client = new Bootstrap();
        client.group(workerGroup)
                .localAddress(9000)
                .channel(NioSocketChannel.class)
                .handler(pipelineSetup);

        // Client start-up: connect() is the entry point traced through the rest of this walkthrough.
        ChannelFuture connectFuture = client.connect("localhost", 8888).sync();
        connectFuture.channel().closeFuture().sync();
    } finally {
        // The group is created before the try block, so it is never null here.
        workerGroup.shutdownGracefully();
    }
}
Bootstrap.java
/**
 * Connects to the given host and port.
 *
 * @param inetHost remote host address
 * @param inetPort remote port
 * @return the future returned by {@code connect(SocketAddress)}
 */
public ChannelFuture connect(InetAddress inetHost, int inetPort) {
    final InetSocketAddress remote = new InetSocketAddress(inetHost, inetPort);
    return connect(remote);
}
/**
 * Connects to the given remote address, using the local address from this bootstrap's config.
 *
 * @param remoteAddress address to connect to; must not be {@code null}
 * @return the future produced by {@code doResolveAndConnect}
 * @throws NullPointerException if {@code remoteAddress} is {@code null}
 */
public ChannelFuture connect(SocketAddress remoteAddress) {
    // Argument check.
    if (remoteAddress == null) {
        throw new NullPointerException("remoteAddress");
    }

    validate();

    final SocketAddress configuredLocal = config.localAddress();
    return doResolveAndConnect(remoteAddress, configuredLocal);
}
/**
 * Runs the superclass validation and additionally requires that a handler has been configured.
 *
 * @return this bootstrap
 * @throws IllegalStateException if no handler is set
 */
public Bootstrap validate() {
    super.validate();

    // Without the core handler nothing useful can happen, so abort start-up with an exception.
    final boolean handlerMissing = config.handler() == null;
    if (handlerMissing) {
        throw new IllegalStateException("handler not set");
    }
    return this;
}
/**
 * Called once the required validation has passed: creates, initializes and registers the channel,
 * then hands over to {@code doResolveAndConnect0} to resolve the remote address and connect.
 *
 * @param remoteAddress address to connect to
 * @param localAddress  local address passed through to the connect step
 * @return the failed registration future, the connect promise, or a pending promise that is
 *         completed from the registration listener
 * @see #connect()
 */
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
    // Create, initialize and register the channel.
    final ChannelFuture registration = initAndRegister();
    final Channel channel = registration.channel();

    if (!registration.isDone()) {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise pending = new PendingRegistrationPromise(channel);
        registration.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                // Directly obtain the cause and do a null check so we only need one volatile read in case of a
                // failure.
                final Throwable failure = future.cause();
                if (failure == null) {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    pending.registered();
                    doResolveAndConnect0(channel, remoteAddress, localAddress, pending);
                    return;
                }
                // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                // IllegalStateException once we try to access the EventLoop of the Channel.
                pending.setFailure(failure);
            }
        });
        return pending;
    }

    if (registration.isSuccess()) {
        // Resolve the server address and connect.
        return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
    }
    return registration;
}
AbstractBootstrap.java
/**
 * Creates a channel through the channel factory, initializes it with {@code init(Channel)} and
 * registers it with the configured group.
 *
 * @return the registration future, or an already-failed promise when creating or initializing
 *         the channel threw
 */
final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        // Create the channel (per the original walkthrough note, the factory instantiates it
        // reflectively through its no-arg constructor).
        channel = channelFactory.newChannel();
        // With the object created, initialize it.
        init(channel);
    } catch (Throwable t) {
        if (channel == null) {
            // channel can be null if newChannel crashed (eg SocketException("too many open files")),
            // i.e. the failure happened while creating the channel.
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        // The channel was created but initialization failed: force it closed.
        channel.unsafe().closeForcibly();
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    // Register the channel (per the original note, this is where the group selects an event loop,
    // binds it to the channel, and creates and starts its thread).
    final ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (!channel.isRegistered()) {
            channel.unsafe().closeForcibly();
        } else {
            channel.close();
        }
    }

    // If we are here and the promise is not failed, it's one of the following cases:
    // 1) If we attempted registration from the event loop, the registration has been completed at this point.
    //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
    // 2) If we attempted registration from the other thread, the registration request has been successfully
    //    added to the event loop's task queue for later execution.
    //    i.e. It's safe to attempt bind() or connect() now:
    //         because bind() or connect() will be executed *after* the scheduled registration task is executed
    //         because register(), bind(), and connect() are all bound to the same thread.
    return regFuture;
}
跟踪init()方法:它是一个抽象方法,在Client端和Server端分别被实现,用于处理各端启动时不同的初始化流程。注意我们现在跟踪的是客户端启动,所以要看的是Bootstrap.java中的init()。
Bootstrap.java
/**
 * Initializes a freshly created channel from this bootstrap's settings: adds the configured
 * handler to the pipeline, then applies the bootstrap's options and attributes.
 *
 * @param channel the channel to initialize
 * @throws Exception declared by the overridden method
 */
@Override
@SuppressWarnings("unchecked")
void init(Channel channel) throws Exception {
    // Add the handler configured on the Bootstrap (the ChannelInitializer) to the pipeline.
    channel.pipeline().addLast(config.handler());

    // Apply the Bootstrap's options to the channel while holding the options map's monitor.
    final Map<ChannelOption<?>, Object> channelOptions = options0();
    synchronized (channelOptions) {
        setChannelOptions(channel, channelOptions, logger);
    }

    // Copy the Bootstrap's attributes onto the channel while holding the attrs map's monitor.
    final Map<AttributeKey<?>, Object> channelAttrs = attrs0();
    synchronized (channelAttrs) {
        for (Entry<AttributeKey<?>, Object> attr : channelAttrs.entrySet()) {
            final AttributeKey<Object> key = (AttributeKey<Object>) attr.getKey();
            channel.attr(key).set(attr.getValue());
        }
    }
}
AbstractBootstrap.java
/**
 * Applies every entry of {@code options} to the channel via {@code setChannelOption}.
 *
 * @param channel channel to configure
 * @param options option/value pairs to apply
 * @param logger  logger handed to {@code setChannelOption} for its warnings
 */
static void setChannelOptions(Channel channel, Map<ChannelOption<?>, Object> options, InternalLogger logger) {
    for (Map.Entry<ChannelOption<?>, Object> entry : options.entrySet()) {
        final ChannelOption<?> option = entry.getKey();
        final Object value = entry.getValue();
        // Apply the current option to the channel.
        setChannelOption(channel, option, value, logger);
    }
}
/**
 * Writes one option into the channel's config. Logs a warning when {@code setOption} returns
 * {@code false} or throws; a throwable is logged rather than propagated.
 *
 * @param channel channel to configure
 * @param option  option key
 * @param value   option value
 * @param logger  destination for warnings
 */
@SuppressWarnings("unchecked")
private static void setChannelOption(Channel channel, ChannelOption<?> option, Object value, InternalLogger logger) {
    try {
        // Write the option into the channel's config.
        final boolean accepted = channel.config().setOption((ChannelOption<Object>) option, value);
        if (accepted) {
            return;
        }
        logger.warn("Unknown channel option '{}' for channel '{}'", option, channel);
    } catch (Throwable t) {
        logger.warn(
                "Failed to set channel option '{}' with value '{}' for channel '{}'", option, value, channel, t);
    }
}
客户端初始化和注册就阅读完了,现在回到Bootstrap.java中继续阅读doResolveAndConnect()方法。
/**
 * Called once the required validation has passed: creates, initializes and registers the channel,
 * then hands over to {@code doResolveAndConnect0} to resolve the remote address and connect.
 *
 * @param remoteAddress address to connect to
 * @param localAddress  local address passed through to the connect step
 * @return the failed registration future, the connect promise, or a pending promise that is
 *         completed from the registration listener
 * @see #connect()
 */
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
    // Create, initialize and register the channel.
    final ChannelFuture registration = initAndRegister();
    final Channel channel = registration.channel();

    if (!registration.isDone()) {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise pending = new PendingRegistrationPromise(channel);
        registration.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                // Directly obtain the cause and do a null check so we only need one volatile read in case of a
                // failure.
                final Throwable failure = future.cause();
                if (failure == null) {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    pending.registered();
                    doResolveAndConnect0(channel, remoteAddress, localAddress, pending);
                    return;
                }
                // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                // IllegalStateException once we try to access the EventLoop of the Channel.
                pending.setFailure(failure);
            }
        });
        return pending;
    }

    if (registration.isSuccess()) {
        // Resolve the server address and connect.
        return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
    }
    return registration;
}
/**
 * Resolves the server address and connects to it, reporting the outcome through {@code promise}.
 *
 * @param channel       the registered channel to connect
 * @param remoteAddress address to resolve and connect to
 * @param localAddress  local address passed through to {@code doConnect}
 * @param promise       promise that receives the result; also the return value
 * @return {@code promise}
 */
private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress,
                                           final SocketAddress localAddress, final ChannelPromise promise) {
    try {
        // Event loop the channel is bound to, and an address resolver obtained for it.
        final EventLoop loop = channel.eventLoop();
        final AddressResolver<SocketAddress> addressResolver = this.resolver.getResolver(loop);

        // Resolver has no idea about what to do with the specified remote address or it's resolved already:
        // connect to the address as given. Success or failure ends up in the promise either way.
        final boolean connectAsGiven =
                !addressResolver.isSupported(remoteAddress) || addressResolver.isResolved(remoteAddress);
        if (connectAsGiven) {
            doConnect(remoteAddress, localAddress, promise);
            return promise;
        }

        // The address is unresolved and the resolver supports it: start the resolution.
        final Future<SocketAddress> resolution = addressResolver.resolve(remoteAddress);

        if (!resolution.isDone()) {
            // Wait until the name resolution is finished.
            resolution.addListener(new FutureListener<SocketAddress>() {
                // Callback: close the channel on a resolution failure, otherwise connect to the resolved address.
                @Override
                public void operationComplete(Future<SocketAddress> future) throws Exception {
                    final Throwable failure = future.cause();
                    if (failure == null) {
                        doConnect(future.getNow(), localAddress, promise);
                        return;
                    }
                    channel.close();
                    promise.setFailure(failure);
                }
            });
            return promise;
        }

        // Resolution already finished.
        final Throwable immediateFailure = resolution.cause();
        if (immediateFailure == null) {
            // Succeeded to resolve immediately; cached? (or did a blocking lookup)
            // getNow() fetches the resolved address from the future.
            doConnect(resolution.getNow(), localAddress, promise);
        } else {
            // Failed to resolve immediately
            channel.close();
            promise.setFailure(immediateFailure);
        }
    } catch (Throwable cause) {
        promise.tryFailure(cause);
    }
    return promise;
}
/**
 * Submits the actual connect call as a task to the event loop of the promise's channel.
 *
 * @param remoteAddress  address to connect to
 * @param localAddress   local address to use, or {@code null} to connect without one
 * @param connectPromise promise passed to the channel's connect call
 */
private static void doConnect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {
    final Channel channel = connectPromise.channel();

    final Runnable connectTask = new Runnable() {
        @Override
        public void run() {
            if (localAddress != null) {
                // Path traced next in the walkthrough.
                channel.connect(remoteAddress, localAddress, connectPromise);
            } else {
                // No local address configured: connect to the remote address only.
                channel.connect(remoteAddress, connectPromise);
            }
            connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
        }
    };

    // Hand the connect over to the channel's event loop instead of running it inline.
    channel.eventLoop().execute(connectTask);
}
继续跟踪阅读连接代码,AbstractChannel.java中的connect()
/**
 * Forwards the connect request to this channel's pipeline.
 *
 * @param remoteAddress address to connect to
 * @param localAddress  local address to use
 * @param promise       promise handed to the pipeline
 * @return whatever the pipeline's connect returns
 */
@Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
    final ChannelFuture viaPipeline = pipeline.connect(remoteAddress, localAddress, promise);
    return viaPipeline;
}
/**
 * Starts the connect operation at the pipeline's tail node.
 *
 * @param remoteAddress address to connect to
 * @param localAddress  local address to use
 * @param promise       promise handed to the tail node
 * @return whatever the tail node's connect returns
 */
@Override
public final ChannelFuture connect(
        SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
    // The tail node of the pipeline handles the connect request.
    final ChannelFuture viaTail = tail.connect(remoteAddress, localAddress, promise);
    return viaTail;
}
AbstractChannelHandlerContext.java
/**
 * Passes a connect request to the context returned by {@code findContextOutbound(MASK_CONNECT)},
 * invoking it directly when already on that context's executor and via {@code safeExecute}
 * otherwise.
 *
 * @param remoteAddress address to connect to; must not be {@code null}
 * @param localAddress  local address to use
 * @param promise       promise for the operation; also the return value
 * @return {@code promise}
 * @throws NullPointerException if {@code remoteAddress} is {@code null}
 */
@Override
public ChannelFuture connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    if (remoteAddress == null) {
        throw new NullPointerException("remoteAddress");
    }
    if (isNotValidPromise(promise, false)) {
        // cancelled
        return promise;
    }

    final AbstractChannelHandlerContext target = findContextOutbound(MASK_CONNECT);
    final EventExecutor targetExecutor = target.executor();

    if (!targetExecutor.inEventLoop()) {
        safeExecute(targetExecutor, new Runnable() {
            @Override
            public void run() {
                target.invokeConnect(remoteAddress, localAddress, promise);
            }
        }, promise, null);
        return promise;
    }

    // Path traced next in the walkthrough.
    target.invokeConnect(remoteAddress, localAddress, promise);
    return promise;
}
/**
 * Calls this context's handler as a {@code ChannelOutboundHandler} when {@code invokeHandler()}
 * allows it; otherwise re-issues the connect through {@code connect(...)}. A throwable raised by
 * the handler call is routed to {@code notifyOutboundHandlerException}.
 *
 * @param remoteAddress address to connect to
 * @param localAddress  local address to use
 * @param promise       promise for the operation
 */
private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
    if (!invokeHandler()) {
        connect(remoteAddress, localAddress, promise);
        return;
    }
    try {
        // Path traced next in the walkthrough.
        final ChannelOutboundHandler outbound = (ChannelOutboundHandler) handler();
        outbound.connect(this, remoteAddress, localAddress, promise);
    } catch (Throwable t) {
        notifyOutboundHandlerException(t, promise);
    }
}
DefaultChannelPipeline.java
/**
 * Outbound connect callback that hands the request to {@code unsafe}.
 *
 * @param ctx           handler context (not used by this implementation)
 * @param remoteAddress address to connect to
 * @param localAddress  local address to use
 * @param promise       promise for the operation
 */
@Override
public void connect(
        ChannelHandlerContext ctx,
        SocketAddress remoteAddress,
        SocketAddress localAddress,
        ChannelPromise promise) {
    unsafe.connect(remoteAddress, localAddress, promise);
}
AbstractNioChannel.java
/**
 * Performs the connect for this channel.
 *
 * <p>When {@code doConnect} reports an immediate connection the promise is fulfilled right away.
 * Otherwise the promise and remote address are remembered, a timeout task is scheduled if the
 * configured connect timeout is positive, and a listener is added that cleans up if the promise
 * is cancelled. Any throwable raised along the way fails the promise.
 *
 * @param remoteAddress address to connect to
 * @param localAddress  local address passed to {@code doConnect}
 * @param promise       promise for the operation
 */
@Override
public final void connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    if (!(promise.setUncancellable() && ensureOpen(promise))) {
        return;
    }

    try {
        if (connectPromise != null) {
            // Already a connect in process.
            throw new ConnectionPendingException();
        }

        final boolean wasActive = isActive();
        // Path traced next in the walkthrough.
        if (doConnect(remoteAddress, localAddress)) {
            fulfillConnectPromise(promise, wasActive);
            return;
        }

        connectPromise = promise;
        requestedRemoteAddress = remoteAddress;

        // Schedule connect timeout.
        final int timeoutMillis = config().getConnectTimeoutMillis();
        if (timeoutMillis > 0) {
            connectTimeoutFuture = eventLoop().schedule(new Runnable() {
                @Override
                public void run() {
                    ChannelPromise pending = AbstractNioChannel.this.connectPromise;
                    ConnectTimeoutException cause =
                            new ConnectTimeoutException("connection timed out: " + remoteAddress);
                    if (pending != null && pending.tryFailure(cause)) {
                        close(voidPromise());
                    }
                }
            }, timeoutMillis, TimeUnit.MILLISECONDS);
        }

        // On cancellation: cancel the timeout task if one exists, clear the pending promise, close.
        promise.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isCancelled()) {
                    return;
                }
                if (connectTimeoutFuture != null) {
                    connectTimeoutFuture.cancel(false);
                }
                connectPromise = null;
                close(voidPromise());
            }
        });
    } catch (Throwable t) {
        promise.tryFailure(annotateConnectException(t, remoteAddress));
        closeIfClosed();
    }
}
NioSocketChannel.java
/**
 * Binds to {@code localAddress} when one is given, then attempts the connect.
 *
 * @param remoteAddress server address to connect to
 * @param localAddress  local address configured on the bootstrap, or {@code null} for none
 * @return {@code true} if the connection was established by this call, {@code false} if
 *         {@code OP_CONNECT} interest was set on the selection key instead
 * @throws Exception if binding or connecting fails; the channel is closed via {@code doClose()}
 *                   when the connect attempt does not complete normally
 */
@Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
    // Bind the localAddress specified on the bootstrap, if any.
    if (localAddress != null) {
        doBind0(localAddress);
    }

    boolean finishedNormally = false;
    try {
        // Connect straight to the given server address; this may or may not succeed on the first try.
        final boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
        if (!connected) {
            // Not connected yet: register interest in the connect-ready event.
            selectionKey().interestOps(SelectionKey.OP_CONNECT);
        }
        finishedNormally = true;
        return connected;
    } finally {
        if (!finishedNormally) {
            doClose();
        }
    }
}
/**
 * Binds the underlying Java channel to {@code localAddress}.
 *
 * @param localAddress local address to bind to
 * @throws Exception if the bind fails
 */
private void doBind0(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() < 7) {
        // Before Java 7: bind through the channel's socket.
        SocketUtils.bind(javaChannel().socket(), localAddress);
    } else {
        // Java 7 or later: bind on the channel itself.
        SocketUtils.bind(javaChannel(), localAddress);
    }
}
//end!!!