事务性绑定器

通过将 spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix 设置为非空值(例如 tx-)来启用事务。在处理器应用中使用时,消费者会启动事务;消费者线程上发送的任何记录都参与同一事务。当监听器正常退出时,监听器容器会将偏移量发送到事务并提交。所有使用 spring.cloud.stream.kafka.binder.transaction.producer.* 属性配置的生产者绑定都使用一个通用的生产者工厂;单独的绑定 Kafka 生产者属性将被忽略。

正常的绑定器重试(和死信处理)不支持事务,因为重试将在原始事务中运行,该事务可能会回滚,并且任何已发布的记录也会回滚。当启用重试时(公共属性 maxAttempts 大于零),重试属性用于配置 DefaultAfterRollbackProcessor,以在容器级别启用重试。类似地,死信记录发布功能不再在事务内执行,而是移至监听器容器,同样通过在主事务回滚后运行的 DefaultAfterRollbackProcessor 实现。

如果您希望在源应用中使用事务,或者从任意线程执行仅生产者事务(例如 @Scheduled 方法),您必须获取事务性生产者工厂的引用,并使用它定义一个 KafkaTransactionManager bean。

@Bean
public PlatformTransactionManager transactionManager(BinderFactory binders,
        @Value("${unique.tx.id.per.instance}") String txId) {

    ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
            MessageChannel.class)).getTransactionalProducerFactory();
    KafkaTransactionManager tm = new KafkaTransactionManager<>(pf);
    tm.setTransactionId(txId)
    return tm;
}

请注意,我们使用 BinderFactory 获取绑定器的引用;当只配置一个绑定器时,第一个参数使用 null。如果配置了多个绑定器,请使用绑定器名称获取引用。获得绑定器的引用后,我们可以获取 ProducerFactory 的引用并创建一个事务管理器。

然后您可以使用正常的 Spring 事务支持,例如 TransactionTemplate@Transactional,例如

public static class Sender {

    @Transactional
    public void doInTransaction(MessageChannel output, List<String> stuffToSend) {
        stuffToSend.forEach(stuff -> output.send(new GenericMessage<>(stuff)));
    }

}

如果您希望将仅生产者事务与其他事务管理器中的事务同步,请使用 ChainedTransactionManager

如果您部署应用的多个实例,每个实例需要一个唯一的 transactionIdPrefix

Kafka 事务中的异常重试行为

配置事务回滚重试行为

在 Kafka 事务中处理消息时,您可以使用 defaultRetryable 属性和 retryableExceptions 映射来配置哪些异常应在事务回滚后重试。

默认重试行为

DefaultAfterRollbackProcessor 决定哪些异常会在事务回滚后触发重试。默认情况下,所有异常都会重试,但您可以修改此行为

spring:
 cloud:
   stream:
     kafka:
       bindings:
         <binding-name>:
           consumer:
             defaultRetryable: false  # Change default to NOT retry exceptions

defaultRetryable 设置为 false 时,DefaultAfterRollbackProcessor 将配置为 defaultFalse(true),这意味着除非显式配置为可重试,否则异常将不会重试。

异常特定配置

为了进行精细控制,您可以为单个异常类型指定重试行为

spring:
 cloud:
   stream:
     kafka:
       bindings:
         <binding-name>:
           consumer:
             retryableExceptions:
               java.lang.IllegalStateException: true    # Always retry this exception
               java.lang.IllegalArgumentException: false  # Never retry this exception

DefaultAfterRollbackProcessor 将对标记为 true 的异常使用 addRetryableExceptions(),对标记为 false 的异常使用 addNotRetryableExceptions()。这些异常特定配置优先于默认行为。

实现细节

  • 使用事务时,retryableExceptions 中只能配置异常类型(Exception 的子类)

  • 如果指定了非异常类型,将抛出 IllegalArgumentException

  • 只有在启用事务且禁用批处理模式时,才会配置 DefaultAfterRollbackProcessor

  • 此配置确保事务重试行为与非事务性重试处理一致