消息流

一旦暴露 STOMP 端点,Spring 应用程序就成为连接客户端的 STOMP broker。本节描述了服务器端的消息流。

spring-messaging 模块包含消息应用程序的基础支持,它起源于 Spring Integration,后来被提取并纳入 Spring Framework 中,以便在许多 Spring 项目和应用程序场景中更广泛地使用。以下列表简要描述了一些可用的消息抽象

Java 配置(即 @EnableWebSocketMessageBroker)和 XML 命名空间配置(即 <websocket:message-broker>)都使用上述组件来组装消息工作流。下图显示了启用简单内置消息 broker 时使用的组件

message flow simple broker

上图显示了三个消息通道

  • clientInboundChannel:用于传递从 WebSocket 客户端接收的消息。

  • clientOutboundChannel:用于向 WebSocket 客户端发送服务器消息。

  • brokerChannel:用于在服务器端应用程序代码中向消息 broker 发送消息。

下图显示了配置外部 broker(例如 RabbitMQ)用于管理订阅和广播消息时使用的组件

message flow broker relay

前两个图的主要区别在于使用“broker 中继”通过 TCP 将消息传递到外部 STOMP broker,以及将消息从 broker 传递给订阅的客户端。

当从 WebSocket 连接接收到消息时,它们被解码为 STOMP 帧,转换为 Spring Message 表示形式,并发送到 clientInboundChannel 进行进一步处理。例如,目的地头部以 /app 开头的 STOMP 消息可以路由到注解控制器的 @MessageMapping 方法,而 /topic/queue 消息可以直接路由到消息 broker。

处理来自客户端的 STOMP 消息的注解 @Controller 可以通过 brokerChannel 向消息 broker 发送消息,然后 broker 通过 clientOutboundChannel 将消息广播给匹配的订阅者。同一个控制器也可以响应 HTTP 请求执行相同的操作,因此客户端可以执行 HTTP POST,然后 @PostMapping 方法可以将消息发送到消息 broker,以广播给订阅的客户端。

我们可以通过一个简单的示例来追踪流程。考虑以下示例,它设置了一个服务器

  • Java

  • Kotlin

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfiguration implements WebSocketMessageBrokerConfigurer {

	@Override
	public void registerStompEndpoints(StompEndpointRegistry registry) {
		registry.addEndpoint("/portfolio");
	}

	@Override
	public void configureMessageBroker(MessageBrokerRegistry registry) {
		registry.setApplicationDestinationPrefixes("/app");
		registry.enableSimpleBroker("/topic");
	}
}
@Configuration
@EnableWebSocketMessageBroker
class WebSocketConfiguration : WebSocketMessageBrokerConfigurer {
	override fun registerStompEndpoints(registry: StompEndpointRegistry) {
		registry.addEndpoint("/portfolio")
	}

	override fun configureMessageBroker(registry: MessageBrokerRegistry) {
		registry.setApplicationDestinationPrefixes("/app")
		registry.enableSimpleBroker("/topic")
	}
}
  • Java

  • Kotlin

@Controller
public class GreetingController {

	@MessageMapping("/greeting")
	public String handle(String greeting) {
		return "[" + getTimestamp() + ": " + greeting;
	}

	private String getTimestamp() {
		return new SimpleDateFormat("MM/dd/yyyy h:mm:ss a").format(new Date());
	}

}
@Controller
class GreetingController {
	
	@MessageMapping("/greeting")
	fun handle(greeting: String): String {
		return "[${getTimestamp()}: $greeting"
	}

	private fun getTimestamp(): String {
		return SimpleDateFormat("MM/dd/yyyy h:mm:ss a").format(Date())
	}
}

上例支持以下流程

  1. 客户端连接到 localhost:8080/portfolio,一旦建立 WebSocket 连接,STOMP 帧就开始在其上传输。

  2. 客户端发送一个 SUBSCRIBE 帧,其目的地头部为 /topic/greeting。收到并解码后,消息发送到 clientInboundChannel,然后路由到消息 broker,由其存储客户端订阅信息。

  3. 客户端向 /app/greeting 发送一个 SEND 帧。/app 前缀有助于将其路由到注解控制器。移除 /app 前缀后,目的地剩余的 /greeting 部分映射到 GreetingController 中的 @MessageMapping 方法。

  4. GreetingController 返回的值被转换为一个 Spring Message,其载荷基于返回值,默认目的地头部为 /topic/greeting(从输入目的地中将 /app 替换为 /topic 派生而来)。生成的消息被发送到 brokerChannel 并由消息 broker 处理。

  5. 消息 broker 查找所有匹配的订阅者,并通过 clientOutboundChannel 向每个订阅者发送一个 MESSAGE 帧,消息从 clientOutboundChannel 被编码为 STOMP 帧并在 WebSocket 连接上传输。

下一节将提供更多关于注解方法的详细信息,包括支持的参数类型和返回值。