1、当检测到有 SelectionKey.OP_READ 事件

  1. if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
  2. // ★★★ 对于服务端:unsafe = NioMessageUnsafe
  3. // ★★★ 对于客户端:unsafe = NioByteUnsafe
  4. unsafe.read();
  5. }

2、调用 AbstractNioByteChannel 的 read 方法

  1. public final void read() {
  2. final ChannelConfig config = config();
  3. if (shouldBreakReadReady(config)) {
  4. clearReadPending();
  5. return;
  6. }
  7. final ChannelPipeline pipeline = pipeline();
  8. // 创建 ByteBuf 分配器
  9. final ByteBufAllocator allocator = config.getAllocator();
  10. final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
  11. allocHandle.reset(config);
  12. ByteBuf byteBuf = null;
  13. boolean close = false;
  14. try {
  15. do {
  16. // 分配一个 byteBuf
  17. byteBuf = allocHandle.allocate(allocator);
  18. // ★ 将数据读取分配到 byteBuf 中
  19. allocHandle.lastBytesRead(doReadBytes(byteBuf));
  20. if (allocHandle.lastBytesRead() <= 0) {
  21. // nothing was read. release the buffer.
  22. byteBuf.release();
  23. byteBuf = null;
  24. close = allocHandle.lastBytesRead() < 0;
  25. if (close) {
  26. // There is nothing left to read as we received an EOF.
  27. readPending = false;
  28. }
  29. break;
  30. }
  31. allocHandle.incMessagesRead(1);
  32. readPending = false;
  33. // ★★★ pipeline 的读事件向下传播
  34. // ★★★ 此时就开始从 head 节点依次走我们自定义的 handler 的 channelRead 方法
  35. pipeline.fireChannelRead(byteBuf);
  36. byteBuf = null;
  37. } while (allocHandle.continueReading());
  38. allocHandle.readComplete();
  39. pipeline.fireChannelReadComplete();
  40. if (close) {
  41. closeOnRead(pipeline);
  42. }
  43. } catch (Throwable t) {
  44. handleReadException(pipeline, byteBuf, t, close, allocHandle);
  45. } finally {
  46. // Check if there is a readPending which was not processed yet.
  47. // This could be for two reasons:
  48. // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
  49. // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
  50. //
  51. // See https://github.com/netty/netty/issues/2254
  52. if (!readPending && !config.isAutoRead()) {
  53. removeReadOp();
  54. }
  55. }
  56. }

3、pipeline 从 headContext 开始向下传播

  1. @Override
  2. public final ChannelPipeline fireChannelRead(Object msg) {
  3. AbstractChannelHandlerContext.invokeChannelRead(head, msg);
  4. return this;
  5. }
  6. static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
  7. final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
  8. EventExecutor executor = next.executor();
  9. if (executor.inEventLoop()) {
  10. // ★★★ 执行读取
  11. next.invokeChannelRead(m);
  12. } else {
  13. executor.execute(new Runnable() {
  14. @Override
  15. public void run() {
  16. next.invokeChannelRead(m);
  17. }
  18. });
  19. }
  20. }
  21. private void invokeChannelRead(Object msg) {
  22. if (invokeHandler()) {
  23. try {
  24. ((ChannelInboundHandler) handler()).channelRead(this, msg);
  25. } catch (Throwable t) {
  26. notifyHandlerException(t);
  27. }
  28. } else {
  29. fireChannelRead(msg);
  30. }
  31. }
  32. @Override
  33. public void channelRead(ChannelHandlerContext ctx, Object msg) {
  34. ctx.fireChannelRead(msg);
  35. }

4、传播到下一个处理器

  1. @Override
  2. public ChannelHandlerContext fireChannelRead(final Object msg) {
  3. invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
  4. return this;
  5. }

