Pipeline的创建
NioServerSocketChannel.java
public static void main(String[] args) throws InterruptedException {
// parentGroup中的eventLoop将来是用于与处理客户端连接请求的parentChannel进行绑定的
EventLoopGroup parentGroup = new NioEventLoopGroup();
// childGroup中的eventLoop将来是用于与处理客户端读写请求的childChannel进行绑定的
EventLoopGroup childGroup = new NioEventLoopGroup();
final EventLoopGroup someGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(parentGroup, childGroup)
//从这里开始跟踪,跟踪NioServerSocketChannel.class
.channel(NioServerSocketChannel.class)
//这两个方法都是用于设置Channel属性的
//attr的方法一般是用于parentGroup中,一般会在当客户端与服务端连接建立的时候触发调用
.attr(AttributeKey.valueOf("depart"), "市场部")
//chaild的方法一般用于childGroup中,一般用于客户端往服务端发送消息的时候触发调用
.childAttr(AttributeKey.valueOf("addr"), "北京海淀")
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(new SomeSocketServerHandler());
Object depart = ch.attr(AttributeKey.valueOf("depart")).get();
Object addr = ch.attr(AttributeKey.valueOf("addr")).get();
System.out.println("depart = " + depart);
System.out.println("addr = " + addr);
}
});
ChannelFuture future = bootstrap.bind(8888).sync();
System.out.println("服务器已启动。。。");
future.channel().closeFuture().sync();
} finally {
parentGroup.shutdownGracefully();
childGroup.shutdownGracefully();
}
}
NioServerSocketChannel.java
public NioServerSocketChannel(ServerSocketChannel channel) {
// 第一个参数:父channel
// 第二个参数:Nio原生channel
// 第三个参数:指定其关注的事件为 接收客户端连接
// 跟踪super
super(null, channel, SelectionKey.OP_ACCEPT);
// config用于对当前channel进行配置的
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
AbstractNioMessageChannel.java
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
// 跟踪super
super(parent, ch, readInterestOp);
}
AbstractNioChannel.java
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
//跟踪super
super(parent);
// NIO原生channel
this.ch = ch;
// 初始化其关注事件
this.readInterestOp = readInterestOp;
try {
// 指定这里使用的是非阻塞IO,即NIO
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
AbstractChannel.java
//其Pipeline对象是在这里创建的
protected AbstractChannel(Channel parent) {
this.parent = parent;
// 生成channel的id
id = newId();
// 底层操作对象
unsafe = newUnsafe();
// 创建了channelPipeline
// 准备跟踪
pipeline = newChannelPipeline();
}
DefaultChannelPipeline.java
// 生成head节点的名称,简单类名后添加#0
private static final String HEAD_NAME = generateName0(HeadContext.class);
// 生成tail节点的名称,简单类名后添加#0
private static final String TAIL_NAME = generateName0(TailContext.class);
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
// pipeline本质上是一个双向链表
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
// 尾节点
// 准备跟踪
tail = new TailContext(this);
// 头节点
head = new HeadContext(this);
// 头节的下一个指针指向尾节点
head.next = tail;
// 尾节点的上一个指针指向头结点
tail.prev = head;
}
// A special catch-all handler that handles both bytes and messages.
// tail节点是一个inbound处理器,用于释放资源
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
TailContext(DefaultChannelPipeline pipeline) {
// 第二个参数为当前节点所绑定的eventLoop,当前为null
// 第三个参数为当前节点的名称
// 准备跟踪
super(pipeline, null, TAIL_NAME, TailContext.class);
// 修改节点状态为ADD_COMPLETE
setAddComplete();
}
}
AbstractChannelHandContext.java
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,
String name, Class<? extends ChannelHandler> handlerClass) {
this.name = ObjectUtil.checkNotNull(name, "name");
this.pipeline = pipeline;
// 当前节点所绑定的eventLoop
this.executor = executor;
// 初始化执行标记,通过该变量可以判断出当前处理器是inbound还是outbound处理器,
// 可以判断出哪些方法应该跳过,哪些方法应该执行
// 准备跟踪
this.executionMask = mask(handlerClass);
// Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
ordered = executor == null || executor instanceof OrderedEventExecutor;
}
ChannelHeadnlerMask.java
static int mask(Class<? extends ChannelHandler> clazz) {
// Try to obtain the mask from the cache first. If this fails calculate it and put it in the cache for fast
// lookup in the future.
Map<Class<? extends ChannelHandler>, Integer> cache = MASKS.get();
Integer mask = cache.get(clazz);
if (mask == null) {
// 计算mask的值
mask = mask0(clazz);
cache.put(clazz, mask);
}
return mask;
}
/**
* Calculate the {@code executionMask}.
*/
private static int mask0(Class<? extends ChannelHandler> handlerType) {
// 初始化mask
int mask = MASK_EXCEPTION_CAUGHT;
try {
// 处理handlerType为inbound处理器的情况
if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {
mask |= MASK_ALL_INBOUND;
// 判断channelRegistered()方法是否应该跳过执行,即不执行
// 若channelRegistered()上出现了@Skip注解,则应该跳过
// 像我们自定义的Handler处理器里面继承的ChannelInboundHandlerAdapter
// 类里面的channelRead(),exceptionCaught等等它就不会跳过,
// 因为我们实现的方法上面没有@Skip注解
if (isSkippable(handlerType, "channelRegistered", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_REGISTERED;
}
if (isSkippable(handlerType, "channelUnregistered", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_UNREGISTERED;
}
if (isSkippable(handlerType, "channelActive", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_ACTIVE;
}
if (isSkippable(handlerType, "channelInactive", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_INACTIVE;
}
if (isSkippable(handlerType, "channelRead", ChannelHandlerContext.class, Object.class)) {
mask &= ~MASK_CHANNEL_READ;
}
if (isSkippable(handlerType, "channelReadComplete", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_READ_COMPLETE;
}
if (isSkippable(handlerType, "channelWritabilityChanged", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_WRITABILITY_CHANGED;
}
if (isSkippable(handlerType, "userEventTriggered", ChannelHandlerContext.class, Object.class)) {
mask &= ~MASK_USER_EVENT_TRIGGERED;
}
}
// 处理handlerType为outbound处理器的情况
if (ChannelOutboundHandler.class.isAssignableFrom(handlerType)) {
mask |= MASK_ALL_OUTBOUND;
if (isSkippable(handlerType, "bind", ChannelHandlerContext.class,
SocketAddress.class, ChannelPromise.class)) {
mask &= ~MASK_BIND;
}
if (isSkippable(handlerType, "connect", ChannelHandlerContext.class, SocketAddress.class,
SocketAddress.class, ChannelPromise.class)) {
mask &= ~MASK_CONNECT;
}
if (isSkippable(handlerType, "disconnect", ChannelHandlerContext.class, ChannelPromise.class)) {
mask &= ~MASK_DISCONNECT;
}
if (isSkippable(handlerType, "close", ChannelHandlerContext.class, ChannelPromise.class)) {
mask &= ~MASK_CLOSE;
}
if (isSkippable(handlerType, "deregister", ChannelHandlerContext.class, ChannelPromise.class)) {
mask &= ~MASK_DEREGISTER;
}
if (isSkippable(handlerType, "read", ChannelHandlerContext.class)) {
mask &= ~MASK_READ;
}
if (isSkippable(handlerType, "write", ChannelHandlerContext.class,
Object.class, ChannelPromise.class)) {
mask &= ~MASK_WRITE;
}
if (isSkippable(handlerType, "flush", ChannelHandlerContext.class)) {
mask &= ~MASK_FLUSH;
}
}
if (isSkippable(handlerType, "exceptionCaught", ChannelHandlerContext.class, Throwable.class)) {
mask &= ~MASK_EXCEPTION_CAUGHT;
}
} catch (Exception e) {
// Should never reach here.
PlatformDependent.throwException(e);
}
return mask;
}
// 判断方法上是否有@Skip注解
@SuppressWarnings("rawtypes")
private static boolean isSkippable(
final Class<?> handlerType, final String methodName, final Class<?>... paramTypes) throws Exception {
return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {
@Override
public Boolean run() throws Exception {
return handlerType.getMethod(methodName, paramTypes).isAnnotationPresent(Skip.class);
}
});
}
回过头来跟踪头节点
DefaultChannelPipeline.java
// head节点既是一个inbound处理器,又是一个outbound处理器
// 作为inbound处理器,其用于将消息向下一个节点传递
// 作为outbound处理器,其用于完成直接的底层操作
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
private final Unsafe unsafe;
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, HeadContext.class);
// 获取底层操作对象unsafe
unsafe = pipeline.channel().unsafe();
setAddComplete();
}
}
像Pipeline中添加处理器
DefaultChannelPipeline.java
@Override
public final ChannelPipeline addLast(ChannelHandler... handlers) {
// 第一个参数是一个EventLoopGroup
// 第二个参数是一个数组
return addLast(null, handlers);
}
@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
if (handlers == null) {
throw new NullPointerException("handlers");
}
// 逐个添加处理器
for (ChannelHandler h: handlers) {
if (h == null) {
break;
}
// 添加一个处理器到pipeline
// 第二个参数为name,
addLast(executor, null, h);
}
return this;
}
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
// 检测当前handler是否被重复添加
// 1:准备跟踪
checkMultiplicity(handler);
// 将处理器封装为一个节点
// filterName() 获取节点名称
// 2:准备跟踪
newCtx = newContext(group, filterName(name, handler), handler);
// 将新的节点添加到pipeline
addLast0(newCtx);
// If the registered is false it means that the channel was not registered on an eventLoop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
// 处理channel没有注册的情况
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
// 获取当前处理器节点的eventLoop
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
callHandlerAddedInEventLoop(newCtx, executor);
return this;
}
}
// 触发当前处理器的handlerAdded()方法
// 3:准备跟踪
callHandlerAdded0(newCtx);
return this;
}
private static void checkMultiplicity(ChannelHandler handler) {
if (handler instanceof ChannelHandlerAdapter) {
ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
// 若当前处理器是非共享处理器,且已经被添加过了,则抛出异常
// 因为非共享处理器只能被添加一次
if (!h.isSharable() && h.added) {
throw new ChannelPipelineException(
h.getClass().getName() +
" is not a @Sharable handler, so can't be added or removed multiple times.");
}
// 修改添加状态变量
h.added = true;
}
}
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
// childExecutor() 获取当前节点所绑定的eventLoop
return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
try {
// 调用当前节点处理器的handlerAdded()
ctx.callHandlerAdded();
} catch (Throwable t) {
boolean removed = false;
try {
remove0(ctx);
ctx.callHandlerRemoved();
removed = true;
} catch (Throwable t2) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to remove a handler: " + ctx.name(), t2);
}
}
if (removed) {
fireExceptionCaught(new ChannelPipelineException(
ctx.handler().getClass().getName() +
".handlerAdded() has thrown an exception; removed.", t));
} else {
fireExceptionCaught(new ChannelPipelineException(
ctx.handler().getClass().getName() +
".handlerAdded() has thrown an exception; also failed to remove.", t));
}
}
}
处理器在Pipeline中被删除
ChannelInitializer.java
// 其为一个可共享的处理器,即该处理器实例可以被添加到多个pipeline,可以被多次添加到pipeline
@Sharable
// 其为一个inbound处理器
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelInitializer.class);
// We use a Set as a ChannelInitializer is usually shared between all Channels in a Bootstrap /
// ServerBootstrap. This way we can reduce the memory usage compared to use Attributes.
// 该集合中存放的是当前处理器实例所封装的处理器节点
private final Set<ChannelHandlerContext> initMap = Collections.newSetFromMap(
new ConcurrentHashMap<ChannelHandlerContext, Boolean>());
// 当, 当前处理器被添加到pipeline后,该方法就会被调用
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) { // 若当前channel已经注册完毕了,则...
if (initChannel(ctx)) { // 调用本地的initChannel()
removeState(ctx); // 将当前处理器所封装的节点从initMap中删除
}
}
}
/**
* @param ctx 当前处理器所封装为的节点
* @return
* @throws Exception
*/
@SuppressWarnings("unchecked")
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
// 若ctx节点添加到initMap成功,则...
if (initMap.add(ctx)) { // Guard against re-entrance.
try {
// 调用重写的initChannel(),该方法执行完毕,则其历史使命完成,就可以做收尾工作了
initChannel((C) ctx.channel());
} catch (Throwable cause) {
// Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
// We do so to prevent multiple calls to initChannel(...).
exceptionCaught(ctx, cause);
} finally {
// 收尾工作:将当前节点从当前pipeline上删除
ChannelPipeline pipeline = ctx.pipeline();
// 从pipeline中首先对当前处理器进行查询,若存在,则将该节点从pipeline中删除
if (pipeline.context(this) != null) {
// 准备跟踪
pipeline.remove(this);
}
}
return true;
}
return false;
}
}
DefaultChannelPipeline.java
@Override
public final ChannelPipeline remove(ChannelHandler handler) {
// 跟踪getContextOrDie()方法
remove(getContextOrDie(handler));
return this;
}
// 不成功便成仁
private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
// 查询当前处理器所封装的节点
// 准备跟踪context()方法
AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
// 若没有找到,则直接抛出异常,否则返回找到的节点
if (ctx == null) {
throw new NoSuchElementException(handler.getClass().getName());
} else {
return ctx;
}
}
@Override
public final ChannelHandlerContext context(ChannelHandler handler) {
if (handler == null) {
throw new NullPointerException("handler");
}
// 从头节点的下一个节点开始
AbstractChannelHandlerContext ctx = head.next;
// 遍历整个pipeline,要么找到了返回,要么返回null
for (;;) {
if (ctx == null) {
return null;
}
if (ctx.handler() == handler) {
return ctx;
}
// 获取下一个节点
ctx = ctx.next;
}
}
然后回过头来再跟踪一下ChannelInitializer.java中的removeState()
private void removeState(final ChannelHandlerContext ctx) {
// The removal may happen in an async fashion if the EventExecutor we use does something funky.
// 若当前节点的状态为 删除完毕,则将当前节点从initMap中删除
if (ctx.isRemoved()) {
initMap.remove(ctx);
} else {
// The context is not removed yet which is most likely the case because a custom EventExecutor is used.
// Let's schedule it on the EventExecutor to give it some more time to be completed in case it is offloaded.
ctx.executor().execute(new Runnable() {
@Override
public void run() {
initMap.remove(ctx);
}
});
}
}