延迟器

延迟器是一个简单的端点,它允许消息流被延迟一定的时间间隔。当消息被延迟时,原始发送者不会阻塞。相反,延迟的消息将使用org.springframework.scheduling.TaskScheduler的实例进行调度,并在延迟过去后发送到输出通道。即使对于相当长的延迟,这种方法也是可扩展的,因为它不会导致大量阻塞的发送者线程。相反,在典型情况下,线程池用于实际执行释放消息的操作。本节包含几个配置延迟器的示例。

配置延迟器

<delayer>元素用于延迟两个消息通道之间的消息流。与其他端点一样,您可以提供“input-channel”和“output-channel”属性,但延迟器还具有“default-delay”和“expression”属性(以及“expression”元素),它们确定每条消息应延迟的毫秒数。以下示例将所有消息延迟三秒。

<int:delayer id="delayer" input-channel="input"
             default-delay="3000" output-channel="output"/>

如果您需要确定每条消息的延迟,也可以使用“expression”属性提供SpEL表达式,如下面的表达式所示。

  • Java DSL

  • Kotlin DSL

  • Java

  • XML

@Bean
public IntegrationFlow flow() {
    return IntegrationFlow.from("input")
            .delay(d -> d
                    .messageGroupId("delayer.messageGroupId")
                    .defaultDelay(3_000L)
                    .delayExpression("headers['delay']"))
            .channel("output")
            .get();
}
@Bean
fun flow() =
    integrationFlow("input") {
        delay {
            messageGroupId("delayer.messageGroupId")
            defaultDelay(3000L)
            delayExpression("headers['delay']")
        }
        channel("output")
    }
@ServiceActivator(inputChannel = "input")
@Bean
public DelayHandler delayer() {
    DelayHandler handler = new DelayHandler("delayer.messageGroupId");
    handler.setDefaultDelay(3_000L);
    handler.setDelayExpressionString("headers['delay']");
    handler.setOutputChannelName("output");
    return handler;
}
<int:delayer id="delayer" input-channel="input" output-channel="output"
             default-delay="3000" expression="headers['delay']"/>

在前面的示例中,只有当表达式的计算结果对于给定的入站消息为null时,三秒的延迟才适用。如果您只想对表达式计算结果有效的邮件应用延迟,则可以使用0的“default-delay”(默认值)。对于任何延迟为0(或更少)的消息,都会立即在调用线程上发送消息。

XML解析器使用消息组ID <beanName>.messageGroupId
延迟处理程序支持表示毫秒间隔的表达式计算结果(任何其toString()方法产生可以解析为Long的值的Object),以及表示绝对时间的java.util.Date实例。在第一种情况下,毫秒是从当前时间开始计算的(例如,值5000将至少延迟消息五秒钟,从其被延迟器接收的时间算起)。使用Date实例时,只有在该Date对象表示的时间之后才会释放消息。等于非正延迟或过去日期的值不会导致延迟。相反,它将直接发送到原始发送者的线程上的输出通道。如果表达式计算结果不是Date并且不能解析为Long,则应用默认延迟(如果有——默认为0)。
表达式计算可能由于各种原因引发计算异常,包括无效表达式或其他条件。默认情况下,这些异常会被忽略(尽管在DEBUG级别记录),并且延迟器将回退到默认延迟(如果有)。您可以通过设置ignore-expression-failures属性来修改此行为。默认情况下,此属性设置为true,延迟器行为如前所述。但是,如果您希望不忽略表达式计算异常并将它们抛给延迟器的调用者,请将ignore-expression-failures属性设置为false

在前面的示例中,延迟表达式指定为headers['delay']。这是SpEL Indexer语法,用于访问Map元素(MessageHeaders实现Map)。它调用:headers.get("delay")。对于简单的映射元素名称(不包含“.”),您还可以使用SpEL“点访问器”语法,其中前面显示的标头表达式可以指定为headers.delay。但是,如果标头缺失,则会获得不同的结果。在第一种情况下,表达式的计算结果为null。第二种结果类似于以下内容:

 org.springframework.expression.spel.SpelEvaluationException: EL1008E:(pos 8):
		   Field or property 'delay' cannot be found on object of type 'org.springframework.messaging.MessageHeaders'

因此,如果标头可能被省略并且您想回退到默认延迟,则通常使用索引器语法而不是点属性访问器语法更有效(也推荐),因为检测null比捕获异常更快。

延迟器委托给Spring的TaskScheduler抽象的实例。延迟器使用的默认调度程序是Spring Integration在启动时提供的ThreadPoolTaskScheduler实例。请参阅配置任务调度程序。如果您想委托给其他调度程序,可以通过延迟器元素的“scheduler”属性提供引用,如下例所示。

<int:delayer id="delayer" input-channel="input" output-channel="output"
    expression="headers.delay"
    scheduler="exampleTaskScheduler"/>

