延迟器
延迟器是一个简单的端点,它允许消息流延迟一定的时间间隔。当消息被延迟时,原始发送者不会阻塞。相反,延迟的消息会使用 org.springframework.scheduling.TaskScheduler
的实例进行调度,以便在延迟时间过后发送到输出通道。这种方法即使对于相当长的延迟也是可扩展的,因为它不会导致大量发送者线程被阻塞。相反,在典型情况下,线程池用于实际执行释放消息的操作。本节包含几个配置延迟器的示例。
配置延迟器
<delayer>
元素用于延迟两个消息通道之间消息流。与其他端点一样,您可以提供“input-channel”和“output-channel”属性,但延迟器还具有“default-delay”和“expression”属性(以及“expression”元素),它们决定每个消息应该延迟的毫秒数。以下示例将所有消息延迟三秒钟
<int:delayer id="delayer" input-channel="input"
default-delay="3000" output-channel="output"/>
如果您需要确定每个消息的延迟,也可以使用“expression”属性提供 SpEL 表达式,如下面的表达式所示
-
Java DSL
-
Kotlin DSL
-
Java
-
XML
@Bean
public IntegrationFlow flow() {
return IntegrationFlow.from("input")
.delay(d -> d
.messageGroupId("delayer.messageGroupId")
.defaultDelay(3_000L)
.delayExpression("headers['delay']"))
.channel("output")
.get();
}
@Bean
fun flow() =
integrationFlow("input") {
delay {
messageGroupId("delayer.messageGroupId")
defaultDelay(3000L)
delayExpression("headers['delay']")
}
channel("output")
}
@ServiceActivator(inputChannel = "input")
@Bean
public DelayHandler delayer() {
DelayHandler handler = new DelayHandler("delayer.messageGroupId");
handler.setDefaultDelay(3_000L);
handler.setDelayExpressionString("headers['delay']");
handler.setOutputChannelName("output");
return handler;
}
<int:delayer id="delayer" input-channel="input" output-channel="output"
default-delay="3000" expression="headers['delay']"/>
在前面的示例中,三秒钟的延迟仅在表达式对给定传入消息求值为 null 时才适用。如果您只想对表达式求值结果有效的消息应用延迟,可以使用 0
的“default-delay”(默认值)。对于任何延迟为 0
(或更小)的消息,该消息会立即在调用线程上发送。
XML 解析器使用消息组 ID <beanName>.messageGroupId 。
|
延迟处理程序支持表示以毫秒为单位的间隔的表达式求值结果(任何其 toString() 方法产生可解析为 Long 的值的 Object ),以及表示绝对时间的 java.util.Date 实例。在第一种情况下,毫秒是从当前时间开始计算的(例如,值 5000 将使消息延迟至少 5 秒,从它被延迟器接收的时间开始)。对于 Date 实例,消息不会在该 Date 对象所表示的时间之前释放。等于非正延迟或过去日期的值不会导致延迟。相反,它会直接发送到原始发送方线程上的输出通道。如果表达式求值结果不是 Date 且不能解析为 Long ,则应用默认延迟(如果有 - 默认值为 0 )。
|
表达式求值可能会由于各种原因抛出求值异常,包括无效表达式或其他条件。默认情况下,此类异常会被忽略(尽管在 DEBUG 级别记录),延迟器会回退到默认延迟(如果有)。您可以通过设置 ignore-expression-failures 属性来修改此行为。默认情况下,此属性设置为 true ,延迟器行为如前所述。但是,如果您希望不忽略表达式求值异常并将其抛出到延迟器的调用者,请将 ignore-expression-failures 属性设置为 false 。
|
在前面的示例中,延迟表达式指定为
因此,如果存在省略标头的可能性,并且您希望回退到默认延迟,那么使用索引器语法而不是点属性访问器语法通常更有效(也更推荐),因为检测空值比捕获异常更快。 |
延迟器委托给 Spring 的 TaskScheduler
抽象类的实例。延迟器使用的默认调度器是 Spring Integration 在启动时提供的 ThreadPoolTaskScheduler
实例。请参阅 配置任务调度器。如果您希望委托给其他调度器,可以通过延迟器元素的 'scheduler' 属性提供引用,如下例所示
<int:delayer id="delayer" input-channel="input" output-channel="output"
expression="headers.delay"
scheduler="exampleTaskScheduler"/>
<task:scheduler id="exampleTaskScheduler" pool-size="3"/>
如果您配置了外部 ThreadPoolTaskScheduler ,您可以在此属性上设置 waitForTasksToCompleteOnShutdown = true 。它允许在应用程序关闭时成功完成已处于执行状态的 'delay' 任务(释放消息)。在 Spring Integration 2.2 之前,此属性在 <delayer> 元素上可用,因为 DelayHandler 可以在后台创建自己的调度器。从 2.2 开始,延迟器需要外部调度器实例,并且 waitForTasksToCompleteOnShutdown 已被删除。您应该使用调度器自己的配置。
|
ThreadPoolTaskScheduler 具有一个属性 errorHandler ,它可以注入 org.springframework.util.ErrorHandler 的某些实现。此处理程序允许从计划任务的线程处理发送延迟消息的 Exception 。默认情况下,它使用 org.springframework.scheduling.support.TaskUtils$LoggingErrorHandler ,您可以在日志中看到堆栈跟踪。您可能希望考虑使用 org.springframework.integration.channel.MessagePublishingErrorHandler ,它将 ErrorMessage 发送到 error-channel ,无论是从失败消息的标头还是到默认的 error-channel 。此错误处理在事务回滚(如果存在)后执行。请参阅 释放失败。
|
延迟器和消息存储
DelayHandler
将延迟消息持久化到提供的 MessageStore
中的消息组中。('groupId' 基于 <delayer>
元素所需的 'id' 属性。另请参阅 DelayHandler.setMessageGroupId(String)
。)延迟消息在计划任务将消息发送到 output-channel
之前立即从 MessageStore
中删除。如果提供的 MessageStore
是持久性的(例如 JdbcMessageStore
),它提供了在应用程序关闭时不丢失消息的能力。应用程序启动后,DelayHandler
从 MessageStore
中的消息组中读取消息,并根据消息的原始到达时间重新安排它们(如果延迟是数字的)。对于延迟标头为 Date
的消息,在重新安排时使用该 Date
。如果延迟消息在 MessageStore
中保留的时间超过其 'delay',则它将在启动后立即发送。messageGroupId
是必需的,不能依赖于可以生成的 DelayHandler
bean 名称。这样,在应用程序重启后,DelayHandler
可能会获得一个新的生成的 bean 名称。因此,延迟消息可能会从重新安排中丢失,因为它们组不再由应用程序管理。
<delayer>
可以通过两种互斥的元素进行扩展:<transactional>
和 <advice-chain>
。这些 AOP 建议的 List
应用于代理的内部 DelayHandler.ReleaseMessageHandler
,该处理器负责在延迟后,在计划任务的 Thread
上释放消息。例如,当下游消息流抛出异常并且 ReleaseMessageHandler
的事务回滚时,可以使用它。在这种情况下,延迟的消息将保留在持久化的 MessageStore
中。您可以在 <advice-chain>
中使用任何自定义的 org.aopalliance.aop.Advice
实现。<transactional>
元素定义了一个简单的建议链,它只包含事务建议。以下示例展示了 <delayer>
中的 advice-chain
<int:delayer id="delayer" input-channel="input" output-channel="output"
expression="headers.delay"
message-store="jdbcMessageStore">
<int:advice-chain>
<beans:ref bean="customAdviceBean"/>
<tx:advice>
<tx:attributes>
<tx:method name="*" read-only="true"/>
</tx:attributes>
</tx:advice>
</int:advice-chain>
</int:delayer>
DelayHandler
可以作为 JMX MBean
导出,并带有可管理的操作(getDelayedMessageCount
和 reschedulePersistedMessages
),这允许在运行时重新调度延迟的持久化消息——例如,如果 TaskScheduler
之前已停止。这些操作可以通过 Control Bus
命令调用,如下面的示例所示
Message<String> delayerReschedulingMessage =
MessageBuilder.withPayload("@'delayer.handler'.reschedulePersistedMessages()").build();
controlBusChannel.send(delayerReschedulingMessage);
有关消息存储、JMX 和控制总线的更多信息,请参见 系统管理。 |
从 5.3.7 版本开始,如果在将消息存储到 MessageStore
时事务处于活动状态,则会在 TransactionSynchronization.afterCommit()
回调中调度释放任务。这是为了防止竞争条件,即计划的释放可能在事务提交之前运行,而消息未找到。在这种情况下,消息将在延迟后或事务提交后释放,以较晚者为准。
释放失败
从 5.0.8 版本开始,delayer 上有两个新的属性
-
maxAttempts
(默认值为 5) -
retryDelay
(默认值为 1 秒)
当释放消息时,如果下游流失败,则将在 retryDelay
后尝试释放。如果达到 maxAttempts
,则会丢弃消息(除非释放是事务性的,在这种情况下,消息将保留在存储中,但不会再调度释放,直到应用程序重新启动或调用 reschedulePersistedMessages()
方法,如上所述)。
此外,您可以配置一个 delayedMessageErrorChannel
;当释放失败时,一个 ErrorMessage
会被发送到该通道,异常作为有效负载,并且具有 originalMessage
属性。ErrorMessage
包含一个标题 IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT
,其中包含当前计数。
如果错误流消耗了错误消息并正常退出,则不会采取任何进一步的操作;如果发布是事务性的,则事务将提交,并且消息将从存储中删除。如果错误流抛出异常,则发布将根据上面讨论的maxAttempts
进行重试。