Pipeline的创建
NioServerSocketChannel.java
public static void main(String[] args) throws InterruptedException {// parentGroup中的eventLoop将来是用于与处理客户端连接请求的parentChannel进行绑定的EventLoopGroup parentGroup = new NioEventLoopGroup();// childGroup中的eventLoop将来是用于与处理客户端读写请求的childChannel进行绑定的EventLoopGroup childGroup = new NioEventLoopGroup();final EventLoopGroup someGroup = new NioEventLoopGroup();try {ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(parentGroup, childGroup)//从这里开始跟踪,跟踪NioServerSocketChannel.class.channel(NioServerSocketChannel.class)//这两个方法都是用于设置Channel属性的//attr的方法一般是用于parentGroup中,一般会在当客户端与服务端连接建立的时候触发调用.attr(AttributeKey.valueOf("depart"), "市场部")//chaild的方法一般用于childGroup中,一般用于客户端往服务端发送消息的时候触发调用.childAttr(AttributeKey.valueOf("addr"), "北京海淀").childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new StringDecoder());pipeline.addLast(new StringEncoder());pipeline.addLast(new SomeSocketServerHandler());Object depart = ch.attr(AttributeKey.valueOf("depart")).get();Object addr = ch.attr(AttributeKey.valueOf("addr")).get();System.out.println("depart = " + depart);System.out.println("addr = " + addr);}});ChannelFuture future = bootstrap.bind(8888).sync();System.out.println("服务器已启动。。。");future.channel().closeFuture().sync();} finally {parentGroup.shutdownGracefully();childGroup.shutdownGracefully();}}
NioServerSocketChannel.java
public NioServerSocketChannel(ServerSocketChannel channel) {// 第一个参数:父channel// 第二个参数:Nio原生channel// 第三个参数:指定其关注的事件为 接收客户端连接// 跟踪supersuper(null, channel, SelectionKey.OP_ACCEPT);// config用于对当前channel进行配置的config = new NioServerSocketChannelConfig(this, javaChannel().socket());}
AbstractNioMessageChannel.java
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {// 跟踪supersuper(parent, ch, readInterestOp);}
AbstractNioChannel.java
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {//跟踪supersuper(parent);// NIO原生channelthis.ch = ch;// 初始化其关注事件this.readInterestOp = readInterestOp;try {// 指定这里使用的是非阻塞IO,即NIOch.configureBlocking(false);} catch (IOException e) {try {ch.close();} catch (IOException e2) {if (logger.isWarnEnabled()) {logger.warn("Failed to close a partially initialized socket.", e2);}}throw new ChannelException("Failed to enter non-blocking mode.", e);}}
AbstractChannel.java
//其Pipeline对象是在这里创建的protected AbstractChannel(Channel parent) {this.parent = parent;// 生成channel的idid = newId();// 底层操作对象unsafe = newUnsafe();// 创建了channelPipeline// 准备跟踪pipeline = newChannelPipeline();}
DefaultChannelPipeline.java
// 生成head节点的名称,简单类名后添加#0private static final String HEAD_NAME = generateName0(HeadContext.class);// 生成tail节点的名称,简单类名后添加#0private static final String TAIL_NAME = generateName0(TailContext.class);protected DefaultChannelPipeline newChannelPipeline() {return new DefaultChannelPipeline(this);}// pipeline本质上是一个双向链表protected DefaultChannelPipeline(Channel channel) {this.channel = ObjectUtil.checkNotNull(channel, "channel");succeededFuture = new SucceededChannelFuture(channel, null);voidPromise = new VoidChannelPromise(channel, true);// 尾节点// 准备跟踪tail = new TailContext(this);// 头节点head = new HeadContext(this);// 头节的下一个指针指向尾节点head.next = tail;// 尾节点的上一个指针指向头结点tail.prev = head;}// A special catch-all handler that handles both bytes and messages.// tail节点是一个inbound处理器,用于释放资源final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {TailContext(DefaultChannelPipeline pipeline) {// 第二个参数为当前节点所绑定的eventLoop,当前为null// 第三个参数为当前节点的名称// 准备跟踪super(pipeline, null, TAIL_NAME, TailContext.class);// 修改节点状态为ADD_COMPLETEsetAddComplete();}}
AbstractChannelHandContext.java
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,String name, Class<? extends ChannelHandler> handlerClass) {this.name = ObjectUtil.checkNotNull(name, "name");this.pipeline = pipeline;// 当前节点所绑定的eventLoopthis.executor = executor;// 初始化执行标记,通过该变量可以判断出当前处理器是inbound还是outbound处理器,// 可以判断出哪些方法应该跳过,哪些方法应该执行// 准备跟踪this.executionMask = mask(handlerClass);// Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.ordered = executor == null || executor instanceof OrderedEventExecutor;}
ChannelHeadnlerMask.java
static int mask(Class<? extends ChannelHandler> clazz) {// Try to obtain the mask from the cache first. If this fails calculate it and put it in the cache for fast// lookup in the future.Map<Class<? extends ChannelHandler>, Integer> cache = MASKS.get();Integer mask = cache.get(clazz);if (mask == null) {// 计算mask的值mask = mask0(clazz);cache.put(clazz, mask);}return mask;}/*** Calculate the {@code executionMask}.*/private static int mask0(Class<? extends ChannelHandler> handlerType) {// 初始化maskint mask = MASK_EXCEPTION_CAUGHT;try {// 处理handlerType为inbound处理器的情况if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {mask |= MASK_ALL_INBOUND;// 判断channelRegistered()方法是否应该跳过执行,即不执行// 若channelRegistered()上出现了@Skip注解,则应该跳过// 像我们自定义的Handler处理器里面继承的ChannelInboundHandlerAdapter// 类里面的channelRead(),exceptionCaught等等它就不会跳过,// 因为我们实现的方法上面没有@Skip注解if (isSkippable(handlerType, "channelRegistered", ChannelHandlerContext.class)) {mask &= ~MASK_CHANNEL_REGISTERED;}if (isSkippable(handlerType, "channelUnregistered", ChannelHandlerContext.class)) {mask &= ~MASK_CHANNEL_UNREGISTERED;}if (isSkippable(handlerType, "channelActive", ChannelHandlerContext.class)) {mask &= ~MASK_CHANNEL_ACTIVE;}if (isSkippable(handlerType, "channelInactive", ChannelHandlerContext.class)) {mask &= ~MASK_CHANNEL_INACTIVE;}if (isSkippable(handlerType, "channelRead", ChannelHandlerContext.class, Object.class)) {mask &= ~MASK_CHANNEL_READ;}if (isSkippable(handlerType, "channelReadComplete", ChannelHandlerContext.class)) {mask &= ~MASK_CHANNEL_READ_COMPLETE;}if (isSkippable(handlerType, "channelWritabilityChanged", ChannelHandlerContext.class)) {mask &= ~MASK_CHANNEL_WRITABILITY_CHANGED;}if (isSkippable(handlerType, "userEventTriggered", ChannelHandlerContext.class, Object.class)) {mask &= ~MASK_USER_EVENT_TRIGGERED;}}// 处理handlerType为outbound处理器的情况if (ChannelOutboundHandler.class.isAssignableFrom(handlerType)) {mask |= MASK_ALL_OUTBOUND;if (isSkippable(handlerType, "bind", ChannelHandlerContext.class,SocketAddress.class, ChannelPromise.class)) {mask &= ~MASK_BIND;}if (isSkippable(handlerType, "connect", ChannelHandlerContext.class, SocketAddress.class,SocketAddress.class, ChannelPromise.class)) {mask &= ~MASK_CONNECT;}if (isSkippable(handlerType, "disconnect", ChannelHandlerContext.class, ChannelPromise.class)) {mask &= ~MASK_DISCONNECT;}if (isSkippable(handlerType, "close", ChannelHandlerContext.class, ChannelPromise.class)) {mask &= ~MASK_CLOSE;}if (isSkippable(handlerType, "deregister", ChannelHandlerContext.class, ChannelPromise.class)) {mask &= ~MASK_DEREGISTER;}if (isSkippable(handlerType, "read", ChannelHandlerContext.class)) {mask &= ~MASK_READ;}if (isSkippable(handlerType, "write", ChannelHandlerContext.class,Object.class, ChannelPromise.class)) {mask &= ~MASK_WRITE;}if (isSkippable(handlerType, "flush", ChannelHandlerContext.class)) {mask &= ~MASK_FLUSH;}}if (isSkippable(handlerType, "exceptionCaught", ChannelHandlerContext.class, Throwable.class)) {mask &= ~MASK_EXCEPTION_CAUGHT;}} catch (Exception e) {// Should never reach here.PlatformDependent.throwException(e);}return mask;}// 判断方法上是否有@Skip注解@SuppressWarnings("rawtypes")private static boolean isSkippable(final Class<?> handlerType, final String methodName, final Class<?>... paramTypes) throws Exception {return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {@Overridepublic Boolean run() throws Exception {return handlerType.getMethod(methodName, paramTypes).isAnnotationPresent(Skip.class);}});}
回过头来跟踪头节点
DefaultChannelPipeline.java
// head节点既是一个inbound处理器,又是一个outbound处理器// 作为inbound处理器,其用于将消息向下一个节点传递// 作为outbound处理器,其用于完成直接的底层操作final class HeadContext extends AbstractChannelHandlerContextimplements ChannelOutboundHandler, ChannelInboundHandler {private final Unsafe unsafe;HeadContext(DefaultChannelPipeline pipeline) {super(pipeline, null, HEAD_NAME, HeadContext.class);// 获取底层操作对象unsafeunsafe = pipeline.channel().unsafe();setAddComplete();}}
像Pipeline中添加处理器
DefaultChannelPipeline.java
@Overridepublic final ChannelPipeline addLast(ChannelHandler... handlers) {// 第一个参数是一个EventLoopGroup// 第二个参数是一个数组return addLast(null, handlers);}@Overridepublic final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {if (handlers == null) {throw new NullPointerException("handlers");}// 逐个添加处理器for (ChannelHandler h: handlers) {if (h == null) {break;}// 添加一个处理器到pipeline// 第二个参数为name,addLast(executor, null, h);}return this;}@Overridepublic final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {final AbstractChannelHandlerContext newCtx;synchronized (this) {// 检测当前handler是否被重复添加// 1:准备跟踪checkMultiplicity(handler);// 将处理器封装为一个节点// filterName() 获取节点名称// 2:准备跟踪newCtx = newContext(group, filterName(name, handler), handler);// 将新的节点添加到pipelineaddLast0(newCtx);// If the registered is false it means that the channel was not registered on an eventLoop yet.// In this case we add the context to the pipeline and add a task that will call// ChannelHandler.handlerAdded(...) once the channel is registered.// 处理channel没有注册的情况if (!registered) {newCtx.setAddPending();callHandlerCallbackLater(newCtx, true);return this;}// 获取当前处理器节点的eventLoopEventExecutor executor = newCtx.executor();if (!executor.inEventLoop()) {callHandlerAddedInEventLoop(newCtx, executor);return this;}}// 触发当前处理器的handlerAdded()方法// 3:准备跟踪callHandlerAdded0(newCtx);return this;}private static void checkMultiplicity(ChannelHandler handler) {if (handler instanceof ChannelHandlerAdapter) {ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;// 若当前处理器是非共享处理器,且已经被添加过了,则抛出异常// 因为非共享处理器只能被添加一次if (!h.isSharable() && h.added) {throw new ChannelPipelineException(h.getClass().getName() +" is not a @Sharable handler, so can't be added or removed multiple times.");}// 修改添加状态变量h.added = true;}}private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {// childExecutor() 获取当前节点所绑定的eventLoopreturn new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);}private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {try {// 调用当前节点处理器的handlerAdded()ctx.callHandlerAdded();} catch (Throwable t) {boolean removed = false;try {remove0(ctx);ctx.callHandlerRemoved();removed = true;} catch (Throwable t2) {if (logger.isWarnEnabled()) {logger.warn("Failed to remove a handler: " + ctx.name(), t2);}}if (removed) {fireExceptionCaught(new ChannelPipelineException(ctx.handler().getClass().getName() +".handlerAdded() has thrown an exception; removed.", t));} else {fireExceptionCaught(new ChannelPipelineException(ctx.handler().getClass().getName() +".handlerAdded() has thrown an exception; also failed to remove.", t));}}}
处理器在Pipeline中被删除
ChannelInitializer.java
// 其为一个可共享的处理器,即该处理器实例可以被添加到多个pipeline,可以被多次添加到pipeline@Sharable// 其为一个inbound处理器public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelInitializer.class);// We use a Set as a ChannelInitializer is usually shared between all Channels in a Bootstrap /// ServerBootstrap. This way we can reduce the memory usage compared to use Attributes.// 该集合中存放的是当前处理器实例所封装的处理器节点private final Set<ChannelHandlerContext> initMap = Collections.newSetFromMap(new ConcurrentHashMap<ChannelHandlerContext, Boolean>());// 当, 当前处理器被添加到pipeline后,该方法就会被调用@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {if (ctx.channel().isRegistered()) { // 若当前channel已经注册完毕了,则...if (initChannel(ctx)) { // 调用本地的initChannel()removeState(ctx); // 将当前处理器所封装的节点从initMap中删除}}}/*** @param ctx 当前处理器所封装为的节点* @return* @throws Exception*/@SuppressWarnings("unchecked")private boolean initChannel(ChannelHandlerContext ctx) throws Exception {// 若ctx节点添加到initMap成功,则...if (initMap.add(ctx)) { // Guard against re-entrance.try {// 调用重写的initChannel(),该方法执行完毕,则其历史使命完成,就可以做收尾工作了initChannel((C) ctx.channel());} catch (Throwable cause) {// Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).// We do so to prevent multiple calls to initChannel(...).exceptionCaught(ctx, cause);} finally {// 收尾工作:将当前节点从当前pipeline上删除ChannelPipeline pipeline = ctx.pipeline();// 从pipeline中首先对当前处理器进行查询,若存在,则将该节点从pipeline中删除if (pipeline.context(this) != null) {// 准备跟踪pipeline.remove(this);}}return true;}return false;}}
DefaultChannelPipeline.java
@Overridepublic final ChannelPipeline remove(ChannelHandler handler) {// 跟踪getContextOrDie()方法remove(getContextOrDie(handler));return this;}// 不成功便成仁private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {// 查询当前处理器所封装的节点// 准备跟踪context()方法AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);// 若没有找到,则直接抛出异常,否则返回找到的节点if (ctx == null) {throw new NoSuchElementException(handler.getClass().getName());} else {return ctx;}}@Overridepublic final ChannelHandlerContext context(ChannelHandler handler) {if (handler == null) {throw new NullPointerException("handler");}// 从头节点的下一个节点开始AbstractChannelHandlerContext ctx = head.next;// 遍历整个pipeline,要么找到了返回,要么返回nullfor (;;) {if (ctx == null) {return null;}if (ctx.handler() == handler) {return ctx;}// 获取下一个节点ctx = ctx.next;}}
然后回过头来再跟踪一下ChannelInitializer.java中的removeState()
private void removeState(final ChannelHandlerContext ctx) {// The removal may happen in an async fashion if the EventExecutor we use does something funky.// 若当前节点的状态为 删除完毕,则将当前节点从initMap中删除if (ctx.isRemoved()) {initMap.remove(ctx);} else {// The context is not removed yet which is most likely the case because a custom EventExecutor is used.// Let's schedule it on the EventExecutor to give it some more time to be completed in case it is offloaded.ctx.executor().execute(new Runnable() {@Overridepublic void run() {initMap.remove(ctx);}});}}
