操作符 gateway()

IntegrationFlow 定义中的 gateway() 操作符是一种特殊的 Service Activator 实现,用于通过其输入通道调用其他端点或集成流并等待回复。从技术上讲,它与 <chain> 定义中嵌套的 <gateway> 组件扮演相同的角色(参见 从链中调用链),并使流更清晰、更直接。从逻辑上和业务角度看,它是一个消息网关,允许在目标集成解决方案的不同部分之间分发和重用功能(参见 消息网关)。此操作符有多种重载以实现不同目标:

  • gateway(String requestChannel):通过其名称向某个端点的输入通道发送消息;

  • gateway(MessageChannel requestChannel):通过直接注入向某个端点的输入通道发送消息;

  • gateway(IntegrationFlow flow):向提供的 IntegrationFlow 的输入通道发送消息。

所有这些都带有一个包含第二个 Consumer<GatewayEndpointSpec> 参数的变体,用于配置目标 GatewayMessageHandler 和相应的 AbstractEndpoint。此外,基于 IntegrationFlow 的方法允许调用现有 IntegrationFlow bean,或者通过就地 lambda 声明流为子流以实现 IntegrationFlow 函数式接口,或者将其提取到 private 方法中以实现更清晰的代码风格。

@Bean
IntegrationFlow someFlow() {
        return IntegrationFlow
                .from(...)
                .gateway(subFlow())
                .handle(...)
                .get();
}

private static IntegrationFlow subFlow() {
        return f -> f
                .scatterGather(s -> s.recipientFlow(...),
                        g -> g.outputProcessor(MessageGroup::getOne))
}
如果下游流并非总返回回复,您应将 requestTimeout 设置为 0,以防止调用线程无限期挂起。在这种情况下,流将在该点结束,并且线程将被释放以进行后续工作。

从 6.5 版本开始,此 gateway() 操作符完全支持 async(true) 行为。在内部,为 GatewayProxyFactoryBean 提供了 AsyncRequestReplyExchanger 服务接口。由于 AsyncRequestReplyExchanger 契约是 CompletableFuture<Message<?>>,因此整个请求-回复以异步方式执行。

© . This site is unofficial and not affiliated with VMware.