延迟器

延迟器 (delayer) 是一种简单的端点,允许消息流延迟一定的间隔。当消息被延迟时,原始发送者不会阻塞。相反,延迟的消息会使用 org.springframework.scheduling.TaskScheduler 实例进行调度,以便在延迟时间过后发送到输出通道。这种方法即使对于相当长的延迟也具有可伸缩性,因为它不会导致大量发送者线程被阻塞。相反,在典型情况下,会使用线程池来实际执行消息释放。本节包含配置延迟器的一些示例。

配置延迟器

<delayer> 元素用于延迟两个消息通道之间的消息流。与其他端点一样,你可以提供 'input-channel' 和 'output-channel' 属性,但延迟器还有 'default-delay' 和 'expression' 属性(以及 'expression' 元素),用于确定每条消息应延迟的毫秒数。以下示例将所有消息延迟三秒

<int:delayer id="delayer" input-channel="input"
             default-delay="3000" output-channel="output"/>

如果你需要为每条消息确定延迟时间,还可以使用 'expression' 属性提供 SpEL 表达式,如下面的表达式所示

  • Java DSL

  • Kotlin DSL

  • Java

  • XML

@Bean
public IntegrationFlow flow() {
    return IntegrationFlow.from("input")
            .delay(d -> d
                    .messageGroupId("delayer.messageGroupId")
                    .defaultDelay(3_000L)
                    .delayExpression("headers['delay']"))
            .channel("output")
            .get();
}
@Bean
fun flow() =
    integrationFlow("input") {
        delay {
            messageGroupId("delayer.messageGroupId")
            defaultDelay(3000L)
            delayExpression("headers['delay']")
        }
        channel("output")
    }
@ServiceActivator(inputChannel = "input")
@Bean
public DelayHandler delayer() {
    DelayHandler handler = new DelayHandler("delayer.messageGroupId");
    handler.setDefaultDelay(3_000L);
    handler.setDelayExpressionString("headers['delay']");
    handler.setOutputChannelName("output");
    return handler;
}
<int:delayer id="delayer" input-channel="input" output-channel="output"
             default-delay="3000" expression="headers['delay']"/>

在前面的示例中,只有当给定入站消息的表达式评估结果为 null 时,才会应用三秒的延迟。如果你只想对表达式评估结果有效的消息应用延迟,可以使用 0 作为 'default-delay'(这是默认值)。对于任何延迟为 0(或更少)的消息,消息会立即在调用线程上发送。

XML 解析器使用消息组 ID <beanName>.messageGroupId
延迟处理器支持表示毫秒间隔的表达式评估结果(任何其 toString() 方法生成可解析为 Long 值)以及表示绝对时间的 java.util.Date 实例。在第一种情况下,毫秒数从当前时间开始计算(例如,值 5000 会将消息从延迟器接收到时起延迟至少五秒)。使用 Date 实例时,消息直到该 Date 对象表示的时间才会释放。一个非正延迟或过去的 Date 值不会导致延迟。相反,它会直接在原始发送者的线程上发送到输出通道。如果表达式评估结果不是 Date 且无法解析为 Long,则应用默认延迟(如果有,默认值为 0)。
表达式评估可能因各种原因抛出评估异常,包括无效表达式或其他条件。默认情况下,此类异常会被忽略(尽管会在 DEBUG 级别记录日志),并且延迟器会回退到默认延迟(如果有)。你可以通过设置 ignore-expression-failures 属性来修改此行为。默认情况下,此属性设置为 true,并且延迟器行为如前所述。但是,如果你不想忽略表达式评估异常并将它们抛给延迟器的调用者,请将 ignore-expression-failures 属性设置为 false

在前面的示例中,延迟表达式指定为 headers['delay']。这是 SpEL Indexer 语法,用于访问 Map 元素(MessageHeaders 实现了 Map)。它调用:headers.get("delay")。对于简单的 map 元素名称(不包含 '.'),你也可以使用 SpEL 的“点访问器”语法,如前面所示的 header 表达式可以指定为 headers.delay。但是,如果 header 丢失,则会得到不同的结果。

 org.springframework.expression.spel.SpelEvaluationException: EL1008E:(pos 8):
		   Field or property 'delay' cannot be found on object of type 'org.springframework.messaging.MessageHeaders'

在第一种情况下,表达式评估为 null。第二种情况会导致类似如下的结果

因此,如果 header 有可能被省略并且你想回退到默认延迟,通常使用索引器语法而不是点属性访问器语法更有效率(并且推荐),因为检测 null 比捕获异常更快。

<int:delayer id="delayer" input-channel="input" output-channel="output"
    expression="headers.delay"
    scheduler="exampleTaskScheduler"/>

