DSL 基础

org.springframework.integration.dsl 包包含前面提到的 IntegrationFlowBuilder API 和许多 IntegrationComponentSpec 实现,它们也是构建器,并提供流畅的 API 来配置具体的端点。IntegrationFlowBuilder 基础设施为基于消息的应用程序提供通用的企业集成模式 (EIP),例如通道、端点、轮询器和通道拦截器。

重要

IntegrationComponentSpec 是一个 FactoryBean 实现,因此其 getObject() 方法不能从 bean 定义中调用。对于 bean 定义,必须保留 IntegrationComponentSpec 实现原样,框架将管理其生命周期。对于 IntegrationFlow bean 定义,必须使用目标 IntegrationComponentSpec 类型(一个 FactoryBean 值)的 bean 方法参数注入,而不是 bean 方法引用。

在 DSL 中,端点以动词的形式表达,以提高可读性。以下列表包含常见的 DSL 方法名称和关联的 EIP 端点

  • transform → Transformer

  • filter → Filter

  • handle → ServiceActivator

  • split → Splitter

  • aggregate → Aggregator

  • route → Router

  • bridge → Bridge

从概念上讲,集成过程是通过将这些端点组合成一个或多个消息流来构建的。请注意,EIP 没有正式定义“消息流”一词,但将其视为使用众所周知的信使模式的工作单元是有用的。DSL 提供了一个 IntegrationFlow 组件来定义通道和它们之间端点的组合,但是现在 IntegrationFlow 只扮演配置角色,用于在应用程序上下文中填充实际的 bean,在运行时不使用。但是,可以将 IntegrationFlow 的 bean 作为 Lifecycle 自动装配,以控制整个流的 start()stop(),这将委托给与此 IntegrationFlow 关联的所有 Spring Integration 组件。以下示例使用 IntegrationFlow 流畅的 API 通过使用来自 IntegrationFlowBuilder 的 EIP 方法来定义 IntegrationFlow bean。

@Bean
public IntegrationFlow integerFlow() {
    return IntegrationFlow.from("input")
            .<String, Integer>transform(Integer::parseInt)
            .get();
}

transform 方法接受 lambda 作为端点参数来操作消息有效负载。此方法的实际参数是 GenericTransformer<S, T> 实例。因此,此处可以使用任何提供的转换器 (ObjectToJsonTransformerFileToStringTransformer 等)。

在幕后,IntegrationFlowBuilder 分别识别 MessageHandler 及其端点,分别使用 MessageTransformingHandlerConsumerEndpointFactoryBean。考虑另一个示例

@Bean
public IntegrationFlow myFlow() {
    return IntegrationFlow.from("input")
                .filter("World"::equals)
                .transform("Hello "::concat)
                .handle(System.out::println)
                .get();
}

前面的示例组合了 Filter → Transformer → Service Activator 的序列。该流是“单向的”。也就是说,它不提供回复消息,而只是将有效负载打印到 STDOUT。端点通过使用直接通道自动连接在一起。

Lambda 和 Message<?> 参数

在 EIP 方法中使用 lambda 时,“输入”参数通常是消息有效负载。如果要访问整个消息,请使用采用 Class<?> 作为第一个参数的重载方法之一。例如,这不起作用

.<Message<?>, Foo>transform(m -> newFooFromMessage(m))

这将在运行时因 ClassCastException 而失败,因为 lambda 没有保留参数类型,并且框架将尝试将有效负载转换为 Message<?>

相反,使用

.(Message.class, m -> newFooFromMessage(m))
Bean 定义覆盖

Java DSL 可以注册为流定义中内联定义的对象注册 bean,也可以重用现有的注入 bean。如果为内联对象和现有 bean 定义定义了相同的 bean 名称,则会抛出 BeanDefinitionOverrideException,指示此类配置错误。但是,当处理 prototype bean 时,无法从集成流处理器检测到现有的 bean 定义,因为每次从 BeanFactory 调用 prototype bean 时,我们都会获得一个新实例。这样,提供的实例按原样在 IntegrationFlow 中使用,无需任何 bean 注册和对现有 prototype bean 定义进行任何可能的检查。但是,如果此对象具有显式 id 并且此名称的 bean 定义处于 prototype 范围,则会调用 BeanFactory.initializeBean()