聚合器

聚合器基本上是分割器的镜像,它是一种消息处理器,接收多个消息并将它们组合成单个消息。实际上,聚合器通常是包含分割器的管道中的下游消费者。

从技术上讲,聚合器比分割器更复杂,因为它是有状态的。它必须保存要聚合的消息并确定何时准备好聚合完整的组消息。为此,它需要一个MessageStore

功能

聚合器通过关联和存储相关消息组,直到该组被认为完整为止。此时,聚合器通过处理整个组创建一个单个消息,并将聚合的消息作为输出发送。

实现聚合器需要提供执行聚合的逻辑(即,从许多消息创建单个消息)。两个相关的概念是关联和释放。

关联确定如何对消息进行分组以进行聚合。在 Spring Integration 中,默认情况下,基于IntegrationMessageHeaderAccessor.CORRELATION_ID消息头进行关联。具有相同IntegrationMessageHeaderAccessor.CORRELATION_ID的消息将分组在一起。但是,您可以自定义关联策略以允许其他方式指定如何将消息分组在一起。为此,您可以实现一个CorrelationStrategy(本章后面会介绍)。

为了确定准备好处理消息组的时间点,会咨询ReleaseStrategy。聚合器的默认释放策略在基于IntegrationMessageHeaderAccessor.SEQUENCE_SIZE头的序列中存在所有消息时释放一个组。您可以通过提供对自定义ReleaseStrategy实现的引用来覆盖此默认策略。

编程模型

聚合 API 包含许多类

  • 接口MessageGroupProcessor及其子类:MethodInvokingAggregatingMessageGroupProcessorExpressionEvaluatingMessageGroupProcessor

  • ReleaseStrategy接口及其默认实现:SimpleSequenceSizeReleaseStrategy

  • CorrelationStrategy接口及其默认实现:HeaderAttributeCorrelationStrategy

AggregatingMessageHandler

AggregatingMessageHandlerAbstractCorrelatingMessageHandler的子类)是一个MessageHandler实现,封装了聚合器(和其他关联用例)的常见功能,如下所示

  • 将消息关联到要聚合的组中

  • MessageStore中维护这些消息,直到可以释放该组为止

  • 决定何时可以释放该组

  • 将已释放的组聚合到单个消息中

  • 识别并响应已过期的组

决定如何将消息分组在一起的责任委托给CorrelationStrategy实例。决定是否可以释放消息组的责任委托给ReleaseStrategy实例。

以下列表简要介绍了基本AbstractAggregatingMessageGroupProcessor(实现aggregatePayloads方法的责任留给开发者)。

public abstract class AbstractAggregatingMessageGroupProcessor
              implements MessageGroupProcessor {

    protected Map<String, Object> aggregateHeaders(MessageGroup group) {
        // default implementation exists
    }

    protected abstract Object aggregatePayloads(MessageGroup group, Map<String, Object> defaultHeaders);

}

DefaultAggregatingMessageGroupProcessorExpressionEvaluatingMessageGroupProcessorMethodInvokingMessageGroupProcessor视为AbstractAggregatingMessageGroupProcessor的开箱即用实现。

从 5.2 版本开始,Function<MessageGroup, Map<String, Object>> 策略可用于AbstractAggregatingMessageGroupProcessor 来合并和计算(聚合)输出消息的标头。DefaultAggregateHeadersFunction实现具有返回组中没有冲突的所有标头的逻辑;组中一个或多个消息上缺少的标头不被认为是冲突。冲突的标头将被忽略。连同新引入的DelegatingMessageGroupProcessor一起,此函数用于任何任意(非AbstractAggregatingMessageGroupProcessorMessageGroupProcessor实现。从本质上讲,框架将提供的函数注入到AbstractAggregatingMessageGroupProcessor实例中,并将所有其他实现包装到DelegatingMessageGroupProcessor中。AbstractAggregatingMessageGroupProcessorDelegatingMessageGroupProcessor在逻辑上的区别在于,后者不会在调用委托策略之前预先计算标头,并且如果委托返回MessageAbstractIntegrationMessageBuilder,则不会调用该函数。在这种情况下,框架假设目标实现已经负责生成已填充到返回结果中的适当标头集。Function<MessageGroup, Map<String, Object>>策略可作为XML配置的headers-function引用属性、Java DSL的AggregatorSpec.headersFunction()选项以及普通Java配置的AggregatorFactoryBean.setHeadersFunction()使用。

CorrelationStrategyAbstractCorrelatingMessageHandler拥有,并基于IntegrationMessageHeaderAccessor.CORRELATION_ID消息头具有默认值,如下例所示

public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store,
        CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy) {
    ...
    this.correlationStrategy = correlationStrategy == null ?
        new HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID) : correlationStrategy;
    this.releaseStrategy = releaseStrategy == null ? new SimpleSequenceSizeReleaseStrategy() : releaseStrategy;
    ...
}

至于消息组的实际处理,默认实现是DefaultAggregatingMessageGroupProcessor。它创建一个单个Message,其有效负载是为给定组接收的有效负载的List。这对于使用分割器、发布-订阅通道或收件人列表路由器上游的简单分散-收集实现非常有效。

在这种类型的场景中使用发布-订阅通道或收件人列表路由器时,请确保启用apply-sequence标志。这样做会添加必要的标头:CORRELATION_IDSEQUENCE_NUMBERSEQUENCE_SIZE。此行为在 Spring Integration 中对于分割器默认启用,但对于发布-订阅通道或收件人列表路由器未启用,因为这些组件可能在各种不需要这些标头的上下文中使用。

在为应用程序实现特定的聚合器策略时,您可以扩展AbstractAggregatingMessageGroupProcessor并实现aggregatePayloads方法。但是,还有更好的解决方案,与 API 的耦合度较低,用于实现聚合逻辑,可以通过 XML 或注释进行配置。

