动态和运行时集成流程

IntegrationFlow及其所有依赖组件都可以在运行时注册。5.0版本之前,我们使用的是BeanFactory.registerSingleton()钩子。从Spring Framework 5.0开始,我们使用instanceSupplier钩子进行程序化的BeanDefinition注册。以下示例演示如何以编程方式注册bean

BeanDefinition beanDefinition =
         BeanDefinitionBuilder.genericBeanDefinition((Class<Object>) bean.getClass(), () -> bean)
               .getRawBeanDefinition();

((BeanDefinitionRegistry) this.beanFactory).registerBeanDefinition(beanName, beanDefinition);

请注意,在上例中,instanceSupplier钩子是genericBeanDefinition方法的最后一个参数,在本例中由lambda提供。

所有必要的bean初始化和生命周期都会自动完成,就像标准上下文配置bean定义一样。

为了简化开发体验,Spring Integration引入了IntegrationFlowContext来在运行时注册和管理IntegrationFlow实例,如下例所示

@Autowired
private AbstractServerConnectionFactory server1;

@Autowired
private IntegrationFlowContext flowContext;

...

@Test
public void testTcpGateways() {
    TestingUtilities.waitListening(this.server1, null);

    IntegrationFlow flow = f -> f
            .handle(Tcp.outboundGateway(Tcp.netClient("localhost", this.server1.getPort())
                    .serializer(TcpCodecs.crlf())
                    .deserializer(TcpCodecs.lengthHeader1())
                    .id("client1"))
                .remoteTimeout(m -> 5000))
            .transform(Transformers.objectToString());

    IntegrationFlowRegistration theFlow = this.flowContext.registration(flow).register();
    assertThat(theFlow.getMessagingTemplate().convertSendAndReceive("foo", String.class), equalTo("FOO"));
}

当我们有多个配置选项并且必须创建多个类似流程的实例时,这很有用。为此,我们可以迭代我们的选项,并在循环中创建和注册IntegrationFlow实例。另一种情况是我们的数据源不是基于Spring的,所以我们必须动态创建它。这样的示例是Reactive Streams事件源,如下例所示

Flux<Message<?>> messageFlux =
    Flux.just("1,2,3,4")
        .map(v -> v.split(","))
        .flatMapIterable(Arrays::asList)
        .map(Integer::parseInt)
        .map(GenericMessage<Integer>::new);

QueueChannel resultChannel = new QueueChannel();

IntegrationFlow integrationFlow =
    IntegrationFlow.from(messageFlux)
        .<Integer, Integer>transform(p -> p * 2)
        .channel(resultChannel)
        .get();

this.integrationFlowContext.registration(integrationFlow)
            .register();

IntegrationFlowRegistrationBuilder(作为IntegrationFlowContext.registration()的结果)可用于为要注册的IntegrationFlow指定bean名称,控制其autoStartup,以及注册非Spring Integration bean。通常,这些附加的bean是连接工厂(AMQP、JMS、(S)FTP、TCP/UDP等)、序列化器和反序列化器,或任何其他所需的支撑组件。

您可以使用IntegrationFlowRegistration.destroy()回调来移除不再需要的动态注册的IntegrationFlow及其所有依赖bean。有关更多信息,请参阅IntegrationFlowContext Javadoc

从5.0.6版本开始,IntegrationFlow定义中生成的所有bean名称都以流程ID作为前缀。我们建议始终指定显式的流程ID。否则,将在IntegrationFlowContext中启动同步屏障,以生成IntegrationFlow的bean名称并注册其bean。我们同步这两个操作是为了避免在同一个生成的bean名称可能用于不同的IntegrationFlow实例时出现竞争条件。

此外,从5.0.6版本开始,注册构建器API有了一个新方法:useFlowIdAsPrefix()。如果您希望声明多个相同流程的实例,并在流程中的组件具有相同的ID时避免bean名称冲突,这将非常有用,如下例所示

private void registerFlows() {
    IntegrationFlowRegistration flow1 =
              this.flowContext.registration(buildFlow(1234))
                    .id("tcp1")
                    .useFlowIdAsPrefix()
                    .register();

    IntegrationFlowRegistration flow2 =
              this.flowContext.registration(buildFlow(1235))
                    .id("tcp2")
                    .useFlowIdAsPrefix()
                    .register();
}

private IntegrationFlow buildFlow(int port) {
    return f -> f
            .handle(Tcp.outboundGateway(Tcp.netClient("localhost", port)
                    .serializer(TcpCodecs.crlf())
                    .deserializer(TcpCodecs.lengthHeader1())
                    .id("client"))
                .remoteTimeout(m -> 5000))
            .transform(Transformers.objectToString());
}

在这种情况下,第一个流程的消息处理程序可以用bean名称tcp1.client.handler引用。

当您使用useFlowIdAsPrefix()时,需要id属性。