Spring Integration Framework 概述

Spring Integration 通过扩展 Spring 编程模型来支持著名的 企业集成模式(Enterprise Integration Patterns)。它在基于 Spring 的应用中实现了轻量级消息传递,并通过声明式适配器支持与外部系统的集成。这些适配器在 Spring 现有对远程调用、消息传递和调度支持的基础上,提供了更高层次的抽象。

Spring Integration 的主要目标是提供一种构建企业集成解决方案的简单模型,同时保持关注点分离,这对于生成可维护、可测试的代码至关重要。

Spring Integration 概述

本章提供了 Spring Integration 核心概念和组件的高层介绍。它包含一些编程技巧,帮助您充分利用 Spring Integration。

背景

Spring Framework 的一个关键主题是控制反转(Inversion of Control,IoC)。从最广泛的意义上讲,这意味着框架代表其上下文内管理的组件处理职责。组件本身得到了简化,因为它们被免除了这些职责。例如,依赖注入减轻了组件定位或创建其依赖项的责任。类似地,面向切面编程通过将通用的横切关注点模块化为可重用的切面,从而减轻了业务组件的负担。在每种情况下,最终结果都是一个更易于测试、理解、维护和扩展的系统。

此外,Spring Framework 和相关项目为构建企业应用提供了全面的编程模型。开发人员受益于该模型的一致性,特别是它基于成熟的最佳实践,例如面向接口编程和优先使用组合而非继承。Spring 简化的抽象和强大的支持库提高了开发人员的生产力,同时增加了可测试性和可移植性。

Spring Integration 正是基于这些相同的目标和原则。它将 Spring 编程模型扩展到消息传递领域,并基于 Spring 现有的企业集成支持,提供了更高层次的抽象。它支持消息驱动架构,其中控制反转应用于运行时关注点,例如何时应运行特定业务逻辑以及应将响应发送到何处。它支持消息的路由和转换,以便集成不同的传输方式和不同的数据格式,而不会影响可测试性。换句话说,消息传递和集成关注点由框架处理。业务组件进一步与基础设施隔离,开发人员也从复杂的集成职责中解脱出来。

作为 Spring 编程模型的扩展,Spring Integration 提供了多种配置选项,包括注解、支持命名空间的 XML、带通用“bean”元素的 XML 以及直接使用底层 API。该 API 基于定义明确的策略接口和非侵入式委托适配器。Spring Integration 的设计灵感来自于对 Spring 内部常见模式与 Gregor Hohpe 和 Bobby Woolf 所著《企业集成模式(Enterprise Integration Patterns)》(Addison Wesley,2004 年)中所述的著名模式之间强烈关联性的认识。读过这本书的开发人员应该会立即熟悉 Spring Integration 的概念和术语。

目标与原则

Spring Integration 的目标如下

  • 提供一种简单的模型来实现复杂的企业集成解决方案。

  • 促进基于 Spring 的应用中的异步、消息驱动行为。

  • 促进现有 Spring 用户直观、渐进地采用。

Spring Integration 遵循以下原则

  • 组件应松耦合以实现模块化和可测试性。

  • 框架应强制分离业务逻辑和集成逻辑的关注点。

  • 扩展点本质上应该是抽象的(但在明确定义的范围内),以促进重用和可移植性。

主要组件

从垂直角度来看,分层架构有利于关注点分离,而层之间的基于接口的契约促进了松耦合。基于 Spring 的应用通常采用这种设计,Spring Framework 和相关项目为企业应用的整个技术栈遵循这种最佳实践提供了坚实的基础。消息驱动架构增加了横向视角,但这些相同的目标仍然相关。正如“分层架构”是一个非常通用和抽象的范式一样,消息系统通常遵循类似的抽象“管道与过滤器(pipes-and-filters)”模型。“过滤器”代表任何能够生成或消费消息的组件,“管道”则在过滤器之间传输消息,从而使组件本身保持松耦合。值得注意的是,这两种高层范式并非互斥。支持“管道”的底层消息基础设施仍应封装在一个层中,该层的契约由接口定义。同样,“过滤器”本身也应在逻辑上位于应用服务层之上的层中进行管理,通过接口与这些服务交互,其方式与 Web 层非常相似。

