Spring Integration Interaction
Spring Integration Framework 扩展了 Spring 编程模型以支持众所周知的企业集成模式。它支持基于 Spring 的应用程序中的轻量级消息传递,并通过声明式适配器支持与外部系统的集成。它还提供了一个高级 DSL 来将各种操作(端点)组合成一个逻辑集成流。凭借这种 DSL 配置的 lambda 风格,Spring Integration 已经具有良好的java.util.function
接口采用率。 @MessagingGateway
代理接口也可以作为Function
或Consumer
,根据 Spring Cloud Function 环境,可以将其注册到函数目录中。有关其对函数支持的更多信息,请参见 Spring Integration 参考手册。
另一方面,从版本4.0.3
开始,Spring Cloud Function 引入了一个spring-cloud-function-integration
模块,该模块为从 Spring Integration DSL 角度与FunctionCatalog
的交互提供了更深入、更特定于云且基于自动配置的 API。 FunctionFlowBuilder
通过FunctionCatalog
自动配置和自动装配,并表示目标IntegrationFlow
实例的特定于函数的 DSL 的入口点。除了标准的IntegrationFlow.from()
工厂(为了方便起见),FunctionFlowBuilder
公开了一个fromSupplier(String supplierDefinition)
工厂,用于在提供的FunctionCatalog
中查找目标Supplier
。然后,此FunctionFlowBuilder
会引导到FunctionFlowDefinition
。此FunctionFlowDefinition
是IntegrationFlowExtension
的实现,并公开apply(String functionDefinition)
和accept(String consumerDefinition)
运算符,分别从FunctionCatalog
中查找Function
或Consumer
。有关更多信息,请参见其 Javadoc。
以下示例演示了与IntegrationFlow
API 的其余功能一起使用的FunctionFlowBuilder
@Configuration
public class IntegrationConfiguration {
@Bean
Supplier<byte[]> simpleByteArraySupplier() {
return "simple test data"::getBytes;
}
@Bean
Function<String, String> upperCaseFunction() {
return String::toUpperCase;
}
@Bean
BlockingQueue<String> results() {
return new LinkedBlockingQueue<>();
}
@Bean
Consumer<String> simpleStringConsumer(BlockingQueue<String> results) {
return results::add;
}
@Bean
QueueChannel wireTapChannel() {
return new QueueChannel();
}
@Bean
IntegrationFlow someFunctionFlow(FunctionFlowBuilder functionFlowBuilder) {
return functionFlowBuilder
.fromSupplier("simpleByteArraySupplier")
.wireTap("wireTapChannel")
.apply("upperCaseFunction")
.log(LoggingHandler.Level.WARN)
.accept("simpleStringConsumer");
}
}
由于FunctionCatalog.lookup()
功能不仅限于简单的函数名称,因此函数组合功能也可以在提到的apply()
和accept()
运算符中使用
@Bean
IntegrationFlow functionCompositionFlow(FunctionFlowBuilder functionFlowBuilder) {
return functionFlowBuilder
.from("functionCompositionInput")
.accept("upperCaseFunction|simpleStringConsumer");
}
当我们将预定义函数的自动配置依赖项添加到 Spring Cloud 应用程序中时,此 API 变得更加相关。例如Stream Applications项目,除了应用程序映像外,还提供了具有各种集成用例的函数的工件,例如debezium-supplier
、elasticsearch-consumer
、aggregator-function
等。
以下配置分别基于http-supplier
、spel-function
和file-consumer
@Bean
IntegrationFlow someFunctionFlow(FunctionFlowBuilder functionFlowBuilder) {
return functionFlowBuilder
.fromSupplier("httpSupplier", e -> e.poller(Pollers.trigger(new OnlyOnceTrigger())))
.<Flux<?>>handle((fluxPayload, headers) -> fluxPayload, e -> e.async(true))
.channel(c -> c.flux())
.apply("spelFunction")
.<String, String>transform(String::toUpperCase)
.accept("fileConsumer");
}
我们需要做的另一件事是将它们的配置添加到application.properties
中(如果需要)
http.path-pattern=/testPath
spel.function.expression=new String(payload)
file.consumer.name=test-data.txt