STOMP 支持

Spring Integration 4.2 版本引入了 STOMP(简单文本导向消息协议)客户端支持。它基于 Spring 框架的消息模块、stomp 包的架构、基础设施和 API。Spring Integration 使用了许多 Spring STOMP 组件(例如 StompSessionStompClientSupport)。有关更多信息,请参阅 Spring 框架参考手册中的 Spring Framework STOMP 支持 章节。

您需要将此依赖项包含到您的项目中

  • Maven

  • Gradle

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stomp</artifactId>
    <version>6.3.5</version>
</dependency>
compile "org.springframework.integration:spring-integration-stomp:6.3.5"

对于服务器端组件,您需要添加 org.springframework:spring-websocket 和/或 io.projectreactor.netty:reactor-netty 依赖项。

概述

要配置 STOMP,您应该从 STOMP 客户端对象开始。Spring 框架提供了以下实现

  • WebSocketStompClient:基于 Spring WebSocket API 构建,支持标准 JSR-356 WebSocket、Jetty 9 和 SockJS 用于基于 HTTP 的 WebSocket 模拟(使用 SockJS 客户端)。

  • ReactorNettyTcpStompClient:基于 reactor-netty 项目中的 ReactorNettyTcpClient 构建。

您可以提供任何其他 StompClientSupport 实现。请参阅这些类的 Javadoc

StompClientSupport 类被设计为一个工厂,用于为提供的 StompSessionHandler 生成 StompSession,其余所有工作都是通过对该 StompSessionHandlerStompSession 抽象的回调完成的。使用 Spring Integration 的适配器抽象,我们需要提供一些托管的共享对象来表示我们的应用程序作为具有其唯一会话的 STOMP 客户端。为此,Spring Integration 提供了 StompSessionManager 抽象来管理任何提供的 StompSessionHandler 之间的单个 StompSession。这允许对特定 STOMP 代理使用入站出站通道适配器(或两者)。有关更多信息,请参阅 StompSessionManager(及其实现)的 JavaDocs。

STOMP 入站通道适配器

StompInboundChannelAdapter 是一个一站式 MessageProducer 组件,它将您的 Spring Integration 应用程序订阅到提供的 STOMP 目标,并从这些目标接收消息(通过在连接的 StompSession 上使用提供的 MessageConverter 从 STOMP 帧转换)。您可以通过在 StompInboundChannelAdapter 上使用适当的 @ManagedOperation 注解在运行时更改目标(以及因此的 STOMP 订阅)。

有关更多配置选项,请参阅 STOMP 命名空间支持StompInboundChannelAdapterJavadoc

STOMP 出站通道适配器

StompMessageHandler<int-stomp:outbound-channel-adapter>MessageHandler,用于通过 StompSession(由共享的 StompSessionManager 提供)将传出的 Message<?> 实例发送到 STOMP destination(预配置或在运行时使用 SpEL 表达式确定)。

有关更多配置选项,请参阅 STOMP 命名空间支持StompMessageHandlerJavadoc

STOMP 标头映射

STOMP 协议在其帧中提供标头。STOMP 帧的整个结构具有以下格式

....
COMMAND
header1:value1
header2:value2

Body^@
....

Spring 框架提供 StompHeaders 来表示这些标头。有关更多详细信息,请参阅 Javadoc。STOMP 帧转换为 Message<?> 实例并从 Message<?> 实例转换,这些标头映射到 MessageHeaders 实例并从 MessageHeaders 实例映射。Spring Integration 为 STOMP 适配器提供了一个默认的 HeaderMapper 实现。该实现是 StompHeaderMapper。它分别为入站和出站适配器提供 fromHeaders()toHeaders() 操作。

与许多其他 Spring Integration 模块一样,已引入 IntegrationStompHeaders 类以将标准 STOMP 标头映射到 MessageHeaders,其中 stomp_ 作为标头名称前缀。此外,所有具有该前缀的 MessageHeaders 实例在发送到目标时都映射到 StompHeaders

有关更多信息,请参阅这些类的 Javadoc 以及 STOMP 命名空间支持 中的 mapped-headers 属性说明。

STOMP 集成事件