通常,如果任何 POJO 提供接受单个java.util.List作为参数的方法(也支持参数化列表),则它可以实现聚合算法。此方法用于如下聚合消息:

  • 如果参数是java.util.Collection<T>并且参数类型 T 可分配给Message,则将为聚合累积的整个消息列表发送到聚合器。

  • 如果参数是非参数化的java.util.Collection或参数类型不可分配给Message,则该方法将接收累积消息的有效负载。

  • 如果返回类型不可分配给Message,则它将被视为框架自动创建的Message的有效负载。

为了代码简单性和促进最佳实践(例如低耦合、可测试性等),实现聚合逻辑的首选方法是通过 POJO 并使用 XML 或注释支持在应用程序中对其进行配置。

从 5.3 版本开始,处理消息组后,AbstractCorrelatingMessageHandler对多个嵌套级别的适当分割器-聚合器场景执行MessageBuilder.popSequenceDetails()消息标头修改。仅当消息组释放结果不是消息集合时才执行此操作。在这种情况下,目标MessageGroupProcessor负责在构建这些消息时调用MessageBuilder.popSequenceDetails()

如果MessageGroupProcessor返回一个Message,则只有当sequenceDetails与组中的第一个消息匹配时,才会对输出消息执行MessageBuilder.popSequenceDetails()。(以前,只有当从MessageGroupProcessor返回简单的有效负载或AbstractIntegrationMessageBuilder时才执行此操作。)

此功能可以通过新的popSequence布尔属性进行控制,因此可以在某些情况下禁用MessageBuilder.popSequenceDetails(),此时标准分割器未填充关联详细信息。此属性从本质上撤消了AbstractMessageSplitter中最近上游applySequence = true所做的操作。有关更多信息,请参见Splitter

SimpleMessageGroup.getMessages()方法返回一个unmodifiableCollection。因此,如果聚合 POJO 方法具有Collection<Message>参数,则传入的参数正是该Collection实例,并且当您对聚合器使用SimpleMessageStore时,该原始Collection<Message>会在释放该组后被清除。因此,如果 POJO 中的Collection<Message>变量从聚合器传出,它也会被清除。如果您希望原样释放该集合以进行进一步处理,则必须构建一个新的Collection(例如,new ArrayList<Message>(messages))。从 4.3 版本开始,框架不再将消息复制到新的集合中,以避免不必要的额外对象创建。

在 4.2 版本之前,无法使用 XML 配置提供MessageGroupProcessor。只有 POJO 方法才能用于聚合。现在,如果框架检测到引用的(或内部的)bean 实现MessageProcessor,则将其用作聚合器的输出处理器。

如果您希望将对象集合从自定义MessageGroupProcessor作为消息的有效负载释放,则您的类应扩展AbstractAggregatingMessageGroupProcessor并实现aggregatePayloads()

此外,从4.2版本开始,提供了一个SimpleMessageGroupProcessor。它返回组中的消息集合,如前所述,这会导致已释放的消息被单独发送。

这使得聚合器可以作为一个消息屏障,到达的消息将被保留,直到释放策略触发并将组作为一系列单个消息释放。

从6.0版本开始,上述拆分行为只有在组处理器是SimpleMessageGroupProcessor时才有效。否则,对于返回Collection<Message>的任何其他MessageGroupProcessor实现,只会发出单个回复消息,其有效负载为整个消息集合。这种逻辑是由聚合器的规范目的决定的——按某个键收集请求消息并生成单个分组消息。

ReleaseStrategy

ReleaseStrategy接口定义如下:

public interface ReleaseStrategy {

  boolean canRelease(MessageGroup group);

}

一般来说,任何POJO都可以实现完成决策逻辑,如果它提供了一个方法,该方法接受单个java.util.List作为参数(也支持参数化列表)并返回布尔值。此方法在每条新消息到达后被调用,以决定组是否完整,如下所示:

  • 如果参数是java.util.List<T>并且参数类型T可赋值给Message,则将累积在组中的整个消息列表发送到该方法。

  • 如果参数是非参数化的java.util.List或参数类型不可赋值给Message,则该方法接收累积消息的有效负载。

  • 如果消息组已准备好进行聚合,则该方法必须返回true,否则返回false

以下示例显示了如何将@ReleaseStrategy注解用于类型为MessageList

public class MyReleaseStrategy {

    @ReleaseStrategy
    public boolean canMessagesBeReleased(List<Message<?>>) {...}
}

以下示例显示了如何将@ReleaseStrategy注解用于类型为StringList

public class MyReleaseStrategy {

    @ReleaseStrategy
    public boolean canMessagesBeReleased(List<String>) {...}
}

根据前面两个示例中的签名,基于POJO的释放策略传递的是尚未释放的消息的Collection(如果您需要访问整个Message)或有效负载对象的Collection(如果类型参数不是Message)。这满足大多数用例。但是,如果由于某种原因,您需要访问完整的MessageGroup,则应提供ReleaseStrategy接口的实现。

处理潜在的大型组时,您应该了解这些方法是如何调用的,因为在释放组之前,可能会多次调用释放策略。最有效的是ReleaseStrategy的实现,因为聚合器可以直接调用它。第二有效的是具有Collection<Message<?>>参数类型的POJO方法。效率最低的是具有Collection<Something>类型的POJO方法。框架必须每次调用释放策略时都将有效负载从组中的消息复制到新的集合(并且可能尝试将有效负载转换为Something)。使用Collection<?>可以避免转换,但仍然需要创建新的Collection

由于这些原因,对于大型组,我们建议您实现ReleaseStrategy

