Spring Integration 框架概述

Spring Integration 为 Spring 编程模型提供扩展,以支持众所周知的 企业集成模式。它支持基于 Spring 的应用程序中的轻量级消息传递,并通过声明式适配器支持与外部系统的集成。这些适配器在 Spring 对远程处理、消息传递和调度的支持之上提供了更高层次的抽象。

Spring Integration 的主要目标是在保持构建可维护、可测试代码所需的关注点分离的同时,为构建企业集成解决方案提供一个简单的模型。

Spring Integration 概述

本章对 Spring Integration 的核心概念和组件进行了高级介绍。它还包含一些编程技巧,帮助您充分利用 Spring Integration。

背景

Spring 框架的一个关键主题是控制反转 (IoC)。从广义上讲,这意味着框架代表其上下文中管理的组件处理职责。组件本身被简化,因为它们被免除了这些职责。例如,依赖注入使组件免除了查找或创建其依赖项的责任。同样,面向方面的编程通过将通用横切关注点模块化到可重用的方面,使业务组件免除了通用横切关注点。在每种情况下,最终结果都是一个更容易测试、理解、维护和扩展的系统。

此外,Spring 框架和组合提供了构建企业应用程序的综合编程模型。开发人员受益于此模型的一致性,尤其是它基于成熟的最佳实践,例如面向接口编程和优先使用组合而不是继承。Spring 简化的抽象和强大的支持库提高了开发人员的生产力,同时提高了可测试性和可移植性。

Spring Integration 受这些相同目标和原则的驱动。它将 Spring 编程模型扩展到消息传递领域,并基于 Spring 现有的企业集成支持,提供更高层次的抽象。它支持消息驱动的架构,其中控制反转应用于运行时问题,例如何时运行某些业务逻辑以及应将响应发送到何处。它支持消息的路由和转换,以便可以集成不同的传输和不同的数据格式,而不会影响可测试性。换句话说,消息传递和集成问题由框架处理。业务组件进一步与基础设施隔离,开发人员从复杂的集成责任中解脱出来。

作为 Spring 编程模型的扩展,Spring Integration 提供了多种配置选项,包括注释、带有命名空间支持的 XML、带有通用“bean”元素的 XML 以及对底层 API 的直接使用。该 API 基于定义明确的策略接口和非侵入式、委托适配器。Spring Integration 的设计灵感来自对 Spring 中常见模式与 Gregor Hohpe 和 Bobby Woolf 在 企业集成模式(Addison Wesley,2004 年)中描述的知名模式之间存在强烈亲和力的认识。阅读过该书的开发人员应该立即熟悉 Spring Integration 的概念和术语。

目标和原则

Spring Integration 受以下目标的驱动

  • 提供一个简单的模型来实现复杂的企业集成解决方案。

  • 促进基于 Spring 的应用程序中的异步、消息驱动行为。

  • 促进现有 Spring 用户的直观、增量式采用。

Spring Integration 受以下原则指导

  • 组件应松散耦合,以实现模块化和可测试性。

  • 框架应强制执行业务逻辑和集成逻辑之间的关注点分离。

  • 扩展点应具有抽象性(但在明确定义的边界内),以促进重用和可移植性。

主要组件

从垂直角度来看,分层架构有助于分离关注点,层之间基于接口的契约促进了松耦合。基于 Spring 的应用程序通常以这种方式设计,Spring 框架和产品组合为遵循这种最佳实践提供了坚实的基础,适用于企业应用程序的整个堆栈。消息驱动的架构添加了水平视角,但这些相同目标仍然相关。“分层架构”是一个极其通用和抽象的范式,消息系统通常遵循类似的抽象“管道和过滤器”模型。“过滤器”代表任何能够生成或使用消息的组件,“管道”在过滤器之间传输消息,以便组件本身保持松耦合。重要的是要注意,这两个高级范式并不相互排斥。支持“管道”的底层消息基础设施仍应封装在一个层中,其契约定义为接口。同样,“过滤器”本身应在逻辑上位于应用程序服务层之上的层中进行管理,通过接口与这些服务交互,这与 Web 层的方式非常相似。

