事务支持

本章介绍 Spring 集成对事务的支持。它涵盖以下主题

理解消息流中的事务

Spring 集成提供了多个钩子来解决消息流的事务需求。为了更好地理解这些钩子以及如何从中获益,我们首先需要回顾一下可以用来启动消息流的六种机制,并了解如何在每种机制中解决这些流的事务需求。

以下六种机制启动消息流(每个机制的详细信息将在本手册中提供)

  • 网关代理:一个基本的信使网关。

  • 消息通道:直接与MessageChannel方法交互(例如,channel.send(message))。

  • 消息发布者:作为 Spring Bean 上方法调用的副产品启动消息流的方式。

  • 入站通道适配器和网关:通过将第三方系统连接到 Spring 集成信使系统来启动消息流的方式(例如,[JmsMessage] → Jms 入站适配器[SI 消息] → SI 通道)。

  • 调度器:基于预先配置的调度器分发的调度事件启动消息流的方式。

  • 轮询器:类似于调度器,这是基于预先配置的轮询器分发的调度或间隔事件启动消息流的方式。

我们可以将这六种机制分为两大类

  • 由用户进程启动的消息流:此类别中的示例场景包括调用网关方法或显式地将Message发送到MessageChannel。换句话说,这些消息流依赖于第三方进程(例如您编写的某些代码)来启动。

  • 由守护进程启动的消息流:此类别中的示例场景包括轮询器轮询消息队列以使用轮询到的消息启动新的消息流,或者调度器通过创建新消息并在预定义的时间启动消息流来调度进程。

显然,网关代理、MessageChannel.send(…​)MessagePublisher都属于第一类,而入站适配器和网关、调度器和轮询器属于第二类。

那么,如何在每种类别中的各种场景中解决事务需求?Spring 集成是否需要为特定场景提供与事务相关的显式内容?或者可以使用 Spring 的事务支持?

Spring 本身提供了对事务管理的一流支持。因此,我们的目标不是提供新的东西,而是使用 Spring 来利用其现有的事务支持。换句话说,作为框架,我们必须向 Spring 的事务管理功能公开钩子。但是,由于 Spring 集成配置基于 Spring 配置,因此我们并不总是需要公开这些钩子,因为 Spring 已经公开它们。毕竟,每个 Spring 集成组件都是一个 Spring Bean。

考虑到这个目标,我们可以再次考虑两种情况:由用户进程发起的消息流和由守护进程发起的消息流。

由用户进程发起并在 Spring 应用程序上下文中配置的消息流,会受到此类进程的常规事务配置的影响。因此,它们不需要由 Spring Integration 显式配置来支持事务。事务可以通过 Spring 的标准事务支持来启动,并且应该这样做。Spring Integration 消息流自然会遵守组件的事务语义,因为它本身是由 Spring 配置的。例如,网关或服务激活器方法可以使用 @Transactional 进行注解,或者可以在 XML 配置中定义一个 TransactionInterceptor,并使用指向特定方法的切入点表达式,这些方法应该具有事务性。最重要的是,您对这些情况下的事务配置和边界拥有完全控制权。

但是,当涉及到由守护进程发起的消息流时,情况就有所不同了。尽管这些流是由开发人员配置的,但它们并不直接涉及人类或其他进程来启动。这些是基于触发器的流,由触发器进程(守护进程)根据进程的配置来启动。例如,我们可以让调度器在每个星期五晚上启动一个消息流。我们还可以配置一个触发器,它每秒启动一个消息流,等等。因此,我们需要一种方法来让这些基于触发器的进程知道我们希望使结果消息流具有事务性,以便在启动新的消息流时创建事务上下文。换句话说,我们需要公开一些事务配置,但只需要足以委托给 Spring 已经提供的事务支持(就像我们在其他情况下所做的那样)。

轮询器事务支持

Spring Integration 为轮询器提供事务支持。轮询器是一种特殊的组件类型,因为在轮询器任务中,我们可以对本身具有事务性的资源调用 receive(),从而将 receive() 调用包含在事务边界内,这使得它可以在任务失败时回滚。如果我们要为通道添加相同的支持,则添加的事务将影响从 send() 调用开始的所有下游组件。这为事务划分提供了相当广泛的范围,却没有充分的理由,尤其是在 Spring 已经提供了多种方法来解决任何下游组件的事务需求的情况下。但是,receive() 方法被包含在事务边界内是轮询器的“充分理由”。

每次配置 Poller 时,您可以使用transactional子元素及其属性提供事务配置,如下例所示

<int:poller max-messages-per-poll="1" fixed-rate="1000">
    <transactional transaction-manager="txManager"
                   isolation="DEFAULT"
                   propagation="REQUIRED"
                   read-only="true"
                   timeout="1000"/>