当组被释放以进行聚合时,将处理其所有尚未释放的消息并将其从组中删除。如果组也已完成(也就是说,如果序列中的所有消息都已到达或没有定义序列),则该组将被标记为已完成。此组的任何新消息都将发送到丢弃通道(如果已定义)。将expire-groups-upon-completion设置为true(默认值为false)将删除整个组,任何新消息(与已删除组具有相同的相关 ID)都将形成一个新组。您可以通过将MessageGroupStoreReapersend-partial-result-on-expiry设置为true一起使用来释放部分序列。

为了便于丢弃迟到的消息,聚合器必须在释放组后维护有关组的状态。这最终可能导致内存不足的情况。为了避免这种情况,您应该考虑配置MessageGroupStoreReaper来删除组元数据。应将过期参数设置为在不再预期迟到的消息到达的点之后过期组。有关配置清理程序的信息,请参见在聚合器中管理状态:MessageGroupStore

Spring Integration 提供了 ReleaseStrategy 的一个实现:SimpleSequenceSizeReleaseStrategy。此实现查询每条到达消息的 SEQUENCE_NUMBERSEQUENCE_SIZE 头来确定消息组何时完成并准备好进行聚合。如前所述,它也是默认策略。

在 5.0 版本之前,默认释放策略是 SequenceSizeReleaseStrategy,它在大型组中性能不佳。使用该策略时,会检测并拒绝重复的序列号。此操作可能代价高昂。

如果您正在聚合大型组,不需要释放部分组,也不需要检测/拒绝重复序列,请考虑使用SimpleSequenceSizeReleaseStrategy——对于这些用例,它效率更高,并且自5.0 版本以来(未指定部分组释放时)它是默认策略。

聚合大型组

4.3 版本将SimpleMessageGroup中消息的默认Collection更改为HashSet(以前是BlockingQueue)。当从大型组中删除单个消息时,这代价高昂(需要 O(n) 线性扫描)。尽管哈希集通常删除速度快得多,但对于大型消息而言,它可能代价高昂,因为必须在插入和删除时都计算哈希值。如果您的消息哈希代价高昂,请考虑使用其他集合类型。如使用MessageGroupFactory中所述,提供了一个SimpleMessageGroupFactory,以便您可以选择最适合您需求的Collection。您还可以提供自己的工厂实现来创建其他Collection<Message<?>>

以下示例显示了如何使用以前的实现和SimpleSequenceSizeReleaseStrategy配置聚合器:

<int:aggregator input-channel="aggregate"
    output-channel="out" message-store="store" release-strategy="releaser" />

<bean id="store" class="org.springframework.integration.store.SimpleMessageStore">
    <property name="messageGroupFactory">
        <bean class="org.springframework.integration.store.SimpleMessageGroupFactory">
            <constructor-arg value="BLOCKING_QUEUE"/>
        </bean>
    </property>
</bean>

<bean id="releaser" class="SimpleSequenceSizeReleaseStrategy" />
如果过滤器端点参与聚合器上游的流程,则序列大小释放策略(固定或基于sequenceSize头)将无法发挥其作用,因为序列中的一些消息可能会被过滤器丢弃。在这种情况下,建议选择另一个ReleaseStrategy,或使用从丢弃子流程发送的补偿消息,这些消息在其内容中携带一些信息,以便在自定义完整组函数中跳过这些信息。有关更多信息,请参见过滤器

关联策略

CorrelationStrategy接口定义如下:

public interface CorrelationStrategy {

  Object getCorrelationKey(Message<?> message);

}

该方法返回一个Object,该对象表示用于将消息与消息组关联的相关键。该键必须满足用于Map中键的标准,关于equals()hashCode()的实现。

一般来说,任何POJO都可以实现关联逻辑,将消息映射到方法参数(或参数)的规则与ServiceActivator相同(包括对@Header注解的支持)。该方法必须返回值,并且该值不能为null

Spring Integration 提供了 CorrelationStrategy 的一个实现:HeaderAttributeCorrelationStrategy。此实现返回消息头之一的值(其名称由构造函数参数指定)作为相关键。默认情况下,关联策略是返回CORRELATION_ID头属性值的HeaderAttributeCorrelationStrategy。如果您想使用自定义头名称进行关联,则可以在HeaderAttributeCorrelationStrategy的实例上配置它,并将其作为聚合器的关联策略的引用。

锁注册表

对组的更改是线程安全的。因此,当您同时发送相同相关 ID 的消息时,聚合器中只会处理其中一条消息,使其有效地成为每个消息组单线程LockRegistry用于获取已解析相关 ID 的锁。默认情况下使用DefaultLockRegistry(内存中)。为了在使用共享MessageGroupStore的服务器之间同步更新,您必须配置共享锁注册表。

避免死锁

如上所述,当消息组被修改(添加或释放消息)时,会持有锁。

考虑以下流程:

...->aggregator1-> ... ->aggregator2-> ...

如果有多个线程,并且聚合器共享一个公共锁注册表,则可能会发生死锁。这将导致线程挂起,并且jstack <pid>可能会显示如下结果:

Found one Java-level deadlock:
=============================
"t2":
  waiting for ownable synchronizer 0x000000076c1cbfa0, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
  which is held by "t1"
"t1":
  waiting for ownable synchronizer 0x000000076c1ccc00, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
  which is held by "t2"

有几种方法可以避免此问题:

  • 确保每个聚合器都有自己的锁注册表(这可以是跨应用程序实例的共享注册表,但流程中的两个或多个聚合器必须各自具有不同的注册表)。

  • 使用ExecutorChannelQueueChannel作为聚合器的输出通道,以便下游流程在新线程上运行。

  • 从 5.1.1 版本开始,将releaseLockBeforeSend聚合器属性设置为true

如果由于某种原因,单个聚合器的输出最终被路由回同一个聚合器,也可能导致此问题。当然,上述第一种解决方案在这种情况下不适用。

在 Java DSL 中配置聚合器

有关如何在 Java DSL 中配置聚合器,请参见聚合器和重新排序器

使用 XML 配置聚合器

