事务支持
理解消息流中的事务
Spring Integration 提供了几个钩子来满足你的消息流的事务需求。为了更好地理解这些钩子以及如何从中受益,我们首先需要回顾一下可以用来启动消息流的六种机制,并看看如何在每种机制中处理这些流的事务需求。
以下六种机制启动一个消息流(每种机制的详细信息在本手册中都有介绍):
-
网关代理:一种基本的消息网关。
-
消息通道:直接与
MessageChannel
方法交互(例如,channel.send(message)
)。 -
消息发布者:通过调用 Spring bean 方法的副产品来启动消息流的方式。
-
入站通道适配器和网关:将第三方系统与 Spring Integration 消息系统连接起来,并基于此启动消息流的方式(例如,
[JmsMessage] → Jms 入站适配器 [SI Message] → SI 通道
)。 -
调度器:基于预配置的调度器分发的调度事件来启动消息流的方式。
-
轮询器 (Poller):类似于调度器,这是一种基于预配置的轮询器分发的调度或基于间隔的事件来启动消息流的方式。
我们可以将这六种机制分为两大类:
-
用户进程启动的消息流:此类示例场景包括调用网关方法或显式向
MessageChannel
发送Message
。换句话说,这些消息流的启动依赖于第三方进程(例如您编写的一些代码)。 -
守护进程启动的消息流:此类示例场景包括轮询器轮询消息队列以使用轮询的消息启动新的消息流,或者调度器通过创建新消息并在预定义的时间启动消息流来调度进程。
显然,网关代理、MessageChannel.send(…)
和 MessagePublisher
都属于第一类,而入站适配器和网关、调度器和轮询器属于第二类。
那么,如何在每种类别中的各种场景中解决事务需求?Spring Integration 是否需要针对特定场景提供明确的事务支持?或者您可以使用 Spring 的事务支持吗?
Spring 本身提供了对事务管理的一流支持。所以我们这里的目标不是提供新的东西,而是利用 Spring 现有的事务支持从中受益。换句话说,作为一个框架,我们必须暴露钩子以连接 Spring 的事务管理功能。然而,由于 Spring Integration 配置基于 Spring 配置,我们不需要总是暴露这些钩子,因为 Spring 已经暴露了它们。毕竟,每个 Spring Integration 组件都是一个 Spring Bean。
考虑到这一目标,我们可以再次考虑这两种场景:用户进程启动的消息流和守护进程启动的消息流。
由用户进程启动并在 Spring 应用上下文中配置的消息流受到此类进程通常的事务配置的约束。因此,它们无需由 Spring Integration 显式配置来支持事务。事务可以而且应该通过 Spring 的标准事务支持来启动。Spring Integration 消息流自然会遵循组件的事务语义,因为它本身就是由 Spring 配置的。例如,网关或服务激活器方法可以用 @Transactional
注解,或者可以在 XML 配置中定义一个 TransactionInterceptor
,其切入点表达式指向需要事务化的特定方法。总而言之,在这些场景中,您对事务配置和边界拥有完全控制权。
然而,对于守护进程启动的消息流来说,情况有所不同。尽管由开发人员配置,但这些流不直接涉及人类或某些其他进程来启动。这些是基于触发器的流,由触发器进程(守护进程)根据进程配置启动。例如,我们可以让一个调度器在每个周五晚上启动一个消息流。我们还可以配置一个触发器,每秒钟启动一个消息流等等。因此,我们需要一种方法来让这些基于触发器的进程知道我们想要使结果消息流具有事务性,以便在每次启动新的消息流时都可以创建一个事务上下文。换句话说,我们需要公开一些事务配置,但仅够将工作委托给 Spring 已提供的事务支持(就像我们在其他场景中所做的那样)。
轮询器事务支持
Spring Integration 为轮询器提供了事务支持。轮询器是一种特殊类型的组件,因为在轮询器任务中,我们可以对自身具有事务性的资源调用 receive()
,从而将 receive()
调用包含在事务边界内,这样在任务失败时可以回滚。如果我们对通道添加相同的支持,添加的事务将影响所有从 send()
调用开始的下游组件。这为事务划分提供了相当广泛的范围,而没有任何强有力的理由,特别是当 Spring 已经提供了几种方法来解决任何下游组件的事务需求时。然而,将 receive()
方法包含在事务边界内是支持轮询器的“强有力理由”。
无论何时配置轮询器,您都可以通过使用 transactional
子元素及其属性来提供事务配置,如下例所示:
<int:poller max-messages-per-poll="1" fixed-rate="1000">
<transactional transaction-manager="txManager"
isolation="DEFAULT"
propagation="REQUIRED"
read-only="true"
timeout="1000"/>
</poller>
上述配置类似于原生的 Spring 事务配置。您仍然必须提供对事务管理器的引用,并指定事务属性或依赖默认值(例如,如果未指定 'transaction-manager' 属性,则默认使用名为 'transactionManager' 的 bean)。在内部,该过程包装在 Spring 的原生事务中,其中 TransactionInterceptor
负责处理事务。有关如何配置事务管理器、事务管理器的类型(例如 JTA、Datasource 等)以及与事务配置相关的其他详细信息,请参阅 Spring Framework 参考指南。
通过上述配置,此轮询器启动的所有消息流都将是事务性的。有关轮询器事务配置的更多信息和详细信息,请参阅 轮询和事务。
除了事务之外,在运行轮询器时,您可能还需要解决几个横切关注点。为了提供帮助,轮询器元素接受一个 <advice-chain>
子元素,允许您定义要应用于轮询器的一系列自定义通知实例。(更多详细信息请参阅 可轮询消息源)。在 Spring Integration 2.0 中,轮询器经过了重构,现在使用代理机制来解决事务问题以及其他横切关注点。这项工作带来的一个显著变化是,我们让 <transactional>
和 <advice-chain>
元素互斥。这样做的理由是,如果您需要多个通知且其中一个是事务通知,您可以将其包含在 <advice-chain>
中,就像以前一样方便,但控制能力更强,因为您现在可以选择将通知放置在所需的位置。以下示例展示了如何实现:
<int:poller max-messages-per-poll="1" fixed-rate="10000">
<advice-chain>
<ref bean="txAdvice"/>
<ref bean="someOtherAdviceBean" />
<beans:bean class="foo.bar.SampleAdvice"/>
</advice-chain>
</poller>
<tx:advice id="txAdvice" transaction-manager="txManager">
<tx:attributes>
<tx:method name="get*" read-only="true"/>
<tx:method name="*"/>
</tx:attributes>
</tx:advice>
上述示例展示了基于 XML 的 Spring 事务通知 (txAdvice
) 的基本配置,并将其包含在轮询器定义的 <advice-chain>
中。如果您只需要处理轮询器的事务问题,仍然可以使用 <transactional>
元素作为便利方式。
事务边界
另一个重要因素是消息流中事务的边界。事务启动时,事务上下文会绑定到当前线程。因此,无论消息流中有多少端点和通道,只要您确保流在同一线程上继续执行,您的事务上下文就会得到保留。一旦您通过引入 可轮询通道 (Pollable Channel) 或 执行器通道 (Executor Channel) 或在某些服务中手动启动新线程来中断它,事务边界也会被打破。本质上,事务将立即结束,并且如果在线程之间发生了成功的移交,即使流将继续并且下游可能仍然导致异常,该流也将被视为成功并发送 COMMIT 信号。如果这样的流是同步的,该异常可能会被抛回给消息流的启动者(同时也是事务上下文的启动者),并且事务将导致 ROLLBACK。中间方案是在任何线程边界被打破的点使用事务通道。例如,您可以使用委托给事务性 MessageStore 策略的基于队列的通道,或者使用基于 JMS 的通道。
事务同步
在某些环境中,将操作与包含整个流的事务同步会有所帮助。例如,考虑流开头的一个 <file:inbound-channel-adapter/>
,它执行一些数据库更新。如果事务提交,我们可能想将文件移动到 success
目录;而如果事务回滚,我们可能想将其移动到 failure
目录。
Spring Integration 2.2 引入了将这些操作与事务同步的能力。此外,如果您没有“真实的”事务但仍希望在成功或失败时执行不同的操作,则可以配置一个 PseudoTransactionManager
。更多信息请参阅 伪事务。
以下列表显示了此功能的主要策略接口:
public interface TransactionSynchronizationFactory {
TransactionSynchronization create(Object key);
}
public interface TransactionSynchronizationProcessor {
void processBeforeCommit(IntegrationResourceHolder holder);
void processAfterCommit(IntegrationResourceHolder holder);
void processAfterRollback(IntegrationResourceHolder holder);
}
该工厂负责创建一个 TransactionSynchronization
对象。您可以实现自己的,或使用框架提供的:DefaultTransactionSynchronizationFactory
。此实现返回一个 TransactionSynchronization
,它委托给 TransactionSynchronizationProcessor
的默认实现:ExpressionEvaluatingTransactionSynchronizationProcessor
。此处理器支持三个 SpEL 表达式:beforeCommitExpression
、afterCommitExpression
和 afterRollbackExpression
。
这些操作对于熟悉事务的人来说应该是不言自明的。在每种情况下,#root
变量都是原始的 Message
。在某些情况下,根据轮询器轮询的 MessageSource
,会提供其他 SpEL 变量。例如,MongoDbMessageSource
提供了 #mongoTemplate
变量,该变量引用消息源的 MongoTemplate
。类似地,RedisStoreMessageSource
提供了 #store
变量,该变量引用轮询创建的 RedisStore
。
要为特定的轮询器启用此功能,您可以在轮询器的 <transactional/>
元素上使用 synchronization-factory
属性提供对 TransactionSynchronizationFactory
的引用。
从 5.0 版本开始,Spring Integration 提供了 PassThroughTransactionSynchronizationFactory
,当未配置 TransactionSynchronizationFactory
但通知链中存在类型为 TransactionInterceptor
的通知时,它会默认应用于轮询端点。使用任何开箱即用的 TransactionSynchronizationFactory
实现时,如果在事务通知之后抛出异常,轮询端点会将轮询的消息绑定到当前事务上下文,并在 MessagingException
中将其作为 failedMessage
提供。使用不实现 TransactionInterceptor
的自定义事务通知时,您可以显式配置 PassThroughTransactionSynchronizationFactory
以实现此行为。在这两种情况下,MessagingException
都将成为发送到 errorChannel
的 ErrorMessage
的 payload,而 cause 是通知抛出的原始异常。在此之前,ErrorMessage
的 payload 是通知抛出的原始异常,并且不提供 failedMessage
信息的引用,这使得难以确定事务提交问题的原因。
为了简化这些组件的配置,Spring Integration 为默认工厂提供了命名空间支持。以下示例展示了如何使用命名空间配置文件入站通道适配器:
<int-file:inbound-channel-adapter id="inputDirPoller"
channel="someChannel"
directory="/foo/bar"
filter="filter"
comparator="testComparator">
<int:poller fixed-rate="5000">
<int:transactional transaction-manager="transactionManager" synchronization-factory="syncFactory" />
</int:poller>
</int-file:inbound-channel-adapter>
<int:transaction-synchronization-factory id="syncFactory">
<int:after-commit expression="payload.renameTo(new java.io.File('/success/' + payload.name))"
channel="committedChannel" />
<int:after-rollback expression="payload.renameTo(new java.io.File('/failed/' + payload.name))"
channel="rolledBackChannel" />
</int:transaction-synchronization-factory>
SpEL 评估的结果将作为 payload 发送到 committedChannel
或 rolledBackChannel
(在本例中,这将是 Boolean.TRUE
或 Boolean.FALSE
——即 java.io.File.renameTo()
方法调用的结果)。
如果您希望发送整个 payload 以进行进一步的 Spring Integration 处理,请使用 'payload' 表达式。
重要的是要理解,这会将操作与事务同步。它并不会使一个并非固有事务性的资源真正具有事务性。相反,事务(无论是 JDBC 还是其他类型)在轮询之前启动,并在流完成时提交或回滚,然后执行同步操作。 如果您提供了自定义的 |
除了 after-commit
和 after-rollback
表达式之外,还支持 before-commit
。在这种情况下,如果评估(或下游处理)抛出异常,事务将回滚而不是提交。
伪事务
在阅读了事务同步一节后,你可能会认为在流程完成时执行这些“成功”或“失败”操作会很有用,即使在轮询器的下游没有“真正的”事务性资源(如 JDBC)。例如,考虑一个“<file:inbound-channel-adapter/>”后跟一个“<ftp:outbout-channel-adapter/>”。这两个组件都不是事务性的,但我们可能希望根据 FTP 传输的成功或失败,将输入文件移动到不同的目录。
为了提供此功能,框架提供了一个 PseudoTransactionManager
,即使没有涉及真正的事务性资源,也可以启用上述配置。如果流程正常完成,将调用 beforeCommit
和 afterCommit
同步。如果失败,将调用 afterRollback
同步。由于它不是真正的事务,因此不会发生实际的提交或回滚。伪事务是用于启用同步特性的载体。
要使用 PseudoTransactionManager
,你可以将其定义为一个 <bean/>,就像配置真正的事务管理器一样。下面的例子展示了如何这样做
<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager" />