DSL 基础

org.springframework.integration.dsl 包包含前面提到的 IntegrationFlowBuilder API 和许多 IntegrationComponentSpec 实现。这些实现也是构建器,提供了流畅的 API 来配置具体的端点。IntegrationFlowBuilder 基础设施为基于消息的应用程序提供了常见的企业集成模式 (EIP),例如通道、端点、轮询器和通道拦截器。

重要

IntegrationComponentSpec 是一个 FactoryBean 实现,因此其 getObject() 方法不得从 bean 定义中调用。IntegrationComponentSpec 实现必须保持原样用于 bean 定义,框架将管理其生命周期。对于 IntegrationFlow bean 定义,必须使用目标 IntegrationComponentSpec 类型(一个 FactoryBean 值)的 bean 方法参数注入,而不是使用 bean 方法引用。

在 DSL 中,端点被表达为动词以提高可读性。以下列表包含常见的 DSL 方法名称和关联的 EIP 端点

  • transform → Transformer (转换)

  • filter → Filter (过滤)

  • handle → ServiceActivator (处理)

  • split → Splitter (分割)

  • aggregate → Aggregator (聚合)

  • route → Router (路由)

  • bridge → Bridge (桥接)

从概念上讲,集成过程是通过将这些端点组合成一个或多个消息流来构建的。请注意,EIP 并未正式定义“消息流”这个术语,但将其视为使用众所周知的消息传递模式的工作单元很有用。DSL 提供了一个 IntegrationFlow 组件来定义通道及其之间的端点组合,但现在 IntegrationFlow 仅起配置作用,用于在应用程序上下文中填充真正的 bean,并且不在运行时使用。然而,可以将 IntegrationFlow 的 bean 自动装配为 Lifecycle 以控制整个流的 start()stop(),这会委托给与此 IntegrationFlow 相关联的所有 Spring Integration 组件。以下示例使用 IntegrationFlow 流畅 API,通过 IntegrationFlowBuilder 中的 EIP 方法来定义一个 IntegrationFlow bean

@Bean
public IntegrationFlow integerFlow() {
    return IntegrationFlow.from("input")
            .<String, Integer>transform(Integer::parseInt)
            .get();
}

transform 方法接受一个 lambda 作为端点参数来操作消息载荷。该方法的实际参数是一个 GenericTransformer<S, T> 实例。因此,这里可以使用任何提供的转换器(ObjectToJsonTransformerFileToStringTransformer 等)。

在底层,IntegrationFlowBuilder 会识别 MessageHandler 和相应的端点,分别使用 MessageTransformingHandlerConsumerEndpointFactoryBean。考虑另一个示例

@Bean
public IntegrationFlow myFlow() {
    return IntegrationFlow.from("input")
                .filter("World"::equals)
                .transform("Hello "::concat)
                .handle(System.out::println)
                .get();
}

前面的示例组合了一个 Filter → Transformer → Service Activator 的序列。该流是“单向的”。也就是说,它不提供回复消息,只将载荷打印到 STDOUT。端点通过直接通道自动连接。

Lambda 和 Message<?> 参数

在使用 EIP 方法中的 lambda 时,“输入”参数通常是消息载荷。如果你希望访问整个消息,请使用接受 Class<?> 作为第一个参数的重载方法之一。例如,这不起作用

.<Message<?>, Foo>transform(m -> newFooFromMessage(m))

这将在运行时因 ClassCastException 而失败,因为 lambda 不会保留参数类型,并且框架将尝试将载荷转换为 Message<?>

相反,请使用

.(Message.class, m -> newFooFromMessage(m))
Bean 定义覆盖

Java DSL 可以为流定义中内联定义的对象注册 bean,也可以重用现有的、注入的 bean。如果内联对象和现有 bean 定义定义了相同的 bean 名称,则会抛出 BeanDefinitionOverrideException,表明此类配置是错误的。但是,当你处理 prototype bean 时,无法从集成流处理器检测到现有的 bean 定义,因为每次我们从 BeanFactory 调用 prototype bean 时都会获得一个新实例。因此,提供的实例在 IntegrationFlow 中原样使用,不进行任何 bean 注册,也不对其与现有 prototype bean 定义进行任何可能的检查。然而,如果该对象具有显式的 id 且该名称的 bean 定义处于 prototype 范围,则会为该对象调用 BeanFactory.initializeBean()