消息

在 Spring Integration 中,消息是任何 Java 对象的通用包装器,以及框架在处理该对象时使用的元数据。它由有效负载和标头组成。有效负载可以是任何类型,标头包含常用的信息,例如 ID、时间戳、关联 ID 和返回地址。标头还用于将值传递到连接的传输以及从连接的传输传递值。例如,从接收到的文件创建消息时,文件名可能会存储在标头中,以便后续组件访问。同样,如果消息的内容最终将由出站邮件适配器发送,则各种属性(收件人、发件人、抄送、主题等)可能会由上游组件配置为消息标头值。开发人员还可以在标头中存储任何任意键值对。

Message
图 1. 消息

消息通道

消息通道代表管道和过滤器架构中的“管道”。生产者将消息发送到通道,消费者从通道接收消息。因此,消息通道解耦了消息组件,并为拦截和监控消息提供了便利点。

Message Channel
图 2. 消息通道

消息通道可以遵循点对点或发布订阅语义。对于点对点通道,每个发送到通道的消息最多只能被一个消费者接收。另一方面,发布订阅通道试图将每条消息广播到通道上的所有订阅者。Spring Integration 支持这两种模型。

虽然“点对点”和“发布订阅”定义了最终接收每条消息的消费者数量的两种选择,但还有另一个重要的考虑因素:通道是否应该缓冲消息?在 Spring Integration 中,可轮询通道能够在队列中缓冲消息。缓冲的优点是它允许对入站消息进行节流,从而防止消费者过载。但是,顾名思义,这也增加了一些复杂性,因为消费者只有在配置了轮询器的情况下才能从这样的通道接收消息。另一方面,连接到可订阅通道的消费者只是消息驱动的。 消息通道实现 详细讨论了 Spring Integration 中可用的各种通道实现。

消息端点

Spring Integration 的主要目标之一是通过控制反转简化企业集成解决方案的开发。这意味着您不必直接实现消费者和生产者,甚至不必构建消息并调用消息通道上的发送或接收操作。相反,您应该能够专注于您的特定领域模型,并基于普通对象进行实现。然后,通过提供声明式配置,您可以将您的特定于域的代码“连接”到 Spring Integration 提供的消息传递基础设施。负责这些连接的组件是消息端点。这并不意味着您必须直接连接您现有的应用程序代码。任何现实世界的企业集成解决方案都需要一定量的代码专注于集成问题,例如路由和转换。重要的是在集成逻辑和业务逻辑之间实现关注点分离。换句话说,与 Web 应用程序的模型-视图-控制器 (MVC) 范式一样,目标应该是提供一个薄但专用的层,将入站请求转换为服务层调用,然后将服务层返回值转换为出站回复。下一节概述了处理这些职责的消息端点类型,并且在接下来的章节中,您可以看到 Spring Integration 的声明式配置选项如何提供一种非侵入式的方式来使用这些类型中的每一个。

消息端点

消息端点代表管道和过滤器架构的“过滤器”。如前所述,端点的主要作用是将应用程序代码连接到消息传递框架,并以非侵入式的方式进行连接。换句话说,应用程序代码理想情况下应该不知道消息对象或消息通道。这类似于 MVC 范式中控制器的作用。就像控制器处理 HTTP 请求一样,消息端点处理消息。就像控制器映射到 URL 模式一样,消息端点映射到消息通道。目标在两种情况下都是相同的:将应用程序代码与基础设施隔离。这些概念以及所有后续模式都在 企业集成模式 书籍中进行了详细讨论。在这里,我们只提供对 Spring Integration 支持的主要端点类型及其相关角色的高级描述。接下来的章节将详细说明并提供示例代码以及配置示例。

消息转换器

消息转换器负责转换消息的内容或结构,并返回修改后的消息。最常见的转换器类型可能是将消息的有效负载从一种格式转换为另一种格式(例如,从 XML 转换为 java.lang.String)。类似地,转换器可以添加、删除或修改消息的标头值。

