新增功能?

3.2 相比 3.1 的新增功能

本节介绍从 3.1 版本到 3.2 版本所做的更改。有关早期版本的更改,请参阅 更改历史记录

Kafka 客户端版本

此版本需要 3.7.0 kafka-clients。Kafka 客户端的 3.7.0 版本引入了新的消费者组协议。有关更多详细信息及其限制,请参阅 KIP-848。新的消费者组协议是早期访问版本,不适合在生产环境中使用。仅建议在此版本中用于测试目的。因此,适用于 Apache Kafka 的 Spring 仅在 kafka-client 本身提供的此类测试级别支持的范围内支持此新的消费者组协议。默认情况下,适用于 Apache Kafka 的 Spring 使用经典的消费者组协议,在测试新的消费者组协议时,需要通过消费者的 group.protocol 属性选择加入。

测试支持更改

kraft 模式在 EmbeddedKafka 中默认情况下处于禁用状态,并且希望使用 kraft 模式的用户必须启用它。这是由于在 kraft 模式下使用 EmbeddedKafka 时观察到某些不稳定性,尤其是在测试新的消费者组协议时。新的消费者组协议仅在 kraft 模式下受支持,因此,在测试新协议时,需要针对真实的 Kafka 集群进行测试,而不是基于 KafkaClusterTestKit 的集群,而 EmbeddedKafka 基于 KafkaClusterTestKit。此外,在 kraft 模式下使用 EmbeddedKafka 运行多个 KafkaListener 方法时,观察到一些其他竞争条件。在解决这些问题之前,EmbeddedKafka 上的 kraft 默认值将保持为 false

Kafka Streams 交互式查询支持

一个新的 API KafkaStreamsInteractiveQuerySupport 用于访问 Kafka Streams 交互式查询中使用的可查询存储。有关更多详细信息,请参阅 Kafka Streams 交互式支持

TransactionIdSuffixStrategy

引入了一个新的 TransactionIdSuffixStrategy 接口来管理 transactional.id 后缀。当设置 maxCache 大于零时,默认实现是 DefaultTransactionIdSuffixStrategy 可以重用特定范围内的 transactional.id,否则将通过递增计数器动态生成后缀。有关更多信息,请参阅 固定 TransactionIdSuffix

异步 @KafkaListener 返回

@KafkaListener(和 @KafkaHandler)方法现在可以返回异步返回类型,包括 CompletableFuture<?>Mono<?> 和 Kotlin suspend 函数。有关更多信息,请参阅 异步返回

基于抛出的异常将消息路由到自定义 DLT

现在可以根据在消息处理期间抛出的异常类型将消息重定向到自定义 DLT。重定向规则通过 RetryableTopic.exceptionBasedDltRoutingRetryTopicConfigurationBuilder.dltRoutingRules 设置。自定义 DLT 以及其他重试和死信主题将自动创建。有关更多信息,请参阅 基于抛出的异常将消息路由到自定义 DLT

弃用 ContainerProperties 的 transactionManager 属性

弃用 ContainerProperties 中的 transactionManager 属性,转而使用 KafkaAwareTransactionManager,与通用的 PlatformTransactionManager 相比,这是一个更具体的类型。请参阅 ContainerProperties事务同步

回滚后处理

提供了一个新的 AfterRollbackProcessor API processBatch。有关更多信息,请参阅 回滚后处理器

更改 @RetryableTopic SameIntervalTopicReuseStrategy 的默认值

@RetryableTopic 属性 SameIntervalTopicReuseStrategy 的默认值更改为 SINGLE_TOPIC。请参阅 单个主题用于最大间隔指数延迟

非阻塞重试支持类级别 @KafkaListener

非阻塞重试支持 类级别的 @KafkaListener。请参阅 非阻塞重试

在 RetryTopicConfigurationProvider 中支持处理类级别的 @RetryableTopic。

提供了一个新的公共 API 来查找 RetryTopicConfiguration。请参阅 查找 RetryTopicConfiguration

RetryTopicConfigurer 支持处理 MultiMethodKafkaListenerEndpoint。

RetryTopicConfigurer 支持处理和注册 MultiMethodKafkaListenerEndpointMultiMethodKafkaListenerEndpoint 提供了 getter/setter 用于属性 defaultMethodmethods。修改了专门用于 MethodKafkaListenerEndpoint 类型的 EndpointCustomizerEndpointHandlerMethod 添加了新的构造函数来构建提供的 bean 的实例。提供了一个新的类 EndpointHandlerMultiMethod 来处理重试端点的多方法。

新的 API 方法,用于根据用户提供的函数跳转到某个偏移量

ConsumerCallback 提供了一个新的 API,用于根据用户定义的函数跳转到某个偏移量,该函数以消费者中的当前偏移量作为参数。有关更多详细信息,请参阅 Seek API 文档

@PartitionOffset 支持 SeekPosition

@PartitionOffset 添加 seekPosition 属性,以支持 TopicPartitionOffset.SeekPosition。有关更多详细信息,请参阅 手动分配

TopicPartitionOffset 中接受用于计算要跳转到的偏移量的函数的新构造函数

TopicPartitionOffset 有一个新的构造函数,它接受用户提供的函数来计算要跳转到的偏移量。使用此构造函数时,框架将使用当前消费者偏移量位置作为输入参数来调用该函数。有关更多详细信息,请参阅 Seek API 文档

将 Spring Boot 应用程序名称用作默认客户端 ID 前缀

对于定义了应用程序名称的 Spring Boot 应用程序,此名称现在用作某些客户端类型的自动生成客户端 ID 的默认前缀。有关更多详细信息,请参阅 默认客户端 ID 前缀

增强的 MessageListenerContainer 获取

ListenerContainerRegistry 提供了两个新的 API 来动态查找和过滤 MessageListenerContainer 实例。getListenerContainersMatching(Predicate<String> idMatcher) 用于按 ID 过滤,另一个是 getListenerContainersMatching(BiPredicate<String, MessageListenerContainer> matcher) 用于按 ID 和容器属性过滤。

有关更多信息,请参阅 @KafkaListener 生命周期管理的 API 文档

通过提供更多跟踪标签来增强观测

KafkaTemplateObservation 提供了更多跟踪标签(低基数)。KafkaListenerObservation 提供了一个新的 API 来查找高基数键名和更多跟踪标签(高基数或低基数)。请参阅 Micrometer 观测