STOMP 支持

Spring Integration 4.2 版本引入了 STOMP (Simple Text Orientated Messaging Protocol) 客户端支持。它基于 Spring Framework 消息模块 stomp 包的架构、基础设施和 API。Spring Integration 使用了许多 Spring STOMP 组件(例如 StompSessionStompClientSupport)。更多信息,请参阅 Spring Framework 参考手册中的Spring Framework STOMP 支持章节。

您需要在项目中包含此依赖项

  • Maven

  • Gradle

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stomp</artifactId>
    <version>6.4.4</version>
</dependency>
compile "org.springframework.integration:spring-integration-stomp:6.4.4"

对于服务器端组件,您需要添加 org.springframework:spring-websocket 和/或 io.projectreactor.netty:reactor-netty 依赖项。

概述

要配置 STOMP,您应该从 STOMP 客户端对象开始。Spring Framework 提供了以下实现

  • WebSocketStompClient:基于 Spring WebSocket API 构建,支持标准的 JSR-356 WebSocket、Jetty 9 以及使用 SockJS Client 进行基于 HTTP 的 WebSocket 模拟的 SockJS。

  • ReactorNettyTcpStompClient:基于 reactor-netty 项目中的 ReactorNettyTcpClient 构建。

您可以提供任何其他 StompClientSupport 实现。请参阅这些类的Javadoc

StompClientSupport 类被设计为生成给定 StompSessionHandlerStompSession工厂,所有剩余工作都通过对该 StompSessionHandlerStompSession 抽象的回调来完成。通过 Spring Integration 的适配器抽象,我们需要提供一些托管共享对象来代表我们的应用程序作为一个拥有唯一会话的 STOMP 客户端。为此,Spring Integration 提供了 StompSessionManager 抽象来管理任意给定 StompSessionHandler 之间的单个 StompSession。这允许对特定的 STOMP Broker 使用入站出站通道适配器(或两者)。有关更多信息,请参阅 StompSessionManager(及其实现)的 JavaDocs。

STOMP 入站通道适配器

StompInboundChannelAdapter 是一个一站式的 MessageProducer 组件,它将您的 Spring Integration 应用程序订阅到提供的 STOMP 目标,并从它们接收消息(通过在连接的 StompSession 上使用提供的 MessageConverter 从 STOMP 帧转换而来)。您可以通过在 StompInboundChannelAdapter 上使用适当的 @ManagedOperation 注解在运行时更改目标(以及 STOMP 订阅)。

有关更多配置选项,请参阅STOMP Namespace 支持StompInboundChannelAdapterJavadoc

STOMP 出站通道适配器

StompMessageHandler 是用于 <int-stomp:outbound-channel-adapter>MessageHandler,它用于通过 StompSession(由共享的 StompSessionManager 提供)将传出的 Message<?> 实例发送到 STOMP destination(预配置或在运行时使用 SpEL 表达式确定)。

有关更多配置选项,请参阅STOMP Namespace 支持StompMessageHandlerJavadoc

STOMP 头映射

STOMP 协议在其帧中提供了头。STOMP 帧的整个结构具有以下格式

....
COMMAND
header1:value1
header2:value2

Body^@
....

Spring Framework 提供了 StompHeaders 来表示这些头。有关更多详细信息,请参阅Javadoc。STOMP 帧被转换为 Message<?> 实例以及从 Message<?> 实例转换,这些头也被映射到 MessageHeaders 实例以及从 MessageHeaders 实例映射。Spring Integration 为 STOMP 适配器提供了默认的 HeaderMapper 实现。该实现是 StompHeaderMapper。它分别为入站和出站适配器提供了 fromHeaders()toHeaders() 操作。

与许多其他 Spring Integration 模块一样,引入了 IntegrationStompHeaders 类来将标准 STOMP 头映射到 MessageHeaders,其中以 stomp_ 作为头名称前缀。此外,所有带有该前缀的 MessageHeaders 实例在发送到目标时都映射到 StompHeaders

有关更多信息,请参阅这些类的Javadoc 以及STOMP Namespace Supportmapped-headers 属性的描述。

STOMP 集成事件

