事务
本节介绍 Spring for Apache Kafka 如何支持事务。
概述
0.11.0.0 客户端库增加了对事务的支持。Spring for Apache Kafka 以以下方式增加支持:
-
KafkaTransactionManager:与正常的 Spring 事务支持(@Transactional、TransactionTemplate等)一起使用 -
事务性
KafkaMessageListenerContainer -
使用
KafkaTemplate的本地事务 -
与其他事务管理器进行事务同步
通过为 DefaultKafkaProducerFactory 提供一个 transactionIdPrefix 来启用事务。在这种情况下,工厂不是管理一个共享的 Producer,而是维护一个事务性生产者缓存。当用户对生产者调用 close() 时,它被返回到缓存中以供重用,而不是实际关闭。每个生产者的 transactional.id 属性是 transactionIdPrefix + n,其中 n 从 0 开始,并为每个新生产者递增。在 Spring for Apache Kafka 的早期版本中,由记录监听器的监听器容器启动的事务的 transactional.id 的生成方式不同,以支持围栏僵尸,但这在 3.0 版本后不再需要,因为 EOSMode.V2 是唯一的选项。对于运行多个实例的应用程序,每个实例的 transactionIdPrefix 必须是唯一的。
另请参阅 精确一次语义。
另请参阅 transactionIdPrefix。
使用 Spring Boot 时,只需设置 spring.kafka.producer.transaction-id-prefix 属性即可——Spring Boot 将自动配置一个 KafkaTransactionManager bean 并将其连接到监听器容器中。
从版本 2.5.8 开始,您现在可以在生产者工厂上配置 maxAge 属性。当使用事务性生产者时,这非常有用,因为它们可能会在代理的 transactional.id.expiration.ms 期间处于空闲状态。在当前的 kafka-clients 中,这可能会导致 ProducerFencedException 而不会发生重新平衡。通过将 maxAge 设置为小于 transactional.id.expiration.ms,如果生产者超过其最大年龄,工厂将刷新生产者。 |
使用 KafkaTransactionManager
KafkaTransactionManager 是 Spring Framework 的 PlatformTransactionManager 的一个实现。它在构造函数中被提供了一个生产者工厂的引用。如果您提供自定义生产者工厂,它必须支持事务。请参阅 ProducerFactory.transactionCapable()。
您可以将 KafkaTransactionManager 与正常的 Spring 事务支持(@Transactional、TransactionTemplate 等)一起使用。如果事务处于活动状态,则在事务范围内执行的任何 KafkaTemplate 操作都将使用事务的 Producer。管理器根据成功或失败提交或回滚事务。您必须将 KafkaTemplate 配置为与事务管理器使用相同的 ProducerFactory。
事务同步
本节指的是仅生产者事务(非监听器容器启动的事务);有关容器启动事务时链式事务的信息,请参阅 使用消费者启动的事务。
如果您想向 Kafka 发送记录并执行一些数据库更新,您可以使用正常的 Spring 事务管理,例如使用 DataSourceTransactionManager。
@Transactional
public void process(List<Thing> things) {
things.forEach(thing -> this.kafkaTemplate.send("topic", thing));
updateDb(things);
}
@Transactional 注解的拦截器启动事务,KafkaTemplate 将与该事务管理器同步事务;每次发送都将参与该事务。当方法退出时,数据库事务将提交,随后是 Kafka 事务。如果您希望以相反的顺序执行提交(Kafka 优先),请使用嵌套的 @Transactional 方法,其中外部方法配置为使用 DataSourceTransactionManager,内部方法配置为使用 KafkaTransactionManager。
有关在 Kafka 优先或 DB 优先配置中同步 JDBC 和 Kafka 事务的应用程序示例,请参阅 Kafka 事务与其他事务管理器的示例。
| 从 2.5.17、2.6.12、2.7.9 和 2.8.0 版本开始,如果同步事务(在主事务提交后)的提交失败,异常将抛给调用者。此前,此异常会被静默忽略(在调试级别记录)。应用程序应在必要时采取补救措施,以弥补已提交的主事务。 |
使用消费者启动的事务
ChainedKafkaTransactionManager 自 2.7 版本起已弃用;有关其父类 ChainedTransactionManager 的更多信息,请参阅 JavaDocs。相反,请在容器中使用 KafkaTransactionManager 启动 Kafka 事务,并使用 @Transactional 注解监听器方法以启动其他事务。
有关链式 JDBC 和 Kafka 事务的示例应用程序,请参阅 Kafka 事务与其他事务管理器的示例。
KafkaTemplate 本地事务
您可以使用 KafkaTemplate 在本地事务中执行一系列操作。以下示例展示了如何实现:
boolean result = template.executeInTransaction(t -> {
t.sendDefault("thing1", "thing2");
t.sendDefault("cat", "hat");
return true;
});
回调中的参数是模板本身(this)。如果回调正常退出,则事务被提交。如果抛出异常,则事务被回滚。
如果存在正在进行的 KafkaTransactionManager(或同步)事务,则不使用它。相反,将使用一个新的“嵌套”事务。 |
TransactionIdPrefix
在 EOSMode.V2(又名 BETA)中,唯一的受支持模式,不再需要使用相同的 transactional.id,即使对于消费者启动的事务也是如此;事实上,它必须在每个实例上唯一,就像生产者启动的事务一样。此属性在每个应用程序实例上必须具有不同的值。
TransactionIdSuffix Fixed
自 3.2 版本起,引入了一个新的 TransactionIdSuffixStrategy 接口来管理 transactional.id 后缀。默认实现是 DefaultTransactionIdSuffixStrategy,当 maxCache 设置为大于零时,可以在特定范围内重用 transactional.id,否则后缀将通过递增计数器即时生成。当请求事务生产者且所有 transactional.id 都已在使用时,抛出 NoProducerAvailableException。用户可以使用配置为重试该异常的 RetryTemplate,并配置适当的退避策略。
public static class Config {
@Bean
public ProducerFactory<String, String> myProducerFactory() {
Map<String, Object> configs = producerConfigs();
configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
...
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
...
TransactionIdSuffixStrategy ss = new DefaultTransactionIdSuffixStrategy(5);
pf.setTransactionIdSuffixStrategy(ss);
return pf;
}
}
当 maxCache 设置为 5 时,transactional.id 为 my.txid.+`{0-4}`。
当使用 KafkaTransactionManager 与 ConcurrentMessageListenerContainer 并启用 maxCache 时,需要将 maxCache 设置为大于或等于 concurrency 的值。如果 MessageListenerContainer 无法获取 transactional.id 后缀,它将抛出 NoProducerAvailableException。当在 ConcurrentMessageListenerContainer 中使用嵌套事务时,需要调整 maxCache 设置以处理增加的嵌套事务数量。 |
KafkaTemplate 事务性和非事务性发布
通常,当 KafkaTemplate 是事务性时(使用支持事务的生产者工厂配置),事务是必需的。事务可以由 TransactionTemplate、@Transactional 方法、调用 executeInTransaction 或由监听器容器(配置了 KafkaTransactionManager)启动。任何在事务范围之外使用模板的尝试都会导致模板抛出 IllegalStateException。从版本 2.4.3 开始,您可以将模板的 allowNonTransactional 属性设置为 true。在这种情况下,模板将允许操作在没有事务的情况下运行,通过调用 ProducerFactory 的 createNonTransactionalProducer() 方法;生产者将被缓存或线程绑定,以便正常重用。请参阅 使用 DefaultKafkaProducerFactory。
批量监听器事务
当监听器在使用事务时失败时,会在回滚发生后调用 AfterRollbackProcessor 以执行一些操作。当使用带有记录监听器的默认 AfterRollbackProcessor 时,会执行查找操作,以便重新传递失败的记录。但是,对于批量监听器,整个批次将重新传递,因为框架不知道批次中的哪个记录失败了。有关更多信息,请参阅 回滚后处理器。
在使用批量监听器时,版本 2.4.2 引入了一种处理批量处理失败的替代机制:BatchToRecordAdapter。当配置了 BatchToRecordAdapter 且 batchListener 设置为 true 的容器工厂时,监听器将一次处理一条记录。这使得在批处理中处理错误成为可能,同时仍然可以根据异常类型停止处理整个批处理。提供了默认的 BatchToRecordAdapter,可以配置标准的 ConsumerRecordRecoverer,例如 DeadLetterPublishingRecoverer。以下测试用例配置片段说明了如何使用此功能:
public static class TestListener {
final List<String> values = new ArrayList<>();
@KafkaListener(id = "batchRecordAdapter", topics = "test")
public void listen(String data) {
values.add(data);
if ("bar".equals(data)) {
throw new RuntimeException("reject partial");
}
}
}
@Configuration
@EnableKafka
public static class Config {
ConsumerRecord<?, ?> failed;
@Bean
public TestListener test() {
return new TestListener();
}
@Bean
public ConsumerFactory<?, ?> consumerFactory() {
return mock(ConsumerFactory.class);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.setBatchToRecordAdapter(new DefaultBatchToRecordAdapter<>((record, ex) -> {
this.failed = record;
}));
return factory;
}
}