序列化、反序列化和消息转换

概述

Apache Kafka 为记录值及其键的序列化和反序列化提供了高级 API。它通过 org.apache.kafka.common.serialization.Serializer<T>org.apache.kafka.common.serialization.Deserializer<T> 抽象提供了内置实现。同时,我们可以使用 ProducerConsumer 配置属性来指定序列化器和反序列化器类。以下示例展示了如何进行配置

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
...
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

对于更复杂或特殊的情况,KafkaConsumer(以及 KafkaProducer)提供了重载构造函数,分别用于接受 keysvaluesSerializerDeserializer 实例。

使用此 API 时,DefaultKafkaProducerFactoryDefaultKafkaConsumerFactory 也通过属性(通过构造函数或 setter 方法)提供了将自定义 SerializerDeserializer 实例注入目标 ProducerConsumer 的能力。此外,您还可以通过构造函数传入 Supplier<Serializer>Supplier<Deserializer> 实例 - 这些 Supplier 会在创建每个 ProducerConsumer 时被调用。

String 序列化

从版本 2.5 开始,Spring for Apache Kafka 提供了使用实体字符串表示的 ToStringSerializerParseStringDeserializer 类。它们依赖于 toString 方法以及某些 Function<String>BiFunction<String, Headers> 函数来解析字符串并填充实例的属性。通常,这将调用类上的静态方法,例如 parse

ToStringSerializer<Thing> thingSerializer = new ToStringSerializer<>();
//...
ParseStringDeserializer<Thing> deserializer = new ParseStringDeserializer<>(Thing::parse);

默认情况下,ToStringSerializer 配置为在记录 Headers 中传递有关序列化实体类型的信息。您可以通过将 addTypeInfo 属性设置为 false 来禁用此功能。此信息可供接收端的 ParseStringDeserializer 使用。

  • ToStringSerializer.ADD_TYPE_INFO_HEADERS(默认 true):您可以将其设置为 false 以禁用 ToStringSerializer 上的此功能(设置 addTypeInfo 属性)。

ParseStringDeserializer<Object> deserializer = new ParseStringDeserializer<>((str, headers) -> {
    byte[] header = headers.lastHeader(ToStringSerializer.VALUE_TYPE).value();
    String entityType = new String(header);

    if (entityType.contains("Thing")) {
        return Thing.parse(str);
    }
    else {
        // ...parsing logic
    }
});

您可以配置用于将 String 转换为 byte[] 或从 byte[] 转换回 StringCharset,默认为 UTF-8

您可以使用 ConsumerConfig 属性配置反序列化器使用的解析器方法名称

  • ParseStringDeserializer.KEY_PARSER

  • ParseStringDeserializer.VALUE_PARSER

属性必须包含类的完全限定名,后跟方法名,用句点 . 分隔。方法必须是静态的,并且签名必须是 (String, Headers)(String) 之一。

还提供了 ToFromStringSerde,用于 Kafka Streams。

JSON

Spring for Apache Kafka 还提供了基于 Jackson JSON object mapper 的 JsonSerializerJsonDeserializer 实现。JsonSerializer 允许将任何 Java 对象写为 JSON 格式的 byte[]JsonDeserializer 需要一个额外的 Class<?> targetType 参数,以便将消费到的 byte[] 反序列化为适当的目标对象。以下示例展示了如何创建一个 JsonDeserializer

JsonDeserializer<Thing> thingDeserializer = new JsonDeserializer<>(Thing.class);

您可以使用 ObjectMapper 自定义 JsonSerializerJsonDeserializer。您还可以扩展它们,在 configure(Map<String, ?> configs, boolean isKey) 方法中实现一些特定的配置逻辑。

从版本 2.3 开始,所有支持 JSON 的组件默认使用一个 JacksonUtils.enhancedObjectMapper() 实例进行配置,该实例禁用了 MapperFeature.DEFAULT_VIEW_INCLUSIONDeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES 特性。此外,该实例还提供了用于自定义数据类型的知名模块,例如 Java 时间和 Kotlin 支持。更多信息请参阅 JacksonUtils.enhancedObjectMapper() JavaDocs。此方法还注册了一个 org.springframework.kafka.support.JacksonMimeTypeModule,用于将 org.springframework.util.MimeType 对象序列化为普通字符串,以实现网络上的跨平台兼容性。JacksonMimeTypeModule 可以注册为应用上下文中的 bean,并将自动配置到Spring Boot ObjectMapper 实例中。

