重排序器
重排序器与消息聚合器有关,但其目的不同。消息聚合器合并消息,而重排序器直接传递消息而不改变它们。
功能
重排序器与消息聚合器的工作方式类似,它们都使用 CORRELATION_ID
将消息存储在组中。不同之处在于,重排序器不会以任何方式处理消息。相反,它会按照消息头中的 SEQUENCE_NUMBER
值顺序释放它们。
关于这一点,你可以选择一次性释放所有消息(根据 SEQUENCE_SIZE
和其他可能性,在整个序列到达后),或者在有效序列可用时立即释放。(本章稍后会解释“有效序列”的含义。)
重排序器旨在对具有小间隙的相对较短的消息序列进行重新排序。如果存在大量具有许多间隙的不连续序列,可能会出现性能问题。 |
配置重排序器
有关在 Java DSL 中配置重排序器,请参见消息聚合器与重排序器。
配置重排序器只需要在 XML 中包含相应的元素。
以下示例显示了重排序器的配置
<int:channel id="inputChannel"/>
<int:channel id="outputChannel"/>
<int:resequencer id="completelyDefinedResequencer" (1)
input-channel="inputChannel" (2)
output-channel="outputChannel" (3)
discard-channel="discardChannel" (4)
release-partial-sequences="true" (5)
message-store="messageStore" (6)
send-partial-result-on-expiry="true" (7)
send-timeout="86420000" (8)
correlation-strategy="correlationStrategyBean" (9)
correlation-strategy-method="correlate" (10)
correlation-strategy-expression="headers['something']" (11)
release-strategy="releaseStrategyBean" (12)
release-strategy-method="release" (13)
release-strategy-expression="size() == 10" (14)
empty-group-min-timeout="60000" (15)
lock-registry="lockRegistry" (16)
group-timeout="60000" (17)
group-timeout-expression="size() ge 2 ? 100 : -1" (18)
scheduler="taskScheduler" /> (19)
expire-group-upon-timeout="false" /> (20)
1 | 重排序器的 id。可选。 |
2 | 重排序器的输入通道。必需。 |
3 | 重排序器将重新排序的消息发送到的通道。可选。 |
4 | 重排序器将超时消息发送到的通道(如果 send-partial-result-on-timeout 设置为 false )。可选。 |
5 | 是否在有序序列可用时立即发送,还是仅在整个消息组到达后发送。可选。(默认为 false 。) |
6 | 对 MessageGroupStore 的引用,可用于在消息组完整之前,按其关联键存储消息组。可选。(默认为易失性内存存储。) |
7 | 当组过期时,是否发送已排序的组(即使缺少某些消息)。可选。(默认为 false。)参见在消息聚合器中管理状态:MessageGroupStore 。 |
8 | 发送回复 Message 到 output-channel 或 discard-channel 时等待的超时间隔。仅当输出通道存在某些“发送”限制时应用,例如具有固定“容量”的 QueueChannel 。在这种情况下,会抛出 MessageDeliveryException 。对于 AbstractSubscribableChannel 实现,send-timeout 被忽略。对于 group-timeout(-expression) ,由调度过期任务产生的 MessageDeliveryException 会导致此任务被重新调度。可选。 |
9 | 对实现消息关联(分组)算法的 bean 的引用。该 bean 可以是 CorrelationStrategy 接口的实现,也可以是一个 POJO。如果是 POJO,则还必须定义 correlation-strategy-method 属性。可选。(默认情况下,消息聚合器使用 IntegrationMessageHeaderAccessor.CORRELATION_ID 头。) |
10 | 在 correlation-strategy 引用的 bean 上定义的方法,该方法实现关联决策算法。可选,但有限制(要求存在 correlation-strategy )。 |
11 | 表示关联策略的 SpEL 表达式。示例:"headers['something']" 。correlation-strategy 或 correlation-strategy-expression 只能二选一。 |
12 | 对实现释放策略的 bean 的引用。该 bean 可以是 ReleaseStrategy 接口的实现,也可以是一个 POJO。如果是 POJO,则还必须定义 release-strategy-method 属性。可选(默认情况下,消息聚合器将使用 IntegrationMessageHeaderAccessor.SEQUENCE_SIZE 头属性)。 |
13 | 在 release-strategy 引用的 bean 上定义的方法,该方法实现完成决策算法。可选,但有限制(要求存在 release-strategy )。 |
14 | 表示释放策略的 SpEL 表达式。表达式的根对象是一个 MessageGroup 。示例:"size() == 5" 。release-strategy 或 release-strategy-expression 只能二选一。 |
15 | 仅当为 <resequencer> 的 MessageStore 配置了 MessageGroupStoreReaper 时才适用。默认情况下,当配置 MessageGroupStoreReaper 以使部分组过期时,空组也会被移除。空组在组正常释放后存在。这是为了启用对延迟到达消息的检测和丢弃。如果希望以比部分组过期更长的周期使空组过期,请设置此属性。然后,空组直到至少在此毫秒数内未被修改后才会从 MessageStore 中移除。请注意,空组的实际过期时间也会受到清理器超时属性的影响,最多可能等于此值加上超时时间。 |
16 | 参见使用 XML 配置消息聚合器。 |
17 | 参见使用 XML 配置消息聚合器。 |
18 | 参见使用 XML 配置消息聚合器。 |
19 | 参见使用 XML 配置消息聚合器。 |
20 | 默认情况下,当一个组因超时(或由 MessageGroupStoreReaper )完成时,保留空组的元数据。延迟到达的消息会立即被丢弃。将此设置为 true 可以完全移除该组。然后,延迟到达的消息会开始一个新的组,并且直到该组再次超时才会被丢弃。新组永远不会正常释放,因为导致超时的序列范围中存在“空洞”。空组稍后可以使用 MessageGroupStoreReaper 和 empty-group-min-timeout 属性过期(完全移除)。从 5.0 版本开始,空组在 empty-group-min-timeout 时间过后也会被调度移除。默认值为 'false'。 |
另请参见消息聚合器组过期以获取更多信息。
由于重排序器在 Java 类中没有需要实现的自定义行为,因此它没有注解支持。 |