消息

在 Spring Integration 中,消息是任何 Java 对象的通用包装器,并结合了框架在处理该对象时使用的元数据。它由一个 Payload 和 Headers 组成。Payload 可以是任何类型,Headers 包含常用的必需信息,例如 ID、时间戳、关联 ID 和回复地址。Headers 也用于向连接的传输发送或从连接的传输接收值。例如,从接收到的文件创建消息时,文件名可以存储在 Header 中,供下游组件访问。同样,如果消息内容最终将由出站邮件适配器发送,各种属性(收件人、发件人、抄送、主题等)可以由上游组件配置为消息头值。开发人员还可以在 Headers 中存储任何任意的键值对。

Message
图 1. 消息

消息通道

消息通道代表了管道与过滤器架构中的“管道”。生产者将消息发送到通道,消费者从通道接收消息。因此,消息通道解耦了消息组件,并提供了方便的消息拦截和监控点。

Message Channel
图 2. 消息通道

消息通道可以遵循点对点或发布-订阅语义。对于点对点通道,发送到通道的每条消息最多只能有一个消费者接收。另一方面,发布-订阅通道尝试将每条消息广播给通道上的所有订阅者。Spring Integration 支持这两种模型。

虽然“点对点”和“发布-订阅”定义了最终有多少消费者接收每条消息的两种选项,但还有另一个重要的考虑因素:通道是否应该缓冲消息?在 Spring Integration 中,可轮询通道能够在队列中缓冲消息。缓冲的优势在于它可以限制入站消息的速度,从而防止消费者过载。然而,顾名思义,这也增加了一些复杂性,因为只有配置了轮询器,消费者才能从这种通道接收消息。另一方面,连接到可订阅通道的消费者只是消息驱动的。消息通道实现 详细讨论了 Spring Integration 中可用的各种通道实现。

消息端点

Spring Integration 的主要目标之一是通过控制反转来简化企业集成解决方案的开发。这意味着您不必直接实现消费者和生产者,甚至不必构建消息并在消息通道上调用发送或接收操作。相反,您应该能够专注于您的特定领域模型,并基于普通对象进行实现。然后,通过提供声明式配置,您可以将特定领域的代码“连接”到 Spring Integration 提供的消息基础设施。负责这些连接的组件就是消息端点。这并不意味着您必须直接连接您现有的应用代码。任何实际的企业集成解决方案都需要一些专注于集成关注点的代码,例如路由和转换。重要的是实现集成逻辑和业务逻辑之间的关注点分离。换句话说,就像 Web 应用的 Model-View-Controller (MVC) 范式一样,目标应该是提供一个轻薄但专用的层,将入站请求转换为服务层调用,然后将服务层返回值转换为出站回复。下一节概述了处理这些职责的消息端点类型,在后续章节中,您将看到 Spring Integration 的声明式配置选项如何提供非侵入式的方式来使用这些组件。

消息端点

消息端点代表了管道与过滤器架构中的“过滤器”。如前所述,端点的主要作用是以非侵入式的方式将应用代码连接到消息框架。换句话说,应用代码理想情况下应该完全感知不到消息对象或消息通道。这类似于 MVC 范式中控制器的作用。就像控制器处理 HTTP 请求一样,消息端点处理消息。就像控制器映射到 URL 模式一样,消息端点映射到消息通道。这两种情况下的目标都是相同的:将应用代码与基础设施隔离。这些概念以及随后的所有模式都在《企业集成模式(Enterprise Integration Patterns)》一书中有详细讨论。在这里,我们仅提供 Spring Integration 支持的主要端点类型以及与这些类型相关的角色的高层描述。接下来的章节将详细阐述并提供示例代码和配置示例。

消息转换器

消息转换器负责转换消息的内容或结构,并返回修改后的消息。最常见的转换器类型可能是将消息的 Payload 从一种格式转换为另一种格式(例如从 XML 转换为 java.lang.String)。类似地,转换器可以添加、删除或修改消息的 Header 值。