同样从版本 2.3 开始,JsonDeserializer 提供了基于 TypeReference 的构造函数,以便更好地处理目标泛型容器类型。

从版本 2.1 开始,您可以在记录 Headers 中传递类型信息,从而允许处理多种类型。此外,您可以使用以下 Kafka 属性来配置序列化器和反序列化器。如果您已经分别向 KafkaConsumerKafkaProducer 提供了 SerializerDeserializer 实例,则这些属性无效。

配置属性

  • JsonSerializer.ADD_TYPE_INFO_HEADERS(默认 true):您可以将其设置为 false 以禁用 JsonSerializer 上的此功能(设置 addTypeInfo 属性)。

  • JsonSerializer.TYPE_MAPPINGS(默认 empty):参阅 类型映射

  • JsonDeserializer.USE_TYPE_INFO_HEADERS(默认 true):您可以将其设置为 false 以忽略序列化器设置的头部。

  • JsonDeserializer.REMOVE_TYPE_INFO_HEADERS(默认 true):您可以将其设置为 false 以保留序列化器设置的头部。

  • JsonDeserializer.KEY_DEFAULT_TYPE:如果不存在头部信息,则为键的反序列化提供的回退类型。

  • JsonDeserializer.VALUE_DEFAULT_TYPE:如果不存在头部信息,则为值的反序列化提供的回退类型。

  • JsonDeserializer.TRUSTED_PACKAGES(默认 java.util, java.lang):允许反序列化的包模式的逗号分隔列表。* 表示反序列化所有。

  • JsonDeserializer.TYPE_MAPPINGS(默认 empty):参阅 类型映射

  • JsonDeserializer.KEY_TYPE_METHOD(默认 empty):参阅 使用方法确定类型

  • JsonDeserializer.VALUE_TYPE_METHOD(默认 empty):参阅 使用方法确定类型

从版本 2.2 开始,类型信息头部(如果由序列化器添加)将被反序列化器移除。您可以通过将 removeTypeHeaders 属性设置为 false 来恢复到之前的行为,无论是直接在反序列化器上设置,还是使用前面描述的配置属性设置。

从版本 2.8 开始,如果您按照 编程构建 中所示以编程方式构造序列化器或反序列化器,只要您没有明确设置任何属性(使用 set*() 方法或流式 API),工厂就会应用上述属性。以前,以编程方式创建时,配置属性从未被应用;如果您直接在对象上明确设置属性,情况仍然如此。

类型映射

从版本 2.2 开始,当使用 JSON 时,您现在可以使用前面列表中的属性提供类型映射。以前,您必须在序列化器和反序列化器中自定义类型映射器。映射由逗号分隔的 token:className 对列表组成。出站时,负载的类名被映射到相应的 token。入站时,类型头部中的 token 被映射到相应的类名。

以下示例创建了一组映射

senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
senderProps.put(JsonSerializer.TYPE_MAPPINGS, "cat:com.mycat.Cat, hat:com.myhat.Hat");
...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
consumerProps.put(JsonDeserializer.TYPE_MAPPINGS, "cat:com.yourcat.Cat, hat:com.yourhat.Hat");
相应的对象必须兼容。

如果您使用 Spring Boot,您可以在 application.properties(或 yaml)文件中提供这些属性。以下示例展示了如何进行配置

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.type.mapping=cat:com.mycat.Cat,hat:com.myhat.Hat

只能通过属性进行简单配置。对于更高级的配置(例如在序列化器和反序列化器中使用自定义 ObjectMapper),您应该使用接受预构建序列化器和反序列化器的生产者和消费者工厂构造函数。以下 Spring Boot 示例覆盖了默认工厂

@Bean
public ConsumerFactory<?, ?> kafkaConsumerFactory(JsonDeserializer customValueDeserializer) {
    Map<String, Object> properties = new HashMap<>();
    // properties.put(..., ...)
    // ...
    return new DefaultKafkaConsumerFactory<>(properties,
        new StringDeserializer(), customValueDeserializer);
}

