Poller
本节介绍 Spring Integration 中的轮询工作方式。
轮询消费者
当消息端点(通道适配器)连接到通道并实例化时,它们会生成以下实例之一
实际实现取决于这些端点连接到的通道类型。连接到实现了 org.springframework.messaging.SubscribableChannel
接口的通道的通道适配器会生成 EventDrivenConsumer
的实例。另一方面,连接到实现了 org.springframework.messaging.PollableChannel
接口(例如 QueueChannel
)的通道的通道适配器会生成 PollingConsumer
的实例。
轮询消费者允许 Spring Integration 组件主动轮询消息,而不是以事件驱动的方式处理消息。
它们在许多消息传递场景中是一个关键的横切关注点。在 Spring Integration 中,轮询消费者基于同名的模式,该模式在 Gregor Hohpe 和 Bobby Woolf 所著的《企业集成模式》一书中有所描述。您可以在 该书的网站上找到该模式的描述。
有关轮询消费者配置的更多信息,请参阅 消息端点。
可轮询消息源
Spring Integration 提供了轮询消费者模式的第二种变体。当使用入站通道适配器时,这些适配器通常被 SourcePollingChannelAdapter
包装。例如,当从远程 FTP 服务器位置检索消息时,FTP 入站通道适配器 中描述的适配器会配置一个 poller 来定期检索消息。因此,当组件配置了 pollers 时,生成的实例属于以下类型之一
这意味着 pollers 在入站和出站消息传递场景中都会使用。以下是使用 pollers 的一些用例
-
轮询某些外部系统,例如 FTP 服务器、数据库和 Web Services
-
轮询内部(可轮询)消息通道
-
轮询内部服务(例如重复执行 Java 类中的方法)
AOP advice 类可以应用于 pollers 的 advice-chain 中,例如事务 advice 用于启动事务。从版本 4.1 开始,提供了 PollSkipAdvice 。Pollers 使用 triggers 来确定下一次轮询的时间。可以使用 PollSkipAdvice 来抑制(跳过)一次轮询,也许是因为存在某些下游条件会阻止消息被处理。要使用此 advice,您必须提供一个 PollSkipStrategy 的实现。从版本 4.2.5 开始,提供了 SimplePollSkipStrategy 。要使用它,您可以将其实例作为 bean 添加到应用上下文,注入到 PollSkipAdvice 中,并将其添加到 poller 的 advice 链中。要跳过轮询,请调用 skipPolls() 。要恢复轮询,请调用 reset() 。版本 4.2 在这方面增加了更多灵活性。请参阅 条件轮询器。 |
延迟确认可轮询消息源
从版本 5.0.1 开始,某些模块提供了支持延迟确认的 MessageSource
实现,直到下游流完成(或将消息交给另一个线程)。目前这仅限于 AmqpMessageSource
和 KafkaMessageSource
。
对于这些消息源,会将 IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK
头部(参见 MessageHeaderAccessor
API)添加到消息中。当与可轮询消息源一起使用时,此头部的值是 AcknowledgmentCallback
的实例,如下例所示
@FunctionalInterface
public interface AcknowledgmentCallback {
void acknowledge(Status status);
boolean isAcknowledged();
void noAutoAck();
default boolean isAutoAck();
enum Status {
/**
* Mark the message as accepted.
*/
ACCEPT,
/**
* Mark the message as rejected.
*/
REJECT,
/**
* Reject the message and requeue so that it will be redelivered.
*/
REQUEUE
}
}
并非所有消息源(例如 KafkaMessageSource
)都支持 REJECT
状态。它被视为与 ACCEPT
相同。
应用可以在任何时候确认消息,如下例所示
Message<?> received = source.receive();
...
StaticMessageHeaderAccessor.getAcknowledgmentCallback(received)
.acknowledge(Status.ACCEPT);
如果 MessageSource
连接到 SourcePollingChannelAdapter
,当 poller 线程在下游流完成后返回到适配器时,适配器会检查确认是否已完成,如果未完成,则将其状态设置为 ACCEPT
(如果流抛出异常,则设置为 REJECT
)。状态值在 AcknowledgmentCallback.Status
枚举中定义。
Spring Integration 提供了 MessageSourcePollingTemplate
来执行对 MessageSource
的即时轮询。当 MessageHandler
回调返回(或抛出异常)时,这也负责在 AcknowledgmentCallback
上设置 ACCEPT
或 REJECT
。以下示例展示了如何使用 MessageSourcePollingTemplate
进行轮询
MessageSourcePollingTemplate template =
new MessageSourcePollingTemplate(this.source);
template.poll(h -> {
...
});
在这两种情况(SourcePollingChannelAdapter
和 MessageSourcePollingTemplate
)下,您都可以通过调用回调上的 noAutoAck()
方法来禁用自动 ack/nack。如果您将消息交给另一个线程并希望稍后进行确认,则可能会这样做。并非所有实现都支持此功能(例如,Apache Kafka 不支持,因为偏移量提交必须在同一线程上执行)。
消息源的条件轮询器
本节介绍如何使用条件轮询器。
背景
Poller 的 advice-chain
中的 Advice
对象会建议整个轮询任务(包括消息检索和处理)。这些“around advice”方法无法访问轮询的任何上下文——只能访问轮询本身。这对于诸如使任务具有事务性或由于某些外部条件跳过轮询(如前所述)等要求而言是可以接受的。如果我们希望根据轮询的 receive
部分的结果采取某些操作,或者如果我们要根据条件调整 poller,该怎么办?对于这些情况,Spring Integration 提供了“智能”轮询。
“智能”轮询
版本 5.3 引入了 ReceiveMessageAdvice
接口。advice-chain
中任何实现了此接口的 Advice
对象仅应用于 receive()
操作 - MessageSource.receive()
和 PollableChannel.receive(timeout)
。因此,它们只能应用于 SourcePollingChannelAdapter
或 PollingConsumer
。此类实现了以下方法
-
beforeReceive(Object source)
此方法在Object.receive()
方法之前调用。它允许您检查和重新配置源。返回false
将取消此轮询(类似于前面提到的PollSkipAdvice
)。 -
Message<?> afterReceive(Message<?> result, Object source)
此方法在receive()
方法之后调用。同样,您可以重新配置源或采取任何操作(可能取决于结果,如果源未创建消息,结果可能为null
)。您甚至可以返回不同的消息
线程安全
如果 |
Advice 链排序
您应该了解 advice 链在初始化期间的处理方式。未实现 |
SimpleActiveIdleReceiveMessageAdvice
此 advice 是 ReceiveMessageAdvice
的简单实现。当与 DynamicPeriodicTrigger
结合使用时,它会根据前一次轮询是否收到消息来调整轮询频率。poller 还必须引用同一个 DynamicPeriodicTrigger
。
重要:异步移交
SimpleActiveIdleReceiveMessageAdvice 会根据 receive() 结果修改 trigger。这仅在 advice 在 poller 线程上调用时有效。如果 poller 有 task-executor 则无效。要在希望在轮询结果后使用异步操作时使用此 advice,请稍后进行异步移交,例如通过使用 ExecutorChannel 。 |
CompoundTriggerAdvice
此 advice 允许根据轮询是否返回消息来选择两个 triggers 中的一个。考虑一个使用 CronTrigger
的 poller。CronTrigger
实例是不可变的,因此一旦构造就无法更改。考虑一个用例:我们想使用 cron 表达式每小时触发一次轮询,但如果未收到消息,则每分钟轮询一次,并在检索到消息时恢复使用 cron 表达式。
Advice(和 poller)为此目的使用 CompoundTrigger
。触发器的 primary
trigger 可以是 CronTrigger
。当 advice 检测到未收到消息时,它会将 secondary trigger 添加到 CompoundTrigger
。当调用 CompoundTrigger
实例的 nextExecutionTime
方法时,如果存在 secondary trigger,则委托给它。否则,委托给 primary trigger。
Poller 还必须引用同一个 CompoundTrigger
。
以下示例展示了每小时 cron 表达式配置,并带有回退到每分钟轮询的设置
<int:inbound-channel-adapter channel="nullChannel" auto-startup="false">
<bean class="org.springframework.integration.endpoint.PollerAdviceTests.Source" />
<int:poller trigger="compoundTrigger">
<int:advice-chain>
<bean class="org.springframework.integration.aop.CompoundTriggerAdvice">
<constructor-arg ref="compoundTrigger"/>
<constructor-arg ref="secondary"/>
</bean>
</int:advice-chain>
</int:poller>
</int:inbound-channel-adapter>
<bean id="compoundTrigger" class="org.springframework.integration.util.CompoundTrigger">
<constructor-arg ref="primary" />
</bean>
<bean id="primary" class="org.springframework.scheduling.support.CronTrigger">
<constructor-arg value="0 0 * * * *" /> <!-- top of every hour -->
</bean>
<bean id="secondary" class="org.springframework.scheduling.support.PeriodicTrigger">
<constructor-arg value="60000" />
</bean>
重要:异步移交
CompoundTriggerAdvice 会根据 receive() 结果修改 trigger。这仅在 advice 在 poller 线程上调用时有效。如果 poller 有 task-executor 则无效。要在希望在轮询结果后使用异步操作时使用此 advice,请稍后进行异步移交,例如通过使用 ExecutorChannel 。 |
仅限 MessageSource 的 Advice
有些 advices 可能仅适用于 MessageSource.receive()
,而对于 PollableChannel
则没有意义。为此,仍然存在 MessageSourceMutator
接口(ReceiveMessageAdvice
的扩展)。更多信息请参阅 入站通道适配器:轮询多个服务器和目录。