消息过滤器

消息过滤器决定消息是否应该传递到输出通道。这只需要一个布尔测试方法,该方法可以检查特定的 Payload 内容类型、属性值、Header 是否存在或其他条件。如果消息被接受,则发送到输出通道。否则,消息将被丢弃(或者,对于更严格的实现,可能会抛出 Exception)。消息过滤器通常与发布-订阅通道结合使用,在发布-订阅通道中,多个消费者可能会接收到同一条消息,并使用过滤器的条件来缩小要处理的消息集。

请注意不要混淆管道与过滤器架构模式中“过滤器”的泛指用法与此处特指的选择性过滤两个通道之间消息流动的端点类型。管道与过滤器概念中的“过滤器”更接近 Spring Integration 的消息端点:任何可以连接到消息通道以发送或接收消息的组件。

消息路由器

消息路由器负责决定接下来哪个通道或哪些通道(如果有)应该接收消息。通常,该决定基于消息的内容或消息 Header 中可用的元数据。消息路由器通常用作服务激活器或能发送回复消息的其他端点上静态配置输出通道的动态替代方案。同样,消息路由器提供了对先前描述的多个订阅者使用的响应式消息过滤器的一种主动替代方案。

Router
图 3. 消息路由器

分发器

分发器是另一种消息端点类型,其职责是接收来自其输入通道的消息,将该消息分割成多条消息,并将每条消息发送到其输出通道。这通常用于将“复合”Payload 对象分解为包含细分 Payload 的一组消息。

聚合器

聚合器基本上是分发器的镜像,它是一种接收多条消息并将其组合成一条消息的消息端点类型。实际上,聚合器通常是包含分发器的管道中的下游消费者。从技术上讲,聚合器比分发器更复杂,因为它需要维护状态(要聚合的消息),决定何时完整的一组消息可用,并在必要时进行超时处理。此外,在超时情况下,聚合器需要知道是发送部分结果、丢弃它们还是将它们发送到单独的通道。Spring Integration 提供了 CorrelationStrategyReleaseStrategy,以及用于超时、超时时是否发送部分结果以及丢弃通道的可配置设置。

服务激活器

服务激活器是用于将服务实例连接到消息系统的通用端点。必须配置输入消息通道,并且,如果被调用的服务方法能够返回值,也可以提供输出消息通道。

输出通道是可选的,因为每条消息也可能提供自己的“回复地址”Header。此规则适用于所有消费者端点。

服务激活器调用某个服务对象上的操作来处理请求消息,提取请求消息的 Payload 并进行转换(如果方法不期望消息类型的参数)。当服务对象的方法返回一个值时,该返回值同样会在必要时转换为回复消息(如果它本身不是消息类型)。该回复消息被发送到输出通道。如果未配置输出通道,则回复消息将被发送到消息“回复地址”中指定的通道(如果可用)。

请求-回复服务激活器端点将目标对象的方法连接到输入和输出消息通道。

handler endpoint
图 4. 服务激活器
如前所述,在消息通道中,通道可以是可轮询的或可订阅的。在上面的图中,这通过“时钟”符号和实线箭头(轮询)以及虚线箭头(订阅)来表示。

通道适配器

通道适配器是一个端点,它将消息通道连接到其他系统或传输。通道适配器可以是入站的,也可以是出站的。通常,通道适配器会在消息与从其他系统接收或发送到其他系统的任何对象或资源(文件、HTTP请求、JMS消息等)之间进行一些映射。根据传输方式的不同,通道适配器还可能填充或提取消息头的值。Spring Integration 提供了许多通道适配器,这些适配器将在后续章节中介绍。

source endpoint
图5. 入站通道适配器端点将源系统连接到MessageChannel
消息源可以是可轮询的(例如 POP3),也可以是消息驱动的(例如 IMAP Idle)。在前面的图中,这由“时钟”符号、实心箭头(轮询)和虚线箭头(消息驱动)表示。
target endpoint
图6. 出站通道适配器端点将MessageChannel连接到目标系统。
消息通道一节中讨论的,通道可以是可轮询的或可订阅的。在前面的图中,这由“时钟”符号、实心箭头(轮询)和虚线箭头(订阅)表示。