许多 STOMP 操作是异步的,包括错误处理。例如,STOMP 有一个 RECEIPT 服务器帧,当客户端帧通过添加 RECEIPT 头请求时返回该帧。为了提供对这些异步事件的访问,Spring Integration 会发出 StompIntegrationEvent 实例,您可以通过实现 ApplicationListener 或使用 <int-event:inbound-channel-adapter> 来获取这些实例(请参阅 接收 Spring Application 事件)。

具体来说,当 stompSessionListenableFuture 由于连接 STOMP broker 失败而接收到 onFailure() 时,AbstractStompSessionManager 会发出一个 StompExceptionEvent。另一个例子是 StompMessageHandler。它处理 ERROR STOMP 帧,这些帧是服务器对该 StompMessageHandler 发送的不当(未被接受的)消息的响应。

StompMessageHandler 在发送到 StompSession 的消息的异步回复中,作为 StompSession.Receiptable 回调的一部分,发出 StompReceiptEventStompReceiptEvent 可以是正面的或负面的,这取决于是否在 receiptTimeLimit 期间从服务器接收到 RECEIPT 帧,您可以在 StompClientSupport 实例上配置该时间限制。它默认为 15 * 1000(毫秒,即 15 秒)。

StompSession.Receiptable 回调仅在要发送的消息的 RECEIPT STOMP 头不为 null 时才会添加。您可以通过 StompSessionautoReceipt 选项以及 StompSessionManager 上分别启用自动 RECEIPT 头生成。

有关如何配置 Spring Integration 以接受这些 ApplicationEvent 实例的更多信息,请参阅STOMP Adapters Java Configuration

STOMP 适配器 Java 配置

以下示例显示了 STOMP 适配器的全面 Java 配置

@Configuration
@EnableIntegration
public class StompConfiguration {

    @Bean
    public ReactorNettyTcpStompClient stompClient() {
        ReactorNettyTcpStompClient stompClient = new ReactorNettyTcpStompClient("127.0.0.1", 61613);
        stompClient.setMessageConverter(new PassThruMessageConverter());
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.afterPropertiesSet();
        stompClient.setTaskScheduler(taskScheduler);
        stompClient.setReceiptTimeLimit(5000);
        return stompClient;
    }

    @Bean
    public StompSessionManager stompSessionManager() {
        ReactorNettyTcpStompSessionManager stompSessionManager = new ReactorNettyTcpStompSessionManager(stompClient());
        stompSessionManager.setAutoReceipt(true);
        return stompSessionManager;
    }

    @Bean
    public PollableChannel stompInputChannel() {
        return new QueueChannel();
    }

    @Bean
    public StompInboundChannelAdapter stompInboundChannelAdapter() {
        StompInboundChannelAdapter adapter =
        		new StompInboundChannelAdapter(stompSessionManager(), "/topic/myTopic");
        adapter.setOutputChannel(stompInputChannel());
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "stompOutputChannel")
    public MessageHandler stompMessageHandler() {
        StompMessageHandler handler = new StompMessageHandler(stompSessionManager());
        handler.setDestination("/topic/myTopic");
        return handler;
    }

    @Bean
    public PollableChannel stompEvents() {
        return new QueueChannel();
    }

    @Bean
    public ApplicationListener<ApplicationEvent> stompEventListener() {
        ApplicationEventListeningMessageProducer producer = new ApplicationEventListeningMessageProducer();
        producer.setEventTypes(StompIntegrationEvent.class);
        producer.setOutputChannel(stompEvents());
        return producer;
    }

}

STOMP Namespace 支持

Spring Integration STOMP namespace 实现了入站和出站通道适配器组件。要将其包含在您的配置中,请在您的应用程序上下文配置文件中提供以下 namespace 声明

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:int-stomp="http://www.springframework.org/schema/integration/stomp"
  xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    https://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    https://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/stomp
    https://www.springframework.org/schema/integration/stomp/spring-integration-stomp.xsd">
    ...
</beans>

理解 <int-stomp:outbound-channel-adapter> 元素

以下列表显示了 STOMP 出站通道适配器的可用属性

<int-stomp:outbound-channel-adapter
                           id=""                      (1)
                           channel=""                 (2)
                           stomp-session-manager=""   (3)
                           header-mapper=""           (4)
                           mapped-headers=""          (5)
                           destination=""             (6)
                           destination-expression=""  (7)
                           auto-startup=""            (8)
                           phase=""/>                 (9)
