轮询器

本节介绍 Spring Integration 中轮询的工作原理。

轮询消费者

当消息端点(通道适配器)连接到通道并实例化时,它们会生成以下实例之一

实际的实现取决于这些端点连接到的通道类型。连接到实现org.springframework.messaging.SubscribableChannel接口的通道的通道适配器会生成一个EventDrivenConsumer实例。另一方面,连接到实现org.springframework.messaging.PollableChannel接口的通道(例如QueueChannel)的通道适配器会生成一个PollingConsumer实例。

轮询消费者允许 Spring Integration 组件主动轮询消息,而不是以事件驱动的模式处理消息。

它们代表许多消息传递场景中的一个关键的横切关注点。在 Spring Integration 中,轮询消费者基于同名的模式,该模式在 Gregor Hohpe 和 Bobby Woolf 合著的书籍《企业集成模式》中进行了描述。您可以在书籍的网站上找到该模式的描述。

有关轮询消费者配置的更多信息,请参见消息端点

可轮询消息源

Spring Integration 提供了轮询消费者模式的第二个变体。当使用入站通道适配器时,这些适配器通常会被SourcePollingChannelAdapter包装。例如,当从远程 FTP 服务器位置检索消息时,FTP 入站通道适配器中描述的适配器配置了一个轮询器以定期检索消息。因此,当组件配置了轮询器时,生成的实例将是以下类型之一

这意味着轮询器用于入站和出站消息传递场景。以下是使用轮询器的一些用例

  • 轮询某些外部系统,例如 FTP 服务器、数据库和 Web 服务

  • 轮询内部(可轮询)消息通道

  • 轮询内部服务(例如重复执行 Java 类上的方法)

可以将 AOP 建议类应用于轮询器,在advice-chain中,例如启动事务的事务建议。从 4.1 版开始,提供了PollSkipAdvice。轮询器使用触发器来确定下次轮询的时间。PollSkipAdvice可用于抑制(跳过)轮询,可能是因为存在一些下游条件会阻止消息被处理。要使用此建议,您必须为其提供一个PollSkipStrategy的实现。从 4.2.5 版开始,提供了SimplePollSkipStrategy。要使用它,您可以将一个实例作为 bean 添加到应用程序上下文,将其注入到PollSkipAdvice中,并将其添加到轮询器的建议链中。要跳过轮询,请调用skipPolls()。要恢复轮询,请调用reset()。4.2 版在此领域增加了更多灵活性。请参见条件轮询器

本章旨在仅提供轮询消费者及其如何融入消息通道概念(请参见消息通道)和通道适配器概念(请参见通道适配器)的高级概述。有关一般消息传递端点和轮询消费者的更多信息,请参见消息端点

延迟确认可轮询消息源

从 5.0.1 版开始,某些模块提供了MessageSource实现,这些实现支持将确认延迟到下游流程完成(或将消息传递给另一个线程)为止。这目前仅限于AmqpMessageSourceKafkaMessageSource

使用这些消息源时,IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK标头(请参见MessageHeaderAccessor API)将添加到消息中。当与可轮询消息源一起使用时,标头的值为AcknowledgmentCallback的实例,如下例所示

@FunctionalInterface
public interface AcknowledgmentCallback {

    void acknowledge(Status status);

    boolean isAcknowledged();

    void noAutoAck();

    default boolean isAutoAck();

    enum Status {

        /**
         * Mark the message as accepted.
         */
        ACCEPT,

        /**
         * Mark the message as rejected.
         */
        REJECT,

        /**
         * Reject the message and requeue so that it will be redelivered.
         */
        REQUEUE

    }

}

并非所有消息源(例如,KafkaMessageSource)都支持REJECT状态。它被视为与ACCEPT相同。

应用程序可以随时确认消息,如下例所示

Message<?> received = source.receive();

...

StaticMessageHeaderAccessor.getAcknowledgmentCallback(received)
        .acknowledge(Status.ACCEPT);

如果MessageSource连接到SourcePollingChannelAdapter,则当轮询线程在下游流程完成后返回到适配器时,适配器会检查确认是否已确认,如果未确认,则将其状态设置为ACCEPT(或如果流程引发异常则设置为REJECT)。状态值在AcknowledgmentCallback.Status枚举中定义。

Spring Integration 提供MessageSourcePollingTemplate来执行MessageSource的临时轮询。这也会在MessageHandler回调返回(或引发异常)时负责在AcknowledgmentCallback上设置ACCEPTREJECT。以下示例显示了如何使用MessageSourcePollingTemplate进行轮询

MessageSourcePollingTemplate template =
    new MessageSourcePollingTemplate(this.source);
template.poll(h -> {
    ...
});

在这两种情况下(SourcePollingChannelAdapterMessageSourcePollingTemplate),您可以通过在回调上调用noAutoAck()来禁用自动确认/否定确认。如果您将消息传递给另一个线程并希望稍后确认,则可能需要这样做。并非所有实现都支持此功能(例如,Apache Kafka 不支持,因为必须在同一线程上执行偏移量提交)。

消息源的条件轮询器

本节介绍如何使用条件轮询器。

背景