许多 STOMP 操作是异步的,包括错误处理。例如,STOMP 具有一个 RECEIPT 服务器帧,当客户端帧通过添加 RECEIPT 标头请求一个帧时,它会返回该帧。为了访问这些异步事件,Spring Integration 发出 StompIntegrationEvent 实例,您可以通过实现 ApplicationListener 或使用 <int-event:inbound-channel-adapter> 获取这些实例(请参阅 接收 Spring 应用程序事件)。

具体来说,当 stompSessionListenableFuture 由于无法连接到 STOMP 代理而收到 onFailure() 时,AbstractStompSessionManager 会发出 StompExceptionEvent。另一个示例是 StompMessageHandler。它处理 ERROR STOMP 帧,这些帧是服务器对该 StompMessageHandler 发送的不正确(未接受)消息的响应。

StompMessageHandler 在发送到 StompSession 的消息的异步答案中,作为 StompSession.Receiptable 回调的一部分发出 StompReceiptEventStompReceiptEvent 可以是正数或负数,具体取决于是否在您可以在 StompClientSupport 实例上配置的 receiptTimeLimit 期间从服务器接收 RECEIPT 帧。它默认为 15 * 1000(以毫秒为单位,即 15 秒)。

仅当要发送的消息的 RECEIPT STOMP 标头不为 null 时,才会添加 StompSession.Receiptable 回调。您可以分别通过其 autoReceipt 选项和 StompSessionManagerStompSession 上启用自动 RECEIPT 标头生成。

有关如何配置 Spring Integration 以接受这些 ApplicationEvent 实例的更多信息,请参阅 STOMP 适配器 Java 配置

STOMP 适配器 Java 配置

以下示例显示了 STOMP 适配器的综合 Java 配置

@Configuration
@EnableIntegration
public class StompConfiguration {

    @Bean
    public ReactorNettyTcpStompClient stompClient() {
        ReactorNettyTcpStompClient stompClient = new ReactorNettyTcpStompClient("127.0.0.1", 61613);
        stompClient.setMessageConverter(new PassThruMessageConverter());
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.afterPropertiesSet();
        stompClient.setTaskScheduler(taskScheduler);
        stompClient.setReceiptTimeLimit(5000);
        return stompClient;
    }

    @Bean
    public StompSessionManager stompSessionManager() {
        ReactorNettyTcpStompSessionManager stompSessionManager = new ReactorNettyTcpStompSessionManager(stompClient());
        stompSessionManager.setAutoReceipt(true);
        return stompSessionManager;
    }

    @Bean
    public PollableChannel stompInputChannel() {
        return new QueueChannel();
    }

    @Bean
    public StompInboundChannelAdapter stompInboundChannelAdapter() {
        StompInboundChannelAdapter adapter =
        		new StompInboundChannelAdapter(stompSessionManager(), "/topic/myTopic");
        adapter.setOutputChannel(stompInputChannel());
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "stompOutputChannel")
    public MessageHandler stompMessageHandler() {
        StompMessageHandler handler = new StompMessageHandler(stompSessionManager());
        handler.setDestination("/topic/myTopic");
        return handler;
    }

    @Bean
    public PollableChannel stompEvents() {
        return new QueueChannel();
    }

    @Bean
    public ApplicationListener<ApplicationEvent> stompEventListener() {
        ApplicationEventListeningMessageProducer producer = new ApplicationEventListeningMessageProducer();
        producer.setEventTypes(StompIntegrationEvent.class);
        producer.setOutputChannel(stompEvents());
        return producer;
    }

}

STOMP 命名空间支持

Spring Integration STOMP 命名空间实现了入站和出站通道适配器组件。要将其包含在您的配置中,请在您的应用程序上下文配置文件中提供以下命名空间声明

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:int-stomp="http://www.springframework.org/schema/integration/stomp"
  xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    https://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    https://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/stomp
    https://www.springframework.org/schema/integration/stomp/spring-integration-stomp.xsd">
    ...
</beans>

了解 <int-stomp:outbound-channel-adapter> 元素

以下列表显示了 STOMP 出站通道适配器可用的属性

<int-stomp:outbound-channel-adapter
                           id=""                      (1)
                           channel=""                 (2)
                           stomp-session-manager=""   (3)
                           header-mapper=""           (4)
                           mapped-headers=""          (5)
                           destination=""             (6)
                           destination-expression=""  (7)
                           auto-startup=""            (8)
                           phase=""/>                 (9)