消息过滤器

消息过滤器决定是否将消息传递到输出通道。这只需要一个布尔测试方法,该方法可以检查特定的有效负载内容类型、属性值、标头的存在或其他条件。如果消息被接受,它将被发送到输出通道。如果没有,它将被丢弃(或者,对于更严格的实现,可以抛出 Exception)。消息过滤器通常与发布-订阅通道一起使用,其中多个消费者可以接收相同的消息,并使用过滤器的条件来缩小要处理的消息集。

注意不要将管道和过滤器架构模式中“过滤器”的通用用法与这种特定于端点的类型混淆,该类型选择性地缩小了在两个通道之间流动的消息。管道和过滤器概念中的“过滤器”更接近于 Spring Integration 的消息端点:任何可以连接到消息通道以发送或接收消息的组件。

消息路由器

消息路由器负责决定哪个通道(如果有)应该接收下一条消息。通常,该决定基于消息的内容或消息标头中可用的元数据。消息路由器通常用作服务激活器或其他能够发送回复消息的端点上的静态配置输出通道的动态替代方案。同样,消息路由器提供了一种主动替代方案,替代了前面描述的多个订阅者使用的反应式消息过滤器。

Router
图 3. 消息路由器

拆分器

拆分器是另一种类型的消息端点,其职责是接受来自其输入通道的消息,将该消息拆分为多个消息,并将每个消息发送到其输出通道。这通常用于将“复合”有效负载对象划分为一组包含细分有效负载的消息。

聚合器

聚合器本质上是分隔器的镜像,它是一种接收多个消息并将它们合并成单个消息的消息端点。实际上,聚合器通常是包含分隔器的管道中的下游消费者。从技术上讲,聚合器比分隔器更复杂,因为它需要维护状态(要聚合的消息),决定何时可获得完整的消息组,以及在必要时超时。此外,在超时的情况下,聚合器需要知道是否发送部分结果、丢弃它们或将它们发送到单独的通道。Spring Integration 提供了 `CorrelationStrategy`、`ReleaseStrategy` 和可配置的设置,用于超时、是否在超时时发送部分结果以及丢弃通道。

服务激活器

服务激活器是将服务实例连接到消息系统的通用端点。必须配置输入消息通道,并且如果要调用的服务方法能够返回值,则还可以提供输出消息通道。

输出通道是可选的,因为每条消息也可以提供自己的“返回地址”头。此规则适用于所有消费者端点。

服务激活器在某个服务对象上调用操作以处理请求消息,提取请求消息的有效负载并转换(如果方法不期望消息类型的参数)。每当服务对象的返回方法返回值时,该返回值也会被转换为回复消息(如果它不是消息类型)。该回复消息被发送到输出通道。如果没有配置输出通道,则回复将被发送到消息中指定的“返回地址”通道(如果可用)。

请求-回复服务激活器端点将目标对象的 method 连接到输入和输出消息通道。

handler endpoint
图 4. 服务激活器
如前所述,在 消息通道 中,通道可以是轮询的或可订阅的。在上图中,这由“时钟”符号和实线箭头(轮询)以及虚线箭头(订阅)表示。

通道适配器

通道适配器是一个端点,它将消息通道连接到其他系统或传输。通道适配器可以是入站的或出站的。通常,通道适配器会在消息与从其他系统接收或发送到其他系统的任何对象或资源之间进行一些映射(文件、HTTP 请求、JMS 消息等)。根据传输方式,通道适配器还可以填充或提取消息头值。Spring Integration 提供了许多通道适配器,将在后面的章节中介绍。

source endpoint
图 5. 入站通道适配器端点将源系统连接到 MessageChannel
消息源可以是轮询的(例如,POP3)或消息驱动的(例如,IMAP Idle)。在上图中,这由“时钟”符号和实线箭头(轮询)以及虚线箭头(消息驱动)表示。
target endpoint
图 6. 出站通道适配器端点将 MessageChannel 连接到目标系统。
如前所述,在 消息通道 中,通道可以是轮询的或可订阅的。在上图中,这由“时钟”符号和实线箭头(轮询)以及虚线箭头(订阅)表示。