端点 Bean 名称

消费端点(任何具有inputChannel的端点)由两个 bean 组成:消费者和消息处理器。消费者引用消息处理器,并在消息到达时调用它。

考虑以下 XML 示例

<int:service-activator id = "someService" ... />

鉴于前面的示例,bean 名称如下

  • 消费者:someServiceid

  • 处理器:someService.handler

使用企业集成模式 (EIP) 注解时,名称取决于多种因素。考虑以下带有注解的 POJO 示例

@Component
public class SomeComponent {

    @ServiceActivator(inputChannel = ...)
    public String someMethod(...) {
        ...
    }

}

鉴于前面的示例,bean 名称如下

  • 消费者:someComponent.someMethod.serviceActivator

  • 处理器:someComponent.someMethod.serviceActivator.handler

从 5.0.4 版本开始,您可以使用@EndpointId注解修改这些名称,如下例所示

@Component
public class SomeComponent {

    @EndpointId("someService")
    @ServiceActivator(inputChannel = ...)
    public String someMethod(...) {
        ...
    }

}

鉴于前面的示例,bean 名称如下

  • 消费者:someService

  • 处理器:someService.handler

@EndpointId创建的名称与 XML 配置中使用id属性创建的名称相同。考虑以下带有注解的 bean 示例

@Configuration
public class SomeConfiguration {

    @Bean
    @ServiceActivator(inputChannel = ...)
    public MessageHandler someHandler() {
        ...
    }

}

鉴于前面的示例,bean 名称如下

  • 消费者:someConfiguration.someHandler.serviceActivator

  • 处理器:someHandler@Bean名称)

从 5.0.4 版本开始,您可以使用@EndpointId注解修改这些名称,如下例所示

@Configuration
public class SomeConfiguration {

    @Bean("someService.handler")             (1)
    @EndpointId("someService")               (2)
    @ServiceActivator(inputChannel = ...)
    public MessageHandler someHandler() {
        ...
    }

}
1 处理器:someService.handler(bean 名称)
2 消费者:someService(端点 ID)

只要您遵循将.handler附加到@Bean名称的约定,@EndpointId注解创建的名称就与 XML 配置中使用id属性创建的名称相同。

有一个特殊情况会创建第三个 bean:出于架构原因,如果一个MessageHandler @Bean没有定义AbstractReplyProducingMessageHandler,框架会将提供的 bean 包装在一个ReplyProducingMessageHandlerWrapper中。这个包装器支持请求处理器建议处理,并发出正常的“produced no reply”调试日志消息。它的 bean 名称是处理器 bean 名称加上.wrapper(当存在@EndpointId — 否则,它是正常的生成处理器名称)。

类似地,可轮询消息源创建两个 bean:一个SourcePollingChannelAdapter (SPCA) 和一个MessageSource

考虑以下 XML 配置

<int:inbound-channel-adapter id = "someAdapter" ... />

鉴于前面的 XML 配置,bean 名称如下

  • SPCA:someAdapterid

  • 处理器:someAdapter.source

考虑以下带有注解@EndpointId的 POJO 的 Java 配置

@EndpointId("someAdapter")
@InboundChannelAdapter(channel = "channel3", poller = @Poller(fixedDelay = "5000"))
public String pojoSource() {
    ...
}

鉴于前面的 Java 配置示例,bean 名称如下

  • SPCA:someAdapter

  • 处理器:someAdapter.source

考虑以下带有注解@EndpointID的 bean 的 Java 配置

@Bean("someAdapter.source")
@EndpointId("someAdapter")
@InboundChannelAdapter(channel = "channel3", poller = @Poller(fixedDelay = "5000"))
public MessageSource<?> source() {
    return () -> {
        ...
    };
}

鉴于前面的示例,bean 名称如下

  • SPCA:someAdapter

  • 处理器:someAdapter.source(只要您遵循将.source附加到@Bean名称的约定)

