线程屏障

有时,我们需要暂停消息流线程,直到某些其他异步事件发生。例如,考虑发布消息到 RabbitMQ 的 HTTP 请求。我们可能希望在 RabbitMQ 代理发出确认消息已收到之前不回复用户。

在 4.2 版本中,Spring Integration 引入了 <barrier/> 组件用于此目的。底层的 MessageHandlerBarrierMessageHandler。此类还实现了 MessageTriggerAction,其中传递给 trigger() 方法的消息会释放 handleRequestMessage() 方法(如果存在)中的对应线程。

挂起的线程和触发线程通过在消息上调用 CorrelationStrategy 来关联。当消息发送到 input-channel 时,线程会暂停最多 requestTimeout 毫秒,等待相应的触发消息。默认的相关策略使用 IntegrationMessageHeaderAccessor.CORRELATION_ID 头。当带有相同相关性的触发消息到达时,线程将被释放。释放后发送到 output-channel 的消息是使用 MessageGroupProcessor 构造的。默认情况下,消息是两个有效负载的 Collection<?>,并且标头使用 DefaultAggregatingMessageGroupProcessor 合并。

如果首先调用 trigger() 方法(或在主线程超时后调用),它将暂停最多 triggerTimeout 时间,等待挂起消息到达。如果您不想暂停触发线程,请考虑将其移交给 TaskExecutor,以便改为暂停其线程。
在 5.4 版本之前,请求和触发消息只有一个 timeout 选项,但在某些用例中,为这些操作设置不同的超时时间会更好。因此,引入了 requestTimeouttriggerTimeout 选项。

requires-reply 属性决定如果挂起的线程在触发消息到达之前超时该采取的操作。默认情况下,它是 false,这意味着端点返回 null,流程结束,线程返回调用方。当为 true 时,将抛出 ReplyRequiredException

您可以以编程方式调用 trigger() 方法(使用名称 barrier.handler 获取 Bean 引用,其中 barrier 是屏障端点的 Bean 名称)。或者,您可以配置 <outbound-channel-adapter/> 来触发释放。

只有一个线程可以使用相同的相关性被挂起。相同的相关性可以多次使用,但一次只能并发使用一次。如果第二个线程到达时具有相同的相关性,则会抛出异常。

以下示例显示了如何使用自定义头进行关联

  • Java

  • XML

@ServiceActivator(inputChannel="in")
@Bean
public BarrierMessageHandler barrier(MessageChannel out, MessageChannel lateTriggerChannel) {
    BarrierMessageHandler barrier = new BarrierMessageHandler(10000);
    barrier.setOutputChannel(out());
    barrier.setDiscardChannel(lateTriggerChannel);
    return barrier;
}

@ServiceActivator (inputChannel="release")
@Bean
public MessageHandler releaser(MessageTriggerAction barrier) {
    return barrier::trigger;
}
<int:barrier id="barrier1" input-channel="in" output-channel="out"
        correlation-strategy-expression="headers['myHeader']"
        output-processor="myOutputProcessor"
        discard-channel="lateTriggerChannel"
        timeout="10000">
</int:barrier>

<int:outbound-channel-adapter channel="release" ref="barrier1.handler" method="trigger" />

取决于哪个线程首先收到消息,发送消息到 in 的线程或发送消息到 release 的线程都会等待最多十秒,直到另一条消息到达。当消息被释放时,out 通道将收到一条消息,该消息结合了调用名为 myOutputProcessor 的自定义 MessageGroupProcessor Bean 的结果。如果主线程超时并且稍后到达触发器,您可以配置一个丢弃通道,将延迟的触发器发送到该通道。如果请求消息未及时到达,也会丢弃触发器消息。

有关此组件的示例,请参阅 屏障示例应用程序