Java NIO 提供了 ByteBuffer,但许多库在上面建立了自己的字节缓冲区 API,特别是对于重复使用缓冲区和/或使用直接缓冲区对性能有利的网络操作。例如,Netty 有 ByteBuf 层次结构,Undertow 使用 XNIO,Jetty 使用带有回调的池化字节缓冲区来释放,等等。spring-core 模块提供了一组抽象,以便与各种字节缓冲区 API 协同工作,如下所示:
- DataBufferFactory: 抽象了数据缓冲区的创建。
- DataBuffer: 代表一个字节的缓冲区,它可以被 汇集(pooled) 起来。
- DataBufferUtils:提供了数据缓冲区的实用方法。
- Codecs:将数据缓冲流解码或编码为更高层次的对象。
:::tips 数据缓冲区的抽象就稍微有点底层的东西了,一般不写协议的解析之类的,很少会用到 :::
DataBufferFactory
DataBufferFactory 被用来以两种方式之一创建数据缓冲区:
- 分配一个新的数据缓冲区,如果知道的话,可以选择预先指定容量,这更有效率,即使 DataBuffer 的实现可以按需增长和收缩。
- 包裹一个现有的
byte[]
或java.nio.ByteBuffer
,它用 DataBuffer 的实现来装饰给定的数据,并且不涉及分配。
请注意,WebFlux 应用程序不会直接创建 DataBufferFactory,而是通过客户端的 ServerHttpResponse 或 ClientHttpRequest 访问它。工厂的类型取决于底层的客户端或服务器,例如,Reactor Netty 的 NettyDataBufferFactory,其他的 DefaultDataBufferFactory。
DataBuffer
DataBuffer 接口提供了与 java.nio.ByteBuffer 类似的操作,但也带来了一些额外的好处,其中一些是受到 Netty ByteBuf 的启发。下面是部分优点的列表:
- 具有独立位置的读和写,即不需要调用
flip()
来交替进行读和写。 - 像
java.lang.StringBuilder
那样按需扩展容量。 - 通过
PooledDataBuffer
实现池化缓冲区和引用计数。 - 以
java.nio.ByteBuffer
、InputStream
或OutputStream
查看缓冲区。 - 确定一个给定字节的索引,或者最后的索引。
PooledDataBuffer / 池化数据缓冲区
正如 ByteBuffer 的 Javadoc 中所解释的,字节缓冲区可以是直接的或非直接的。直接缓冲区可以驻留在 Java 堆之外,这样就不需要为本地 I/O 操作进行复制了。这使得直接缓冲区在通过套接字接收和发送数据时特别有用,但它们在创建和释放时也比较昂贵,这导致了缓冲区池化的想法。
PooledDataBuffer 是 DataBuffer 的一个扩展,它有助于参考计数,这对字节缓冲区池化至关重要。它是如何工作的?当 PooledDataBuffer 被分配时,引用计数为 1。对 retain()
的调用会增加计数,而对 release()
的调用会减少计数。只要计数在 0 以上,缓冲区就保证不会被释放。当计数减少到 0 时,池中的缓冲区可以被释放,这在实践中可能意味着为缓冲区保留的内存被返回到内存池中。
注意,与其直接对 PooledDataBuffer 进行操作,在大多数情况下,最好使用 DataBufferUtils 中的便利方法,只有当 DataBuffer 是 PooledDataBuffer 的实例时,才对它应用释放或保留。
DataBufferUtils
DataBufferUtils 提供了许多实用的方法来对数据缓冲区进行操作:
- 如果底层字节缓冲区 API 支持的话,可以通过复合缓冲区等方式,将数据缓冲区流加入一个零拷贝的缓冲区。
- 将 InputStream 或 NIO Channel 变成
Flux<DataBuffer>
,反之,将Publisher<DataBuffer>
变成OutputStream
或NIO Channel
。 - 如果缓冲区是 PooledDataBuffer 的实例,有方法释放或保留一个 DataBuffer。
- 从一个字节流中跳过或取出,直到一个特定的字节数。
Codecs
org.springframework.core.codec
包提供了以下策略接口:
- Encoder:将
Publisher<T>
编码为数据缓冲区 - Decoder:
Publisher<DataBuffer>
解码为更高层次的对象流
spring-core 模块提供了 byte[]
、ByteBuffer、DataBuffer、Resource 和 String 编码和解码器的实现。spring-web 模块增加了 Jackson JSON、Jackson Smile、JAXB2、Protocol Buffers 和其他编码和解码器。参见 WebFlux 部分的编解码器。
使用 DataBuffer
在处理数据缓冲区时,必须特别注意确保缓冲区被释放,因为它们可能被池化了。我们将使用编解码器来说明它是如何工作的,但这些概念更普遍地适用。让我们看看编解码器在内部必须做什么来管理数据缓冲区。
解码器(Decoder)是最后一个读取输入数据缓冲区的,在创建更高层次的对象之前,因此它必须按以下方式释放它们:
- 如果一个 Decoder 只是简单地读取每个输入缓冲区并准备立即释放它,它可以通过
DataBufferUtils.release(dataBuffer)
这样做。 - 如果一个 Decoder 正在使用 Flux 或 Mono 操作符,如 flatMap、reduce 和其他内部预取和缓存数据项的操作符,或者正在使用过滤、跳过和其他遗漏项的操作符,那么
doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release)
必须被添加到组成链中,以确保这些缓冲区在被丢弃前被释放,也可能是由于错误或取消信号的结果。 - 如果一个 Decoder 以任何其他方式保留一个或多个数据缓冲区,它必须确保它们在完全读取时被释放,或者在错误或取消信号发生时,在缓存的数据缓冲区被读取和释放之前被释放。
请注意,DataBufferUtils#join
提供了一个安全有效的方法,将数据缓冲区流聚集到一个单一的数据缓冲区。同样,skipUntilByteCount 和 takeUntilByteCount 也是供解码器使用的额外安全方法。
Encoder 分配数据缓冲区,其他人必须读取(和释放)。所以编码器没有什么可做的。然而,如果在用数据填充缓冲区时发生序列化错误,编码器必须注意释放数据缓冲区。比如说:
DataBuffer buffer = factory.allocateBuffer();
boolean release = true;
try {
// serialize and populate buffer..
release = false;
}
finally {
if (release) {
DataBufferUtils.release(buffer);
}
}
return buffer;
编码器的消费者负责释放它收到的数据缓冲区。在 WebFlux 应用程序中,编码器的输出被用来写入 HTTP 服务器响应或客户端的 HTTP 请求,在这种情况下,释放数据缓冲区是写入服务器响应或客户端请求的代码的责任。
请注意,在 Netty 上运行时,有调试选项用于排除缓冲区泄漏的故障。