Spring Integration 交互

Spring Integration 框架扩展了 Spring 编程模型,以支持众所周知的企业集成模式。它支持基于 Spring 的应用程序中的轻量级消息传递,并通过声明式适配器支持与外部系统的集成。它还提供了一个高级 DSL,可以将各种操作(端点)组合成一个逻辑集成流。通过这种 DSL 配置的 lambda 风格,Spring Integration 已经很好地采用了 java.util.function 接口。@MessagingGateway 代理接口也可以作为 FunctionConsumer,根据 Spring Cloud Function 环境,可以将其注册到函数目录中。有关 Spring Integration 对函数的支持的更多信息,请参阅 参考手册

另一方面,从版本 4.0.3 开始,Spring Cloud Function 引入了一个 spring-cloud-function-integration 模块,该模块提供了一个更深入、更特定于云且基于自动配置的 API,用于从 Spring Integration DSL 角度与 FunctionCatalog 交互。FunctionFlowBuilder 通过 FunctionCatalog 自动配置和自动装配,并表示目标 IntegrationFlow 实例的函数特定 DSL 的入口点。除了标准 IntegrationFlow.from() 工厂(为了方便)之外,FunctionFlowBuilder 还公开了一个 fromSupplier(String supplierDefinition) 工厂,用于在提供的 FunctionCatalog 中查找目标 Supplier。然后,此 FunctionFlowBuilder 导向 FunctionFlowDefinition。此 FunctionFlowDefinitionIntegrationFlowExtension 的实现,并公开 apply(String functionDefinition)accept(String consumerDefinition) 运算符,用于分别从 FunctionCatalog 中查找 FunctionConsumer。有关更多信息,请参阅它们的 Javadoc。

以下示例演示了 FunctionFlowBuilder 的实际应用以及 IntegrationFlow API 其余部分的强大功能。

@Configuration
public class IntegrationConfiguration {

    @Bean
    Supplier<byte[]> simpleByteArraySupplier() {
        return "simple test data"::getBytes;
    }

    @Bean
    Function<String, String> upperCaseFunction() {
        return String::toUpperCase;
    }

    @Bean
    BlockingQueue<String> results() {
        return new LinkedBlockingQueue<>();
    }

    @Bean
    Consumer<String> simpleStringConsumer(BlockingQueue<String> results) {
        return results::add;
    }

    @Bean
    QueueChannel wireTapChannel() {
        return new QueueChannel();
    }

    @Bean
    IntegrationFlow someFunctionFlow(FunctionFlowBuilder functionFlowBuilder) {
        return functionFlowBuilder
                .fromSupplier("simpleByteArraySupplier")
                .wireTap("wireTapChannel")
                .apply("upperCaseFunction")
                .log(LoggingHandler.Level.WARN)
                .accept("simpleStringConsumer");
    }

}

由于 FunctionCatalog.lookup() 功能不仅限于简单的函数名称,因此函数组合功能也可以用于上述 apply()accept() 运算符。

@Bean
IntegrationFlow functionCompositionFlow(FunctionFlowBuilder functionFlowBuilder) {
    return functionFlowBuilder
            .from("functionCompositionInput")
            .accept("upperCaseFunction|simpleStringConsumer");
}

当我们向 Spring Cloud 应用程序中添加预定义函数的自动配置依赖项时,此 API 变得更加相关。例如,Stream Applications 项目除了应用程序映像之外,还提供了用于各种集成用例的函数工件,例如 debezium-supplierelasticsearch-consumeraggregator-function 等。

以下配置分别基于 http-supplierspel-functionfile-consumer

@Bean
IntegrationFlow someFunctionFlow(FunctionFlowBuilder functionFlowBuilder) {
    return functionFlowBuilder
            .fromSupplier("httpSupplier", e -> e.poller(Pollers.trigger(new OnlyOnceTrigger())))
            .<Flux<?>>handle((fluxPayload, headers) -> fluxPayload, e -> e.async(true))
            .channel(c -> c.flux())
            .apply("spelFunction")
            .<String, String>transform(String::toUpperCase)
            .accept("fileConsumer");
}

我们还需要做的就是将它们的配置添加到 application.properties 中(如果需要)。

http.path-pattern=/testPath
spel.function.expression=new String(payload)
file.consumer.name=test-data.txt
© . This site is unofficial and not affiliated with VMware.