聚合器和重排序器

聚合器(Aggregator)在概念上与拆分器(Splitter)相反。它将一系列独立消息聚合成一个单一消息,因此必然更为复杂。默认情况下,聚合器返回一个包含来自入站消息的多个负载集合的消息。重排序器(Resequencer)也适用同样的规则。以下示例展示了拆分器-聚合器模式的典型用法

@Bean
public IntegrationFlow splitAggregateFlow() {
    return IntegrationFlow.from("splitAggregateInput")
            .split()
            .channel(MessageChannels.executor(this.taskExecutor()))
            .resequence()
            .aggregate()
            .get();
}

split() 方法将列表拆分成独立消息并发送到 ExecutorChannelresequence() 方法根据消息头中的序列详情重新排序消息。aggregate() 方法收集这些消息。

但是,您可以通过指定释放策略(release strategy)和关联策略(correlation strategy)等来改变默认行为。考虑以下示例

.aggregate(a ->
        a.correlationStrategy(m -> m.getHeaders().get("myCorrelationKey"))
            .releaseStrategy(g -> g.size() > 10)
            .messageStore(messageStore()))

上述示例将具有 myCorrelationKey 消息头的消息关联起来,并在至少累积十条消息后释放它们。

resequence() EIP 方法也提供了类似的 lambda 配置。