端点 Bean 名称

消费端点(任何具有 inputChannel 的东西)由两个 Bean 组成,即消费者和消息处理程序。消费者引用消息处理程序,并在消息到达时调用它。

考虑以下 XML 示例

<int:service-activator id = "someService" ... />

鉴于上述示例,Bean 名称如下

  • 消费者:someServiceid

  • 处理程序:someService.handler

使用企业集成模式 (EIP) 注释时,名称取决于几个因素。考虑以下注释 POJO 的示例

@Component
public class SomeComponent {

    @ServiceActivator(inputChannel = ...)
    public String someMethod(...) {
        ...
    }

}

鉴于上述示例,Bean 名称如下

  • 消费者:someComponent.someMethod.serviceActivator

  • 处理程序:someComponent.someMethod.serviceActivator.handler

从版本 5.0.4 开始,您可以使用 @EndpointId 注释修改这些名称,如下例所示

@Component
public class SomeComponent {

    @EndpointId("someService")
    @ServiceActivator(inputChannel = ...)
    public String someMethod(...) {
        ...
    }

}

鉴于上述示例,Bean 名称如下

  • 消费者: someService

  • 处理程序:someService.handler

@EndpointId 注解创建的名称与 XML 配置中使用 id 属性创建的名称相同。请考虑以下带注释的 Bean 示例

@Configuration
public class SomeConfiguration {

    @Bean
    @ServiceActivator(inputChannel = ...)
    public MessageHandler someHandler() {
        ...
    }

}

鉴于上述示例,Bean 名称如下

  • 消费者: someConfiguration.someHandler.serviceActivator

  • 处理器: someHandler@Bean 名称)

从版本 5.0.4 开始,您可以使用 @EndpointId 注释修改这些名称,如下例所示

@Configuration
public class SomeConfiguration {

    @Bean("someService.handler")             (1)
    @EndpointId("someService")               (2)
    @ServiceActivator(inputChannel = ...)
    public MessageHandler someHandler() {
        ...
    }

}
1 处理器: someService.handler(Bean 名称)
2 消费者: someService(端点 ID)

@EndpointId 注解创建的名称与 XML 配置中使用 id 属性创建的名称相同,只要您使用将 .handler 附加到 @Bean 名称的约定。

有一种特殊情况会创建第三个 Bean:出于架构原因,如果 MessageHandler @Bean 未定义 AbstractReplyProducingMessageHandler,框架会将提供的 Bean 包装在 ReplyProducingMessageHandlerWrapper 中。此包装器支持请求处理程序建议处理,并发出正常的“未产生回复”调试日志消息。它的 Bean 名称是处理程序 Bean 名称加上 .wrapper(如果有 @EndpointId - 否则,它是正常生成的处理程序名称)。

类似地,可轮询消息源 创建两个 Bean,一个 SourcePollingChannelAdapter(SPCA)和一个 MessageSource

请考虑以下 XML 配置

<int:inbound-channel-adapter id = "someAdapter" ... />

给定前面的 XML 配置,Bean 名称如下

  • SPCA: someAdapterid

  • 处理器: someAdapter.source

请考虑以下 POJO 的 Java 配置,以定义 @EndpointId

@EndpointId("someAdapter")
@InboundChannelAdapter(channel = "channel3", poller = @Poller(fixedDelay = "5000"))
public String pojoSource() {
    ...
}

给定前面的 Java 配置示例,Bean 名称如下

  • SPCA: someAdapter

  • 处理器: someAdapter.source

请考虑以下 Bean 的 Java 配置,以定义 @EndpointID

@Bean("someAdapter.source")
@EndpointId("someAdapter")
@InboundChannelAdapter(channel = "channel3", poller = @Poller(fixedDelay = "5000"))
public MessageSource<?> source() {
    return () -> {
        ...
    };
}