配置与@EnableIntegration

在本文档中,您可以看到许多引用,关于用于在 Spring Integration 流中声明元素的 XML 命名空间支持。这种支持由一系列命名空间解析器提供,它们生成适当的 bean 定义来实现特定的组件。例如,许多端点由一个MessageHandler bean 和一个ConsumerEndpointFactoryBean组成,处理器和输入通道名称被注入其中。

当第一次遇到 Spring Integration 命名空间元素时,框架会自动声明许多 bean(任务调度器、隐式通道创建器等),这些 bean 用于支持运行时环境。

版本 4.0 引入了@EnableIntegration注解,以允许注册 Spring Integration 基础设施 bean(参阅Javadoc)。当仅使用 Java 配置时,此注解是必需的——例如在使用 Spring Boot 或 Spring Integration 消息注解支持,以及没有 XML 集成配置的 Spring Integration Java DSL 时。

@EnableIntegration注解在您有一个没有 Spring Integration 组件的父上下文以及两个或更多使用 Spring Integration 的子上下文时也很有用。它允许这些通用组件只在父上下文中声明一次。

@EnableIntegration注解会向应用上下文注册许多基础设施组件。特别是,它会

  • 注册一些内置 bean,例如errorChannel及其LoggingHandler、用于轮询器的taskSchedulerjsonPath SpEL 函数等等。

  • 添加多个BeanFactoryPostProcessor实例,以增强全局和默认集成环境的BeanFactory

  • 添加多个BeanPostProcessor实例,以增强或转换并包装特定 bean 用于集成目的。

  • 添加注解处理器来解析消息注解,并向应用上下文注册相应的组件。

@IntegrationComponentScan注解也允许类路径扫描。此注解的作用类似于标准的 Spring Framework @ComponentScan注解,但它仅限于 Spring Integration 特有的组件和注解,而标准的 Spring Framework 组件扫描机制无法触及这些组件和注解。例如,请参阅@MessagingGateway注解

@EnablePublisher注解注册一个PublisherAnnotationBeanPostProcessor bean,并为那些没有提供channel属性的@Publisher注解配置default-publisher-channel。如果找到多个@EnablePublisher注解,它们必须都具有相同的默认通道值。有关更多信息,请参阅使用@Publisher注解的注解驱动配置

引入了@GlobalChannelInterceptor注解,用于标记用于全局通道拦截的ChannelInterceptor bean。此注解类似于<int:channel-interceptor> XML 元素(参阅全局通道拦截器配置)。@GlobalChannelInterceptor注解可以放置在类级别(带有@Component刻板印象注解)或@Configuration类中的@Bean方法上。无论哪种情况,bean 都必须实现ChannelInterceptor

从 5.1 版本开始,全局通道拦截器应用于动态注册的通道——例如使用beanFactory.initializeBean()或通过IntegrationFlowContext使用 Java DSL 初始化 bean。在此之前,当 bean 在应用上下文刷新后创建时,拦截器不会被应用。

@IntegrationConverter注解将ConverterGenericConverterConverterFactory bean 标记为integrationConversionService的候选转换器。此注解类似于<int:converter> XML 元素(参阅Payload 类型转换)。您可以将@IntegrationConverter注解放置在类级别(带有@Component刻板印象注解)或@Configuration类中的@Bean方法上。

有关消息注解的更多信息,请参阅注解支持

编程注意事项