Spring Integration 通过<aggregator/>元素支持使用XML配置聚合器。以下示例显示了一个聚合器的示例:

<channel id="inputChannel"/>

<int:aggregator id="myAggregator"                          (1)
        auto-startup="true"                                (2)
        input-channel="inputChannel"                       (3)
        output-channel="outputChannel"                     (4)
        discard-channel="throwAwayChannel"                 (5)
        message-store="persistentMessageStore"             (6)
        order="1"                                          (7)
        send-partial-result-on-expiry="false"              (8)
        send-timeout="1000"                                (9)

        correlation-strategy="correlationStrategyBean"     (10)
        correlation-strategy-method="correlate"            (11)
        correlation-strategy-expression="headers['foo']"   (12)

        ref="aggregatorBean"                               (13)
        method="aggregate"                                 (14)

        release-strategy="releaseStrategyBean"             (15)
        release-strategy-method="release"                  (16)
        release-strategy-expression="size() == 5"          (17)

        expire-groups-upon-completion="false"              (18)
        empty-group-min-timeout="60000"                    (19)

        lock-registry="lockRegistry"                       (20)

        group-timeout="60000"                              (21)
        group-timeout-expression="size() ge 2 ? 100 : -1"  (22)
        expire-groups-upon-timeout="true"                  (23)

        scheduler="taskScheduler" >                        (24)
            <expire-transactional/>                        (25)
            <expire-advice-chain/>                         (26)
</aggregator>

<int:channel id="outputChannel"/>

<int:channel id="throwAwayChannel"/>

<bean id="persistentMessageStore" class="org.springframework.integration.jdbc.store.JdbcMessageStore">
    <constructor-arg ref="dataSource"/>
</bean>

<bean id="aggregatorBean" class="sample.PojoAggregator"/>

<bean id="releaseStrategyBean" class="sample.PojoReleaseStrategy"/>

<bean id="correlationStrategyBean" class="sample.PojoCorrelationStrategy"/>
1 聚合器的ID是可选的。
2 生命周期属性表示聚合器是否应在应用程序上下文启动期间启动。可选(默认为“true”)。
3 聚合器从中接收消息的通道。必需。
4 聚合器向其发送聚合结果的通道。可选(因为传入的消息本身可以在“replyChannel”消息头中指定回复通道)。
5 聚合器向其发送超时消息的通道(如果send-partial-result-on-expiryfalse)。可选。
6 MessageGroupStore的引用,用于在其关联键下存储消息组,直到它们完成。可选。默认情况下,它是一个易失的内存中存储。有关更多信息,请参见消息存储
7 当多个句柄订阅同一个DirectChannel时,此聚合器的顺序(用于负载平衡目的)。可选。
8 指示过期消息应在包含它们的MessageGroup过期后聚合并发送到“output-channel”或“replyChannel”(参见MessageGroupStore.expireMessageGroups(long))。使MessageGroup过期的一种方法是配置MessageGroupStoreReaper。但是,您也可以通过调用MessageGroupStore.expireMessageGroups(timeout)使MessageGroup过期。您可以通过控制总线操作来实现这一点,或者如果您有MessageGroupStore实例的引用,则可以通过调用expireMessageGroups(timeout)来实现。否则,此属性本身不会执行任何操作。它仅作为指示是否丢弃或发送到输出或回复通道中仍处于即将过期的MessageGroup中的任何消息的指示器。可选(默认为false)。注意:此属性可能更恰当地称为send-partial-result-on-timeout,因为如果expire-groups-upon-timeout设置为false,则该组可能不会实际过期。
9 将回复Message发送到output-channeldiscard-channel时等待的超时时间间隔。默认为30秒。仅当输出通道有一些“发送”限制时才应用此设置,例如具有固定“容量”的QueueChannel。在这种情况下,将抛出MessageDeliveryException。对于AbstractSubscribableChannel实现,将忽略send-timeout。对于group-timeout(-expression),来自计划的过期任务的MessageDeliveryException会导致此任务重新调度。可选。
10 对实现消息关联(分组)算法的bean的引用。该bean可以是CorrelationStrategy接口的实现或POJO。在后一种情况下,还必须定义correlation-strategy-method属性。可选(默认情况下,聚合器使用IntegrationMessageHeaderAccessor.CORRELATION_ID标头)。
11 correlation-strategy引用的bean上定义的方法。它实现了关联决策算法。可选,有限制(必须存在correlation-strategy)。
12 表示关联策略的SpEL表达式。例如:"headers['something']"。只允许correlation-strategycorrelation-strategy-expression中的一个。
13 对应用程序上下文定义的bean的引用。该bean必须实现前面描述的聚合逻辑。可选(默认情况下,聚合消息的列表成为输出消息的有效负载)。
14 ref属性引用的bean上定义的方法。它实现了消息聚合算法。可选(取决于是否定义了ref属性)。
15 对实现释放策略的bean的引用。该bean可以是ReleaseStrategy接口的实现或POJO。在后一种情况下,还必须定义release-strategy-method属性。可选(默认情况下,聚合器使用IntegrationMessageHeaderAccessor.SEQUENCE_SIZE标头属性)。
16 release-strategy属性引用的bean上定义的方法。它实现了完成决策算法。可选,有限制(必须存在release-strategy)。
17 表示释放策略的SpEL表达式。表达式的根对象是MessageGroup。例如:"size() == 5"。只允许release-strategyrelease-strategy-expression中的一个。
18 设置为true时(默认为false),已完成的组将从消息存储中删除,允许具有相同关联关系的后续消息形成新组。默认行为是将与已完成组具有相同关联关系的消息发送到discard-channel
19 仅当为<aggregator>MessageStore配置了MessageGroupStoreReaper时才适用。默认情况下,当配置MessageGroupStoreReaper以使部分组过期时,空组也将被删除。组正常释放后存在空组。空组使检测和丢弃迟到的消息成为可能。如果您希望以比过期部分组更长的计划来过期空组,请设置此属性。然后,空组将不会从MessageStore中删除,直到它们至少在经过此毫秒数后未被修改为止。请注意,过期空组的实际时间也受清道夫的timeout属性的影响,它最多可以达到此值加上超时时间。
20 org.springframework.integration.util.LockRegistry bean的引用。它用于基于groupId获取Lock,用于对MessageGroup进行并发操作。默认情况下,使用内部DefaultLockRegistry。使用分布式LockRegistry(例如ZookeeperLockRegistry)可确保只有一个聚合器实例可以同时操作一个组。有关更多信息,请参见Redis 锁注册表Zookeeper 锁注册表
21 超时时间(以毫秒为单位),用于在ReleaseStrategy在当前消息到达时未释放组时强制MessageGroup完成。当需要在MessageGroup在从上次消息到达的时间算起在超时时间内没有到达新消息时发出部分结果(或丢弃组)时,此属性为聚合器提供内置的基于时间的释放策略。要设置从MessageGroup创建时开始计算的超时时间,请参阅group-timeout-expression信息。当新的消息到达聚合器时,其MessageGroup的任何现有的ScheduledFuture<?>都将被取消。如果ReleaseStrategy返回false(表示不释放)并且groupTimeout > 0,则将调度一个新任务以使组过期。我们建议不要将此属性设置为零(或负值)。这样做实际上会禁用聚合器,因为每个消息组都会立即完成。但是,您可以使用表达式有条件地将其设置为零(或负值)。有关信息,请参见group-timeout-expression。完成期间采取的操作取决于ReleaseStrategysend-partial-group-on-expiry属性。有关更多信息,请参见聚合器和组超时。它与“group-timeout-expression”属性互斥。
22 计算为groupTimeout的SpEL表达式,其中MessageGroup作为#root评估上下文对象。用于调度MessageGroup强制完成。如果表达式计算结果为null,则不会调度完成。如果其计算结果为零,则该组将在当前线程上立即完成。实际上,这提供了一个动态的group-timeout属性。例如,如果您希望在组创建后经过 10 秒后强制完成MessageGroup,您可以考虑使用以下 SpEL 表达式:timestamp + 10000 - T(System).currentTimeMillis(),其中timestampMessageGroup.getTimestamp()提供,因为这里的MessageGroup#root评估上下文对象。但是请记住,根据其他组过期属性的配置,组创建时间可能与第一个到达的消息的时间有所不同。有关更多信息,请参见group-timeout。与“group-timeout”属性互斥。
23 当组由于超时(或由MessageGroupStoreReaper)完成时,该组默认情况下会过期(完全删除)。迟到的消息将启动一个新组。您可以将其设置为false以完成组,但使其元数据保持不变,以便丢弃迟到的消息。可以使用MessageGroupStoreReaper结合empty-group-min-timeout属性稍后过期空组。默认为“true”。
24 一个TaskScheduler bean 引用,用于调度MessageGroup强制完成,如果在groupTimeout内没有新的消息到达MessageGroup。如果未提供,则使用在ApplicationContext中注册的默认调度程序(taskScheduler)(ThreadPoolTaskScheduler)。如果未指定group-timeoutgroup-timeout-expression,则此属性不适用。
25 自版本 4.1 起。它允许为forceComplete操作启动事务。它由group-timeout(-expression)MessageGroupStoreReaper启动,并且不应用于正常的addreleasediscard操作。只允许此子元素或<expire-advice-chain/>
26 版本 4.1起。它允许为forceComplete操作配置任何Advice。它由group-timeout(-expression)MessageGroupStoreReaper启动,并且不应用于正常的addreleasediscard操作。只允许此子元素或<expire-transactional/>。还可以使用 Spring tx 命名空间在此处配置事务Advice
过期组

