重排序器

重新排序器与聚合器相关,但其用途不同。聚合器组合消息,而重新排序器则在不更改消息的情况下传递消息。

功能

重新排序器的工作方式类似于聚合器,因为它使用CORRELATION_ID将消息存储在组中。不同之处在于重新排序器不会以任何方式处理消息。相反,它会按照其SEQUENCE_NUMBER头值的顺序释放它们。

关于这一点,您可以选择一次性释放所有消息(在整个序列之后,根据SEQUENCE_SIZE和其他可能性)或在可用有效序列后立即释放。(我们将在本章后面介绍“有效序列”的含义。)

重新排序器旨在重新排序具有少量间隙的相对较短的消息序列。如果您有大量具有许多间隙的不连续序列,则可能会遇到性能问题。

配置重新排序器

有关在 Java DSL 中配置重新排序器,请参阅聚合器和重新排序器

配置重新排序器只需要在 XML 中包含相应的元素。

以下示例显示了重新排序器配置

<int:channel id="inputChannel"/>

<int:channel id="outputChannel"/>

<int:resequencer id="completelyDefinedResequencer"  (1)
  input-channel="inputChannel"  (2)
  output-channel="outputChannel"  (3)
  discard-channel="discardChannel"  (4)
  release-partial-sequences="true"  (5)
  message-store="messageStore"  (6)
  send-partial-result-on-expiry="true"  (7)
  send-timeout="86420000"  (8)
  correlation-strategy="correlationStrategyBean"  (9)
  correlation-strategy-method="correlate"  (10)
  correlation-strategy-expression="headers['something']"  (11)
  release-strategy="releaseStrategyBean"  (12)
  release-strategy-method="release"  (13)
  release-strategy-expression="size() == 10"  (14)
  empty-group-min-timeout="60000"  (15)

  lock-registry="lockRegistry"  (16)

  group-timeout="60000"  (17)
  group-timeout-expression="size() ge 2 ? 100 : -1"  (18)
  scheduler="taskScheduler" />  (19)
  expire-group-upon-timeout="false" />  (20)
1 重新排序器的 ID 可选。
2 重新排序器的输入通道。必需。
3 重新排序器将重新排序的消息发送到的通道。可选。
4 重新排序器将超时消息发送到的通道(如果send-partial-result-on-timeout设置为false)。可选。
5 是否在可用时立即发送有序序列,或者仅在整个消息组到达后发送。可选。(默认值为false。)
6 MessageGroupStore的引用,该存储可用于在关联键下存储消息组,直到它们完成。可选。(默认情况下,为易失性内存存储。)
7 在组过期时,是否应发送有序组(即使缺少某些消息)。可选。(默认值为 false。)请参阅在聚合器中管理状态:MessageGroupStore
8 将回复Message发送到output-channeldiscard-channel时等待的超时间隔。仅当输出通道存在一些“发送”限制时才应用此间隔,例如具有固定“容量”的QueueChannel。在这种情况下,将抛出MessageDeliveryException。对于AbstractSubscribableChannel实现,将忽略send-timeout。对于group-timeout(-expression),来自计划的过期任务的MessageDeliveryException导致此任务重新计划。可选。
9 对实现消息关联(分组)算法的 Bean 的引用。该 Bean 可以是CorrelationStrategy接口的实现或 POJO。在后一种情况下,还必须定义correlation-strategy-method属性。可选。(默认情况下,聚合器使用IntegrationMessageHeaderAccessor.CORRELATION_ID头。)
10 correlation-strategy引用的 Bean 上定义的方法,并实现关联决策算法。可选,并受限制(需要存在correlation-strategy)。
11 表示关联策略的 SpEL 表达式。例如:"headers['something']"。仅允许使用correlation-strategycorrelation-strategy-expression之一。
12 对实现释放策略的 Bean 的引用。该 Bean 可以是ReleaseStrategy接口的实现或 POJO。在后一种情况下,还必须定义release-strategy-method属性。可选(默认情况下,聚合器将使用IntegrationMessageHeaderAccessor.SEQUENCE_SIZE头属性)。
13 release-strategy引用的 Bean 上定义的方法,并实现完成决策算法。可选,并受限制(需要存在release-strategy)。
14 表示释放策略的 SpEL 表达式。表达式的根对象是MessageGroup。例如:"size() == 5"。仅允许使用release-strategyrelease-strategy-expression之一。
15 仅当为<resequencer>MessageStore配置了MessageGroupStoreReaper时才适用。默认情况下,当配置MessageGroupStoreReaper以使部分组过期时,还会删除空组。在组正常释放后,存在空组。这是为了能够检测和丢弃延迟到达的消息。如果希望以比使部分组过期更长的计划使空组过期,请设置此属性。然后,空组不会从MessageStore中删除,直到它们至少经过此毫秒数未被修改。请注意,空组的实际过期时间还会受到清理程序的超时属性的影响,并且最多可以达到此值加上超时时间。
16 请参阅使用 XML 配置聚合器
17 请参阅使用 XML 配置聚合器
18 请参阅使用 XML 配置聚合器
19 请参阅使用 XML 配置聚合器
20 默认情况下,当组因超时(或由MessageGroupStoreReaper)完成时,将保留空组的元数据。延迟到达的消息将立即被丢弃。将其设置为true以完全删除组。然后,延迟到达的消息将启动一个新组,并且不会被丢弃,直到该组再次超时。由于导致超时的序列范围中的“空洞”,新组永远不会正常释放。可以使用MessageGroupStoreReaper以及empty-group-min-timeout属性稍后使空组过期(完全删除)。从版本 5.0 开始,空组在empty-group-min-timeout过去后也会被计划删除。默认为“false”。

另请参阅聚合器过期组以获取更多信息。

由于在 Java 类中没有针对重新排序器实现的自定义行为,因此没有对其进行注释支持。