阅读之前

在此之前,我们已经了解了 Dubbo 的服务暴露和引用的核心流程,接下来我们看看 Dubbo 3.0 的 Triple 协议,看看相比于 Dubbo2,它做了哪些改进。

强烈建议读者在阅读此文之前,学会 wireshark 的基础使用,通过抓包工具辅助分析,能让过程事半功倍。

为什么会有 Triple 协议

阅读官方文档后,发现当初设计 Triple 协议的时候,考虑了这些关键因素:

特性 实现 备注
通用性

统一的二进制格式,跨语言、跨平台、多传输层协议支持

- 引入了 GRPC
- 跨语言:不同语言之间可以通过 IDL 语言描述接口和 POJO 定义,
GRPC 是 CNCF 顶级项目,已经成为云原生的事实标准
穿透性

能够被各种中端设备识别和转发,如网关、代理服务器等

- 引入 HTTP/2
GRPC 本身就是构建在 HTTP/2 之上的
易用性

实现 Dubbo 2.5~2.7 的平滑升级

- 兼容使用 Java 类方式定义接口和 POJO
- 多序列化协议支持,如 Protobuf、Hessian、JSON 等
高性能

相比 Dubbo2 协议 和 GRPC 协议至少不出现性能下降

- metadata 和 payload 分离的策略
- HTTP/2 可以实现多路复用,不会出现 HOL 问题
官方的 Beanchmark 数据还需要更完善的解读

几者之间的关系如下图:
image.png
假如 Triple 的数据序列化选择 Protobuf,那么 Protobuf + HTTP/2 是不是就有 GRPC 的味道了,所以 Triple 协议是天然兼容 GRPC 的。

HTTP/2

我们先简单了解下 HTTP/2 通讯协议解决了什么问题,再去解答 Dubbo 3.0 为什么选择了 HTTP/2.

为什么不是 HTTP/1.X

HTTP/1.X 问题 HTTP/2 解决方案
HOL 请求阻塞问题 TCP 多路复用技术
HTTP 请求头冗余和过大问题,平均在 500~800 bytes 之间 标头压缩技术(HPACK compression)
可降低 85% 以上的头部负载
无法双向通讯 提供了 ServerPush 技术
网络资源使用效率低下
- Request prioritization(Stream 优先级)
- Flow control(依据接受端处理能力酌情发送,支持数据流和连接级流控制)
- 一个来源只建立一个连接(降低网络操作本身的开销,例如 TCP 断连,TLS 握手)

在解决这些问题的过程中,HTTP/2 并没有在应用层改变 HTTP 的语法定义,而是通过更改数据格式和传输方式实现特性提升,所以 HTTP/1.X 的用户可以平滑迁移,但服务端和客户端必须使用的是同代协议。

传输结构

HTTP/2 有几个关键元素需要记住:

  • 数据流(Stream): 已建立的连接内的双向字节流,可以承载一条或多条消息
  • 消息(Message): 与逻辑请求或响应消息对应的完整的一系列帧。
  • 帧(Frame): HTTP/2 通信的最小单位,每个帧都包含帧头,至少也会标识出当前帧所属的数据流。

此外,HTTP/2 所有的通讯过程都在一个 TCP 连接上完成,在一个连接上可以同时传输归属于不同 Stream 的 Frame。
image.png :::danger 一个 Stream 里面,消息是有序的吗?或者一个 Stream 代表一个完整的的 HTTP 来回吗,用完即销毁? :::

Stream 优先级

同时我们可以对 Stream 设置优先级,为了做到这一点,HTTP/2 标准允许每个数据流都有一个关联的权重和依赖关系:

  • 可以向每个数据流分配一个介于 1 至 256 之间的整数。
  • 每个数据流与其他数据流之间可以存在显式依赖关系(优先级高于权重)