有两个属性与过期(完全删除)组有关。当组过期时,没有它的记录,并且如果新的消息以相同的关联关系到达,则会启动一个新组。当组完成(无过期)时,空组仍然存在,并且会丢弃迟到的消息。可以使用MessageGroupStoreReaper结合empty-group-min-timeout属性稍后删除空组。

expire-groups-upon-completionReleaseStrategy释放组时的“正常”完成相关。默认为false

如果组没有正常完成,而是由于超时而被释放或丢弃,则该组通常会过期。自版本 4.1 起,您可以使用expire-groups-upon-timeout控制此行为。为了向后兼容,它默认为true

当组超时时,ReleaseStrategy将获得一次更多机会来释放该组。如果它这样做并且expire-groups-upon-timeout为false,则过期由expire-groups-upon-completion控制。如果在超时期间释放策略未释放该组,则过期由expire-groups-upon-timeout控制。超时组将被丢弃或发生部分释放(基于send-partial-result-on-expiry)。

自版本 5.0 起,空组也在empty-group-min-timeout后被计划删除。如果expireGroupsUponCompletion == falseminimumTimeoutForEmptyGroups > 0,则在正常或部分序列释放发生时会调度删除该组的任务。

从 5.4 版本开始,可以配置聚合器(和重新排序器)以使孤立组过期(持久性消息存储中可能不会以其他方式释放的组)。expireTimeout(如果大于0)表示存储中比此值旧的组应被清除。purgeOrphanedGroups()方法在启动时调用,并与提供的expireDuration一起定期在计划任务中调用。此方法也可以随时从外部调用。过期逻辑完全委托给根据上述提供的过期选项的forceComplete(MessageGroup)功能。当需要从那些不会再通过常规消息到达逻辑释放的旧组中清理消息存储时,这种定期清除功能非常有用。在大多数情况下,这发生在使用持久性消息组存储时应用程序重新启动后。此功能类似于带有计划任务的MessageGroupStoreReaper,但在使用组超时而不是清道夫时,提供了一种方便的方法来处理特定组件中的旧组。必须专门为当前关联端点提供MessageGroupStore。否则,一个聚合器可能会清除另一个聚合器的组。对于聚合器,使用此技术过期的组将被丢弃或作为部分组释放,具体取决于expireGroupsUponCompletion属性。

