最新动态?
4.0 版本相比 3.3 版本的新特性
本节涵盖从 3.3 版本到 4.0 版本的变化。有关早期版本的更改,请参阅更改历史记录。
Apache Kafka 4.0 客户端升级
Spring for Apache Kafka 已升级到使用 Apache Kafka 客户端版本 4.0.0。此次升级带来了一些重要的变化:
-
所有基于 ZooKeeper 的功能已被删除,因为 Kafka 4.0 已完全转换为 KRaft 模式
-
ZooKeeper 依赖项已从项目中移除
-
嵌入式 Kafka 测试框架现在完全使用 KRaft 模式
-
EmbeddedKafkaZKBroker类已被删除,所有功能现在由EmbeddedKafkaKraftBroker处理
嵌入式 Kafka 测试框架更改
测试基础架构已显著更新
-
已删除
EmbeddedKafkaRuleJUnit 4 规则 -
@EmbeddedKafka注解通过删除与 ZooKeeper 相关的属性而得到简化 -
kraft属性已被删除,因为 KRaft 模式现在是唯一选项 -
与 ZooKeeper 相关的属性,如
zookeeperPort、zkConnectionTimeout和zkSessionTimeout已被删除 -
KafkaClusterTestKit 导入现在使用 KRaft 模式的新包
-
一些测试已更新,以解决 KRaft 模式中静态端口分配的限制
-
已调整测试中的复制因子,以适应 KRaft 要求
Producer 接口更新
已实现 Kafka Producer 接口中的新方法
-
registerMetricForSubscription -
unregisterMetricFromSubscription
已删除的弃用功能
已删除几个弃用项
-
已从运行时提示中删除已弃用的
partitioner类 -
已删除使用
String consumerGroupId的已弃用sendOffsetsToTransaction方法
Kafka Streams API 更改
-
KafkaStreamBrancher已更新为使用新的split()和branch()方法,而不是已弃用的branch()方法 -
DeserializationExceptionHandler已更新为使用新的ErrorHandlerContext
与 Apache Kafka 4.0.0 相关的内部 API 更新
-
BrokerAddress类现在使用org.apache.kafka.server.network.BrokerEndPoint而不是已弃用的kafka.cluster.BrokerEndPoint -
GlobalEmbeddedKafkaTestExecutionListener已更新为仅在 KRaft 模式下工作
新的消费者再平衡协议
Spring for Apache Kafka 4.0 支持 Kafka 4.0 的新消费者再平衡协议 - KIP-848。有关详细信息,请参阅新消费者再平衡协议文档。
支持多值头部
JsonKafkaHeaderMapper 和 SimpleKafkaHeaderMapper 支持 Kafka 记录的多值头部映射。更多详细信息可在支持多值头部映射中找到。
配置附加的 RecordInterceptor
监听器容器现在通过 getRecordInterceptor() 支持拦截器自定义。有关详细信息,请参阅消息监听器容器部分。
批处理监听器中的每条记录可观察性
现在,在使用批处理监听器时,可以获取每条记录的观察。有关更多信息,请参阅批处理监听器的可观察性。
Kafka 队列(共享消费者)支持
Spring for Apache Kafka 现在通过共享消费者提供对 Kafka 队列的早期访问支持,共享消费者是 Apache Kafka 4.0.0 的一部分并实现了 KIP-932。这使得协作消费成为可能,多个消费者可以同时从相同的分区消费,与传统消费者组相比提供更好的负载分配。有关更多信息,请参阅Kafka 队列(共享消费者)。
Jackson 3 支持
Spring for Apache Kafka 现在全面支持 Jackson 3,并与现有的 Jackson 2 支持并行。Jackson 3 在可用时会自动检测并优先使用,提供增强的性能和现代 JSON 处理能力。
所有 Jackson 2 类现在都有 Jackson 3 对应类,具有一致的命名和改进的类型安全
-
JsonKafkaHeaderMapper替换DefaultKafkaHeaderMapper -
JacksonJsonSerializer/Deserializer替换JsonSerializer/Deserializer -
JacksonJsonSerde替换JsonSerde -
JacksonJsonMessageConverter家族替换JsonMessageConverter家族 -
JacksonProjectingMessageConverter替换ProjectingMessageConverter -
DefaultJacksonJavaTypeMapper替换DefaultJackson2JavaTypeMapper
新的 Jackson 3 类使用 JsonMapper 而不是通用的 ObjectMapper,以增强类型安全,并利用 Jackson 3 改进的模块系统和性能优化。
迁移路径:现有应用程序可与 Jackson 2 保持不变地继续工作。要迁移到 Jackson 3,只需将 Jackson 3 添加到您的类路径中,并更新类引用以使用新的 Jackson 3 等效项。当两个版本都存在时,框架会自动检测并优先使用 Jackson 3。
向后兼容性:所有 Jackson 2 类都已弃用,但仍完全可用。它们将在未来的主要版本中移除。
有关配置示例,请参阅序列化、反序列化和消息转换。
Spring Retry 依赖项移除
Spring for Apache Kafka 已移除对 Spring Retry 的依赖,转而使用 Spring Framework 7 中引入的核心重试支持。这是一个重大更改,影响整个框架的重试配置和 API。
BackOffValuesGenerator,用于预先生成所需的 BackOff 值,现在直接与 Spring Framework 的 BackOff 接口配合使用,而不是 BackOffPolicy。这些值随后由监听器基础设施管理,Spring Retry 不再参与。
从配置角度来看,Spring Kafka 严重依赖 Spring Retry 的 @Backoff 注解。由于 Spring Framework 中没有等效项,该注解已作为 @BackOff 移至 Spring Kafka,并进行了以下改进:
-
统一命名:使用
@BackOff而不是@Backoff以保持一致性 -
表达式评估:所有字符串属性都支持 SpEL 表达式和属性占位符
-
持续时间格式支持:字符串属性接受
java.util.Duration格式(例如,“2s”、“500ms”) -
增强文档:改进了 Javadoc,提供了更清晰的解释
迁移示例
// Before
@RetryableTopic(backoff = @Backoff(delay = 2000, maxDelay = 10000, multiplier = 2))
// After
@RetryableTopic(backOff = @BackOff(delay = 2000, maxDelay = 10000, multiplier = 2))
// With new duration format support
@RetryableTopic(backOff = @BackOff(delayString = "2s", maxDelayString = "10s", multiplier = 2))
// With property placeholders
@RetryableTopic(backOff = @BackOff(delayString = "${retry.delay}", multiplierString = "${retry.multiplier}"))
RetryingDeserializer 不再提供 RecoveryCallback,但提供了一个接受 RetryException 作为输入的等效函数。这包含抛出的异常以及重试次数
// Before
retryingDeserializer.setRecoveryCallback(context -> {
return fallbackValue;
});
// After
retryingDeserializer.setRecoveryCallback(retryException -> {
return fallbackValue;
});
BinaryExceptionClassifier 的使用已被新引入的 ExceptionMatcher 取代,后者提供了一个更完善的 API。
其他更改包括:
-
DestinationTopicPropertiesFactory使用ExceptionMatcher而不是BinaryExceptionClassifier -
RetryTopicConfigurationBuilder中的uniformRandomBackoff方法已被弃用,转而支持抖动 -
错误处理工具已更新,以适应新的异常匹配系统
-
Kafka Streams 重试模板现在使用 Spring Framework 的重试支持
应用程序必须更新其配置以使用新的 Spring Framework 重试 API,但重试行为和功能保持不变。