Spring Integration 中的大多数类(除非另有说明)必须在应用上下文中声明为 bean 并作为单例。这意味着这些类的实例是线程安全的,它们的生命周期以及与其他组件的连接由 Spring 依赖注入容器管理。实用工具类和构建器类(JacksonJsonUtils, `MessageBuilder, ExpressionEvalMap, IntegrationReactiveUtils 等)可以直接在 Java 代码中使用。然而,Java DSL 工厂和IntegrationComponentSpec实现结果仍然必须注册为应用上下文中的 bean。许多模块中存在的Session抽象不是线程安全的,通常通过Factory模式实现创建,并在线程安全的Template模式中使用。例如,请参阅SftpRemoteFileTemplate及其与DefaultSftpSessionFactory的关系。

您应尽可能使用普通 Java 对象 (POJO)(用于目标逻辑中的消息处理),并且仅在绝对必要时才在代码中暴露框架。有关更多信息,请参阅POJO 方法调用

如果您确实在类中暴露了框架,则需要考虑一些事项,尤其是在应用启动期间

  • 如果您的组件是ApplicationContextAware,通常不应在setApplicationContext()方法中使用ApplicationContext。相反,应存储一个引用,并将此类使用推迟到上下文生命周期稍后进行。

  • 如果您的组件是InitializingBean或使用@PostConstruct方法,请不要从这些初始化方法中发送任何消息。调用这些方法时,应用上下文尚未初始化,发送此类消息可能会失败。如果您需要在启动期间发送消息,请实现ApplicationListener并等待ContextRefreshedEvent。或者,实现SmartLifecycle,将您的 bean 放在一个较晚的阶段,然后从start()方法发送消息。

使用打包(例如 Shaded)Jar 时的注意事项

Spring Integration 使用 Spring Framework 的SpringFactories机制引导某些功能,以加载多个IntegrationConfigurationInitializer类。这包括-core jar 以及其他一些 jar,包括-http-jmx。此过程的信息存储在每个 jar 的META-INF/spring.factories文件中。

一些开发者倾向于使用知名工具(例如Apache Maven Shade Plugin)将其应用和所有依赖项重新打包到一个单独的 jar 中。

默认情况下,shade 插件在生成 shaded jar 时不会合并spring.factories文件。

除了spring.factories,还有其他META-INF文件(spring.handlersspring.schemas)用于 XML 配置。这些文件也需要合并。

Spring Boot 的可执行 jar 机制采用了不同的方法,它将 jar 文件嵌套,从而保留类路径上的每个spring.factories文件。因此,对于 Spring Boot 应用,如果您使用其默认的可执行 jar 格式,则无需额外操作。

即使您不使用 Spring Boot,您仍然可以使用 Boot 提供的工具,通过为上述文件添加 transformer 来增强 shade 插件。以下示例展示了如何配置插件

示例 1. pom.xml
...
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <configuration>
                <keepDependenciesWithProvidedScope>true</keepDependenciesWithProvidedScope>
                <createDependencyReducedPom>true</createDependencyReducedPom>
            </configuration>
            <dependencies>
                <dependency> (1)
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                    <version>${spring.boot.version}</version>
                </dependency>
            </dependencies>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <transformers> (2)
                            <transformer
                                implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                <resource>META-INF/spring.handlers</resource>
                            </transformer>
                            <transformer
                                implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
                                <resource>META-INF/spring.factories</resource>
                            </transformer>
                            <transformer
                                implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                <resource>META-INF/spring.schemas</resource>
                            </transformer>
                            <transformer
                                implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
...

具体来说,

1 添加spring-boot-maven-plugin作为依赖项。
2 配置 transformer。

您可以为${spring.boot.version}添加一个属性,或使用明确的版本。

编程技巧与窍门

本节介绍了一些充分利用 Spring Integration 的方法。

XML Schema

使用 XML 配置时,为了避免出现错误的 schema 验证错误,您应该使用“Spring 感知”的 IDE,例如 Spring Tool Suite (STS)、带有 Spring IDE 插件的 Eclipse 或 IntelliJ IDEA。这些 IDE 知道如何从类路径解析正确的 XML schema(通过使用 jar 中的META-INF/spring.schemas文件)。在使用 STS 或带有插件的 Eclipse 时,您必须在项目上启用Spring Project Nature

某些遗留模块(版本 1.0 中存在的模块)在互联网上托管的 schema 是 1.0 版本,这是出于兼容性原因。如果您的 IDE 使用这些 schema,您很可能会看到错误提示。

每个在线 schema 都有一个类似于以下的警告

此 schema 用于 Spring Integration Core 的 1.0 版本。我们无法将其更新到当前 schema,因为这将破坏使用 1.0.3 或更低版本的任何应用。对于后续版本,将从类路径解析“无版本”schema 并从 jar 中获取。请参阅 GitHub

受影响的模块有

  • corespring-integration.xsd

  • file

  • http

  • jms

  • mail

  • security

  • stream

  • ws

  • xml

查找 Java 和 DSL 配置的类名

通过 XML 配置和 Spring Integration 命名空间支持,XML 解析器隐藏了目标 bean 如何声明和连接在一起的细节。对于 Java 配置,了解面向最终用户应用的框架 API 至关重要。

EIP 实现中的一等公民是MessageChannelEndpoint(参阅本章前面的主要组件)。它们的实现(契约)是

  • org.springframework.messaging.Message:参阅Message

  • org.springframework.messaging.MessageChannel:参阅消息通道

  • org.springframework.integration.endpoint.AbstractEndpoint:参阅Poller

前两个很容易理解如何实现、配置和使用。最后一个值得更多关注

AbstractEndpoint在整个 Spring Framework 中广泛用于不同的组件实现。其主要实现是

  • EventDrivenConsumer,当我们订阅SubscribableChannel以监听消息时使用。

  • PollingConsumer,当我们从PollableChannel轮询消息时使用。

使用消息注解或 Java DSL 时,您无需担心这些组件,因为框架会使用适当的注解和BeanPostProcessor实现自动生成它们。手动构建组件时,您应该使用ConsumerEndpointFactoryBean来帮助确定要创建的目标AbstractEndpoint消费者实现,这基于提供的inputChannel属性。

另一方面,ConsumerEndpointFactoryBean将委托给框架中的另一个一等公民 - org.springframework.messaging.MessageHandler。此接口实现的目的是处理由端点从通道消费的消息。Spring Integration 中的所有 EIP 组件都是MessageHandler实现(例如,AggregatingMessageHandlerMessageTransformingHandlerAbstractMessageSplitter等)。目标协议的出站适配器(FileWritingMessageHandlerHttpRequestExecutingMessageHandlerAbstractMqttMessageHandler等)也是MessageHandler实现。使用 Java 配置开发 Spring Integration 应用时,您应该查看 Spring Integration 模块,找到适合用于@ServiceActivator配置的MessageHandler实现。例如,要发送 XMPP 消息(参阅XMPP 支持),您应该配置类似以下内容

@Bean
@ServiceActivator(inputChannel = "input")
public MessageHandler sendChatMessageHandler(XMPPConnection xmppConnection) {
    ChatMessageSendingMessageHandler handler = new ChatMessageSendingMessageHandler(xmppConnection);

    DefaultXmppHeaderMapper xmppHeaderMapper = new DefaultXmppHeaderMapper();
    xmppHeaderMapper.setRequestHeaderNames("*");
    handler.setHeaderMapper(xmppHeaderMapper);

    return handler;
}

MessageHandler实现表示消息流的出站和处理部分。

入站消息流侧有其自身的组件,分为轮询和监听行为。监听(消息驱动)组件很简单,通常只需要一个目标类实现就可以准备好生成消息。监听组件可以是一次性的MessageProducerSupport实现(例如AbstractMqttMessageDrivenChannelAdapterImapIdleChannelAdapter),也可以是请求-回复的MessagingGatewaySupport实现(例如AmqpInboundGatewayAbstractWebServiceInboundGateway)。

轮询入站端点适用于那些不提供监听器 API 或不适合此类行为的协议,包括任何基于文件的协议(例如 FTP)、任何数据库(RDBMS 或 NoSQL)等。

这些入站端点由两个组件组成:轮询器配置,用于定期启动轮询任务;以及消息源类,用于从目标协议读取数据并为下游集成流生成消息。轮询器配置的第一个类是SourcePollingChannelAdapter。它是另一种AbstractEndpoint实现,但特别用于轮询以启动集成流。通常,使用消息注解或 Java DSL 时,您无需担心这个类。框架会根据@InboundChannelAdapter配置或 Java DSL 构建器规范为其生成一个 bean。

消息源组件对于目标应用开发更为重要,它们都实现了MessageSource接口(例如,MongoDbMessageSourceAbstractTwitterMessageSource)。考虑到这一点,我们使用 JDBC 从 RDBMS 表读取数据的配置可能类似于以下内容

@Bean
@InboundChannelAdapter(value = "fooChannel", poller = @Poller(fixedDelay="5000"))
public MessageSource<?> storedProc(DataSource dataSource) {
    return new JdbcPollingChannelAdapter(dataSource, "SELECT * FROM foo where status = 0");
}

您可以在特定的 Spring Integration 模块中找到目标协议所需的所有入站和出站类(在大多数情况下,在相应的包中)。例如,spring-integration-websocket适配器包括

  • o.s.i.websocket.inbound.WebSocketInboundChannelAdapter:实现MessageProducerSupport,用于监听 socket 上的帧并将消息发送到通道。

  • o.s.i.websocket.outbound.WebSocketOutboundMessageHandler:一次性的AbstractMessageHandler实现,用于将接收到的消息转换为适当的帧并通过 websocket 发送。

如果您熟悉 Spring Integration XML 配置,从版本 4.3 开始,我们在 XSD 元素定义中提供了关于哪些目标类用于声明适配器或网关 bean 的信息,如下例所示

<xsd:element name="outbound-async-gateway">
    <xsd:annotation>
		<xsd:documentation>
Configures a Consumer Endpoint for the 'o.s.i.amqp.outbound.AsyncAmqpOutboundGateway'
that will publish an AMQP Message to the provided Exchange and expect a reply Message.
The sending thread returns immediately; the reply is sent asynchronously; uses 'AsyncRabbitTemplate.sendAndReceive()'.
       </xsd:documentation>
	</xsd:annotation>

POJO 方法调用

编程注意事项中所述,我们推荐使用 POJO 编程风格,如下例所示

@ServiceActivator
public String myService(String payload) { ... }

在这种情况下,框架会提取String payload,调用您的方法,并将结果包装在一条消息中发送到流程中的下一个组件(原始消息头会复制到新消息中)。事实上,如果您使用 XML 配置,您甚至不需要@ServiceActivator注解,如下面的成对示例所示

<int:service-activator ... ref="myPojo" method="myService" />
public String myService(String payload) { ... }

只要类中的 public 方法没有歧义,您可以省略method属性。

您也可以在 POJO 方法中获取消息头信息,如下例所示

@ServiceActivator
public String myService(@Payload String payload, @Header("foo") String fooHeader) { ... }

您还可以解引用消息上的属性,如下例所示

@ServiceActivator
public String myService(@Payload("payload.foo") String foo, @Header("bar.baz") String barbaz) { ... }

由于有各种 POJO 方法调用方式,5.0 版本之前使用 SpEL(Spring Expression Language)来调用 POJO 方法。与方法中实际执行的工作相比,SpEL(即使是解释执行)对于这些操作通常“足够快”。然而,从 5.0 版本开始,只要可能,默认使用org.springframework.messaging.handler.invocation.InvocableHandlerMethod。这种技术通常比解释执行的 SpEL 执行速度更快,并且与其他 Spring 消息项目一致。InvocableHandlerMethod类似于在 Spring MVC 中用于调用控制器方法的技术。某些方法在使用 SpEL 时仍然总是被调用。例如,前面讨论的带有解引用属性的注解参数。这是因为 SpEL 具有导航属性路径的能力。

可能还有一些我们没有考虑到的边缘情况,这些情况也无法与InvocableHandlerMethod实例一起工作。因此,在这些情况下,我们会自动回退到使用 SpEL。

如果您愿意,您也可以设置您的 POJO 方法,使其始终使用 SpEL,并使用UseSpelInvoker注解,如下例所示

@UseSpelInvoker(compilerMode = "IMMEDIATE")
public void bar(String bar) { ... }

如果省略compilerMode属性,则spring.expression.compiler.mode系统属性决定编译器模式。有关编译 SpEL 的更多信息,请参阅SpEL 编译