事务
Spring Rabbit 框架支持在同步和异步用例中进行自动事务管理,并提供多种不同的语义,可以声明式地选择,这对于 Spring 事务的现有用户来说很熟悉。这使得许多(如果不是大多数)常见的消息模式易于实现。
有两种方法可以向框架发出所需的事务语义信号。在 RabbitTemplate 和 SimpleMessageListenerContainer 中,都有一个标志 channelTransacted,如果设置为 true,则告诉框架使用事务性通道,并通过提交或回滚(取决于结果)来结束所有操作(发送或接收),异常表示回滚。另一个信号是提供一个外部事务,其中一个 Spring 的 PlatformTransactionManager 实现作为正在进行的操作的上下文。如果框架发送或接收消息时已经有一个事务正在进行,并且 channelTransacted 标志为 true,则消息事务的提交或回滚将推迟到当前事务结束。如果 channelTransacted 标志为 false,则没有事务语义应用于消息操作(它会自动确认)。
channelTransacted 标志是一个配置时设置。它在 AMQP 组件创建时(通常在应用程序启动时)声明和处理一次。外部事务原则上更具动态性,因为系统在运行时响应当前线程状态。然而,实际上,当事务以声明方式分层到应用程序时,它通常也是一个配置设置。
对于 RabbitTemplate 的同步用例,外部事务由调用者根据喜好(通常的 Spring 事务模型)以声明式或命令式方式提供。以下示例显示了一个声明式方法(通常首选,因为它是非侵入性的),其中模板已配置为 channelTransacted=true
@Transactional
public void doSomething() {
String incoming = rabbitTemplate.receiveAndConvert();
// do some more database processing...
String outgoing = processInDatabaseAndExtractReply(incoming);
rabbitTemplate.convertAndSend(outgoing);
}
在前面的示例中,一个 String 有效负载在标记为 @Transactional 的方法中被接收、转换并作为消息体发送。如果数据库处理失败并抛出异常,则传入消息将返回到代理,并且不发送传出消息。这适用于在事务性方法链中与 RabbitTemplate 进行的任何操作(除非,例如,直接操作 Channel 以提前提交事务)。
对于 SimpleMessageListenerContainer 的异步用例,如果需要外部事务,则必须由容器在设置侦听器时请求。为了表示需要外部事务,用户在配置容器时向其提供 PlatformTransactionManager 的实现。以下示例显示了如何执行此操作
@Configuration
public class ExampleExternalTransactionAmqpConfiguration {
@Bean
public SimpleMessageListenerContainer messageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory());
container.setTransactionManager(transactionManager());
container.setChannelTransacted(true);
container.setQueueName("some.queue");
container.setMessageListener(exampleListener());
return container;
}
}
在前面的示例中,事务管理器作为从另一个 bean 定义(未显示)注入的依赖项添加,并且 channelTransacted 标志也设置为 true。其效果是,如果侦听器失败并抛出异常,则事务将回滚,并且消息也将返回到代理。重要的是,如果事务提交失败(例如,由于数据库约束错误或连接问题),AMQP 事务也将回滚,并且消息将返回到代理。这有时被称为“尽力而为的单阶段提交”,对于可靠的消息传递来说,这是一个非常强大的模式。如果在前面的示例中 channelTransacted 标志设置为 false(默认值),则外部事务仍将提供给侦听器,但所有消息操作都将自动确认,因此即使在业务操作回滚时,消息操作也会提交。
条件回滚
在版本 1.6.6 之前,在使用外部事务管理器(如 JDBC)时,向容器的 transactionAttribute 添加回滚规则无效。异常总是回滚事务。
此外,当在容器的通知链中使用事务通知时,条件回滚不是很实用,因为所有侦听器异常都被包装在 ListenerExecutionFailedException 中。
第一个问题已得到纠正,现在规则已正确应用。此外,现在提供了 ListenerFailedRuleBasedTransactionAttribute。它是 RuleBasedTransactionAttribute 的子类,唯一的区别是它知道 ListenerExecutionFailedException 并使用此类异常的原因作为规则。此事务属性可以直接在容器中使用,也可以通过事务通知使用。
以下示例使用此规则
@Bean
public AbstractMessageListenerContainer container() {
...
container.setTransactionManager(transactionManager);
RuleBasedTransactionAttribute transactionAttribute =
new ListenerFailedRuleBasedTransactionAttribute();
transactionAttribute.setRollbackRules(Collections.singletonList(
new NoRollbackRuleAttribute(DontRollBackException.class)));
container.setTransactionAttribute(transactionAttribute);
...
}
关于接收消息回滚的注意事项
AMQP 事务仅适用于发送到代理的消息和确认。因此,当 Spring 事务回滚并且已接收到消息时,Spring AMQP 不仅需要回滚事务,还需要手动拒绝消息(某种形式的 nack,但规范不这样称呼它)。消息拒绝时采取的操作独立于事务,并取决于 defaultRequeueRejected 属性(默认值:true)。有关拒绝失败消息的更多信息,请参阅消息侦听器和异步情况。
有关 RabbitMQ 事务及其限制的更多信息,请参阅RabbitMQ 代理语义。
| 在 RabbitMQ 2.7.0 之前,此类消息(以及在通道关闭或中止时未确认的任何消息)会返回到 Rabbit 代理队列的末尾。自 2.7.0 起,被拒绝的消息会返回到队列的前面,类似于 JMS 回滚的消息。 |
以前,在本地事务和提供 TransactionManager 时,事务回滚时消息重新入队的行为不一致。在前一种情况下,应用正常的重新入队逻辑(AmqpRejectAndDontRequeueException 或 defaultRequeueRejected=false)(请参阅消息侦听器和异步情况)。使用事务管理器时,消息在回滚时无条件重新入队。从 2.0 版本开始,行为一致,两种情况下都应用正常的重新入队逻辑。要恢复到以前的行为,可以将容器的 alwaysRequeueWithTxManagerRollback 属性设置为 true。请参阅消息侦听器容器配置。 |
使用 RabbitTransactionManager
RabbitTransactionManager 是在外部事务中执行 Rabbit 操作并与之同步的替代方案。此事务管理器是 PlatformTransactionManager 接口的实现,应与单个 Rabbit ConnectionFactory 一起使用。
| 此策略无法提供 XA 事务 — 例如,为了在消息传递和数据库访问之间共享事务。 |
应用程序代码需要通过 ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactory, boolean) 而不是标准的 Connection.createChannel() 调用和后续通道创建来检索事务性 Rabbit 资源。当使用 Spring AMQP 的 RabbitTemplate 时,它将自动检测线程绑定的 Channel 并自动参与其事务。
使用 Java 配置,您可以通过以下 bean 设置新的 RabbitTransactionManager
@Bean
public RabbitTransactionManager rabbitTransactionManager() {
return new RabbitTransactionManager(connectionFactory);
}
如果您喜欢 XML 配置,可以在 XML 应用程序上下文文件中声明以下 bean
<bean id="rabbitTxManager"
class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager">
<property name="connectionFactory" ref="connectionFactory"/>
</bean>
事务同步
将 RabbitMQ 事务与某些其他(例如 DBMS)事务同步提供“尽力而为的单阶段提交”语义。RabbitMQ 事务可能在事务同步的完成后期提交失败。这会被 spring-tx 基础设施记录为错误,但不会向调用代码抛出异常。从 2.3.10 版本开始,您可以在事务在处理该事务的同一线程上提交后调用 ConnectionUtils.checkAfterCompletion()。如果未发生异常,它将简单返回;否则,它将抛出 AfterCompletionFailedException,该异常将具有表示完成的同步状态的属性。
通过调用 ConnectionFactoryUtils.enableAfterCompletionFailureCapture(true) 启用此功能;这是一个全局标志,适用于所有线程。