散布-收集

从 4.1 版本开始,Spring Integration 提供了 散布-收集 企业集成模式的实现。它是一个复合端点,其目标是将消息发送给接收方并聚合结果。如 企业集成模式 中所述,它是一个用于“最佳报价”等场景的组件,在这些场景中,我们需要从多个供应商请求信息并确定哪个供应商为我们提供了所请求项目的最佳条款。

以前,可以通过使用离散组件来配置该模式。此增强功能带来了更方便的配置。

ScatterGatherHandler 是一个请求-回复端点,它结合了 PublishSubscribeChannel(或 RecipientListRouter)和 AggregatingMessageHandler。请求消息被发送到 scatter 通道,ScatterGatherHandler 等待聚合器发送到 outputChannel 的回复。

功能

散布-收集 模式建议两种场景:“拍卖”和“分发”。在这两种情况下,聚合 功能都是相同的,并提供 AggregatingMessageHandler 可用的所有选项。(实际上,ScatterGatherHandler 只需要 AggregatingMessageHandler 作为构造函数参数。)有关更多信息,请参阅 聚合器

拍卖

拍卖 散布-收集 变体使用“发布-订阅”逻辑来处理请求消息,其中“散布”通道是具有 apply-sequence="true"PublishSubscribeChannel。但是,此通道可以是任何 MessageChannel 实现(就像 ContentEnricher 中的 request-channel 一样,请参阅 内容丰富器)。但是,在这种情况下,您应该为 聚合 功能创建自己的自定义 correlationStrategy

分发

分发 散布-收集 变体基于 RecipientListRouter(请参阅 RecipientListRouter),并提供 RecipientListRouter 可用的所有选项。这是 ScatterGatherHandler 的第二个构造函数参数。如果您想仅依赖于 recipient-list-routeraggregator 的默认 correlationStrategy,则应指定 apply-sequence="true"。否则,您应该为 aggregator 提供自定义 correlationStrategy。与 PublishSubscribeChannel 变体(拍卖变体)不同,拥有 recipient-list-router selector 选项允许根据消息过滤目标供应商。使用 apply-sequence="true",将提供默认的 sequenceSize,并且 aggregator 可以正确释放组。分发选项与拍卖选项互斥。

applySequence=true 仅在基于 ScatterGatherHandler(MessageHandler scatterer, MessageHandler gatherer) 构造函数配置的纯 Java 配置中需要,因为框架无法修改外部提供的组件。为了方便起见,从 6.0 版本开始,Scatter-Gather 的 XML 和 Java DSL 将 applySequence 设置为 true。

对于拍卖和分发变体,请求(散布)消息都用 gatherResultChannel 头部进行丰富,以等待来自 聚合器 的回复消息。

默认情况下,所有供应商都应将其结果发送到 replyChannel 头部(通常通过省略最终端点的 output-channel)。但是,也提供了 gatherChannel 选项,允许供应商将其回复发送到该通道以进行聚合。

配置 Scatter-Gather 端点

以下示例展示了 Scatter-Gather 的 bean 定义的 Java 配置

@Bean
public MessageHandler distributor() {
    RecipientListRouter router = new RecipientListRouter();
    router.setApplySequence(true);
    router.setChannels(Arrays.asList(distributionChannel1(), distributionChannel2(),
            distributionChannel3()));
    return router;
}

@Bean
public MessageHandler gatherer() {
	return new AggregatingMessageHandler(
			new ExpressionEvaluatingMessageGroupProcessor("^[payload gt 5] ?: -1D"),
			new SimpleMessageStore(),
			new HeaderAttributeCorrelationStrategy(
			       IntegrationMessageHeaderAccessor.CORRELATION_ID),
			new ExpressionEvaluatingReleaseStrategy("size() == 2"));
}

@Bean
@ServiceActivator(inputChannel = "distributionChannel")
public MessageHandler scatterGatherDistribution() {
	ScatterGatherHandler handler = new ScatterGatherHandler(distributor(), gatherer());
	handler.setOutputChannel(output());
	return handler;
}

在前面的示例中,我们使用 applySequence="true" 和接收者通道列表配置了 RecipientListRouter distributor bean。下一个 bean 是用于 AggregatingMessageHandler 的。最后,我们将这两个 bean 注入到 ScatterGatherHandler bean 定义中,并将其标记为 @ServiceActivator,以将散布-聚合组件连接到集成流中。

以下示例展示了如何使用 XML 命名空间配置 <scatter-gather> 端点

<scatter-gather
		id=""  (1)
		auto-startup=""  (2)
		input-channel=""  (3)
		output-channel=""  (4)
		scatter-channel=""  (5)
		gather-channel=""  (6)
		order=""  (7)
		phase=""  (8)
		send-timeout=""  (9)
		gather-timeout=""  (10)
		requires-reply="" > (11)
			<scatterer/>  (12)
			<gatherer/>  (13)
