序列化、反序列化和消息转换
概述
Apache Kafka 提供了一个高级 API 用于序列化和反序列化记录值以及它们的键。它与 org.apache.kafka.common.serialization.Serializer<T>
和 org.apache.kafka.common.serialization.Deserializer<T>
抽象一起提供,并具有一些内置的实现。同时,我们可以通过使用 Producer
或 Consumer
配置属性来指定序列化器和反序列化器类。以下示例展示了如何执行此操作
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
...
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
对于更复杂或特定的情况,KafkaConsumer
(以及 KafkaProducer
)提供了重载的构造函数来分别接受 keys
和 values
的 Serializer
和 Deserializer
实例。
当您使用此 API 时,DefaultKafkaProducerFactory
和 DefaultKafkaConsumerFactory
也提供了属性(通过构造函数或 setter 方法)将自定义的 Serializer
和 Deserializer
实例注入到目标 Producer
或 Consumer
中。此外,您可以通过构造函数传入 Supplier<Serializer>
或 Supplier<Deserializer>
实例 - 这些 Supplier
在创建每个 Producer
或 Consumer
时被调用。
字符串序列化
从 2.5 版本开始,Spring for Apache Kafka 提供了 ToStringSerializer
和 ParseStringDeserializer
类,它们使用实体的字符串表示形式。它们依赖于方法 toString
和一些 Function<String>
或 BiFunction<String, Headers>
来解析字符串并填充实例的属性。通常,这会调用类上的一些静态方法,例如 parse
ToStringSerializer<Thing> thingSerializer = new ToStringSerializer<>();
//...
ParseStringDeserializer<Thing> deserializer = new ParseStringDeserializer<>(Thing::parse);
默认情况下,ToStringSerializer
配置为在记录 Headers
中传达有关已序列化实体的类型信息。您可以通过将 addTypeInfo
属性设置为 false
来禁用此功能。接收方可以使用 ParseStringDeserializer
使用此信息。
-
ToStringSerializer.ADD_TYPE_INFO_HEADERS
(默认值为true
):您可以将其设置为false
以禁用ToStringSerializer
上的此功能(设置addTypeInfo
属性)。
ParseStringDeserializer<Object> deserializer = new ParseStringDeserializer<>((str, headers) -> {
byte[] header = headers.lastHeader(ToStringSerializer.VALUE_TYPE).value();
String entityType = new String(header);
if (entityType.contains("Thing")) {
return Thing.parse(str);
}
else {
// ...parsing logic
}
});
您可以使用默认值为 UTF-8
的配置用于将 String
转换为/从 byte[]
的 Charset
。
您可以使用 ConsumerConfig
属性配置反序列化器以及解析器方法的名称
-
ParseStringDeserializer.KEY_PARSER
-
ParseStringDeserializer.VALUE_PARSER
这些属性必须包含类的完全限定名称,后跟方法名称,并用句点 .
分隔。该方法必须是静态的,并且签名为 (String, Headers)
或 (String)
。
还提供了 ToFromStringSerde
,用于与 Kafka Streams 一起使用。
JSON
Spring for Apache Kafka 还提供了基于 Jackson JSON 对象映射器的 JsonSerializer
和 JsonDeserializer
实现。JsonSerializer
允许将任何 Java 对象作为 JSON byte[]
写入。JsonDeserializer
需要一个额外的 Class<?> targetType
参数,以允许将消耗的 byte[]
反序列化为正确的目标对象。以下示例展示了如何创建一个 JsonDeserializer
JsonDeserializer<Thing> thingDeserializer = new JsonDeserializer<>(Thing.class);
您可以使用ObjectMapper
自定义JsonSerializer
和JsonDeserializer
。您还可以扩展它们,并在configure(Map<String, ?> configs, boolean isKey)
方法中实现一些特定的配置逻辑。
从2.3版本开始,所有JSON感知组件默认都使用JacksonUtils.enhancedObjectMapper()
实例进行配置,该实例禁用了MapperFeature.DEFAULT_VIEW_INCLUSION
和DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES
特性。此外,此实例还提供了一些用于自定义数据类型的知名模块,例如Java时间和Kotlin支持。有关更多信息,请参阅JacksonUtils.enhancedObjectMapper()
JavaDoc。此方法还注册了一个org.springframework.kafka.support.JacksonMimeTypeModule
,用于将org.springframework.util.MimeType
对象序列化为普通字符串,以便跨平台网络兼容性。JacksonMimeTypeModule
可以在应用程序上下文中注册为一个bean,它将自动配置到Spring Boot ObjectMapper
实例中。
同样从2.3版本开始,JsonDeserializer
提供了基于TypeReference
的构造函数,以便更好地处理目标泛型容器类型。
从2.1版本开始,您可以在记录Headers
中传达类型信息,从而允许处理多种类型。此外,您可以使用以下Kafka属性配置序列化器和反序列化器。如果您分别为KafkaConsumer
和KafkaProducer
提供了Serializer
和Deserializer
实例,则这些属性无效。
配置属性
-
JsonSerializer.ADD_TYPE_INFO_HEADERS
(默认值为true
):您可以将其设置为false
以在JsonSerializer
上禁用此功能(设置addTypeInfo
属性)。 -
JsonSerializer.TYPE_MAPPINGS
(默认值为empty
):请参阅映射类型。 -
JsonDeserializer.USE_TYPE_INFO_HEADERS
(默认值为true
):您可以将其设置为false
以忽略序列化器设置的标头。 -
JsonDeserializer.REMOVE_TYPE_INFO_HEADERS
(默认值为true
):您可以将其设置为false
以保留序列化器设置的标头。 -
JsonDeserializer.KEY_DEFAULT_TYPE
:如果不存在标头信息,则用于反序列化键的回退类型。 -
JsonDeserializer.VALUE_DEFAULT_TYPE
:如果不存在标头信息,则用于反序列化值的回退类型。 -
JsonDeserializer.TRUSTED_PACKAGES
(默认值为java.util
、java.lang
):允许反序列化的包模式的逗号分隔列表。*
表示反序列化所有。 -
JsonDeserializer.TYPE_MAPPINGS
(默认值为empty
):请参阅映射类型。 -
JsonDeserializer.KEY_TYPE_METHOD
(默认值为empty
):请参阅使用方法确定类型。 -
JsonDeserializer.VALUE_TYPE_METHOD
(默认值为empty
):请参阅使用方法确定类型。
从2.2版本开始,反序列化器会移除类型信息标头(如果由序列化器添加)。您可以通过将removeTypeHeaders
属性设置为false
来恢复到以前的行为,这可以通过直接在反序列化器上设置或使用前面描述的配置属性来实现。
从2.8版本开始,如果您按程序方式构造序列化器或反序列化器(如程序化构造中所示),工厂将应用上述属性,只要您没有显式设置任何属性(使用set*() 方法或使用流畅的API)。以前,在以编程方式创建时,配置属性永远不会应用;如果您直接在对象上显式设置属性,情况仍然如此。 |
映射类型
从2.2版本开始,在使用JSON时,您现在可以通过使用前面列表中的属性来提供类型映射。以前,您必须在序列化器和反序列化器中自定义类型映射器。映射由逗号分隔的token:className
对列表组成。在出站时,有效负载的类名将映射到相应的token。在入站时,类型标头中的token将映射到相应的类名。
以下示例创建一组映射
senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
senderProps.put(JsonSerializer.TYPE_MAPPINGS, "cat:com.mycat.Cat, hat:com.myhat.Hat");
...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
consumerProps.put(JsonDeserializer.TYPE_MAPPINGS, "cat:com.yourcat.Cat, hat:com.yourhat.Hat");
相应的对象必须兼容。 |
如果您使用Spring Boot,则可以在application.properties
(或yaml)文件中提供这些属性。以下示例演示了如何执行此操作
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.type.mapping=cat:com.mycat.Cat,hat:com.myhat.Hat
您只能使用属性执行简单的配置。对于更高级的配置(例如在序列化器和反序列化器中使用自定义
还提供了setter,作为使用这些构造函数的替代方法。 |
在使用Spring Boot并覆盖ConsumerFactory 和ProducerFactory (如上所示)时,需要将通配符泛型类型与bean方法返回类型一起使用。如果改为提供具体的泛型类型,则Spring Boot将忽略这些bean,并继续使用默认bean。 |
从2.2版本开始,您可以通过使用具有布尔值useHeadersIfPresent
参数(默认为true
)的重载构造函数之一,显式配置反序列化器以使用提供的目标类型并忽略标头中的类型信息。以下示例演示了如何执行此操作
DefaultKafkaConsumerFactory<Integer, Cat1> cf = new DefaultKafkaConsumerFactory<>(props,
new IntegerDeserializer(), new JsonDeserializer<>(Cat1.class, false));
使用方法确定类型
从2.5版本开始,您现在可以通过属性配置反序列化器,以调用一个方法来确定目标类型。如果存在,这将覆盖上面讨论的任何其他技术。如果数据由不使用Spring序列化器的应用程序发布,并且您需要根据数据或其他标头反序列化到不同的类型,这将非常有用。将这些属性设置为方法名称 - 由点.
分隔的完全限定类名和方法名。该方法必须声明为public static
,具有以下三个签名之一(String topic, byte[] data, Headers headers)
、(byte[] data, Headers headers)
或(byte[] data)
,并返回Jackson JavaType
。
-
JsonDeserializer.KEY_TYPE_METHOD
:spring.json.key.type.method
-
JsonDeserializer.VALUE_TYPE_METHOD
:spring.json.value.type.method
您可以使用任意标头或检查数据以确定类型。
JavaType thing1Type = TypeFactory.defaultInstance().constructType(Thing1.class);
JavaType thing2Type = TypeFactory.defaultInstance().constructType(Thing2.class);
public static JavaType thingOneOrThingTwo(byte[] data, Headers headers) {
// {"thisIsAFieldInThing1":"value", ...
if (data[21] == '1') {
return thing1Type;
}
else {
return thing2Type;
}
}
对于更复杂的数据检查,请考虑使用JsonPath
或类似方法,但确定类型的测试越简单,过程效率越高。
以下是在以编程方式创建反序列化器时的示例(在使用构造函数向消费者工厂提供反序列化器时)
JsonDeserializer<Object> deser = new JsonDeserializer<>()
.trustedPackages("*")
.typeResolver(SomeClass::thing1Thing2JavaTypeForTopic);
...
public static JavaType thing1Thing2JavaTypeForTopic(String topic, byte[] data, Headers headers) {
...
}
程序化构造
在以编程方式构造用于生产者/消费者工厂的序列化器/反序列化器时,从2.3版本开始,您可以使用流畅的API,这简化了配置。
@Bean
public ProducerFactory<MyKeyType, MyValueType> pf() {
Map<String, Object> props = new HashMap<>();
// props.put(..., ...)
// ...
DefaultKafkaProducerFactory<MyKeyType, MyValueType> pf = new DefaultKafkaProducerFactory<>(props,
new JsonSerializer<MyKeyType>()
.forKeys()
.noTypeInfo(),
new JsonSerializer<MyValueType>()
.noTypeInfo());
return pf;
}
@Bean
public ConsumerFactory<MyKeyType, MyValueType> cf() {
Map<String, Object> props = new HashMap<>();
// props.put(..., ...)
// ...
DefaultKafkaConsumerFactory<MyKeyType, MyValueType> cf = new DefaultKafkaConsumerFactory<>(props,
new JsonDeserializer<>(MyKeyType.class)
.forKeys()
.ignoreTypeHeaders(),
new JsonDeserializer<>(MyValueType.class)
.ignoreTypeHeaders());
return cf;
}
要以编程方式提供类型映射,类似于使用方法确定类型,请使用typeFunction
属性。
JsonDeserializer<Object> deser = new JsonDeserializer<>()
.trustedPackages("*")
.typeFunction(MyUtils::thingOneOrThingTwo);
或者,只要您不使用流畅的API配置属性或使用set*()
方法设置属性,工厂就会使用配置属性配置序列化器/反序列化器;请参阅配置属性。
委托序列化器和反序列化器
使用标头
2.3版本引入了DelegatingSerializer
和DelegatingDeserializer
,它们允许使用不同的键和/或值类型生成和使用记录。生产者必须将标头DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR
设置为选择器值,该值用于选择用于值的序列化器,以及用于键的DelegatingSerializer.KEY_SERIALIZATION_SELECTOR
;如果找不到匹配项,则会抛出IllegalStateException
。
对于传入的记录,反序列化器使用相同的标头来选择要使用的反序列化器;如果找不到匹配项或标头不存在,则返回原始byte[]
。
您可以通过构造函数配置选择器到Serializer
/Deserializer
的映射,也可以通过Kafka生产者/消费者属性使用键DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG
和DelegatingSerializer.KEY_SERIALIZATION_SELECTOR_CONFIG
进行配置。对于序列化器,生产者属性可以是Map<String, Object>
,其中键是选择器,值是Serializer
实例、序列化器Class
或类名。该属性也可以是逗号分隔的映射条目的字符串,如下所示。
对于反序列化器,消费者属性可以是Map<String, Object>
,其中键是选择器,值是Deserializer
实例、反序列化器Class
或类名。该属性也可以是逗号分隔的映射条目的字符串,如下所示。
要使用属性进行配置,请使用以下语法
producerProps.put(DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
"thing1:com.example.MyThing1Serializer, thing2:com.example.MyThing2Serializer")
consumerProps.put(DelegatingDeserializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
"thing1:com.example.MyThing1Deserializer, thing2:com.example.MyThing2Deserializer")
然后,生产者会将DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR
标头设置为thing1
或thing2
。
此技术支持将不同类型发送到同一主题(或不同主题)。
从2.5.1版本开始,如果类型(键或值)是Serdes 支持的标准类型之一(Long 、Integer 等),则无需设置选择器标头。相反,序列化器会将标头设置为类型的类名。无需为这些类型配置序列化器或反序列化器,它们将(一次)动态创建。 |
有关将不同类型发送到不同主题的另一种技术,请参阅使用RoutingKafkaTemplate
。
按类型
2.8版本引入了DelegatingByTypeSerializer
。
@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
return new DefaultKafkaProducerFactory<>(config,
null, new DelegatingByTypeSerializer(Map.of(
byte[].class, new ByteArraySerializer(),
Bytes.class, new BytesSerializer(),
String.class, new StringSerializer())));
}
从2.8.3版本开始,您可以配置序列化器以检查映射键是否可以从目标对象分配,这在委托序列化器可以序列化子类时很有用。在这种情况下,如果存在歧义匹配,则应提供有序的Map
,例如LinkedHashMap
。
按主题
从2.8版本开始,DelegatingByTopicSerializer
和DelegatingByTopicDeserializer
允许根据主题名称选择序列化器/反序列化器。正则表达式Pattern
用于查找要使用的实例。可以使用构造函数或通过属性(逗号分隔的pattern:serializer
列表)配置映射。
producerConfigs.put(DelegatingByTopicSerializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
"topic[0-4]:" + ByteArraySerializer.class.getName()
+ ", topic[5-9]:" + StringSerializer.class.getName());
...
ConsumerConfigs.put(DelegatingByTopicDeserializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
"topic[0-4]:" + ByteArrayDeserializer.class.getName()
+ ", topic[5-9]:" + StringDeserializer.class.getName());
在将此用于键时,请使用KEY_SERIALIZATION_TOPIC_CONFIG
。
@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
return new DefaultKafkaProducerFactory<>(config,
new IntegerSerializer(),
new DelegatingByTopicSerializer(Map.of(
Pattern.compile("topic[0-4]"), new ByteArraySerializer(),
Pattern.compile("topic[5-9]"), new StringSerializer())),
new JsonSerializer<Object>()); // default
}
当没有模式匹配时,您可以指定要使用的默认序列化器/反序列化器,使用DelegatingByTopicSerialization.KEY_SERIALIZATION_TOPIC_DEFAULT
和DelegatingByTopicSerialization.VALUE_SERIALIZATION_TOPIC_DEFAULT
。
当设置为false
时,附加属性DelegatingByTopicSerialization.CASE_SENSITIVE
(默认值为true
)使主题查找不区分大小写。
重试反序列化器
RetryingDeserializer
使用委托Deserializer
和RetryTemplate
在委托可能存在瞬态错误(例如反序列化期间的网络问题)时重试反序列化。
ConsumerFactory cf = new DefaultKafkaConsumerFactory(myConsumerConfigs,
new RetryingDeserializer(myUnreliableKeyDeserializer, retryTemplate),
new RetryingDeserializer(myUnreliableValueDeserializer, retryTemplate));
从3.1.2
版本开始,可以可选地在RetryingDeserializer
上设置RecoveryCallback
。
有关使用重试策略、回退策略等配置RetryTemplate
,请参阅spring-retry项目。
Spring消息传递消息转换
尽管Serializer
和Deserializer
API从低级Kafka Consumer
和Producer
的角度来看非常简单且灵活,但在使用@KafkaListener
或Spring Integration的Apache Kafka支持时,您可能需要在Spring消息传递级别获得更多灵活性。为了让您轻松地转换到和从org.springframework.messaging.Message
转换,Spring for Apache Kafka提供了一个MessageConverter
抽象,以及MessagingMessageConverter
实现及其JsonMessageConverter
(和子类)自定义。您可以将MessageConverter
直接注入KafkaTemplate
实例,并使用AbstractKafkaListenerContainerFactory
bean定义作为@KafkaListener.containerFactory()
属性。以下示例演示了如何执行此操作
@Bean
public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setRecordMessageConverter(new JsonMessageConverter());
return factory;
}
...
@KafkaListener(topics = "jsonData",
containerFactory = "kafkaJsonListenerContainerFactory")
public void jsonListener(Cat cat) {
...
}
使用 Spring Boot 时,只需将转换器定义为一个@Bean
,Spring Boot 自动配置将将其连接到自动配置的模板和容器工厂。
当使用@KafkaListener
时,参数类型将提供给消息转换器以协助转换。
只有在方法级别声明 |
在消费者端,您可以配置一个 在生产者端,当您使用 Spring Integration 或
同样,使用 为了方便起见,从版本 2.3 开始,框架还提供了一个 |
从版本 2.7.1 开始,消息有效负载转换可以委托给一个spring-messaging
SmartMessageConverter
;这使得转换可以基于MessageHeaders.CONTENT_TYPE
标头,例如。
KafkaMessageConverter.fromMessage() 方法用于传出转换为ProducerRecord ,其中消息有效负载位于ProducerRecord.value() 属性中。KafkaMessageConverter.toMessage() 方法用于从ConsumerRecord 传入转换,其中有效负载为ConsumerRecord.value() 属性。SmartMessageConverter.toMessage() 方法用于从传递给fromMessage() 的消息(通常由KafkaTemplate.send(Message<?> msg) 传递)创建一个新的传出Message<?> 。类似地,在KafkaMessageConverter.toMessage() 方法中,在转换器从ConsumerRecord 创建了一个新的Message<?> 后,将调用SmartMessageConverter.fromMessage() 方法,然后使用新转换的有效负载创建最终的传入消息。在这两种情况下,如果SmartMessageConverter 返回null ,则使用原始消息。 |
当在KafkaTemplate
和侦听器容器工厂中使用默认转换器时,您可以通过在模板上调用setMessagingConverter()
以及通过@KafkaListener
方法上的contentTypeConverter
属性来配置SmartMessageConverter
。
示例
template.setMessagingConverter(mySmartConverter);
@KafkaListener(id = "withSmartConverter", topics = "someTopic",
contentTypeConverter = "mySmartConverter")
public void smart(Thing thing) {
...
}
使用 Spring Data 投影接口
从版本 2.1.1 开始,您可以将 JSON 转换为 Spring Data 投影接口而不是具体类型。这允许对数据进行非常选择性和松耦合的绑定,包括查找 JSON 文档内部多个位置的值。例如,可以将以下接口定义为消息有效负载类型
interface SomeSample {
@JsonPath({ "$.username", "$.user.name" })
String getUsername();
}
@KafkaListener(id="projection.listener", topics = "projection")
public void projection(SomeSample in) {
String username = in.getUsername();
...
}
访问器方法将默认用于查找属性名称作为接收到的 JSON 文档中的字段。@JsonPath
表达式允许自定义值查找,甚至定义多个 JSON Path 表达式,以从多个位置查找值,直到表达式返回实际值。
要启用此功能,请使用一个ProjectingMessageConverter
,并使用相应的委托转换器进行配置(用于传出转换和转换非投影接口)。您还必须将spring-data:spring-data-commons
和com.jayway.jsonpath:json-path
添加到类路径中。
当用作@KafkaListener
方法的参数时,接口类型将像往常一样自动传递给转换器。
使用ErrorHandlingDeserializer
当反序列化器无法反序列化消息时,Spring 无法处理此问题,因为它发生在poll()
返回之前。为了解决此问题,引入了ErrorHandlingDeserializer
。此反序列化器委托给一个真正的反序列化器(键或值)。如果委托无法反序列化记录内容,则ErrorHandlingDeserializer
返回一个null
值,并在包含原因和原始字节的标头中返回一个DeserializationException
。当使用记录级MessageListener
时,如果ConsumerRecord
包含键或值的DeserializationException
标头,则容器的ErrorHandler
将使用失败的ConsumerRecord
被调用。记录不会传递给侦听器。
或者,您可以配置ErrorHandlingDeserializer
以通过提供一个failedDeserializationFunction
来创建自定义值,该函数是一个Function<FailedDeserializationInfo, T>
。调用此函数以创建T
的实例,该实例将以通常的方式传递给侦听器。将提供一个FailedDeserializationInfo
类型的对象,其中包含所有上下文信息。您可以在标头中找到DeserializationException
(作为序列化的 Java 对象)。有关ErrorHandlingDeserializer
的更多信息,请参阅Javadoc。
您可以使用采用键和值Deserializer
对象的DefaultKafkaConsumerFactory
构造函数,并连接您已使用适当的委托配置的ErrorHandlingDeserializer
实例。或者,您可以使用消费者配置属性(ErrorHandlingDeserializer
使用这些属性)来实例化委托。属性名称为ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS
和ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS
。属性值可以是类或类名。以下示例显示了如何设置这些属性
... // other props
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(JsonDeserializer.KEY_DEFAULT_TYPE, "com.example.MyKey")
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.MyValue")
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example")
return new DefaultKafkaConsumerFactory<>(props);
以下示例使用failedDeserializationFunction
。
public class BadThing extends Thing {
private final FailedDeserializationInfo failedDeserializationInfo;
public BadThing(FailedDeserializationInfo failedDeserializationInfo) {
this.failedDeserializationInfo = failedDeserializationInfo;
}
public FailedDeserializationInfo getFailedDeserializationInfo() {
return this.failedDeserializationInfo;
}
}
public class FailedThingProvider implements Function<FailedDeserializationInfo, Thing> {
@Override
public Thing apply(FailedDeserializationInfo info) {
return new BadThing(info);
}
}
前面的示例使用以下配置
...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_FUNCTION, FailedThingProvider.class);
...
如果消费者配置了ErrorHandlingDeserializer ,则务必配置KafkaTemplate 及其生产者,使其使用可以处理普通对象以及反序列化异常产生的原始byte[] 值的序列化器。模板的通用值类型应为Object 。一种技术是使用DelegatingByTypeSerializer ;以下是一个示例 |
@Bean
public ProducerFactory<String, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
MyNormalObject.class, new JsonSerializer<Object>())));
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
当将ErrorHandlingDeserializer
与批处理侦听器一起使用时,必须检查消息标头中的反序列化异常。当与DefaultBatchErrorHandler
一起使用时,您可以使用该标头确定异常在哪个记录上失败,并通过BatchListenerFailedException
与错误处理程序通信。
@KafkaListener(id = "test", topics = "test")
void listen(List<Thing> in, @Header(KafkaHeaders.BATCH_CONVERTED_HEADERS) List<Map<String, Object>> headers) {
for (int i = 0; i < in.size(); i++) {
Thing thing = in.get(i);
if (thing == null
&& headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER) != null) {
try {
DeserializationException deserEx = SerializationUtils.byteArrayToDeserializationException(this.logger,
headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER));
if (deserEx != null) {
logger.error(deserEx, "Record at index " + i + " could not be deserialized");
}
}
catch (Exception ex) {
logger.error(ex, "Record at index " + i + " could not be deserialized");
}
throw new BatchListenerFailedException("Deserialization", deserEx, i);
}
process(thing);
}
}
SerializationUtils.byteArrayToDeserializationException()
可用于将标头转换为DeserializationException
。
当使用List<ConsumerRecord<?, ?>
时,使用SerializationUtils.getExceptionFromHeader()
代替
@KafkaListener(id = "kgh2036", topics = "kgh2036")
void listen(List<ConsumerRecord<String, Thing>> in) {
for (int i = 0; i < in.size(); i++) {
ConsumerRecord<String, Thing> rec = in.get(i);
if (rec.value() == null) {
DeserializationException deserEx = SerializationUtils.getExceptionFromHeader(rec,
SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, this.logger);
if (deserEx != null) {
logger.error(deserEx, "Record at offset " + rec.offset() + " could not be deserialized");
throw new BatchListenerFailedException("Deserialization", deserEx, i);
}
}
process(rec.value());
}
}
如果您还使用DeadLetterPublishingRecoverer ,则为DeserializationException 发布的记录将具有类型为byte[] 的record.value() ;这不应该被序列化。考虑使用DelegatingByTypeSerializer ,将其配置为对byte[] 使用ByteArraySerializer ,对所有其他类型使用普通序列化器(Json、Avro 等)。 |
从版本 3.1 开始,您可以向ErrorHandlingDeserializer
添加一个Validator
。如果委托Deserializer
成功反序列化了对象,但该对象验证失败,则会抛出一个类似于反序列化异常发生的异常。这允许将原始原始数据传递给错误处理程序。当您自己创建反序列化器时,只需调用setValidator
;如果您使用属性配置序列化器,请将消费者配置属性ErrorHandlingDeserializer.VALIDATOR_CLASS
设置为您的Validator
的类或完全限定类名。使用 Spring Boot 时,此属性名称为spring.kafka.consumer.properties.spring.deserializer.validator.class
。
批处理侦听器中的有效负载转换
当您使用批处理侦听器容器工厂时,您也可以在BatchMessagingMessageConverter
中使用JsonMessageConverter
来转换批处理消息。有关更多信息,请参阅序列化、反序列化和消息转换和Spring 消息传递消息转换。
默认情况下,转换的类型是从侦听器参数推断的。如果您使用DefaultJackson2TypeMapper
配置JsonMessageConverter
,并且其TypePrecedence
设置为TYPE_ID
(而不是默认的INFERRED
),则转换器将使用标头(如果存在)中的类型信息。例如,这允许将侦听器方法声明为接口而不是具体类。此外,类型转换器支持映射,因此反序列化可以是与源不同的类型(只要数据兼容)。当您使用类级别@KafkaListener
实例时,这也很有用,在这些实例中,必须已转换有效负载才能确定要调用哪个方法。以下示例创建使用此方法的 bean
@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.setBatchMessageConverter(new BatchMessagingMessageConverter(converter()));
return factory;
}
@Bean
public JsonMessageConverter converter() {
return new JsonMessageConverter();
}
请注意,为了使这能够正常工作,转换目标的方法签名必须是一个具有单个泛型参数类型的容器对象,例如以下内容
@KafkaListener(topics = "blc1")
public void listen(List<Foo> foos, @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
...
}
请注意,您仍然可以访问批处理标头。
如果批处理转换器具有支持它的记录转换器,您还可以接收消息列表,其中有效负载根据泛型类型进行转换。以下示例显示了如何执行此操作
@KafkaListener(topics = "blc3", groupId = "blc3")
public void listen(List<Message<Foo>> fooMessages) {
...
}
ConversionService
自定义
从 2.1.1 版本开始,默认的 org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory
用于解析监听器方法调用参数的 org.springframework.core.convert.ConversionService
会提供所有实现了以下任何接口的 Bean。
-
org.springframework.core.convert.converter.Converter
-
org.springframework.core.convert.converter.GenericConverter
-
org.springframework.format.Formatter
这允许您进一步自定义监听器反序列化,而无需更改 ConsumerFactory
和 KafkaListenerContainerFactory
的默认配置。
通过 KafkaListenerConfigurer Bean 在 KafkaListenerEndpointRegistrar 上设置自定义 MessageHandlerMethodFactory 将禁用此功能。 |
向 @KafkaListener
添加自定义 HandlerMethodArgumentResolver
从 2.4.2 版本开始,您可以添加自己的 HandlerMethodArgumentResolver
并解析自定义方法参数。您只需要实现 KafkaListenerConfigurer
并使用 KafkaListenerEndpointRegistrar
类中的 setCustomMethodArgumentResolvers()
方法。
@Configuration
class CustomKafkaConfig implements KafkaListenerConfigurer {
@Override
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
registrar.setCustomMethodArgumentResolvers(
new HandlerMethodArgumentResolver() {
@Override
public boolean supportsParameter(MethodParameter parameter) {
return CustomMethodArgument.class.isAssignableFrom(parameter.getParameterType());
}
@Override
public Object resolveArgument(MethodParameter parameter, Message<?> message) {
return new CustomMethodArgument(
message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC, String.class)
);
}
}
);
}
}
您还可以通过向 KafkaListenerEndpointRegistrar
Bean 添加自定义 MessageHandlerMethodFactory
来完全替换框架的参数解析。如果您这样做,并且您的应用程序需要处理带有 null
value()
的 tombstone 记录(例如,来自已压缩主题),则应向工厂添加 KafkaNullAwarePayloadArgumentResolver
;它必须是最后一个解析器,因为它支持所有类型并且可以匹配没有 @Payload
注解的参数。如果您使用的是 DefaultMessageHandlerMethodFactory
,请将此解析器设置为最后一个自定义解析器;工厂将确保在标准 PayloadMethodArgumentResolver
(它不知道 KafkaNull
负载)之前使用此解析器。
另请参阅 空负载和 Tombstone
记录的对数压缩。