Spring Integration Interaction

Spring Integration Framework 扩展了 Spring 编程模型以支持众所周知的企业集成模式。它支持基于 Spring 的应用程序中的轻量级消息传递,并通过声明式适配器支持与外部系统的集成。它还提供了一个高级 DSL 来将各种操作(端点)组合成一个逻辑集成流。凭借这种 DSL 配置的 lambda 风格,Spring Integration 已经具有良好的java.util.function接口采用率。 @MessagingGateway 代理接口也可以作为FunctionConsumer,根据 Spring Cloud Function 环境,可以将其注册到函数目录中。有关其对函数支持的更多信息,请参见 Spring Integration 参考手册

另一方面,从版本4.0.3开始,Spring Cloud Function 引入了一个spring-cloud-function-integration模块,该模块为从 Spring Integration DSL 角度与FunctionCatalog的交互提供了更深入、更特定于云且基于自动配置的 API。 FunctionFlowBuilder通过FunctionCatalog自动配置和自动装配,并表示目标IntegrationFlow实例的特定于函数的 DSL 的入口点。除了标准的IntegrationFlow.from()工厂(为了方便起见),FunctionFlowBuilder公开了一个fromSupplier(String supplierDefinition)工厂,用于在提供的FunctionCatalog中查找目标Supplier。然后,此FunctionFlowBuilder会引导到FunctionFlowDefinition。此FunctionFlowDefinitionIntegrationFlowExtension的实现,并公开apply(String functionDefinition)accept(String consumerDefinition)运算符,分别从FunctionCatalog中查找FunctionConsumer。有关更多信息,请参见其 Javadoc。

以下示例演示了与IntegrationFlow API 的其余功能一起使用的FunctionFlowBuilder

@Configuration
public class IntegrationConfiguration {

    @Bean
    Supplier<byte[]> simpleByteArraySupplier() {
        return "simple test data"::getBytes;
    }

    @Bean
    Function<String, String> upperCaseFunction() {
        return String::toUpperCase;
    }

    @Bean
    BlockingQueue<String> results() {
        return new LinkedBlockingQueue<>();
    }

    @Bean
    Consumer<String> simpleStringConsumer(BlockingQueue<String> results) {
        return results::add;
    }

    @Bean
    QueueChannel wireTapChannel() {
        return new QueueChannel();
    }

    @Bean
    IntegrationFlow someFunctionFlow(FunctionFlowBuilder functionFlowBuilder) {
        return functionFlowBuilder
                .fromSupplier("simpleByteArraySupplier")
                .wireTap("wireTapChannel")
                .apply("upperCaseFunction")
                .log(LoggingHandler.Level.WARN)
                .accept("simpleStringConsumer");
    }

}

由于FunctionCatalog.lookup()功能不仅限于简单的函数名称,因此函数组合功能也可以在提到的apply()accept()运算符中使用

@Bean
IntegrationFlow functionCompositionFlow(FunctionFlowBuilder functionFlowBuilder) {
    return functionFlowBuilder
            .from("functionCompositionInput")
            .accept("upperCaseFunction|simpleStringConsumer");
}

当我们将预定义函数的自动配置依赖项添加到 Spring Cloud 应用程序中时,此 API 变得更加相关。例如Stream Applications项目,除了应用程序映像外,还提供了具有各种集成用例的函数的工件,例如debezium-supplierelasticsearch-consumeraggregator-function等。

以下配置分别基于http-supplierspel-functionfile-consumer

@Bean
IntegrationFlow someFunctionFlow(FunctionFlowBuilder functionFlowBuilder) {
    return functionFlowBuilder
            .fromSupplier("httpSupplier", e -> e.poller(Pollers.trigger(new OnlyOnceTrigger())))
            .<Flux<?>>handle((fluxPayload, headers) -> fluxPayload, e -> e.async(true))
            .channel(c -> c.flux())
            .apply("spelFunction")
            .<String, String>transform(String::toUpperCase)
            .accept("fileConsumer");
}

我们需要做的另一件事是将它们的配置添加到application.properties中(如果需要)

http.path-pattern=/testPath
spel.function.expression=new String(payload)
file.consumer.name=test-data.txt