1、当检测到有 SelectionKey.OP_READ 事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { // ★★★ 对于服务端:unsafe = NioMessageUnsafe // ★★★ 对于客户端:unsafe = NioByteUnsafe unsafe.read();}
2、调用 AbstractNioByteChannel 的 read 方法
public final void read() { final ChannelConfig config = config(); if (shouldBreakReadReady(config)) { clearReadPending(); return; } final ChannelPipeline pipeline = pipeline(); // 创建 ByteBuf 分配器 final ByteBufAllocator allocator = config.getAllocator(); final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); allocHandle.reset(config); ByteBuf byteBuf = null; boolean close = false; try { do { // 分配一个 byteBuf byteBuf = allocHandle.allocate(allocator); // ★ 将数据读取分配到 byteBuf 中 allocHandle.lastBytesRead(doReadBytes(byteBuf)); if (allocHandle.lastBytesRead() <= 0) { // nothing was read. release the buffer. byteBuf.release(); byteBuf = null; close = allocHandle.lastBytesRead() < 0; if (close) { // There is nothing left to read as we received an EOF. readPending = false; } break; } allocHandle.incMessagesRead(1); readPending = false; // ★★★ pipeline 的读事件向下传播 // ★★★ 此时就开始从 head 节点依次走我们自定义的 handler 的 channelRead 方法 pipeline.fireChannelRead(byteBuf); byteBuf = null; } while (allocHandle.continueReading()); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); if (close) { closeOnRead(pipeline); } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close, allocHandle); } finally { // Check if there is a readPending which was not processed yet. // This could be for two reasons: // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method // // See https://github.com/netty/netty/issues/2254 if (!readPending && !config.isAutoRead()) { removeReadOp(); } }}
3、pipeline 从 headContext 开始向下传播
@Overridepublic final ChannelPipeline fireChannelRead(Object msg) { AbstractChannelHandlerContext.invokeChannelRead(head, msg); return this;}static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { // ★★★ 执行读取 next.invokeChannelRead(m); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRead(m); } }); }}private void invokeChannelRead(Object msg) { if (invokeHandler()) { try { ((ChannelInboundHandler) handler()).channelRead(this, msg); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelRead(msg); }}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) { ctx.fireChannelRead(msg);}
4、传播到下一个处理器
@Overridepublic ChannelHandlerContext fireChannelRead(final Object msg) { invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg); return this;}
5、如:找到 FixedLengthFrameDecoder,调用父类的 channelRead 方法
@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof ByteBuf) { CodecOutputList out = CodecOutputList.newInstance(); try { ByteBuf data = (ByteBuf) msg; // cumulation == null 则 first = true first = cumulation == null; if (first) { // 第一次直接赋值 cumulation = data; } else { // 累加 cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data); } // ★★★ 调用 handler 的 decode 方法 // out 再符合解码器要求的情况下,out != null,否则 out == null callDecode(ctx, cumulation, out); } catch (DecoderException e) { throw e; } catch (Exception e) { throw new DecoderException(e); } finally { if (cumulation != null && !cumulation.isReadable()) { numReads = 0; cumulation.release(); cumulation = null; } else if (++numReads >= discardAfterReads) { // We did enough reads already try to discard some bytes so we not risk to see a OOME. // See https://github.com/netty/netty/issues/4275 numReads = 0; discardSomeReadBytes(); } int size = out.size(); firedChannelRead |= out.insertSinceRecycled(); // ★★★ 根据 out.size(); 决定是否向下传播 fireChannelRead(ctx, out, size); out.recycle(); } } else { ctx.fireChannelRead(msg); }}
6、执行 callDecode 方法
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { try { // 判断是否还有数据可读 while (in.isReadable()) { int outSize = out.size(); if (outSize > 0) { fireChannelRead(ctx, out, outSize); out.clear(); // Check if this handler was removed before continuing with decoding. // If it was removed, it is not safe to continue to operate on the buffer. // // See: // - https://github.com/netty/netty/issues/4635 if (ctx.isRemoved()) { break; } outSize = 0; } // 获取可读的字节数 int oldInputLength = in.readableBytes(); // ★★★ 根据 handler 类型调用 decode 方法 decodeRemovalReentryProtection(ctx, in, out); // Check if this handler was removed before continuing the loop. // If it was removed, it is not safe to continue to operate on the buffer. // // See https://github.com/netty/netty/issues/1664 if (ctx.isRemoved()) { break; } // outSize = 0 // out.size() 在调用完 decodeRemovalReentryProtection 的 decode 方法之后,可能不为 0 if (outSize == out.size()) { if (oldInputLength == in.readableBytes()) { break; } else { continue; } } if (oldInputLength == in.readableBytes()) { throw new DecoderException( StringUtil.simpleClassName(getClass()) + ".decode() did not read anything but decoded a message."); } if (isSingleDecode()) { break; } } } catch (DecoderException e) { throw e; } catch (Exception cause) { throw new DecoderException(cause); }}final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { decodeState = STATE_CALLING_CHILD_DECODE; try { // 调用子类的解码方法 decode(ctx, in, out); } finally { boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING; decodeState = STATE_INIT; if (removePending) { fireChannelRead(ctx, out, out.size()); out.clear(); handlerRemoved(ctx); } }}
7、调用子类的 decode 方法,如:FixedLengthFrameDecoder 的 decode
@Overrideprotected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { // 如果符合固定长度要求,则 decoded 不为空,并写入 out Object decoded = decode(ctx, in); if (decoded != null) { out.add(decoded); }} protected Object decode( @SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception { // 如果可读的字节数 < 指定的字节数 if (in.readableBytes() < frameLength) { return null; } // 如果可读的字节数 >= 指定的字节数 else { // ★★★ in.readRetainedSlice(frameLength); 返回一个新的 ByteBuf,大小为 frameLength 个长度 // 会移动读指针 return in.readRetainedSlice(frameLength); } }