鉴于上述示例,Bean 名称如下

  • SPCA: someAdapter

  • 处理器: someAdapter.source(只要您使用将 .source 附加到 @Bean 名称的约定)

配置和 @EnableIntegration

在本文档中,您可以看到对 XML 命名空间支持的引用,用于在 Spring Integration 流中声明元素。此支持由一系列命名空间解析器提供,这些解析器生成适当的 Bean 定义以实现特定组件。例如,许多端点包含一个 MessageHandler Bean 和一个 ConsumerEndpointFactoryBean,其中注入处理程序和输入通道名称。

首次遇到 Spring Integration 命名空间元素时,框架会自动声明一些 Bean(任务调度程序、隐式通道创建器等),这些 Bean 用于支持运行时环境。

版本 4.0 引入了 @EnableIntegration 注解,允许注册 Spring Integration 基础设施 Bean(参见 Javadoc)。当仅使用 Java 配置时,此注解是必需的,例如使用 Spring Boot 或 Spring Integration 消息注解支持以及 Spring Integration Java DSL 且没有 XML 集成配置。

当您有一个没有 Spring Integration 组件的父上下文和两个或多个使用 Spring Integration 的子上下文时,@EnableIntegration 注解也很有用。它允许这些通用组件仅在父上下文中声明一次。

@EnableIntegration 注解将许多基础设施组件注册到应用程序上下文中。特别是,它

  • 注册一些内置 Bean,例如 errorChannel 及其 LoggingHandler、用于轮询器的 taskSchedulerjsonPath SpEL 函数等。

  • 添加多个 BeanFactoryPostProcessor 实例以增强 BeanFactory 以实现全局和默认集成环境。

  • 添加多个 BeanPostProcessor 实例以增强或转换和包装特定 Bean 以用于集成目的。

  • 添加注解处理器以解析消息注解,并将组件为它们注册到应用程序上下文中。

@IntegrationComponentScan 注解也允许类路径扫描。此注解的作用类似于标准 Spring Framework @ComponentScan 注解,但它仅限于特定于 Spring Integration 的组件和注解,标准 Spring Framework 组件扫描机制无法访问这些组件和注解。例如,请参见 @MessagingGateway 注解

@EnablePublisher 注解注册一个 PublisherAnnotationBeanPostProcessor Bean,并为那些没有 channel 属性的 @Publisher 注解配置 default-publisher-channel。如果找到多个 @EnablePublisher 注解,则它们必须对默认通道具有相同的 value。有关更多信息,请参见 使用 @Publisher 注解的注解驱动配置

@GlobalChannelInterceptor 注解已引入,用于标记用于全局通道拦截的 ChannelInterceptor Bean。此注解类似于 <int:channel-interceptor> XML 元素(参见 全局通道拦截器配置)。@GlobalChannelInterceptor 注解可以放在类级别(使用 @Component 构造型注解)或 @Configuration 类中的 @Bean 方法上。在这两种情况下,Bean 都必须实现 ChannelInterceptor

从版本 5.1 开始,全局通道拦截器适用于动态注册的通道,例如使用 beanFactory.initializeBean() 或使用 Java DSL 时通过 IntegrationFlowContext 初始化的 Bean。以前,当在应用程序上下文刷新后创建 Bean 时,拦截器不会应用。

@IntegrationConverter 注解将 ConverterGenericConverterConverterFactory bean 标记为 integrationConversionService 的候选转换器。此注解类似于 <int:converter> XML 元素(参见 有效负载类型转换)。您可以在类级别(使用 @Component 构造型注解)或在 @Configuration 类中的 @Bean 方法上放置 @IntegrationConverter 注解。

有关消息传递注解的更多信息,请参见 注解支持

编程注意事项

您应该尽可能使用普通的 Java 对象 (POJO),并且只有在绝对必要时才在代码中公开框架。有关更多信息,请参见 POJO 方法调用

