事务
本节描述了 Spring for Apache Kafka 如何支持事务。
概览
0.11.0.0 客户端库增加了对事务的支持。Spring for Apache Kafka 通过以下方式添加了支持
-
`KafkaTransactionManager`:与常规 Spring 事务支持(`@Transactional`、`TransactionTemplate` 等)一起使用
-
事务型 `KafkaMessageListenerContainer`
-
使用 `KafkaTemplate` 的本地事务
-
与其他事务管理器进行事务同步
通过为 `DefaultKafkaProducerFactory` 提供 `transactionIdPrefix` 来启用事务。在这种情况下,工厂不再管理一个共享的 `Producer`,而是维护一个事务型生产者缓存。当用户对生产者调用 `close()` 时,它会返回到缓存中以便重用,而不是实际关闭。每个生产者的 `transactional.id` 属性是 `transactionIdPrefix` + `n`,其中 `n` 从 `0` 开始,并为每个新生产者递增。在之前版本的 Spring for Apache Kafka 中,对于由基于记录的监听器容器启动的事务,`transactional.id` 的生成方式不同,以支持隔离僵尸,但从 3.0 开始,`EOSMode.V2` 是唯一的选项,因此不再需要这样做。对于运行多个实例的应用,每个实例的 `transactionIdPrefix` 必须是唯一的。
另请参阅精确一次语义。
对于 Spring Boot,只需设置 `spring.kafka.producer.transaction-id-prefix` 属性 - Spring Boot 将自动配置一个 `KafkaTransactionManager` bean 并将其注入到监听器容器中。
从 2.5.8 版本开始,您现在可以在生产者工厂上配置 `maxAge` 属性。这在使用事务型生产者时很有用,因为这些生产者可能会在 broker 的 `transactional.id.expiration.ms` 期间处于空闲状态。对于当前的 `kafka-clients`,这可能导致 `ProducerFencedException` 而不发生 rebalance。通过将 `maxAge` 设置为小于 `transactional.id.expiration.ms`,如果生产者超过其最大年龄,工厂将刷新生产者。 |
使用 `KafkaTransactionManager`
`KafkaTransactionManager` 是 Spring Framework 的 `PlatformTransactionManager` 的一个实现。它在其构造函数中提供了对生产者工厂的引用。如果您提供自定义的生产者工厂,它必须支持事务。请参阅 `ProducerFactory.transactionCapable()`。
您可以将 `KafkaTransactionManager` 与常规 Spring 事务支持(`@Transactional`、`TransactionTemplate` 等)一起使用。如果存在活动事务,在事务范围内执行的任何 `KafkaTemplate` 操作都将使用该事务的 `Producer`。事务管理器会根据成功或失败来提交或回滚事务。您必须配置 `KafkaTemplate` 使用与事务管理器相同的 `ProducerFactory`。
事务同步
本节指生产者专用事务(非监听器容器启动的事务);有关容器启动事务时链式事务的信息,请参阅使用消费者启动的事务。
如果您想将记录发送到 Kafka 并执行一些数据库更新,您可以使用常规的 Spring 事务管理,例如使用 `DataSourceTransactionManager`。
@Transactional
public void process(List<Thing> things) {
things.forEach(thing -> this.kafkaTemplate.send("topic", thing));
updateDb(things);
}
`@Transactional` 注解的拦截器会启动事务,并且 `KafkaTemplate` 将与该事务管理器同步事务;每个发送操作都将参与该事务。当方法退出时,数据库事务将提交,随后是 Kafka 事务。如果您希望以相反的顺序(Kafka 优先)执行提交,请使用嵌套的 `@Transactional` 方法,其中外部方法配置为使用 `DataSourceTransactionManager`,内部方法配置为使用 `KafkaTransactionManager`。
有关在 Kafka 优先或数据库优先配置中同步 JDBC 和 Kafka 事务的应用示例,请参阅使用其他事务管理器进行 Kafka 事务的示例。
从 2.5.17、2.6.12、2.7.9 和 2.8.0 版本开始,如果同步事务(在主事务提交后)提交失败,异常将被抛给调用者。在此之前,这种情况会被静默忽略(在 debug 级别记录日志)。如果需要,应用程序应采取补救措施,以弥补已提交的主事务。 |
使用消费者启动的事务
`ChainedKafkaTransactionManager` 从 2.7 版本开始已被弃用;有关更多信息,请参阅其超类 `ChainedTransactionManager` 的 JavaDocs。相反,请在容器中使用 `KafkaTransactionManager` 来启动 Kafka 事务,并用 `@Transactional` 注解监听器方法来启动其他事务。
有关链式处理 JDBC 和 Kafka 事务的应用示例,请参阅使用其他事务管理器进行 Kafka 事务的示例。
`KafkaTemplate` 本地事务
您可以使用 `KafkaTemplate` 在本地事务中执行一系列操作。以下示例展示了如何实现这一点
boolean result = template.executeInTransaction(t -> {
t.sendDefault("thing1", "thing2");
t.sendDefault("cat", "hat");
return true;
});
回调中的参数是模板本身 (`this`)。如果回调正常退出,事务将提交。如果抛出异常,事务将回滚。
如果存在 `KafkaTransactionManager`(或同步的)事务正在处理中,则不会使用它。相反,将使用一个新的“嵌套”事务。 |
`TransactionIdPrefix`
对于 `EOSMode.V2`(也称为 `BETA`),这是唯一支持的模式,不再需要使用相同的 `transactional.id`,即使对于消费者启动的事务也是如此;实际上,它必须在每个实例上唯一,与生产者启动的事务相同。此属性在每个应用程序实例上必须具有不同的值。
`TransactionIdSuffix 固定`
从 3.2 版本开始,引入了一个新的 `TransactionIdSuffixStrategy` 接口来管理 `transactional.id` 后缀。默认实现是 `DefaultTransactionIdSuffixStrategy`,当设置的 `maxCache` 大于零时,可以在特定范围内重用 `transactional.id`,否则后缀将通过递增计数器动态生成。当请求事务生产者而所有 `transactional.id` 都被使用时,将抛出 `NoProducerAvailableException`。用户可以使用配置好的 `RetryTemplate` 来重试该异常,并配置适当的退避策略。
public static class Config {
@Bean
public ProducerFactory<String, String> myProducerFactory() {
Map<String, Object> configs = producerConfigs();
configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
...
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
...
TransactionIdSuffixStrategy ss = new DefaultTransactionIdSuffixStrategy(5);
pf.setTransactionIdSuffixStrategy(ss);
return pf;
}
}
当将 `maxCache` 设置为 5 时,`transactional.id` 为 `my.txid.`+`{0-4}`。
当使用 `KafkaTransactionManager` 和 `ConcurrentMessageListenerContainer` 并启用 `maxCache` 时,需要将 `maxCache` 设置为一个大于或等于 `concurrency` 的值。如果 `MessageListenerContainer` 无法获取 `transactional.id` 后缀,将抛出 `NoProducerAvailableException`。在使用 `ConcurrentMessageListenerContainer` 中的嵌套事务时,需要调整 `maxCache` 设置以处理增加的嵌套事务数量。 |
`KafkaTemplate` 事务型和非事务型发布
通常,当 `KafkaTemplate` 是事务型的(配置了支持事务的生产者工厂)时,需要事务。事务可以由 `TransactionTemplate`、`@Transactional` 方法、调用 `executeInTransaction` 或由配置了 `KafkaTransactionManager` 的监听器容器启动。任何在事务范围之外使用模板的尝试都会导致模板抛出 `IllegalStateException`。从 2.4.3 版本开始,您可以将模板的 `allowNonTransactional` 属性设置为 `true`。在这种情况下,模板将允许操作在没有事务的情况下运行,通过调用 `ProducerFactory` 的 `createNonTransactionalProducer()` 方法;生产者将被缓存或绑定到线程,以便像往常一样重用。请参阅使用 DefaultKafkaProducerFactory
。
批处理监听器与事务
在使用事务时,当监听器失败时,`AfterRollbackProcessor` 会在回滚发生后被调用以执行某些操作。当对记录监听器使用默认的 `AfterRollbackProcessor` 时,会执行 seek 操作,以便重新投递失败的记录。但是,对于批处理监听器,整个批次将被重新投递,因为框架不知道批次中的哪个记录失败了。有关更多信息,请参阅回滚后处理器。
在使用批处理监听器时,2.4.2 版本引入了一种处理批处理失败的替代机制:`BatchToRecordAdapter`。当将 `batchListener` 设置为 true 的容器工厂配置了 `BatchToRecordAdapter` 时,监听器将一次调用一个记录。这可以在批处理中进行错误处理,同时仍然可以根据异常类型停止处理整个批次。提供了一个默认的 `BatchToRecordAdapter`,可以配置一个标准的 `ConsumerRecordRecoverer`,例如 `DeadLetterPublishingRecoverer`。以下测试用例配置片段说明了如何使用此功能
public static class TestListener {
final List<String> values = new ArrayList<>();
@KafkaListener(id = "batchRecordAdapter", topics = "test")
public void listen(String data) {
values.add(data);
if ("bar".equals(data)) {
throw new RuntimeException("reject partial");
}
}
}
@Configuration
@EnableKafka
public static class Config {
ConsumerRecord<?, ?> failed;
@Bean
public TestListener test() {
return new TestListener();
}
@Bean
public ConsumerFactory<?, ?> consumerFactory() {
return mock(ConsumerFactory.class);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.setBatchToRecordAdapter(new DefaultBatchToRecordAdapter<>((record, ex) -> {
this.failed = record;
}));
return factory;
}
}