1 组件 bean 名称。MessageHandler 会注册一个 bean 别名,其名称为 id 加上 .handler。如果您未设置 channel 属性,则会在应用程序上下文中创建一个 DirectChannel 并注册,其 bean 名称为该 id 属性的值。在这种情况下,端点会注册一个 bean 名称,其名称为 id 加上 .adapter
2 如果存在 id,则标识连接到此适配器的通道。请参阅 id。可选。
3 StompSessionManager bean 的引用,该 bean 封装了底层连接和 StompSession 处理操作。必需。
4 对实现 HeaderMapper<StompHeaders> 的 bean 的引用,该 bean 将 Spring Integration MessageHeaders 映射到 STOMP 帧头以及从 STOMP 帧头映射。它与 mapped-headers 互斥。默认值为 StompHeaderMapper
5 逗号分隔的 STOMP 头名称列表,这些头将被映射到 STOMP 帧头。只有在未设置 header-mapper 引用时才能提供此项。此列表中的值也可以是与头名称匹配的简单模式(例如 myheader**myheader)。一个特殊令牌 (STOMP_OUTBOUND_HEADERS) 代表所有标准 STOMP 头(内容长度、收据、心跳等)。它们默认包含在内。如果您想添加自己的头,并且也希望标准头被映射,则必须包含此令牌或通过使用 header-mapper 提供您自己的 HeaderMapper 实现。
6 发送 STOMP 消息的目标名称。它与 destination-expression 互斥。
7 一个 SpEL 表达式,将在运行时针对每个 Spring Integration Message 作为根对象进行评估。它与 destination 互斥。
8 布尔值,指示此端点是否应自动启动。默认为 true
9 此端点应在其生命周期阶段内启动和停止。值越低,此端点启动越早,停止越晚。默认值为 Integer.MIN_VALUE。值可以为负。请参阅 SmartLifeCycle

理解 <int-stomp:inbound-channel-adapter> 元素

以下列表显示了 STOMP 入站通道适配器的可用属性

<int-stomp:inbound-channel-adapter
                           id=""                     (1)
                           channel=""                (2)
                           error-channel=""          (3)
                           stomp-session-manager=""  (4)
                           header-mapper=""          (5)
                           mapped-headers=""         (6)
                           destinations=""           (7)
                           send-timeout=""           (8)
                           payload-type=""           (9)
                           auto-startup=""           (10)
                           phase=""/>                (11)
1 组件 bean 名称。如果您未设置 channel 属性,则会在应用程序上下文中创建一个 DirectChannel 并注册,其 bean 名称为该 id 属性的值。在这种情况下,端点会注册一个 bean 名称,其名称为 id 加上 .adapter
2 标识连接到此适配器的通道。
3 MessageChannel bean 引用,ErrorMessage 实例应发送到该引用。
4 请参阅 <int-stomp:outbound-channel-adapter> 上的相同选项。
5 逗号分隔的 STOMP 头名称列表,这些头将从 STOMP 帧头映射而来。只有在未设置 header-mapper 引用时才能提供此项。此列表中的值也可以是与头名称匹配的简单模式(例如 myheader**myheader)。一个特殊令牌 (STOMP_INBOUND_HEADERS) 代表所有标准 STOMP 头(内容长度、收据、心跳等)。它们默认包含在内。如果您想添加自己的头,并且也希望标准头被映射,则必须也包含此令牌或通过使用 header-mapper 提供您自己的 HeaderMapper 实现。
6 请参阅 <int-stomp:outbound-channel-adapter> 上的相同选项。
7 逗号分隔的 STOMP 目标名称列表,用于订阅。目标列表(以及订阅)可以通过 addDestination()removeDestination() @ManagedOperation 注解在运行时进行修改。
8 当发送消息到可能阻塞的通道时,最大等待时间(毫秒)。例如,如果 QueueChannel 达到最大容量,它可能会阻塞直到有空间可用。
9 要从传入 STOMP 帧转换的目标 payload 的 Java 类型的完全限定名称。默认为 String.class
10 请参阅 <int-stomp:outbound-channel-adapter> 上的相同选项。
11 请参阅 <int-stomp:outbound-channel-adapter> 上的相同选项。