事务性绑定器
通过将 spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix
设置为非空值(例如 tx-
)来启用事务。在处理器应用中使用时,消费者会启动事务;消费者线程上发送的任何记录都参与同一事务。当监听器正常退出时,监听器容器会将偏移量发送到事务并提交。所有使用 spring.cloud.stream.kafka.binder.transaction.producer.*
属性配置的生产者绑定都使用一个通用的生产者工厂;单独的绑定 Kafka 生产者属性将被忽略。
正常的绑定器重试(和死信处理)不支持事务,因为重试将在原始事务中运行,该事务可能会回滚,并且任何已发布的记录也会回滚。当启用重试时(公共属性 maxAttempts 大于零),重试属性用于配置 DefaultAfterRollbackProcessor ,以在容器级别启用重试。类似地,死信记录发布功能不再在事务内执行,而是移至监听器容器,同样通过在主事务回滚后运行的 DefaultAfterRollbackProcessor 实现。 |
如果您希望在源应用中使用事务,或者从任意线程执行仅生产者事务(例如 @Scheduled
方法),您必须获取事务性生产者工厂的引用,并使用它定义一个 KafkaTransactionManager
bean。
@Bean
public PlatformTransactionManager transactionManager(BinderFactory binders,
@Value("${unique.tx.id.per.instance}") String txId) {
ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
MessageChannel.class)).getTransactionalProducerFactory();
KafkaTransactionManager tm = new KafkaTransactionManager<>(pf);
tm.setTransactionId(txId)
return tm;
}
请注意,我们使用 BinderFactory
获取绑定器的引用;当只配置一个绑定器时,第一个参数使用 null
。如果配置了多个绑定器,请使用绑定器名称获取引用。获得绑定器的引用后,我们可以获取 ProducerFactory
的引用并创建一个事务管理器。
然后您可以使用正常的 Spring 事务支持,例如 TransactionTemplate
或 @Transactional
,例如
public static class Sender {
@Transactional
public void doInTransaction(MessageChannel output, List<String> stuffToSend) {
stuffToSend.forEach(stuff -> output.send(new GenericMessage<>(stuff)));
}
}
如果您希望将仅生产者事务与其他事务管理器中的事务同步,请使用 ChainedTransactionManager
。
如果您部署应用的多个实例,每个实例需要一个唯一的 transactionIdPrefix 。 |
Kafka 事务中的异常重试行为
默认重试行为
DefaultAfterRollbackProcessor
决定哪些异常会在事务回滚后触发重试。默认情况下,所有异常都会重试,但您可以修改此行为
spring:
cloud:
stream:
kafka:
bindings:
<binding-name>:
consumer:
defaultRetryable: false # Change default to NOT retry exceptions
当 defaultRetryable
设置为 false
时,DefaultAfterRollbackProcessor
将配置为 defaultFalse(true)
,这意味着除非显式配置为可重试,否则异常将不会重试。
异常特定配置
为了进行精细控制,您可以为单个异常类型指定重试行为
spring:
cloud:
stream:
kafka:
bindings:
<binding-name>:
consumer:
retryableExceptions:
java.lang.IllegalStateException: true # Always retry this exception
java.lang.IllegalArgumentException: false # Never retry this exception
DefaultAfterRollbackProcessor
将对标记为 true
的异常使用 addRetryableExceptions()
,对标记为 false
的异常使用 addNotRetryableExceptions()
。这些异常特定配置优先于默认行为。