变更历史
3.1 相较于 3.0 的新增功能
本节介绍了从 3.0 版到 3.1 版所做的更改。有关早期版本的更改,请参阅 变更历史。
EmbeddedKafkaBroker
现在提供了额外的实现来使用 Kraft
而不是 Zookeeper。有关更多信息,请参阅 嵌入式 Kafka 代理。
JsonDeserializer
当发生反序列化异常时,SerializationException
消息不再包含 Can’t deserialize data [[123, 34, 98, 97, 122, …
形式的数据;每个数据字节的数值数组并没有用,对于大型数据来说可能会很冗长。当与 ErrorHandlingDeserializer
一起使用时,发送到错误处理程序的 DeserializationException
包含 data
属性,该属性包含无法反序列化的原始数据。当不与 ErrorHandlingDeserializer
一起使用时,KafkaConsumer
将持续为同一记录发出异常,显示主题/分区/偏移量以及 Jackson 引发的异常原因。
ContainerPostProcessor
可以通过在 @KafkaListener
注解上指定 ContainerPostProcessor
的 Bean 名称,对监听器容器应用后处理。这发生在容器创建之后以及在容器工厂上配置的任何已配置的 ContainerCustomizer
之后。有关更多信息,请参阅 容器工厂。
ErrorHandlingDeserializer
您现在可以向此反序列化程序添加 Validator
;如果委托 Deserializer
成功地反序列化了对象,但该对象验证失败,则会抛出一个类似于反序列化异常的异常。这允许将原始原始数据传递到错误处理程序。有关更多信息,请参阅 使用 ErrorHandlingDeserializer
。
可重试主题
当 @RetryableTopic(backoff = @Backoff(delay = 5000), attempts = "2", fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC)
时,将后缀 -retry-5000
更改为 -retry
。如果要保留后缀 -retry-5000
,请使用 @RetryableTopic(backoff = @Backoff(delay = 5000), attempts = "2")
。有关更多信息,请参阅 主题命名。
监听器容器更改
当手动分配分区时,使用 null
消费者 group.id
,AckMode
现在会自动强制转换为 MANUAL
。有关更多信息,请参阅 手动分配所有分区。
3.0 相较于 2.9 的新增功能
观测
现在支持使用 Micrometer 启用计时器和跟踪的观测。有关更多信息,请参阅 观测。
原生镜像
提供了对创建原生镜像的支持。有关更多信息,请参阅 原生镜像。
全局单个嵌入式 Kafka
嵌入式 Kafka(EmbeddedKafkaBroker
)现在可以作为整个测试计划的单个全局实例启动。有关更多信息,请参阅 对多个测试类使用相同的代理。
可重试主题更改
此功能不再被视为实验性的(就其 API 而言),此功能本身已从 2.7 版开始支持,但 API 更改的可能性高于正常水平。
在此版本中,非阻塞重试 基础设施 Bean 的引导已更改,以避免某些应用程序在应用程序初始化方面发生的一些时间问题。
您现在可以为重试容器设置不同的 并发性
;默认情况下,并发性与主容器相同。
@RetryableTopic
现在可以用作自定义注释上的元注释,包括对 @AliasFor
属性的支持。
有关更多信息,请参阅 配置。
重试主题的默认复制因子现在为 -1
(使用代理默认值)。如果您的代理早于 2.4 版,则现在需要显式设置该属性。
您现在可以在同一应用程序上下文中同一主题上配置多个 @RetryableTopic
监听器。以前,这是不可能的。有关更多信息,请参阅 多个监听器,相同主题。
RetryTopicConfigurationSupport
中存在重大 API 更改;具体来说,如果您覆盖了 destinationTopicResolver
、kafkaConsumerBackoffManager
和/或 retryTopicConfigurer
的 bean 定义方法;这些方法现在需要一个 ObjectProvider<RetryTopicComponentFactory>
参数。
监听器容器更改
容器现在发布与消费者身份验证和授权失败相关的事件。有关更多信息,请参见 应用程序事件。
您现在可以自定义消费者线程使用的线程名称。有关更多信息,请参见 容器线程命名。
添加了容器属性 restartAfterAuthException
。有关更多信息,请参见 监听器容器属性。
KafkaTemplate
更改
此类返回的 Future 现在是 CompletableFuture
而不是 ListenableFuture
。请参见 使用 KafkaTemplate
。
ReplyingKafkaTemplate
更改
此类返回的 Future 现在是 CompletableFuture
而不是 ListenableFuture
。请参见 使用 ReplyingKafkaTemplate
和 使用 Message<?>
进行请求/回复。
@KafkaListener
更改
您现在可以使用自定义关联标头,该标头将在任何回复消息中进行回显。有关更多信息,请参见 使用 ReplyingKafkaTemplate
末尾的注释。
您现在可以手动提交批处理的一部分,然后再处理整个批处理。有关更多信息,请参见 提交偏移量。
KafkaHeaders
更改
KafkaHeaders
中四个在 2.9.x 中已弃用的常量现已删除。
-
请使用
KEY
代替MESSAGE_KEY
。 -
请使用
PARTITION
代替PARTITION_ID
类似地,RECEIVED_MESSAGE_KEY
被 RECEIVED_KEY
替换,RECEIVED_PARTITION_ID
被 RECEIVED_PARTITION
替换。
测试更改
3.0.7 版本引入了 MockConsumerFactory
和 MockProducerFactory
。有关更多信息,请参见 模拟消费者和生产者。
从 3.0.10 版本开始,嵌入式 Kafka 代理默认情况下会将 Spring Boot 属性 spring.kafka.bootstrap-servers
设置为嵌入式代理的地址。
2.9 相比 2.8 的新增功能
错误处理程序更改
DefaultErrorHandler
现在可以配置为暂停容器一个轮询并使用上一个轮询的剩余结果,而不是跳转到剩余记录的偏移量。有关更多信息,请参见 DefaultErrorHandler。
DefaultErrorHandler
现在具有 BackOffHandler
属性。有关更多信息,请参见 回退处理程序。
监听器容器更改
interceptBeforeTx
现在适用于所有事务管理器(以前仅在使用 KafkaAwareTransactionManager
时应用)。请参见 [interceptBeforeTx]。
提供了一个新的容器属性 pauseImmediate
,它允许容器在处理完当前记录后暂停消费者,而不是在处理完上一个轮询的所有记录后暂停。请参见 [pauseImmediate]。
与消费者身份验证和授权相关的事件
标头映射器更改
您现在可以配置要映射哪些入站标头。在 2.8.8 或更高版本中也可用。有关更多信息,请参见 消息标头。
KafkaTemplate
更改
在 3.0 中,此类返回的 Future 将是 CompletableFuture
而不是 ListenableFuture
。请参见 使用 KafkaTemplate
以在使用此版本时进行过渡。
ReplyingKafkaTemplate
更改
模板现在提供了一种方法来等待回复容器上的分配,以避免在回复容器初始化之前发送请求时出现竞争。在 2.8.8 或更高版本中也可用。请参见 使用 ReplyingKafkaTemplate
。
在 3.0 中,此类返回的 Future 将是 CompletableFuture
而不是 ListenableFuture
。请参见 使用 ReplyingKafkaTemplate
和 使用 Message<?>
进行请求/回复 以在使用此版本时进行过渡。
2.8 相比 2.7 的新增功能
本节介绍从 2.7 版到 2.8 版所做的更改。有关早期版本的更改,请参见 更改历史记录。
包更改
与类型映射相关的类和接口已从 …support.converter
移动到 …support.mapping
。
-
AbstractJavaTypeMapper
-
ClassMapper
-
DefaultJackson2JavaTypeMapper
-
Jackson2JavaTypeMapper
无序手动提交
监听器容器现在可以配置为接受无序(通常是异步)的手动偏移量提交。容器将推迟提交,直到确认缺少的偏移量。有关更多信息,请参见 手动提交偏移量。
@KafkaListener
更改
现在可以在方法本身指定监听器方法是否为批处理监听器。这允许对记录和批处理监听器都使用相同的容器工厂。
有关更多信息,请参见 [batch-listeners]。
批处理监听器现在可以处理转换异常。
有关更多信息,请参见 批处理错误处理程序的转换错误。
RecordFilterStrategy
在与批处理监听器一起使用时,现在可以在一次调用中过滤整个批处理。有关更多信息,请参见 [batch-listeners] 末尾的注释。
@KafkaListener
注释现在具有 filter
属性,用于覆盖仅此监听器的容器工厂的 RecordFilterStrategy
。
KafkaTemplate
更改
您现在可以接收单个记录,前提是给定主题、分区和偏移量。有关更多信息,请参见 使用 KafkaTemplate
接收。
添加 CommonErrorHandler
旧的 GenericErrorHandler
及其用于记录和批处理监听器的子接口层次结构已被新的单个接口 CommonErrorHandler
替换,该接口的实现对应于 GenericErrorHandler
的大多数旧实现。有关更多信息,请参见 容器错误处理程序 和 将自定义旧错误处理程序实现迁移到 CommonErrorHandler
。
监听器容器更改
interceptBeforeTx
容器属性现在默认为 true
。
authorizationExceptionRetryInterval
属性已重命名为 authExceptionRetryInterval
,现在除了以前适用的 AuthorizationException
之外,还适用于 AuthenticationException
。这两个异常都被视为致命异常,默认情况下容器将停止,除非设置此属性。
有关更多信息,请参见 使用 KafkaMessageListenerContainer
和 监听器容器属性。
序列化程序/反序列化程序更改
现在提供了 DelegatingByTopicSerializer
和 DelegatingByTopicDeserializer
。有关更多信息,请参见 委托序列化程序和反序列化程序。
DeadLetterPublishingRecover
更改
属性 stripPreviousExceptionHeaders
现在默认为 true
。
现在有几种技术可以自定义添加到输出记录中的标头。
有关更多信息,请参见 管理死信记录标头。
可重试主题更改
现在,您可以对可重试主题和不可重试主题使用相同的工厂。有关更多信息,请参见 指定 ListenerContainerFactory。
现在有一个可管理的全局致命异常列表,这些异常将使失败的记录直接进入 DLT。请参阅 异常分类器 以了解如何管理它。
您现在可以结合使用阻塞和非阻塞重试。有关更多信息,请参见 组合阻塞和非阻塞重试。
使用可重试主题功能时抛出的 KafkaBackOffException 现在记录在 DEBUG 级别。如果您需要将日志级别更改回 WARN 或将其设置为任何其他级别,请参见 更改 KafkaBackOffException 日志级别。
2.6 和 2.7 之间的更改
Kafka 客户端版本
此版本需要 2.7.0 版的 kafka-clients
。从 2.7.1 版开始,它也兼容 2.8.0 版的客户端;请参阅 覆盖 Spring Boot 依赖项。
使用主题的非阻塞延迟重试
此版本新增了此重要功能。当严格排序不重要时,失败的传递可以发送到另一个主题,以便稍后使用。可以配置一系列此类重试主题,并设置递增的延迟。有关更多信息,请参阅 非阻塞重试。
侦听器容器更改
onlyLogRecordMetadata
容器属性现在默认为 true
。
现在可以使用新的容器属性 stopImmediate
。
有关更多信息,请参阅 侦听器容器属性。
在传递尝试之间使用 BackOff
的错误处理程序(例如 SeekToCurrentErrorHandler
和 DefaultAfterRollbackProcessor
)现在将在容器停止后立即退出回退间隔,而不是延迟停止。
扩展 FailedRecordProcessor
的错误处理程序和回滚后处理器现在可以配置一个或多个 RetryListener
来接收有关重试和恢复进度的信息。
RecordInterceptor
现在具有在侦听器返回(正常或通过抛出异常)后调用的其他方法。它还有一个子接口 ConsumerAwareRecordInterceptor
。此外,现在还有一个用于批处理侦听器的 BatchInterceptor
。有关更多信息,请参阅 消息侦听器容器。
@KafkaListener
更改
您现在可以验证 @KafkaHandler
方法(类级侦听器)的有效负载参数。有关更多信息,请参阅 @KafkaListener
@Payload
验证。
您现在可以设置 MessagingMessageConverter
和 BatchMessagingMessageConverter
上的 rawRecordHeader
属性,这会导致原始 ConsumerRecord
添加到转换后的 Message<?>
中。例如,如果您希望在侦听器错误处理程序中使用 DeadLetterPublishingRecoverer
,这将非常有用。有关更多信息,请参阅 侦听器错误处理程序。
您现在可以在应用程序初始化期间修改 @KafkaListener
注解。有关更多信息,请参阅 @KafkaListener
属性修改。
DeadLetterPublishingRecover
更改
现在,如果键和值都反序列化失败,则原始值将发布到 DLT。以前,值已填充,但键 DeserializationException
仍保留在标头中。如果您是 recoverer 的子类并覆盖了 createProducerRecord
方法,则存在一个破坏性 API 更改。
此外,recoverer 会在发布到目标分区之前验证目标解析器选择的分区是否存在。
有关更多信息,请参阅 发布死信记录。
ChainedKafkaTransactionManager
已弃用
有关更多信息,请参阅 事务。
ReplyingKafkaTemplate
更改
现在有一种机制可以检查回复,如果存在某些条件,则使 future 异常失败。
已添加对发送和接收 spring-messaging
Message<?>
的支持。
有关更多信息,请参阅 使用 ReplyingKafkaTemplate
。
Kafka Streams 更改
默认情况下,StreamsBuilderFactoryBean
现在配置为不清理本地状态。有关更多信息,请参阅 配置。
KafkaAdmin
更改
已添加新方法 createOrModifyTopics
和 describeTopics
。已添加 KafkaAdmin.NewTopics
以便于在单个 bean 中配置多个主题。有关更多信息,请参阅 [configuring-topics]。
MessageConverter
更改
现在可以将 spring-messaging
SmartMessageConverter
添加到 MessagingMessageConverter
中,从而允许基于 contentType
标头进行内容协商。有关更多信息,请参阅 Spring 消息消息转换。
排序 @KafkaListener
有关更多信息,请参阅 按顺序启动 @KafkaListener
。
ExponentialBackOffWithMaxRetries
提供了一个新的 BackOff
实现,使配置最大重试次数更加方便。有关更多信息,请参阅 ExponentialBackOffWithMaxRetries
实现。
条件委托错误处理程序
可以将这些新的错误处理程序配置为委托给不同的错误处理程序,具体取决于异常类型。有关更多信息,请参阅 委托错误处理程序。
2.5 版和 2.6 版之间的更改
侦听器容器更改
默认 EOSMode
现在为 BETA
。有关更多信息,请参阅 恰好一次语义。
各种错误处理程序(扩展 FailedRecordProcessor
)和 DefaultAfterRollbackProcessor
现在会在恢复失败时重置 BackOff
。此外,您现在可以根据失败的记录和/或异常选择要使用的 BackOff
。
您现在可以在容器属性中配置 adviceChain
。有关更多信息,请参阅 侦听器容器属性。
当容器配置为发布 ListenerContainerIdleEvent
时,现在会在发布空闲事件后接收到记录时发布 ListenerContainerNoLongerIdleEvent
。有关更多信息,请参阅 应用程序事件 和 检测空闲和无响应的使用者。
@KafkaListener 更改
使用手动分区分配时,您现在可以指定一个通配符来确定应将哪些分区重置为初始偏移量。此外,如果侦听器实现了 ConsumerSeekAware
,则在手动分配后会调用 onPartitionsAssigned()
。(也添加到 2.5.5 版中)。有关更多信息,请参阅 显式分区分配。
已向 AbstractConsumerSeekAware
添加便利方法,以简化搜索操作。有关更多信息,请参阅 [seek]。
错误处理程序更改
FailedRecordProcessor
的子类(例如 SeekToCurrentErrorHandler
、DefaultAfterRollbackProcessor
、RecoveringBatchErrorHandler
)现在可以配置为重置重试状态,如果异常类型与之前使用此记录发生的异常类型不同。
生产者工厂更改
您现在可以设置生产者的最大生存期,在此生存期之后,它们将被关闭并重新创建。有关更多信息,请参阅 事务。
您现在可以在创建 DefaultKafkaProducerFactory
后更新配置映射。例如,如果您必须在凭据更改后更新 SSL 密钥/信任库位置,这将非常有用。有关更多信息,请参阅 使用 DefaultKafkaProducerFactory
。
2.4 版和 2.5 版之间的更改
本节介绍从 2.4 版到 2.5 版所做的更改。有关早期版本的更改,请参阅 更改历史记录。
使用者/生产者工厂更改
默认的使用者和生产者工厂现在可以在创建或关闭使用者或生产者时调用回调。提供了用于原生 Micrometer 指标的实现。有关更多信息,请参阅 工厂侦听器。
您现在可以在运行时更改引导服务器属性,从而启用故障转移到另一个 Kafka 集群。有关更多信息,请参阅 连接到 Kafka。
StreamsBuilderFactoryBean
更改
工厂 bean 现在可以在创建或销毁 KafkaStreams
时调用回调。提供了用于原生 Micrometer 指标的实现。有关更多信息,请参阅 KafkaStreams Micrometer 支持。
传递尝试标头
现在有一个选项可以添加一个标头,该标头在使用某些错误处理程序和回滚后处理器时跟踪传递尝试。有关更多信息,请参阅 传递尝试标头。
@KafkaListener 更改
如果 @KafkaListener
返回类型为 Message<?>
,则现在将自动填充默认回复标头(如果需要)。有关更多信息,请参阅 回复类型 Message<?>。
当传入记录具有 null
键时,KafkaHeaders.RECEIVED_MESSAGE_KEY
不再填充 null
值;标头完全被省略。
@KafkaListener
方法现在可以指定 ConsumerRecordMetadata
参数,而不是使用诸如主题、分区等元数据的离散标头。有关更多信息,请参阅 使用者记录元数据。
侦听器容器更改
assignmentCommitOption
容器属性现在默认为 LATEST_ONLY_NO_TX
。有关更多信息,请参阅 侦听器容器属性。
当使用事务时,subBatchPerPartition
容器属性现在默认值为 true
。有关更多信息,请参阅事务。
现在提供了一个新的 RecoveringBatchErrorHandler
。
现在支持静态组成员资格。有关更多信息,请参阅消息侦听器容器。
如果配置了增量/协作重新平衡,如果偏移量提交失败并出现非致命性 RebalanceInProgressException
,则容器将在重新平衡完成后尝试重新提交分配给此实例的分区的偏移量。
对于记录侦听器,默认错误处理程序现在是 SeekToCurrentErrorHandler
,对于批处理侦听器,默认错误处理程序是 RecoveringBatchErrorHandler
。有关更多信息,请参阅容器错误处理程序。
现在可以控制标准错误处理程序故意抛出的异常的日志记录级别。有关更多信息,请参阅容器错误处理程序。
已添加 getAssignmentsByClientId()
方法,从而更轻松地确定并发容器中的哪些消费者分配了哪些分区。有关更多信息,请参阅侦听器容器属性。
现在可以禁止在错误、调试日志等中记录整个 ConsumerRecord
。请参阅侦听器容器属性中的 onlyLogRecordMetadata
。
KafkaTemplate 更改
KafkaTemplate
现在可以维护 Micrometer 计时器。有关更多信息,请参阅监控。
现在可以使用 ProducerConfig
属性配置 KafkaTemplate
以覆盖生产者工厂中的属性。有关更多信息,请参阅使用 KafkaTemplate
。
现在提供了 RoutingKafkaTemplate
。有关更多信息,请参阅使用 RoutingKafkaTemplate
。
现在可以使用 KafkaSendCallback
代替 ListenerFutureCallback
来获取更窄的异常,从而更轻松地提取失败的 ProducerRecord
。有关更多信息,请参阅使用 KafkaTemplate
。
Kafka 字符串序列化器/反序列化器
现在提供了新的 ToStringSerializer
/StringDeserializer
以及关联的 SerDe
。有关更多信息,请参阅字符串序列化。
JsonDeserializer
JsonDeserializer
现在具有更多灵活性来确定反序列化类型。有关更多信息,请参阅使用方法确定类型。
委托序列化器/反序列化器
当出站记录没有标头时,DelegatingSerializer
现在可以处理“标准”类型。有关更多信息,请参阅委托序列化器和反序列化器。
测试更改
KafkaTestUtils.consumerProps()
帮助程序记录现在默认将 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
设置为 earliest
。有关更多信息,请参阅JUnit。
2.3 和 2.4 之间的更改
ConsumerAwareRebalanceListener
与 ConsumerRebalanceListener
一样,此接口现在还有一个附加方法 onPartitionsLost
。有关更多信息,请参阅 Apache Kafka 文档。
与 ConsumerRebalanceListener
不同,默认实现**不会**调用 onPartitionsRevoked
。相反,侦听器容器将在调用 onPartitionsLost
后调用该方法;因此,在实现 ConsumerAwareRebalanceListener
时不应执行相同的操作。
有关更多信息,请参阅重新平衡侦听器末尾的重要说明。
KafkaTemplate
KafkaTemplate
现在支持与事务并行进行的非事务发布。有关更多信息,请参阅KafkaTemplate
事务和非事务发布。
AggregatingReplyingKafkaTemplate
releaseStrategy
现在是 BiConsumer
。它现在在超时后(以及记录到达时)被调用;在超时后调用的情况下,第二个参数为 true
。
有关更多信息,请参阅聚合多个回复。
侦听器容器
ContainerProperties
提供了一个 authorizationExceptionRetryInterval
选项,允许侦听器容器在 KafkaConsumer
抛出任何 AuthorizationException
后重试。有关更多信息,请参阅其 JavaDoc 和使用 KafkaMessageListenerContainer
。
@KafkaListener
@KafkaListener
注解有一个新的属性 splitIterables
;默认为 true。当回复侦听器返回 Iterable
时,此属性控制返回结果是作为单个记录发送还是为每个元素发送记录。有关更多信息,请参阅使用 @SendTo
转发侦听器结果。
批处理侦听器现在可以使用 BatchToRecordAdapter
进行配置;例如,这允许在事务中处理批处理,而侦听器一次获取一条记录。使用默认实现,可以使用 ConsumerRecordRecoverer
处理批处理中的错误,而不会停止整个批处理的处理 - 这在使用事务时可能很有用。有关更多信息,请参阅带有批处理侦听器的事务。
Kafka Streams
StreamsBuilderFactoryBean
接受一个新的属性 KafkaStreamsInfrastructureCustomizer
。这允许在创建流之前配置构建器和/或拓扑。有关更多信息,请参阅Spring 管理。
2.2 和 2.3 之间的更改
本节介绍从版本 2.2 到版本 2.3 所做的更改。
提示、技巧和示例
添加了一个新的章节提示、技巧和示例。请提交 GitHub 问题和/或拉取请求以获取该章节中的其他条目。
配置更改
从版本 2.3.4 开始,missingTopicsFatal
容器属性默认为 false。如果代理已关闭,当此属性为 true 时,应用程序将无法启动;许多用户受到此更改的影响;鉴于 Kafka 是一个高可用性平台,我们没有预料到启动没有活动代理的应用程序将是常见用例。
生产者和消费者工厂更改
现在可以将 DefaultKafkaProducerFactory
配置为为每个线程创建一个生产者。您还可以在构造函数中提供 Supplier<Serializer>
实例,作为配置类(需要无参数构造函数)或使用 Serializer
实例进行构造(然后在所有生产者之间共享)的替代方法。有关更多信息,请参阅使用 DefaultKafkaProducerFactory
。
DefaultKafkaConsumerFactory
中的 Supplier<Deserializer>
实例也提供了相同的选项。有关更多信息,请参阅使用 KafkaMessageListenerContainer
。
侦听器容器更改
以前,当使用侦听器适配器(例如 @KafkaListener
)调用侦听器时,错误处理程序会接收 ListenerExecutionFailedException
(实际侦听器异常作为 cause
)。本机 GenericMessageListener
抛出的异常将未经更改地传递给错误处理程序。现在 ListenerExecutionFailedException
始终是参数(实际侦听器异常作为 cause
),它提供了对容器的 group.id
属性的访问。
由于侦听器容器有自己的偏移量提交机制,因此它更喜欢将 Kafka ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
设置为 false
。它现在会自动将其设置为 false,除非在消费者工厂或容器的消费者属性覆盖中明确设置。
ackOnError
属性现在默认为 false
。
现在可以在侦听器方法中获取消费者的 group.id
属性。有关更多信息,请参阅获取消费者 group.id
。
容器有一个新的属性 recordInterceptor
,允许在调用侦听器之前检查或修改记录。如果需要调用多个拦截器,也会提供 CompositeRecordInterceptor
。有关更多信息,请参阅消息侦听器容器。
ConsumerSeekAware
有新的方法允许您相对于开头、结尾或当前位置执行查找,并查找大于或等于时间戳的第一个偏移量。有关更多信息,请参阅[查找]。
现在提供了一个便利类 AbstractConsumerSeekAware
来简化查找。有关更多信息,请参阅[查找]。
ContainerProperties
提供了一个 idleBetweenPolls
选项,允许侦听器容器中的主循环在 KafkaConsumer.poll()
调用之间休眠。有关更多信息,请参阅其 JavaDoc 和使用 KafkaMessageListenerContainer
。
当使用 AckMode.MANUAL
(或 MANUAL_IMMEDIATE
)时,您现在可以通过在 Acknowledgment
上调用 nack
来导致重新传递。有关更多信息,请参阅提交偏移量。
现在可以使用 Micrometer Timer
监控侦听器性能。有关更多信息,请参阅监控。
容器现在发布与启动相关的其他消费者生命周期事件。有关更多信息,请参阅应用程序事件。
事务批处理侦听器现在可以支持僵尸围栏。有关更多信息,请参阅事务。
现在可以使用 ContainerCustomizer
配置侦听器容器工厂,以在创建和配置每个容器后进一步配置它。有关更多信息,请参阅容器工厂。
ErrorHandler 更改
SeekToCurrentErrorHandler
现在将某些异常视为致命异常,并禁用这些异常的重试,并在第一次失败时调用恢复程序。
SeekToCurrentErrorHandler
和 SeekToCurrentBatchErrorHandler
现在可以配置为在传递尝试之间应用 BackOff
(线程休眠)。
从版本 2.3.2 开始,在错误处理程序恢复失败记录后返回时,将提交恢复记录的偏移量。
DeadLetterPublishingRecoverer
在与 ErrorHandlingDeserializer
一起使用时,现在将发送到死信主题的消息的有效负载设置为无法反序列化的原始值。以前,它是 null
,用户代码需要从消息标头中提取 DeserializationException
。有关更多信息,请参阅发布死信记录。
TopicBuilder
提供了一个新的类TopicBuilder
,用于更方便地创建用于自动主题配置的NewTopic
@Bean
。有关更多信息,请参阅[configuring-topics]。
Kafka Streams 更改
现在,您可以对@EnableKafkaStreams
创建的StreamsBuilderFactoryBean
执行其他配置。有关更多信息,请参阅Streams 配置。
现在提供了RecoveringDeserializationExceptionHandler
,它允许恢复反序列化错误的记录。它可以与DeadLetterPublishingRecoverer
结合使用,将这些记录发送到死信主题。有关更多信息,请参阅从反序列化异常中恢复。
已提供HeaderEnricher
转换器,使用SpEL生成标头值。有关更多信息,请参阅标头增强器。
已提供MessagingTransformer
。这允许Kafka Streams拓扑与Spring Messaging组件(例如Spring Integration流)交互。有关更多信息,请参阅MessagingProcessor
和参阅[从KStream
调用Spring Integration流]。
JSON 组件更改
现在,所有支持JSON的组件都默认配置了由JacksonUtils.enhancedObjectMapper()
生成的Jackson ObjectMapper
。JsonDeserializer
现在提供基于TypeReference
的构造函数,以便更好地处理目标泛型容器类型。此外,还引入了JacksonMimeTypeModule
用于将org.springframework.util.MimeType
序列化为普通字符串。有关更多信息,请参阅其JavaDocs和序列化、反序列化和消息转换。
已提供ByteArrayJsonMessageConverter
以及所有Json转换器的新超类JsonMessageConverter
。此外,现在可以使用StringOrBytesSerializer
;它可以在ProducerRecord
中序列化byte[]
、Bytes
和String
值。有关更多信息,请参阅Spring Messaging 消息转换。
JsonSerializer
、JsonDeserializer
和JsonSerde
现在具有流畅的API,使程序配置更简单。有关更多信息,请参阅Javadocs、序列化、反序列化和消息转换和Streams JSON 序列化和反序列化。
ReplyingKafkaTemplate
当回复超时时,future 将使用KafkaReplyTimeoutException
而不是KafkaException
异常完成。
此外,现在提供了重载的sendAndReceive
方法,允许在每个消息的基础上指定回复超时。
AggregatingReplyingKafkaTemplate
通过聚合来自多个接收者的回复来扩展ReplyingKafkaTemplate
。有关更多信息,请参阅聚合多个回复。
事务更改
现在,您可以在KafkaTemplate
和KafkaTransactionManager
上覆盖生产者工厂的transactionIdPrefix
。有关更多信息,请参阅transactionIdPrefix
。
新的委托序列化程序/反序列化程序
框架现在提供了一个委托的Serializer
和Deserializer
,利用标头来启用使用多个键/值类型生成和使用记录。有关更多信息,请参阅委托序列化程序和反序列化程序。
新的重试反序列化程序
框架现在提供了一个委托的RetryingDeserializer
,在发生网络问题等瞬态错误时重试序列化。有关更多信息,请参阅重试反序列化程序。
2.1 版和 2.2 版之间的更改
类和包更改
ContainerProperties
类已从org.springframework.kafka.listener.config
移动到org.springframework.kafka.listener
。
AckMode
枚举已从AbstractMessageListenerContainer
移动到ContainerProperties
。
setBatchErrorHandler()
和setErrorHandler()
方法已从ContainerProperties
移动到AbstractMessageListenerContainer
和AbstractKafkaListenerContainerFactory
。
回滚后处理
提供了一种新的AfterRollbackProcessor
策略。有关更多信息,请参阅回滚后处理器。
ConcurrentKafkaListenerContainerFactory
更改
现在,您可以使用ConcurrentKafkaListenerContainerFactory
创建和配置任何ConcurrentMessageListenerContainer
,而不仅仅是那些用于@KafkaListener
注释的容器。有关更多信息,请参阅容器工厂。
侦听器容器更改
添加了一个新的容器属性(missingTopicsFatal
)。有关更多信息,请参阅使用KafkaMessageListenerContainer
。
当使用者停止时,现在会发出ConsumerStoppedEvent
。有关更多信息,请参阅线程安全。
批处理侦听器可以选择接收完整的ConsumerRecords<?, ?>
对象而不是List<ConsumerRecord<?, ?>
。有关更多信息,请参阅[batch-listeners]。
DefaultAfterRollbackProcessor
和SeekToCurrentErrorHandler
现在可以恢复(跳过)持续失败的记录,并且默认情况下,在10次失败后这样做。可以将它们配置为将失败的记录发布到死信主题。
从 2.2.4 版开始,在选择死信主题名称时可以使用使用者的组 ID。
已添加ConsumerStoppingEvent
。有关更多信息,请参阅应用程序事件。
现在可以将SeekToCurrentErrorHandler
配置为在容器配置为AckMode.MANUAL_IMMEDIATE
时提交已恢复记录的偏移量(自 2.2.4 版起)。
@KafkaListener 更改
现在,您可以通过在注释上设置属性来覆盖侦听器容器工厂的concurrency
和autoStartup
属性。现在,您可以添加配置以确定将哪些标头(如果有)复制到回复消息中。有关更多信息,请参阅@KafkaListener
注释。
现在,您可以在自己的注释上使用@KafkaListener
作为元注释。有关更多信息,请参阅@KafkaListener
作为元注释。
现在,更轻松地为@Payload
验证配置Validator
。有关更多信息,请参阅@KafkaListener
@Payload
验证。
现在,您可以在注释上直接指定 Kafka 使用者属性;这些属性将覆盖使用者工厂中定义的任何具有相同名称的属性(自 2.2.4 版起)。有关更多信息,请参阅注释属性。
标头映射更改
类型为MimeType
和MediaType
的标头现在在RecordHeader
值中映射为简单字符串。以前,它们被映射为JSON,并且仅解码了MimeType
。无法解码MediaType
。现在它们是简单的字符串,用于互操作性。
此外,DefaultKafkaHeaderMapper
有一个新的addToStringClasses
方法,允许指定应使用toString()
而不是JSON映射的类型。有关更多信息,请参阅消息标头。
嵌入式 Kafka 更改
KafkaEmbedded
类及其KafkaRule
接口已被弃用,取而代之的是EmbeddedKafkaBroker
及其JUnit 4 EmbeddedKafkaRule
包装器。@EmbeddedKafka
注释现在填充EmbeddedKafkaBroker
bean,而不是已弃用的KafkaEmbedded
。此更改允许在JUnit 5 测试中使用@EmbeddedKafka
。@EmbeddedKafka
注释现在具有属性ports
,用于指定填充EmbeddedKafkaBroker
的端口。有关更多信息,请参阅测试应用程序。
JsonSerializer/Deserializer 增强功能
现在,您可以通过使用生产者和使用者属性来提供类型映射信息。
反序列化程序上提供了新的构造函数,允许使用提供的目标类型覆盖类型标头信息。
JsonDeserializer
现在默认删除任何类型信息标头。
现在,您可以使用 Kafka 属性配置JsonDeserializer
以忽略类型信息标头(自 2.2.3 版起)。
有关更多信息,请参阅序列化、反序列化和消息转换。
Kafka Streams 更改
Streams 配置 bean 现在必须是KafkaStreamsConfiguration
对象,而不是StreamsConfig
对象。
StreamsBuilderFactoryBean
已从包…core
移动到…config
。
已引入KafkaStreamBrancher
,以便在KStream
实例之上构建条件分支时提供更好的最终用户体验。
有关更多信息,请参阅Apache Kafka Streams 支持和配置。
事务 ID
当侦听器容器启动事务时,transactional.id
现在是transactionIdPrefix
附加<group.id>.<topic>.<partition>
。此更改允许正确地隔离僵尸进程,此处进行了说明。
2.0 版和 2.1 版之间的更改
JSON 改进
StringJsonMessageConverter
和JsonSerializer
现在在Headers
中添加类型信息,让转换器和JsonDeserializer
在接收时创建特定类型,基于消息本身而不是固定的配置类型。有关更多信息,请参阅序列化、反序列化和消息转换。
容器停止错误处理程序
现在为记录和批处理侦听器提供了容器错误处理程序,它们将侦听器抛出的任何异常视为致命异常/ 它们会停止容器。有关更多信息,请参阅处理异常。
暂停和恢复容器
侦听器容器现在具有pause()
和resume()
方法(自 2.1.3 版起)。有关更多信息,请参阅暂停和恢复侦听器容器。
有状态重试
从 2.1.3 版开始,您可以配置有状态重试。有关更多信息,请参阅有状态重试。
客户端 ID
从 2.1.1 版开始,您现在可以在@KafkaListener
上设置client.id
前缀。以前,要自定义客户端 ID,您需要为每个侦听器提供一个单独的使用者工厂(和容器工厂)。前缀后跟-n
,以便在使用并发时提供唯一的客户端 ID。
记录偏移量提交
默认情况下,主题偏移量提交的日志记录使用DEBUG
日志级别执行。从 2.1.2 版本开始,ContainerProperties
中新增了一个名为commitLogLevel
的属性,允许您指定这些消息的日志级别。有关更多信息,请参见使用KafkaMessageListenerContainer
。
默认的 @KafkaHandler
从 2.1.3 版本开始,您可以将类级别 @KafkaListener
上的一个 @KafkaHandler
注解指定为默认注解。有关更多信息,请参见类上的@KafkaListener
。
ReplyingKafkaTemplate
从 2.1.3 版本开始,提供了一个KafkaTemplate
的子类来支持请求/回复语义。有关更多信息,请参见使用ReplyingKafkaTemplate
。
从 2.0 迁移指南
请参阅2.0 到 2.1 迁移指南。
1.3 和 2.0 之间的更改
@KafkaListener
更改
您现在可以使用@SendTo
注解@KafkaListener
方法(以及类和@KafkaHandler
方法)。如果方法返回结果,则将其转发到指定的主题。有关更多信息,请参见使用@SendTo
转发侦听器结果。
消息侦听器
消息侦听器现在可以感知Consumer
对象。有关更多信息,请参见[message-listeners]。
使用ConsumerAwareRebalanceListener
重新平衡侦听器现在可以在重新平衡通知期间访问Consumer
对象。有关更多信息,请参见重新平衡侦听器。
1.2 和 1.3 之间的更改
事务支持
0.11.0.0 客户端库添加了对事务的支持。已添加KafkaTransactionManager
和其他事务支持。有关更多信息,请参见事务。
标头支持
0.11.0.0 客户端库添加了对消息标头的支持。这些现在可以映射到spring-messaging
MessageHeaders
并从中映射。有关更多信息,请参见消息标头。
Kafka 时间戳支持
KafkaTemplate
现在支持一个 API 用于添加带有时间戳的记录。引入了关于timestamp
支持的新KafkaHeaders
。此外,还添加了新的KafkaConditions.timestamp()
和KafkaMatchers.hasTimestamp()
测试实用程序。有关更多详细信息,请参见使用KafkaTemplate
、@KafkaListener
注解和测试应用程序。
@KafkaListener
更改
您现在可以配置一个KafkaListenerErrorHandler
来处理异常。有关更多信息,请参见处理异常。
默认情况下,@KafkaListener
的id
属性现在用作group.id
属性,覆盖消费者工厂中配置的属性(如果存在)。此外,您可以在注解上显式配置groupId
。以前,您需要一个单独的容器工厂(和消费者工厂)才能为侦听器使用不同的group.id
值。要恢复使用工厂配置的group.id
的先前行为,请将注解上的idIsGroup
属性设置为false
。
@EmbeddedKafka
注解
为方便起见,提供了一个测试类级别的@EmbeddedKafka
注解,用于将KafkaEmbedded
注册为 Bean。有关更多信息,请参见测试应用程序。
Kerberos 配置
现在提供对配置 Kerberos 的支持。有关更多信息,请参见JAAS 和 Kerberos。
1.0 和 1.1 之间的更改
Seek
您现在可以查找每个主题或分区的职位。您可以使用它在初始化期间设置初始位置,当使用组管理并且 Kafka 分配分区时。您还可以在检测到空闲容器时或应用程序执行中的任意点进行查找。有关更多信息,请参见[seek]。