轮询器

本节描述了 Spring Integration 中的轮询如何工作。

轮询消费者

当消息端点(通道适配器)连接到通道并实例化时,它们会生成以下实例之一

实际实现取决于这些端点连接的通道类型。连接到实现 org.springframework.messaging.SubscribableChannel 接口的通道的通道适配器会生成 EventDrivenConsumer 实例。另一方面,连接到实现 org.springframework.messaging.PollableChannel 接口(例如 QueueChannel)的通道的通道适配器会生成 PollingConsumer 实例。

轮询消费者让 Spring Integration 组件主动轮询消息,而不是以事件驱动的方式处理消息。

它们在许多消息传递场景中代表了一个关键的横切关注点。在 Spring Integration 中,轮询消费者基于 Gregor Hohpe 和 Bobby Woolf 所著的《企业集成模式》一书中描述的同名模式。您可以在本书网站上找到该模式的描述。

有关轮询消费者配置的更多信息,请参阅消息端点

可轮询消息源

Spring Integration 提供了轮询消费者模式的第二种变体。当使用入站通道适配器时,这些适配器通常被 SourcePollingChannelAdapter 包装。例如,当从远程 FTP 服务器位置检索消息时,FTP 入站通道适配器中描述的适配器配置了一个轮询器来定期检索消息。因此,当组件配置了轮询器时,生成的实例是以下类型之一

这意味着轮询器在入站和出站消息传递场景中都使用。以下是一些使用轮询器的用例

  • 轮询某些外部系统,例如 FTP 服务器、数据库和 Web 服务

  • 轮询内部(可轮询)消息通道

  • 轮询内部服务(例如重复执行 Java 类上的方法)

AOP 通知类可以应用于轮询器,在 advice-chain 中,例如启动事务的事务通知。从版本 4.1 开始,提供了 PollSkipAdvice。轮询器使用触发器来确定下一次轮询的时间。PollSkipAdvice 可用于抑制(跳过)轮询,可能是因为存在一些下游条件会阻止消息被处理。要使用此通知,您必须为其提供 PollSkipStrategy 的实现。从版本 4.2.5 开始,提供了 SimplePollSkipStrategy。要使用它,您可以将实例作为 bean 添加到应用程序上下文中,将其注入 PollSkipAdvice,并将其添加到轮询器的通知链中。要跳过轮询,请调用 skipPolls()。要恢复轮询,请调用 reset()。版本 4.2 在这方面增加了更多灵活性。请参阅条件轮询器

本章旨在仅对轮询消费者以及它们如何适应消息通道(参阅消息通道)和通道适配器(参阅通道适配器)的概念进行高级概述。有关消息端点(尤其是轮询消费者)的更多信息,请参阅消息端点

延迟确认可轮询消息源

从版本 5.0.1 开始,某些模块提供 MessageSource 实现,支持在下游流完成(或将消息交给另一个线程)之前延迟确认。目前这仅限于 AmqpMessageSourceKafkaMessageSource

对于这些消息源,IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK 标头(参阅MessageHeaderAccessor API)被添加到消息中。当与可轮询消息源一起使用时,标头的值是 AcknowledgmentCallback 实例,如以下示例所示

@FunctionalInterface
public interface AcknowledgmentCallback extends SimpleAcknowledgment {

    void acknowledge(Status status);

    @Override
    default void acknowledge() {
        acknowledge(Status.ACCEPT);
    }

    default boolean isAcknowledged() {
        return false;
    }


    default void noAutoAck() {
        throw new UnsupportedOperationException("You cannot disable auto acknowledgment with this implementation");
    }

    default boolean isAutoAck() {
        return true;
    }

    enum Status {

        /**
         * Mark the message as accepted.
         */
        ACCEPT,

        /**
         * Mark the message as rejected.
         */
        REJECT,

        /**
         * Reject the message and requeue so that it will be redelivered.
         */
        REQUEUE

    }

}

并非所有消息源(例如 KafkaMessageSource)都支持 REJECT 状态。它被视为与 ACCEPT 相同。

应用程序可以随时确认消息,如以下示例所示

Message<?> received = source.receive();

...

StaticMessageHeaderAccessor.getAcknowledgmentCallback(received)
        .acknowledge(Status.ACCEPT);

如果 MessageSource 连接到 SourcePollingChannelAdapter,当轮询器线程在下游流完成后返回到适配器时,适配器会检查确认是否已确认,如果没有,则将其状态设置为 ACCEPT(如果流抛出异常则设置为 REJECT)。状态值在AcknowledgmentCallback.Status 枚举中定义。

Spring Integration 提供 MessageSourcePollingTemplate 来执行 MessageSource 的临时轮询。当 MessageHandler 回调返回(或抛出异常)时,这也负责在 AcknowledgmentCallback 上设置 ACCEPTREJECT。以下示例展示了如何使用 MessageSourcePollingTemplate 进行轮询

MessageSourcePollingTemplate template =
    new MessageSourcePollingTemplate(this.source);
template.poll(h -> {
    ...
});

在两种情况下(SourcePollingChannelAdapterMessageSourcePollingTemplate),您可以通过调用回调上的 noAutoAck() 来禁用自动确认/拒绝。如果您将消息交给另一个线程并希望稍后确认,则可能会这样做。并非所有实现都支持此功能,例如 Apache Kafka 不支持,因为偏移提交必须在同一个线程上执行。

消息源的条件轮询器

本节介绍如何使用条件轮询器。

背景