如果您确实将框架公开给您的类,则需要考虑一些因素,尤其是在应用程序启动期间。

  • 如果您的组件是 ApplicationContextAware,则通常不应在 setApplicationContext() 方法中使用 ApplicationContext。相反,请存储一个引用,并将此类使用推迟到上下文生命周期的后期。

  • 如果您的组件是 InitializingBean 或使用 @PostConstruct 方法,请不要从这些初始化方法中发送任何消息。当这些方法被调用时,应用程序上下文尚未初始化,发送此类消息可能会失败。如果您需要在启动期间发送消息,请实现 ApplicationListener 并等待 ContextRefreshedEvent。或者,实现 SmartLifecycle,将您的 bean 放置在后期阶段,并从 start() 方法发送消息。

使用打包(例如,阴影)jar 时注意事项

Spring Integration 通过使用 Spring Framework 的 SpringFactories 机制加载多个 IntegrationConfigurationInitializer 类来引导某些功能。这包括 -core jar 以及某些其他 jar,包括 -http-jmx。此过程的信息存储在每个 jar 中的 META-INF/spring.factories 文件中。

一些开发人员更喜欢使用众所周知的工具(例如 Apache Maven Shade 插件)将他们的应用程序和所有依赖项重新打包到一个 jar 中。

默认情况下,shade 插件在生成阴影 jar 时不会合并 spring.factories 文件。

除了 spring.factories 之外,其他 META-INF 文件(spring.handlersspring.schemas)用于 XML 配置。这些文件也需要合并。

Spring Boot 的可执行 jar 机制 采用了一种不同的方法,它嵌套了 jar,从而保留了类路径上的每个 spring.factories 文件。因此,对于 Spring Boot 应用程序,如果您使用其默认的可执行 jar 格式,则无需执行其他操作。

即使您不使用 Spring Boot,您仍然可以使用 Boot 提供的工具来增强 shade 插件,方法是为上述文件添加转换器。以下示例显示了如何配置插件

示例 1. pom.xml
...
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <configuration>
                <keepDependenciesWithProvidedScope>true</keepDependenciesWithProvidedScope>
                <createDependencyReducedPom>true</createDependencyReducedPom>
            </configuration>
            <dependencies>
                <dependency> (1)
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                    <version>${spring.boot.version}</version>
                </dependency>
            </dependencies>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <transformers> (2)
                            <transformer
                                implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                <resource>META-INF/spring.handlers</resource>
                            </transformer>
                            <transformer
                                implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
                                <resource>META-INF/spring.factories</resource>
                            </transformer>
                            <transformer
                                implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                <resource>META-INF/spring.schemas</resource>
                            </transformer>
                            <transformer
                                implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
...

具体来说,

1 spring-boot-maven-plugin 添加为依赖项。
2 配置转换器。

您可以为 ${spring.boot.version} 添加一个属性,或者使用显式版本。

编程技巧

本节介绍了一些充分利用 Spring Integration 的方法。

XML 架构

在使用 XML 配置时,为了避免出现错误的架构验证错误,您应该使用“Spring 感知”IDE,例如 Spring Tool Suite (STS)、带有 Spring IDE 插件的 Eclipse 或 IntelliJ IDEA。这些 IDE 知道如何从类路径中解析正确的 XML 架构(通过使用 jar 中的 META-INF/spring.schemas 文件)。在使用 STS 或带有插件的 Eclipse 时,您必须在项目上启用 Spring Project Nature

为某些遗留模块(存在于 1.0 版本中的模块)在互联网上托管的架构是 1.0 版本,出于兼容性原因。如果您的 IDE 使用这些架构,您可能会看到错误的错误。

每个在线架构都有类似于以下内容的警告

此架构适用于 Spring Integration Core 的 1.0 版本。我们无法将其更新到当前架构,因为这将破坏使用 1.0.3 或更低版本的任何应用程序。对于后续版本,将从类路径解析“非版本化”架构,并从 jar 中获取。请参考 GitHub

受影响的模块是

  • core (spring-integration.xsd)

  • file

  • http

  • jms

  • mail

  • security

  • stream

  • ws

  • xml