如果自定义聚合器处理程序实现可能在其他<aggregator>定义中引用,我们通常建议使用ref属性。但是,如果自定义聚合器实现仅由<aggregator>的单个定义使用,则可以使用内部 bean 定义(从版本 1.0.3 开始)在<aggregator>元素内配置聚合 POJO,如下例所示。

<aggregator input-channel="input" method="sum" output-channel="output">
    <beans:bean class="org.foo.PojoAggregator"/>
</aggregator>
在同一个<aggregator>配置中同时使用ref属性和内部bean定义是不允许的,因为它会创建一个歧义条件。在这种情况下,会抛出异常。

下面的例子展示了聚合器bean的实现

public class PojoAggregator {

  public Long add(List<Long> results) {
    long total = 0l;
    for (long partialResult: results) {
      total += partialResult;
    }
    return total;
  }
}

前面例子的完成策略bean的实现可能如下所示

public class PojoReleaseStrategy {
...
  public boolean canRelease(List<Long> numbers) {
    int sum = 0;
    for (long number: numbers) {
      sum += number;
    }
    return sum >= maxValue;
  }
}
在有意义的情况下,释放策略方法和聚合器方法可以合并到一个bean中。

上面例子的关联策略bean的实现可能如下所示

public class PojoCorrelationStrategy {
...
  public Long groupNumbersByLastDigit(Long number) {
    return number % 10;
  }
}

前面例子中的聚合器会根据某种标准(在本例中,是除以十后的余数)对数字进行分组,并在提供的有效负载数字之和超过某个值之前保留该组。

在有意义的情况下,释放策略方法、关联策略方法和聚合器方法可以组合到一个bean中。(实际上,所有方法或任意两个方法都可以组合。)

聚合器和 Spring 表达式语言 (SpEL)

从 Spring Integration 2.0 开始,您可以使用SpEL来处理各种策略(关联、释放和聚合),如果释放策略背后的逻辑相对简单,我们推荐这种方法。假设您有一个遗留组件,该组件被设计为接收对象数组。我们知道默认的释放策略会将所有聚合的消息组装在List中。现在我们有两个问题。首先,我们需要从列表中提取单个消息。其次,我们需要提取每条消息的有效负载并组装对象数组。下面的例子解决了这两个问题

public String[] processRelease(List<Message<String>> messages){
    List<String> stringList = new ArrayList<String>();
    for (Message<String> message : messages) {
        stringList.add(message.getPayload());
    }
    return stringList.toArray(new String[]{});
}

但是,使用 SpEL,这样的需求实际上可以用一行表达式相对轻松地处理,从而避免编写自定义类并将其配置为bean。下面的例子展示了如何做到这一点

<int:aggregator input-channel="aggChannel"
    output-channel="replyChannel"
    expression="#this.![payload].toArray()"/>

在前面的配置中,我们使用集合投影表达式从列表中所有消息的有效负载组装一个新的集合,然后将其转换为数组,从而达到与之前的 Java 代码相同的结果。

在处理自定义释放和关联策略时,您可以应用相同基于表达式的方案。

您可以不用在correlation-strategy属性中定义自定义CorrelationStrategy的bean,而是将简单的关联逻辑实现为 SpEL 表达式,并在correlation-strategy-expression属性中配置它,如下例所示

correlation-strategy-expression="payload.person.id"

在前面的例子中,我们假设有效负载具有一个带有idperson属性,该属性将用于关联消息。

同样,对于ReleaseStrategy,您可以将释放逻辑实现为 SpEL 表达式,并在release-strategy-expression属性中配置它。评估上下文的根对象是MessageGroup本身。消息的List可以通过在表达式中使用组的message属性来引用。

在 5.0 版本之前的版本中,根对象是Message<?>的集合,如前面的示例所示
release-strategy-expression="!messages.?[payload==5].empty"

在前面的示例中,SpEL 评估上下文的根对象是MessageGroup本身,并且您声明一旦该组中有一条有效负载为5的消息,就应该释放该组。

聚合器和组超时

从 4.0 版本开始,引入了两个新的互斥属性:group-timeoutgroup-timeout-expression。参见使用 XML 配置聚合器。在某些情况下,如果ReleaseStrategy在当前消息到达时没有释放,您可能需要在超时后发出聚合器结果(或丢弃该组)。为此,groupTimeout选项允许强制调度MessageGroup完成,如下例所示

<aggregator input-channel="input" output-channel="output"
        send-partial-result-on-expiry="true"
        group-timeout-expression="size() ge 2 ? 10000 : -1"
        release-strategy-expression="messages[0].headers.sequenceNumber == messages[0].headers.sequenceSize"/>

通过此示例,如果聚合器按release-strategy-expression定义的顺序接收最后一条消息,则可以进行正常释放。如果该特定消息未到达,则groupTimeout会在十秒钟后强制组完成,只要该组至少包含两条消息。

强制组完成的结果取决于ReleaseStrategysend-partial-result-on-expiry。首先,再次咨询释放策略以查看是否要进行正常释放。虽然组没有改变,但ReleaseStrategy可以决定此时释放组。如果释放策略仍然不释放组,则该组已过期。如果send-partial-result-on-expirytrue,则将(部分)MessageGroup中的现有消息作为正常的聚合器回复消息释放到output-channel。否则,将被丢弃。

groupTimeout行为和MessageGroupStoreReaper之间存在差异(参见使用 XML 配置聚合器)。reaper 定期启动对MessageGroupStore中所有MessageGroup的强制完成。如果在groupTimeout期间没有到达新消息,则groupTimeout会针对每个MessageGroup单独执行此操作。此外,reaper 可用于删除空组(如果expire-groups-upon-completion为 false,则保留空组以丢弃延迟消息)。

