STOMP 支持
Spring Integration 4.2 版本引入了 STOMP (Simple Text Orientated Messaging Protocol) 客户端支持。它基于 Spring Framework 消息模块 stomp 包的架构、基础设施和 API。Spring Integration 使用了许多 Spring STOMP 组件(例如 StompSession
和 StompClientSupport
)。更多信息,请参阅 Spring Framework 参考手册中的Spring Framework STOMP 支持章节。
您需要在项目中包含此依赖项
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stomp</artifactId>
<version>6.4.4</version>
</dependency>
compile "org.springframework.integration:spring-integration-stomp:6.4.4"
对于服务器端组件,您需要添加 org.springframework:spring-websocket
和/或 io.projectreactor.netty:reactor-netty
依赖项。
概述
要配置 STOMP,您应该从 STOMP 客户端对象开始。Spring Framework 提供了以下实现
-
WebSocketStompClient
:基于 Spring WebSocket API 构建,支持标准的 JSR-356 WebSocket、Jetty 9 以及使用 SockJS Client 进行基于 HTTP 的 WebSocket 模拟的 SockJS。 -
ReactorNettyTcpStompClient
:基于reactor-netty
项目中的ReactorNettyTcpClient
构建。
您可以提供任何其他 StompClientSupport
实现。请参阅这些类的Javadoc。
StompClientSupport
类被设计为生成给定 StompSessionHandler
的 StompSession
的工厂,所有剩余工作都通过对该 StompSessionHandler
和 StompSession
抽象的回调来完成。通过 Spring Integration 的适配器抽象,我们需要提供一些托管共享对象来代表我们的应用程序作为一个拥有唯一会话的 STOMP 客户端。为此,Spring Integration 提供了 StompSessionManager
抽象来管理任意给定 StompSessionHandler
之间的单个 StompSession
。这允许对特定的 STOMP Broker 使用入站或出站通道适配器(或两者)。有关更多信息,请参阅 StompSessionManager
(及其实现)的 JavaDocs。
STOMP 入站通道适配器
StompInboundChannelAdapter
是一个一站式的 MessageProducer
组件,它将您的 Spring Integration 应用程序订阅到提供的 STOMP 目标,并从它们接收消息(通过在连接的 StompSession
上使用提供的 MessageConverter
从 STOMP 帧转换而来)。您可以通过在 StompInboundChannelAdapter
上使用适当的 @ManagedOperation
注解在运行时更改目标(以及 STOMP 订阅)。
有关更多配置选项,请参阅STOMP Namespace 支持和 StompInboundChannelAdapter
的Javadoc。
STOMP 出站通道适配器
StompMessageHandler
是用于 <int-stomp:outbound-channel-adapter>
的 MessageHandler
,它用于通过 StompSession
(由共享的 StompSessionManager
提供)将传出的 Message<?>
实例发送到 STOMP destination
(预配置或在运行时使用 SpEL 表达式确定)。
有关更多配置选项,请参阅STOMP Namespace 支持和 StompMessageHandler
的Javadoc。
STOMP 头映射
STOMP 协议在其帧中提供了头。STOMP 帧的整个结构具有以下格式
....
COMMAND
header1:value1
header2:value2
Body^@
....
Spring Framework 提供了 StompHeaders
来表示这些头。有关更多详细信息,请参阅Javadoc。STOMP 帧被转换为 Message<?>
实例以及从 Message<?>
实例转换,这些头也被映射到 MessageHeaders
实例以及从 MessageHeaders
实例映射。Spring Integration 为 STOMP 适配器提供了默认的 HeaderMapper
实现。该实现是 StompHeaderMapper
。它分别为入站和出站适配器提供了 fromHeaders()
和 toHeaders()
操作。
与许多其他 Spring Integration 模块一样,引入了 IntegrationStompHeaders
类来将标准 STOMP 头映射到 MessageHeaders
,其中以 stomp_
作为头名称前缀。此外,所有带有该前缀的 MessageHeaders
实例在发送到目标时都映射到 StompHeaders
。
有关更多信息,请参阅这些类的Javadoc 以及STOMP Namespace Support 中 mapped-headers
属性的描述。
STOMP 集成事件
许多 STOMP 操作是异步的,包括错误处理。例如,STOMP 有一个 RECEIPT
服务器帧,当客户端帧通过添加 RECEIPT
头请求时返回该帧。为了提供对这些异步事件的访问,Spring Integration 会发出 StompIntegrationEvent
实例,您可以通过实现 ApplicationListener
或使用 <int-event:inbound-channel-adapter>
来获取这些实例(请参阅 接收 Spring Application 事件)。
具体来说,当 stompSessionListenableFuture
由于连接 STOMP broker 失败而接收到 onFailure()
时,AbstractStompSessionManager
会发出一个 StompExceptionEvent
。另一个例子是 StompMessageHandler
。它处理 ERROR
STOMP 帧,这些帧是服务器对该 StompMessageHandler
发送的不当(未被接受的)消息的响应。
StompMessageHandler
在发送到 StompSession
的消息的异步回复中,作为 StompSession.Receiptable
回调的一部分,发出 StompReceiptEvent
。StompReceiptEvent
可以是正面的或负面的,这取决于是否在 receiptTimeLimit
期间从服务器接收到 RECEIPT
帧,您可以在 StompClientSupport
实例上配置该时间限制。它默认为 15 * 1000
(毫秒,即 15 秒)。
StompSession.Receiptable 回调仅在要发送的消息的 RECEIPT STOMP 头不为 null 时才会添加。您可以通过 StompSession 的 autoReceipt 选项以及 StompSessionManager 上分别启用自动 RECEIPT 头生成。 |
有关如何配置 Spring Integration 以接受这些 ApplicationEvent
实例的更多信息,请参阅STOMP Adapters Java Configuration。
STOMP 适配器 Java 配置
以下示例显示了 STOMP 适配器的全面 Java 配置
@Configuration
@EnableIntegration
public class StompConfiguration {
@Bean
public ReactorNettyTcpStompClient stompClient() {
ReactorNettyTcpStompClient stompClient = new ReactorNettyTcpStompClient("127.0.0.1", 61613);
stompClient.setMessageConverter(new PassThruMessageConverter());
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.afterPropertiesSet();
stompClient.setTaskScheduler(taskScheduler);
stompClient.setReceiptTimeLimit(5000);
return stompClient;
}
@Bean
public StompSessionManager stompSessionManager() {
ReactorNettyTcpStompSessionManager stompSessionManager = new ReactorNettyTcpStompSessionManager(stompClient());
stompSessionManager.setAutoReceipt(true);
return stompSessionManager;
}
@Bean
public PollableChannel stompInputChannel() {
return new QueueChannel();
}
@Bean
public StompInboundChannelAdapter stompInboundChannelAdapter() {
StompInboundChannelAdapter adapter =
new StompInboundChannelAdapter(stompSessionManager(), "/topic/myTopic");
adapter.setOutputChannel(stompInputChannel());
return adapter;
}
@Bean
@ServiceActivator(inputChannel = "stompOutputChannel")
public MessageHandler stompMessageHandler() {
StompMessageHandler handler = new StompMessageHandler(stompSessionManager());
handler.setDestination("/topic/myTopic");
return handler;
}
@Bean
public PollableChannel stompEvents() {
return new QueueChannel();
}
@Bean
public ApplicationListener<ApplicationEvent> stompEventListener() {
ApplicationEventListeningMessageProducer producer = new ApplicationEventListeningMessageProducer();
producer.setEventTypes(StompIntegrationEvent.class);
producer.setOutputChannel(stompEvents());
return producer;
}
}
STOMP Namespace 支持
Spring Integration STOMP namespace 实现了入站和出站通道适配器组件。要将其包含在您的配置中,请在您的应用程序上下文配置文件中提供以下 namespace 声明
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-stomp="http://www.springframework.org/schema/integration/stomp"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/stomp
https://www.springframework.org/schema/integration/stomp/spring-integration-stomp.xsd">
...
</beans>
理解 <int-stomp:outbound-channel-adapter>
元素
以下列表显示了 STOMP 出站通道适配器的可用属性
<int-stomp:outbound-channel-adapter
id="" (1)
channel="" (2)
stomp-session-manager="" (3)
header-mapper="" (4)
mapped-headers="" (5)
destination="" (6)
destination-expression="" (7)
auto-startup="" (8)
phase=""/> (9)
1 | 组件 bean 名称。MessageHandler 会注册一个 bean 别名,其名称为 id 加上 .handler 。如果您未设置 channel 属性,则会在应用程序上下文中创建一个 DirectChannel 并注册,其 bean 名称为该 id 属性的值。在这种情况下,端点会注册一个 bean 名称,其名称为 id 加上 .adapter 。 |
2 | 如果存在 id ,则标识连接到此适配器的通道。请参阅 id 。可选。 |
3 | 对 StompSessionManager bean 的引用,该 bean 封装了底层连接和 StompSession 处理操作。必需。 |
4 | 对实现 HeaderMapper<StompHeaders> 的 bean 的引用,该 bean 将 Spring Integration MessageHeaders 映射到 STOMP 帧头以及从 STOMP 帧头映射。它与 mapped-headers 互斥。默认值为 StompHeaderMapper 。 |
5 | 逗号分隔的 STOMP 头名称列表,这些头将被映射到 STOMP 帧头。只有在未设置 header-mapper 引用时才能提供此项。此列表中的值也可以是与头名称匹配的简单模式(例如 myheader* 或 *myheader )。一个特殊令牌 (STOMP_OUTBOUND_HEADERS ) 代表所有标准 STOMP 头(内容长度、收据、心跳等)。它们默认包含在内。如果您想添加自己的头,并且也希望标准头被映射,则必须包含此令牌或通过使用 header-mapper 提供您自己的 HeaderMapper 实现。 |
6 | 发送 STOMP 消息的目标名称。它与 destination-expression 互斥。 |
7 | 一个 SpEL 表达式,将在运行时针对每个 Spring Integration Message 作为根对象进行评估。它与 destination 互斥。 |
8 | 布尔值,指示此端点是否应自动启动。默认为 true 。 |
9 | 此端点应在其生命周期阶段内启动和停止。值越低,此端点启动越早,停止越晚。默认值为 Integer.MIN_VALUE 。值可以为负。请参阅 SmartLifeCycle 。 |
理解 <int-stomp:inbound-channel-adapter>
元素
以下列表显示了 STOMP 入站通道适配器的可用属性
<int-stomp:inbound-channel-adapter
id="" (1)
channel="" (2)
error-channel="" (3)
stomp-session-manager="" (4)
header-mapper="" (5)
mapped-headers="" (6)
destinations="" (7)
send-timeout="" (8)
payload-type="" (9)
auto-startup="" (10)
phase=""/> (11)
1 | 组件 bean 名称。如果您未设置 channel 属性,则会在应用程序上下文中创建一个 DirectChannel 并注册,其 bean 名称为该 id 属性的值。在这种情况下,端点会注册一个 bean 名称,其名称为 id 加上 .adapter 。 |
2 | 标识连接到此适配器的通道。 |
3 | MessageChannel bean 引用,ErrorMessage 实例应发送到该引用。 |
4 | 请参阅 <int-stomp:outbound-channel-adapter> 上的相同选项。 |
5 | 逗号分隔的 STOMP 头名称列表,这些头将从 STOMP 帧头映射而来。只有在未设置 header-mapper 引用时才能提供此项。此列表中的值也可以是与头名称匹配的简单模式(例如 myheader* 或 *myheader )。一个特殊令牌 (STOMP_INBOUND_HEADERS ) 代表所有标准 STOMP 头(内容长度、收据、心跳等)。它们默认包含在内。如果您想添加自己的头,并且也希望标准头被映射,则必须也包含此令牌或通过使用 header-mapper 提供您自己的 HeaderMapper 实现。 |
6 | 请参阅 <int-stomp:outbound-channel-adapter> 上的相同选项。 |
7 | 逗号分隔的 STOMP 目标名称列表,用于订阅。目标列表(以及订阅)可以通过 addDestination() 和 removeDestination() @ManagedOperation 注解在运行时进行修改。 |
8 | 当发送消息到可能阻塞的通道时,最大等待时间(毫秒)。例如,如果 QueueChannel 达到最大容量,它可能会阻塞直到有空间可用。 |
9 | 要从传入 STOMP 帧转换的目标 payload 的 Java 类型的完全限定名称。默认为 String.class 。 |
10 | 请参阅 <int-stomp:outbound-channel-adapter> 上的相同选项。 |
11 | 请参阅 <int-stomp:outbound-channel-adapter> 上的相同选项。 |