STOMP 客户端
Spring 提供了基于 WebSocket 的 STOMP 客户端和基于 TCP 的 STOMP 客户端。
首先,你可以创建并配置 WebSocketStompClient
,如下例所示
WebSocketClient webSocketClient = new StandardWebSocketClient();
WebSocketStompClient stompClient = new WebSocketStompClient(webSocketClient);
stompClient.setMessageConverter(new StringMessageConverter());
stompClient.setTaskScheduler(taskScheduler); // for heartbeats
在前一个示例中,你可以将 StandardWebSocketClient
替换为 SockJsClient
,因为它也是 WebSocketClient
的实现。SockJsClient
可以使用 WebSocket 或基于 HTTP 的传输作为回退。更多详细信息,请参阅 SockJsClient
。
接下来,你可以建立连接并为 STOMP 会话提供一个处理器,如下例所示
String url = "ws://127.0.0.1:8080/endpoint";
StompSessionHandler sessionHandler = new MyStompSessionHandler();
stompClient.connect(url, sessionHandler);
当会话准备就绪时,处理器会收到通知,如下例所示
public class MyStompSessionHandler extends StompSessionHandlerAdapter {
@Override
public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
// ...
}
}
一旦会话建立,任何有效载荷都可以发送,并使用配置的 MessageConverter
进行序列化,如下例所示
session.send("/topic/something", "payload");
你还可以订阅目的地。subscribe
方法需要一个用于处理订阅上消息的处理器,并返回一个 Subscription
句柄,你可以使用它来取消订阅。对于接收到的每条消息,处理器可以指定有效载荷应反序列化到的目标 Object
类型,如下例所示
session.subscribe("/topic/something", new StompFrameHandler() {
@Override
public Type getPayloadType(StompHeaders headers) {
return String.class;
}
@Override
public void handleFrame(StompHeaders headers, Object payload) {
// ...
}
});
要启用 STOMP 心跳,你可以为 WebSocketStompClient
配置一个 TaskScheduler
,并可选择自定义心跳间隔(写入不活动 10 秒触发发送心跳,读取不活动 10 秒关闭连接)。
WebSocketStompClient
仅在不活动时发送心跳,即没有发送其他消息时。当使用外部 broker 时,这可能会带来挑战,因为非 broker 目的地的消息表示活动,但实际上并未转发到 broker。在这种情况下,你可以在初始化 外部 Broker 时配置一个 TaskScheduler
,它能确保即使只发送非 broker 目的地的消息,心跳也会转发到 broker。
当你在性能测试中使用 WebSocketStompClient 从同一台机器模拟数千个客户端时,考虑关闭心跳,因为每个连接都会安排自己的心跳任务,这对于在同一台机器上运行大量客户端来说不是最优的。 |
STOMP 协议还支持回执,客户端必须添加一个 receipt
头,服务器在处理完发送或订阅后会回复一个 RECEIPT 帧。为了支持这一点,StompSession
提供了 setAutoReceipt(boolean)
,它会在后续的每次发送或订阅事件中添加一个 receipt
头。另外,你也可以手动将回执头添加到 StompHeaders
中。send 和 subscribe 方法都返回一个 Receiptable
实例,你可以用它来注册回执成功和失败回调。对于此功能,你必须为客户端配置一个 TaskScheduler
以及回执过期前的时间(默认为 15 秒)。
请注意,StompSessionHandler
本身就是一个 StompFrameHandler
,它除了处理消息处理中的异常(通过 handleException
回调)和传输层错误(包括 ConnectionLostException
,通过 handleTransportError
回调)外,还可以处理 ERROR 帧。
你可以使用 WebSocketStompClient
的 inboundMessageSizeLimit
和 outboundMessageSizeLimit
属性来限制入站和出站 WebSocket 消息的最大大小。当出站 STOMP 消息超出限制时,它会被拆分成部分帧,接收方需要重新组装。默认情况下,出站消息没有大小限制。当入站 STOMP 消息大小超出配置限制时,会抛出 StompConversionException
。入站消息的默认大小限制为 64KB
。
WebSocketClient webSocketClient = new StandardWebSocketClient();
WebSocketStompClient stompClient = new WebSocketStompClient(webSocketClient);
stompClient.setInboundMessageSizeLimit(64 * 1024); // 64KB
stompClient.setOutboundMessageSizeLimit(64 * 1024); // 64KB