线程屏障
有时,我们需要暂停消息流线程,直到发生其他异步事件。例如,考虑一个将消息发布到 RabbitMQ 的 HTTP 请求。我们可能希望在 RabbitMQ 代理确认消息已接收之前,不对用户进行回复。
在 4.2 版本中,Spring Integration 引入了 <barrier/>
组件来实现此目的。底层是 BarrierMessageHandler
,它实现了 MessageHandler
接口。该类还实现了 MessageTriggerAction
接口,通过调用其 trigger()
方法传递消息,可以释放 handleRequestMessage()
方法中对应的(如果存在)挂起线程。
挂起的线程和触发线程通过对消息调用 CorrelationStrategy
进行关联。当消息发送到 input-channel
时,线程将被挂起,最多等待 requestTimeout
毫秒,等待相应的触发消息。默认的关联策略使用 IntegrationMessageHeaderAccessor.CORRELATION_ID
头信息。当带有相同关联 ID 的触发消息到达时,线程被释放。释放后发送到 output-channel
的消息是使用 MessageGroupProcessor
构建的。默认情况下,该消息是一个包含两个 payload 的 Collection<?>
,并且头信息使用 DefaultAggregatingMessageGroupProcessor
合并。
如果 trigger() 方法先被调用(或主线程超时后),它将挂起,最多等待 triggerTimeout 毫秒,等待挂起消息的到达。如果您不想挂起触发线程,请考虑将其交给 TaskExecutor 处理,以便由 TaskExecutor 的线程来挂起。 |
在 5.4 版本之前,请求消息和触发消息只有一个 timeout 选项,但在某些用例中,最好对这些操作设置不同的超时时间。因此,引入了 requestTimeout 和 triggerTimeout 选项。 |
requires-reply
属性决定了在触发消息到达之前,如果挂起线程超时应采取的操作。默认情况下,它为 false
,这意味着端点返回 null
,流结束,并且线程返回给调用者。当设置为 true
时,将抛出 ReplyRequiredException
异常。
您可以通过编程方式调用 trigger()
方法(使用 bean 名称 barrier.handler
获取 bean 引用,其中 barrier
是 barrier 端点的 bean 名称)。或者,您可以配置一个 <outbound-channel-adapter/>
来触发释放。
使用相同的关联 ID 只能挂起一个线程。同一个关联 ID 可以多次使用,但不能同时用于多个挂起操作。如果第二个线程带着相同的关联 ID 到达,则会抛出异常。 |
以下示例展示了如何使用自定义头信息进行关联
-
Java
-
XML
@ServiceActivator(inputChannel="in")
@Bean
public BarrierMessageHandler barrier(MessageChannel out, MessageChannel lateTriggerChannel) {
BarrierMessageHandler barrier = new BarrierMessageHandler(10000);
barrier.setOutputChannel(out());
barrier.setDiscardChannel(lateTriggerChannel);
return barrier;
}
@ServiceActivator (inputChannel="release")
@Bean
public MessageHandler releaser(MessageTriggerAction barrier) {
return barrier::trigger;
}
<int:barrier id="barrier1" input-channel="in" output-channel="out"
correlation-strategy-expression="headers['myHeader']"
output-processor="myOutputProcessor"
discard-channel="lateTriggerChannel"
timeout="10000">
</int:barrier>
<int:outbound-channel-adapter channel="release" ref="barrier1.handler" method="trigger" />
根据哪个通道先接收到消息,发送消息到 in
通道的线程或发送消息到 release
通道的线程将最多等待十秒,直到另一条消息到达。消息释放后,out
通道会发送一条消息,该消息结合了调用名为 myOutputProcessor
的自定义 MessageGroupProcessor
bean 的结果。如果主线程超时并且触发消息稍后才到达,您可以配置一个丢弃通道,将延迟到达的触发消息发送到该通道。如果请求消息未及时到达,触发消息也会被丢弃。
有关此组件的示例,请参阅barrier 示例应用。