5、如:找到 FixedLengthFrameDecoder,调用父类的 channelRead 方法

  1. @Override
  2. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  3. if (msg instanceof ByteBuf) {
  4. CodecOutputList out = CodecOutputList.newInstance();
  5. try {
  6. ByteBuf data = (ByteBuf) msg;
  7. // cumulation == null 则 first = true
  8. first = cumulation == null;
  9. if (first) {
  10. // 第一次直接赋值
  11. cumulation = data;
  12. } else {
  13. // 累加
  14. cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
  15. }
  16. // ★★★ 调用 handler 的 decode 方法
  17. // out 再符合解码器要求的情况下,out != null,否则 out == null
  18. callDecode(ctx, cumulation, out);
  19. } catch (DecoderException e) {
  20. throw e;
  21. } catch (Exception e) {
  22. throw new DecoderException(e);
  23. } finally {
  24. if (cumulation != null && !cumulation.isReadable()) {
  25. numReads = 0;
  26. cumulation.release();
  27. cumulation = null;
  28. } else if (++numReads >= discardAfterReads) {
  29. // We did enough reads already try to discard some bytes so we not risk to see a OOME.
  30. // See https://github.com/netty/netty/issues/4275
  31. numReads = 0;
  32. discardSomeReadBytes();
  33. }
  34. int size = out.size();
  35. firedChannelRead |= out.insertSinceRecycled();
  36. // ★★★ 根据 out.size(); 决定是否向下传播
  37. fireChannelRead(ctx, out, size);
  38. out.recycle();
  39. }
  40. } else {
  41. ctx.fireChannelRead(msg);
  42. }
  43. }

6、执行 callDecode 方法

  1. protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
  2. try {
  3. // 判断是否还有数据可读
  4. while (in.isReadable()) {
  5. int outSize = out.size();
  6. if (outSize > 0) {
  7. fireChannelRead(ctx, out, outSize);
  8. out.clear();
  9. // Check if this handler was removed before continuing with decoding.
  10. // If it was removed, it is not safe to continue to operate on the buffer.
  11. //
  12. // See:
  13. // - https://github.com/netty/netty/issues/4635
  14. if (ctx.isRemoved()) {
  15. break;
  16. }
  17. outSize = 0;
  18. }
  19. // 获取可读的字节数
  20. int oldInputLength = in.readableBytes();
  21. // ★★★ 根据 handler 类型调用 decode 方法
  22. decodeRemovalReentryProtection(ctx, in, out);
  23. // Check if this handler was removed before continuing the loop.
  24. // If it was removed, it is not safe to continue to operate on the buffer.
  25. //
  26. // See https://github.com/netty/netty/issues/1664
  27. if (ctx.isRemoved()) {
  28. break;
  29. }
  30. // outSize = 0
  31. // out.size() 在调用完 decodeRemovalReentryProtection 的 decode 方法之后,可能不为 0
  32. if (outSize == out.size()) {
  33. if (oldInputLength == in.readableBytes()) {
  34. break;
  35. } else {
  36. continue;
  37. }
  38. }
  39. if (oldInputLength == in.readableBytes()) {
  40. throw new DecoderException(
  41. StringUtil.simpleClassName(getClass()) +
  42. ".decode() did not read anything but decoded a message.");
  43. }
  44. if (isSingleDecode()) {
  45. break;
  46. }
  47. }
  48. } catch (DecoderException e) {
  49. throw e;
  50. } catch (Exception cause) {
  51. throw new DecoderException(cause);
  52. }
  53. }
  54. final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
  55. throws Exception {
  56. decodeState = STATE_CALLING_CHILD_DECODE;
  57. try {
  58. // 调用子类的解码方法
  59. decode(ctx, in, out);
  60. } finally {
  61. boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
  62. decodeState = STATE_INIT;
  63. if (removePending) {
  64. fireChannelRead(ctx, out, out.size());
  65. out.clear();
  66. handlerRemoved(ctx);
  67. }
  68. }
  69. }

7、调用子类的 decode 方法,如:FixedLengthFrameDecoder 的 decode

  1. @Override
  2. protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
  3. // 如果符合固定长度要求,则 decoded 不为空,并写入 out
  4. Object decoded = decode(ctx, in);
  5. if (decoded != null) {
  6. out.add(decoded);
  7. }
  8. }
  9. protected Object decode(
  10. @SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception {
  11. // 如果可读的字节数 < 指定的字节数
  12. if (in.readableBytes() < frameLength) {
  13. return null;
  14. }
  15. // 如果可读的字节数 >= 指定的字节数
  16. else {
  17. // ★★★ in.readRetainedSlice(frameLength); 返回一个新的 ByteBuf,大小为 frameLength 个长度
  18. // 会移动读指针
  19. return in.readRetainedSlice(frameLength);
  20. }
  21. }