记录序列化和反序列化
Kafka Streams 绑定器允许您通过两种方式序列化和反序列化记录。一种是 Kafka 提供的原生序列化和反序列化功能,另一种是 Spring Cloud Stream 框架的消息转换功能。让我们看看一些细节。
入站反序列化
键始终使用原生 Serdes 反序列化。
对于值,默认情况下,入站的反序列化由 Kafka 原生执行。请注意,这是 Kafka Streams 绑定器先前版本默认行为的一个重大变化,在先前版本中,反序列化是由框架完成的。
Kafka Streams 绑定器将尝试通过查看 `java.util.function.Function|Consumer` 的类型签名来推断匹配的 `Serde` 类型。以下是它匹配 Serdes 的顺序。
-
如果应用程序提供类型为 `Serde` 的 Bean,并且如果返回类型使用传入键或值类型的实际类型进行参数化,则它将使用该 `Serde` 进行入站反序列化。例如,如果您的应用程序中有以下内容,则绑定器检测到 `KStream` 的传入值类型与 `Serde` Bean 上参数化的类型匹配。它将将其用于入站反序列化。
@Bean
public Serde<Foo> customSerde() {
...
}
@Bean
public Function<KStream<String, Foo>, KStream<String, Foo>> process() {
}
-
接下来,它查看类型并查看它们是否为 Kafka Streams 公开的类型之一。如果是,则使用它们。以下是绑定器将尝试从 Kafka Streams 中匹配的 Serde 类型。
Integer, Long, Short, Double, Float, byte[], UUID and String.
-
如果 Kafka Streams 提供的任何 Serdes 都不匹配类型,则它将使用 Spring Kafka 提供的 JsonSerde。在这种情况下,绑定器假设类型对 JSON 友好。如果您有多个值对象作为输入,这将非常有用,因为绑定器将在内部将它们推断为正确的 Java 类型。但在回退到 `JsonSerde` 之前,绑定器会检查 Kafka Streams 配置中设置的默认 `Serde`,以查看它是否是可以与传入 KStream 类型匹配的 `Serde`。
如果上述策略均无效,则应用程序必须通过配置提供 `Serde`。这可以通过两种方式配置 - 绑定或默认。
首先,绑定器将查看是否在绑定级别提供了 `Serde`。例如,如果您有以下处理器,
@Bean
public BiFunction<KStream<CustomKey, AvroIn1>, KTable<CustomKey, AvroIn2>, KStream<CustomKey, AvroOutput>> process() {...}
那么,您可以使用以下方式提供绑定级别的 `Serde`
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
如果如上所述为每个输入绑定提供 `Serde`,则它将具有更高的优先级,并且绑定器将避免任何 `Serde` 推断。 |
如果希望将默认键/值 Serdes 用于入站反序列化,则可以在绑定器级别执行此操作。
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde
如果您不想使用 Kafka 提供的原生解码,则可以依赖 Spring Cloud Stream 提供的消息转换功能。由于原生解码是默认设置,因此为了让 Spring Cloud Stream 反序列化传入的值对象,您需要显式禁用原生解码。
例如,如果您有与上面相同的 BiFunction 处理器,则需要使用 spring.cloud.stream.bindings.process-in-0.consumer.nativeDecoding: false
。您需要为所有输入分别禁用原生解码。否则,对于您未禁用的输入,仍将应用原生解码。
默认情况下,Spring Cloud Stream 将使用 application/json
作为内容类型并使用相应的 JSON 消息转换器。您可以使用以下属性和相应的 MessageConverter
bean 来使用自定义消息转换器。
spring.cloud.stream.bindings.process-in-0.contentType
输出序列化
输出序列化与上述输入反序列化的规则基本相同。与输入反序列化一样,Spring Cloud Stream 的先前版本的一个主要变化是,输出上的序列化由 Kafka 本地处理。在 3.0 版本之前的绑定器中,这是由框架本身完成的。
输出上的键始终由 Kafka 使用绑定器推断出的匹配 Serde
进行序列化。如果它无法推断键的类型,则需要使用配置来指定。
值 Serde 使用与输入反序列化相同的规则推断。首先,它匹配以查看输出类型是否来自应用程序中提供的 Bean。如果不是,则它检查是否与 Kafka 公开的 Serde
匹配,例如 - Integer
、Long
、Short
、Double
、Float
、byte[]
、UUID
和 String
。如果这不起作用,则它将回退到 Spring Kafka 项目提供的 JsonSerde
,但首先查看默认的 Serde
配置以查看是否存在匹配项。请记住,所有这些都对应用程序透明地进行。如果这些都不起作用,则用户必须通过配置提供要使用的 Serde
。
假设您正在使用与上面相同的 BiFunction
处理器。然后,您可以按如下方式配置输出键/值 Serde。
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
如果 Serde 推断失败,并且未提供绑定级别的 Serde,则绑定器将回退到 JsonSerde
,但会查看默认 Serde 以查找匹配项。
默认 Serde 的配置方式与上面反序列化中描述的方式相同。
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde
如果您的应用程序使用分支功能并具有多个输出绑定,则必须为每个绑定配置这些绑定。再次,如果绑定器能够推断 Serde
类型,则您无需进行此配置。
如果您不想使用 Kafka 提供的原生编码,但想使用框架提供的消息转换,则需要显式禁用原生编码,因为原生编码是默认设置。例如,如果您有与上面相同的 BiFunction 处理器,则需要使用 spring.cloud.stream.bindings.process-out-0.producer.nativeEncoding: false
。在分支的情况下,您需要为所有输出分别禁用原生编码。否则,对于您未禁用的输出,仍将应用原生编码。
当 Spring Cloud Stream 完成转换时,默认情况下,它将使用 application/json
作为内容类型并使用相应的 JSON 消息转换器。您可以使用以下属性和相应的 MessageConverter
bean 来使用自定义消息转换器。
spring.cloud.stream.bindings.process-out-0.contentType
当禁用原生编码/解码时,绑定器将不会像原生 Serde 那样进行任何推断。应用程序需要显式提供所有配置选项。因此,通常建议坚持使用反序列化的默认选项,并在编写 Spring Cloud Stream Kafka Streams 应用程序时坚持使用 Kafka Streams 提供的原生反序列化。您必须使用框架提供的消息转换功能的唯一场景是,当您的上游生产者使用特定的序列化策略时。在这种情况下,您希望使用匹配的反序列化策略,因为原生机制可能会失败。当依赖默认的 Serde
机制时,应用程序必须确保绑定器有办法正确映射输入和输出并使用适当的 Serde
,否则事情可能会失败。
值得一提的是,上面概述的数据反序列化方法仅适用于处理器的边缘,即输入和输出。您的业务逻辑可能仍然需要调用 Kafka Streams API,这些 API 明确需要 Serde
对象。这些仍然是应用程序的责任,必须由开发人员相应地处理。