</scatter-gather>
1 端点的 ID。ScatterGatherHandler bean 使用别名 id + '.handler' 注册。RecipientListRouter bean 使用别名 id + '.scatterer' 注册。AggregatingMessageHandler bean 使用别名 id + '.gatherer' 注册。可选。(BeanFactory 生成默认的 id 值。)
2 生命周期属性,指示在应用程序上下文初始化期间是否应启动端点。此外,ScatterGatherHandler 还实现了 Lifecycle,并启动和停止 gatherEndpoint,如果提供了 gather-channel,则会在内部创建 gatherEndpoint。可选。(默认值为 true。)
3 用于接收请求消息的通道,以便在 ScatterGatherHandler 中处理它们。必需。
4 ScatterGatherHandler 发送聚合结果的通道。可选。(传入消息可以在 replyChannel 消息头部中指定自己的回复通道)。
5 用于向拍卖场景发送散布消息的通道。可选。与 <scatterer> 子元素互斥。
6 用于接收来自每个供应商的回复以进行聚合的通道。它用作散布消息中的 replyChannel 头部。可选。默认情况下,将创建 FixedSubscriberChannel
7 当多个处理程序订阅同一个 DirectChannel 时,此组件的顺序(用于负载均衡目的)。可选。
8 指定端点应启动和停止的阶段。启动顺序从最低到最高,关闭顺序从最高到最低。默认情况下,此值为 Integer.MAX_VALUE,表示此容器尽可能晚启动,尽可能早停止。可选。
9 output-channel 发送回复 Message 时等待的超时间隔。默认情况下,send() 会阻塞一秒钟。它仅在输出通道存在一些“发送”限制时适用,例如,具有固定“容量”且已满的 QueueChannel。在这种情况下,将抛出 MessageDeliveryException。对于 AbstractSubscribableChannel 实现,send-timeout 将被忽略。对于 group-timeout(-expression),来自计划的过期任务的 MessageDeliveryException 会导致此任务重新计划。可选。
10 允许您指定散布-收集在返回之前等待回复消息的时间。默认情况下,它会等待 30 秒。如果回复超时,则返回 'null'。可选。
11 指定散布-收集是否必须返回非空值。此值默认情况下为 true。因此,当底层聚合器在 gather-timeout 后返回空值时,将抛出 ReplyRequiredException。请注意,如果 null 是可能的,则应指定 gather-timeout 以避免无限期等待。
12 <recipient-list-router> 选项。可选。与 scatter-channel 属性互斥。
13 <aggregator> 选项。必需。

错误处理

由于散布-收集是一个多请求-回复组件,因此错误处理具有一些额外的复杂性。在某些情况下,如果 ReleaseStrategy 允许进程以少于请求的回复完成,则最好只捕获并忽略下游异常。在其他情况下,当发生错误时,应考虑类似“补偿消息”的东西从子流程返回。

每个异步子流程都应使用 errorChannel 标头配置,以便从 MessagePublishingErrorHandler 正确发送错误消息。否则,错误将使用常见的错误处理逻辑发送到全局 errorChannel。有关异步错误处理的更多信息,请参阅 错误处理

同步流程可以使用 ExpressionEvaluatingRequestHandlerAdvice 来忽略异常或返回补偿消息。当从一个子流程到 ScatterGatherHandler 抛出异常时,它只是重新抛出到上游。这样,所有其他子流程将毫无用处,它们的回复将在 ScatterGatherHandler 中被忽略。这有时可能是预期的行为,但在大多数情况下,最好在特定子流程中处理错误,而不会影响所有其他子流程和收集器中的预期。

从版本 5.1.3 开始,ScatterGatherHandler 提供了 errorChannelName 选项。它填充到散布消息的 errorChannel 标头中,并在发生异步错误时使用,或者可以在常规同步子流程中使用,用于直接发送错误消息。

下面的示例配置演示了通过返回补偿消息来处理异步错误

@Bean
public IntegrationFlow scatterGatherAndExecutorChannelSubFlow(TaskExecutor taskExecutor) {
    return f -> f
            .scatterGather(
                    scatterer -> scatterer
                            .recipientFlow(f1 -> f1.transform(p -> "Sub-flow#1"))
                            .recipientFlow(f2 -> f2
                                    .channel(c -> c.executor(taskExecutor))
                                    .transform(p -> {
                                        throw new RuntimeException("Sub-flow#2");
                                    })),
                    null,
                    s -> s.errorChannel("scatterGatherErrorChannel"));
}

@ServiceActivator(inputChannel = "scatterGatherErrorChannel")
public Message<?> processAsyncScatterError(MessagingException payload) {
    return MessageBuilder.withPayload(payload.getCause().getCause())
            .copyHeaders(payload.getFailedMessage().getHeaders())
            .build();
}

为了生成正确的回复,我们必须从 `MessagingException` 的 `failedMessage` 中复制头信息(包括 `replyChannel` 和 `errorChannel`),该 `MessagingException` 由 `MessagePublishingErrorHandler` 发送到 `scatterGatherErrorChannel`。这样,目标异常将被返回给 `ScatterGatherHandler` 的聚合器,用于回复消息组的完成。这种异常的 `payload` 可以被 `MessageGroupProcessor` 在聚合器中过滤掉,或者在散列-聚合端点之后,以其他方式在下游处理。

在将散列结果发送到聚合器之前,`ScatterGatherHandler` 会恢复请求消息头,包括回复和错误通道(如果有)。这样,即使在散列接收者子流程中应用了异步传递,来自 `AggregatingMessageHandler` 的错误也会传播到调用者。为了成功操作,`gatherResultChannel`、`originalReplyChannel` 和 `originalErrorChannel` 头信息必须被传递回来自散列接收者子流程的回复。在这种情况下,必须为 `ScatterGatherHandler` 配置一个合理的、有限的 `gatherTimeout`。否则,它将默认情况下被阻塞,永远等待来自聚合器的回复。