@Bean
public ProducerFactory<?, ?> kafkaProducerFactory(JsonSerializer customValueSerializer) {
    return new DefaultKafkaProducerFactory<>(properties.buildProducerProperties(),
        new StringSerializer(), customValueSerializer);
}

也提供了 Setter 方法,作为使用这些构造函数的替代方案。

当使用 Spring Boot 并覆盖 ConsumerFactoryProducerFactory 时(如上所示),需要将通配符泛型类型与 bean 方法返回类型一起使用。如果提供具体的泛型类型,则 Spring Boot 将忽略这些 bean,并仍然使用默认的。

从版本 2.2 开始,您可以通过使用带布尔参数 useHeadersIfPresent(默认为 true)的重载构造函数之一,明确配置反序列化器使用提供的目标类型并忽略头部中的类型信息。以下示例展示了如何进行配置

DefaultKafkaConsumerFactory<Integer, Cat1> cf = new DefaultKafkaConsumerFactory<>(props,
        new IntegerDeserializer(), new JsonDeserializer<>(Cat1.class, false));

使用方法确定类型

从版本 2.5 开始,您现在可以通过属性配置反序列化器调用一个方法来确定目标类型。如果此方法存在,它将覆盖上面讨论的任何其他技术。如果数据是由不使用 Spring 序列化器的应用程序发布的,并且您需要根据数据或其他头部反序列化到不同的类型,这将很有用。将这些属性设置为方法名称 - 即完全限定的类名后跟方法名,用句点 . 分隔。该方法必须声明为 public static,具有以下三种签名之一:(String topic, byte[] data, Headers headers)(byte[] data, Headers headers)(byte[] data),并返回一个 Jackson JavaType

  • JsonDeserializer.KEY_TYPE_METHOD : spring.json.key.type.method

  • JsonDeserializer.VALUE_TYPE_METHOD : spring.json.value.type.method

您可以使用任意头部或检查数据来确定类型。

JavaType thing1Type = TypeFactory.defaultInstance().constructType(Thing1.class);

JavaType thing2Type = TypeFactory.defaultInstance().constructType(Thing2.class);

public static JavaType thingOneOrThingTwo(byte[] data, Headers headers) {
    // {"thisIsAFieldInThing1":"value", ...
    if (data[21] == '1') {
        return thing1Type;
    }
    else {
        return thing2Type;
    }
}

对于更复杂的数据检查,可以考虑使用 JsonPath 或类似工具,但是用于确定类型的测试越简单,过程就越高效。

以下是以编程方式创建反序列化器(在构造函数中向消费者工厂提供反序列化器)的示例

JsonDeserializer<Object> deser = new JsonDeserializer<>()
        .trustedPackages("*")
        .typeResolver(SomeClass::thing1Thing2JavaTypeForTopic);

...

public static JavaType thing1Thing2JavaTypeForTopic(String topic, byte[] data, Headers headers) {
    ...
}

编程构建

在为生产者/消费者工厂以编程方式构造序列化器/反序列化器时,自版本 2.3 以来,您可以使用流式 API,这简化了配置。

@Bean
public ProducerFactory<MyKeyType, MyValueType> pf() {
    Map<String, Object> props = new HashMap<>();
    // props.put(..., ...)
    // ...
    DefaultKafkaProducerFactory<MyKeyType, MyValueType> pf = new DefaultKafkaProducerFactory<>(props,
        new JsonSerializer<MyKeyType>()
            .forKeys()
            .noTypeInfo(),
        new JsonSerializer<MyValueType>()
            .noTypeInfo());
    return pf;
}

@Bean
public ConsumerFactory<MyKeyType, MyValueType> cf() {
    Map<String, Object> props = new HashMap<>();
    // props.put(..., ...)
    // ...
    DefaultKafkaConsumerFactory<MyKeyType, MyValueType> cf = new DefaultKafkaConsumerFactory<>(props,
        new JsonDeserializer<>(MyKeyType.class)
            .forKeys()
            .ignoreTypeHeaders(),
        new JsonDeserializer<>(MyValueType.class)
            .ignoreTypeHeaders());
    return cf;
}

要以编程方式提供类型映射,类似于 使用方法确定类型,请使用 typeFunction 属性。

JsonDeserializer<Object> deser = new JsonDeserializer<>()
        .trustedPackages("*")
        .typeFunction(MyUtils::thingOneOrThingTwo);