1 组件 bean 名称。MessageHandler 使用 id 加上 .handler 作为 bean 别名注册。如果您未设置 channel 属性,则会创建一个 DirectChannel 并将其注册到应用程序上下文中,并将此 id 属性的值作为 bean 名称。在这种情况下,端点使用 id 加上 .adapter 作为 bean 名称注册。
2 如果存在 id,则标识附加到此适配器的通道。请参阅 id。可选。
3 StompSessionManager bean 的引用,该 bean 封装了底层连接和 StompSession 处理操作。必需。
4 对实现 HeaderMapper<StompHeaders> 的 bean 的引用,该 bean 将 Spring Integration MessageHeaders 映射到 STOMP 帧标头并从 STOMP 帧标头映射。它与 mapped-headers 互斥。它默认为 StompHeaderMapper
5 要映射到 STOMP 帧标头的 STOMP 标头名称的逗号分隔列表。仅当未设置 header-mapper 引用时,才能提供它。此列表中的值也可以是与标头名称匹配的简单模式(例如 myheader**myheader)。特殊标记(STOMP_OUTBOUND_HEADERS)表示所有标准 STOMP 标头(内容长度、回执、心跳等)。默认情况下包含它们。如果您想添加自己的标头并希望标准标头也映射,则必须包含此标记或使用 header-mapper 提供自己的 HeaderMapper 实现。
6 发送 STOMP 消息的目标的名称。它与 destination-expression 互斥。
7 在运行时针对每个 Spring Integration Message 作为根对象计算的 SpEL 表达式。它与 destination 互斥。
8 指示此端点是否应自动启动的布尔值。默认为 true
9 此端点应启动和停止的生存周期阶段。值越低,此端点启动越早,停止越晚。默认值为 Integer.MIN_VALUE。值可以为负数。请参阅 SmartLifeCycle

了解 <int-stomp:inbound-channel-adapter> 元素

以下列表显示了 STOMP 入站通道适配器可用的属性

<int-stomp:inbound-channel-adapter
                           id=""                     (1)
                           channel=""                (2)
                           error-channel=""          (3)
                           stomp-session-manager=""  (4)
                           header-mapper=""          (5)
                           mapped-headers=""         (6)
                           destinations=""           (7)
                           send-timeout=""           (8)
                           payload-type=""           (9)
                           auto-startup=""           (10)
                           phase=""/>                (11)
1 组件 bean 名称。如果您未设置 channel 属性,则会创建一个 DirectChannel 并将其注册到应用程序上下文中,并将此 id 属性的值作为 bean 名称。在这种情况下,端点使用 id 加上 .adapter 作为 bean 名称注册。
2 标识附加到此适配器的通道。
3 应将 ErrorMessage 实例发送到的 MessageChannel bean 引用。
4 请参阅 <int-stomp:outbound-channel-adapter> 上的相同选项。

5 需要从 STOMP 帧头映射的 STOMP 头名称的逗号分隔列表。只有在未设置header-mapper引用时才能提供此选项。此列表中的值也可以是与头名称匹配的简单模式(例如,myheader**myheader)。特殊标记 (STOMP_INBOUND_HEADERS) 表示所有标准 STOMP 头(内容长度、回执、心跳等)。它们默认包含在内。如果要添加自己的头并且希望也映射标准头,则必须包含此标记或使用header-mapper提供自己的HeaderMapper实现。
6 请参阅 <int-stomp:outbound-channel-adapter> 上的相同选项。

7 要订阅的 STOMP 目标名称的逗号分隔列表。可以通过addDestination()removeDestination() @ManagedOperation注释在运行时修改目标(以及订阅)列表。
8 如果通道可能阻塞,则在将消息发送到通道时等待的最大时间量(以毫秒为单位)。例如,如果QueueChannel的最大容量已满,则它可能会阻塞,直到有可用空间。
9 要从传入 STOMP 帧转换的目标payload的 Java 类型全限定名称。默认为String.class
10 请参阅 <int-stomp:outbound-channel-adapter> 上的相同选项。

11 请参阅 <int-stomp:outbound-channel-adapter> 上的相同选项。