<task:scheduler id="exampleTaskScheduler" pool-size="3"/>
如果配置外部ThreadPoolTaskScheduler,则可以在此属性上设置waitForTasksToCompleteOnShutdown = true。当应用程序关闭时,它允许成功完成已经在执行状态(释放消息)的“延迟”任务。在Spring Integration 2.2之前,此属性在<delayer>元素上可用,因为DelayHandler可以在后台创建自己的调度程序。从2.2版本开始,延迟器需要外部调度程序实例,并且删除了waitForTasksToCompleteOnShutdown。您应该使用调度程序自己的配置。
ThreadPoolTaskScheduler具有属性errorHandler,可以使用org.springframework.util.ErrorHandler的一些实现注入它。此处理程序允许处理来自调度任务线程的Exception,该任务发送延迟的消息。默认情况下,它使用org.springframework.scheduling.support.TaskUtils$LoggingErrorHandler,您可以在日志中看到堆栈跟踪。您可能需要考虑使用org.springframework.integration.channel.MessagePublishingErrorHandler,它将ErrorMessage发送到error-channel,无论是来自失败消息的标头还是到默认的error-channel。此错误处理在事务回滚(如果存在)后执行。请参阅释放失败

延迟器和消息存储

DelayHandler将延迟的消息持久保存到提供的MessageStore中的消息组中。('groupId'基于<delayer>元素的必需'id'属性。另请参见DelayHandler.setMessageGroupId(String)。)延迟消息在调度任务立即在DelayHandler将消息发送到output-channel之前从MessageStore中删除。如果提供的MessageStore是持久性的(例如JdbcMessageStore),则它提供了在应用程序关闭时不丢失消息的能力。应用程序启动后,DelayHandlerMessageStore中的消息组读取消息,并根据消息的原始到达时间重新调度它们(如果延迟是数字)。对于延迟标头为Date的消息,在重新调度时使用该Date。如果延迟的消息在MessageStore中保留的时间超过其“延迟”,则在启动后立即发送。messageGroupId是必需的,不能依赖于可以生成的DelayHandler bean名称。这样,在应用程序重新启动后,DelayHandler可能会获得一个新的生成的bean名称。因此,延迟的消息可能会丢失重新调度,因为它们的组不再由应用程序管理。

<delayer>可以添加两个相互排斥的元素之一:<transactional><advice-chain>。这些AOP建议的List应用于代理的内部DelayHandler.ReleaseMessageHandler,它负责在延迟后,在调度任务的Thread上释放消息。例如,当下游消息流抛出异常并且ReleaseMessageHandler的事务回滚时,它可能会被使用。在这种情况下,延迟的消息将保留在持久性MessageStore中。您可以在<advice-chain>中使用任何自定义org.aopalliance.aop.Advice实现。<transactional>元素定义一个简单的advice链,它只有事务性advice。以下示例显示了<delayer>中的advice-chain

<int:delayer id="delayer" input-channel="input" output-channel="output"
    expression="headers.delay"
    message-store="jdbcMessageStore">
    <int:advice-chain>
        <beans:ref bean="customAdviceBean"/>
        <tx:advice>
            <tx:attributes>
                <tx:method name="*" read-only="true"/>
            </tx:attributes>
        </tx:advice>
    </int:advice-chain>
</int:delayer>

DelayHandler可以导出为具有托管操作(getDelayedMessageCountreschedulePersistedMessages)的JMX MBean,它允许在运行时重新调度延迟的持久消息——例如,如果TaskScheduler之前已停止。这些操作可以通过Control Bus命令调用,如下例所示。

Message<String> delayerReschedulingMessage =
    MessageBuilder.withPayload("@'delayer.handler'.reschedulePersistedMessages()").build();
controlBusChannel.send(delayerReschedulingMessage);
有关消息存储、JMX和控制总线的更多信息,请参阅系统管理

从5.3.7版本开始,如果在将消息存储到MessageStore时活动事务,则将在TransactionSynchronization.afterCommit()回调中调度释放任务。这是为了防止竞争条件,其中计划的释放可能在事务提交之前运行,并且找不到消息。在这种情况下,消息将在延迟后或事务提交后释放,以较晚者为准。

释放失败

从5.0.8版本开始,延迟器上新增了两个属性。

  • maxAttempts(默认为5)

  • retryDelay(默认为1秒)

释放消息时,如果下游流失败,则将在retryDelay后尝试释放。如果达到maxAttempts,则消息将被丢弃(除非释放是事务性的,在这种情况下,消息将保留在存储中,但将不再计划释放,直到应用程序重新启动或调用reschedulePersistedMessages()方法,如上所述)。

此外,您可以配置delayedMessageErrorChannel;当释放失败时,ErrorMessage将发送到该通道,异常作为有效负载,并具有originalMessage属性。ErrorMessage包含一个标头IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT,其中包含当前计数。

如果错误流消耗错误消息并正常退出,则不会采取任何进一步的操作;如果释放是事务性的,则事务将提交并将消息从存储中删除。如果错误流抛出异常,则将根据上述内容最多重试maxAttempts次释放。