STOMP 客户端

Spring 提供了基于 WebSocket 的 STOMP 客户端和基于 TCP 的 STOMP 客户端。

首先,你可以创建并配置 WebSocketStompClient,如下例所示

WebSocketClient webSocketClient = new StandardWebSocketClient();
WebSocketStompClient stompClient = new WebSocketStompClient(webSocketClient);
stompClient.setMessageConverter(new StringMessageConverter());
stompClient.setTaskScheduler(taskScheduler); // for heartbeats

在上述示例中,你可以用 SockJsClient 替换 StandardWebSocketClient,因为 SockJsClient 也是 WebSocketClient 的实现。SockJsClient 可以使用 WebSocket 或基于 HTTP 的传输作为回退方案。有关更多详细信息,请参阅 SockJsClient

接下来,你可以建立连接并为 STOMP 会话提供一个处理器,如下例所示

String url = "ws://127.0.0.1:8080/endpoint";
StompSessionHandler sessionHandler = new MyStompSessionHandler();
stompClient.connect(url, sessionHandler);

当会话准备就绪时,处理器将收到通知,如下例所示

public class MyStompSessionHandler extends StompSessionHandlerAdapter {

	@Override
	public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
		// ...
	}
}

一旦会话建立,任何有效载荷都可以发送,并使用配置的 MessageConverter 进行序列化,如下例所示

session.send("/topic/something", "payload");

你还可以订阅目标。subscribe 方法需要一个用于处理订阅消息的处理器,并返回一个 Subscription 句柄,你可以用它来取消订阅。对于每条收到的消息,处理器可以指定应将有效载荷反序列化为的目标 Object 类型,如下例所示

session.subscribe("/topic/something", new StompFrameHandler() {

	@Override
	public Type getPayloadType(StompHeaders headers) {
		return String.class;
	}

	@Override
	public void handleFrame(StompHeaders headers, Object payload) {
		// ...
	}

});

要启用 STOMP 心跳,你可以为 WebSocketStompClient 配置一个 TaskScheduler,并可选择自定义心跳间隔(写入不活动导致发送心跳的间隔为 10 秒,读取不活动导致关闭连接的间隔为 10 秒)。

WebSocketStompClient 仅在不活动的情况下发送心跳,即没有发送其他消息时。当使用外部代理时,这可能会带来挑战,因为非代理目标的消息表示活动但实际上并未转发到代理。在这种情况下,你可以在初始化 外部代理 时配置一个 TaskScheduler,这样可以确保即使仅发送非代理目标的消息时,心跳也会转发到代理。

当你使用 WebSocketStompClient 进行性能测试,模拟同一台机器上的数千个客户端时,请考虑关闭心跳,因为每个连接都调度自己的心跳任务,这对于在同一台机器上运行的大量客户端来说并未优化。

STOMP 协议还支持回执,客户端必须添加一个 receipt 头,服务器在处理发送或订阅后会回复一个 RECEIPT 帧。为了支持这一点,StompSession 提供了 setAutoReceipt(boolean),它会导致在每次后续发送或订阅事件中添加 receipt 头。或者,你也可以手动向 StompHeaders 添加回执头。发送和订阅都返回一个 Receiptable 实例,你可以用它来注册回执成功和失败回调。对于此功能,你必须为客户端配置一个 TaskScheduler 以及回执过期的时间量(默认为 15 秒)。

请注意,StompSessionHandler 本身是一个 StompFrameHandler,这使得除了处理消息异常的 handleException 回调和处理传输级别错误(包括 ConnectionLostException)的 handleTransportError 之外,它还可以处理 ERROR 帧。

你可以使用 WebSocketStompClientinboundMessageSizeLimitoutboundMessageSizeLimit 属性来限制入站和出站 WebSocket 消息的最大大小。当出站 STOMP 消息超过限制时,它会被分割成部分帧,接收方需要重新组装。默认情况下,出站消息没有大小限制。当入站 STOMP 消息大小超过配置的限制时,会抛出 StompConversionException。入站消息的默认大小限制为 64KB

WebSocketClient webSocketClient = new StandardWebSocketClient();
WebSocketStompClient stompClient = new WebSocketStompClient(webSocketClient);
stompClient.setInboundMessageSizeLimit(64 * 1024); // 64KB
stompClient.setOutboundMessageSizeLimit(64 * 1024); // 64KB
© . This site is unofficial and not affiliated with VMware.