序列化、反序列化和消息转换
概述
Apache Kafka 提供了用于序列化和反序列化记录值及其键的高级 API。它通过 org.apache.kafka.common.serialization.Serializer<T> 和 org.apache.kafka.common.serialization.Deserializer<T> 抽象以及一些内置实现来提供。同时,我们可以通过使用 Producer 或 Consumer 配置属性来指定序列化器和反序列化器类。以下示例展示了如何实现:
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
...
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
对于更复杂或特殊的情况,KafkaConsumer(因此也包括 KafkaProducer)提供了重载的构造函数,分别接受用于 keys 和 values 的 Serializer 和 Deserializer 实例。
当您使用此 API 时,DefaultKafkaProducerFactory 和 DefaultKafkaConsumerFactory 也提供了属性(通过构造函数或 setter 方法)来将自定义 Serializer 和 Deserializer 实例注入到目标 Producer 或 Consumer 中。此外,您可以通过构造函数传入 Supplier<Serializer> 或 Supplier<Deserializer> 实例 - 这些 Supplier 在创建每个 Producer 或 Consumer 时都会被调用。
字符串序列化
自 2.5 版本以来,Spring for Apache Kafka 提供了 ToStringSerializer 和 ParseStringDeserializer 类,它们使用实体的字符串表示形式。它们依赖于 toString 方法以及一些 Function<String> 或 BiFunction<String, Headers> 来解析字符串并填充实例的属性。通常,这会调用类上的某个静态方法,例如 parse。
ToStringSerializer<Thing> thingSerializer = new ToStringSerializer<>();
//...
ParseStringDeserializer<Thing> deserializer = new ParseStringDeserializer<>(Thing::parse);
默认情况下,ToStringSerializer 配置为在记录 Headers 中传递有关序列化实体的类型信息。您可以通过将 addTypeInfo 属性设置为 false 来禁用此功能。此信息可由接收端的 ParseStringDeserializer 使用。
-
ToStringSerializer.ADD_TYPE_INFO_HEADERS(默认true):您可以将其设置为false以在ToStringSerializer上禁用此功能(设置addTypeInfo属性)。
ParseStringDeserializer<Object> deserializer = new ParseStringDeserializer<>((str, headers) -> {
byte[] header = headers.lastHeader(ToStringSerializer.VALUE_TYPE).value();
String entityType = new String(header);
if (entityType.contains("Thing")) {
return Thing.parse(str);
}
else {
// ...parsing logic
}
});
您可以配置用于将 String 转换为 byte[] 或从 byte[] 转换的 Charset,默认值为 UTF-8。
您可以使用 ConsumerConfig 属性配置反序列化器,并指定解析器方法的名称。
-
ParseStringDeserializer.KEY_PARSER -
ParseStringDeserializer.VALUE_PARSER
这些属性必须包含类的完全限定名,后跟方法名,并用句点 . 分隔。该方法必须是静态的,并且签名必须是 (String, Headers) 或 (String)。
还提供了 ToFromStringSerde,用于 Kafka Streams。
JSON
Spring for Apache Kafka 还提供了基于 Jackson JSON 对象映射器的 JacksonJsonSerializer 和 JacksonJsonDeserializer 实现。JacksonJsonSerializer 允许将任何 Java 对象写入为 JSON byte[]。JacksonJsonDeserializer 需要一个额外的 Class<?> targetType 参数,以允许将消费的 byte[] 反序列化为正确的目标对象。以下示例展示了如何创建 JacksonJsonDeserializer:
JacksonJsonDeserializer<Thing> thingDeserializer = new JacksonJsonDeserializer<>(Thing.class);
您可以使用 ObjectMapper 自定义 JacksonJsonSerializer 和 JacksonJsonDeserializer。您还可以扩展它们,在 configure(Map<String, ?> configs, boolean isKey) 方法中实现一些特定的配置逻辑。
从 2.3 版本开始,所有支持 JSON 的组件默认使用 JacksonUtils.enhancedObjectMapper() 实例进行配置,该实例禁用了 MapperFeature.DEFAULT_VIEW_INCLUSION 和 DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES 功能。此外,该实例还提供了用于自定义数据类型(如 Java time 和 Kotlin 支持)的知名模块。有关更多信息,请参阅 JacksonUtils.enhancedObjectMapper() 的 JavaDocs。此方法还注册了一个 org.springframework.kafka.support.JacksonMimeTypeModule,用于将 org.springframework.util.MimeType 对象序列化为纯字符串,以便在网络上实现跨平台兼容性。JacksonMimeTypeModule 可以作为 bean 注册到应用程序上下文中,它将被自动配置到 Spring Boot ObjectMapper 实例中。
同样从 2.3 版本开始,JsonDeserializer 提供了基于 TypeReference 的构造函数,以更好地处理目标泛型容器类型。
从 2.1 版本开始,您可以在记录 Headers 中传递类型信息,从而支持处理多种类型。此外,您可以使用以下 Kafka 属性配置序列化器和反序列化器。如果您已为 KafkaConsumer 和 KafkaProducer 分别提供了 Serializer 和 Deserializer 实例,则它们不起作用。
配置属性
-
JacksonJsonSerializer.ADD_TYPE_INFO_HEADERS(默认true):您可以将其设置为false以在JacksonJsonSerializer上禁用此功能(设置addTypeInfo属性)。 -
JacksonJsonSerializer.TYPE_MAPPINGS(默认empty):参见映射类型。 -
JacksonJsonDeserializer.USE_TYPE_INFO_HEADERS(默认true):您可以将其设置为false以忽略序列化器设置的头。 -
JacksonJsonDeserializer.REMOVE_TYPE_INFO_HEADERS(默认true):您可以将其设置为false以保留序列化器设置的头。 -
JacksonJsonDeserializer.KEY_DEFAULT_TYPE:如果不存在头信息,则键反序列化的回退类型。 -
JacksonJsonDeserializer.VALUE_DEFAULT_TYPE:如果不存在头信息,则值反序列化的回退类型。 -
JacksonJsonDeserializer.TRUSTED_PACKAGES(默认java.util,java.lang):允许反序列化的包模式的逗号分隔列表。*表示反序列化所有。 -
JacksonJsonDeserializer.TYPE_MAPPINGS(默认empty):参见映射类型。 -
JacksonJsonDeserializer.KEY_TYPE_METHOD(默认empty):参见使用方法确定类型。 -
JacksonJsonDeserializer.VALUE_TYPE_METHOD(默认empty):参见使用方法确定类型。
从 2.2 版本开始,类型信息头(如果由序列化器添加)会被反序列化器移除。您可以通过将 removeTypeHeaders 属性设置为 false,直接在反序列化器上或通过前面描述的配置属性,恢复到以前的行为。
从 2.8 版本开始,如果您按照 编程构造 中所示以编程方式构造序列化器或反序列化器,工厂将应用上述属性,只要您没有显式设置任何属性(使用 set*() 方法或使用流畅 API)。以前,以编程方式创建时,配置属性从不应用;如果您直接在对象上显式设置属性,则仍然如此。 |
映射类型
从 2.2 版本开始,当使用 JSON 时,您现在可以使用前面列表中的属性提供类型映射。以前,您必须在序列化器和反序列化器中自定义类型映射器。映射由逗号分隔的 token:className 对列表组成。出站时,有效负载的类名映射到相应的令牌。入站时,类型头中的令牌映射到相应的类名。
以下示例创建了一组映射
senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
senderProps.put(JsonSerializer.TYPE_MAPPINGS, "cat:com.mycat.Cat, hat:com.myhat.Hat");
...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
consumerProps.put(JsonDeserializer.TYPE_MAPPINGS, "cat:com.yourcat.Cat, hat:com.yourhat.Hat");
| 相应的对象必须兼容。 |
如果您使用 Spring Boot,您可以在 application.properties(或 yaml)文件中提供这些属性。以下示例展示了如何实现:
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.type.mapping=cat:com.mycat.Cat,hat:com.myhat.Hat
|
您只能通过属性执行简单的配置。对于更高级的配置(例如在序列化器和反序列化器中使用自定义
还提供了 setter 方法,作为使用这些构造函数的替代方案。 |
当使用 Spring Boot 并如上所示覆盖 ConsumerFactory 和 ProducerFactory 时,bean 方法的返回类型需要使用通配符泛型类型。如果提供了具体的泛型类型,那么 Spring Boot 将忽略这些 bean,并仍然使用默认的 bean。 |
从 2.2 版本开始,您可以通过使用带有布尔型 useHeadersIfPresent 参数(默认为 true)的重载构造函数之一,显式配置反序列化器以使用提供的目标类型并忽略头中的类型信息。以下示例展示了如何实现:
DefaultKafkaConsumerFactory<Integer, Cat1> cf = new DefaultKafkaConsumerFactory<>(props,
new IntegerDeserializer(), new JsonDeserializer<>(Cat1.class, false));
使用方法确定类型
从 2.5 版本开始,您现在可以通过属性配置反序列化器,以调用一个方法来确定目标类型。如果存在,这将覆盖前面讨论的任何其他技术。如果数据由不使用 Spring 序列化器的应用程序发布,并且您需要根据数据或其他头反序列化为不同的类型,这可能很有用。将这些属性设置为方法名 - 完全限定的类名后跟方法名,用句点 . 分隔。该方法必须声明为 public static,具有以下三种签名之一:(String topic, byte[] data, Headers headers)、(byte[] data, Headers headers) 或 (byte[] data),并返回 Jackson JavaType。
-
JsonDeserializer.KEY_TYPE_METHOD:spring.json.key.type.method -
JsonDeserializer.VALUE_TYPE_METHOD:spring.json.value.type.method
您可以使用任意头或检查数据来确定类型。
JavaType thing1Type = TypeFactory.defaultInstance().constructType(Thing1.class);
JavaType thing2Type = TypeFactory.defaultInstance().constructType(Thing2.class);
public static JavaType thingOneOrThingTwo(byte[] data, Headers headers) {
// {"thisIsAFieldInThing1":"value", ...
if (data[21] == '1') {
return thing1Type;
}
else {
return thing2Type;
}
}
对于更复杂的数据检查,请考虑使用 JsonPath 或类似工具,但确定类型的测试越简单,过程效率就越高。
以下是编程创建反序列化器的示例(在构造函数中为消费者工厂提供反序列化器时):
JsonDeserializer<Object> deser = new JsonDeserializer<>()
.trustedPackages("*")
.typeResolver(SomeClass::thing1Thing2JavaTypeForTopic);
...
public static JavaType thing1Thing2JavaTypeForTopic(String topic, byte[] data, Headers headers) {
...
}
编程构造
自 2.3 版本以来,当以编程方式构造序列化器/反序列化器以在生产者/消费者工厂中使用时,您可以使用流畅 API,这简化了配置。
@Bean
public ProducerFactory<MyKeyType, MyValueType> pf() {
Map<String, Object> props = new HashMap<>();
// props.put(..., ...)
// ...
DefaultKafkaProducerFactory<MyKeyType, MyValueType> pf = new DefaultKafkaProducerFactory<>(props,
new JsonSerializer<MyKeyType>()
.forKeys()
.noTypeInfo(),
new JsonSerializer<MyValueType>()
.noTypeInfo());
return pf;
}
@Bean
public ConsumerFactory<MyKeyType, MyValueType> cf() {
Map<String, Object> props = new HashMap<>();
// props.put(..., ...)
// ...
DefaultKafkaConsumerFactory<MyKeyType, MyValueType> cf = new DefaultKafkaConsumerFactory<>(props,
new JsonDeserializer<>(MyKeyType.class)
.forKeys()
.ignoreTypeHeaders(),
new JsonDeserializer<>(MyValueType.class)
.ignoreTypeHeaders());
return cf;
}
为了以编程方式提供类型映射,类似于 使用方法确定类型,请使用 typeFunction 属性。
JsonDeserializer<Object> deser = new JsonDeserializer<>()
.trustedPackages("*")
.typeFunction(MyUtils::thingOneOrThingTwo);
或者,只要您不使用流畅 API 配置属性,或使用 set*() 方法设置它们,工厂将使用配置属性配置序列化器/反序列化器;请参阅 配置属性。
委托序列化器和反序列化器
使用 Headers
版本 2.3 引入了 DelegatingSerializer 和 DelegatingDeserializer,它们允许生产和消费具有不同键和/或值类型的记录。生产者必须将头 DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR 设置为选择器值,该值用于选择用于值的序列化器,并将 DelegatingSerializer.KEY_SERIALIZATION_SELECTOR 设置为键的选择器;如果未找到匹配项,则会抛出 IllegalStateException。
对于传入记录,反序列化器使用相同的头来选择要使用的反序列化器;如果未找到匹配项或头不存在,则返回原始 byte[]。
您可以通过构造函数配置选择器到 Serializer / Deserializer 的映射,或者通过 Kafka 生产者/消费者属性配置,键为 DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG 和 DelegatingSerializer.KEY_SERIALIZATION_SELECTOR_CONFIG。对于序列化器,生产者属性可以是 Map<String, Object>,其中键是选择器,值是 Serializer 实例、序列化器 Class 或类名。该属性也可以是逗号分隔的映射条目字符串,如下所示。
对于反序列化器,消费者属性可以是 Map<String, Object>,其中键是选择器,值是 Deserializer 实例、反序列化器 Class 或类名。该属性也可以是逗号分隔的映射条目字符串,如下所示。
要使用属性进行配置,请使用以下语法:
producerProps.put(DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
"thing1:com.example.MyThing1Serializer, thing2:com.example.MyThing2Serializer")
consumerProps.put(DelegatingDeserializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
"thing1:com.example.MyThing1Deserializer, thing2:com.example.MyThing2Deserializer")
生产者随后会将 DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR 头设置为 thing1 或 thing2。
此技术支持将不同类型发送到相同主题(或不同主题)。
从 2.5.1 版本开始,如果类型(键或值)是 Serdes 支持的标准类型之一(Long、Integer 等),则不需要设置选择器头。相反,序列化器会将头设置为该类型的类名。不需要为这些类型配置序列化器或反序列化器,它们将动态创建(一次)。 |
有关将不同类型发送到不同主题的另一种技术,请参阅 使用 RoutingKafkaTemplate。
按类型
版本 2.8 引入了 DelegatingByTypeSerializer。
@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
return new DefaultKafkaProducerFactory<>(config,
null, new DelegatingByTypeSerializer(Map.of(
byte[].class, new ByteArraySerializer(),
Bytes.class, new BytesSerializer(),
String.class, new StringSerializer())));
}
从 2.8.3 版本开始,您可以配置序列化器以检查映射键是否可从目标对象赋值,这在委托序列化器可以序列化子类时非常有用。在这种情况下,如果存在歧义匹配,则应提供一个有序的 Map,例如 LinkedHashMap。
按主题
从 2.8 版本开始,DelegatingByTopicSerializer 和 DelegatingByTopicDeserializer 允许根据主题名称选择序列化器/反序列化器。使用正则表达式 Pattern 来查找要使用的实例。映射可以使用构造函数或通过属性(逗号分隔的 pattern:serializer 列表)进行配置。
producerConfigs.put(DelegatingByTopicSerializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
"topic[0-4]:" + ByteArraySerializer.class.getName()
+ ", topic[5-9]:" + StringSerializer.class.getName());
...
consumerConfigs.put(DelegatingByTopicDeserializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
"topic[0-4]:" + ByteArrayDeserializer.class.getName()
+ ", topic[5-9]:" + StringDeserializer.class.getName());
当用于键时,请使用 KEY_SERIALIZATION_TOPIC_CONFIG。
@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
return new DefaultKafkaProducerFactory<>(config,
new IntegerSerializer(),
new DelegatingByTopicSerializer(Map.of(
Pattern.compile("topic[0-4]"), new ByteArraySerializer(),
Pattern.compile("topic[5-9]"), new StringSerializer())),
new JsonSerializer<Object>()); // default
}
您可以使用 DelegatingByTopicSerialization.KEY_SERIALIZATION_TOPIC_DEFAULT 和 DelegatingByTopicSerialization.VALUE_SERIALIZATION_TOPIC_DEFAULT 指定在没有模式匹配时使用的默认序列化器/反序列化器。
额外的属性 DelegatingByTopicSerialization.CASE_SENSITIVE(默认 true),当设置为 false 时,主题查找将不区分大小写。
重试反序列化器
RetryingDeserializer 使用委托 Deserializer 和 RetryTemplate 在委托反序列化期间可能出现瞬时错误(例如网络问题)时重试反序列化。
ConsumerFactory cf = new DefaultKafkaConsumerFactory(myConsumerConfigs,
new RetryingDeserializer(myUnreliableKeyDeserializer, retryTemplate),
new RetryingDeserializer(myUnreliableValueDeserializer, retryTemplate));
可以在 RetryingDeserializer 上设置恢复回调,以便在所有重试都用尽时返回一个备用对象。
有关使用重试策略、退避等配置 RetryTemplate,请参阅 Spring Framework 项目。
Spring Messaging 消息转换
尽管 Serializer 和 Deserializer API 从低级 Kafka Consumer 和 Producer 的角度来看非常简单和灵活,但在使用 @KafkaListener 或 Spring Integration 的 Apache Kafka 支持时,您可能需要在 Spring Messaging 级别上获得更大的灵活性。为了让您轻松地在 org.springframework.messaging.Message 之间进行转换,Spring for Apache Kafka 提供了 MessageConverter 抽象,并提供了 MessagingMessageConverter 实现及其 JacksonJsonMessageConverter(及其子类)定制。您可以将 MessageConverter 直接注入到 KafkaTemplate 实例中,并通过使用 AbstractKafkaListenerContainerFactory bean 定义来设置 @KafkaListener.containerFactory() 属性。以下示例展示了如何实现:
@Bean
public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setRecordMessageConverter(new JsonMessageConverter());
return factory;
}
...
@KafkaListener(topics = "jsonData",
containerFactory = "kafkaJsonListenerContainerFactory")
public void jsonListener(Cat cat) {
...
}
当使用 Spring Boot 时,只需将转换器定义为 @Bean,Spring Boot 自动配置就会将其连接到自动配置的模板和容器工厂中。
当您使用 @KafkaListener 时,参数类型会提供给消息转换器以协助转换。
|
此类型推断只能在方法级别声明 |
|
在消费者端,您可以配置一个 在生产者端,当您使用 Spring Integration 或
同样,使用 为方便起见,从 2.3 版本开始,框架还提供了 |
从 2.7.1 版本开始,消息有效负载转换可以委托给 spring-messaging SmartMessageConverter;这使得转换,例如,可以基于 MessageHeaders.CONTENT_TYPE 头。
KafkaMessageConverter.fromMessage() 方法用于将出站消息转换为 ProducerRecord,其中消息有效负载位于 ProducerRecord.value() 属性中。KafkaMessageConverter.toMessage() 方法用于将入站消息从 ConsumerRecord 转换为,其中有效负载是 ConsumerRecord.value() 属性。SmartMessageConverter.toMessage() 方法用于从传递给 fromMessage() 的 Message 创建新的出站 Message<?>(通常通过 KafkaTemplate.send(Message<?> msg))。类似地,在 KafkaMessageConverter.toMessage() 方法中,在转换器从 ConsumerRecord 创建新的 Message<?> 后,将调用 SmartMessageConverter.fromMessage() 方法,然后使用新转换的有效负载创建最终的入站消息。在这两种情况下,如果 SmartMessageConverter 返回 null,则使用原始消息。 |
当 KafkaTemplate 和侦听器容器工厂中使用默认转换器时,您可以通过在模板上调用 setMessagingConverter() 并通过 @KafkaListener 方法上的 contentTypeConverter 属性来配置 SmartMessageConverter。
示例
template.setMessagingConverter(mySmartConverter);
@KafkaListener(id = "withSmartConverter", topics = "someTopic",
contentTypeConverter = "mySmartConverter")
public void smart(Thing thing) {
...
}
使用 Spring Data Projection 接口
从 2.1.1 版本开始,您可以将 JSON 转换为 Spring Data Projection 接口而不是具体类型。这允许对数据进行非常选择性且低耦合的绑定,包括从 JSON 文档中的多个位置查找值。例如,以下接口可以定义为消息有效负载类型:
interface SomeSample {
@JsonPath({ "$.username", "$.user.name" })
String getUsername();
}
@KafkaListener(id="projection.listener", topics = "projection")
public void projection(SomeSample in) {
String username = in.getUsername();
...
}
默认情况下,访问器方法将用于查找接收到的 JSON 文档中的属性名称作为字段。@JsonPath 表达式允许自定义值查找,甚至可以定义多个 JSON Path 表达式,以从多个位置查找值,直到某个表达式返回实际值。
要启用此功能,请使用配置了适当委托转换器(用于出站转换和非投影接口的转换)的 JacksonProjectingMessageConverter。您还必须将 spring-data:spring-data-commons 和 com.jayway.jsonpath:json-path 添加到类路径。
当用作 @KafkaListener 方法的参数时,接口类型会自动像往常一样传递给转换器。
使用 ErrorHandlingDeserializer
当反序列化器无法反序列化消息时,Spring 无法处理该问题,因为它发生在 poll() 返回之前。为了解决这个问题,引入了 ErrorHandlingDeserializer。此反序列化器委托给一个真正的反序列化器(键或值)。如果委托反序列化器无法反序列化记录内容,ErrorHandlingDeserializer 将返回一个 null 值,并在一个头中返回一个 DeserializationException,其中包含原因和原始字节。当您使用记录级 MessageListener 时,如果 ConsumerRecord 包含键或值的 DeserializationException 头,则容器的 ErrorHandler 将使用失败的 ConsumerRecord 被调用。该记录不会传递给侦听器。
或者,您可以通过提供 failedDeserializationFunction(一个 Function<FailedDeserializationInfo, T>)来配置 ErrorHandlingDeserializer 以创建自定义值。此函数被调用以创建 T 实例,该实例以通常的方式传递给侦听器。一个类型为 FailedDeserializationInfo 的对象,其中包含所有上下文信息,将提供给该函数。您可以在头中找到 DeserializationException(作为序列化的 Java 对象)。有关 ErrorHandlingDeserializer 的更多信息,请参阅 Javadoc。
您可以使用接受键和值 Deserializer 对象的 DefaultKafkaConsumerFactory 构造函数,并连接您已配置了适当委托的 ErrorHandlingDeserializer 实例。或者,您可以使用消费者配置属性(由 ErrorHandlingDeserializer 使用)来实例化委托。属性名称是 ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS 和 ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS。属性值可以是类或类名。以下示例展示了如何设置这些属性:
... // other props
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(JsonDeserializer.KEY_DEFAULT_TYPE, "com.example.MyKey")
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.MyValue")
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example")
return new DefaultKafkaConsumerFactory<>(props);
以下示例使用 failedDeserializationFunction。
public class BadThing extends Thing {
private final FailedDeserializationInfo failedDeserializationInfo;
public BadThing(FailedDeserializationInfo failedDeserializationInfo) {
this.failedDeserializationInfo = failedDeserializationInfo;
}
public FailedDeserializationInfo getFailedDeserializationInfo() {
return this.failedDeserializationInfo;
}
}
public class FailedThingProvider implements Function<FailedDeserializationInfo, Thing> {
@Override
public Thing apply(FailedDeserializationInfo info) {
return new BadThing(info);
}
}
前面的示例使用以下配置:
...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_FUNCTION, FailedThingProvider.class);
...
如果消费者配置了 ErrorHandlingDeserializer,则重要的是要配置 KafkaTemplate 及其生产者,使其具有可以处理普通对象以及原始 byte[] 值(由反序列化异常引起)的序列化器。模板的泛型值类型应为 Object。一种技术是使用 DelegatingByTypeSerializer;示例如下: |
@Bean
public ProducerFactory<String, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
MyNormalObject.class, new JsonSerializer<Object>())));
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
当将 ErrorHandlingDeserializer 与批处理侦听器一起使用时,您必须检查消息头中的反序列化异常。与 DefaultBatchErrorHandler 一起使用时,您可以使用该头来确定异常失败的记录,并通过 BatchListenerFailedException 将其通知错误处理器。
@KafkaListener(id = "test", topics = "test")
void listen(List<Thing> in, @Header(KafkaHeaders.BATCH_CONVERTED_HEADERS) List<Map<String, Object>> headers) {
for (int i = 0; i < in.size(); i++) {
Thing thing = in.get(i);
if (thing == null
&& headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER) != null) {
try {
DeserializationException deserEx = SerializationUtils.byteArrayToDeserializationException(this.logger,
headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER));
if (deserEx != null) {
logger.error(deserEx, "Record at index " + i + " could not be deserialized");
}
}
catch (Exception ex) {
logger.error(ex, "Record at index " + i + " could not be deserialized");
}
throw new BatchListenerFailedException("Deserialization", deserEx, i);
}
process(thing);
}
}
SerializationUtils.byteArrayToDeserializationException() 可用于将头转换为 DeserializationException。
当消费 List<ConsumerRecord<?, ?> 时,改用 SerializationUtils.getExceptionFromHeader()。
@KafkaListener(id = "kgh2036", topics = "kgh2036")
void listen(List<ConsumerRecord<String, Thing>> in) {
for (int i = 0; i < in.size(); i++) {
ConsumerRecord<String, Thing> rec = in.get(i);
if (rec.value() == null) {
DeserializationException deserEx = SerializationUtils.getExceptionFromHeader(rec,
SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, this.logger);
if (deserEx != null) {
logger.error(deserEx, "Record at offset " + rec.offset() + " could not be deserialized");
throw new BatchListenerFailedException("Deserialization", deserEx, i);
}
}
process(rec.value());
}
}
如果您还在使用 DeadLetterPublishingRecoverer,为 DeserializationException 发布的记录将具有 byte[] 类型的 record.value();这不应被序列化。考虑使用配置为对 byte[] 使用 ByteArraySerializer 而对所有其他类型使用普通序列化器(Json、Avro 等)的 DelegatingByTypeSerializer。 |
从 3.1 版本开始,您可以向 ErrorHandlingDeserializer 添加一个 Validator。如果委托 Deserializer 成功反序列化对象,但该对象未能通过验证,则会抛出类似于反序列化异常的异常。这允许将原始原始数据传递给错误处理器。当您自己创建反序列化器时,只需调用 setValidator;如果您使用属性配置序列化器,请将消费者配置属性 ErrorHandlingDeserializer.VALIDATOR_CLASS 设置为 Validator 的类或完全限定类名。当使用 Spring Boot 时,此属性名为 spring.kafka.consumer.properties.spring.deserializer.validator.class。
批处理侦听器与有效负载转换
当您使用批处理侦听器容器工厂时,还可以在 BatchMessagingMessageConverter 中使用 JacksonJsonMessageConverter 来转换批处理消息。有关更多信息,请参阅 序列化、反序列化和消息转换 和 Spring Messaging 消息转换。
默认情况下,转换的类型从侦听器参数推断。如果您使用 DefaultJackson2TypeMapper 配置 JacksonJsonMessageConverter,并将其 TypePrecedence 设置为 TYPE_ID(而不是默认的 INFERRED),则转换器将使用头中的类型信息(如果存在)。这允许,例如,侦听器方法使用接口而不是具体类声明。此外,类型转换器支持映射,因此反序列化可以转换为与源不同的类型(只要数据兼容)。当您使用 类级别 @KafkaListener 实例 时,这也很实用,因为有效负载必须已经转换才能确定要调用的方法。以下示例创建了使用此方法的 bean:
@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.setBatchMessageConverter(new BatchMessagingMessageConverter(converter()));
return factory;
}
@Bean
public JsonMessageConverter converter() {
return new JsonMessageConverter();
}
请注意,为了使其工作,转换目标的 方法签名 必须是一个带有单个泛型参数类型的容器对象,例如以下内容:
@KafkaListener(topics = "blc1")
public void listen(List<Foo> foos, @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
...
}
请注意,您仍然可以访问批处理头。
如果批处理转换器具有支持它的记录转换器,您还可以接收消息列表,其中有效负载根据泛型类型进行转换。以下示例展示了如何实现:
@KafkaListener(topics = "blc3", groupId = "blc3")
public void listen(List<Message<Foo>> fooMessages) {
...
}
如果批处理中的记录无法转换,其有效负载将在目标 payloads 列表中设置为 null。此记录的转换异常将作为警告记录,并作为 List<ConversionException> 的一项存储在 KafkaHeaders.CONVERSION_FAILURES 头中。目标 @KafkaListener 方法可以执行 Java Stream API 以从有效负载列表中过滤掉这些 null 值,或对转换异常头进行处理。
@KafkaListener(id = "foo", topics = "foo", autoStartup = "false")
public void listen(List<Foo> list,
@Header(KafkaHeaders.CONVERSION_FAILURES) List<ConversionException> conversionFailures) {
for (int i = 0; i < list.size(); i++) {
if (conversionFailures.get(i) != null) {
throw new BatchListenerFailedException("Conversion Failed", conversionFailures.get(i), i);
}
}
}
ConversionService 定制
从 2.1.1 版本开始,默认的 org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory 用于解析侦听器方法调用的参数的 org.springframework.core.convert.ConversionService 将提供实现以下任何接口的所有 bean:
-
org.springframework.core.convert.converter.Converter -
org.springframework.core.convert.converter.GenericConverter -
org.springframework.format.Formatter
这允许您进一步自定义侦听器反序列化,而无需更改 ConsumerFactory 和 KafkaListenerContainerFactory 的默认配置。
通过 KafkaListenerConfigurer bean 在 KafkaListenerEndpointRegistrar 上设置自定义 MessageHandlerMethodFactory 将禁用此功能。 |
向 @KafkaListener 添加自定义 HandlerMethodArgumentResolver
从 2.4.2 版本开始,您可以添加自己的 HandlerMethodArgumentResolver 并解析自定义方法参数。您只需实现 KafkaListenerConfigurer 并使用 KafkaListenerEndpointRegistrar 类的 setCustomMethodArgumentResolvers() 方法。
@Configuration
class CustomKafkaConfig implements KafkaListenerConfigurer {
@Override
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
registrar.setCustomMethodArgumentResolvers(
new HandlerMethodArgumentResolver() {
@Override
public boolean supportsParameter(MethodParameter parameter) {
return CustomMethodArgument.class.isAssignableFrom(parameter.getParameterType());
}
@Override
public Object resolveArgument(MethodParameter parameter, Message<?> message) {
return new CustomMethodArgument(
message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC, String.class)
);
}
}
);
}
}
您还可以通过向 KafkaListenerEndpointRegistrar bean 添加自定义 MessageHandlerMethodFactory 来完全替换框架的参数解析。如果您这样做,并且您的应用程序需要处理具有 null value() 的墓碑记录(例如,来自压缩主题),您应该向工厂添加一个 KafkaNullAwarePayloadArgumentResolver;它必须是最后一个解析器,因为它支持所有类型并且可以匹配没有 @Payload 注解的参数。如果您正在使用 DefaultMessageHandlerMethodFactory,请将此解析器设置为最后一个自定义解析器;工厂将确保此解析器将在标准 PayloadMethodArgumentResolver 之前使用,后者不了解 KafkaNull 有效负载。
另请参阅 空有效负载和墓碑记录的日志压缩。