最新动态?

4.0 版本相比 3.3 版本的新特性

本节涵盖从 3.3 版本到 4.0 版本的变化。有关早期版本的更改,请参阅更改历史记录

Apache Kafka 4.0 客户端升级

Spring for Apache Kafka 已升级到使用 Apache Kafka 客户端版本 4.0.0。此次升级带来了一些重要的变化:

  • 所有基于 ZooKeeper 的功能已被删除,因为 Kafka 4.0 已完全转换为 KRaft 模式

  • ZooKeeper 依赖项已从项目中移除

  • 嵌入式 Kafka 测试框架现在完全使用 KRaft 模式

  • EmbeddedKafkaZKBroker 类已被删除,所有功能现在由 EmbeddedKafkaKraftBroker 处理

嵌入式 Kafka 测试框架更改

测试基础架构已显著更新

  • 已删除 EmbeddedKafkaRule JUnit 4 规则

  • @EmbeddedKafka 注解通过删除与 ZooKeeper 相关的属性而得到简化

  • kraft 属性已被删除,因为 KRaft 模式现在是唯一选项

  • 与 ZooKeeper 相关的属性,如 zookeeperPortzkConnectionTimeoutzkSessionTimeout 已被删除

  • KafkaClusterTestKit 导入现在使用 KRaft 模式的新包

  • 一些测试已更新,以解决 KRaft 模式中静态端口分配的限制

  • 已调整测试中的复制因子,以适应 KRaft 要求

ConsumerRecords 构造函数更改

ConsumerRecords 构造函数现在需要一个额外的 Map 参数,这已在整个框架中得到解决。直接使用此构造函数的应用程序需要更新其代码。

Producer 接口更新

已实现 Kafka Producer 接口中的新方法

  • registerMetricForSubscription

  • unregisterMetricFromSubscription

已删除的弃用功能

已删除几个弃用项

  • 已从运行时提示中删除已弃用的 partitioner

  • 已删除使用 String consumerGroupId 的已弃用 sendOffsetsToTransaction 方法

Kafka Streams API 更改

  • KafkaStreamBrancher 已更新为使用新的 split()branch() 方法,而不是已弃用的 branch() 方法

  • DeserializationExceptionHandler 已更新为使用新的 ErrorHandlerContext

与 Apache Kafka 4.0.0 相关的内部 API 更新

  • BrokerAddress 类现在使用 org.apache.kafka.server.network.BrokerEndPoint 而不是已弃用的 kafka.cluster.BrokerEndPoint

  • GlobalEmbeddedKafkaTestExecutionListener 已更新为仅在 KRaft 模式下工作

新的消费者再平衡协议

Spring for Apache Kafka 4.0 支持 Kafka 4.0 的新消费者再平衡协议 - KIP-848。有关详细信息,请参阅新消费者再平衡协议文档

支持多值头部

JsonKafkaHeaderMapperSimpleKafkaHeaderMapper 支持 Kafka 记录的多值头部映射。更多详细信息可在支持多值头部映射中找到。

配置附加的 RecordInterceptor

监听器容器现在通过 getRecordInterceptor() 支持拦截器自定义。有关详细信息,请参阅消息监听器容器部分。

批处理监听器中的每条记录可观察性

现在,在使用批处理监听器时,可以获取每条记录的观察。有关更多信息,请参阅批处理监听器的可观察性

Kafka 队列(共享消费者)支持

Spring for Apache Kafka 现在通过共享消费者提供对 Kafka 队列的早期访问支持,共享消费者是 Apache Kafka 4.0.0 的一部分并实现了 KIP-932。这使得协作消费成为可能,多个消费者可以同时从相同的分区消费,与传统消费者组相比提供更好的负载分配。有关更多信息,请参阅Kafka 队列(共享消费者)

Jackson 3 支持

Spring for Apache Kafka 现在全面支持 Jackson 3,并与现有的 Jackson 2 支持并行。Jackson 3 在可用时会自动检测并优先使用,提供增强的性能和现代 JSON 处理能力。