从 5.5 版本开始,groupTimeoutExpression可以计算为java.util.Date实例。这在根据组创建时间 (MessageGroup.getTimestamp()) 确定计划任务时间(而不是像groupTimeoutExpression计算为long时那样根据当前消息到达时间计算)的情况下非常有用。

group-timeout-expression="size() ge 2 ? new java.util.Date(timestamp + 200) : null"

使用注解配置聚合器

以下示例显示了使用注解配置的聚合器

public class Waiter {
  ...

  @Aggregator  (1)
  public Delivery aggregatingMethod(List<OrderItem> items) {
    ...
  }

  @ReleaseStrategy  (2)
  public boolean releaseChecker(List<Message<?>> messages) {
    ...
  }

  @CorrelationStrategy  (3)
  public String correlateBy(OrderItem item) {
    ...
  }
}
1 指示此方法应作为聚合器使用的注解。如果此类用作聚合器,则必须指定此注解。
2 指示此方法用作聚合器的释放策略的注解。如果任何方法上都没有此注解,则聚合器将使用SimpleSequenceSizeReleaseStrategy
3 指示此方法应作为聚合器的关联策略使用的注解。如果没有指示关联策略,则聚合器将使用基于CORRELATION_IDHeaderAttributeCorrelationStrategy

XML 元素提供的全部配置选项也适用于@Aggregator注解。

聚合器可以从 XML 中显式引用,或者如果在类上定义了@MessageEndpoint,则可以通过类路径扫描自动检测。

聚合器组件的注解配置(@Aggregator等)仅涵盖简单的用例,其中大多数默认选项就足够了。如果您在使用注解配置时需要对这些选项进行更多控制,请考虑对AggregatingMessageHandler使用@Bean定义,并将其@Bean方法标记为@ServiceActivator,如下例所示

@ServiceActivator(inputChannel = "aggregatorChannel")
@Bean
public MessageHandler aggregator(MessageGroupStore jdbcMessageGroupStore) {
     AggregatingMessageHandler aggregator =
                       new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor(),
                                                 jdbcMessageGroupStore);
     aggregator.setOutputChannel(resultsChannel());
     aggregator.setGroupTimeoutExpression(new ValueExpression<>(500L));
     aggregator.setTaskScheduler(this.taskScheduler);
     return aggregator;
}

有关更多信息,请参见编程模型@Bean方法上的注解

从 4.2 版本开始,AggregatorFactoryBean可用以简化AggregatingMessageHandler的 Java 配置。

管理聚合器中的状态:MessageGroupStore

聚合器(以及 Spring Integration 中的其他一些模式)是一种有状态模式,需要根据一段时间内到达的一组消息(所有消息都具有相同的关联键)做出决策。有状态模式(如ReleaseStrategy)中接口的设计遵循这样的原则:组件(无论是框架定义的还是用户定义的)应该能够保持无状态。所有状态都由MessageGroup携带,其管理委托给MessageGroupStoreMessageGroupStore接口定义如下

public interface MessageGroupStore {

    int getMessageCountForAllMessageGroups();

    int getMarkedMessageCountForAllMessageGroups();

    int getMessageGroupCount();

    MessageGroup getMessageGroup(Object groupId);

    MessageGroup addMessageToGroup(Object groupId, Message<?> message);

    MessageGroup markMessageGroup(MessageGroup group);

    MessageGroup removeMessageFromGroup(Object key, Message<?> messageToRemove);

    MessageGroup markMessageFromGroup(Object key, Message<?> messageToMark);

    void removeMessageGroup(Object groupId);

    void registerMessageGroupExpiryCallback(MessageGroupCallback callback);

    int expireMessageGroups(long timeout);
}

有关更多信息,请参见Javadoc

MessageGroupStore在等待触发释放策略时累积MessageGroups中的状态信息,而该事件可能永远不会发生。因此,为了防止陈旧消息持续存在,并为易失性存储提供在应用程序关闭时清理的钩子,MessageGroupStore允许您注册回调以将其应用于其MessageGroups在它们过期时。该接口非常简单,如下所示

public interface MessageGroupCallback {

    void execute(MessageGroupStore messageGroupStore, MessageGroup group);

}

回调可以直接访问存储和消息组,以便它可以管理持久性状态(例如,通过完全从存储中删除组)。

MessageGroupStore维护这些回调的列表,它会按需将其应用于所有时间戳早于作为参数提供的时间的消息(请参见前面描述的registerMessageGroupExpiryCallback(..)expireMessageGroups(..)方法)。

当您打算依赖expireMessageGroups功能时,重要的是不要在不同的聚合器组件中使用相同的MessageGroupStore实例。每个AbstractCorrelatingMessageHandler都会基于forceComplete()回调注册其自己的MessageGroupCallback。这样,每个要过期的组都可能被错误的聚合器完成或丢弃。从 5.0.10 版本开始,AbstractCorrelatingMessageHandler使用UniqueExpiryCallback进行MessageGroupStore中的注册回调。MessageGroupStore反过来检查此类的实例是否存在,如果在回调集中已经存在此类实例,则会记录一条带有相应消息的错误。通过这种方式,框架不允许在不同的聚合器/重新排序器中使用MessageGroupStore实例,以避免前面提到的过期组未由特定关联处理程序创建的副作用。

您可以使用超时值调用expireMessageGroups方法。任何早于当前时间减去此值的旧消息都已过期,并且已应用回调。因此,它是存储的用户定义消息组“过期”的含义。

为方便用户使用,Spring Integration 提供了消息过期的包装器,形式为MessageGroupStoreReaper,如下例所示

<bean id="reaper" class="org...MessageGroupStoreReaper">
    <property name="messageGroupStore" ref="messageStore"/>
    <property name="timeout" value="30000"/>