或者,只要您不使用流式 API 配置属性,或使用 set*() 方法设置它们,工厂将使用配置属性配置序列化器/反序列化器;参阅 配置属性

委派序列化器和反序列化器

使用头部

版本 2.3 引入了 DelegatingSerializerDelegatingDeserializer,它们允许生产和消费具有不同键和/或值类型的记录。生产者必须将头部 DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR 设置为用于选择值序列化器的选择器值,并将 DelegatingSerializer.KEY_SERIALIZATION_SELECTOR 设置为键的选择器值;如果未找到匹配项,则抛出 IllegalStateException

对于入站记录,反序列化器使用相同的头部来选择要使用的反序列化器;如果未找到匹配项或头部不存在,则返回原始的 byte[]

您可以通过构造函数配置选择器到 Serializer / Deserializer 的映射,或者通过 Kafka 生产者/消费者属性使用键 DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIGDelegatingSerializer.KEY_SERIALIZATION_SELECTOR_CONFIG 进行配置。对于序列化器,生产者属性可以是 Map<String, Object>,其中键是选择器,值是 Serializer 实例、序列化器 Class 或类名。该属性也可以是逗号分隔的映射条目字符串,如下所示。

对于反序列化器,消费者属性可以是 Map<String, Object>,其中键是选择器,值是 Deserializer 实例、反序列化器 Class 或类名。该属性也可以是逗号分隔的映射条目字符串,如下所示。

要使用属性进行配置,使用以下语法

producerProps.put(DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
    "thing1:com.example.MyThing1Serializer, thing2:com.example.MyThing2Serializer")

consumerProps.put(DelegatingDeserializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
    "thing1:com.example.MyThing1Deserializer, thing2:com.example.MyThing2Deserializer")

生产者随后会将 DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR 头部设置为 thing1thing2

此技术支持将不同类型发送到同一主题(或不同主题)。

从版本 2.5.1 开始,如果类型(键或值)是 Serdes 支持的标准类型之一(例如 LongInteger 等),则不必设置选择器头部。相反,序列化器将把头部设置为该类型的类名。对于这些类型,不必配置序列化器或反序列化器,它们将动态地(创建一次)创建。

对于另一种将不同类型发送到不同主题的技术,参阅 使用 RoutingKafkaTemplate

按类型

版本 2.8 引入了 DelegatingByTypeSerializer

@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
    return new DefaultKafkaProducerFactory<>(config,
            null, new DelegatingByTypeSerializer(Map.of(
                    byte[].class, new ByteArraySerializer(),
                    Bytes.class, new BytesSerializer(),
                    String.class, new StringSerializer())));
}

从版本 2.8.3 开始,您可以配置序列化器检查映射键是否可赋值自目标对象,这在委托序列化器可以序列化子类时很有用。在这种情况下,如果存在歧义匹配,则应提供有序的 Map,例如 LinkedHashMap

按主题

从版本 2.8 开始,DelegatingByTopicSerializerDelegatingByTopicDeserializer 允许基于主题名称选择序列化器/反序列化器。使用正则表达式 Pattern 来查找要使用的实例。可以通过构造函数或通过属性(逗号分隔的 pattern:serializer 列表)配置映射。

producerConfigs.put(DelegatingByTopicSerializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
            "topic[0-4]:" + ByteArraySerializer.class.getName()
        + ", topic[5-9]:" + StringSerializer.class.getName());
...
consumerConfigs.put(DelegatingByTopicDeserializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
            "topic[0-4]:" + ByteArrayDeserializer.class.getName()
        + ", topic[5-9]:" + StringDeserializer.class.getName());

当用于键时,使用 KEY_SERIALIZATION_TOPIC_CONFIG 属性。

@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
    return new DefaultKafkaProducerFactory<>(config,
            new IntegerSerializer(),
            new DelegatingByTopicSerializer(Map.of(
                    Pattern.compile("topic[0-4]"), new ByteArraySerializer(),
                    Pattern.compile("topic[5-9]"), new StringSerializer())),
                    new JsonSerializer<Object>());  // default
}

当没有模式匹配时,您可以使用 DelegatingByTopicSerialization.KEY_SERIALIZATION_TOPIC_DEFAULTDelegatingByTopicSerialization.VALUE_SERIALIZATION_TOPIC_DEFAULT 指定要使用的默认序列化器/反序列化器。