接收端可以通过告知发送端 Stream 优先级的树形结构,帮助发送端优化(非强制) Stream 处理顺序。
image.png
我们来看一下上图中的其他几个操作示例。 从左到右依次为:

  • 数据流 A 和数据流 B 都没有指定父依赖项,依赖于隐式“根数据流”;A 的权重为 12,B 的权重为 4。因此,根据比例权重: 数据流 B 获得的资源是 A 所获资源的三分之一。
  • 数据流 D 依赖于根数据流;C 依赖于 D。 因此,D 应先于 C 获得完整资源分配。 权重不重要,因为 C 的依赖关系拥有更高的优先级。
  • 数据流 D 应先于 C 获得完整资源分配;C 应先于 A 和 B 获得完整资源分配;数据流 B 获得的资源是 A 所获资源的三分之一。
  • 数据流 D 应先于 E 和 C 获得完整资源分配;E 和 C 应先于 A 和 B 获得相同的资源分配;A 和 B 应基于其权重获得比例分配。

标头压缩

针对 HTTP/1.X 头部负载过大的问题,HTTP/2 使用的标头压缩涉及了如下两个简单而又强大的技术:

  • 针对未知字段:这种格式支持通过静态霍夫曼代码对传输的标头字段进行编码,从而减小了各个传输的大小。
  • 针对已知字段:这种格式要求客户端和服务器同时维护和更新一个包含之前见过的标头字段的索引列表(换句话说,它可以建立一个共享的压缩上下文),此列表随后会用作参考,对之前传输的值进行有效编码。针对常见的 HTTP 头,可以提前放置于静态索引表内以提高性能。

image.png

Frame 二进制格式

  1. All frames begin with a fixed 9-octet header followed by a variable-
  2. length payload.
  3. +-----------------------------------------------+
  4. | Length (24) |
  5. +---------------+---------------+---------------+
  6. | Type (8) | Flags (8) |
  7. +-+-------------+---------------+-------------------------------+
  8. |R| Stream Identifier (31) |
  9. +=+=============================================================+
  10. | Frame Payload (0...) ...
  11. +---------------------------------------------------------------+
  12. Frame Type Registry
  13. +---------------+------+--------------+
  14. | Frame Type | Code | Section |
  15. +---------------+------+--------------+
  16. | DATA | 0x0 | Section 6.1 |
  17. | HEADERS | 0x1 | Section 6.2 |
  18. | PRIORITY | 0x2 | Section 6.3 |
  19. | RST_STREAM | 0x3 | Section 6.4 |
  20. | SETTINGS | 0x4 | Section 6.5 |
  21. | PUSH_PROMISE | 0x5 | Section 6.6 |
  22. | PING | 0x6 | Section 6.7 |
  23. | GOAWAY | 0x7 | Section 6.8 |
  24. | WINDOW_UPDATE | 0x8 | Section 6.9 |
  25. | CONTINUATION | 0x9 | Section 6.10 |
  26. +---------------+------+--------------+

所以 Triple 协议到底是什么?

二进制层面

分析和设计协议,必然绕不开
针对示例 Demo 我们通过 wireshark 抓包,并通过 tcp.port == 50051 设置过滤出 Triple 相关请求,但 Protocol 部分并没有给出我们更加可读的信息。
image.png
尝试 右键 -> Follow -> TCP Stream -> Show data as Hex Dump
通过 HTTP/2_CONNECTION_PREFACE 我们可以得知这是个基于 TCP 的 HTTP/2 协议,当然如果你对协议的二进制帧异常熟悉,也大概可以看出各个 FRAME 代表什么,但这分析效率还是过于低下。
image.png
针对这种非标端口的 HTTP/2 协议连接,我们其实是可以通过指定解码方式来告诉 wireshark:请使用 HTTP/2 来解析这段二进制。在包列表页面(第一张图), 选择 右键 -> Decode as,配置如下:
image.png
点击 OK 之后,Triple 协议已无密码可言,wireshark 已经很明确的告诉我们:
在二进制层面,Triple 就是 DataFrame 部分序列化方式可选的 HTTP/2 协议
所以你了解了 HTTP/2 几乎就等于了解了 Triple 协议,相比于 Dubbo2 的私有化二进制协议,个人还是很欣赏 Dubbo 3.0 的这个变更的,简单通用可理解,是架构之道。
image.png

