记录序列化和反序列化
Kafka Streams binder 允许您通过两种方式序列化和反序列化记录。一种是 Kafka 提供的原生序列化和反序列化功能,另一种是 Spring Cloud Stream 框架的消息转换能力。让我们看看一些细节。
入站反序列化
键始终使用原生 Serdes 进行反序列化。
对于值,默认情况下,入站反序列化由 Kafka 原生执行。请注意,这是与早期版本的 Kafka Streams binder 的默认行为相比的重大变化,在早期版本中,反序列化是由框架完成的。
Kafka Streams binder 将尝试通过查看 java.util.function.Function|Consumer
的类型签名来推断匹配的 Serde
类型。这是它匹配 Serdes 的顺序。
-
如果应用提供了一个
Serde
类型的 bean,并且返回类型使用传入的键或值类型的实际类型进行了参数化,那么它将使用该Serde
进行入站反序列化。例如,如果您的应用中有以下内容,binder 会检测到KStream
的传入值类型与Serde
bean 参数化的类型匹配。它将使用该 Serde 进行入站反序列化。
@Bean
public Serde<Foo> customSerde() {
...
}
@Bean
public Function<KStream<String, Foo>, KStream<String, Foo>> process() {
}
-
接下来,它会查看类型,看它们是否是 Kafka Streams 公开的类型之一。如果是,则使用它们。以下是 binder 将尝试从 Kafka Streams 匹配的 Serde 类型。
Integer, Long, Short, Double, Float, byte[], UUID and String.
-
如果 Kafka Streams 提供的 Serdes 都不匹配这些类型,那么它将使用 Spring Kafka 提供的 JsonSerde。在这种情况下,binder 假定这些类型是 JSON 友好的。如果您有多个值对象作为输入,这非常有用,因为 binder 会在内部将它们推断为正确的 Java 类型。但在回退到
JsonSerde
之前,binder 会检查 Kafka Streams 配置中设置的默认Serde
,看它是否是与传入 KStream 类型匹配的Serde
。
如果上述策略都不奏效,则应用必须通过配置提供 Serde
。这可以通过两种方式配置 - 绑定级别或默认级别。
首先,binder 会查看绑定级别是否提供了 Serde
。例如,如果您有以下处理器,
@Bean
public BiFunction<KStream<CustomKey, AvroIn1>, KTable<CustomKey, AvroIn2>, KStream<CustomKey, AvroOutput>> process() {...}
然后,您可以使用以下方式提供绑定级别 Serde
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
如果您如上所述为每个输入绑定提供了 Serde ,那么这将具有更高的优先级,并且 binder 将避免进行任何 Serde 推断。 |
如果您希望将默认的键/值 Serdes 用于入站反序列化,您可以在 binder 级别进行配置。
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde
如果您不想要 Kafka 提供的原生解码,可以依赖 Spring Cloud Stream 提供的消息转换功能。由于原生解码是默认设置,为了让 Spring Cloud Stream 反序列化入站值对象,您需要显式禁用原生解码。
例如,如果您使用与上述相同的 BiFunction 处理器,则设置 spring.cloud.stream.bindings.process-in-0.consumer.nativeDecoding: false
您需要为所有输入单独禁用原生解码。否则,对于未禁用的输入,仍将应用原生解码。
默认情况下,Spring Cloud Stream 将使用 application/json
作为内容类型,并使用相应的 json 消息转换器。您可以通过使用以下属性和相应的 MessageConverter
bean 来使用自定义消息转换器。
spring.cloud.stream.bindings.process-in-0.contentType
出站序列化
出站序列化基本上遵循与上述入站反序列化相同的规则。与入站反序列化一样,Spring Cloud Stream 早期版本的一个主要变化是出站序列化由 Kafka 原生处理。在 binder 的 3.0 版本之前,这是由框架本身完成的。
出站的键总是由 Kafka 使用 binder 推断出的匹配 Serde
进行序列化。如果无法推断键的类型,则需要使用配置进行指定。
值 serdes 使用与入站反序列化相同的规则进行推断。首先,它会匹配以查看出站类型是否来自应用中提供的 bean。如果不是,它会检查是否与 Kafka 公开的 Serde
匹配,例如 - Integer
, Long
, Short
, Double
, Float
, byte[]
, UUID
和 String
。如果这不起作用,则会回退到 Spring Kafka 项目提供的 JsonSerde
,但首先会查看默认的 Serde
配置以查看是否存在匹配项。请记住,所有这些对应用来说是透明的。如果这些都不奏效,则用户必须通过配置提供要使用的 Serde
。
假设您使用与上述相同的 BiFunction
处理器。那么您可以按如下方式配置出站键/值 Serdes。
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
如果 Serde 推断失败,并且未提供绑定级别 Serdes,则 binder 会回退到 JsonSerde
,但会查看默认 Serdes 以查找匹配项。
默认 serdes 的配置方式与上面反序列化部分描述的相同。
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde
如果您的应用使用了分支功能并且有多个输出绑定,那么这些必须按绑定进行配置。再次强调,如果 binder 能够推断 Serde
类型,您则无需进行此配置。
如果您不想要 Kafka 提供的原生编码,但希望使用框架提供的消息转换,则需要显式禁用原生编码,因为原生编码是默认设置。例如,如果您使用与上述相同的 BiFunction 处理器,则设置 spring.cloud.stream.bindings.process-out-0.producer.nativeEncoding: false
在分支情况下,您需要为所有输出单独禁用原生编码。否则,对于未禁用的输出,仍将应用原生编码。
当转换由 Spring Cloud Stream 完成时,默认情况下,它将使用 application/json
作为内容类型,并使用相应的 json 消息转换器。您可以通过使用以下属性和相应的 MessageConverter
bean 来使用自定义消息转换器。
spring.cloud.stream.bindings.process-out-0.contentType
禁用原生编码/解码后,binder 不会像原生 Serdes 那样进行任何推断。应用需要显式提供所有配置选项。因此,通常建议在使用 Spring Cloud Stream Kafka Streams 应用时,坚持使用默认的序列化/反序列化选项,并采用 Kafka Streams 提供的原生序列化/反序列化。唯一必须使用框架提供的消息转换能力的情况是,您的上游生产者正在使用特定的序列化策略。在这种情况下,您希望使用匹配的反序列化策略,因为原生机制可能会失败。当依赖默认的 Serde
机制时,应用必须确保 binder 能够正确地将入站和出站与适当的 Serde
进行映射,否则可能会失败。
值得一提的是,上述数据序列化/反序列化方法仅适用于您的处理器的边缘,即 - 入站和出站。您的业务逻辑可能仍然需要调用显式需要 Serde
对象的 Kafka Streams API。这些仍然是应用的责任,必须由开发人员相应地处理。