<task:scheduler id="exampleTaskScheduler" pool-size="3"/>
延迟器委托给 Spring 的 TaskScheduler 抽象的实例。延迟器使用的默认调度器是 Spring Integration 在启动时提供的 ThreadPoolTaskScheduler 实例。参见 配置 Task Scheduler。如果你想委托给不同的调度器,可以通过延迟器元素的 'scheduler' 属性提供引用,如下面的示例所示
如果你配置了外部的 ThreadPoolTaskScheduler,可以在此属性上设置 waitForTasksToCompleteOnShutdown = true。这允许在应用程序关闭时,已经处于执行状态(正在释放消息)的“延迟”任务成功完成。在 Spring Integration 2.2 之前,此属性可以在 <delayer> 元素上使用,因为 DelayHandler 可以在后台创建自己的调度器。从 2.2 开始,延迟器需要一个外部调度器实例,并且 waitForTasksToCompleteOnShutdown 被删除。你应该使用调度器本身的配置。

ThreadPoolTaskScheduler 有一个属性 errorHandler,可以注入 org.springframework.util.ErrorHandler 的实现。此处理器允许处理从发送延迟消息的调度任务线程中抛出的 Exception。默认情况下,它使用 org.springframework.scheduling.support.TaskUtils$LoggingErrorHandler,你可以在日志中看到堆栈跟踪。你可能希望考虑使用 org.springframework.integration.channel.MessagePublishingErrorHandler,它会将 ErrorMessage 发送到 error-channel,可以从失败消息的 header 中获取,或者发送到默认的 error-channel。此错误处理在事务回滚后执行(如果存在事务)。参见 释放失败

延迟器与消息存储

DelayHandler 将延迟的消息持久化到提供的 MessageStore 中的消息组中。('groupId' 基于 <delayer> 元素所需的 'id' 属性。另请参阅 DelayHandler.setMessageGroupId(String)。)延迟的消息在 DelayHandler 将消息发送到 output-channel 之前,立即被调度任务从 MessageStore 中移除。如果提供的 MessageStore 是持久的(例如 JdbcMessageStore),它提供了在应用程序关闭时不会丢失消息的能力。应用程序启动后,DelayHandler 从其在 MessageStore 中的消息组读取消息,并根据消息的原始到达时间(如果延迟是数字)重新调度它们。对于延迟 header 是 Date 的消息,在重新调度时会使用该 Date。如果延迟消息在 MessageStore 中停留时间超过其 'delay',它将在启动后立即发送。messageGroupId 是必需的,不能依赖于可能生成的 DelayHandler bean 名称。这样,应用程序重启后,DelayHandler 可能会获得一个新的生成的 bean 名称。因此,延迟的消息可能会因为它们的组不再由应用程序管理而丢失重新调度。

<int:delayer id="delayer" input-channel="input" output-channel="output"
    expression="headers.delay"
    message-store="jdbcMessageStore">
    <int:advice-chain>
        <beans:ref bean="customAdviceBean"/>
        <tx:advice>
            <tx:attributes>
                <tx:method name="*" read-only="true"/>
            </tx:attributes>
        </tx:advice>
    </int:advice-chain>
</int:delayer>

<delayer> 可以使用两个互斥的元素之一进行增强:<transactional><advice-chain>。这些 AOP advice 的 List 应用于代理的内部 DelayHandler.ReleaseMessageHandler,它负责在延迟后,在调度任务的 Thread 上释放消息。例如,当后继消息流抛出异常且 ReleaseMessageHandler 的事务回滚时,可以使用它。在这种情况下,延迟的消息会保留在持久化的 MessageStore 中。你可以在 <advice-chain> 中使用任何自定义的 org.aopalliance.aop.Advice 实现。<transactional> 元素定义了一个简单的 advice 链,它只包含事务 advice。以下示例展示了 <delayer> 中的 advice-chain

Message<String> delayerReschedulingMessage =
    MessageBuilder.withPayload("'delayer.handler'.reschedulePersistedMessages").build();
controlBusChannel.send(delayerReschedulingMessage);
DelayHandler 可以作为 JMX MBean 导出,包含管理操作(getDelayedMessageCountreschedulePersistedMessages),这允许在运行时重新调度延迟的持久化消息——例如,如果 TaskScheduler 之前已经停止。这些操作可以通过 Control Bus 命令调用,如下面的示例所示

有关消息存储、JMX 和控制总线的更多信息,请参阅 系统管理

从版本 5.3.7 开始,如果在将消息存储到 MessageStore 中时事务处于活动状态,则释放任务将在 TransactionSynchronization.afterCommit() 回调中调度。这是必要的,以防止竞态条件,即调度释放可能在事务提交之前运行,从而找不到消息。在这种情况下,消息将在延迟后或事务提交后释放,以较晚者为准。

释放失败

  • 从版本 5.0.8 开始,延迟器新增了两个属性

  • maxAttempts(默认值 5)

retryDelay(默认值 1 秒)

释放消息时,如果后继流失败,将在 retryDelay 后重试释放。如果达到 maxAttempts,消息将被丢弃(除非释放是事务性的,在这种情况下消息将保留在存储中,但不再计划释放,直到应用程序重新启动或调用 reschedulePersistedMessages() 方法,如上所述)。

此外,你可以配置一个 delayedMessageErrorChannel;当释放失败时,会将一个 ErrorMessage 发送到该通道,异常作为载荷,并包含 originalMessage 属性。ErrorMessage 包含一个 header IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT,其中包含当前计数。