重新排序器
重排序器与聚合器相关,但服务于不同的目的。聚合器组合消息,而重排序器在不更改消息的情况下传递消息。
功能
重排序器的工作方式与聚合器类似,它使用CORRELATION_ID
将消息存储在组中。不同之处在于重排序器不会以任何方式处理消息。相反,它会按照消息的SEQUENCE_NUMBER
头值顺序释放它们。
关于这一点,您可以选择一次性释放所有消息(在整个序列之后,根据SEQUENCE_SIZE
和其他可能性),或者在有效序列可用时立即释放。(我们将在本章后面介绍“有效序列”的含义。)
重排序器旨在对具有较小间隙的相对较短的消息序列进行重新排序。如果您有大量具有许多间隙的不相交序列,您可能会遇到性能问题。 |
配置重排序器
有关在 Java DSL 中配置重排序器的信息,请参见聚合器和重排序器。
配置重排序器只需要在 XML 中包含相应的元素。
以下示例显示了重排序器配置
<int:channel id="inputChannel"/>
<int:channel id="outputChannel"/>
<int:resequencer id="completelyDefinedResequencer" (1)
input-channel="inputChannel" (2)
output-channel="outputChannel" (3)
discard-channel="discardChannel" (4)
release-partial-sequences="true" (5)
message-store="messageStore" (6)
send-partial-result-on-expiry="true" (7)
send-timeout="86420000" (8)
correlation-strategy="correlationStrategyBean" (9)
correlation-strategy-method="correlate" (10)
correlation-strategy-expression="headers['something']" (11)
release-strategy="releaseStrategyBean" (12)
release-strategy-method="release" (13)
release-strategy-expression="size() == 10" (14)
empty-group-min-timeout="60000" (15)
lock-registry="lockRegistry" (16)
group-timeout="60000" (17)
group-timeout-expression="size() ge 2 ? 100 : -1" (18)
scheduler="taskScheduler" /> (19)
expire-group-upon-timeout="false" /> (20)
1 | 重排序器的 ID 是可选的。 |
2 | 重排序器的输入通道。必需。 |
3 | 重排序器将重新排序的消息发送到的通道。可选。 |
4 | 重排序器将超时消息发送到的通道(如果send-partial-result-on-timeout 设置为false )。可选。 |
5 | 是否在可用时立即发送排序后的序列,还是只在整个消息组到达后发送。可选。(默认值为false 。) |
6 | 对MessageGroupStore 的引用,该存储可以用于在消息组完成之前将其存储在它们的关联键下。可选。(默认情况下是易失性内存存储。) |
7 | 在组过期时,是否应该发送排序后的组(即使缺少一些消息)。可选。(默认值为 false。)请参见在聚合器中管理状态:MessageGroupStore 。 |
8 | 发送回复Message 到output-channel 或discard-channel 时等待的超时时间间隔。仅当输出通道存在一些“发送”限制时才应用,例如具有固定“容量”的QueueChannel 。在这种情况下,将抛出MessageDeliveryException 。对于AbstractSubscribableChannel 实现,send-timeout 将被忽略。对于group-timeout(-expression) ,来自计划的过期任务的MessageDeliveryException 会导致此任务重新计划。可选。 |
9 | 对实现消息关联(分组)算法的 Bean 的引用。该 Bean 可以是CorrelationStrategy 接口的实现或 POJO。在后一种情况下,还必须定义correlation-strategy-method 属性。可选。(默认情况下,聚合器使用IntegrationMessageHeaderAccessor.CORRELATION_ID 标头。) |
10 | 在correlation-strategy 引用的 Bean 上定义的方法,该方法实现关联决策算法。可选,有限制(需要correlation-strategy 存在)。 |
11 | 表示关联策略的 SpEL 表达式。示例:"headers['something']" 。仅允许correlation-strategy 或correlation-strategy-expression 中的一个。 |
12 | 对实现释放策略的 Bean 的引用。该 Bean 可以是ReleaseStrategy 接口的实现或 POJO。在后一种情况下,还必须定义release-strategy-method 属性。可选(默认情况下,聚合器将使用IntegrationMessageHeaderAccessor.SEQUENCE_SIZE 标头属性)。 |
13 | 在release-strategy 引用的 Bean 上定义的方法,该方法实现完成决策算法。可选,有限制(需要release-strategy 存在)。 |
14 | 表示释放策略的 SpEL 表达式。表达式的根对象是MessageGroup 。示例:"size() == 5" 。仅允许release-strategy 或release-strategy-expression 中的一个。 |
15 | 仅当为<resequencer> MessageStore 配置了MessageGroupStoreReaper 时才适用。默认情况下,当配置MessageGroupStoreReaper 以使部分组过期时,空组也会被删除。在组正常释放后,空组存在。这是为了能够检测和丢弃迟到的消息。如果您希望以比过期部分组更长的计划来过期空组,请设置此属性。然后,空组不会从MessageStore 中删除,直到它们至少在该毫秒数内未被修改。请注意,过期空组的实际时间也会受到收割机超时属性的影响,并且可能与该值加上超时一样多。 |
16 | 参见使用 XML 配置聚合器。 |
17 | 参见使用 XML 配置聚合器。 |
18 | 参见使用 XML 配置聚合器。 |
19 | 参见使用 XML 配置聚合器。 |
20 | 默认情况下,当组因超时(或由MessageGroupStoreReaper )完成时,空组的元数据将被保留。 延迟到达的消息将立即被丢弃。 将此设置为true 以完全删除组。 然后,延迟到达的消息将启动一个新组,并且在组再次超时之前不会被丢弃。 新组永远不会因导致超时的序列范围中的“漏洞”而正常释放。 空组可以使用MessageGroupStoreReaper 以及empty-group-min-timeout 属性稍后过期(完全删除)。 从 5.0 版开始,空组也会在empty-group-min-timeout 过期后被安排删除。 默认值为“false”。 |
另请参阅聚合器过期组以获取更多信息。
由于在 Java 类中没有要为重排序器实现的自定义行为,因此没有对其进行注释支持。 |