延迟器
延迟器是一个简单的端点,它允许消息流按一定间隔进行延迟。当消息被延迟时,原始发送者不会被阻塞。相反,延迟的消息会由 org.springframework.scheduling.TaskScheduler 实例进行调度,在延迟时间过后发送到输出通道。这种方法即使对于相当长的延迟也具有可伸缩性,因为它不会导致大量发送线程被阻塞。相反,在典型情况下,会使用线程池来实际执行消息的释放。本节包含配置延迟器的几个示例。
配置延迟器
<delayer> 元素用于延迟两个消息通道之间的消息流。与其他端点一样,你可以提供 'input-channel' 和 'output-channel' 属性,但延迟器还具有 'default-delay' 和 'expression' 属性(以及 'expression' 元素),它们决定了每条消息应该延迟的毫秒数。以下示例将所有消息延迟三秒。
<int:delayer id="delayer" input-channel="input"
default-delay="3000" output-channel="output"/>
如果你需要为每条消息确定延迟,你还可以使用 'expression' 属性提供 SpEL 表达式,如下所示:
-
Java DSL
-
Kotlin DSL
-
Java
-
XML
@Bean
public IntegrationFlow flow() {
return IntegrationFlow.from("input")
.delay(d -> d
.messageGroupId("delayer.messageGroupId")
.defaultDelay(3_000L)
.delayExpression("headers['delay']"))
.channel("output")
.get();
}
@Bean
fun flow() =
integrationFlow("input") {
delay {
messageGroupId("delayer.messageGroupId")
defaultDelay(3000L)
delayExpression("headers['delay']")
}
channel("output")
}
@ServiceActivator(inputChannel = "input")
@Bean
public DelayHandler delayer() {
DelayHandler handler = new DelayHandler("delayer.messageGroupId");
handler.setDefaultDelay(3_000L);
handler.setDelayExpressionString("headers['delay']");
handler.setOutputChannelName("output");
return handler;
}
<int:delayer id="delayer" input-channel="input" output-channel="output"
default-delay="3000" expression="headers['delay']"/>
在前面的示例中,三秒的延迟仅适用于当表达式对给定入站消息计算结果为 null 时。如果你只想将延迟应用于表达式评估结果有效的消息,你可以使用 0(默认值)作为 'default-delay'。对于任何延迟为 0(或更少)的消息,消息会立即在调用线程上发送。
XML 解析器使用 <beanName>.messageGroupId 作为消息组 ID。 |
延迟处理器支持表示毫秒间隔的表达式评估结果(任何 Object,其 toString() 方法产生的值可以解析为 Long)以及表示绝对时间的 java.util.Date 实例。在第一种情况下,毫秒是从当前时间开始计算的,例如,值 5000 会将消息从延迟器接收时延迟至少五秒。对于 Date 实例,消息直到该 Date 对象表示的时间才会被释放。等于非正延迟或过去日期的值不会导致延迟。相反,它会直接在原始发送者的线程上发送到输出通道。如果表达式评估结果不是 Date 且无法解析为 Long,则应用默认延迟(如果有 — 默认值为 0)。 |
表达式评估可能会由于各种原因抛出评估异常,包括无效表达式或其他情况。默认情况下,此类异常会被忽略(尽管在 DEBUG 级别记录),并且延迟器会回退到默认延迟(如果有)。你可以通过设置 ignore-expression-failures 属性来修改此行为。默认情况下,此属性设置为 true,延迟器行为如前所述。但是,如果你不想忽略表达式评估异常并将其抛给延迟器的调用者,请将 ignore-expression-failures 属性设置为 false。 |
|
在前面的示例中,延迟表达式被指定为
因此,如果消息头可能被省略,并且你希望回退到默认延迟,通常更有效(且推荐)使用索引器语法而不是点属性访问器语法,因为检测 null 比捕获异常更快。 |
延迟器委托给 Spring 的 TaskScheduler 抽象实例。延迟器使用的默认调度器是 Spring Integration 在启动时提供的 ThreadPoolTaskScheduler 实例。请参阅 配置任务调度器。如果你想委托给不同的调度器,可以通过延迟器元素的 'scheduler' 属性提供一个引用,如下例所示:
<int:delayer id="delayer" input-channel="input" output-channel="output"
expression="headers.delay"
scheduler="exampleTaskScheduler"/>
<task:scheduler id="exampleTaskScheduler" pool-size="3"/>
如果配置外部 ThreadPoolTaskScheduler,可以将其属性 waitForTasksToCompleteOnShutdown 设置为 true。这允许在应用程序关闭时,已经处于执行状态(释放消息)的“延迟”任务成功完成。在 Spring Integration 2.2 之前,此属性在 <delayer> 元素上可用,因为 DelayHandler 可以在后台创建自己的调度器。自 2.2 版本起,延迟器需要一个外部调度器实例,并且 waitForTasksToCompleteOnShutdown 已被删除。你应该使用调度器自己的配置。 |
ThreadPoolTaskScheduler 有一个 errorHandler 属性,可以注入 org.springframework.util.ErrorHandler 的某个实现。此处理程序允许处理来自调度任务线程发送延迟消息的 Exception。默认情况下,它使用 org.springframework.scheduling.support.TaskUtils$LoggingErrorHandler,你可以在日志中看到堆栈跟踪。你可能需要考虑使用 org.springframework.integration.channel.MessagePublishingErrorHandler,它将 ErrorMessage 发送到 error-channel,可以是从失败消息的头部获取的通道,也可以是默认的 error-channel。此错误处理在事务回滚(如果存在)之后执行。请参阅 发布失败。 |
延迟器和消息存储
DelayHandler 将延迟消息持久化到所提供的 MessageStore 中的消息组中。('groupId' 基于 <delayer> 元素必需的 'id' 属性。另请参阅 DelayHandler.setMessageGroupId(String)。)延迟消息在调度任务将消息发送到 output-channel 之前立即从 MessageStore 中删除。如果提供的 MessageStore 是持久化的(例如 JdbcMessageStore),它提供了在应用程序关闭时不会丢失消息的能力。应用程序启动后,DelayHandler 从 MessageStore 中的消息组读取消息,并根据消息的原始到达时间(如果延迟是数字)重新调度它们。对于延迟消息头是 Date 的消息,在重新调度时使用该 Date。如果延迟消息在 MessageStore 中停留的时间超过其“延迟”,它将在启动后立即发送。messageGroupId 是必需的,不能依赖可能生成的 DelayHandler bean 名称。这样,在应用程序重新启动后,DelayHandler 可能会获得新的生成的 bean 名称。因此,延迟消息可能会从重新调度中丢失,因为它们的组不再由应用程序管理。
<delayer> 可以用两个互斥的元素中的任意一个进行丰富:<transactional> 和 <advice-chain>。这些 AOP 建议列表应用于代理的内部 DelayHandler.ReleaseMessageHandler,后者负责在延迟后在调度任务的 Thread 上释放消息。例如,当一个下游消息流抛出异常并且 ReleaseMessageHandler 的事务回滚时,可以使用它。在这种情况下,延迟消息保留在持久化的 MessageStore 中。你可以在 <advice-chain> 中使用任何自定义的 org.aopalliance.aop.Advice 实现。<transactional> 元素定义了一个简单的建议链,其中只包含事务建议。以下示例显示了 <delayer> 中的 advice-chain:
<int:delayer id="delayer" input-channel="input" output-channel="output"
expression="headers.delay"
message-store="jdbcMessageStore">
<int:advice-chain>
<beans:ref bean="customAdviceBean"/>
<tx:advice>
<tx:attributes>
<tx:method name="*" read-only="true"/>
</tx:attributes>
</tx:advice>
</int:advice-chain>
</int:delayer>
DelayHandler 可以作为 JMX MBean 导出,并带有受管操作(getDelayedMessageCount 和 reschedulePersistedMessages),这允许在运行时重新调度延迟的持久化消息 — 例如,如果 TaskScheduler 之前已停止。这些操作可以通过 Control Bus 命令调用,如下例所示:
Message<String> delayerReschedulingMessage =
MessageBuilder.withPayload("'delayer.handler'.reschedulePersistedMessages").build();
controlBusChannel.send(delayerReschedulingMessage);
| 有关消息存储、JMX 和控制总线的更多信息,请参阅 系统管理。 |
从 5.3.7 版本开始,如果在将消息存储到 MessageStore 时存在活动事务,则释放任务会在 TransactionSynchronization.afterCommit() 回调中调度。这是为了防止竞态条件,即调度释放可能在事务提交之前运行,从而导致找不到消息。在这种情况下,消息将在延迟之后或事务提交之后释放,以较晚者为准。
发布失败
从 5.0.8 版本开始,延迟器有两个新属性:
-
maxAttempts(默认 5) -
retryDelay(默认 1 秒)
当消息被释放时,如果下游流失败,释放将在 retryDelay 之后重试。如果达到 maxAttempts,消息将被丢弃(除非释放是事务性的,在这种情况下,消息将保留在存储中,但不再安排释放,直到应用程序重新启动,或者如上所述调用 reschedulePersistedMessages() 方法)。
此外,你可以配置 delayedMessageErrorChannel;当发布失败时,ErrorMessage 会发送到该通道,其中包含异常作为有效负载,并具有 originalMessage 属性。ErrorMessage 包含一个消息头 IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT,其中包含当前尝试次数。
如果错误流消费了错误消息并正常退出,则不采取进一步操作;如果释放是事务性的,事务将提交,并且消息将从存储中删除。如果错误流抛出异常,则将重试发布,最多达到 maxAttempts,如上所述。