线程屏障
有时,我们需要暂停消息流线程,直到发生其他异步事件。例如,考虑一个将消息发布到 RabbitMQ 的 HTTP 请求。我们可能希望在 RabbitMQ 代理发出确认消息已接收的确认之前不回复用户。
在 4.2 版本中,Spring Integration 引入了 <barrier/>
组件来实现此目的。底层的 MessageHandler
是 BarrierMessageHandler
。此类还实现了 MessageTriggerAction
,其中传递给 trigger()
方法的消息会释放 handleRequestMessage()
方法中的对应线程(如果存在)。
挂起线程和触发线程通过在消息上调用CorrelationStrategy
来关联。当消息发送到input-channel
时,线程会挂起,最多等待requestTimeout
毫秒,等待相应的触发消息。默认的关联策略使用IntegrationMessageHeaderAccessor.CORRELATION_ID
头。当触发消息到达并具有相同的关联时,线程将被释放。释放后发送到output-channel
的消息是使用MessageGroupProcessor
构建的。默认情况下,消息是两个有效负载的Collection<?>
,并且头信息使用DefaultAggregatingMessageGroupProcessor
合并。
如果trigger() 方法先被调用(或在主线程超时后),它将被挂起,最多等待triggerTimeout ,等待挂起消息到达。如果您不想挂起触发线程,请考虑将其移交给TaskExecutor ,以便其线程被挂起。
|
在 5.4 版本之前,请求消息和触发消息只有一个timeout 选项,但在某些用例中,为这些操作设置不同的超时时间会更好。因此,引入了requestTimeout 和triggerTimeout 选项。
|
requires-reply
属性决定在挂起线程在触发消息到达之前超时时要采取的操作。默认情况下,它是false
,这意味着端点返回null
,流程结束,线程返回调用者。当为true
时,将抛出ReplyRequiredException
。
您可以以编程方式调用trigger()
方法(使用名称获取 bean 引用,barrier.handler
,其中barrier
是屏障端点的 bean 名称)。或者,您可以配置一个<outbound-channel-adapter/>
来触发释放。
只有一个线程可以使用相同的关联被挂起。相同的关联可以多次使用,但一次只能使用一次。如果第二个线程到达并具有相同的关联,则会抛出异常。 |
以下示例展示了如何使用自定义头进行关联
-
Java
-
XML
@ServiceActivator(inputChannel="in")
@Bean
public BarrierMessageHandler barrier(MessageChannel out, MessageChannel lateTriggerChannel) {
BarrierMessageHandler barrier = new BarrierMessageHandler(10000);
barrier.setOutputChannel(out());
barrier.setDiscardChannel(lateTriggerChannel);
return barrier;
}
@ServiceActivator (inputChannel="release")
@Bean
public MessageHandler releaser(MessageTriggerAction barrier) {
return barrier::trigger;
}
<int:barrier id="barrier1" input-channel="in" output-channel="out"
correlation-strategy-expression="headers['myHeader']"
output-processor="myOutputProcessor"
discard-channel="lateTriggerChannel"
timeout="10000">
</int:barrier>
<int:outbound-channel-adapter channel="release" ref="barrier1.handler" method="trigger" />
取决于哪个线程先收到消息,发送消息到in
的线程或发送消息到release
的线程将等待最多十秒,直到另一个消息到达。当消息被释放时,out
通道将收到一条消息,该消息组合了调用名为myOutputProcessor
的自定义MessageGroupProcessor
bean的结果。如果主线程超时,而触发消息稍后到达,您可以配置一个丢弃通道,将延迟的触发消息发送到该通道。
有关此组件的示例,请参见屏障示例应用程序。