</poller>

前面的配置看起来类似于本机 Spring 事务配置。您仍然需要提供对事务管理器的引用,并指定事务属性或依赖于默认值(例如,如果未指定“transaction-manager”属性,则默认为名为“transactionManager”的 bean)。在内部,该过程包装在 Spring 的本机事务中,其中TransactionInterceptor负责处理事务。有关如何配置事务管理器、事务管理器类型(例如 JTA、Datasource 等)以及与事务配置相关的其他详细信息,请参阅Spring 框架参考指南

使用前面的配置,由该 Poller 启动的所有消息流都是事务性的。有关 Poller 事务配置的更多信息和详细信息,请参阅轮询和事务

除了事务之外,在运行 Poller 时,您可能还需要解决几个更广泛的跨领域问题。为了帮助解决这个问题,Poller 元素接受一个<advice-chain>子元素,它允许您定义要应用于 Poller 的自定义建议实例链。(有关更多详细信息,请参阅可轮询的消息源)。在 Spring Integration 2.0 中,Poller 经历了重构工作,现在使用代理机制来解决事务问题以及其他跨领域问题。从这项工作中发展出来的一个重大变化是,我们使<transactional><advice-chain>元素相互排斥。这样做的理由是,如果您需要多个建议,其中一个建议是事务建议,您可以将其包含在<advice-chain>中,与以前一样方便,但控制更多,因为您现在可以选择将建议放置在所需的顺序中。以下示例展示了如何做到这一点

<int:poller max-messages-per-poll="1" fixed-rate="10000">
  <advice-chain>
    <ref bean="txAdvice"/>
    <ref bean="someOtherAdviceBean" />
    <beans:bean class="foo.bar.SampleAdvice"/>
  </advice-chain>
</poller>

<tx:advice id="txAdvice" transaction-manager="txManager">
  <tx:attributes>
    <tx:method name="get*" read-only="true"/>
    <tx:method name="*"/>
  </tx:attributes>
</tx:advice>

前面的示例展示了 Spring 事务建议(txAdvice)的基本基于 XML 的配置,并将其包含在 Poller 定义的<advice-chain>中。如果您只需要解决 Poller 的事务问题,您仍然可以使用<transactional>元素作为一种便利方式。

事务边界

另一个重要因素是消息流中事务的边界。当启动事务时,事务上下文绑定到当前线程。因此,无论您的消息流中有多少个端点和通道,只要您确保流在同一线程上继续,您的事务上下文就会被保留。一旦您通过引入可轮询通道执行器通道或在某些服务中手动启动新线程来打破它,事务边界也会被打破。本质上,事务将在那里结束,如果在线程之间成功进行了移交,则该流将被视为成功,并且将发送 COMMIT 信号,即使该流将继续并且可能仍然会导致下游某个地方出现异常。如果这样的流是同步的,则该异常可能会被抛回给消息流的启动者,该启动者也是事务上下文的启动者,并且事务将导致回滚。中间地带是在任何线程边界被打破的地方使用事务性通道。例如,您可以使用一个队列支持的通道,该通道委托给一个事务性消息存储策略,或者您可以使用一个 JMS 支持的通道。

事务同步

在某些环境中,使用涵盖整个流程的事务来同步操作会有所帮助。例如,考虑在流程开始时执行多个数据库更新的 <file:inbound-channel-adapter/>。如果事务提交,我们可能希望将文件移动到 success 目录,而如果事务回滚,我们可能希望将文件移动到 failure 目录。

Spring Integration 2.2 引入了与事务同步这些操作的功能。此外,如果您没有“真实”事务,但仍然希望在成功或失败时执行不同的操作,则可以配置 PseudoTransactionManager。有关更多信息,请参阅 伪事务

以下列表显示了此功能的关键策略接口

public interface TransactionSynchronizationFactory {

    TransactionSynchronization create(Object key);
}

public interface TransactionSynchronizationProcessor {

    void processBeforeCommit(IntegrationResourceHolder holder);

    void processAfterCommit(IntegrationResourceHolder holder);

    void processAfterRollback(IntegrationResourceHolder holder);

}

工厂负责创建 TransactionSynchronization 对象。您可以实现自己的对象,也可以使用框架提供的对象:DefaultTransactionSynchronizationFactory。此实现返回一个 TransactionSynchronization,它委托给 TransactionSynchronizationProcessor 的默认实现:ExpressionEvaluatingTransactionSynchronizationProcessor。此处理器支持三个 SpEL 表达式:beforeCommitExpressionafterCommitExpressionafterRollbackExpression

