动态和运行时集成流
IntegrationFlow
及其所有依赖组件都可以在运行时注册。在 5.0 版本之前,我们使用 BeanFactory.registerSingleton()
钩子。从 Spring Framework 5.0
开始,我们使用 instanceSupplier
钩子进行编程式 BeanDefinition
注册。下面的示例展示了如何编程式注册一个 Bean
BeanDefinition beanDefinition =
BeanDefinitionBuilder.genericBeanDefinition((Class<Object>) bean.getClass(), () -> bean)
.getRawBeanDefinition();
((BeanDefinitionRegistry) this.beanFactory).registerBeanDefinition(beanName, beanDefinition);
请注意,在前面的示例中,instanceSupplier
钩子是 genericBeanDefinition
方法的最后一个参数,在本例中由 lambda 提供。
所有必要的 Bean 初始化和生命周期都将自动完成,与标准上下文配置的 Bean 定义一样。
为了简化开发体验,Spring Integration 引入了 IntegrationFlowContext
来在运行时注册和管理 IntegrationFlow
实例,如下面的示例所示
@Autowired
private AbstractServerConnectionFactory server1;
@Autowired
private IntegrationFlowContext flowContext;
...
@Test
public void testTcpGateways() {
TestingUtilities.waitListening(this.server1, null);
IntegrationFlow flow = f -> f
.handle(Tcp.outboundGateway(Tcp.netClient("localhost", this.server1.getPort())
.serializer(TcpCodecs.crlf())
.deserializer(TcpCodecs.lengthHeader1())
.id("client1"))
.remoteTimeout(m -> 5000))
.transform(Transformers.objectToString());
IntegrationFlowRegistration theFlow = this.flowContext.registration(flow).register();
assertThat(theFlow.getMessagingTemplate().convertSendAndReceive("foo", String.class), equalTo("FOO"));
}
这在有多个配置选项且必须创建多个类似流的实例时非常有用。为此,我们可以迭代我们的选项并在循环中创建和注册 IntegrationFlow
实例。另一种情况是当我们的数据源不是基于 Spring 时,我们必须即时创建它。一个这样的示例是 Reactive Streams 事件源,如下面的示例所示
Flux<Message<?>> messageFlux =
Flux.just("1,2,3,4")
.map(v -> v.split(","))
.flatMapIterable(Arrays::asList)
.map(Integer::parseInt)
.map(GenericMessage<Integer>::new);
QueueChannel resultChannel = new QueueChannel();
IntegrationFlow integrationFlow =
IntegrationFlow.from(messageFlux)
.<Integer, Integer>transform(p -> p * 2)
.channel(resultChannel)
.get();
this.integrationFlowContext.registration(integrationFlow)
.register();
IntegrationFlowRegistrationBuilder
(作为 IntegrationFlowContext.registration()
的结果)可用于为要注册的 IntegrationFlow
指定 Bean 名称,控制其 autoStartup
,并注册非 Spring Integration Bean。通常,这些额外的 Bean 是连接工厂(AMQP、JMS、(S)FTP、TCP/UDP 等)、序列化器和反序列化器,或任何其他所需的支撑组件。
您可以使用 IntegrationFlowRegistration.destroy()
回调来移除动态注册的 IntegrationFlow
及其所有依赖 Bean,当您不再需要它们时。有关更多信息,请参阅 IntegrationFlowContext
Javadoc。
从 5.0.6 版本开始,IntegrationFlow 定义中所有生成的 Bean 名称都会以前缀形式添加 flow ID。我们建议始终指定显式的 flow ID。否则,会在 IntegrationFlowContext 中启动一个同步屏障,以生成 IntegrationFlow 的 Bean 名称并注册其 Bean。我们对这两个操作进行同步,以避免当相同的生成 Bean 名称可能用于不同的 IntegrationFlow 实例时发生竞态条件。 |
此外,从 5.0.6 版本开始,注册构建器 API 有一个新方法:useFlowIdAsPrefix()
。如果您希望声明同一个流的多个实例,并在流中的组件具有相同 ID 时避免 Bean 名称冲突,此方法非常有用,如下面的示例所示
private void registerFlows() {
IntegrationFlowRegistration flow1 =
this.flowContext.registration(buildFlow(1234))
.id("tcp1")
.useFlowIdAsPrefix()
.register();
IntegrationFlowRegistration flow2 =
this.flowContext.registration(buildFlow(1235))
.id("tcp2")
.useFlowIdAsPrefix()
.register();
}
private IntegrationFlow buildFlow(int port) {
return f -> f
.handle(Tcp.outboundGateway(Tcp.netClient("localhost", port)
.serializer(TcpCodecs.crlf())
.deserializer(TcpCodecs.lengthHeader1())
.id("client"))
.remoteTimeout(m -> 5000))
.transform(Transformers.objectToString());
}
在这种情况下,第一个流的消息处理器可以使用 Bean 名称 tcp1.client.handler
来引用。
当您使用 useFlowIdAsPrefix() 时,需要一个 id 属性。 |