一个附加属性 DelegatingByTopicSerialization.CASE_SENSITIVE(默认为 true),当设置为 false 时,主题查找将不区分大小写。

重试反序列化器

RetryingDeserializer 使用一个委托的 DeserializerRetryTemplate,用于在委托反序列化器可能在反序列化期间出现瞬时错误(例如网络问题)时重试反序列化。

ConsumerFactory cf = new DefaultKafkaConsumerFactory(myConsumerConfigs,
    new RetryingDeserializer(myUnreliableKeyDeserializer, retryTemplate),
    new RetryingDeserializer(myUnreliableValueDeserializer, retryTemplate));

从版本 3.1.2 开始,可以在 RetryingDeserializer 上可选地设置 RecoveryCallback

关于如何使用重试策略、退避策略等配置 RetryTemplate,请参阅 spring-retry 项目。

Spring Messaging 消息转换

虽然从底层 Kafka ConsumerProducer 的角度来看,SerializerDeserializer API 相当简单灵活,但在 Spring Messaging 级别使用 @KafkaListenerSpring Integration 的 Apache Kafka 支持 时,您可能需要更多的灵活性。为了让您轻松地在 org.springframework.messaging.Message 之间进行转换,Spring for Apache Kafka 提供了一个 MessageConverter 抽象,其实现为 MessagingMessageConverter 及其定制的 JsonMessageConverter(和子类)。您可以将 MessageConverter 直接注入到 KafkaTemplate 实例中,也可以通过使用 @KafkaListener.containerFactory() 属性的 AbstractKafkaListenerContainerFactory bean 定义来注入。以下示例展示了如何进行配置

@Bean
public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setRecordMessageConverter(new JsonMessageConverter());
    return factory;
}
...
@KafkaListener(topics = "jsonData",
                containerFactory = "kafkaJsonListenerContainerFactory")
public void jsonListener(Cat cat) {
...
}

使用 Spring Boot 时,只需将转换器定义为一个 @Bean,Spring Boot 自动配置就会将其注入自动配置的模板和容器工厂中。

当您使用 @KafkaListener 时,参数类型会被提供给消息转换器,以协助进行转换。

这种类型推断只有当 @KafkaListener 注解声明在方法级别时才能实现。对于类级别的 @KafkaListener,负载类型用于选择要调用的 @KafkaHandler 方法,因此在选择方法之前它必须已经完成转换。

在消费者端,您可以配置一个 JsonMessageConverter;它可以处理类型为 byte[]BytesStringConsumerRecord 值,因此应该与 ByteArrayDeserializerBytesDeserializerStringDeserializer 结合使用。(byte[]Bytes 更高效,因为它们避免了不必要的 byte[]String 转换)。如果您愿意,您还可以配置与反序列化器对应的特定 JsonMessageConverter 子类。

在生产者端,当您使用 Spring Integration 或 KafkaTemplate.send(Message<?> message) 方法时(参阅 使用 KafkaTemplate),您必须配置一个与已配置的 Kafka Serializer 兼容的消息转换器。

  • StringJsonMessageConverterStringSerializer

  • BytesJsonMessageConverterBytesSerializer

  • ByteArrayJsonMessageConverterByteArraySerializer

同样,使用 byte[]Bytes 更高效,因为它们避免了 Stringbyte[] 的转换。

为方便起见,从版本 2.3 开始,该框架还提供了 StringOrBytesSerializer,它可以序列化所有这三种值类型,因此可以与任何消息转换器一起使用。

从版本 2.7.1 开始,消息负载转换可以委托给 spring-messagingSmartMessageConverter;例如,这使得转换可以基于 MessageHeaders.CONTENT_TYPE 头部进行。

KafkaMessageConverter.fromMessage() 方法在进行出站转换到 ProducerRecord 时被调用,消息负载位于 ProducerRecord.value() 属性中。KafkaMessageConverter.toMessage() 方法在进行入站转换从 ConsumerRecord 时被调用,负载即为 ConsumerRecord.value() 属性。SmartMessageConverter.toMessage() 方法用于从传递给 fromMessage()Message(通常通过 KafkaTemplate.send(Message<?> msg))创建新的出站 Message<?>。类似地,在 KafkaMessageConverter.toMessage() 方法中,在转换器从 ConsumerRecord 创建新的 Message<?> 之后,会调用 SmartMessageConverter.fromMessage() 方法,然后使用新转换的负载创建最终的入站消息。在任何一种情况下,如果 SmartMessageConverter 返回 null,则使用原始消息。