所有 Jackson 2 类现在都有 Jackson 3 对应类,具有一致的命名和改进的类型安全

  • JsonKafkaHeaderMapper 替换 DefaultKafkaHeaderMapper

  • JacksonJsonSerializer/Deserializer 替换 JsonSerializer/Deserializer

  • JacksonJsonSerde 替换 JsonSerde

  • JacksonJsonMessageConverter 家族替换 JsonMessageConverter 家族

  • JacksonProjectingMessageConverter 替换 ProjectingMessageConverter

  • DefaultJacksonJavaTypeMapper 替换 DefaultJackson2JavaTypeMapper

新的 Jackson 3 类使用 JsonMapper 而不是通用的 ObjectMapper,以增强类型安全,并利用 Jackson 3 改进的模块系统和性能优化。

迁移路径:现有应用程序可与 Jackson 2 保持不变地继续工作。要迁移到 Jackson 3,只需将 Jackson 3 添加到您的类路径中,并更新类引用以使用新的 Jackson 3 等效项。当两个版本都存在时,框架会自动检测并优先使用 Jackson 3。

向后兼容性:所有 Jackson 2 类都已弃用,但仍完全可用。它们将在未来的主要版本中移除。

有关配置示例,请参阅序列化、反序列化和消息转换

Spring Retry 依赖项移除

Spring for Apache Kafka 已移除对 Spring Retry 的依赖,转而使用 Spring Framework 7 中引入的核心重试支持。这是一个重大更改,影响整个框架的重试配置和 API。

BackOffValuesGenerator,用于预先生成所需的 BackOff 值,现在直接与 Spring Framework 的 BackOff 接口配合使用,而不是 BackOffPolicy。这些值随后由监听器基础设施管理,Spring Retry 不再参与。

从配置角度来看,Spring Kafka 严重依赖 Spring Retry 的 @Backoff 注解。由于 Spring Framework 中没有等效项,该注解已作为 @BackOff 移至 Spring Kafka,并进行了以下改进:

  • 统一命名:使用 @BackOff 而不是 @Backoff 以保持一致性

  • 表达式评估:所有字符串属性都支持 SpEL 表达式和属性占位符

  • 持续时间格式支持:字符串属性接受 java.util.Duration 格式(例如,“2s”、“500ms”)

  • 增强文档:改进了 Javadoc,提供了更清晰的解释

迁移示例

// Before
@RetryableTopic(backoff = @Backoff(delay = 2000, maxDelay = 10000, multiplier = 2))

// After
@RetryableTopic(backOff = @BackOff(delay = 2000, maxDelay = 10000, multiplier = 2))

// With new duration format support
@RetryableTopic(backOff = @BackOff(delayString = "2s", maxDelayString = "10s", multiplier = 2))

// With property placeholders
@RetryableTopic(backOff = @BackOff(delayString = "${retry.delay}", multiplierString = "${retry.multiplier}"))

RetryingDeserializer 不再提供 RecoveryCallback,但提供了一个接受 RetryException 作为输入的等效函数。这包含抛出的异常以及重试次数

// Before
retryingDeserializer.setRecoveryCallback(context -> {
    return fallbackValue;
});

// After
retryingDeserializer.setRecoveryCallback(retryException -> {
    return fallbackValue;
});

BinaryExceptionClassifier 的使用已被新引入的 ExceptionMatcher 取代,后者提供了一个更完善的 API。

其他更改包括:

  • DestinationTopicPropertiesFactory 使用 ExceptionMatcher 而不是 BinaryExceptionClassifier

  • RetryTopicConfigurationBuilder 中的 uniformRandomBackoff 方法已被弃用,转而支持抖动

  • 错误处理工具已更新,以适应新的异常匹配系统

  • Kafka Streams 重试模板现在使用 Spring Framework 的重试支持

应用程序必须更新其配置以使用新的 Spring Framework 重试 API,但重试行为和功能保持不变。

© . This site is unofficial and not affiliated with VMware.