变更历史

3.2 版本相对于 3.1 版本的新特性

本节介绍从 3.1 版本到 3.2 版本的变化。有关更早版本中的变化,请参见变更历史

Kafka 客户端版本

此版本需要 3.7.0 kafka-clients。3.7.0 版本的 Kafka 客户端引入了新的消费组协议。有关更多详细信息及其限制,请参见 KIP-848。新的消费组协议是早期访问版本,不建议在生产环境中使用。此版本仅建议用于测试目的。因此,Spring for Apache Kafka 对此新的消费组协议的支持仅限于 kafka-client 本身提供的测试级别支持。默认情况下,Spring for Apache Kafka 使用经典消费组协议,在测试新的消费组协议时,需要通过 consumer 上的 group.protocol 属性启用。

测试支持的变化

EmbeddedKafka 中的 kraft 模式默认禁用,需要使用 kraft 模式的用户必须启用它。这是由于在使用 EmbeddedKafkakraft 模式时观察到某些不稳定,尤其是在测试新的消费组协议时。新的消费组协议仅在 kraft 模式下支持,因此在测试新协议时,需要在真实的 Kafka 集群上进行,而不是基于 KafkaClusterTestKitEmbeddedKafka 基于此)的集群。此外,在使用 EmbeddedKafkakraft 模式运行多个 KafkaListener 方法时,还观察到其他一些竞态条件。在这些问题解决之前,EmbeddedKafkakraft 的默认值将保持为 false

Kafka Streams 交互式查询支持

一个用于访问 Kafka Streams 交互式查询中所用可查询存储的新 API KafkaStreamsInteractiveQuerySupport 已引入。更多详细信息请参见Kafka Streams 交互式支持

TransactionIdSuffixStrategy

引入了一个新的接口 TransactionIdSuffixStrategy,用于管理 transactional.id 后缀。当设置 maxCache 大于零时,默认实现 DefaultTransactionIdSuffixStrategy 可以在特定范围内重用 transactional.id,否则将通过递增计数器动态生成后缀。更多信息请参见固定事务 ID 后缀

异步 @KafkaListener 返回

@KafkaListener(和 @KafkaHandler)方法现在可以返回异步返回类型,包括 CompletableFuture<?>Mono<?> 和 Kotlin 的 suspend 函数。更多信息请参见异步返回

基于抛出异常将消息路由到自定义 DLT

现在可以基于在消息处理期间抛出的异常类型,将消息重定向到自定义 DLT。重定向规则可以通过 RetryableTopic.exceptionBasedDltRoutingRetryTopicConfigurationBuilder.dltRoutingRules 设置。自定义 DLT 以及其他重试和死信主题都会自动创建。更多信息请参见基于抛出异常将消息路由到自定义 DLT

ContainerProperties 的 transactionManager 属性已弃用

弃用 ContainerProperties 中的 transactionManager 属性,转而使用 KafkaAwareTransactionManager,相较于通用的 PlatformTransactionManager,这是一个更窄的类型。参见ContainerProperties事务同步

回滚后处理

提供了一个新的 AfterRollbackProcessor API processBatch。更多信息请参见回滚后处理器

改变 @RetryableTopic SameIntervalTopicReuseStrategy 默认值

@RetryableTopicSameIntervalTopicReuseStrategy 属性默认值改为 SINGLE_TOPIC。参见最大间隔指数延迟的单一主题

非阻塞重试支持类级别的 @KafkaListener

非阻塞重试支持类级别的 @KafkaListener。参见非阻塞重试

RetryTopicConfigurationProvider 中支持处理类级别的 @RetryableTopic。

提供了一个新的公共 API,用于查找 RetryTopicConfiguration。参见查找 RetryTopicConfiguration

RetryTopicConfigurer 支持处理 MultiMethodKafkaListenerEndpoint。

RetryTopicConfigurer 支持处理和注册 MultiMethodKafkaListenerEndpointMultiMethodKafkaListenerEndpoint 为属性 defaultMethodmethods 提供 getter/setter 方法。修改严格针对 MethodKafkaListenerEndpoint 类型的 EndpointCustomizerEndpointHandlerMethod 添加了新的构造函数,用于为提供的 bean 构建实例。提供新的类 EndpointHandlerMultiMethod,用于处理重试端点的多方法。

基于用户提供的函数跳转到偏移量的新 API 方法

ConsumerCallback 提供了一个新的 API,用于基于用户定义的函数跳转到偏移量,该函数以 consumer 中的当前偏移量作为参数。更多详细信息请参见Seek API 文档

@PartitionOffset 支持 SeekPosition

@PartitionOffset 添加 seekPosition 属性以支持 TopicPartitionOffset.SeekPosition。更多详细信息请参见手动分配

TopicPartitionOffset 中接受计算跳转偏移量函数的新的构造函数

TopicPartitionOffset 有一个新的构造函数,它接受一个用户提供的函数,用于计算跳转到哪个偏移量。使用此构造函数时,框架会调用该函数,并将当前 consumer 偏移量位置作为输入参数。更多详细信息请参见Seek API 文档

Spring Boot 应用名称作为默认客户端 ID 前缀

对于定义了应用名称的 Spring Boot 应用,该名称现在被用作某些客户端类型的自动生成的客户端 ID 的默认前缀。更多详细信息请参见默认客户端 ID 前缀

增强的 MessageListenerContainers 获取

ListenerContainerRegistry 提供了两个新的 API,用于动态查找和过滤 MessageListenerContainer 实例。getListenerContainersMatching(Predicate<String> idMatcher) 用于按 ID 过滤,另一个是 getListenerContainersMatching(BiPredicate<String, MessageListenerContainer> matcher) 用于按 ID 和容器属性过滤。

通过提供更多跟踪标签增强可观测性

