子流程支持
if…else 和 publish-subscribe 组件中的一些提供了使用子流程指定其逻辑或映射的能力。最简单的示例是 .publishSubscribeChannel(),如下例所示
@Bean
public IntegrationFlow subscribersFlow() {
return flow -> flow
.publishSubscribeChannel(Executors.newCachedThreadPool(), s -> s
.subscribe(f -> f
.<Integer>handle((p, h) -> p / 2)
.channel(c -> c.queue("subscriber1Results")))
.subscribe(f -> f
.<Integer>handle((p, h) -> p * 2)
.channel(c -> c.queue("subscriber2Results"))))
.<Integer>handle((p, h) -> p * 3)
.channel(c -> c.queue("subscriber3Results"));
}
您可以使用单独的 IntegrationFlow @Bean 定义达到相同的效果,但我们希望您觉得子流程样式的逻辑组合很有用。我们发现它能使代码更短(因此更易读)。
从 5.3 版本开始,提供了基于 BroadcastCapableChannel 的 publishSubscribeChannel() 实现,用于在由 Broker 支持的消息通道上配置子流程订阅者。例如,我们现在可以在 Jms.publishSubscribeChannel() 上将几个订阅者配置为子流程
@Bean
public JmsPublishSubscribeMessageChannelSpec jmsPublishSubscribeChannel() {
return Jms.publishSubscribeChannel(jmsConnectionFactory())
.destination("pubsub");
}
@Bean
public IntegrationFlow pubSubFlow(BroadcastCapableChannel jmsPublishSubscribeChannel) {
return f -> f
.publishSubscribeChannel(jmsPublishSubscribeChannel,
pubsub -> pubsub
.subscribe(subFlow -> subFlow
.channel(c -> c.queue("jmsPubSubBridgeChannel1")))
.subscribe(subFlow -> subFlow
.channel(c -> c.queue("jmsPubSubBridgeChannel2"))));
}
类似的 publish-subscribe 子流程组合提供了 .routeToRecipients() 方法。
另一个例子是在 .filter() 方法上使用 .discardFlow() 而不是 .discardChannel()。
.route() 值得特别关注。考虑以下示例
@Bean
public IntegrationFlow routeFlow() {
return f -> f
.<Integer, Boolean>route(p -> p % 2 == 0,
m -> m.channelMapping("true", "evenChannel")
.subFlowMapping("false", sf ->
sf.<Integer>handle((p, h) -> p * 3)))
.transform(Object::toString)
.channel(c -> c.queue("oddChannel"));
}
.channelMapping() 继续像在常规 Router 映射中那样工作,但是 .subFlowMapping() 将该子流程绑定到主流程。换句话说,任何路由器的子流程在 .route() 之后都会返回到主流程。
有时,您需要从 .subFlowMapping() 中引用一个现有的 IntegrationFlow @Bean。以下示例展示了如何做到这一点
Caused by: org.springframework.beans.factory.BeanCreationException: The 'currentComponent' (org.springframework.integration.router.MethodInvokingRouter@7965a51c) is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'. This is the end of the integration flow. 当您将子流程配置为 lambda 表达式时,框架会处理与子流程的请求-回复交互,并且不需要网关。 |
子流程可以嵌套到任何深度,但我们不建议这样做。实际上,即使在路由器的情况下,在一个流程中添加复杂的子流程很快就会看起来像一盘意大利面条,并且人类难以解析。
在 DSL 支持子流程配置的情况下,当配置的组件通常需要一个通道,并且该子流程以 channel() 元素开头时,框架会在组件输出通道和流程的输入通道之间隐式地放置一个 bridge()。例如,在此 filter 定义中
框架会在内部创建一个 DirectChannel bean,用于注入到 MessageFilter.discardChannel 中。然后将子流程包装到一个以该隐式通道作为订阅起点的 IntegrationFlow 中,并在流程中指定的 channel() 之前放置一个 bridge。当使用现有的 IntegrationFlow bean 作为子流程引用时(而不是内联子流程,例如 lambda),则不需要这样的 bridge,因为框架可以从 flow bean 中解析出第一个通道。对于内联子流程,输入通道尚不可用。 |