聚合器
聚合器基本上是分割器的镜像,它是一种消息处理器,接收多条消息并将它们合并成一条消息。事实上,聚合器通常是包含分割器的管道中的下游消费者。
从技术上讲,聚合器比分割器更复杂,因为它是有状态的。它必须持有待聚合的消息,并确定何时完整的消息组已准备好进行聚合。为此,它需要一个 MessageStore
。
功能
聚合器通过关联和存储一组相关消息来将它们组合起来,直到该组被认为是完整的。此时,聚合器通过处理整个组来创建一条消息,并将聚合后的消息作为输出发送。
实现聚合器需要提供执行聚合的逻辑(即从多条消息创建一条消息)。两个相关的概念是关联(correlation)和释放(release)。
关联决定了消息如何分组进行聚合。在 Spring Integration 中,关联默认是基于 IntegrationMessageHeaderAccessor.CORRELATION_ID
消息头进行的。具有相同 IntegrationMessageHeaderAccessor.CORRELATION_ID
的消息会被分组在一起。但是,你可以自定义关联策略,以允许使用其他方式指定消息应如何分组。为此,你可以实现一个 CorrelationStrategy
(本章后面会介绍)。
为了确定消息组何时准备好进行处理,会咨询一个 ReleaseStrategy
。聚合器的默认释放策略是当序列中包含的所有消息都存在时释放该组,这是基于 IntegrationMessageHeaderAccessor.SEQUENCE_SIZE
头进行的。你可以通过提供自定义 ReleaseStrategy
实现的引用来覆盖此默认策略。
编程模型
聚合 API 由以下类组成
-
MessageGroupProcessor
接口及其子类:MethodInvokingAggregatingMessageGroupProcessor
和ExpressionEvaluatingMessageGroupProcessor
-
ReleaseStrategy
接口及其默认实现:SimpleSequenceSizeReleaseStrategy
-
CorrelationStrategy
接口及其默认实现:HeaderAttributeCorrelationStrategy
AggregatingMessageHandler
AggregatingMessageHandler
(AbstractCorrelatingMessageHandler
的子类)是一个 MessageHandler
实现,封装了聚合器(以及其他关联用例)的通用功能,如下所示
-
将消息关联到一个组以进行聚合
-
将这些消息保存在
MessageStore
中,直到该组可以被释放 -
决定何时可以释放该组
-
将已释放的组聚合为一条消息
-
识别并响应已过期的组
决定消息应如何分组的责任委托给 CorrelationStrategy
实例。决定消息组是否可以被释放的责任委托给 ReleaseStrategy
实例。
以下列表简要展示了基础 AbstractAggregatingMessageGroupProcessor
(实现 aggregatePayloads
方法的责任留给开发者)
public abstract class AbstractAggregatingMessageGroupProcessor
implements MessageGroupProcessor {
protected Map<String, Object> aggregateHeaders(MessageGroup group) {
// default implementation exists
}
protected abstract Object aggregatePayloads(MessageGroup group, Map<String, Object> defaultHeaders);
}
将 DefaultAggregatingMessageGroupProcessor
、ExpressionEvaluatingMessageGroupProcessor
和 MethodInvokingMessageGroupProcessor
视为 AbstractAggregatingMessageGroupProcessor
的开箱即用实现。
从 5.2 版本开始,AbstractAggregatingMessageGroupProcessor
可使用 Function<MessageGroup, Map<String, Object>>
策略来合并和计算(聚合)输出消息的头。DefaultAggregateHeadersFunction
实现提供了一种逻辑,该逻辑返回组中所有没有冲突的头;组内一个或多个消息中缺少的头不被视为冲突。冲突的头会被忽略。连同新引入的 DelegatingMessageGroupProcessor
,此函数用于任何任意(非 AbstractAggregatingMessageGroupProcessor
)的 MessageGroupProcessor
实现。本质上,框架会将提供的函数注入到 AbstractAggregatingMessageGroupProcessor
实例中,并将所有其他实现包装到 DelegatingMessageGroupProcessor
中。AbstractAggregatingMessageGroupProcessor
和 DelegatingMessageGroupProcessor
之间的逻辑区别在于,后者不会在调用委托策略之前提前计算头,并且如果委托返回 Message
或 AbstractIntegrationMessageBuilder
,则不会调用该函数。在这种情况下,框架假定目标实现已经负责生成填充到返回结果中的适当头集。Function<MessageGroup, Map<String, Object>>
策略在 XML 配置中可用作 headers-function
引用属性,在 Java DSL 中可用作 AggregatorSpec.headersFunction()
选项,在普通 Java 配置中可用作 AggregatorFactoryBean.setHeadersFunction()
。
CorrelationStrategy
由 AbstractCorrelatingMessageHandler
持有,并具有基于 IntegrationMessageHeaderAccessor.CORRELATION_ID
消息头的默认值,如下例所示
public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store,
CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy) {
...
this.correlationStrategy = correlationStrategy == null ?
new HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID) : correlationStrategy;
this.releaseStrategy = releaseStrategy == null ? new SimpleSequenceSizeReleaseStrategy() : releaseStrategy;
...
}
关于消息组的实际处理,默认实现是 DefaultAggregatingMessageGroupProcessor
。它创建一个单一的 Message
,其负载是给定组收到的负载的 List
。这对于带有分割器、发布/订阅通道或接收者列表路由器的简单分发-聚合实现非常有效。
在此类场景中使用发布/订阅通道或接收者列表路由器时,请务必启用 apply-sequence 标志。这样做会添加必要的头:CORRELATION_ID 、SEQUENCE_NUMBER 和 SEQUENCE_SIZE 。此行为在 Spring Integration 中默认对分割器启用,但不对发布/订阅通道或接收者列表路由器启用,因为这些组件可以在各种不需要这些头的上下文中使用。 |
在为应用程序实现特定的聚合器策略时,你可以扩展 AbstractAggregatingMessageGroupProcessor
并实现 aggregatePayloads
方法。然而,还有更好的解决方案,与 API 耦合度较低,用于实现聚合逻辑,这些逻辑可以通过 XML 或注解进行配置。
通常,任何 POJO 都可以实现聚合算法,只要它提供一个接受单个 java.util.List
作为参数的方法(也支持参数化列表)。此方法在聚合消息时按如下方式调用
-
如果参数是
java.util.Collection<T>
并且参数类型 T 可赋值给Message
,则累积用于聚合的整个消息列表会发送到聚合器。 -
如果参数是非参数化的
java.util.Collection
或参数类型不可赋值给Message
,则方法会接收累积消息的负载。 -
如果返回类型不可赋值给
Message
,则将其视为框架自动创建的Message
的负载。
为了代码的简洁性并推广低耦合、可测试性等最佳实践,实现聚合逻辑的首选方式是通过 POJO,并使用 XML 或注解支持在应用程序中进行配置。 |
从 5.3 版本开始,在处理消息组后,AbstractCorrelatingMessageHandler
会对具有多个嵌套级别的正确分割器-聚合器场景执行 MessageBuilder.popSequenceDetails()
消息头修改。仅当消息组释放结果不是消息集合时才会执行此操作。在这种情况下,目标 MessageGroupProcessor
负责在构建这些消息时调用 MessageBuilder.popSequenceDetails()
。
如果 MessageGroupProcessor
返回一个 Message
,则仅当 sequenceDetails
与组中的第一条消息匹配时,才会对输出消息执行 MessageBuilder.popSequenceDetails()
。(之前仅当从 MessageGroupProcessor
返回纯负载或 AbstractIntegrationMessageBuilder
时才会执行此操作。)
此功能可以通过新的 popSequence
boolean
属性控制,因此在某些关联详细信息未由标准分割器填充的场景中,可以禁用 MessageBuilder.popSequenceDetails()
。此属性本质上撤消了最近的上游 AbstractMessageSplitter
中 applySequence = true
所做的工作。更多信息请参见分割器。
SimpleMessageGroup.getMessages() 方法返回一个 unmodifiableCollection 。因此,如果聚合 POJO 方法有一个 Collection<Message> 参数,传入的参数就是那个 Collection 实例;并且当你为聚合器使用 SimpleMessageStore 时,该原始 Collection<Message> 在释放组后会被清除。因此,如果将 POJO 中的 Collection<Message> 变量传出聚合器之外,它也会被清除。如果你希望直接释放该集合以进行进一步处理,你必须构建一个新的 Collection (例如,new ArrayList<Message>(messages) )。从 4.3 版本开始,框架不再将消息复制到新的集合中,以避免不必要的额外对象创建。 |
在 4.2 版本之前,无法通过 XML 配置提供 MessageGroupProcessor
。聚合只能使用 POJO 方法。现在,如果框架检测到引用的(或内部的)bean 实现了 MessageProcessor
,则它被用作聚合器的输出处理器。
如果你希望将来自自定义 MessageGroupProcessor
的对象集合作为消息的负载释放,你的类应扩展 AbstractAggregatingMessageGroupProcessor
并实现 aggregatePayloads()
。
此外,自 4.2 版本以来,提供了 SimpleMessageGroupProcessor
。它返回组中的消息集合,如前所述,这使得释放的消息会被单独发送。
这使得聚合器可以充当消息屏障,到达的消息会被 удержи,直到释放策略触发,然后该组作为一系列独立消息被释放。
从 6.0 版本开始,上述的分割行为仅在组处理器是 SimpleMessageGroupProcessor
时才有效。否则,对于任何其他返回 Collection<Message>
的 MessageGroupProcessor
实现,只会发出一则回复消息,其负载是整个消息集合。这种逻辑是由聚合器的规范目的决定的——按某个键收集请求消息并生成一个单一的组消息。
ReleaseStrategy
ReleaseStrategy
接口定义如下
public interface ReleaseStrategy {
boolean canRelease(MessageGroup group);
}
通常,任何 POJO 都可以实现完成决策逻辑,只要它提供一个接受单个 java.util.List
作为参数的方法(也支持参数化列表)并返回一个布尔值。此方法在每条新消息到达后被调用,以决定该组是否完成,具体如下
-
如果参数是
java.util.List<T>
并且参数类型T
可赋值给Message
,则累积在组中的整个消息列表会发送到该方法。 -
如果参数是非参数化的
java.util.List
或参数类型不可赋值给Message
,则方法会接收累积消息的负载。 -
如果消息组已准备好进行聚合,方法必须返回
true
,否则返回false
。
以下示例展示了如何将 @ReleaseStrategy
注解用于类型为 Message
的 List
public class MyReleaseStrategy {
@ReleaseStrategy
public boolean canMessagesBeReleased(List<Message<?>>) {...}
}
以下示例展示了如何将 @ReleaseStrategy
注解用于类型为 String
的 List
public class MyReleaseStrategy {
@ReleaseStrategy
public boolean canMessagesBeReleased(List<String>) {...}
}
基于前两个示例中的签名,基于 POJO 的释放策略会接收一个未释放消息的 Collection
(如果你需要访问整个 Message
),或者一个负载对象的 Collection
(如果类型参数不是 Message
)。这满足了大多数用例。但是,如果出于某种原因你需要访问完整的 MessageGroup
,你应该提供一个 ReleaseStrategy
接口的实现。
处理潜在的大组时,你应该理解这些方法是如何调用的,因为在组被释放之前,释放策略可能会被多次调用。最高效的是 由于这些原因,对于大组,我们建议你实现 |
当组被释放进行聚合时,其所有未释放的消息都会被处理并从组中移除。如果组也已完成(即序列中的所有消息已到达,或者没有定义序列),则该组被标记为完成。此组的任何新消息都会发送到丢弃通道(如果已定义)。将 expire-groups-upon-completion
设置为 true
(默认为 false
)会移除整个组,并且任何新消息(与已移除组具有相同关联 ID 的)会形成一个新组。你可以通过使用 MessageGroupStoreReaper
并将 send-partial-result-on-expiry
设置为 true
来释放部分序列。
为了方便丢弃延迟到达的消息,聚合器在组被释放后必须维护关于该组的状态。这最终可能导致内存溢出。为避免此类情况,你应该考虑配置一个 MessageGroupStoreReaper 来移除组的元数据。过期参数应设置为在某个点之后(此后不期望有延迟消息到达)使组过期。有关配置 reaper 的信息,请参见在聚合器中管理状态:MessageGroupStore 。 |
Spring Integration 为 ReleaseStrategy
提供了一个实现:SimpleSequenceSizeReleaseStrategy
。此实现会查阅每条到达消息的 SEQUENCE_NUMBER
和 SEQUENCE_SIZE
头,以决定消息组何时完成并准备好进行聚合。如前所示,它也是默认策略。
在 5.0 版本之前,默认的释放策略是 SequenceSizeReleaseStrategy ,它在大组处理方面表现不佳。使用该策略时,会检测并拒绝重复的序列号。此操作可能开销很大。 |
如果你正在聚合大组,不需要释放部分组,并且不需要检测/拒绝重复序列,可以考虑改用 SimpleSequenceSizeReleaseStrategy
——它对于这些用例效率更高,并且自 5.0 版本以来在未指定部分组释放时是默认策略。
聚合大组
4.3 版本将 SimpleMessageGroup
中消息的默认 Collection
更改为 HashSet
(之前是 BlockingQueue
)。从大组中移除单条消息时,这曾开销很大(需要 O(n) 线性扫描)。虽然哈希集合通常移除速度更快,但对于大消息来说可能开销很大,因为在插入和移除时都需要计算哈希。如果你的消息哈希开销很大,考虑使用其他集合类型。如使用 MessageGroupFactory
中所讨论的,提供了 SimpleMessageGroupFactory
,以便你可以选择最适合你需求的 Collection
。你也可以提供自己的工厂实现来创建其他 Collection<Message<?>>
。
以下示例展示了如何使用先前的实现和 SimpleSequenceSizeReleaseStrategy
配置聚合器
<int:aggregator input-channel="aggregate"
output-channel="out" message-store="store" release-strategy="releaser" />
<bean id="store" class="org.springframework.integration.store.SimpleMessageStore">
<property name="messageGroupFactory">
<bean class="org.springframework.integration.store.SimpleMessageGroupFactory">
<constructor-arg value="BLOCKING_QUEUE"/>
</bean>
</property>
</bean>
<bean id="releaser" class="SimpleSequenceSizeReleaseStrategy" />
如果过滤器端点参与了聚合器上游的流,序列大小释放策略(固定的或基于 sequenceSize 头)将无法实现其目的,因为序列中的某些消息可能被过滤器丢弃。在这种情况下,建议选择另一个 ReleaseStrategy ,或者使用从丢弃子流发送的补偿消息,这些消息的内容中包含一些信息,以便在自定义的完整组函数中跳过。更多信息请参见过滤器。 |
关联策略
CorrelationStrategy
接口定义如下
public interface CorrelationStrategy {
Object getCorrelationKey(Message<?> message);
}
该方法返回一个 Object
,它表示用于将消息与消息组关联的关联键。该键必须满足在 Map
中用作键的标准,即关于 equals()
和 hashCode()
的实现。
通常,任何 POJO 都可以实现关联逻辑,并且将消息映射到方法参数(或多个参数)的规则与 ServiceActivator
的规则相同(包括支持 @Header
注解)。该方法必须返回一个值,并且该值不能为空(null
)。
Spring Integration 提供了 CorrelationStrategy
的一个实现:HeaderAttributeCorrelationStrategy
。此实现返回消息头中的一个值(该值的名称由构造函数参数指定)作为关联键(correlation key)。默认情况下,关联策略是 HeaderAttributeCorrelationStrategy
,它返回 CORRELATION_ID
消息头属性的值。如果您想使用自定义的消息头名称进行关联,可以在 HeaderAttributeCorrelationStrategy
的实例上进行配置,并将其作为聚合器关联策略的引用提供。
锁注册中心(Lock Registry)
组的更改是线程安全的。因此,当您并发发送具有相同关联 ID 的消息时,聚合器只会处理其中一条,使其有效地成为每个消息组单线程。聚合器使用 LockRegistry
来获取解析后的关联 ID 的锁。默认使用 DefaultLockRegistry
(内存中)。为了在使用共享 MessageGroupStore
的服务器之间同步更新,您必须配置一个共享的锁注册中心。
避免死锁
如上所述,当消息组被修改(添加或释放消息)时,会持有锁。
考虑以下流程
...->aggregator1-> ... ->aggregator2-> ...
如果存在多个线程,并且这些聚合器共享一个共同的锁注册中心,则可能发生死锁。这将导致线程挂起,并且 jstack <pid>
可能会显示类似如下的结果
Found one Java-level deadlock:
=============================
"t2":
waiting for ownable synchronizer 0x000000076c1cbfa0, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
which is held by "t1"
"t1":
waiting for ownable synchronizer 0x000000076c1ccc00, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
which is held by "t2"
有几种方法可以避免此问题
-
确保每个聚合器都有自己的锁注册中心(这可以在应用程序实例之间共享,但流程中的两个或多个聚合器必须各自拥有一个不同的注册中心)
-
使用
ExecutorChannel
或QueueChannel
作为聚合器的输出通道,以便下游流程在新线程上运行 -
从 5.1.1 版本开始,将
releaseLockBeforeSend
聚合器属性设置为true
如果由于某种原因,单个聚合器的输出最终又路由回同一聚合器,也可能导致此问题。当然,上述第一种解决方案在这种情况下不适用。 |
在 Java DSL 中配置聚合器
有关如何在 Java DSL 中配置聚合器,请参阅 聚合器和重排序器(Aggregators and Resequencers)。
使用 XML 配置聚合器
Spring Integration 通过 <aggregator/>
元素支持使用 XML 配置聚合器。以下示例展示了一个聚合器示例
<channel id="inputChannel"/>
<int:aggregator id="myAggregator" (1)
auto-startup="true" (2)
input-channel="inputChannel" (3)
output-channel="outputChannel" (4)
discard-channel="throwAwayChannel" (5)
message-store="persistentMessageStore" (6)
order="1" (7)
send-partial-result-on-expiry="false" (8)
send-timeout="1000" (9)
correlation-strategy="correlationStrategyBean" (10)
correlation-strategy-method="correlate" (11)
correlation-strategy-expression="headers['foo']" (12)
ref="aggregatorBean" (13)
method="aggregate" (14)
release-strategy="releaseStrategyBean" (15)
release-strategy-method="release" (16)
release-strategy-expression="size() == 5" (17)
expire-groups-upon-completion="false" (18)
empty-group-min-timeout="60000" (19)
lock-registry="lockRegistry" (20)
group-timeout="60000" (21)
group-timeout-expression="size() ge 2 ? 100 : -1" (22)
expire-groups-upon-timeout="true" (23)
scheduler="taskScheduler" > (24)
<expire-transactional/> (25)
<expire-advice-chain/> (26)
</aggregator>
<int:channel id="outputChannel"/>
<int:channel id="throwAwayChannel"/>
<bean id="persistentMessageStore" class="org.springframework.integration.jdbc.store.JdbcMessageStore">
<constructor-arg ref="dataSource"/>
</bean>
<bean id="aggregatorBean" class="sample.PojoAggregator"/>
<bean id="releaseStrategyBean" class="sample.PojoReleaseStrategy"/>
<bean id="correlationStrategyBean" class="sample.PojoCorrelationStrategy"/>
1 | 聚合器的 ID 是可选的。 |
2 | Lifecycle 属性,指示聚合器是否应在应用程序上下文启动期间启动。可选(默认值为 'true')。 |
3 | 聚合器接收消息的通道。必需。 |
4 | 聚合器发送聚合结果的通道。可选(因为接收到的消息本身可以在 'replyChannel' 消息头中指定回复通道)。 |
5 | 聚合器发送超时消息的通道(如果 send-partial-result-on-expiry 为 false )。可选。 |
6 | 引用一个 MessageGroupStore ,用于在其关联键下存储消息组,直到它们完成为止。可选。默认情况下,它是一个易失性内存存储。有关更多信息,请参阅 消息存储(Message Store)。 |
7 | 当多个处理程序订阅到同一个 DirectChannel 时,此聚合器的顺序(用于负载均衡目的)。可选。 |
8 | 表示过期消息应在包含它们的 MessageGroup 过期后进行聚合并发送到 'output-channel' 或 'replyChannel'(参见 MessageGroupStore.expireMessageGroups(long) )。一种使 MessageGroup 过期的方法是配置 MessageGroupStoreReaper 。但是,您也可以通过调用 MessageGroupStore.expireMessageGroups(timeout) 来使 MessageGroup 过期。您可以通过控制总线(Control Bus)操作实现此目的,或者,如果您有 MessageGroupStore 实例的引用,则可以通过调用 expireMessageGroups(timeout) 来实现。否则,仅此属性本身不起作用。它仅作为一种指示,用于指示是否丢弃或发送到输出或回复通道任何仍在即将过期的 MessageGroup 中的消息。可选(默认值为 false )。注意:此属性更准确的名称可能是 send-partial-result-on-timeout ,因为如果 expire-groups-upon-timeout 设置为 false ,该组可能实际上不会过期。 |
9 | 将回复 Message 发送到 output-channel 或 discard-channel 时等待的超时间隔。默认为 30 秒。仅当输出通道存在某些“发送”限制(例如具有固定“容量”的 QueueChannel )时才应用。在这种情况下,将抛出 MessageDeliveryException 。对于 AbstractSubscribableChannel 实现,将忽略 send-timeout 。对于 group-timeout(-expression) ,来自计划过期任务的 MessageDeliveryException 会导致该任务重新计划。可选。 |
10 | 引用实现消息关联(分组)算法的 bean。该 bean 可以是 CorrelationStrategy 接口的实现或 POJO。在后一种情况下,必须同时定义 correlation-strategy-method 属性。可选(默认情况下,聚合器使用 IntegrationMessageHeaderAccessor.CORRELATION_ID 消息头)。 |
11 | 在由 correlation-strategy 引用的 bean 上定义的方法。它实现了关联决策算法。可选,但有限制(必须存在 correlation-strategy )。 |
12 | 表示关联策略的 SpEL 表达式。示例:"headers['something']" 。只允许使用 correlation-strategy 或 correlation-strategy-expression 中的一个。 |
13 | 引用应用程序上下文中定义的 bean。该 bean 必须实现聚合逻辑,如前所述。可选(默认情况下,聚合消息列表成为输出消息的有效载荷)。 |
14 | 在由 ref 属性引用的 bean 上定义的方法。它实现了消息聚合算法。可选(取决于 ref 属性是否已定义)。 |
15 | 引用实现释放策略的 bean。该 bean 可以是 ReleaseStrategy 接口的实现或 POJO。在后一种情况下,必须同时定义 release-strategy-method 属性。可选(默认情况下,聚合器使用 IntegrationMessageHeaderAccessor.SEQUENCE_SIZE 消息头属性)。 |
16 | 在由 release-strategy 属性引用的 bean 上定义的方法。它实现了完成决策算法。可选,但有限制(必须存在 release-strategy )。 |
17 | 表示释放策略的 SpEL 表达式。表达式的根对象是 MessageGroup 。示例:"size() == 5" 。只允许使用 release-strategy 或 release-strategy-expression 中的一个。 |
18 | 当设置为 true 时(默认为 false ),已完成的组将从消息存储中移除,从而使具有相同关联的后续消息形成一个新组。默认行为是将与已完成组具有相同关联的消息发送到 discard-channel 。 |
19 | 仅在为 <aggregator> 的 MessageStore 配置了 MessageGroupStoreReaper 时才适用。默认情况下,当配置 MessageGroupStoreReaper 来使部分组过期时,空组也会被移除。组正常释放后会存在空组。空组可以检测并丢弃延迟到达的消息。如果您希望以比部分组过期更长的时间间隔使空组过期,请设置此属性。然后,空组在至少在此毫秒数内未被修改之前不会从 MessageStore 中移除。请注意,空组的实际过期时间也受收割机(reaper)的 timeout 属性影响,它可能与此值加上超时一样长。 |
20 | 引用一个 org.springframework.integration.util.LockRegistry bean。它用于根据 groupId 获取一个 Lock ,以便对 MessageGroup 进行并发操作。默认情况下,使用内部 DefaultLockRegistry 。使用分布式 LockRegistry (如 ZookeeperLockRegistry )可确保聚合器只有一个实例可以并发操作组。有关更多信息,请参阅 Redis Lock Registry 或 Zookeeper Lock Registry。 |
21 | 一个超时值(毫秒),用于在当前消息到达时 ReleaseStrategy 未释放组的情况下强制 MessageGroup 完成。此属性为聚合器提供了一个内置的基于时间的释放策略,当需要发射部分结果(或丢弃组)时,如果新消息在超时时间内(从最后一条消息到达的时间开始计算)未到达 MessageGroup 。要设置从创建 MessageGroup 的时间开始计算的超时,请参阅 group-timeout-expression 信息。当新消息到达聚合器时,任何现有的针对其 MessageGroup 的 ScheduledFuture<?> 都将被取消。如果 ReleaseStrategy 返回 false (表示不释放)且 groupTimeout > 0 ,则会计划一个新的任务来使组过期。我们不建议将此属性设置为零(或负值)。这样做会有效地禁用聚合器,因为每个消息组都会立即完成。但是,您可以使用表达式有条件地将其设置为零(或负值)。有关信息,请参阅 group-timeout-expression 。完成期间采取的操作取决于 ReleaseStrategy 和 send-partial-result-on-expiry 属性。有关更多信息,请参阅 Aggregator 和 Group Timeout。它与 'group-timeout-expression' 属性互斥。 |
22 | 评估为 groupTimeout 的 SpEL 表达式,其中 MessageGroup 作为 #root 评估上下文对象。用于计划强制完成 MessageGroup 。如果表达式评估为 null ,则不计划完成。如果评估为零,则组会立即在当前线程上完成。实际上,这提供了一个动态的 group-timeout 属性。例如,如果您希望在组创建时间过去 10 秒后强制完成一个 MessageGroup ,您可以考虑使用以下 SpEL 表达式:timestamp + 10000 - T(System).currentTimeMillis() ,其中 timestamp 由 MessageGroup.getTimestamp() 提供,因为这里的 MessageGroup 是 #root 评估上下文对象。但请记住,组创建时间可能与第一条到达消息的时间不同,具体取决于其他组过期属性的配置。有关更多信息,请参阅 group-timeout 。与 'group-timeout' 属性互斥。 |
23 | 当组由于超时(或 MessageGroupStoreReaper )而完成时,默认情况下组会过期(完全移除)。延迟到达的消息会启动一个新组。您可以将此设置为 false 以完成组但保留其元数据,以便丢弃延迟到达的消息。空组以后可以使用 MessageGroupStoreReaper 与 empty-group-min-timeout 属性一起移除。默认为 'true'。 |
24 | 一个 TaskScheduler bean 引用,用于计划 MessageGroup ,以便如果在 groupTimeout 内没有新消息到达 MessageGroup ,则强制完成。如果未提供,则使用在 ApplicationContext 中注册的默认调度器(taskScheduler )(ThreadPoolTaskScheduler )。如果未指定 group-timeout 或 group-timeout-expression ,则此属性不适用。 |
25 | 自 4.1 版本起。它允许为 forceComplete 操作启动事务。它由 group-timeout(-expression) 或 MessageGroupStoreReaper 启动,不适用于正常的 add 、release 和 discard 操作。只允许使用此子元素或 <expire-advice-chain/> 。 |
26 | 自 4.1 版本起。它允许为 forceComplete 操作配置任何 Advice 。它由 group-timeout(-expression) 或 MessageGroupStoreReaper 启动,不适用于正常的 add 、release 和 discard 操作。只允许使用此子元素或 <expire-transactional/> 。也可以在此处使用 Spring tx 命名空间配置事务 Advice 。 |
组的过期(Expiring Groups)
有两个属性与组的过期(完全移除)相关。当组过期时,没有它的记录,如果新消息带有相同的关联 ID 到达,则会启动一个新的组。当组完成(未过期)时,空组仍然存在,延迟到达的消息将被丢弃。空组以后可以使用
如果组未正常完成,而是由于超时而被释放或丢弃,则通常组会过期。自 4.1 版本起,您可以使用
自 5.0 版本起,空组也会在 从 5.4 版本开始,可以配置聚合器(和重排序器)以使孤立组过期(持久消息存储中可能无法以其他方式释放的组)。 |
如果自定义聚合器处理程序实现在其他 <aggregator>
定义中被引用,我们通常建议使用 ref
属性。但是,如果自定义聚合器实现仅被单个 <aggregator>
定义使用,您可以使用内部 bean 定义(从 1.0.3 版本开始)在 <aggregator/>
元素内配置聚合 POJO,如下例所示
<aggregator input-channel="input" method="sum" output-channel="output">
<beans:bean class="org.foo.PojoAggregator"/>
</aggregator>
在同一 <aggregator> 配置中同时使用 ref 属性和内部 bean 定义是不允许的,因为它会创建歧义条件。在这种情况下,会抛出 Exception。 |
以下示例显示了聚合器 bean 的实现
public class PojoAggregator {
public Long add(List<Long> results) {
long total = 0l;
for (long partialResult: results) {
total += partialResult;
}
return total;
}
}
前面示例的完成策略 bean 实现可能如下所示
public class PojoReleaseStrategy {
...
public boolean canRelease(List<Long> numbers) {
int sum = 0;
for (long number: numbers) {
sum += number;
}
return sum >= maxValue;
}
}
在有意义的地方,释放策略方法和聚合器方法可以合并到单个 bean 中。 |
上述示例的关联策略 bean 实现可能如下所示
public class PojoCorrelationStrategy {
...
public Long groupNumbersByLastDigit(Long number) {
return number % 10;
}
}
前面示例中的聚合器将按某个标准(在本例中是除以十后的余数)对数字进行分组,并保持该组,直到有效载荷提供的数字总和超过某个值。
在有意义的地方,释放策略方法、关联策略方法和聚合器方法可以合并到单个 bean 中。(实际上,它们中的所有方法或任意两个方法都可以合并。) |
聚合器和 Spring Expression Language (SpEL)
自 Spring Integration 2.0 起,您可以使用 SpEL 处理各种策略(关联、释放和聚合)。如果发布策略背后的逻辑相对简单,我们建议使用 SpEL。假设您有一个遗留组件,设计用于接收对象数组。我们知道默认的发布策略将所有聚合消息组装到 List
中。现在我们有两个问题。首先,我们需要从列表中提取单个消息。其次,我们需要提取每条消息的有效载荷并组装对象数组。以下示例解决了这两个问题
public String[] processRelease(List<Message<String>> messages){
List<String> stringList = new ArrayList<String>();
for (Message<String> message : messages) {
stringList.add(message.getPayload());
}
return stringList.toArray(new String[]{});
}
然而,使用 SpEL,这样的要求实际上可以通过一行表达式相对容易地处理,从而无需编写自定义类并将其配置为 bean。以下示例展示了如何实现这一点
<int:aggregator input-channel="aggChannel"
output-channel="replyChannel"
expression="#this.![payload].toArray()"/>
在上述配置中,我们使用 集合投影(collection projection) 表达式从列表中所有消息的有效载荷组装一个新集合,然后将其转换为数组,从而达到与早期 Java 代码相同的结果。
处理自定义释放和关联策略时,您可以应用相同的基于表达式的方法。
您无需在 correlation-strategy
属性中定义自定义 CorrelationStrategy
的 bean,而是可以将简单的关联逻辑实现为 SpEL 表达式,并将其配置在 correlation-strategy-expression
属性中,如下例所示
correlation-strategy-expression="payload.person.id"
在上述示例中,我们假设有效载荷具有一个带有 id
的 person
属性,该属性将用于关联消息。
同样,对于 ReleaseStrategy
,您可以将释放逻辑实现为 SpEL 表达式,并将其配置在 release-strategy-expression
属性中。评估上下文的根对象是 MessageGroup
本身。可以使用表达式中的组的 message
属性引用消息列表。
在 5.0 版本之前的版本中,根对象是 Message<?> 的集合,如前面示例所示 |
release-strategy-expression="!messages.?[payload==5].empty"
在上述示例中,SpEL 评估上下文的根对象是 MessageGroup
本身,您正在声明,只要该组中存在有效载荷为 5
的消息,该组就应该被释放。
Aggregator 和 Group Timeout
自 4.0 版本起,引入了两个新的互斥属性:group-timeout
和 group-timeout-expression
。参见 使用 XML 配置聚合器。在某些情况下,如果在当前消息到达时 ReleaseStrategy
未释放聚合器结果(或丢弃组),您可能需要在超时后发出聚合器结果。为此,groupTimeout
选项允许计划强制完成 MessageGroup
,如下例所示
<aggregator input-channel="input" output-channel="output"
send-partial-result-on-expiry="true"
group-timeout-expression="size() ge 2 ? 10000 : -1"
release-strategy-expression="messages[0].headers.sequenceNumber == messages[0].headers.sequenceSize"/>
在此示例中,如果聚合器接收到由 release-strategy-expression
定义的序列中的最后一条消息,则可以进行正常释放。如果该特定消息未到达,只要组包含至少两条消息,groupTimeout
会在十秒后强制组完成。
强制组完成的结果取决于 ReleaseStrategy
和 send-partial-result-on-expiry
。首先,再次咨询释放策略,以查看是否应进行正常释放。虽然组未更改,但 ReleaseStrategy
此时可以决定释放组。如果释放策略在超时期间仍然不释放组,则组会过期。如果 send-partial-result-on-expiry
为 true
,则(部分)MessageGroup
中存在的现有消息作为正常聚合器回复消息释放到 output-channel
。否则,它将被丢弃。
groupTimeout
行为与 MessageGroupStoreReaper
存在差异(参见 使用 XML 配置聚合器)。收割机定期启动对 MessageGroupStore
中所有 MessageGroup
的强制完成。groupTimeout
会针对每个 MessageGroup
单独执行此操作,前提是在 groupTimeout
期间没有新消息到达。此外,收割机可用于移除空组(如果 expire-groups-upon-completion
为 false,则保留空组以便丢弃延迟消息)。
自 5.5 版本起,groupTimeoutExpression
可以评估为 java.util.Date
实例。这在某些情况下可能很有用,例如根据组创建时间(MessageGroup.getTimestamp()
)确定计划任务时刻,而不是根据当前消息到达时间计算(当 groupTimeoutExpression
评估为 long
时)。
group-timeout-expression="size() ge 2 ? new java.util.Date(timestamp + 200) : null"
使用注解配置聚合器
以下示例显示了使用注解配置的聚合器
public class Waiter {
...
@Aggregator (1)
public Delivery aggregatingMethod(List<OrderItem> items) {
...
}
@ReleaseStrategy (2)
public boolean releaseChecker(List<Message<?>> messages) {
...
}
@CorrelationStrategy (3)
public String correlateBy(OrderItem item) {
...
}
}
1 | 一个注解,指示此方法应作为聚合器使用。如果此类用作聚合器,则必须指定此注解。 |
2 | 一个注解,指示此方法用作聚合器的释放策略。如果在任何方法上不存在此注解,则聚合器使用 SimpleSequenceSizeReleaseStrategy 。 |
3 | 一个注解,指示此方法应作为聚合器的关联策略使用。如果未指定关联策略,则聚合器使用基于 CORRELATION_ID 的 HeaderAttributeCorrelationStrategy 。 |
XML 元素提供的所有配置选项也适用于 @Aggregator
注解。
聚合器可以通过 XML 显式引用,或者,如果在类上定义了 @MessageEndpoint
,则可以通过类路径扫描自动检测。
Aggregator 组件的注解配置(@Aggregator
等)仅涵盖简单用例,其中大多数默认选项已足够。如果在使用注解配置时需要更多地控制这些选项,请考虑为 AggregatingMessageHandler
使用 @Bean
定义,并使用 @ServiceActivator
标记其 @Bean
方法,如下例所示
@ServiceActivator(inputChannel = "aggregatorChannel")
@Bean
public MessageHandler aggregator(MessageGroupStore jdbcMessageGroupStore) {
AggregatingMessageHandler aggregator =
new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor(),
jdbcMessageGroupStore);
aggregator.setOutputChannel(resultsChannel());
aggregator.setGroupTimeoutExpression(new ValueExpression<>(500L));
aggregator.setTaskScheduler(this.taskScheduler);
return aggregator;
}
自 4.2 版本起,AggregatorFactoryBean 可用于简化 AggregatingMessageHandler 的 Java 配置。 |
在聚合器中管理状态:MessageGroupStore
聚合器(以及 Spring Integration 中的一些其他模式)是一种有状态模式,需要基于一段时间内到达的具有相同关联键的一组消息做出决策。有状态模式(如 ReleaseStrategy
)中接口的设计遵循一个原则:组件(无论是框架定义的还是用户定义的)应能保持无状态。所有状态都由 MessageGroup
携带,其管理委托给 MessageGroupStore
。MessageGroupStore
接口定义如下
public interface MessageGroupStore {
int getMessageCountForAllMessageGroups();
int getMarkedMessageCountForAllMessageGroups();
int getMessageGroupCount();
MessageGroup getMessageGroup(Object groupId);
MessageGroup addMessageToGroup(Object groupId, Message<?> message);
MessageGroup markMessageGroup(MessageGroup group);
MessageGroup removeMessageFromGroup(Object key, Message<?> messageToRemove);
MessageGroup markMessageFromGroup(Object key, Message<?> messageToMark);
void removeMessageGroup(Object groupId);
void registerMessageGroupExpiryCallback(MessageGroupCallback callback);
int expireMessageGroups(long timeout);
}
有关更多信息,请参阅 Javadoc。
MessageGroupStore
在等待释放策略触发时在 MessageGroups
中累积状态信息,而此事件可能永远不会发生。因此,为了防止陈旧消息滞留,并为易失性存储提供在应用程序关闭时进行清理的钩子,MessageGroupStore
允许您注册回调,以便在 MessageGroups
过期时应用于它们。接口非常直接,如下所示
public interface MessageGroupCallback {
void execute(MessageGroupStore messageGroupStore, MessageGroup group);
}
回调可以直接访问存储和消息组,以便管理持久状态(例如,从存储中完全移除组)。
MessageGroupStore
维护这些回调的列表,并根据需要将其应用于时间戳早于作为参数提供的时间的所有消息(参见前面描述的 registerMessageGroupExpiryCallback(..)
和 expireMessageGroups(..)
方法)。
重要的是,在依赖 expireMessageGroups 功能时,不要在不同的聚合器组件中使用同一个 MessageGroupStore 实例。每个 AbstractCorrelatingMessageHandler 都会基于 forceComplete() 回调注册自己的 MessageGroupCallback 。这样,每个待过期的组都可能被错误的聚合器完成或丢弃。自 5.0.10 版本起,AbstractCorrelatingMessageHandler 在 MessageGroupStore 中注册回调时使用 UniqueExpiryCallback 。MessageGroupStore 反过来检查此类的实例是否存在,如果回调集中已存在一个实例,则会记录带有适当消息的错误。这样,框架禁止在不同的聚合器/重排序器中使用同一个 MessageGroupStore 实例,以避免上述副作用,即由特定关联处理程序未创建的组被过期。 |
您可以调用带有超时值的 expireMessageGroups
方法。任何早于当前时间减去此值的消息都会过期并应用回调。因此,由存储的用户定义消息组“过期”的含义。
作为用户的便利,Spring Integration 提供了消息过期功能的包装器,形式为 MessageGroupStoreReaper
,如下例所示
<bean id="reaper" class="org...MessageGroupStoreReaper">
<property name="messageGroupStore" ref="messageStore"/>
<property name="timeout" value="30000"/>
</bean>
<task:scheduled-tasks scheduler="scheduler">
<task:scheduled ref="reaper" method="run" fixed-rate="10000"/>
</task:scheduled-tasks>
收割机是一个 Runnable
。在上述示例中,消息组存储的过期方法每十秒钟调用一次。超时本身是 30 秒。
理解 MessageGroupStoreReaper 的 'timeout' 属性是一个近似值,并且受任务调度器的速率影响非常重要,因为此属性仅在 MessageGroupStoreReaper 任务的下一次计划执行时检查。例如,如果超时设置为十分钟,但 MessageGroupStoreReaper 任务计划每小时运行一次,并且 MessageGroupStoreReaper 任务的最后一次执行发生在超时前一分钟,则 MessageGroup 在接下来的 59 分钟内都不会过期。因此,我们建议将速率设置为至少等于超时值或更短。 |
除了收割机之外,在应用程序通过 AbstractCorrelatingMessageHandler
中的生命周期回调关闭时,也会调用过期回调。
AbstractCorrelatingMessageHandler
注册了自己的过期回调,这与聚合器 XML 配置中的布尔标志 send-partial-result-on-expiry
相关联。如果标志设置为 true
,则当调用过期回调时,未释放组中任何未标记的消息可以发送到输出通道。
由于 MessageGroupStoreReaper 是从计划任务中调用的,并可能导致生成消息(取决于 sendPartialResultOnExpiry 选项)到下游集成流,因此建议提供一个带有 MessagePublishingErrorHandler 的自定义 TaskScheduler ,以通过 errorChannel 处理异常,这可能是常规聚合器释放功能所期望的。同样的逻辑也适用于组超时功能,它也依赖于 TaskScheduler 。有关更多信息,请参阅 错误处理(Error Handling)。 |
当共享 一些 有关 |
Flux 聚合器
在 5.2 版本中,引入了 FluxAggregatorMessageHandler
组件。它基于 Project Reactor 的 Flux.groupBy()
和 Flux.window()
操作符。接收到的消息被发送到由该组件构造函数中的 Flux.create()
启动的 FluxSink
。如果未提供 outputChannel
或它不是 ReactiveStreamsSubscribableChannel
的实例,则对主 Flux
的订阅是在 Lifecycle.start()
实现中完成的。否则,它会推迟到 ReactiveStreamsSubscribableChannel
实现完成订阅。消息通过 Flux.groupBy()
使用 CorrelationStrategy
作为组键进行分组。默认情况下,会查询消息的 IntegrationMessageHeaderAccessor.CORRELATION_ID
消息头。
默认情况下,每个关闭的窗口都作为消息有效载荷中的 Flux
释放以产生。此消息包含窗口中第一条消息的所有消息头。输出消息有效载荷中的此 Flux
必须在下游订阅和处理。这种逻辑可以通过 FluxAggregatorMessageHandler
的 setCombineFunction(Function<Flux<Message<?>>, Mono<Message<?>>>)
配置选项进行定制(或取代)。例如,如果我们希望最终消息中包含有效载荷的 List
,我们可以配置 Flux.collectList()
,如下所示
fluxAggregatorMessageHandler.setCombineFunction(
(messageFlux) ->
messageFlux
.map(Message::getPayload)
.collectList()
.map(GenericMessage::new));
FluxAggregatorMessageHandler
中有几个选项可用于选择适当的窗口策略
-
setBoundaryTrigger(Predicate<Message<?>>)
- 传播到Flux.windowUntil()
操作符。有关更多信息,请参阅其 JavaDocs。此选项优先于所有其他窗口选项。 -
setWindowSize(int)
和setWindowSizeFunction(Function<Message<?>, Integer>)
- 传播到Flux.window(int)
或windowTimeout(int, Duration)
。默认情况下,窗口大小是根据组中第一条消息及其IntegrationMessageHeaderAccessor.SEQUENCE_SIZE
消息头计算得出的。 -
setWindowTimespan(Duration)
- 传播到Flux.window(Duration)
或windowTimeout(int, Duration)
,具体取决于窗口大小配置。 -
setWindowConfigurer(Function<Flux<Message<?>>, Flux<Flux<Message<?>>>>)
- 一个函数,用于将转换应用于分组的 Flux,以实现任何未通过暴露选项覆盖的自定义窗口操作。
由于此组件是一个 MessageHandler
实现,它可以简单地作为 `@Bean` 定义与 `@ServiceActivator` 消息注解一起使用。使用 Java DSL 时,可以在 .handle()
EIP 方法中使用它。下面的示例演示了我们如何在运行时注册 IntegrationFlow
以及如何将 FluxAggregatorMessageHandler
与上游的 splitter 进行关联。
IntegrationFlow fluxFlow =
(flow) -> flow
.split()
.channel(MessageChannels.flux())
.handle(new FluxAggregatorMessageHandler());
IntegrationFlowContext.IntegrationFlowRegistration registration =
this.integrationFlowContext.registration(fluxFlow)
.register();
Flux<Message<?>> window =
registration.getMessagingTemplate()
.convertSendAndReceive(new Integer[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }, Flux.class);