轮询器上的 advice-chain 中的 Advice 对象会通知整个轮询任务(消息检索和处理)。这些“环绕通知”方法无法访问轮询的任何上下文——只有轮询本身。这对于诸如使任务具有事务性或由于某些外部条件跳过轮询等要求来说是没问题的,如前所述。如果我们希望根据轮询的 receive 部分的结果采取一些行动,或者我们希望根据条件调整轮询器,该怎么办?对于这些情况,Spring Integration 提供了“智能”轮询。

“智能”轮询

版本 5.3 引入了 ReceiveMessageAdvice 接口。advice-chain 中实现此接口的任何 Advice 对象仅应用于 receive() 操作 - MessageSource.receive()PollableChannel.receive(timeout)。因此,它们只能应用于 SourcePollingChannelAdapterPollingConsumer。此类实现以下方法

  • beforeReceive(Object source) 此方法在 Object.receive() 方法之前调用。它允许您检查和重新配置源。返回 false 会取消此轮询(类似于前面提到的 PollSkipAdvice)。

  • Message<?> afterReceive(Message<?> result, Object source) 此方法在 receive() 方法之后调用。同样,您可以重新配置源或采取任何行动(可能取决于结果,如果源没有创建消息,结果可能为 null)。您甚至可以返回不同的消息

线程安全

如果 Advice 修改源,则不应使用 TaskExecutor 配置轮询器。如果 Advice 修改源,则此类修改不是线程安全的,可能会导致意外结果,尤其是在高频轮询器的情况下。如果您需要并发处理轮询结果,请考虑使用下游 ExecutorChannel 而不是向轮询器添加执行器。

通知链顺序

您应该了解在初始化期间如何处理通知链。不实现 ReceiveMessageAdviceAdvice 对象应用于整个轮询过程,并且在任何 ReceiveMessageAdvice 之前按顺序首先调用。然后 ReceiveMessageAdvice 对象在源 receive() 方法周围按顺序调用。例如,如果您有 Advice 对象 a, b, c, d,其中 bdReceiveMessageAdvice,则对象按以下顺序应用:a, c, b, d。此外,如果源已经是 Proxy,则在任何现有 Advice 对象之后调用 ReceiveMessageAdvice。如果您希望更改顺序,则必须自己连接代理。

SimpleActiveIdleReceiveMessageAdvice

此通知是 ReceiveMessageAdvice 的简单实现。当与 DynamicPeriodicTrigger 结合使用时,它会根据上一次轮询是否产生消息来调整轮询频率。轮询器还必须引用相同的 DynamicPeriodicTrigger

重要提示:异步移交
SimpleActiveIdleReceiveMessageAdvice 根据 receive() 结果修改触发器。这仅当在轮询器线程上调用通知时才有效。如果轮询器有 task-executor,则不起作用。要在您希望在轮询结果之后使用异步操作的情况下使用此通知,请稍后执行异步移交,例如通过使用 ExecutorChannel

CompoundTriggerAdvice

此通知允许根据轮询是否返回消息来选择两个触发器中的一个。考虑一个使用 CronTrigger 的轮询器。CronTrigger 实例是不可变的,因此一旦构造就无法更改。考虑一个用例,我们希望使用 cron 表达式每小时触发一次轮询,但如果没有收到消息,则每分钟轮询一次,当检索到消息时,恢复使用 cron 表达式。

为此,通知(和轮询器)使用 CompoundTrigger。触发器的 primary 触发器可以是 CronTrigger。当通知检测到未收到消息时,它会将辅助触发器添加到 CompoundTrigger。当调用 CompoundTrigger 实例的 nextExecutionTime 方法时,如果存在,它会委托给辅助触发器。否则,它会委托给主要触发器。

轮询器还必须引用相同的 CompoundTrigger

以下示例显示了每小时 cron 表达式的配置,并带有回退到每分钟

<int:inbound-channel-adapter channel="nullChannel" auto-startup="false">
    <bean class="org.springframework.integration.endpoint.PollerAdviceTests.Source" />
    <int:poller trigger="compoundTrigger">
        <int:advice-chain>
            <bean class="org.springframework.integration.aop.CompoundTriggerAdvice">
                <constructor-arg ref="compoundTrigger"/>
                <constructor-arg ref="secondary"/>
            </bean>
        </int:advice-chain>
    </int:poller>
</int:inbound-channel-adapter>

<bean id="compoundTrigger" class="org.springframework.integration.util.CompoundTrigger">
    <constructor-arg ref="primary" />
</bean>

<bean id="primary" class="org.springframework.scheduling.support.CronTrigger">
    <constructor-arg value="0 0 * * * *" /> <!-- top of every hour -->
</bean>

<bean id="secondary" class="org.springframework.scheduling.support.PeriodicTrigger">
    <constructor-arg value="60000" />
</bean>
重要提示:异步移交
CompoundTriggerAdvice 根据 receive() 结果修改触发器。这仅当在轮询器线程上调用通知时才有效。如果轮询器有 task-executor,则不起作用。要在您希望在轮询结果之后使用异步操作的情况下使用此通知,请稍后执行异步移交,例如通过使用 ExecutorChannel

仅限 MessageSource 的通知

某些通知可能仅适用于 MessageSource.receive(),它们对 PollableChannel 没有意义。为此,仍然存在 MessageSourceMutator 接口(ReceiveMessageAdvice 的扩展)。有关更多信息,请参阅入站通道适配器:轮询多个服务器和目录

© . This site is unofficial and not affiliated with VMware.