事务

Spring Rabbit 框架支持同步和异步用例中的自动事务管理,具有多种不同的语义,可以像 Spring 事务的现有用户所熟悉的那样以声明方式选择。这使得实现许多(如果不是大多数)常见消息模式变得容易。

有两种方式向框架指示所需的事务语义。在 RabbitTemplateSimpleMessageListenerContainer 中,都有一个标志 channelTransacted,如果设置为 true,则指示框架使用事务性通道,并以提交或回滚(取决于结果)结束所有操作(发送或接收),异常表示回滚。另一种方式是提供一个外部事务,使用 Spring 的一个 PlatformTransactionManager 实现作为正在进行操作的上下文。如果在框架发送或接收消息时已经存在事务,并且 channelTransacted 标志为 true,则消息传递事务的提交或回滚将被推迟到当前事务结束。如果 channelTransacted 标志为 false,则消息传递操作不应用事务语义(它会自动确认)。

channelTransacted 标志是一个配置时设置。它在 AMQP 组件创建时声明和处理一次,通常在应用程序启动时。原则上,外部事务更具动态性,因为系统在运行时响应当前线程状态。然而,在实践中,当事务以声明方式叠加到应用程序上时,它通常也是一个配置设置。

对于带有 RabbitTemplate 的同步用例,外部事务由调用者提供,可以根据喜好采用声明式或编程式(通常的 Spring 事务模型)。以下示例展示了一种声明式方法(通常首选,因为它是非侵入性的),其中模板已配置 channelTransacted=true

@Transactional
public void doSomething() {
    String incoming = rabbitTemplate.receiveAndConvert();
    // do some more database processing...
    String outgoing = processInDatabaseAndExtractReply(incoming);
    rabbitTemplate.convertAndSend(outgoing);
}

在前面的示例中,一个 String 负载在标记为 @Transactional 的方法内部被接收、转换并作为消息体发送。如果数据库处理因异常失败,则收到的消息将被返回给 broker,而发送的消息不会被发送。这适用于在事务方法链内部使用 RabbitTemplate 进行的任何操作(除非例如直接操作 Channel 以提前提交事务)。

对于带有 SimpleMessageListenerContainer 的异步用例,如果需要外部事务,容器在设置监听器时必须请求它。为了指示需要外部事务,用户在配置容器时会向其提供一个 PlatformTransactionManager 的实现。以下示例展示了如何操作

@Configuration
public class ExampleExternalTransactionAmqpConfiguration {

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(rabbitConnectionFactory());
        container.setTransactionManager(transactionManager());
        container.setChannelTransacted(true);
        container.setQueueName("some.queue");
        container.setMessageListener(exampleListener());
        return container;
    }

}

在前面的示例中,事务管理器作为依赖项从另一个 bean 定义(未显示)注入,并且 channelTransacted 标志也设置为 true。其效果是,如果监听器因异常失败,事务将被回滚,并且消息也会被返回给 broker。重要的是,如果事务提交失败(例如,由于数据库约束错误或连接问题),AMQP 事务也会回滚,并且消息被返回给 broker。这有时被称为“尽力单阶段提交”(Best Efforts 1 Phase Commit),是实现可靠消息传递的一种非常强大的模式。如果在前面的示例中,channelTransacted 标志设置为 false(默认值),则外部事务仍将为监听器提供,但所有消息操作都会自动确认,因此即使业务操作回滚,消息操作也会提交。

条件回滚

在 1.6.6 版本之前,在使用外部事务管理器(如 JDBC)时,向容器的 transactionAttribute 添加回滚规则没有效果。异常总是会回滚事务。

此外,在容器的增强链中使用事务增强时,条件回滚也不是很有效,因为所有监听器异常都被包装在 ListenerExecutionFailedException 中。

第一个问题已得到纠正,规则现在已正确应用。此外,现在提供了 ListenerFailedRuleBasedTransactionAttribute。它是 RuleBasedTransactionAttribute 的子类,唯一的区别是它了解 ListenerExecutionFailedException,并使用此类异常的原因来应用规则。此事务属性可以直接在容器中使用,或通过事务增强使用。