轮询器上的advice-chain中的Advice对象会建议整个轮询任务(包括消息检索和处理)。这些“around advice”方法无法访问轮询的任何上下文,只能访问轮询本身。这对于诸如使任务具有事务性或由于某些外部条件而跳过轮询之类的需求来说是可以的,如前所述。如果我们希望根据轮询的receive部分的结果采取某些操作,或者如果我们希望根据条件调整轮询器,该怎么办?对于这些情况,Spring Integration 提供了“智能”轮询。

“智能”轮询

5.3 版引入了ReceiveMessageAdvice接口。实现此接口的advice-chain中的任何Advice对象仅应用于receive()操作 - MessageSource.receive()PollableChannel.receive(timeout)。因此,它们只能应用于SourcePollingChannelAdapterPollingConsumer。此类类实现了以下方法

  • beforeReceive(Object source) 此方法在Object.receive()方法之前调用。它允许您检查和重新配置源。返回false会取消此轮询(类似于前面提到的PollSkipAdvice)。

  • Message<?> afterReceive(Message<?> result, Object source) 此方法在receive()方法之后调用。同样,您可以重新配置源或采取任何操作(可能取决于结果,如果源未创建任何消息,则结果可以为null)。您甚至可以返回不同的消息

线程安全

如果Advice修改了源,则不应使用TaskExecutor配置轮询器。如果Advice修改了源,则此类修改不是线程安全的,并且可能导致意外结果,尤其是在高频轮询器的情况下。如果您需要并发处理轮询结果,请考虑使用下游ExecutorChannel,而不是向轮询器添加执行器。

建议链排序

您应该了解初始化期间建议链是如何处理的。不实现ReceiveMessageAdviceAdvice对象应用于整个轮询过程,并且都在任何ReceiveMessageAdvice之前按顺序调用。然后,ReceiveMessageAdvice对象按顺序围绕源receive()方法调用。例如,如果您有Advice对象a、b、c、d,其中bdReceiveMessageAdvice,则这些对象将按以下顺序应用:a、c、b、d。此外,如果源已经是Proxy,则ReceiveMessageAdvice将在任何现有的Advice对象之后调用。如果您希望更改顺序,则必须自行连接代理。

SimpleActiveIdleReceiveMessageAdvice

此建议是ReceiveMessageAdvice的一个简单实现。当与DynamicPeriodicTrigger结合使用时,它会根据之前的轮询是否产生消息来调整轮询频率。轮询器还必须引用同一个DynamicPeriodicTrigger

重要:异步传递
SimpleActiveIdleReceiveMessageAdvice根据receive()结果修改触发器。这仅在轮询线程上调用建议时才有效。如果轮询器具有task-executor,则无效。要在希望在轮询结果之后使用异步操作的情况下使用此建议,请稍后执行异步传递,可能通过使用ExecutorChannel

CompoundTriggerAdvice

此建议允许根据轮询是否返回消息来选择两个触发器之一。考虑一个使用CronTrigger的轮询器。CronTrigger实例是不可变的,因此一旦构造就不能更改。考虑一个用例,我们希望使用 cron 表达式每小时触发一次轮询,但如果未收到消息,则每分钟轮询一次,并且当检索到消息时,恢复使用 cron 表达式。

建议(和轮询器)为此目的使用CompoundTrigger。触发器的primary触发器可以是CronTrigger。当建议检测到未收到消息时,它会将辅助触发器添加到CompoundTrigger中。当CompoundTrigger实例的nextExecutionTime方法被调用时,如果存在,它会委托给辅助触发器。否则,它会委托给主触发器。

轮询器还必须引用同一个CompoundTrigger

以下示例显示了每小时 cron 表达式与回退到每分钟的配置

<int:inbound-channel-adapter channel="nullChannel" auto-startup="false">
    <bean class="org.springframework.integration.endpoint.PollerAdviceTests.Source" />
    <int:poller trigger="compoundTrigger">
        <int:advice-chain>
            <bean class="org.springframework.integration.aop.CompoundTriggerAdvice">
                <constructor-arg ref="compoundTrigger"/>
                <constructor-arg ref="secondary"/>
            </bean>
        </int:advice-chain>
    </int:poller>
</int:inbound-channel-adapter>

<bean id="compoundTrigger" class="org.springframework.integration.util.CompoundTrigger">
    <constructor-arg ref="primary" />
</bean>

<bean id="primary" class="org.springframework.scheduling.support.CronTrigger">
    <constructor-arg value="0 0 * * * *" /> <!-- top of every hour -->
</bean>

<bean id="secondary" class="org.springframework.scheduling.support.PeriodicTrigger">
    <constructor-arg value="60000" />
</bean>
重要:异步传递
CompoundTriggerAdvice 根据 receive() 的结果修改触发器。这仅在 advice 在轮询线程上调用时有效。如果轮询器具有 task-executor,则此方法无效。要在希望在轮询结果后使用异步操作的情况下使用此 advice,请稍后进行异步传递,例如使用 ExecutorChannel

仅限 MessageSource 的 Advice

某些 advice 仅适用于 MessageSource.receive(),并且对于 PollableChannel 没有意义。为此,仍然存在 MessageSourceMutator 接口(ReceiveMessageAdvice 的扩展)。有关更多信息,请参阅 入站通道适配器:轮询多个服务器和目录