Scatter-Gather
从 4.1 版本开始,Spring Integration 提供了 scatter-gather 企业集成模式的实现。它是一个复合端点,其目标是将消息发送给接收者并聚合结果。正如 Enterprise Integration Patterns 中所述,它是一个用于“最优报价”等场景的组件,在这种场景下,我们需要向多个供应商请求信息并决定哪一个提供了最优惠的条款。
以前,可以使用离散组件配置该模式。此增强功能带来了更方便的配置。
ScatterGatherHandler
是一个请求-回复端点,它结合了 PublishSubscribeChannel
(或 RecipientListRouter
)和 AggregatingMessageHandler
。请求消息被发送到 scatter
通道,ScatterGatherHandler
等待聚合器发送到 outputChannel
的回复。
功能
Scatter-Gather
模式提出了两种场景:“拍卖(auction)”和“分发(distribution)”。在这两种情况下,aggregation
功能是相同的,并提供了 AggregatingMessageHandler
的所有可用选项。(实际上,ScatterGatherHandler
只需要一个 AggregatingMessageHandler
作为构造函数参数)。有关更多信息,请参见 Aggregator。
拍卖
拍卖 Scatter-Gather
变体对请求消息使用“发布-订阅”逻辑,其中“scatter”通道是一个 PublishSubscribeChannel
,带有 apply-sequence="true"
。但是,此通道可以是任何 MessageChannel
实现(与 ContentEnricher
中的 request-channel
一样 — 请参见 Content Enricher)。但是,在这种情况下,您应该为 aggregation
功能创建自己的自定义 correlationStrategy
。
分发
分发 Scatter-Gather
变体基于 RecipientListRouter
(参见 RecipientListRouter
),并拥有 RecipientListRouter
的所有可用选项。这是 ScatterGatherHandler
的第二个构造函数参数。如果您只想依赖于 recipient-list-router
和 aggregator
的默认 correlationStrategy
,则应指定 apply-sequence="true"
。否则,您应该为 aggregator
提供自定义的 correlationStrategy
。与 PublishSubscribeChannel
变体(拍卖变体)不同,recipient-list-router
的 selector
选项允许根据消息过滤目标供应商。使用 apply-sequence="true"
时,会提供默认的 sequenceSize
,并且 aggregator
可以正确释放组。分发选项与拍卖选项互斥。
applySequence=true 仅在基于 ScatterGatherHandler(MessageHandler scatterer, MessageHandler gatherer) 构造函数配置的普通 Java 配置中是必需的,因为框架无法修改外部提供的组件。为了方便起见,从 6.0 版本开始,用于 Scatter-Gather 的 XML 和 Java DSL 将 applySequence 设置为 true。 |
对于拍卖和分发两种变体,请求(分散)消息都会通过 gatherResultChannel
头进行丰富,以等待来自 aggregator
的回复消息。
默认情况下,所有供应商应将其结果发送到 replyChannel
头(通常通过省略最终端点的 output-channel
)。但是,还提供了 gatherChannel
选项,允许供应商将其回复发送到该通道进行聚合。
配置 Scatter-Gather 端点
以下示例展示了使用 Java 配置定义 Scatter-Gather
bean 的方法
@Bean
public MessageHandler distributor() {
RecipientListRouter router = new RecipientListRouter();
router.setApplySequence(true);
router.setChannels(Arrays.asList(distributionChannel1(), distributionChannel2(),
distributionChannel3()));
return router;
}
@Bean
public MessageHandler gatherer() {
return new AggregatingMessageHandler(
new ExpressionEvaluatingMessageGroupProcessor("^[payload gt 5] ?: -1D"),
new SimpleMessageStore(),
new HeaderAttributeCorrelationStrategy(
IntegrationMessageHeaderAccessor.CORRELATION_ID),
new ExpressionEvaluatingReleaseStrategy("size() == 2"));
}
@Bean
@ServiceActivator(inputChannel = "distributionChannel")
public MessageHandler scatterGatherDistribution() {
ScatterGatherHandler handler = new ScatterGatherHandler(distributor(), gatherer());
handler.setOutputChannel(output());
return handler;
}
在前面的示例中,我们使用 applySequence="true"
和接收者通道列表配置了 RecipientListRouter
distributor
bean。下一个 bean 是用于 AggregatingMessageHandler
的。最后,我们将这两个 bean 注入到 ScatterGatherHandler
bean 定义中,并将其标记为 @ServiceActivator
,以便将 Scatter-Gather 组件连接到集成流中。
以下示例展示了如何使用 XML 命名空间配置 <scatter-gather>
端点
<scatter-gather
id="" (1)
auto-startup="" (2)
input-channel="" (3)
output-channel="" (4)
scatter-channel="" (5)
gather-channel="" (6)
order="" (7)
phase="" (8)
send-timeout="" (9)
gather-timeout="" (10)
requires-reply="" > (11)
<scatterer/> (12)
<gatherer/> (13)
</scatter-gather>
1 | 端点的 ID。ScatterGatherHandler bean 会注册一个别名 id + '.handler' 。RecipientListRouter bean 会注册一个别名 id + '.scatterer' 。AggregatingMessageHandler bean 会注册一个别名 id + '.gatherer' 。可选。(BeanFactory 会生成默认的 id 值。) |
2 | 生命周期属性,指示端点是否应在应用上下文初始化期间启动。此外,ScatterGatherHandler 还实现了 Lifecycle 接口,并启动和停止内部创建的 gatherEndpoint (如果提供了 gather-channel )。可选。(默认为 true 。) |
3 | 在哪个通道上接收请求消息,以便在 ScatterGatherHandler 中处理它们。必需。 |
4 | ScatterGatherHandler 将聚合结果发送到的通道。可选。(入站消息可以在 replyChannel 消息头中指定回复通道)。 |
5 | 对于拍卖场景,用于发送分散消息的通道。可选。与 <scatterer> 子元素互斥。 |
6 | 用于接收每个供应商回复消息进行聚合的通道。它在分散消息中作为 replyChannel 头使用。可选。默认情况下,会创建一个 FixedSubscriberChannel 。 |
7 | 当多个处理器订阅同一个 DirectChannel 时,此组件的顺序(用于负载均衡目的)。可选。 |
8 | 指定端点应在哪个阶段启动和停止。启动顺序从最低到最高,停止顺序从最高到最低。默认情况下,此值为 Integer.MAX_VALUE ,这意味着此容器尽可能晚地启动,并尽可能早地停止。可选。 |
9 | 向 output-channel 发送回复 Message 时等待的超时时间间隔。默认情况下,send() 会阻塞一秒钟。它仅适用于输出通道存在某些“发送”限制的情况,例如容量固定的 QueueChannel 已满。在这种情况下,会抛出 MessageDeliveryException 。对于 AbstractSubscribableChannel 实现,send-timeout 会被忽略。对于 group-timeout(-expression) ,来自计划过期任务的 MessageDeliveryException 会导致该任务被重新调度。可选。 |
10 | 允许您指定 Scatter-Gather 在返回之前等待回复消息的时间。默认情况下,它会等待 30 秒。如果回复超时,则返回 'null'。可选。 |
11 | 指定 Scatter-Gather 是否必须返回非 null 值。此值默认为 true 。因此,当底层聚合器在 gather-timeout 后返回 null 值时,会抛出 ReplyRequiredException 。请注意,如果可能返回 null ,则应指定 gather-timeout 以避免无限期等待。 |
12 | <recipient-list-router> 选项。可选。与 scatter-channel 属性互斥。 |
13 | <aggregator> 选项。必需。 |
错误处理
由于 Scatter-Gather 是一个多请求-回复组件,错误处理会增加一些复杂性。在某些情况下,如果 ReleaseStrategy
允许在回复少于请求的情况下完成处理,则最好捕获并忽略下游异常。在其他情况下,当发生错误时,应考虑从子流返回类似“补偿消息”的东西。
每个异步子流都应配置 errorChannel
头,以便 MessagePublishingErrorHandler
正确发送错误消息。否则,错误将发送到全局 errorChannel
,并使用通用的错误处理逻辑。有关异步错误处理的更多信息,请参见 错误处理。
同步流可以使用 ExpressionEvaluatingRequestHandlerAdvice
来忽略异常或返回补偿消息。当异常从一个子流抛到 ScatterGatherHandler
时,它会被重新抛到上游。这样,所有其他子流的工作都将徒劳无功,它们的回复将在 ScatterGatherHandler
中被忽略。有时这可能是预期的行为,但在大多数情况下,最好在特定的子流中处理错误,而不会影响所有其他子流以及 gatherer 中的预期。
从版本 5.1.3 开始,ScatterGatherHandler
提供了 errorChannelName
选项。它会被填充到分散消息的 errorChannel
头中,并在发生异步错误时使用,或者可以在常规同步子流中用于直接发送错误消息。
以下示例配置展示了通过返回补偿消息进行异步错误处理
@Bean
public IntegrationFlow scatterGatherAndExecutorChannelSubFlow(TaskExecutor taskExecutor) {
return f -> f
.scatterGather(
scatterer -> scatterer
.recipientFlow(f1 -> f1.transform(p -> "Sub-flow#1"))
.recipientFlow(f2 -> f2
.channel(c -> c.executor(taskExecutor))
.transform(p -> {
throw new RuntimeException("Sub-flow#2");
})),
null,
s -> s.errorChannel("scatterGatherErrorChannel"));
}
@ServiceActivator(inputChannel = "scatterGatherErrorChannel")
public Message<?> processAsyncScatterError(MessagingException payload) {
return MessageBuilder.withPayload(payload.getCause().getCause())
.copyHeaders(payload.getFailedMessage().getHeaders())
.build();
}
为了生成正确的回复,我们必须从由 MessagePublishingErrorHandler
发送到 scatterGatherErrorChannel
的 MessagingException
的 failedMessage
中复制头(包括 replyChannel
和 errorChannel
)。这样,目标异常将返回到 ScatterGatherHandler
的 gatherer,以便完成回复消息组。此类异常 payload
可以在 gatherer 的 MessageGroupProcessor
中被过滤掉,或者在 Scatter-Gather 端点之后的下游以其他方式处理。
在将分散结果发送到 gatherer 之前,ScatterGatherHandler 会恢复请求消息头,包括回复和错误通道(如果存在)。这样,来自 AggregatingMessageHandler 的错误将传播到调用者,即使在分散接收者子流中应用了异步移交。为了成功操作,必须将 gatherResultChannel 、originalReplyChannel 和 originalErrorChannel 头传回分散接收者子流的回复中。在这种情况下,必须为 ScatterGatherHandler 配置一个合理的、有限的 gatherTimeout 。否则,默认情况下它将无限期地阻塞,等待 gatherer 的回复。 |