KafkaTemplateObservation 提供更多跟踪标签(低基数)。KafkaListenerObservation 提供了一个新的 API,用于查找高基数键名以及更多跟踪标签(高基数或低基数)。参见Micrometer 可观测性

3.1 版本相对于 3.0 版本的新特性

本节介绍从 3.0 版本到 3.1 版本的变化。有关更早版本中的变化,请参见变更历史

Kafka 客户端版本

此版本需要 3.6.0 kafka-clients

EmbeddedKafkaBroker

现在提供了一个额外的实现,用于使用 Kraft 而不是 Zookeeper。更多信息请参见嵌入式 Kafka Broker

JsonDeserializer

发生反序列化异常时,SerializationException 消息不再包含形式为 Can’t deserialize data [[123, 34, 98, 97, 122, …​ 的数据;每数据字节的数值数组没有用处,并且对于大量数据来说会非常冗长。与 ErrorHandlingDeserializer 一起使用时,发送到错误处理器的 DeserializationException 包含 data 属性,该属性包含无法反序列化的原始数据。不与 ErrorHandlingDeserializer 一起使用时,KafkaConsumer 会不断为同一条记录发出异常,显示主题/分区/偏移量以及 Jackson 抛出的原因。

ContainerPostProcessor

可以通过在 @KafkaListener 注解上指定 ContainerPostProcessor 的 bean 名称,对监听器容器应用后处理。这发生在容器创建之后,以及在容器工厂上配置的任何 ContainerCustomizer 之后。更多信息请参见容器工厂

ErrorHandlingDeserializer

现在可以将一个 Validator 添加到此反序列化器中;如果委托的 Deserializer 成功反序列化对象,但该对象验证失败,则会抛出一个类似于发生反序列化异常的异常。这允许将原始原始数据传递给错误处理器。更多信息请参见使用 ErrorHandlingDeserializer

可重试主题

当使用 @RetryableTopic(backoff = @Backoff(delay = 5000), attempts = "2", fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC) 时,将后缀 -retry-5000 改为 -retry。如果想保留后缀 -retry-5000,请使用 @RetryableTopic(backoff = @Backoff(delay = 5000), attempts = "2")。更多信息请参见主题命名

监听器容器变化

手动分配分区时,如果 consumer 的 group.idnull,则 AckMode 现在会自动强制设置为 MANUAL。更多信息请参见手动分配所有分区

3.0 版本相对于 2.9 版本的新特性

Kafka 客户端版本

此版本需要 3.3.1 kafka-clients

精确一次语义

EOSMode.V1 (即 ALPHA) 不再支持。

使用事务时,最低 broker 版本为 2.5。

更多信息请参见精确一次语义KIP-447

可观测性

现在支持使用 Micrometer 启用计时器和跟踪的可观测性。更多信息请参见可观测性

原生镜像

提供了创建原生镜像的支持。更多信息请参见原生镜像

全局单一嵌入式 Kafka

嵌入式 Kafka(EmbeddedKafkaBroker)现在可以作为整个测试计划的单一全局实例启动。更多信息请参见在多个测试类中使用相同的 Broker(s)

可重试主题变化

此功能不再被视为实验性功能(就其 API 而言),此功能本身自 2.7 版本以来就已支持,但存在高于正常水平的破坏性 API 变更的可能性。

此版本更改了非阻塞重试基础设施 bean 的引导方式,以避免某些应用中与应用初始化有关的时序问题。

现在可以为重试容器设置不同的 concurrency;默认情况下,concurrency 与主容器相同。

@RetryableTopic 现在可以用作自定义注解上的元注解,包括支持 @AliasFor 属性。

更多信息请参见配置

重试主题的默认复制因子现在是 -1(使用 broker 默认值)。如果您的 broker 版本早于 2.4,您现在需要显式设置此属性。

现在可以在同一个应用上下文中为同一个主题配置多个 @RetryableTopic 监听器。之前,这是不可能的。更多信息请参见多个监听器监听同一主题

RetryTopicConfigurationSupport 中存在破坏性 API 变更;具体来说,如果您覆盖了 destinationTopicResolverkafkaConsumerBackoffManager 和/或 retryTopicConfigurer 的 bean 定义方法,这些方法现在需要一个 ObjectProvider<RetryTopicComponentFactory> 参数。

监听器容器变化

与 consumer 认证和授权失败相关的事件现在由容器发布。更多信息请参见应用事件

现在可以自定义 consumer 线程使用的线程名称。更多信息请参见容器线程命名

容器属性 restartAfterAuthException 已添加。更多信息请参见监听器容器属性

KafkaTemplate 变化

此类返回的 future 现在是 CompletableFuture s 而不是 ListenableFuture s。关于使用此版本进行迁移的帮助,请参见使用 KafkaTemplate

ReplyingKafkaTemplate 变化

此类返回的 future 现在是 CompletableFuture s 而不是 ListenableFuture s。关于使用此版本进行迁移的帮助,请参见使用 ReplyingKafkaTemplate使用 Message<?> 进行请求/回复

@KafkaListener 变化

现在可以使用自定义关联头,该关联头将在任何回复消息中回显。更多信息请参见使用 ReplyingKafkaTemplate 末尾的说明。

现在可以手动提交批处理的一部分,在整个批处理完成处理之前。更多信息请参见提交偏移量

KafkaHeaders 变化

在 2.9.x 中已弃用的 KafkaHeaders 中的四个常量现已移除。

  • 取代 MESSAGE_KEY,请使用 KEY

  • 取代 PARTITION_ID,请使用 PARTITION

类似地,RECEIVED_MESSAGE_KEYRECEIVED_KEY 取代,RECEIVED_PARTITION_IDRECEIVED_PARTITION 取代。

测试变化

3.0.7 版本引入了 MockConsumerFactoryMockProducerFactory。更多信息请参见Mock Consumer 和 Producer

从 3.0.10 版本开始,嵌入式 Kafka broker 默认将 Spring Boot 属性 spring.kafka.bootstrap-servers 设置为嵌入式 broker 的地址。

2.9 版本相对于 2.8 版本的新特性

Kafka 客户端版本

此版本需要 3.2.0 kafka-clients

错误处理器变化

现在可以将 DefaultErrorHandler 配置为暂停容器一个 poll 并使用前一个 poll 的剩余结果,而不是跳转到剩余记录的偏移量。更多信息请参见DefaultErrorHandler

DefaultErrorHandler 现在有一个 BackOffHandler 属性。更多信息请参见Back Off Handler

监听器容器变化

interceptBeforeTx 现在适用于所有事务管理器(之前仅在使用 KafkaAwareTransactionManager 时应用)。参见[interceptBeforeTx]

提供了一个新的容器属性 pauseImmediate,这允许容器在当前记录处理后暂停 consumer,而不是在前一个 poll 的所有记录处理后。参见[pauseImmediate]

与 consumer 认证和授权相关的事件

Header Mapper 变化

现在可以配置应该映射哪些入站头。在 2.8.8 或更高版本中也可用。更多信息请参见消息头

KafkaTemplate 变化

在 3.0 版本中,此类返回的 future 将是 CompletableFuture s 而不是 ListenableFuture s。关于使用此版本进行迁移的帮助,请参见使用 KafkaTemplate

ReplyingKafkaTemplate 变化

该模板现在提供了一个方法,用于等待回复容器上的分配,以避免在回复容器初始化之前发送请求时出现的竞态条件。在 2.8.8 或更高版本中也可用。参见使用 ReplyingKafkaTemplate

在 3.0 版本中,此类返回的 future 将是 CompletableFuture s 而不是 ListenableFuture s。关于使用此版本进行迁移的帮助,请参见使用 ReplyingKafkaTemplate使用 Message<?> 进行请求/回复

2.8 版本相对于 2.7 版本的新特性

本节介绍从 2.7 版本到 2.8 版本的变化。有关更早版本中的变化,请参见变更历史

Kafka 客户端版本

此版本需要 3.0.0 kafka-clients

包变化

与类型映射相关的类和接口已从 …​support.converter 移动到 …​support.mapping

  • AbstractJavaTypeMapper

  • ClassMapper

  • DefaultJackson2JavaTypeMapper

  • Jackson2JavaTypeMapper

无序手动提交

监听器容器现在可以配置为接受无序的手动偏移量提交(通常是异步的)。容器将延迟提交,直到缺失的偏移量被确认。更多信息请参见手动提交偏移量

@KafkaListener 变化

现在可以在方法本身上指定监听器方法是否为批处理监听器。这使得同一个容器工厂可以用于记录和批处理监听器。

更多信息请参见[batch-listeners]

批处理监听器现在可以处理转换异常。

RecordFilterStrategy 与批处理监听器一起使用时,现在可以在一次调用中过滤整个批处理。更多信息请参见[batch-listeners]末尾的说明。

@KafkaListener 注解现在具有 filter 属性,用于覆盖容器工厂在此监听器上的 RecordFilterStrategy

@KafkaListener 注解现在具有 info 属性;这用于填充新的监听器容器属性 listenerInfo。然后将其用于在每条记录中填充一个 KafkaHeaders.LISTENER_INFO 头,可以在 RecordInterceptorRecordFilterStrategy 或监听器本身中使用。更多信息请参见监听器信息头AbstractMessageListenerContainer 属性

KafkaTemplate 变化

现在可以接收单条记录,给定主题、分区和偏移量。更多信息请参见使用 KafkaTemplate 接收

CommonErrorHandler 添加

旧版的 GenericErrorHandler 及其用于记录和批处理监听器的子接口层次结构已被新的单一接口 CommonErrorHandler 取代,新接口的实现对应于大多数旧版 GenericErrorHandler 的实现。更多信息请参见 容器错误处理器将自定义旧版错误处理器实现迁移到 CommonErrorHandler

监听器容器变更

interceptBeforeTx 容器属性现在默认为 true

authorizationExceptionRetryInterval 属性已重命名为 authExceptionRetryInterval,现在除了之前应用于 AuthorizationException 外,也适用于 AuthenticationException。默认情况下,这两种异常都被视为致命异常,容器将停止,除非设置了此属性。

序列化器/反序列化器变更

现在提供了 DelegatingByTopicSerializerDelegatingByTopicDeserializer。更多信息请参见 委派序列化器和反序列化器

DeadLetterPublishingRecover 变更

属性 stripPreviousExceptionHeaders 现在默认为 true

现在有几种技术可以自定义哪些头部会被添加到输出记录中。

更多信息请参见 管理死信记录头部

可重试主题变更

现在,您可以对可重试主题和不可重试主题使用相同的工厂。更多信息请参见 指定 ListenerContainerFactory

现在有一个可管理的全局致命异常列表,这些异常将使失败的记录直接进入 DLT。请参考 异常分类器 查看如何管理它。

您现在可以结合使用阻塞和非阻塞重试。更多信息请参见 结合使用阻塞和非阻塞重试

使用可重试主题功能时抛出的 KafkaBackOffException 现在记录在 DEBUG 级别。如果您需要将日志级别改回 WARN 或设置为任何其他级别,请参见 更改 KafkaBackOffException 日志级别

2.6 与 2.7 之间的变更

Kafka 客户端版本

此版本需要 2.7.0 版本的 kafka-clients。自版本 2.7.1 起,它也与 2.8.0 客户端兼容;请参见 覆盖 Spring Boot 依赖

使用主题实现非阻塞延迟重试

此版本中增加了这一重要的新功能。当严格排序不重要时,失败的投递可以发送到另一个主题以便稍后消费。可以配置一系列此类重试主题,并增加延迟时间。更多信息请参见 非阻塞重试

监听器容器变更

onlyLogRecordMetadata 容器属性现在默认为 true

现在提供了一个新的容器属性 stopImmediate

更多信息请参见 监听器容器属性

在投递尝试之间使用 BackOff 的错误处理器(例如 SeekToCurrentErrorHandlerDefaultAfterRollbackProcessor)现在会在容器停止后立即退出退避间隔,而不是延迟停止。

继承 FailedRecordProcessor 的错误处理器和回滚后处理器现在可以配置一个或多个 RetryListener,以接收有关重试和恢复进度的信息。

RecordInterceptor 现在增加了在监听器返回(正常或通过抛出异常)后调用的方法。它还有一个子接口 ConsumerAwareRecordInterceptor。此外,现在还有一个用于批处理监听器的 BatchInterceptor。更多信息请参见 消息监听器容器

@KafkaListener 变更

您现在可以验证 @KafkaHandler 方法(类级别监听器)的载荷参数。更多信息请参见 @KafkaListener @Payload 验证

您现在可以在 MessagingMessageConverterBatchMessagingMessageConverter 上设置 rawRecordHeader 属性,这使得原始 ConsumerRecord 会被添加到转换后的 Message<?> 中。这对于例如您希望在监听器错误处理器中使用 DeadLetterPublishingRecoverer 的场景非常有用。更多信息请参见 监听器错误处理器

您现在可以在应用程序初始化期间修改 @KafkaListener 注解。更多信息请参见 @KafkaListener 属性修改

DeadLetterPublishingRecover 变更

现在,如果键和值都反序列化失败,原始值会发布到 DLT。以前,值会被填充,但键的 DeserializationException 会留在头部中。如果您子类化了恢复器并重写了 createProducerRecord 方法,这是一个破坏性的 API 变更。

此外,恢复器在发布到目标分区之前,会验证目标解析器选择的分区确实存在。

更多信息请参见 发布死信记录

ChainedKafkaTransactionManager 已弃用

更多信息请参见 事务

ReplyingKafkaTemplate 变更

现在有一种机制可以在检查回复时,如果存在某些条件,则异常地终止 future。

已添加对发送和接收 spring-messaging Message<?> 的支持。

更多信息请参见 使用 ReplyingKafkaTemplate

Kafka Streams 变更

默认情况下,StreamsBuilderFactoryBean 现在配置为不清理本地状态。更多信息请参见 配置

KafkaAdmin 变更

已添加新的方法 createOrModifyTopicsdescribeTopics。已添加 KafkaAdmin.NewTopics 以方便在单个 bean 中配置多个主题。更多信息请参见 [configuring-topics]

MessageConverter 变更

现在可以将 spring-messaging SmartMessageConverter 添加到 MessagingMessageConverter 中,从而允许根据 contentType 头部进行内容协商。更多信息请参见 Spring Messaging 消息转换

按顺序启动 @KafkaListener

更多信息请参见 按顺序启动 @KafkaListener

ExponentialBackOffWithMaxRetries

提供了一种新的 BackOff 实现,使得配置最大重试次数更加方便。更多信息请参见 ExponentialBackOffWithMaxRetries 实现

条件委派错误处理器

这些新的错误处理器可以配置为根据异常类型委派给不同的错误处理器。更多信息请参见 委派错误处理器

2.5 与 2.6 之间的变更

Kafka 客户端版本

此版本需要 2.6.0 版本的 kafka-clients

监听器容器变更

默认的 EOSMode 现在是 BETA。更多信息请参见 精确一次语义

各种错误处理器(继承 FailedRecordProcessor)和 DefaultAfterRollbackProcessor 现在在恢复失败时会重置 BackOff。此外,您现在可以根据失败的记录和/或异常来选择使用的 BackOff

您现在可以在容器属性中配置 adviceChain。更多信息请参见 监听器容器属性

当容器配置为发布 ListenerContainerIdleEvent 时,在发布空闲事件后接收到记录时,它现在会发布 ListenerContainerNoLongerIdleEvent。更多信息请参见 应用事件检测空闲和无响应的消费者

@KafkaListener 变更

使用手动分区分配时,您现在可以指定一个通配符来确定哪些分区应重置为初始偏移量。此外,如果监听器实现了 ConsumerSeekAware,则在手动分配后会调用 onPartitionsAssigned() 方法。(也在版本 2.5.5 中添加)。更多信息请参见 显式分区分配

AbstractConsumerSeekAware 中添加了便捷方法,使得 seek 操作更容易。更多信息请参见 [seek]

ErrorHandler 变更

FailedRecordProcessor 的子类(例如 SeekToCurrentErrorHandlerDefaultAfterRollbackProcessorRecoveringBatchErrorHandler)现在可以配置为,如果异常类型与此记录之前发生的异常类型不同,则重置重试状态。

生产者工厂变更

您现在可以为生产者设置最大存活时间,超过此时间后它们将被关闭并重新创建。更多信息请参见 事务

您现在可以在创建 DefaultKafkaProducerFactory 后更新配置映射。这可能很有用,例如,如果您在凭证更改后必须更新 SSL key/trust store 位置。更多信息请参见 使用 DefaultKafkaProducerFactory

2.4 与 2.5 之间的变更

本节介绍从 2.4 版到 2.5 版的变更。对于早期版本的变更,请参见 变更历史

消费者/生产者工厂变更

默认的消费者和生产者工厂现在可以在创建或关闭消费者或生产者时调用回调。提供了原生 Micrometer 指标的实现。更多信息请参见 工厂监听器

您现在可以在运行时更改 bootstrap server 属性,从而实现故障转移到另一个 Kafka 集群。更多信息请参见 连接到 Kafka

StreamsBuilderFactoryBean 变更

工厂 bean 现在可以在创建或销毁 KafkaStreams 时调用回调。提供了原生 Micrometer 指标的实现。更多信息请参见 KafkaStreams Micrometer 支持

Kafka 客户端版本

此版本需要 2.5.0 版本的 kafka-clients

类/包变更

SeekUtils 已从 o.s.k.support 包移动到 o.s.k.listener 包。

投递尝试头部

现在有一个选项可以在使用某些错误处理器和回滚后处理器时添加一个用于跟踪投递尝试次数的头部。更多信息请参见 投递尝试头部

@KafkaListener 变更

@KafkaListener 返回类型为 Message<?> 时,如果需要,默认回复头部现在会自动填充。更多信息请参见 回复类型 Message<?>

当传入记录的键为 null 时,KafkaHeaders.RECEIVED_MESSAGE_KEY 不再填充 null 值;而是完全省略该头部。

@KafkaListener 方法现在可以指定 ConsumerRecordMetadata 参数,而不是使用离散的头部来获取元数据,例如主题、分区等。更多信息请参见 消费者记录元数据

监听器容器变更

assignmentCommitOption 容器属性现在默认为 LATEST_ONLY_NO_TX。更多信息请参见 监听器容器属性

使用事务时,subBatchPerPartition 容器属性现在默认为 true。更多信息请参见 事务

现在提供了一个新的 RecoveringBatchErrorHandler

现在支持静态组成员资格。更多信息请参见 消息监听器容器

配置了增量/协同再平衡时,如果偏移量提交失败且异常为非致命的 RebalanceInProgressException,容器将尝试在再平衡完成后重新提交仍分配给此实例的分区的偏移量。

现在,记录监听器的默认错误处理器是 SeekToCurrentErrorHandler,批处理监听器是 RecoveringBatchErrorHandler。更多信息请参见 容器错误处理器

您现在可以控制标准错误处理器有意抛出的异常的日志级别。更多信息请参见 容器错误处理器

已添加 getAssignmentsByClientId() 方法,使得更容易确定并发容器中的哪些消费者被分配了哪些分区。更多信息请参见 监听器容器属性

您现在可以抑制在错误、调试日志等中记录完整的 ConsumerRecord。请参见 监听器容器属性 中的 onlyLogRecordMetadata

KafkaTemplate 变更

KafkaTemplate 现在可以维护 micrometer 计时器。更多信息请参见 监控

KafkaTemplate 现在可以使用 ProducerConfig 属性进行配置,以覆盖生产者工厂中的配置。更多信息请参见 使用 KafkaTemplate

现在提供了 RoutingKafkaTemplate。更多信息请参见 使用 RoutingKafkaTemplate

您现在可以使用 KafkaSendCallback 代替 ListenerFutureCallback 来获取更窄范围的异常,从而更容易提取失败的 ProducerRecord。更多信息请参见 使用 KafkaTemplate

Kafka 字符串序列化器/反序列化器

现在提供了新的 ToStringSerializer/StringDeserializer 以及关联的 SerDe。更多信息请参见 字符串序列化

JsonDeserializer

JsonDeserializer 现在在确定反序列化类型方面更灵活。更多信息请参见 使用方法确定类型

委派序列化器/反序列化器

当出站记录没有头部时,DelegatingSerializer 现在可以处理“标准”类型。更多信息请参见 委派序列化器和反序列化器

测试变更

KafkaTestUtils.consumerProps() 辅助记录现在默认将 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG 设置为 earliest。更多信息请参见 JUnit

2.3 与 2.4 之间的变更

Kafka 客户端版本

此版本需要 2.4.0 或更高版本的 kafka-clients,并支持新的增量再平衡功能。

ConsumerAwareRebalanceListener

ConsumerRebalanceListener 类似,此接口现在增加了一个方法 onPartitionsLost。更多信息请参考 Apache Kafka 文档。

ConsumerRebalanceListener 不同,默认实现**不**调用 onPartitionsRevoked。相反,监听器容器在调用 onPartitionsLost 之后会调用该方法;因此,在实现 ConsumerAwareRebalanceListener 时,您不应执行相同的操作。

更多信息请参见 再平衡监听器 末尾的 IMPORTANT 注意事项。

GenericErrorHandler

isAckAfterHandle() 的默认实现现在默认为 true。

KafkaTemplate

KafkaTemplate 现在支持非事务性发布与事务性发布并行。更多信息请参见 KafkaTemplate 事务性与非事务性发布

AggregatingReplyingKafkaTemplate

releaseStrategy 现在是一个 BiConsumer。它现在在超时后(以及记录到达时)被调用;在超时后调用时,第二个参数为 true

更多信息请参见 聚合多个回复

监听器容器

ContainerProperties 提供了一个 authorizationExceptionRetryInterval 选项,允许监听器容器在 KafkaConsumer 抛出任何 AuthorizationException 后重试。请参见其 JavaDocs 和 使用 KafkaMessageListenerContainer 了解更多信息。

@KafkaListener

@KafkaListener 注解增加了一个新属性 splitIterables;默认为 true。当回复监听器返回 Iterable 时,此属性控制返回结果是作为单个记录发送,还是为每个元素发送一个记录。更多信息请参见 使用 @SendTo 转发监听器结果

批处理监听器现在可以使用 BatchToRecordAdapter 进行配置;例如,这允许在事务中处理批处理,而监听器每次获取一条记录。使用默认实现时,可以使用 ConsumerRecordRecoverer 来处理批处理中的错误,而不会停止整个批处理的进程——这在使用事务时可能很有用。更多信息请参见 批处理监听器事务

Kafka Streams

StreamsBuilderFactoryBean 接受一个新的属性 KafkaStreamsInfrastructureCustomizer。这允许在流创建之前配置 builder 和/或 topology。更多信息请参见 Spring 管理

2.2 与 2.3 之间的变更

本节介绍从 2.2 版到 2.3 版的变更。

技巧、窍门和示例

已添加新的章节 技巧、窍门和示例。请提交 GitHub issue 和/或 pull request,以便在该章节中添加更多条目。

Kafka 客户端版本

此版本需要 2.3.0 或更高版本的 kafka-clients

类/包变更

TopicPartitionInitialOffset 已弃用,推荐使用 TopicPartitionOffset

配置变更

从 2.3.4 版本开始,missingTopicsFatal 容器属性默认为 false。当此属性为 true 时,如果 broker 关闭,应用程序将无法启动;许多用户受到了此变更的影响;考虑到 Kafka 是一个高可用平台,我们并未预期在没有活动 broker 的情况下启动应用程序是一个常见的用例。

生产者和消费者工厂变更

DefaultKafkaProducerFactory 现在可以配置为每个线程创建一个生产者。您还可以在构造函数中提供 Supplier<Serializer> 实例,作为配置类(需要无参数构造函数)或使用 Serializer 实例(然后在所有生产者之间共享)的替代方案。更多信息请参见 使用 DefaultKafkaProducerFactory

DefaultKafkaConsumerFactory 中也提供了使用 Supplier<Deserializer> 实例的相同选项。更多信息请参见 使用 KafkaMessageListenerContainer

监听器容器变更

以前,当使用监听器适配器(例如 @KafkaListener)调用监听器时,错误处理器会接收到 ListenerExecutionFailedException(其中实际的监听器异常作为 cause)。由原生 GenericMessageListener 抛出的异常会原样传递给错误处理器。现在,参数总是 ListenerExecutionFailedException(其中实际的监听器异常作为 cause),它提供了访问容器的 group.id 属性的途径。

由于监听器容器有自己的提交偏移量的机制,它偏好将 Kafka ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG 设置为 false。现在,除非在消费者工厂或容器的消费者属性覆盖中明确设置,否则它会自动将其设置为 false。

ackOnError 属性现在默认为 false

现在可以在监听器方法中获取消费者的 group.id 属性。更多信息请参见 获取消费者 group.id

容器增加了一个新的属性 recordInterceptor,允许在调用监听器之前检查或修改记录。如果需要调用多个拦截器,还提供了 CompositeRecordInterceptor。更多信息请参见 消息监听器容器

ConsumerSeekAware 增加了新方法,允许您相对于开头、结尾或当前位置执行 seek 操作,以及 seek 到大于或等于时间戳的第一个偏移量。更多信息请参见 [seek]

现在提供了一个便捷类 AbstractConsumerSeekAware,以简化 seek 操作。更多信息请参见 [seek]

ContainerProperties 提供了一个 idleBetweenPolls 选项,允许监听器容器的主循环在 KafkaConsumer.poll() 调用之间休眠。请参见其 JavaDocs 和 使用 KafkaMessageListenerContainer 了解更多信息。

使用 AckMode.MANUAL(或 MANUAL_IMMEDIATE)时,您现在可以通过在 Acknowledgment 上调用 nack 来触发重新投递。更多信息请参见 提交偏移量

现在可以使用 Micrometer Timer 监控监听器性能。更多信息请参见 监控

容器现在发布更多与启动相关的消费者生命周期事件。更多信息请参见 应用事件

事务性批处理监听器现在支持僵尸防护(zombie fencing)。更多信息请参见 事务

监听器容器工厂现在可以使用 ContainerCustomizer 进行配置,以便在每个容器创建和配置后进一步进行配置。更多信息请参见 容器工厂

ErrorHandler 变更

SeekToCurrentErrorHandler 现在将某些异常视为致命异常,并禁用这些异常的重试,在首次失败时即调用恢复器。

SeekToCurrentErrorHandlerSeekToCurrentBatchErrorHandler 现在可以配置为在投递尝试之间应用 BackOff(线程休眠)。

从 2.3.2 版本开始,在错误处理器恢复失败记录后返回时,恢复记录的偏移量将被提交。

DeadLetterPublishingRecoverer 在与 ErrorHandlingDeserializer 结合使用时,现在将发送到死信主题的消息的载荷设置为无法反序列化的原始值。以前,它为 null,用户代码需要从消息头部提取 DeserializationException。更多信息请参见 发布死信记录

TopicBuilder

提供了一个新类 TopicBuilder,用于更方便地创建用于自动主题配置的 NewTopic @Bean。更多信息请参见 [configuring-topics]

Kafka Streams 变更

您现在可以对由 @EnableKafkaStreams 创建的 StreamsBuilderFactoryBean 执行额外配置。更多信息请参见 Streams 配置

现在提供了一个 RecoveringDeserializationExceptionHandler,它允许恢复带有反序列化错误的记录。它可以与 DeadLetterPublishingRecoverer 结合使用,将这些记录发送到死信主题。更多信息请参见 从反序列化异常中恢复

提供了 HeaderEnricher 转换器,它使用 SpEL 生成头部值。更多信息请参见 头部丰富器

提供了 MessagingTransformer。这使得 Kafka Streams topology 可以与 spring-messaging 组件(例如 Spring Integration 流)进行交互。更多信息请参见 MessagingProcessor[从 KStream 调用 Spring Integration 流]

JSON 组件变更

现在,所有支持 JSON 的组件默认都配置了由 JacksonUtils.enhancedObjectMapper() 产生的 Jackson ObjectMapperJsonDeserializer 现在提供了基于 TypeReference 的构造函数,以更好地处理目标泛型容器类型。此外,还引入了 JacksonMimeTypeModule,用于将 org.springframework.util.MimeType 序列化为纯字符串。更多信息请参见其 JavaDocs 和 序列化、反序列化和消息转换

提供了 ByteArrayJsonMessageConverter 以及所有 JSON 转换器的新超类 JsonMessageConverter。此外,现在还提供了 StringOrBytesSerializer;它可以序列化 ProducerRecord 中的 byte[]BytesString 值。更多信息请参见 Spring Messaging 消息转换

JsonSerializerJsonDeserializerJsonSerde 现在具有流畅的 API,使得编程配置更加简单。请参见 javadoc、序列化、反序列化和消息转换 以及 Streams JSON 序列化和反序列化 了解更多信息。

ReplyingKafkaTemplate

当回复超时时,future 将以 KafkaReplyTimeoutException 异常完成,而不是 KafkaException

此外,现在提供了一个重载的 sendAndReceive 方法,允许按消息指定回复超时时间。

AggregatingReplyingKafkaTemplate

通过聚合来自多个接收者的回复来扩展 ReplyingKafkaTemplate。更多信息请参见 聚合多个回复

事务变更

您现在可以在 KafkaTemplateKafkaTransactionManager 上覆盖生产者工厂的 transactionIdPrefix。更多信息请参见 transactionIdPrefix

新的委派序列化器/反序列化器

框架现在提供了一个委派 SerializerDeserializer,它利用头部来支持生成和消费具有多种键/值类型的记录。更多信息请参见 委派序列化器和反序列化器

新的重试反序列化器

框架现在提供了一个委派的 RetryingDeserializer,用于在发生瞬时错误(例如网络问题)时重试序列化。更多信息请参见 重试反序列化器

2.1 与 2.2 之间的变更

Kafka 客户端版本

此版本需要 2.0.0 或更高版本的 kafka-clients

类和包变更

ContainerProperties 类已从 org.springframework.kafka.listener.config 包移动到 org.springframework.kafka.listener 包。

AckMode 枚举已从 AbstractMessageListenerContainer 移动到 ContainerProperties

setBatchErrorHandler()setErrorHandler() 方法已从 ContainerProperties 移动到 AbstractMessageListenerContainerAbstractKafkaListenerContainerFactory

回滚后处理

提供了一个新的 AfterRollbackProcessor 策略。更多信息请参见 回滚后处理器

ConcurrentKafkaListenerContainerFactory 变更

您现在可以使用 ConcurrentKafkaListenerContainerFactory 创建和配置任何 ConcurrentMessageListenerContainer,而不仅仅是用于 @KafkaListener 注解的容器。更多信息请参见 容器工厂

监听器容器变更

已添加新的容器属性(missingTopicsFatal)。更多信息请参见 使用 KafkaMessageListenerContainer

消费者停止时,现在会发出 ConsumerStoppedEvent。更多信息请参见 线程安全

批处理监听器可以选择接收完整的 ConsumerRecords<?, ?> 对象,而不是 List<ConsumerRecord<?, ?>。更多信息请参见 [batch-listeners]

DefaultAfterRollbackProcessorSeekToCurrentErrorHandler 现在可以恢复(跳过)持续失败的记录,并且默认在失败 10 次后执行此操作。它们可以配置为将失败的记录发布到死信主题。

从 2.2.4 版本开始,在选择死信主题名称时可以使用消费者的 group ID。

已添加 ConsumerStoppingEvent。更多信息请参见 应用事件

当容器配置为 AckMode.MANUAL_IMMEDIATE 时(自 2.2.4 版起),SeekToCurrentErrorHandler 现在可以配置为提交已恢复记录的偏移量。

@KafkaListener 变更

您现在可以通过在注解上设置属性来覆盖监听器容器工厂的 concurrencyautoStartup 属性。您现在可以添加配置来决定将哪些头信息(如果有)复制到回复消息中。更多信息请参阅 @KafkaListener 注解

您现在可以将 @KafkaListener 作为元注解用于您自己的注解。更多信息请参阅 @KafkaListener 作为元注解

现在更容易为 @Payload 验证配置一个 Validator。更多信息请参阅 @KafkaListener @Payload 验证

您现在可以直接在注解上指定 Kafka 消费者属性;这些属性将覆盖消费者工厂中定义的同名属性(自版本 2.2.4 起)。更多信息请参阅 注解属性

头信息映射变更

类型为 MimeTypeMediaType 的头信息现在在 RecordHeader 值中映射为简单的字符串。以前,它们被映射为 JSON,并且只有 MimeType 被解码。MediaType 无法解码。现在为了互操作性,它们是简单的字符串。

此外,DefaultKafkaHeaderMapper 有一个新的 addToStringClasses 方法,允许指定应使用 toString() 而非 JSON 映射的类型。更多信息请参阅 消息头信息

嵌入式 Kafka 变更

KafkaEmbedded 类及其 KafkaRule 接口已被弃用,转而使用 EmbeddedKafkaBroker 及其 JUnit 4 包装器 EmbeddedKafkaRule@EmbeddedKafka 注解现在填充一个 EmbeddedKafkaBroker bean,而不是已弃用的 KafkaEmbedded。此变更允许在 JUnit 5 测试中使用 @EmbeddedKafka@EmbeddedKafka 注解现在具有 ports 属性,用于指定填充 EmbeddedKafkaBroker 的端口。更多信息请参阅 测试应用程序

JsonSerializer/Deserializer 增强

您现在可以使用生产者和消费者属性提供类型映射信息。

反序列化器上提供了新的构造函数,允许使用提供的目标类型覆盖类型头信息。

JsonDeserializer 现在默认删除任何类型信息头信息。

您现在可以使用 Kafka 属性将 JsonDeserializer 配置为忽略类型信息头信息(自版本 2.2.3 起)。

更多信息请参阅 序列化、反序列化和消息转换

Kafka Streams 变更

streams 配置 bean 现在必须是一个 KafkaStreamsConfiguration 对象,而不是一个 StreamsConfig 对象。

StreamsBuilderFactoryBean 已从包 …​core 移至 …​config

引入了 KafkaStreamBrancher,以便在基于 KStream 实例构建条件分支时提供更好的用户体验。

更多信息请参阅 Apache Kafka Streams 支持配置

事务 ID

当事务由监听器容器启动时,transactional.id 现在是 transactionIdPrefix 附加 <group.id>.<topic>.<partition>。此变更允许正确地隔离僵尸进程,如此处所述

2.0 和 2.1 之间的变更

Kafka 客户端版本

此版本要求 1.0.0 或更高版本的 kafka-clients

版本 2.2 原生支持 1.1.x 客户端。

JSON 改进

StringJsonMessageConverterJsonSerializer 现在在 Headers 中添加类型信息,允许转换器和 JsonDeserializer 在接收时基于消息本身而不是固定的配置类型创建特定类型。更多信息请参阅 序列化、反序列化和消息转换

停止容器的错误处理器

现在为记录和批量监听器提供了容器错误处理器,它们将监听器抛出的任何异常视为致命错误,并停止容器。更多信息请参阅 处理异常

暂停和恢复容器

监听器容器现在具有 pause()resume() 方法(自版本 2.1.3 起)。更多信息请参阅 暂停和恢复监听器容器

有状态重试

自版本 2.1.3 起,您可以配置有状态重试。更多信息请参阅 有状态重试

客户端 ID

自版本 2.1.1 起,您现在可以在 @KafkaListener 上设置 client.id 前缀。以前,要自定义客户端 ID,您需要为每个监听器配置单独的消费者工厂(和容器工厂)。当您使用并发时,前缀后会添加 -n 以提供唯一的客户端 ID。

记录偏移量提交日志

默认情况下,使用 DEBUG 日志级别记录主题偏移量提交日志。自版本 2.1.2 起,ContainerProperties 中的新属性 commitLogLevel 允许您指定这些消息的日志级别。更多信息请参阅 使用 KafkaMessageListenerContainer

默认 @KafkaHandler

自版本 2.1.3 起,您可以将类级别 @KafkaListener 上的一个 @KafkaHandler 注解指定为默认注解。更多信息请参阅 类级别 @KafkaListener

ReplyingKafkaTemplate

自版本 2.1.3 起,提供了一个 KafkaTemplate 的子类来支持请求/回复语义。更多信息请参阅 使用 ReplyingKafkaTemplate

ChainedKafkaTransactionManager

版本 2.1.3 引入了 ChainedKafkaTransactionManager。(现已弃用)。

从 2.0 迁移指南

请参阅 2.0 到 2.1 迁移 指南。

1.3 和 2.0 之间的变更

Spring Framework 和 Java 版本

Spring for Apache Kafka 项目现在需要 Spring Framework 5.0 和 Java 8。

@KafkaListener 变更

您现在可以使用 @SendTo 注解 @KafkaListener 方法(以及类和 @KafkaHandler 方法)。如果方法返回结果,则会转发到指定的主题。更多信息请参阅 使用 @SendTo 转发监听器结果

消息监听器

消息监听器现在可以感知 Consumer 对象。更多信息请参阅 [message-listeners]

使用 ConsumerAwareRebalanceListener

重平衡监听器现在可以在重平衡通知期间访问 Consumer 对象。更多信息请参阅 重平衡监听器

1.2 和 1.3 之间的变更

事务支持

0.11.0.0 客户端库增加了对事务的支持。已添加 KafkaTransactionManager 及其他事务支持。更多信息请参阅 事务

头信息支持

0.11.0.0 客户端库增加了对消息头信息的支持。现在可以将它们映射到 spring-messaging MessageHeaders 并从中映射。更多信息请参阅 消息头信息

创建主题

0.11.0.0 客户端库提供了一个 AdminClient,您可以使用它来创建主题。KafkaAdmin 使用此客户端自动添加定义为 @Bean 实例的主题。

Kafka 时间戳支持

KafkaTemplate 现在支持添加带有时间戳的记录的 API。已引入新的 KafkaHeaders 以支持 timestamp。此外,还添加了新的 KafkaConditions.timestamp()KafkaMatchers.hasTimestamp() 测试工具。更多详情请参阅 使用 KafkaTemplate@KafkaListener 注解测试应用程序

@KafkaListener 变更

您现在可以配置 KafkaListenerErrorHandler 来处理异常。更多信息请参阅 处理异常

默认情况下,@KafkaListenerid 属性现在用作 group.id 属性,覆盖消费者工厂中配置的属性(如果存在)。此外,您可以在注解上显式配置 groupId。以前,您需要为每个监听器使用单独的容器工厂(和消费者工厂)来使用不同的 group.id 值。要恢复使用工厂配置的 group.id 的先前行为,请将注解上的 idIsGroup 属性设置为 false

@EmbeddedKafka 注解

为了方便起见,提供了类级别的测试注解 @EmbeddedKafka,用于将 KafkaEmbedded 注册为 bean。更多信息请参阅 测试应用程序

Kerberos 配置

现在提供了配置 Kerberos 的支持。更多信息请参阅 JAAS 和 Kerberos

1.1 和 1.2 之间的变更

此版本使用 0.10.2.x 客户端。

1.0 和 1.1 之间的变更

Kafka 客户端

此版本使用 Apache Kafka 0.10.x.x 客户端。

批量监听器

可以将监听器配置为接收 consumer.poll() 操作返回的整个消息批次,而不是逐个接收。

空载荷

当使用日志压缩时,空载荷用于“删除”键。

初始偏移量

当显式分配分区时,您现在可以配置相对于消费者组当前位置的初始偏移量,而不是相对于绝对位置或当前结束位置。

Seek

您现在可以寻找每个主题或分区的偏移位置。当使用组管理且 Kafka 分配分区时,您可以使用此功能在初始化期间设置初始位置。当检测到空闲容器或在应用程序执行的任何任意点时,您也可以进行寻求。更多信息请参阅 [seek]