聚合器
聚合器与拆分器基本是镜像,它是一种消息处理器,接收多个消息并将它们组合成一个消息。实际上,聚合器通常是包含拆分器的管道中的下游消费者。
从技术上讲,聚合器比拆分器更复杂,因为它是有状态的。它必须保存要聚合的消息,并确定何时完成的消息组可以进行聚合。为此,它需要一个 MessageStore。
功能
聚合器通过关联和存储相关消息组,直到该组被认为是完整的。此时,聚合器通过处理整个组来创建一个单一消息,并将聚合后的消息作为输出发送。
实现聚合器需要提供执行聚合的逻辑(即,从多个消息创建单个消息)。两个相关概念是关联和释放。
关联确定消息如何分组进行聚合。在 Spring Integration 中,关联默认基于 IntegrationMessageHeaderAccessor.CORRELATION_ID 消息头。具有相同 IntegrationMessageHeaderAccessor.CORRELATION_ID 的消息被分组在一起。但是,您可以自定义关联策略,以允许其他方式指定消息应如何分组。为此,您可以实现 CorrelationStrategy(本章稍后介绍)。
为了确定消息组何时准备好进行处理,会咨询 ReleaseStrategy。聚合器的默认释放策略是当序列中包含的所有消息都存在时释放一个组,基于 IntegrationMessageHeaderAccessor.SEQUENCE_SIZE 头。您可以通过提供对自定义 ReleaseStrategy 实现的引用来覆盖此默认策略。
编程模型
聚合 API 由多个类组成
-
接口
MessageGroupProcessor及其子类:MethodInvokingAggregatingMessageGroupProcessor和ExpressionEvaluatingMessageGroupProcessor -
ReleaseStrategy接口及其默认实现:SimpleSequenceSizeReleaseStrategy -
CorrelationStrategy接口及其默认实现:HeaderAttributeCorrelationStrategy
AggregatingMessageHandler
AggregatingMessageHandler (AbstractCorrelatingMessageHandler 的子类) 是 MessageHandler 实现,封装了聚合器(以及其他关联用例)的常见功能,如下所示
-
将消息关联到要聚合的组中
-
在
MessageStore中维护这些消息,直到组可以被释放 -
决定何时可以释放组
-
将已释放的组聚合为单个消息
-
识别并响应已过期的组
决定消息应如何分组的责任委托给 CorrelationStrategy 实例。决定消息组是否可以释放的责任委托给 ReleaseStrategy 实例。
以下清单显示了基类 AbstractAggregatingMessageGroupProcessor 的简要亮点(实现 aggregatePayloads 方法的责任留给开发人员)
public abstract class AbstractAggregatingMessageGroupProcessor
implements MessageGroupProcessor {
protected Map<String, Object> aggregateHeaders(MessageGroup group) {
// default implementation exists
}
protected abstract Object aggregatePayloads(MessageGroup group, Map<String, Object> defaultHeaders);
}
请参阅 DefaultAggregatingMessageGroupProcessor、ExpressionEvaluatingMessageGroupProcessor 和 MethodInvokingMessageGroupProcessor 作为 AbstractAggregatingMessageGroupProcessor 的开箱即用实现。
从 5.2 版本开始,AbstractAggregatingMessageGroupProcessor 提供了一个 Function<MessageGroup, Map<String, Object>> 策略,用于合并和计算输出消息的头部。DefaultAggregateHeadersFunction 实现提供了逻辑,该逻辑返回组中没有冲突的所有头部;组中一个或多个消息中缺失的头部不被视为冲突。冲突的头部将被忽略。与新引入的 DelegatingMessageGroupProcessor 一起,此函数用于任何任意(非 AbstractAggregatingMessageGroupProcessor)MessageGroupProcessor 实现。本质上,框架将提供的函数注入到 AbstractAggregatingMessageGroupProcessor 实例中,并将所有其他实现包装到 DelegatingMessageGroupProcessor 中。AbstractAggregatingMessageGroupProcessor 和 DelegatingMessageGroupProcessor 之间的逻辑差异在于,后者在调用委托策略之前不预先计算头部,并且如果委托返回 Message 或 AbstractIntegrationMessageBuilder,则不调用该函数。在这种情况下,框架假定目标实现已负责生成一组正确的头部并填充到返回结果中。Function<MessageGroup, Map<String, Object>> 策略作为 XML 配置的 headers-function 引用属性、Java DSL 的 AggregatorSpec.headersFunction() 选项以及纯 Java 配置的 AggregatorFactoryBean.setHeadersFunction() 可用。
CorrelationStrategy 由 AbstractCorrelatingMessageHandler 拥有,并根据 IntegrationMessageHeaderAccessor.CORRELATION_ID 消息头具有默认值,如下例所示
public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store,
CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy) {
...
this.correlationStrategy = correlationStrategy == null ?
new HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID) : correlationStrategy;
this.releaseStrategy = releaseStrategy == null ? new SimpleSequenceSizeReleaseStrategy() : releaseStrategy;
...
}
至于消息组的实际处理,默认实现是 DefaultAggregatingMessageGroupProcessor。它创建一个单个 Message,其负载是给定组接收到的负载的 List。这对于带有拆分器、发布-订阅通道或上游接收者列表路由器的简单分散-聚合实现非常有效。
在这种情况下使用发布-订阅通道或接收者列表路由器时,请务必启用 apply-sequence 标志。这样做会添加必要的头部:CORRELATION_ID、SEQUENCE_NUMBER 和 SEQUENCE_SIZE。此行为在 Spring Integration 中默认对拆分器启用,但不对发布-订阅通道或接收者列表路由器启用,因为这些组件可以在不需要这些头部的各种上下文中使。 |
在为应用程序实现特定的聚合器策略时,您可以扩展 AbstractAggregatingMessageGroupProcessor 并实现 aggregatePayloads 方法。然而,有更好的解决方案,与 API 的耦合度更低,用于实现聚合逻辑,可以通过 XML 或注解进行配置。
通常,任何 POJO 都可以实现聚合算法,如果它提供一个接受单个 java.util.List 作为参数的方法(也支持参数化列表)。此方法按如下方式调用以聚合消息
-
如果参数是
java.util.Collection<T>并且参数类型 T 可以赋值给Message,则将为聚合累积的整个消息列表发送到聚合器。 -
如果参数是未参数化的
java.util.Collection或参数类型不能赋值给Message,则方法接收累积消息的负载。 -
如果返回类型不能赋值给
Message,则将其视为由框架自动创建的Message的负载。
| 为了代码简洁并推广低耦合、可测试性等最佳实践,实现聚合逻辑的首选方法是通过 POJO 并使用 XML 或注解支持在应用程序中进行配置。 |
从 5.3 版本开始,处理消息组后,AbstractCorrelatingMessageHandler 对具有多个嵌套级别的拆分器-聚合器场景执行 MessageBuilder.popSequenceDetails() 消息头修改。这仅在消息组释放结果不是消息集合的情况下完成。在这种情况下,目标 MessageGroupProcessor 负责在构建这些消息时调用 MessageBuilder.popSequenceDetails()。
如果 MessageGroupProcessor 返回一个 Message,只有当 sequenceDetails 与组中的第一个消息匹配时,才会在输出消息上执行 MessageBuilder.popSequenceDetails()。(以前,这只在 MessageGroupProcessor 返回纯负载或 AbstractIntegrationMessageBuilder 时才完成。)
此功能可以通过新的 popSequence 布尔属性控制,因此在某些情况下,当关联详细信息未由标准拆分器填充时,可以禁用 MessageBuilder.popSequenceDetails()。此属性本质上撤销了最近上游 AbstractMessageSplitter 中 applySequence = true 所做的事情。有关更多信息,请参阅 拆分器。
SimpleMessageGroup.getMessages() 方法返回一个 unmodifiableCollection。因此,如果聚合 POJO 方法具有 Collection<Message> 参数,则传入的参数就是该 Collection 实例,并且当您将 SimpleMessageStore 用于聚合器时,该原始 Collection<Message> 在释放组后被清除。因此,如果 POJO 中的 Collection<Message> 变量超出聚合器,它也会被清除。如果您希望仅将该集合原样释放以供进一步处理,则必须构建一个新的 Collection(例如,new ArrayList<Message>(messages))。从 4.3 版本开始,框架不再将消息复制到新集合中,以避免不必要的额外对象创建。 |
在 4.2 版本之前,无法通过 XML 配置提供 MessageGroupProcessor。只能使用 POJO 方法进行聚合。现在,如果框架检测到引用的(或内部)bean 实现了 MessageProcessor,则将其用作聚合器的输出处理器。
如果您希望从自定义 MessageGroupProcessor 释放对象集合作为消息的负载,您的类应该扩展 AbstractAggregatingMessageGroupProcessor 并实现 aggregatePayloads()。
此外,自 4.2 版本以来,提供了 SimpleMessageGroupProcessor。它返回组中的消息集合,如前所述,这会导致释放的消息单独发送。
这使得聚合器可以作为消息屏障,其中到达的消息被保留,直到释放策略触发并将组作为一系列单独的消息释放。
从 6.0 版本开始,上述拆分行为仅在组处理器为 SimpleMessageGroupProcessor 时有效。否则,对于任何其他返回 Collection<Message> 的 MessageGroupProcessor 实现,只会发出一个回复消息,其负载是整个消息集合。这种逻辑是由聚合器的规范目的决定的——通过某个键收集请求消息并生成一个单一的组合消息。
在 6.5 版本之前,如果 MessageGroupProcessor(通常是 DSL 中的 lambda)返回一个负载集合,AbstractCorrelatingMessageHandler 会因 IllegalArgumentException 而失败,声明只允许消息集合。从现在起,这种限制被取消,返回的负载集合将作为聚合器发出的单个回复消息,其中只包含来自最后一条请求消息的头部。如果需要同时进行头部聚合和负载集合,建议使用 AbstractAggregatingMessageGroupProcessor 实现,而不是普通的 MessageGroupProcessor 函数式接口。
ReleaseStrategy
ReleaseStrategy 接口定义如下
public interface ReleaseStrategy {
boolean canRelease(MessageGroup group);
}
通常,任何 POJO 都可以实现完成决策逻辑,如果它提供一个接受单个 java.util.List 作为参数(也支持参数化列表)并返回布尔值的方法。此方法在每条新消息到达后调用,以决定组是否完成,如下所示
-
如果参数是
java.util.List<T>并且参数类型T可以赋值给Message,则组中累积的整个消息列表将发送到该方法。 -
如果参数是未参数化的
java.util.List或参数类型不能赋值给Message,则方法接收累积消息的负载。 -
如果消息组已准备好进行聚合,则该方法必须返回
true,否则返回 false。
以下示例展示了如何将 @ReleaseStrategy 注解用于 Message 类型的 List
public class MyReleaseStrategy {
@ReleaseStrategy
public boolean canMessagesBeReleased(List<Message<?>>) {...}
}
以下示例展示了如何将 @ReleaseStrategy 注解用于 String 类型的 List
public class MyReleaseStrategy {
@ReleaseStrategy
public boolean canMessagesBeReleased(List<String>) {...}
}
根据前面两个示例中的签名,基于 POJO 的释放策略会传递一个尚未释放的消息 Collection(如果您需要访问整个 Message)或一个负载对象 Collection(如果类型参数不是 Message)。这满足了大多数用例。但是,如果出于某种原因,您需要访问完整的 MessageGroup,则应提供 ReleaseStrategy 接口的实现。
|
在处理可能很大的组时,您应该了解这些方法是如何调用的,因为在组释放之前,释放策略可能会被多次调用。最有效的是 由于这些原因,对于大型组,我们建议您实现 |
当组被释放以进行聚合时,所有尚未释放的消息都会被处理并从组中移除。如果组也已完成(即,如果序列中的所有消息都已到达或者没有定义序列),则该组被标记为完成。此组的任何新消息都会发送到丢弃通道(如果已定义)。将 expire-groups-upon-completion 设置为 true(默认为 false)会移除整个组,并且任何新消息(与移除的组具有相同的关联 ID)会形成一个新组。您可以使用 MessageGroupStoreReaper 和将 send-partial-result-on-expiry 设置为 true 来释放部分序列。
从 6.5 版本开始,关联处理程序还可以配置 discardIndividuallyOnExpiry 选项,以将整个组作为单个消息丢弃。本质上,此消息的负载是来自已过期组的消息列表。仅在 sendPartialResultOnExpiry 设置为 false(默认)且提供了 dicardChannel 时有效。
为了方便丢弃迟到的消息,聚合器必须在组释放后保持其状态。这最终可能导致内存不足的情况。为了避免这种情况,您应该考虑配置 MessageGroupStoreReaper 来移除组元数据。过期参数应设置为在达到某个点后使组过期,此后预期不会有迟到的消息到达。有关配置收割器的信息,请参阅 在聚合器中管理状态:MessageGroupStore。 |
Spring Integration 提供了 ReleaseStrategy 的实现:SimpleSequenceSizeReleaseStrategy。此实现会查询每个到达消息的 SEQUENCE_NUMBER 和 SEQUENCE_SIZE 头部,以决定消息组何时完成并准备好进行聚合。如前所示,它也是默认策略。
在 5.0 版本之前,默认的释放策略是 SequenceSizeReleaseStrategy,它在大组中表现不佳。使用该策略,会检测并拒绝重复的序列号。此操作可能很耗时。 |
如果您正在聚合大型组,并且不需要释放部分组,并且不需要检测/拒绝重复序列,请考虑使用 SimpleSequenceSizeReleaseStrategy——对于这些用例,它效率更高,并且自 5.0 版本 以来,在未指定部分组释放时,它就是默认值。
聚合大型组
4.3 版本将 SimpleMessageGroup 中消息的默认 Collection 更改为 HashSet(之前是 BlockingQueue)。这在从大型组中移除单个消息时成本很高(需要 O(n) 线性扫描)。虽然哈希集通常移除速度快得多,但对于大型消息来说可能成本很高,因为在插入和移除时都必须计算哈希。如果您的消息哈希成本很高,请考虑使用其他集合类型。如 使用 MessageGroupFactory 中所述,提供了 SimpleMessageGroupFactory,以便您可以选择最适合您需求的 Collection。您也可以提供自己的工厂实现来创建其他 Collection<Message<?>>。
以下示例展示了如何使用先前实现和 SimpleSequenceSizeReleaseStrategy 配置聚合器
<int:aggregator input-channel="aggregate"
output-channel="out" message-store="store" release-strategy="releaser" />
<bean id="store" class="org.springframework.integration.store.SimpleMessageStore">
<property name="messageGroupFactory">
<bean class="org.springframework.integration.store.SimpleMessageGroupFactory">
<constructor-arg value="BLOCKING_QUEUE"/>
</bean>
</property>
</bean>
<bean id="releaser" class="SimpleSequenceSizeReleaseStrategy" />
如果过滤器端点涉及聚合器上游的流,序列大小释放策略(固定或基于 sequenceSize 头部)将无法实现其目的,因为序列中的某些消息可能会被过滤器丢弃。在这种情况下,建议选择另一个 ReleaseStrategy,或者使用从丢弃子流发送的补偿消息,其内容中包含一些信息,以便在自定义完整组函数中跳过。有关更多信息,请参阅 过滤器。 |
关联策略
CorrelationStrategy 接口定义如下
public interface CorrelationStrategy {
Object getCorrelationKey(Message<?> message);
}
该方法返回一个 Object,它表示用于将消息与消息组关联的关联键。该键必须满足 Map 中键的实现标准,包括 equals() 和 hashCode()。
通常,任何 POJO 都可以实现关联逻辑,并且将消息映射到方法参数的规则与 ServiceActivator 相同(包括对 @Header 注解的支持)。方法必须返回一个值,并且该值不能为 null。
Spring Integration 提供了 CorrelationStrategy 的一个实现:HeaderAttributeCorrelationStrategy。此实现将其中一个消息头(其名称由构造函数参数指定)的值作为关联键返回。默认情况下,关联策略是一个 HeaderAttributeCorrelationStrategy,它返回 CORRELATION_ID 头属性的值。如果您想使用自定义头名称进行关联,可以在 HeaderAttributeCorrelationStrategy 实例上配置它,并将其作为聚合器关联策略的引用。
锁注册表
对组的更改是线程安全的。因此,当您同时发送具有相同关联 ID 的消息时,聚合器中只会处理其中一个,使其有效地成为每个消息组的单线程。LockRegistry 用于获取已解析的关联 ID 的锁。默认情况下使用 DefaultLockRegistry(内存中)。为了跨服务器同步更新(在使用共享 MessageGroupStore 时),您必须配置一个共享锁注册表。
避免死锁
如上所述,当消息组发生变异(添加或释放消息)时,会持有一个锁。
考虑以下流程
...->aggregator1-> ... ->aggregator2-> ...
如果存在多个线程,并且聚合器共享一个公共锁注册表,则可能发生死锁。这将导致线程挂起,jstack <pid> 可能会显示如下结果
Found one Java-level deadlock:
=============================
"t2":
waiting for ownable synchronizer 0x000000076c1cbfa0, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
which is held by "t1"
"t1":
waiting for ownable synchronizer 0x000000076c1ccc00, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
which is held by "t2"
有几种方法可以避免这个问题
-
确保每个聚合器都有自己的锁注册表(这可以是跨应用程序实例的共享注册表,但流中的两个或更多聚合器必须各自拥有一个独特的注册表)
-
使用
ExecutorChannel或QueueChannel作为聚合器的输出通道,以便下游流在新线程上运行 -
从 5.1.1 版本开始,将
releaseLockBeforeSend聚合器属性设置为true
| 如果由于某种原因,单个聚合器的输出最终被路由回同一个聚合器,也会导致此问题。当然,上述第一个解决方案在这种情况下不适用。 |
在 Java DSL 中配置聚合器
有关如何在 Java DSL 中配置聚合器,请参阅 聚合器和重排序器。
使用 XML 配置聚合器
Spring Integration 支持通过 <aggregator/> 元素配置聚合器。以下示例展示了一个聚合器示例
<channel id="inputChannel"/>
<int:aggregator id="myAggregator" (1)
auto-startup="true" (2)
input-channel="inputChannel" (3)
output-channel="outputChannel" (4)
discard-channel="throwAwayChannel" (5)
message-store="persistentMessageStore" (6)
order="1" (7)
send-partial-result-on-expiry="false" (8)
send-timeout="1000" (9)
correlation-strategy="correlationStrategyBean" (10)
correlation-strategy-method="correlate" (11)
correlation-strategy-expression="headers['foo']" (12)
ref="aggregatorBean" (13)
method="aggregate" (14)
release-strategy="releaseStrategyBean" (15)
release-strategy-method="release" (16)
release-strategy-expression="size() == 5" (17)
expire-groups-upon-completion="false" (18)
empty-group-min-timeout="60000" (19)
lock-registry="lockRegistry" (20)
group-timeout="60000" (21)
group-timeout-expression="size() ge 2 ? 100 : -1" (22)
expire-groups-upon-timeout="true" (23)
scheduler="taskScheduler" > (24)
<expire-transactional/> (25)
<expire-advice-chain/> (26)
</aggregator>
<int:channel id="outputChannel"/>
<int:channel id="throwAwayChannel"/>
<bean id="persistentMessageStore" class="org.springframework.integration.jdbc.store.JdbcMessageStore">
<constructor-arg ref="dataSource"/>
</bean>
<bean id="aggregatorBean" class="sample.PojoAggregator"/>
<bean id="releaseStrategyBean" class="sample.PojoReleaseStrategy"/>
<bean id="correlationStrategyBean" class="sample.PojoCorrelationStrategy"/>
| 1 | 聚合器的 ID 是可选的。 |
| 2 | 生命周期属性,表示聚合器是否应在应用程序上下文启动期间启动。可选(默认为 'true')。 |
| 3 | 聚合器接收消息的通道。必需。 |
| 4 | 聚合器发送聚合结果的通道。可选(因为传入消息本身可以在 'replyChannel' 消息头中指定回复通道)。 |
| 5 | 聚合器发送超时消息的通道(如果 send-partial-result-on-expiry 为 false)。可选。 |
| 6 | 对 MessageGroupStore 的引用,用于在消息组完成之前,将其存储在关联键下。可选。默认情况下,它是易失的内存存储。有关更多信息,请参阅 消息存储。 |
| 7 | 当多个句柄订阅到同一个 DirectChannel 时,此聚合器的顺序(用于负载均衡目的)。可选。 |
| 8 | 指示过期消息应在它们包含的 MessageGroup 过期后(参见 MessageGroupStore.expireMessageGroups(long))聚合并发送到 'output-channel' 或 'replyChannel'。过期 MessageGroup 的一种方法是配置 MessageGroupStoreReaper。但是,您也可以通过调用 MessageGroupStore.expireMessageGroups(timeout) 来过期 MessageGroup。您可以通过控制总线操作或如果您有 MessageGroupStore 实例的引用,通过调用 expireMessageGroups(timeout) 来实现。否则,此属性本身什么也不做。它仅作为一个指示器,指示是否丢弃或发送到输出或回复通道中那些仍处于即将过期的 MessageGroup 中的任何消息。可选(默认为 false)。注意:此属性可能更恰当地称为 send-partial-result-on-timeout,因为如果 expire-groups-upon-timeout 设置为 false,该组可能实际上不会过期。 |
| 9 | 当向 output-channel 或 discard-channel 发送回复 Message 时等待的超时间隔。默认为 30 秒。它仅在输出通道具有某些“发送”限制(例如具有固定“容量”的 QueueChannel)时应用。在这种情况下,会抛出 MessageDeliveryException。对于 AbstractSubscribableChannel 实现,send-timeout 被忽略。对于 group-timeout(-expression),来自调度过期任务的 MessageDeliveryException 导致此任务被重新调度。可选。 |
| 10 | 对实现消息关联(分组)算法的 bean 的引用。该 bean 可以是 CorrelationStrategy 接口的实现,也可以是 POJO。在后一种情况下,还必须定义 correlation-strategy-method 属性。可选(默认情况下,聚合器使用 IntegrationMessageHeaderAccessor.CORRELATION_ID 头部)。 |
| 11 | 在 correlation-strategy 引用的 bean 上定义的方法。它实现了关联决策算法。可选,有约束(correlation-strategy 必须存在)。 |
| 12 | 表示关联策略的 SpEL 表达式。示例:"headers['something']"。correlation-strategy 或 correlation-strategy-expression 只能有一个。 |
| 13 | 对应用程序上下文中定义的 bean 的引用。该 bean 必须实现聚合逻辑,如前所述。可选(默认情况下,聚合消息列表成为输出消息的负载)。 |
| 14 | 在 ref 属性引用的 bean 上定义的方法。它实现了消息聚合算法。可选(取决于是否定义了 ref 属性)。 |
| 15 | 对实现释放策略的 bean 的引用。该 bean 可以是 ReleaseStrategy 接口的实现,也可以是 POJO。在后一种情况下,还必须定义 release-strategy-method 属性。可选(默认情况下,聚合器使用 IntegrationMessageHeaderAccessor.SEQUENCE_SIZE 头属性)。 |
| 16 | 在 release-strategy 属性引用的 bean 上定义的方法。它实现了完成决策算法。可选,有约束(release-strategy 必须存在)。 |
| 17 | 表示释放策略的 SpEL 表达式。表达式的根对象是 MessageGroup。示例:"size() == 5"。release-strategy 或 release-strategy-expression 只能有一个。 |
| 18 | 当设置为 true 时(默认值为 false),完成的组将从消息存储中移除,从而允许具有相同关联的后续消息形成新组。默认行为是将与已完成组具有相同关联的消息发送到 discard-channel。 |
| 19 | 仅当为 <aggregator> 的 MessageStore 配置了 MessageGroupStoreReaper 时才适用。默认情况下,当 MessageGroupStoreReaper 配置为过期部分组时,空组也会被移除。空组在组正常释放后存在。空组可以检测并丢弃迟到的消息。如果您希望空组以比过期部分组更长的时间表过期,请设置此属性。空组将在此毫秒数内未被修改后才从 MessageStore 中移除。请注意,过期空组的实际时间也受收割器的 timeout 属性影响,可能高达此值加上超时时间。 |
| 20 | 对 org.springframework.integration.util.LockRegistry bean 的引用。它用于根据 groupId 获取 Lock,以进行 MessageGroup 上的并发操作。默认情况下,使用内部 DefaultLockRegistry。使用分布式 LockRegistry(例如 ZookeeperLockRegistry)可确保只有一个聚合器实例可以并发操作一个组。有关更多信息,请参阅 Redis 锁注册表 或 Zookeeper 锁注册表。 |
| 21 | 一个超时(以毫秒为单位),用于在当前消息到达时,如果 ReleaseStrategy 未释放组,则强制 MessageGroup 完成。此属性为聚合器提供了一个内置的基于时间的释放策略,当需要在超时时间内(从最后一条消息到达时开始计时)没有新消息到达 MessageGroup 时,发出部分结果(或丢弃组)。要设置从 MessageGroup 创建时间开始计时的超时,请参阅 group-timeout-expression 信息。当新消息到达聚合器时,将取消其 MessageGroup 的任何现有 ScheduledFuture<?>。如果 ReleaseStrategy 返回 false(表示不释放)并且 groupTimeout > 0,则会安排一个新任务来使组过期。我们不建议将此属性设置为零(或负值)。这样做会有效地禁用聚合器,因为每个消息组都会立即完成。但是,您可以使用表达式有条件地将其设置为零(或负值)。有关信息,请参阅 group-timeout-expression。完成期间采取的操作取决于 ReleaseStrategy 和 send-partial-group-on-expiry 属性。有关更多信息,请参阅 聚合器和组超时。它与 group-timeout-expression 属性互斥。 |
| 22 | SpEL 表达式,它计算为 groupTimeout,其中 MessageGroup 作为 #root 评估上下文对象。用于调度 MessageGroup 以强制完成。如果表达式计算为 null,则不调度完成。如果计算为零,则该组立即在当前线程上完成。实际上,这提供了一个动态的 group-timeout 属性。例如,如果您希望在组创建时间起 10 秒后强制完成 MessageGroup,您可以考虑使用以下 SpEL 表达式:timestamp + 10000 - T(System).currentTimeMillis(),其中 timestamp 由 MessageGroup.getTimestamp() 提供,因为这里的 MessageGroup 是 #root 评估上下文对象。但是请记住,组创建时间可能与第一条到达消息的时间不同,具体取决于其他组过期属性的配置。有关更多信息,请参阅 group-timeout。与 group-timeout 属性互斥。 |
| 23 | 当一个组因超时(或由 MessageGroupStoreReaper)而完成时,该组默认会过期(完全移除)。迟到的消息会启动一个新组。您可以将其设置为 false 以完成该组但保留其元数据,以便丢弃迟到的消息。空组可以在以后使用 MessageGroupStoreReaper 和 empty-group-min-timeout 属性一起过期。它默认为 'true'。 |
| 24 | 一个 TaskScheduler bean 引用,用于调度 MessageGroup,以便在 groupTimeout 内没有新消息到达 MessageGroup 时强制其完成。如果未提供,则使用 ApplicationContext 中注册的默认调度程序(taskScheduler)(ThreadPoolTaskScheduler)。如果未指定 group-timeout 或 group-timeout-expression,则此属性不适用。 |
| 25 | 自 4.1 版本起。它允许为 forceComplete 操作启动事务。它由 group-timeout(-expression) 或 MessageGroupStoreReaper 启动,不适用于正常的 add、release 和 discard 操作。只允许此子元素或 <expire-advice-chain/>。 |
| 26 | 自 4.1 版本 起。它允许为 forceComplete 操作配置任何 Advice。它由 group-timeout(-expression) 或 MessageGroupStoreReaper 启动,不适用于正常的 add、release 和 discard 操作。只允许此子元素或 <expire-transactional/>。也可以使用 Spring tx 命名空间在此处配置事务 Advice。 |
|
过期组
有两个属性与组的过期(完全删除)有关。当一个组过期时,就没有它的记录了,如果新消息以相同的关联到达,就会启动一个新组。当一个组完成(没有过期)时,空组仍然存在,迟到的消息会被丢弃。空组可以稍后使用
如果一个组没有正常完成,而是因为超时而被释放或丢弃,则该组通常会过期。从 4.1 版本开始,您可以使用
从 5.0 版本开始,空组也会在 从 5.4 版本开始,聚合器(和重新排序器)可以配置为使孤立组过期(那些在持久消息存储中可能不会被释放的组)。 |
如果自定义聚合器处理程序实现可能在其他 <aggregator> 定义中被引用,我们通常建议使用 ref 属性。但是,如果自定义聚合器实现仅由 <aggregator> 的单个定义使用,则可以使用内部 bean 定义(从 1.0.3 版本开始)在 <aggregator> 元素中配置聚合 POJO,如下例所示
<aggregator input-channel="input" method="sum" output-channel="output">
<beans:bean class="org.foo.PojoAggregator"/>
</aggregator>
在相同的 <aggregator> 配置中同时使用 ref 属性和内部 bean 定义是不允许的,因为它会创建歧义条件。在这种情况下,会抛出异常。 |
以下示例展示了聚合器 bean 的实现
public class PojoAggregator {
public Long add(List<Long> results) {
long total = 0l;
for (long partialResult: results) {
total += partialResult;
}
return total;
}
}
前面示例的完成策略 bean 的实现可能如下所示
public class PojoReleaseStrategy {
...
public boolean canRelease(List<Long> numbers) {
int sum = 0;
for (long number: numbers) {
sum += number;
}
return sum >= maxValue;
}
}
| 在适当的情况下,释放策略方法和聚合器方法可以组合成一个 bean。 |
前面示例的关联策略 bean 的实现可能如下所示
public class PojoCorrelationStrategy {
...
public Long groupNumbersByLastDigit(Long number) {
return number % 10;
}
}
前面示例中的聚合器将根据某个标准(在本例中为除以十后的余数)对数字进行分组,并保留该组,直到有效负载提供的数字之和超过某个值。
| 在适当的情况下,释放策略方法、关联策略方法和聚合器方法可以组合在一个 bean 中。(实际上,它们全部或任意两个都可以组合。) |
聚合器和 Spring 表达式语言 (SpEL)
自 Spring Integration 2.0 起,您可以使用 SpEL 处理各种策略(关联、释放和聚合),如果这些释放策略背后的逻辑相对简单,我们建议使用 SpEL。假设您有一个遗留组件,它被设计为接收对象数组。我们知道默认的释放策略会将所有聚合消息组装到 List 中。现在我们有两个问题。首先,我们需要从列表中提取单个消息。其次,我们需要提取每条消息的负载并组装对象数组。以下示例解决了这两个问题
public String[] processRelease(List<Message<String>> messages){
List<String> stringList = new ArrayList<>();
for (Message<String> message : messages) {
stringList.add(message.getPayload());
}
return stringList.toArray(new String[]{});
}
然而,使用 SpEL,这样的要求实际上可以通过一行表达式相对容易地处理,从而省去编写自定义类并将其配置为 bean 的麻烦。以下示例展示了如何实现
<int:aggregator input-channel="aggChannel"
output-channel="replyChannel"
expression="#this.![payload].toArray()"/>
在前面的配置中,我们使用 集合投影 表达式从列表中所有消息的负载中组装一个新的集合,然后将其转换为数组,从而实现与早期 Java 代码相同的结果。
在处理自定义释放和关联策略时,您可以应用相同的基于表达式的方法。
除了在 correlation-strategy 属性中定义自定义 CorrelationStrategy 的 bean 外,您还可以将简单的关联逻辑实现为 SpEL 表达式,并在 correlation-strategy-expression 属性中进行配置,如下例所示
correlation-strategy-expression="payload.person.id"
在前面的示例中,我们假设负载具有一个带有 id 的 person 属性,该 id 将用于关联消息。
同样,对于 ReleaseStrategy,您可以将释放逻辑实现为 SpEL 表达式,并在 release-strategy-expression 属性中进行配置。评估上下文的根对象是 MessageGroup 本身。消息列表可以通过表达式中的组的 message 属性进行引用。
在 5.0 版本之前的版本中,根对象是 Message<?> 的集合,如上一个示例所示 |
release-strategy-expression="!messages.?[payload==5].empty"
在前面的示例中,SpEL 评估上下文的根对象是 MessageGroup 本身,您声明一旦此组中存在负载为 5 的消息,该组就应该被释放。
聚合器和组超时
从 4.0 版本开始,引入了两个新的互斥属性:group-timeout 和 group-timeout-expression。请参阅 使用 XML 配置聚合器。在某些情况下,如果 ReleaseStrategy 在当前消息到达时未释放,您可能需要在超时后发出聚合器结果(或丢弃组)。为此,groupTimeout 选项允许调度 MessageGroup 强制完成,如下例所示
<aggregator input-channel="input" output-channel="output"
send-partial-result-on-expiry="true"
group-timeout-expression="size() ge 2 ? 10000 : -1"
release-strategy-expression="messages[0].headers.sequenceNumber == messages[0].headers.sequenceSize"/>
在此示例中,如果聚合器收到序列中的最后一条消息(由 release-strategy-expression 定义),则可以进行正常释放。如果该特定消息未到达,groupTimeout 将在十秒后强制组完成,只要该组至少包含两条消息。
强制组完成的结果取决于 ReleaseStrategy 和 send-partial-result-on-expiry。首先,再次咨询释放策略,以查看是否进行正常释放。尽管组没有改变,但 ReleaseStrategy 此时可以决定释放组。如果释放策略仍然不释放组,则它会过期。如果 send-partial-result-on-expiry 为 true,则(部分)MessageGroup 中的现有消息将作为正常的聚合器回复消息发送到 output-channel。否则,它将被丢弃。
groupTimeout 行为与 MessageGroupStoreReaper 之间存在差异(参见 使用 XML 配置聚合器)。收割器定期为 MessageGroupStore 中的所有 MessageGroup 启动强制完成。groupTimeout 在新消息未在 groupTimeout 期间到达时,单独对每个 MessageGroup 执行此操作。此外,收割器可用于删除空组(那些为丢弃迟到的消息而保留的组,如果 expire-groups-upon-completion 为 false)。
从 5.5 版本开始,groupTimeoutExpression 可以评估为 java.util.Date 实例。这在确定基于组创建时间(MessageGroup.getTimestamp())而不是当前消息到达时间的计划任务时刻时非常有用,因为当 groupTimeoutExpression 评估为 long 时会计算当前消息到达时间
group-timeout-expression="size() ge 2 ? new java.util.Date(timestamp + 200) : null"
使用注解配置聚合器
以下示例展示了使用注解配置的聚合器
public class Waiter {
...
@Aggregator (1)
public Delivery aggregatingMethod(List<OrderItem> items) {
...
}
@ReleaseStrategy (2)
public boolean releaseChecker(List<Message<?>> messages) {
...
}
@CorrelationStrategy (3)
public String correlateBy(OrderItem item) {
...
}
}
| 1 | 一个注解,指示此方法应作为聚合器使用。如果此类别用作聚合器,则必须指定此注解。 |
| 2 | 一个注解,指示此方法用作聚合器的释放策略。如果任何方法上不存在,聚合器将使用 SimpleSequenceSizeReleaseStrategy。 |
| 3 | 一个注解,指示此方法应作为聚合器的关联策略使用。如果没有指示关联策略,聚合器将使用基于 CORRELATION_ID 的 HeaderAttributeCorrelationStrategy。 |
XML 元素提供的所有配置选项也适用于 @Aggregator 注解。
聚合器可以从 XML 中显式引用,或者,如果 @MessageEndpoint 定义在类上,则可以通过类路径扫描自动检测。
聚合器组件的注解配置(@Aggregator 等)仅涵盖简单的用例,其中大多数默认选项就足够了。如果您在使用注解配置时需要对这些选项进行更多控制,请考虑为 AggregatingMessageHandler 使用 @Bean 定义,并使用 @ServiceActivator 标记其 @Bean 方法,如下例所示
@ServiceActivator(inputChannel = "aggregatorChannel")
@Bean
public MessageHandler aggregator(MessageGroupStore jdbcMessageGroupStore) {
AggregatingMessageHandler aggregator =
new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor(),
jdbcMessageGroupStore);
aggregator.setOutputChannel(resultsChannel());
aggregator.setGroupTimeoutExpression(new ValueExpression<>(500L));
aggregator.setTaskScheduler(this.taskScheduler);
return aggregator;
}
有关更多信息,请参阅 编程模型 和 @Bean 方法上的注解。
从 4.2 版本开始,提供了 AggregatorFactoryBean 来简化 AggregatingMessageHandler 的 Java 配置。 |
在聚合器中管理状态:MessageGroupStore
聚合器(以及 Spring Integration 中的其他一些模式)是一种有状态模式,需要根据一段时间内到达的具有相同关联键的消息组做出决策。有状态模式(如 ReleaseStrategy)中接口的设计遵循的原则是,组件(无论是框架定义的还是用户定义的)应该能够保持无状态。所有状态都由 MessageGroup 携带,其管理委托给 MessageGroupStore。MessageGroupStore 接口定义如下
public interface MessageGroupStore {
int getMessageCountForAllMessageGroups();
int getMarkedMessageCountForAllMessageGroups();
int getMessageGroupCount();
MessageGroup getMessageGroup(Object groupId);
MessageGroup addMessageToGroup(Object groupId, Message<?> message);
MessageGroup markMessageGroup(MessageGroup group);
MessageGroup removeMessageFromGroup(Object key, Message<?> messageToRemove);
MessageGroup markMessageFromGroup(Object key, Message<?> messageToMark);
void removeMessageGroup(Object groupId);
void registerMessageGroupExpiryCallback(MessageGroupCallback callback);
int expireMessageGroups(long timeout);
}
有关更多信息,请参阅 Javadoc。
MessageGroupStore 在等待释放策略触发时累积 MessageGroups 中的状态信息,并且该事件可能永远不会发生。因此,为了防止陈旧消息滞留,并为易失性存储提供在应用程序关闭时进行清理的钩子,MessageGroupStore 允许您注册回调,以便在其 MessageGroups 过期时应用这些回调。该接口非常简单明了,如下清单所示
public interface MessageGroupCallback {
void execute(MessageGroupStore messageGroupStore, MessageGroup group);
}
回调可以直接访问存储和消息组,以便它可以管理持久状态(例如,通过完全从存储中移除组)。
MessageGroupStore 维护这些回调的列表,当所有时间戳早于作为参数提供的时间(请参阅前面描述的 registerMessageGroupExpiryCallback(..) 和 expireMessageGroups(..) 方法)的消息时,它会按需应用这些回调。
当您打算依赖 expireMessageGroups 功能时,在不同的聚合器组件中不使用相同的 MessageGroupStore 实例非常重要。每个 AbstractCorrelatingMessageHandler 都会根据 forceComplete() 回调注册自己的 MessageGroupCallback。这样,每个要过期的组都可能被错误的聚合器完成或丢弃。从 5.0.10 版本开始,AbstractCorrelatingMessageHandler 使用 UniqueExpiryCallback 在 MessageGroupStore 中注册回调。MessageGroupStore 反过来会检查是否存在此类的实例,如果回调集中已存在,则会记录一个包含适当消息的错误。这样,框架就不允许在不同的聚合器/重新排序器中使用 MessageGroupStore 实例,以避免上述过期不由特定关联处理程序创建的组的副作用。 |
您可以调用 expireMessageGroups 方法并指定超时值。任何早于当前时间减去此值的消息都将过期并应用回调。因此,是存储的用户定义了消息组“过期”的含义。
为了方便用户,Spring Integration 以 MessageGroupStoreReaper 的形式提供了消息过期包装器,如下例所示
<bean id="reaper" class="org...MessageGroupStoreReaper">
<property name="messageGroupStore" ref="messageStore"/>
<property name="timeout" value="30000"/>
</bean>
<task:scheduled-tasks scheduler="scheduler">
<task:scheduled ref="reaper" method="run" fixed-rate="10000"/>
</task:scheduled-tasks>
收割器是一个 Runnable。在前面的示例中,消息组存储的过期方法每十秒调用一次。超时本身是 30 秒。
重要的是要理解 MessageGroupStoreReaper 的“timeout”属性是一个近似值,并且受任务调度器速率的影响,因为此属性仅在 MessageGroupStoreReaper 任务的下一次调度执行时检查。例如,如果超时设置为十分钟,但 MessageGroupStoreReaper 任务安排每小时运行一次,并且 MessageGroupStoreReaper 任务的上次执行发生在超时前一分钟,则 MessageGroup 在接下来的 59 分钟内不会过期。因此,我们建议将速率设置为至少等于或短于超时值。 |
除了收割器之外,当应用程序通过 AbstractCorrelatingMessageHandler 中的生命周期回调关闭时,还会调用过期回调。
AbstractCorrelatingMessageHandler 注册其自己的过期回调,这与聚合器 XML 配置中的布尔标志 send-partial-result-on-expiry 相关联。如果该标志设置为 true,那么当调用过期回调时,组中尚未释放的任何未标记消息都可以发送到输出通道。
由于 MessageGroupStoreReaper 是从计划任务中调用的,并且可能导致生成消息(取决于 sendPartialResultOnExpiry 选项)到下游集成流,因此建议提供一个带有 MessagePublishingErrorHandler 的自定义 TaskScheduler,以通过 errorChannel 处理异常,正如常规聚合器释放功能所期望的那样。相同的逻辑适用于组超时功能,该功能也依赖于 TaskScheduler。有关更多信息,请参阅 错误处理。 |
|
当不同关联端点使用共享 一些 有关 |
Flux 聚合器
在 5.2 版本中,引入了 FluxAggregatorMessageHandler 组件。它基于 Project Reactor 的 Flux.groupBy() 和 Flux.window() 操作符。传入的消息被发送到由该组件构造函数中的 Flux.create() 初始化的 FluxSink。如果未提供 outputChannel 或它不是 ReactiveStreamsSubscribableChannel 的实例,则主 Flux 的订阅是在 Lifecycle.start() 实现中完成的。否则,它将推迟到由 ReactiveStreamsSubscribableChannel 实现完成的订阅。消息使用 CorrelationStrategy 作为组键,通过 Flux.groupBy() 进行分组。默认情况下,会查询消息的 IntegrationMessageHeaderAccessor.CORRELATION_ID 头部。
默认情况下,每个关闭的窗口都作为一个 Flux 释放到要生成的消息的负载中。此消息包含窗口中第一条消息的所有头部。输出消息负载中的此 Flux 必须在下游订阅和处理。这种逻辑可以通过 FluxAggregatorMessageHandler 的 setCombineFunction(Function<Flux<Message<?>>, Mono<Message<?>>>) 配置选项进行自定义(或取代)。例如,如果希望最终消息中包含 List 的负载,可以像这样配置 Flux.collectList()
fluxAggregatorMessageHandler.setCombineFunction(
(messageFlux) ->
messageFlux
.map(Message::getPayload)
.collectList()
.map(GenericMessage::new));
FluxAggregatorMessageHandler 中有几个选项可以选择合适的窗口策略
-
setBoundaryTrigger(Predicate<Message<?>>)- 传播到Flux.windowUntil()运算符。有关更多信息,请参阅其 Javadoc。优先于所有其他窗口选项。 -
setWindowSize(int)和setWindowSizeFunction(Function<Message<?>, Integer>)- 传播到Flux.window(int)或windowTimeout(int, Duration)。默认情况下,窗口大小根据组中的第一条消息及其IntegrationMessageHeaderAccessor.SEQUENCE_SIZE头部计算。 -
setWindowTimespan(Duration)- 根据窗口大小配置传播到Flux.window(Duration)或windowTimeout(int, Duration)。 -
setWindowConfigurer(Function<Flux<Message<?>>, Flux<Flux<Message<?>>>>)- 一个函数,用于将转换应用于分组的 Flux,以进行未被公开选项覆盖的任何自定义窗口操作。
由于此组件是一个 MessageHandler 实现,因此可以简单地将其用作 @Bean 定义以及 @ServiceActivator 消息传递注解。通过 Java DSL,可以从 .handle() EIP 方法中使用它。下面的示例演示了我们如何在运行时注册 IntegrationFlow 以及 FluxAggregatorMessageHandler 如何与上游拆分器相关联
IntegrationFlow fluxFlow =
(flow) -> flow
.split()
.channel(MessageChannels.flux())
.handle(new FluxAggregatorMessageHandler());
IntegrationFlowContext.IntegrationFlowRegistration registration =
this.integrationFlowContext.registration(fluxFlow)
.register();
Flux<Message<?>> window =
registration.getMessagingTemplate()
.convertSendAndReceive(new Integer[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }, Flux.class);