消息流
一旦 STOMP 端点暴露,Spring 应用程序就成为连接客户端的 STOMP 代理。本节描述了服务器端的消息流。
spring-messaging 模块包含对消息应用程序的基础支持,这些支持起源于 Spring Integration,后来被提取并整合到 Spring Framework 中,以便在许多 Spring 项目和应用程序场景中广泛使用。以下列表简要描述了一些可用的消息抽象。
-
Message:消息的简单表示,包括头部和有效载荷。
-
MessageHandler:处理消息的契约。
-
MessageChannel:用于发送消息的契约,实现了生产者和消费者之间的松散耦合。
-
SubscribableChannel:带有
MessageHandler订阅者的MessageChannel。 -
ExecutorSubscribableChannel:使用
Executor传递消息的SubscribableChannel。
Java 配置(即 @EnableWebSocketMessageBroker)和 XML 命名空间配置(即 <websocket:message-broker>)都使用上述组件来组装消息工作流。以下图表显示了启用简单内置消息代理时使用的组件。
上图显示了三个消息通道:
-
clientInboundChannel:用于传递从 WebSocket 客户端接收到的消息。 -
clientOutboundChannel:用于向 WebSocket 客户端发送服务器消息。 -
brokerChannel:用于从服务器端应用程序代码中向消息代理发送消息。
下图显示了当配置外部代理(例如 RabbitMQ)来管理订阅和广播消息时使用的组件。
前两张图的主要区别在于,“代理中继”用于通过 TCP 将消息传递给外部 STOMP 代理,并从代理向下传递消息给订阅的客户端。
当从 WebSocket 连接接收到消息时,它们被解码为 STOMP 帧,转换为 Spring Message 表示,并发送到 clientInboundChannel 进行进一步处理。例如,目标头部以 /app 开头的 STOMP 消息可能会路由到注解控制器中的 @MessageMapping 方法,而 /topic 和 /queue 消息可能会直接路由到消息代理。
处理来自客户端的 STOMP 消息的注解 @Controller 可以通过 brokerChannel 向消息代理发送消息,代理会通过 clientOutboundChannel 将消息广播给匹配的订阅者。同样的控制器也可以响应 HTTP 请求执行相同的操作,因此客户端可以执行 HTTP POST,然后 @PostMapping 方法可以向消息代理发送消息以广播给订阅的客户端。
我们可以通过一个简单的示例来追溯消息流。考虑以下示例,它设置了一个服务器:
-
Java
-
Kotlin
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfiguration implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/portfolio");
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.setApplicationDestinationPrefixes("/app");
registry.enableSimpleBroker("/topic");
}
}
@Configuration
@EnableWebSocketMessageBroker
class WebSocketConfiguration : WebSocketMessageBrokerConfigurer {
override fun registerStompEndpoints(registry: StompEndpointRegistry) {
registry.addEndpoint("/portfolio")
}
override fun configureMessageBroker(registry: MessageBrokerRegistry) {
registry.setApplicationDestinationPrefixes("/app")
registry.enableSimpleBroker("/topic")
}
}
-
Java
-
Kotlin
@Controller
public class GreetingController {
@MessageMapping("/greeting")
public String handle(String greeting) {
return "[" + getTimestamp() + ": " + greeting;
}
private String getTimestamp() {
return new SimpleDateFormat("MM/dd/yyyy h:mm:ss a").format(new Date());
}
}
@Controller
class GreetingController {
@MessageMapping("/greeting")
fun handle(greeting: String): String {
return "[${getTimestamp()}: $greeting"
}
private fun getTimestamp(): String {
return SimpleDateFormat("MM/dd/yyyy h:mm:ss a").format(Date())
}
}
上述示例支持以下消息流:
-
客户端连接到
localhost:8080/portfolio,一旦 WebSocket 连接建立,STOMP 帧开始在其上传输。 -
客户端发送一个 SUBSCRIBE 帧,其目标头部为
/topic/greeting。收到并解码后,消息被发送到clientInboundChannel,然后路由到消息代理,消息代理存储客户端订阅。 -
客户端发送一个 SEND 帧到
/app/greeting。/app前缀有助于将其路由到注解控制器。剥离/app前缀后,目标的其余部分/greeting会映射到GreetingController中的@MessageMapping方法。 -
从
GreetingController返回的值被转换为一个 SpringMessage,其有效载荷基于返回值,默认目标头部为/topic/greeting(从输入目标派生,其中/app被/topic替换)。生成的消息被发送到brokerChannel并由消息代理处理。 -
消息代理找到所有匹配的订阅者,并通过
clientOutboundChannel向每个订阅者发送一个 MESSAGE 帧,消息从那里被编码为 STOMP 帧并在 WebSocket 连接上传输。
下一节将提供有关注解方法的更多详细信息,包括支持的参数类型和返回值。