消息流
一旦公开 STOMP 端点,Spring 应用程序就成为已连接客户端的 STOMP 代理。本节介绍服务器端消息流。
spring-messaging
模块包含对源自Spring Integration 的消息应用程序的基础支持,后来被提取并整合到 Spring 框架中,以便在许多Spring 项目 和应用程序场景中更广泛地使用。以下列表简要描述了一些可用的消息传递抽象
-
Message:消息的简单表示,包括报头和有效负载。
-
MessageHandler:处理消息的契约。
-
MessageChannel:用于发送消息的契约,它能够在生产者和消费者之间实现松耦合。
-
SubscribableChannel:具有
MessageHandler
订阅者的MessageChannel
。 -
ExecutorSubscribableChannel:使用
Executor
传递消息的SubscribableChannel
。
Java 配置(即@EnableWebSocketMessageBroker
)和 XML 命名空间配置(即<websocket:message-broker>
)都使用上述组件来组装消息工作流。下图显示了启用简单的内置消息代理时使用的组件
上图显示了三个消息通道
-
clientInboundChannel
:用于传递从 WebSocket 客户端接收到的消息。 -
clientOutboundChannel
:用于将服务器消息发送到 WebSocket 客户端。 -
brokerChannel
:用于从服务器端应用程序代码中将消息发送到消息代理。
下图显示了为管理订阅和广播消息配置外部代理(例如 RabbitMQ)时使用的组件
前两张图的主要区别在于使用“代理中继”通过 TCP 将消息传递到外部 STOMP 代理,以及将消息从代理传递到已订阅的客户端。
当从 WebSocket 连接接收消息时,它们会被解码为 STOMP 帧,转换为 Spring 的Message
表示形式,并发送到clientInboundChannel
以进行进一步处理。例如,目标标头以/app
开头的 STOMP 消息可能会路由到带注解控制器的@MessageMapping
方法,而/topic
和/queue
消息可能会直接路由到消息代理。
处理来自客户端的 STOMP 消息的带注解的@Controller
可以通过brokerChannel
将消息发送到消息代理,代理通过clientOutboundChannel
将消息广播到匹配的订阅者。同一个控制器也可以在响应 HTTP 请求时执行相同的操作,因此客户端可以执行 HTTP POST,然后@PostMapping
方法可以将消息发送到消息代理以广播到已订阅的客户端。
我们可以通过一个简单的示例来跟踪流程。考虑以下示例,它设置了一个服务器
-
Java
-
Kotlin
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfiguration implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/portfolio");
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.setApplicationDestinationPrefixes("/app");
registry.enableSimpleBroker("/topic");
}
}
@Configuration
@EnableWebSocketMessageBroker
class WebSocketConfiguration : WebSocketMessageBrokerConfigurer {
override fun registerStompEndpoints(registry: StompEndpointRegistry) {
registry.addEndpoint("/portfolio")
}
override fun configureMessageBroker(registry: MessageBrokerRegistry) {
registry.setApplicationDestinationPrefixes("/app")
registry.enableSimpleBroker("/topic")
}
}
-
Java
-
Kotlin
@Controller
public class GreetingController {
@MessageMapping("/greeting")
public String handle(String greeting) {
return "[" + getTimestamp() + ": " + greeting;
}
private String getTimestamp() {
return new SimpleDateFormat("MM/dd/yyyy h:mm:ss a").format(new Date());
}
}
@Controller
class GreetingController {
@MessageMapping("/greeting")
fun handle(greeting: String): String {
return "[${getTimestamp()}: $greeting"
}
private fun getTimestamp(): String {
return SimpleDateFormat("MM/dd/yyyy h:mm:ss a").format(Date())
}
}
上述示例支持以下流程
-
客户端连接到
localhost:8080/portfolio
,并且一旦建立 WebSocket 连接,STOMP 帧就开始在其上流动。 -
客户端发送一个 SUBSCRIBE 帧,其目标标头为
/topic/greeting
。接收并解码后,消息将发送到clientInboundChannel
,然后路由到消息代理,消息代理存储客户端订阅。 -
客户端向
/app/greeting
发送一个 SEND 帧。/app
前缀有助于将其路由到带注解的控制器。在剥离/app
前缀后,目标的其余部分/greeting
将映射到GreetingController
中的@MessageMapping
方法。 -
从
GreetingController
返回的值将转换为一个 Spring 的Message
,其有效负载基于返回值,默认目标标头为/topic/greeting
(从输入目标派生,将/app
替换为/topic
)。生成的邮件将发送到brokerChannel
并由消息代理处理。 -
消息代理查找所有匹配的订阅者,并通过
clientOutboundChannel
向每个订阅者发送一个 MESSAGE 帧,从那里消息被编码为 STOMP 帧并在 WebSocket 连接上发送。
下一节将提供有关带注解方法的更多详细信息,包括支持的各种参数和返回值。