数据缓冲区和编解码器

Java NIO 提供了 ByteBuffer,但许多库在其之上构建了自己的字节缓冲区 API,尤其是在网络操作中,重用缓冲区和/或使用直接缓冲区有利于提高性能。例如,Netty 具有 ByteBuf 层次结构,Undertow 使用 XNIO,Jetty 使用带回调的池化字节缓冲区以进行释放,等等。spring-core 模块提供了一组抽象来处理各种字节缓冲区 API,如下所示

DataBufferFactory

DataBufferFactory 用于通过以下两种方式之一创建数据缓冲区

  1. 分配一个新的数据缓冲区,可以选择性地预先指定容量(如果已知),即使 DataBuffer 的实现可以根据需要增长和缩小,这也会更有效率。

  2. 包装现有的 byte[]java.nio.ByteBuffer,它使用 DataBuffer 实现装饰给定的数据,并且不涉及分配。

请注意,WebFlux 应用程序不会直接创建 DataBufferFactory,而是通过客户端上的 ServerHttpResponseClientHttpRequest 访问它。工厂的类型取决于底层客户端或服务器,例如,Reactor Netty 使用 NettyDataBufferFactory,其他情况使用 DefaultDataBufferFactory

DataBuffer

DataBuffer 接口提供了与 java.nio.ByteBuffer 类似的操作,但也带来了一些额外的优势,其中一些受到 Netty ByteBuf 的启发。以下是部分优势列表

  • 使用独立的位置进行读写,即不需要调用 flip() 来在读写之间切换。

  • 容量根据需要扩展,就像 java.lang.StringBuilder 一样。

  • 通过 PooledDataBuffer 进行池化缓冲区和引用计数。

  • 将缓冲区视为 java.nio.ByteBufferInputStreamOutputStream

  • 确定给定字节的索引或最后一个索引。

PooledDataBuffer

ByteBuffer 的 Javadoc 中所述,字节缓冲区可以是直接的或非直接的。直接缓冲区可能驻留在 Java 堆之外,从而无需为本机 I/O 操作复制数据。这使得直接缓冲区在通过套接字接收和发送数据时特别有用,但它们创建和释放的成本也更高,这导致了池化缓冲区的概念。

PooledDataBufferDataBuffer 的扩展,有助于引用计数,这对于字节缓冲区池化至关重要。它是如何工作的?当分配 PooledDataBuffer 时,引用计数为 1。对 retain() 的调用会增加计数,而对 release() 的调用会减少计数。只要计数大于 0,就可以保证缓冲区不会被释放。当计数减少到 0 时,可以释放池化缓冲区,在实践中,这可能意味着为缓冲区保留的内存将返回到内存池。

请注意,在大多数情况下,最好使用 DataBufferUtils 中的便捷方法来操作 PooledDataBuffer,而不是直接操作 PooledDataBuffer,这些方法仅在 DataBufferPooledDataBuffer 的实例时才对它应用释放或保留。

DataBufferUtils

DataBufferUtils 提供了许多用于操作数据缓冲区的实用方法

  • 将数据缓冲区的流连接到单个缓冲区,可能无需复制,例如,通过复合缓冲区(如果底层字节缓冲区 API 支持)。

  • InputStream 或 NIO Channel 转换为 Flux<DataBuffer>,反之亦然,将 Publisher<DataBuffer> 转换为 OutputStream 或 NIO Channel

  • 如果缓冲区是 PooledDataBuffer 的实例,则释放或保留 DataBuffer 的方法。

  • 跳过或获取字节流,直到达到特定的字节数。

编解码器

org.springframework.core.codec 包提供了以下策略接口

  • EncoderPublisher<T> 编码为数据缓冲区的流。

  • DecoderPublisher<DataBuffer> 解码为更高级别对象的流。

spring-core 模块提供了 byte[]ByteBufferDataBufferResourceString 编解码器实现。spring-web 模块添加了 Jackson JSON、Jackson Smile、JAXB2、Protocol Buffers 等编解码器。请参阅 WebFlux 部分中的 编解码器

使用 DataBuffer

在使用数据缓冲区时,必须特别注意确保释放缓冲区,因为它们可能是 池化的。我们将使用编解码器来说明它是如何工作的,但这些概念更普遍适用。让我们看看编解码器在内部必须执行什么操作来管理数据缓冲区。

Decoder 是最后一个读取输入数据缓冲区的,在创建更高级别的对象之前,因此它必须按如下方式释放它们

  1. 如果 Decoder 只是读取每个输入缓冲区并准备立即释放它,它可以通过 DataBufferUtils.release(dataBuffer) 来做到这一点。

  2. 如果 Decoder 使用 FluxMono 运算符(如 flatMapreduce 等)在内部预取和缓存数据项,或者使用运算符(如 filterskip 等)省略项,则必须将 doOnDiscard(DataBuffer.class, DataBufferUtils::release) 添加到组合链中,以确保在丢弃此类缓冲区之前释放它们,也可能是在错误或取消信号的结果。

  3. 如果 Decoder 以任何其他方式保留一个或多个数据缓冲区,它必须确保在完全读取时释放它们,或者在错误或取消信号发生在缓存的数据缓冲区被读取和释放之前的情况下释放它们。

请注意,DataBufferUtils#join 提供了一种安全有效的方法,可以将数据缓冲区流聚合到单个数据缓冲区中。同样,skipUntilByteCounttakeUntilByteCount 是解码器可以使用的一些其他安全方法。

一个 Encoder 会分配数据缓冲区,供其他组件读取(并释放)。因此,Encoder 本身的工作量并不多。但是,Encoder 必须注意,如果在使用数据填充缓冲区时发生序列化错误,则必须释放数据缓冲区。例如:

  • Java

  • Kotlin

DataBuffer buffer = factory.allocateBuffer();
boolean release = true;
try {
	// serialize and populate buffer..
	release = false;
}
finally {
	if (release) {
		DataBufferUtils.release(buffer);
	}
}
return buffer;
val buffer = factory.allocateBuffer()
var release = true
try {
	// serialize and populate buffer..
	release = false
} finally {
	if (release) {
		DataBufferUtils.release(buffer)
	}
}
return buffer

Encoder 的使用者负责释放它接收到的数据缓冲区。在 WebFlux 应用程序中,Encoder 的输出用于写入 HTTP 服务器响应或客户端 HTTP 请求,在这种情况下,释放数据缓冲区的责任在于写入服务器响应或客户端请求的代码。

请注意,在 Netty 上运行时,有一些用于排查缓冲区泄漏的调试选项。