当在 KafkaTemplate 和监听器容器工厂中使用默认转换器时,您可以通过在模板上调用 setMessagingConverter() 方法以及通过 @KafkaListener 方法上的 contentTypeConverter 属性来配置 SmartMessageConverter

示例

template.setMessagingConverter(mySmartConverter);
@KafkaListener(id = "withSmartConverter", topics = "someTopic",
    contentTypeConverter = "mySmartConverter")
public void smart(Thing thing) {
    ...
}

使用 Spring Data Projection 接口

从版本 2.1.1 开始,您可以将 JSON 转换为 Spring Data Projection 接口,而不是具体的类型。这允许非常选择性且低耦合地绑定到数据,包括从 JSON 文档内多个位置查找值。例如,以下接口可以定义为消息负载类型

interface SomeSample {

  @JsonPath({ "$.username", "$.user.name" })
  String getUsername();

}
@KafkaListener(id="projection.listener", topics = "projection")
public void projection(SomeSample in) {
    String username = in.getUsername();
    ...
}

默认情况下,访问器方法将用于在接收到的 JSON 文档中将属性名作为字段查找。@JsonPath 表达式允许自定义值查找,甚至可以定义多个 JSON Path 表达式,从多个位置查找值,直到某个表达式返回实际值。

要启用此功能,请使用配置了适当委托转换器(用于出站转换和转换非 Projection 接口)的 ProjectingMessageConverter。您还必须将 spring-data:spring-data-commonscom.jayway.jsonpath:json-path 添加到 classpath 中。

当用作 @KafkaListener 方法的参数时,接口类型会正常自动传递给转换器。

使用 ErrorHandlingDeserializer

当反序列化器未能反序列化消息时,Spring 无法处理此问题,因为它发生在 poll() 返回之前。为了解决这个问题,引入了 ErrorHandlingDeserializer。此反序列化器委托给一个实际的反序列化器(用于键或值)。如果委托方未能反序列化记录内容,ErrorHandlingDeserializer 将返回一个 null 值,并在头部中包含一个 DeserializationException,该头部包含原因和原始字节。当您使用记录级别的 MessageListener 时,如果 ConsumerRecord 的键或值包含 DeserializationException 头部,容器的 ErrorHandler 将被调用,并传入失败的 ConsumerRecord。该记录不会传递给监听器。

或者,您可以通过提供 failedDeserializationFunction 来配置 ErrorHandlingDeserializer 以创建自定义值,该函数是一个 Function<FailedDeserializationInfo, T>。此函数被调用以创建 T 的实例,然后像往常一样将该实例传递给监听器。类型为 FailedDeserializationInfo 的对象(包含所有上下文信息)会提供给该函数。您可以在头部中找到 DeserializationException(作为序列化的 Java 对象)。更多信息请参见 ErrorHandlingDeserializerJavadoc

您可以使用接受键和值 Deserializer 对象的 DefaultKafkaConsumerFactory 构造函数,并注入您配置了适当委托的 ErrorHandlingDeserializer 实例。或者,您可以使用消费者配置属性(ErrorHandlingDeserializer 使用这些属性)来实例化委托。属性名称是 ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASSErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS。属性值可以是类或类名。以下示例显示了如何设置这些属性

... // other props
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(JsonDeserializer.KEY_DEFAULT_TYPE, "com.example.MyKey")
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.MyValue")
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example")
return new DefaultKafkaConsumerFactory<>(props);

以下示例使用 failedDeserializationFunction

public class BadThing extends Thing {

  private final FailedDeserializationInfo failedDeserializationInfo;

  public BadThing(FailedDeserializationInfo failedDeserializationInfo) {
    this.failedDeserializationInfo = failedDeserializationInfo;
  }

  public FailedDeserializationInfo getFailedDeserializationInfo() {
    return this.failedDeserializationInfo;
  }

}

public class FailedThingProvider implements Function<FailedDeserializationInfo, Thing> {

