重排序器
重序器与聚合器相关,但服务于不同的目的。聚合器组合消息,而重序器则不改变消息地传递它们。
功能
重序器的工作方式与聚合器类似,因为它使用 CORRELATION_ID 将消息分组存储。不同之处在于,重序器不以任何方式处理消息。相反,它按照它们的 SEQUENCE_NUMBER 头部值释放它们。
关于这一点,你可以选择一次性释放所有消息(在整个序列之后,根据 SEQUENCE_SIZE 和其他可能性)或在有效序列可用时立即释放。(我们将在本章后面讨论“有效序列”的含义。)
| 重序器旨在重新排序具有小间隙的相对较短的消息序列。如果你有大量具有许多间隙的不连续序列,你可能会遇到性能问题。 |
配置重序器
有关在 Java DSL 中配置重序器,请参阅聚合器和重序器。
配置重序器只需要在 XML 中包含相应的元素即可。
以下示例显示了重序器配置
<int:channel id="inputChannel"/>
<int:channel id="outputChannel"/>
<int:resequencer id="completelyDefinedResequencer" (1)
input-channel="inputChannel" (2)
output-channel="outputChannel" (3)
discard-channel="discardChannel" (4)
release-partial-sequences="true" (5)
message-store="messageStore" (6)
send-partial-result-on-expiry="true" (7)
send-timeout="86420000" (8)
correlation-strategy="correlationStrategyBean" (9)
correlation-strategy-method="correlate" (10)
correlation-strategy-expression="headers['something']" (11)
release-strategy="releaseStrategyBean" (12)
release-strategy-method="release" (13)
release-strategy-expression="size() == 10" (14)
empty-group-min-timeout="60000" (15)
lock-registry="lockRegistry" (16)
group-timeout="60000" (17)
group-timeout-expression="size() ge 2 ? 100 : -1" (18)
scheduler="taskScheduler" /> (19)
expire-group-upon-timeout="false" /> (20)
| 1 | 重序器的 ID 是可选的。 |
| 2 | 重序器的输入通道。必需。 |
| 3 | 重序器发送重新排序消息的通道。可选。 |
| 4 | 重序器发送超时消息的通道(如果 send-partial-result-on-timeout 设置为 false)。可选。 |
| 5 | 是否在有序序列可用时立即发送,还是仅在整个消息组到达后才发送。可选。(默认值为 false。) |
| 6 | 对 MessageGroupStore 的引用,可用于在其关联键下存储消息组直到它们完整。可选。(默认是易失性内存存储。) |
| 7 | 当组过期时,是否应发送有序组(即使缺少一些消息)。可选。(默认值为 false。)请参阅管理聚合器中的状态:MessageGroupStore。 |
| 8 | 发送回复 Message 到 output-channel 或 discard-channel 时等待的超时间隔。仅当输出通道具有某些“发送”限制(例如具有固定“容量”的 QueueChannel)时才适用。在这种情况下,会抛出 MessageDeliveryException。对于 AbstractSubscribableChannel 实现,send-timeout 被忽略。对于 group-timeout(-expression),来自计划过期任务的 MessageDeliveryException 导致此任务被重新调度。可选。 |
| 9 | 对实现消息关联(分组)算法的 bean 的引用。该 bean 可以是 CorrelationStrategy 接口的实现或 POJO。在后一种情况下,还必须定义 correlation-strategy-method 属性。可选。(默认情况下,聚合器使用 IntegrationMessageHeaderAccessor.CORRELATION_ID 头部。) |
| 10 | 在 correlation-strategy 引用的 bean 上定义并实现关联决策算法的方法。可选,有限制(需要存在 correlation-strategy)。 |
| 11 | 表示关联策略的 SpEL 表达式。示例:"headers['something']"。只允许使用 correlation-strategy 或 correlation-strategy-expression 中的一个。 |
| 12 | 对实现释放策略的 bean 的引用。该 bean 可以是 ReleaseStrategy 接口的实现或 POJO。在后一种情况下,还必须定义 release-strategy-method 属性。可选(默认情况下,聚合器将使用 IntegrationMessageHeaderAccessor.SEQUENCE_SIZE 头部属性)。 |
| 13 | 在 release-strategy 引用的 bean 上定义并实现完成决策算法的方法。可选,有限制(需要存在 release-strategy)。 |
| 14 | 表示释放策略的 SpEL 表达式。表达式的根对象是 MessageGroup。示例:"size() == 5"。只允许使用 release-strategy 或 release-strategy-expression 中的一个。 |
| 15 | 仅当为 <resequencer> MessageStore 配置了 MessageGroupStoreReaper 时才适用。默认情况下,当 MessageGroupStoreReaper 配置为使部分组过期时,空组也会被移除。在组正常释放后存在空组。这是为了启用迟到消息的检测和丢弃。如果你希望空组的过期时间比部分组长,请设置此属性。然后,空组不会从 MessageStore 中移除,直到它们至少在此毫秒数内未被修改。请注意,空组的实际过期时间也受收割者的超时属性影响,它可能最多是此值加上超时。 |
| 16 | 请参阅使用 XML 配置聚合器。 |
| 17 | 请参阅使用 XML 配置聚合器。 |
| 18 | 请参阅使用 XML 配置聚合器。 |
| 19 | 请参阅使用 XML 配置聚合器。 |
| 20 | 默认情况下,当组因超时(或由 MessageGroupStoreReaper)而完成时,空组的元数据会被保留。迟到的消息会立即被丢弃。将其设置为 true 可完全移除组。然后,迟到的消息将开始一个新的组,并且在组再次超时之前不会被丢弃。新组永远不会正常释放,因为序列范围中的“空洞”导致了超时。空组以后可以使用 MessageGroupStoreReaper 和 empty-group-min-timeout 属性过期(完全移除)。从 5.0 版本开始,空组也会在 empty-group-min-timeout 过去后计划移除。默认值为“false”。 |
另请参阅聚合器过期组以获取更多信息。
| 由于在 Java 类中没有为重序器实现自定义行为,因此不支持对其进行注解。 |