以下示例使用了此规则

@Bean
public AbstractMessageListenerContainer container() {
    ...
    container.setTransactionManager(transactionManager);
    RuleBasedTransactionAttribute transactionAttribute =
        new ListenerFailedRuleBasedTransactionAttribute();
    transactionAttribute.setRollbackRules(Collections.singletonList(
        new NoRollbackRuleAttribute(DontRollBackException.class)));
    container.setTransactionAttribute(transactionAttribute);
    ...
}

关于已接收消息回滚的说明

AMQP 事务仅适用于发送到 broker 的消息和确认。因此,当 Spring 事务回滚且消息已被接收时,Spring AMQP 不仅要回滚事务,还需要手动拒绝消息(有点像 nack,但规范不这么称呼)。消息拒绝时采取的操作与事务无关,并取决于 defaultRequeueRejected 属性(默认值:true)。有关拒绝失败消息的更多信息,请参见消息监听器和异步情况

有关 RabbitMQ 事务及其限制的更多信息,请参见RabbitMQ Broker 语义

在 RabbitMQ 2.7.0 之前,此类消息(以及通道关闭或中止时未确认的任何消息)会在 Rabbit broker 上进入队列尾部。从 2.7.0 起,拒绝的消息会进入队列头部,这与 JMS 回滚消息的方式类似。
之前,事务回滚时消息的重新入队行为在本地事务和提供 TransactionManager 的情况下不一致。在前者中,应用正常的重新入队逻辑(AmqpRejectAndDontRequeueExceptiondefaultRequeueRejected=false)(参见消息监听器和异步情况)。使用事务管理器时,消息在回滚时会无条件重新入队。从 2.0 版本开始,行为一致,两种情况下都应用正常的重新入队逻辑。要恢复到之前的行为,可以将容器的 alwaysRequeueWithTxManagerRollback 属性设置为 true。参见消息监听器容器配置

使用 RabbitTransactionManager

RabbitTransactionManager 是一种替代方案,用于在外部事务中执行 Rabbit 操作并与之同步。此事务管理器是 PlatformTransactionManager 接口的一个实现,应与单个 Rabbit ConnectionFactory 一起使用。

此策略无法提供 XA 事务,例如,无法在消息传递和数据库访问之间共享事务。

应用程序代码需要通过 ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactory, boolean) 而不是标准的 Connection.createChannel() 调用及其后续的通道创建来检索事务性 Rabbit 资源。使用 Spring AMQP 的 RabbitTemplate 时,它会自动检测线程绑定的 Channel 并自动参与其事务。

通过 Java 配置,您可以使用以下 bean 设置新的 RabbitTransactionManager

@Bean
public RabbitTransactionManager rabbitTransactionManager() {
    return new RabbitTransactionManager(connectionFactory);
}

如果您更喜欢 XML 配置,可以在 XML 应用程序上下文文件中声明以下 bean

<bean id="rabbitTxManager"
      class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager">
    <property name="connectionFactory" ref="connectionFactory"/>
</bean>

事务同步

将 RabbitMQ 事务与某些其他(例如 DBMS)事务同步可提供“尽力单阶段提交”(Best Effort One Phase Commit)语义。RabbitMQ 事务可能在事务同步的完成后续阶段(after completion phase)提交失败。这会被 spring-tx 基础设施记录为错误,但不会向调用代码抛出异常。从 2.3.10 版本开始,您可以在事务已提交后,在处理事务的同一线程上调用 ConnectionUtils.checkAfterCompletion()。如果没有发生异常,它将简单返回;否则,它将抛出 AfterCompletionFailedException,该异常将包含一个表示完成同步状态的属性。

通过调用 ConnectionFactoryUtils.enableAfterCompletionFailureCapture(true) 启用此功能;这是一个全局标志,适用于所有线程。