  @Override
  public Thing apply(FailedDeserializationInfo info) {
    return new BadThing(info);
  }

}

前面的示例使用以下配置

...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_FUNCTION, FailedThingProvider.class);
...
如果消费者配置了 ErrorHandlingDeserializer,那么配置 KafkaTemplate 及其生产者使用一个既可以处理普通对象也可以处理因反序列化异常产生的原始 byte[] 值的序列化器就很重要。模板的泛型值类型应该是 Object。一种技术是使用 DelegatingByTypeSerializer;示例如下
@Bean
public ProducerFactory<String, Object> producerFactory() {
  return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
    new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
          MyNormalObject.class, new JsonSerializer<Object>())));
}

@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
  return new KafkaTemplate<>(producerFactory());
}

当将 ErrorHandlingDeserializer 与批处理监听器一起使用时,您必须检查消息头部中的反序列化异常。与 DefaultBatchErrorHandler 一起使用时,您可以使用该头部确定异常发生在哪个记录上,并通过 BatchListenerFailedException 与错误处理器通信。

@KafkaListener(id = "test", topics = "test")
void listen(List<Thing> in, @Header(KafkaHeaders.BATCH_CONVERTED_HEADERS) List<Map<String, Object>> headers) {
    for (int i = 0; i < in.size(); i++) {
        Thing thing = in.get(i);
        if (thing == null
                && headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER) != null) {
            try {
                DeserializationException deserEx = SerializationUtils.byteArrayToDeserializationException(this.logger,
                        headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER));
                if (deserEx != null) {
                    logger.error(deserEx, "Record at index " + i + " could not be deserialized");
                }
            }
            catch (Exception ex) {
                logger.error(ex, "Record at index " + i + " could not be deserialized");
            }
            throw new BatchListenerFailedException("Deserialization", deserEx, i);
        }
        process(thing);
    }
}

SerializationUtils.byteArrayToDeserializationException() 可用于将头部转换为 DeserializationException

当消费 List<ConsumerRecord<?, ?> 时,应改为使用 SerializationUtils.getExceptionFromHeader()

@KafkaListener(id = "kgh2036", topics = "kgh2036")
void listen(List<ConsumerRecord<String, Thing>> in) {
    for (int i = 0; i < in.size(); i++) {
        ConsumerRecord<String, Thing> rec = in.get(i);
        if (rec.value() == null) {
            DeserializationException deserEx = SerializationUtils.getExceptionFromHeader(rec,
                    SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, this.logger);
            if (deserEx != null) {
                logger.error(deserEx, "Record at offset " + rec.offset() + " could not be deserialized");
                throw new BatchListenerFailedException("Deserialization", deserEx, i);
            }
        }
        process(rec.value());
    }
}
如果您还使用了 DeadLetterPublishingRecoverer,为 DeserializationException 发布的记录将具有 record.value() 类型为 byte[];这不应被序列化。考虑使用配置为对 byte[] 使用 ByteArraySerializer,而对所有其他类型使用正常序列化器(Json、Avro 等)的 DelegatingByTypeSerializer

从版本 3.1 开始,您可以向 ErrorHandlingDeserializer 添加一个 Validator。如果委托 Deserializer 成功反序列化了对象,但该对象验证失败,则会抛出类似反序列化异常的异常。这允许将原始原始数据传递给错误处理器。自行创建反序列化器时,只需调用 setValidator;如果您使用属性配置序列化器,请将消费者配置属性 ErrorHandlingDeserializer.VALIDATOR_CLASS 设置为您的 Validator 的类或完全限定类名。使用 Spring Boot 时,此属性名称是 spring.kafka.consumer.properties.spring.deserializer.validator.class

批处理监听器的 Payload 转换

当您使用批处理监听器容器工厂时,您还可以在 BatchMessagingMessageConverter 中使用 JsonMessageConverter 来转换批处理消息。更多信息请参见 序列化、反序列化和消息转换 以及 Spring Messaging 消息转换

