聚合器和重新排序器
从概念上讲,Aggregator
与 Splitter
相反。它将一系列单独的消息聚合到单个消息中,并且必然更加复杂。默认情况下,聚合器返回一条消息,其中包含来自传入消息的有效负载集合。Resequencer
也应用相同的规则。以下示例显示了分隔符-聚合器模式的规范示例
@Bean
public IntegrationFlow splitAggregateFlow() {
return IntegrationFlow.from("splitAggregateInput")
.split()
.channel(MessageChannels.executor(this.taskExecutor()))
.resequence()
.aggregate()
.get();
}
split()
方法将列表拆分为各个消息并将其发送到 ExecutorChannel
。resequence()
方法按消息头中找到的序列详细信息重新排序消息。aggregate()
方法收集这些消息。
但是,您可以通过指定释放策略和关联策略等来更改默认行为。考虑以下示例
.aggregate(a ->
a.correlationStrategy(m -> m.getHeaders().get("myCorrelationKey"))
.releaseStrategy(g -> g.size() > 10)
.messageStore(messageStore()))
前面的示例关联具有 myCorrelationKey
头的消息,并在累积至少十条消息后释放这些消息。
为 resequence()
EIP 方法提供了类似的 lambda 配置。