应用层面

基于上面的分析,我们再来看看应用层面是如何映射的。

在发起请求的时候(Request)

其中 Header Frame 部分,用于定位远端的服务接口,包含服务接口、应用名、超时时间等信息
image.png
其中 Data Frame 部分,用于传递函数参数,函数参数支持多种序列化方式,如 JSON、Hessian、Protobuf 等。
⚠️ 注意:如果要兼容 GRPC 那么函数入参只能有一个,为一个 POJO 对象,具体原因可查看 .proto 语法约定。
image.png

请求结果返回(Response)

以请求头约定的 content-type 格式序列化返回对象后,通过 HTTP/2 Data Frame 返回给请求端,逻辑非常清晰明了。
image.png

源码验证

数据流程:
image.png

关键断点参考:

// => org.apache.dubbo.remoting.api.PortUnificationServer#doOpen
// Netty 监听端口绑定
protected void doOpen() {
    bootstrap = new ServerBootstrap();

    bossGroup = NettyEventLoopFactory.eventLoopGroup(1, "NettyServerBoss");
    workerGroup = NettyEventLoopFactory.eventLoopGroup(
        getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
        "NettyServerWorker");

    bootstrap.group(bossGroup, workerGroup)
        .channel(NettyEventLoopFactory.serverSocketChannelClass())
        .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
        .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
        .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                // FIXME: should we use getTimeout()?
                int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                final ChannelPipeline p = ch.pipeline();
                //                        p.addLast(new LoggingHandler(LogLevel.DEBUG));

                final boolean enableSsl = getUrl().getParameter(SSL_ENABLED_KEY, false);
                if (enableSsl) {
                    p.addLast("negotiation-ssl", new SslServerTlsHandler(getUrl()));
                }

                final PortUnificationServerHandler puHandler = new PortUnificationServerHandler(protocols);
                p.addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS));
                p.addLast("negotiation-protocol", puHandler);
                channelGroup = puHandler.getChannels();
            }
        });
    // bind
    String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
    int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
    if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
        bindIp = ANYHOST_VALUE;
    }
    InetSocketAddress bindAddress = new InetSocketAddress(bindIp, bindPort);
    ChannelFuture channelFuture = bootstrap.bind(bindAddress);
    channelFuture.syncUninterruptibly();
    channel = channelFuture.channel();
}


// => org.apache.dubbo.remoting.api.PortUnificationServerHandler#decode
// 二进制协议选择,例如是选择 GRPC 还是 Triple
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    // Will use the first five bytes to detect a protocol.
    if (in.readableBytes() < 5) {
        return;
    }

    for (final WireProtocol protocol : protocols) {
        in.markReaderIndex();
        final ProtocolDetector.Result result = protocol.detector().detect(ctx, in);
        in.resetReaderIndex();
        switch (result) {
            case UNRECOGNIZED:
                continue;
            case RECOGNIZED:
                protocol.configServerPipeline(ctx.pipeline(), sslCtx);
                ctx.pipeline().remove(this);
            case NEED_MORE_DATA:
                return;
            default:
                return;
        }
    }
    // Unknown protocol; discard everything and close the connection.
    in.clear();
    ctx.close();
}


// => org.apache.dubbo.rpc.protocol.tri.TripleHttp2FrameServerHandler#channelRead
// 处理 HTTP/2 HEADER FRAME 和 DATA FRAME
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof Http2HeadersFrame) {
        onHeadersRead(ctx, (Http2HeadersFrame) msg);
    } else if (msg instanceof Http2DataFrame) {
        onDataRead(ctx, (Http2DataFrame) msg);
    } else if (msg instanceof Http2Frame) {
        // ignored
        ReferenceCountUtil.release(msg);
    } else {
        super.channelRead(ctx, msg);
    }
}

参考资料