默认情况下,转换的类型从监听器参数中推断。如果您使用 DefaultJackson2TypeMapper 配置 JsonMessageConverter,并将其 TypePrecedence 设置为 TYPE_ID(而不是默认的 INFERRED),转换器将使用头部中的类型信息(如果存在)。这允许,例如,监听器方法声明使用接口而不是具体类。此外,类型转换器支持映射,因此反序列化可以转换为与源不同的类型(只要数据兼容)。当您使用 类级别的 @KafkaListener 实例 时,其中 payload 必须已经转换才能确定调用哪个方法,这也很有用。以下示例创建使用此方法的 bean

@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);
    factory.setBatchMessageConverter(new BatchMessagingMessageConverter(converter()));
    return factory;
}

@Bean
public JsonMessageConverter converter() {
    return new JsonMessageConverter();
}

请注意,要使其工作,转换目标的签名必须是一个容器对象,只有一个泛型参数类型,如下所示

@KafkaListener(topics = "blc1")
public void listen(List<Foo> foos, @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
    ...
}

请注意,您仍然可以访问批处理头部。

如果批处理转换器具有支持此功能的记录转换器,您还可以接收消息列表,其中 payload 会根据泛型类型进行转换。以下示例显示了如何实现

@KafkaListener(topics = "blc3", groupId = "blc3")
public void listen(List<Message<Foo>> fooMessages) {
    ...
}

如果批处理中的记录无法转换,其 payload 将被设置为 null 到目标 payloads 列表中。该记录的转换异常将作为警告记录,并存储到 KafkaHeaders.CONVERSION_FAILURES 头部中,作为 List<ConversionException> 的一个项目。目标 @KafkaListener 方法可以使用 Java Stream API 从 payload 列表中过滤掉这些 null 值,或者处理转换异常头部

@KafkaListener(id = "foo", topics = "foo", autoStartup = "false")
public void listen(List<Foo> list,
         @Header(KafkaHeaders.CONVERSION_FAILURES) List<ConversionException> conversionFailures) {

    for (int i = 0; i < list.size(); i++) {
        if (conversionFailures.get(i) != null) {
            throw new BatchListenerFailedException("Conversion Failed", conversionFailures.get(i), i);
        }
    }
}

ConversionService 自定义

从版本 2.1.1 开始,默认的 org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory 用于解析监听器方法调用的参数时使用的 org.springframework.core.convert.ConversionService 会提供给所有实现以下任何接口的 bean

  • org.springframework.core.convert.converter.Converter

  • org.springframework.core.convert.converter.GenericConverter

  • org.springframework.format.Formatter

这允许您进一步自定义监听器反序列化,而无需更改 ConsumerFactoryKafkaListenerContainerFactory 的默认配置。

通过 KafkaListenerConfigurer bean 在 KafkaListenerEndpointRegistrar 上设置自定义的 MessageHandlerMethodFactory 将禁用此功能。

@KafkaListener 添加自定义的 HandlerMethodArgumentResolver

从版本 2.4.2 开始,您可以添加自己的 HandlerMethodArgumentResolver 并解析自定义方法参数。您只需要实现 KafkaListenerConfigurer 并使用 KafkaListenerEndpointRegistrar 类中的 setCustomMethodArgumentResolvers() 方法。

@Configuration
class CustomKafkaConfig implements KafkaListenerConfigurer {

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
        registrar.setCustomMethodArgumentResolvers(
            new HandlerMethodArgumentResolver() {

                @Override
                public boolean supportsParameter(MethodParameter parameter) {
                    return CustomMethodArgument.class.isAssignableFrom(parameter.getParameterType());
                }

                @Override
                public Object resolveArgument(MethodParameter parameter, Message<?> message) {
                    return new CustomMethodArgument(
                        message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC, String.class)
                    );
                }
            }
        );
    }

}

您也可以通过向 KafkaListenerEndpointRegistrar bean 添加自定义的 MessageHandlerMethodFactory 来完全替换框架的参数解析。如果您这样做,并且您的应用程序需要处理带有 null value() 的墓碑记录(例如来自压缩主题),您应该向工厂添加一个 KafkaNullAwarePayloadArgumentResolver;它必须是最后一个解析器,因为它支持所有类型,并且可以匹配没有 @Payload 注解的参数。如果您使用的是 DefaultMessageHandlerMethodFactory,请将此解析器设置为最后一个自定义解析器;工厂将确保在标准的 PayloadMethodArgumentResolver 之前使用此解析器,而标准的解析器对 KafkaNull payload 不了解。