操作符 gateway()
在 IntegrationFlow
定义中,gateway()
操作符是一种特殊的服务激活器实现,用于通过其输入通道调用其他端点或集成流,并等待回复。从技术上讲,它扮演的角色与 <chain>
定义中嵌套的 <gateway>
组件相同(参见 在 Chain 中调用 Chain),并使得流更加清晰和直接。从逻辑和业务角度来看,它是一个消息网关,允许在目标集成解决方案的不同部分之间分发和重用功能(参见 消息网关)。该操作符有几个重载,用于不同的目的:
-
gateway(String requestChannel)
:通过名称将消息发送到某个端点的输入通道; -
gateway(MessageChannel requestChannel)
:通过直接注入将消息发送到某个端点的输入通道; -
gateway(IntegrationFlow flow)
:将消息发送到提供的IntegrationFlow
的输入通道。
所有这些重载都含有一个带有第二个 Consumer<GatewayEndpointSpec>
参数的变体,用于配置目标 GatewayMessageHandler
和相应的 AbstractEndpoint
。此外,基于 IntegrationFlow
的方法允许调用现有的 IntegrationFlow
Bean,或通过 IntegrationFlow
函数式接口的内联 lambda 将流声明为子流,或将其提取到 private
方法中以获得更整洁的代码风格
@Bean
IntegrationFlow someFlow() {
return IntegrationFlow
.from(...)
.gateway(subFlow())
.handle(...)
.get();
}
private static IntegrationFlow subFlow() {
return f -> f
.scatterGather(s -> s.recipientFlow(...),
g -> g.outputProcessor(MessageGroup::getOne))
}
如果下游流不总是返回回复,应将 requestTimeout 设置为 0,以防止调用线程无限期挂起。在这种情况下,流将在该点结束,并释放线程以进行后续工作。 |