查找 Java 和 DSL 配置的类名

使用 XML 配置和 Spring Integration 命名空间支持,XML 解析器隐藏了目标 bean 的声明和连接方式。对于 Java 配置,了解目标最终用户应用程序的框架 API 很重要。

EIP 实现的第一公民是 MessageChannelEndpoint(请参阅本章前面的 主要组件)。它们的实现(契约)是

  • org.springframework.messaging.Message:请参阅 Message

  • org.springframework.messaging.MessageChannel:请参阅 消息通道

  • org.springframework.integration.endpoint.AbstractEndpoint: 请参阅 轮询消费者

前两个很容易理解如何实现、配置和使用。最后一个值得更多关注。

AbstractEndpoint 在整个 Spring 框架中被广泛用于不同的组件实现。其主要实现包括:

  • EventDrivenConsumer,用于订阅 SubscribableChannel 以监听消息。

  • PollingConsumer,用于从 PollableChannel 轮询消息。

当您使用消息传递注解或 Java DSL 时,您无需担心这些组件,因为框架会自动使用适当的注解和 BeanPostProcessor 实现生成它们。当手动构建组件时,您应该使用 ConsumerEndpointFactoryBean 来帮助确定要创建的目标 AbstractEndpoint 消费者实现,这取决于提供的 inputChannel 属性。

另一方面,ConsumerEndpointFactoryBean 会委托给框架中的另一个一等公民 - org.springframework.messaging.MessageHandler。实现此接口的目标是处理从通道中由端点消费的消息。Spring Integration 中的所有 EIP 组件都是 MessageHandler 实现(例如,AggregatingMessageHandlerMessageTransformingHandlerAbstractMessageSplitter 等)。目标协议出站适配器(FileWritingMessageHandlerHttpRequestExecutingMessageHandlerAbstractMqttMessageHandler 等)也是 MessageHandler 实现。当您使用 Java 配置开发 Spring Integration 应用程序时,您应该查看 Spring Integration 模块以找到适合用于 @ServiceActivator 配置的 MessageHandler 实现。例如,要发送 XMPP 消息(请参阅 XMPP 支持),您应该配置如下内容:

@Bean
@ServiceActivator(inputChannel = "input")
public MessageHandler sendChatMessageHandler(XMPPConnection xmppConnection) {
    ChatMessageSendingMessageHandler handler = new ChatMessageSendingMessageHandler(xmppConnection);

    DefaultXmppHeaderMapper xmppHeaderMapper = new DefaultXmppHeaderMapper();
    xmppHeaderMapper.setRequestHeaderNames("*");
    handler.setHeaderMapper(xmppHeaderMapper);

    return handler;
}

MessageHandler 实现代表消息流的出站和处理部分。

入站消息流侧有其自身的组件,这些组件分为轮询和监听行为。监听(消息驱动)组件很简单,通常只需要一个目标类实现即可准备好生成消息。监听组件可以是单向 MessageProducerSupport 实现(例如,AbstractMqttMessageDrivenChannelAdapterImapIdleChannelAdapter),也可以是请求-回复 MessagingGatewaySupport 实现(例如,AmqpInboundGatewayAbstractWebServiceInboundGateway)。

轮询入站端点适用于那些不提供监听器 API 或不打算使用此类行为的协议,包括任何基于文件的协议(例如 FTP)、任何数据库(RDBMS 或 NoSQL)等。

这些入站端点由两个组件组成:轮询配置,用于定期启动轮询任务,以及消息源类,用于从目标协议读取数据并为下游集成流生成消息。轮询配置的第一类是 SourcePollingChannelAdapter。它是另一个 AbstractEndpoint 实现,但专门用于轮询以启动集成流。通常,使用消息传递注解或 Java DSL 时,您无需担心此类。框架会根据 @InboundChannelAdapter 配置或 Java DSL 构建器规范生成一个 bean。