对于熟悉事务的人来说,这些操作应该是不言自明的。在每种情况下,#root 变量都是原始的 Message。在某些情况下,会提供其他 SpEL 变量,具体取决于轮询器轮询的 MessageSource。例如,MongoDbMessageSource 提供 #mongoTemplate 变量,该变量引用消息源的 MongoTemplate。类似地,RedisStoreMessageSource 提供 #store 变量,该变量引用轮询创建的 RedisStore

要为特定轮询器启用此功能,您可以使用 synchronization-factory 属性在轮询器的 <transactional/> 元素上提供对 TransactionSynchronizationFactory 的引用。

从 5.0 版本开始,Spring Integration 提供了 PassThroughTransactionSynchronizationFactory,它默认应用于轮询端点,前提是未配置 TransactionSynchronizationFactory,但建议链中存在类型为 TransactionInterceptor 的建议。当使用任何开箱即用的 TransactionSynchronizationFactory 实现时,轮询端点将轮询的消息绑定到当前事务上下文,并在事务建议后抛出异常时将其作为 MessagingException 中的 failedMessage 提供。当使用不实现 TransactionInterceptor 的自定义事务建议时,您可以显式配置 PassThroughTransactionSynchronizationFactory 来实现此行为。在这两种情况下,MessagingException 都成为发送到 errorChannelErrorMessage 的有效负载,而原因是建议抛出的原始异常。以前,ErrorMessage 的有效负载是建议抛出的原始异常,并且没有提供对 failedMessage 信息的引用,这使得难以确定事务提交问题的原因。

为了简化这些组件的配置,Spring Integration 为默认工厂提供了命名空间支持。以下示例展示了如何使用命名空间来配置文件入站通道适配器。

<int-file:inbound-channel-adapter id="inputDirPoller"
    channel="someChannel"
    directory="/foo/bar"
    filter="filter"
    comparator="testComparator">
    <int:poller fixed-rate="5000">
        <int:transactional transaction-manager="transactionManager" synchronization-factory="syncFactory" />
    </int:poller>
</int-file:inbound-channel-adapter>

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-commit expression="payload.renameTo(new java.io.File('/success/' + payload.name))"
        channel="committedChannel" />
    <int:after-rollback expression="payload.renameTo(new java.io.File('/failed/' + payload.name))"
        channel="rolledBackChannel" />
</int:transaction-synchronization-factory>

SpEL 表达式评估的结果将作为有效负载发送到 committedChannelrolledBackChannel(在本例中,这将是 Boolean.TRUEBoolean.FALSE - java.io.File.renameTo() 方法调用的结果)。

如果您希望发送整个有效负载以进行进一步的 Spring Integration 处理,请使用“payload”表达式。

重要的是要理解,这会将操作与事务同步。它不会使本来不是事务性的资源实际成为事务性的。相反,事务(无论是 JDBC 还是其他)在轮询之前启动,并在流程完成后提交或回滚,然后是同步操作。

如果您提供自定义的 TransactionSynchronizationFactory,它负责创建资源同步,该同步会在事务完成后自动解除绑定资源。默认的 TransactionSynchronizationFactory 通过返回 ResourceHolderSynchronization 的子类来实现这一点,默认的 shouldUnbindAtCompletion() 返回 true

除了 after-commitafter-rollback 表达式之外,还支持 before-commit。在这种情况下,如果评估(或下游处理)抛出异常,则事务将回滚而不是提交。

伪事务

在阅读了 事务同步 部分后,您可能会认为在流程完成后执行这些“成功”或“失败”操作会很有用,即使在轮询器下游没有“真实”的事务性资源(如 JDBC)。例如,考虑一个“<file:inbound-channel-adapter/>” 后面跟着一个“<ftp:outbout-channel-adapter/>”。这两个组件都不是事务性的,但我们可能希望根据 FTP 传输的成功或失败将输入文件移动到不同的目录。

为了提供此功能,框架提供了一个PseudoTransactionManager,即使没有真正的交易资源,也能实现上述配置。如果流程正常完成,则会调用beforeCommitafterCommit同步。如果失败,则会调用afterRollback同步。因为它不是真正的交易,所以不会发生实际的提交或回滚。伪交易是用于启用同步功能的工具。

要使用PseudoTransactionManager,您可以将其定义为一个<bean/>,就像配置真正的交易管理器一样。以下示例展示了如何操作。

<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager" />

响应式交易

从 5.3 版本开始,ReactiveTransactionManager也可以与TransactionInterceptor建议一起使用,用于返回响应式类型的端点。这包括MessageSourceReactiveMessageHandler实现(例如ReactiveMongoDbMessageSource),它们生成带有FluxMono有效负载的消息。当其他所有回复生成的消息处理程序实现的回复有效负载也是某种响应式类型时,它们可以依赖于ReactiveTransactionManager