</bean>

<task:scheduled-tasks scheduler="scheduler">
    <task:scheduled ref="reaper" method="run" fixed-rate="10000"/>
</task:scheduled-tasks>

reaper 是一个Runnable。在前面的示例中,每十秒钟调用一次消息组存储的过期方法。超时本身为 30 秒。

需要注意的是,MessageGroupStoreReaper 的“timeout”属性是一个近似值,会受到任务调度程序速率的影响,因为此属性仅在MessageGroupStoreReaper任务的下一次计划执行时才会被检查。例如,如果超时设置为十分钟,但MessageGroupStoreReaper任务计划每小时运行一次,并且上次执行MessageGroupStoreReaper任务发生在超时前一分钟,则MessageGroup不会在接下来的59分钟内过期。因此,我们建议将速率设置为至少等于或小于超时值。

除了Reaper之外,当应用程序通过AbstractCorrelatingMessageHandler中的生命周期回调关闭时,也会调用过期回调。

AbstractCorrelatingMessageHandler注册它自己的过期回调,这是与聚合器XML配置中的布尔标志send-partial-result-on-expiry的关联。如果该标志设置为true,则当调用过期回调时,可以将组中尚未释放的任何未标记消息发送到输出通道。

由于MessageGroupStoreReaper是从计划任务调用的,并且可能会生成一条消息(取决于sendPartialResultOnExpiry选项)到下游集成流,因此建议使用带有MessagePublishingErrorHandler的自定义TaskScheduler通过errorChannel处理异常,这可能是常规聚合器释放功能所期望的。相同的逻辑适用于也依赖于TaskScheduler的组超时功能。有关更多信息,请参见错误处理

当对不同的相关端点使用共享的MessageStore时,必须配置正确的CorrelationStrategy以确保组ID的唯一性。否则,当一个相关端点释放或使其他端点的消息过期时,可能会发生意外行为。具有相同相关键的消息存储在同一个消息组中。

一些MessageStore实现允许通过对数据进行分区来使用相同的物理资源。例如,JdbcMessageStore具有region属性,MongoDbMessageStore具有collectionName属性。

有关MessageStore接口及其实现的更多信息,请参见消息存储

Flux聚合器

在5.2版本中,引入了FluxAggregatorMessageHandler组件。它基于Project Reactor的Flux.groupBy()Flux.window()运算符。传入的消息被发送到由该组件构造函数中的Flux.create()启动的FluxSink中。如果未提供outputChannel或它不是ReactiveStreamsSubscribableChannel的实例,则对主Flux的订阅将从Lifecycle.start()实现完成。否则,它将被推迟到ReactiveStreamsSubscribableChannel实现完成的订阅。消息通过使用CorrelationStrategy进行组键的Flux.groupBy()进行分组。默认情况下,将查询消息的IntegrationMessageHeaderAccessor.CORRELATION_ID头。

默认情况下,每个关闭的窗口都会作为消息有效负载中的Flux释放以进行生成。此消息包含窗口中第一条消息的所有头信息。输出消息有效负载中的此Flux必须在下游进行订阅和处理。此逻辑可以通过FluxAggregatorMessageHandlersetCombineFunction(Function<Flux<Message<?>>, Mono<Message<?>>>)配置选项进行自定义(或替换)。例如,如果我们希望在最终消息中包含一个有效负载列表,我们可以像这样配置一个Flux.collectList()

fluxAggregatorMessageHandler.setCombineFunction(
                (messageFlux) ->
                        messageFlux
                                .map(Message::getPayload)
                                .collectList()
                                .map(GenericMessage::new));

FluxAggregatorMessageHandler中有一些选项可以选择合适的窗口策略

  • setBoundaryTrigger(Predicate<Message<?>>) - 传播到Flux.windowUntil()运算符。有关更多信息,请参见其JavaDocs。优先于所有其他窗口选项。

  • setWindowSize(int)setWindowSizeFunction(Function<Message<?>, Integer>) - 传播到Flux.window(int)windowTimeout(int, Duration)。默认情况下,窗口大小是从组中的第一条消息及其IntegrationMessageHeaderAccessor.SEQUENCE_SIZE头计算的。

  • setWindowTimespan(Duration) - 根据窗口大小配置,传播到Flux.window(Duration)windowTimeout(int, Duration)

  • setWindowConfigurer(Function<Flux<Message<?>>, Flux<Flux<Message<?>>>>) - 一个函数,用于对分组的流应用转换,以进行未包含在公开选项中的任何自定义窗口操作。

由于此组件是MessageHandler实现,因此它可以简单地用作@Bean定义以及@ServiceActivator消息传递注释一起使用。使用Java DSL,可以从.handle() EIP方法使用它。下面的示例演示了我们如何在运行时注册IntegrationFlow以及如何将FluxAggregatorMessageHandler与上游分割器相关联。

IntegrationFlow fluxFlow =
        (flow) -> flow
                .split()
                .channel(MessageChannels.flux())
                .handle(new FluxAggregatorMessageHandler());

IntegrationFlowContext.IntegrationFlowRegistration registration =
        this.integrationFlowContext.registration(fluxFlow)
                .register();

Flux<Message<?>> window =
        registration.getMessagingTemplate()
                .convertSendAndReceive(new Integer[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }, Flux.class);

消息组的条件

从5.5版本开始,AbstractCorrelatingMessageHandler(包括其Java和XML DSL)公开了一个groupConditionSupplier选项,该选项为BiFunction<Message<?>, String, String>实现。此函数用于添加到组中的每条消息,并将结果条件语句存储到组中以供将来考虑。ReleaseStrategy可以查询此条件,而不是迭代组中的所有消息。有关更多信息,请参见GroupConditionProvider JavaDocs和消息组条件

另请参见文件聚合器