消息源组件对于目标应用程序开发更为重要,它们都实现了MessageSource接口(例如,MongoDbMessageSourceAbstractTwitterMessageSource)。考虑到这一点,我们用于从带有JDBC的RDBMS表中读取数据的配置可能类似于以下内容

@Bean
@InboundChannelAdapter(value = "fooChannel", poller = @Poller(fixedDelay="5000"))
public MessageSource<?> storedProc(DataSource dataSource) {
    return new JdbcPollingChannelAdapter(dataSource, "SELECT * FROM foo where status = 0");
}

您可以在特定的Spring Integration模块中找到目标协议所需的所有入站和出站类(在大多数情况下,在相应的包中)。例如,spring-integration-websocket适配器是

  • o.s.i.websocket.inbound.WebSocketInboundChannelAdapter:实现MessageProducerSupport以监听套接字上的帧并将消息发送到通道。

  • o.s.i.websocket.outbound.WebSocketOutboundMessageHandler:单向AbstractMessageHandler实现,用于将传入的消息转换为适当的帧并通过websocket发送。

如果您熟悉Spring Integration XML配置,从4.3版本开始,我们在XSD元素定义中提供有关用于声明适配器或网关bean的目标类的信息,如下例所示

<xsd:element name="outbound-async-gateway">
    <xsd:annotation>
		<xsd:documentation>
Configures a Consumer Endpoint for the 'o.s.i.amqp.outbound.AsyncAmqpOutboundGateway'
that will publish an AMQP Message to the provided Exchange and expect a reply Message.
The sending thread returns immediately; the reply is sent asynchronously; uses 'AsyncRabbitTemplate.sendAndReceive()'.
       </xsd:documentation>
	</xsd:annotation>

POJO方法调用

编程注意事项中所述,我们建议使用POJO编程风格,如下例所示

@ServiceActivator
public String myService(String payload) { ... }

在这种情况下,框架会提取一个String有效负载,调用您的方法,并将结果包装在一个消息中,发送到流中的下一个组件(原始标头将复制到新消息)。实际上,如果您使用XML配置,您甚至不需要@ServiceActivator注解,如下面的配对示例所示

<int:service-activator ... ref="myPojo" method="myService" />
public String myService(String payload) { ... }

只要类上的公共方法没有歧义,您就可以省略method属性。

您也可以在POJO方法中获取标头信息,如下例所示

@ServiceActivator
public String myService(@Payload String payload, @Header("foo") String fooHeader) { ... }

您也可以取消引用消息上的属性,如下例所示

@ServiceActivator
public String myService(@Payload("payload.foo") String foo, @Header("bar.baz") String barbaz) { ... }

由于各种 POJO 方法调用可用,5.0 之前的版本使用 SpEL(Spring 表达式语言)来调用 POJO 方法。与方法中通常执行的实际工作相比,SpEL(即使是解释执行的)通常也“足够快”。但是,从 5.0 版本开始,默认情况下,只要有可能,就会使用 `org.springframework.messaging.handler.invocation.InvocableHandlerMethod`。这种技术通常比解释执行的 SpEL 执行速度更快,并且与其他 Spring 消息传递项目一致。`InvocableHandlerMethod` 类似于在 Spring MVC 中用于调用控制器方法的技术。使用 SpEL 时,仍然有一些方法始终被调用。例如,前面讨论过的带有反引用属性的带注解的参数。这是因为 SpEL 能够遍历属性路径。

可能还有一些我们没有考虑到的其他特殊情况,这些情况也不适用于 `InvocableHandlerMethod` 实例。出于这个原因,在这些情况下,我们会自动回退到使用 SpEL。

如果您愿意,也可以设置您的 POJO 方法,使其始终使用 SpEL,使用 `UseSpelInvoker` 注解,如下例所示

@UseSpelInvoker(compilerMode = "IMMEDIATE")
public void bar(String bar) { ... }

如果省略了 `compilerMode` 属性,则 `spring.expression.compiler.mode` 系统属性将决定编译模式。有关编译 SpEL 的更多信息,请参阅 SpEL 编译