WebSockets
参考文档的这一部分涵盖了对 reactive-stack WebSocket 消息传递的支持。
WebSocket 简介
WebSocket 协议(RFC 6455)提供了一种标准化方式,用于在单个 TCP 连接上建立客户端和服务器之间的全双工双向通信通道。它是不同于 HTTP 的 TCP 协议,但设计用于在 HTTP 之上工作,使用端口 80 和 443 并允许重用现有的防火墙规则。
WebSocket 交互始于一个 HTTP 请求,该请求使用 HTTP Upgrade
头来升级或,在本例中,切换到 WebSocket 协议。以下示例展示了此类交互
GET /spring-websocket-portfolio/portfolio HTTP/1.1
Host: localhost:8080
Upgrade: websocket (1)
Connection: Upgrade (2)
Sec-WebSocket-Key: Uc9l9TMkWGbHFD2qnFHltg==
Sec-WebSocket-Protocol: v10.stomp, v11.stomp
Sec-WebSocket-Version: 13
Origin: http://localhost:8080
1 | Upgrade 头。 |
2 | 使用 Upgrade 连接。 |
与通常的 200 状态码不同,支持 WebSocket 的服务器返回类似于以下的输出
HTTP/1.1 101 Switching Protocols (1)
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: 1qVdfYHU9hPOl4JYYNXF623Gzn0=
Sec-WebSocket-Protocol: v10.stomp
1 | 协议切换 |
成功握手后,HTTP 升级请求底层的 TCP socket 保持打开状态,以便客户端和服务器继续发送和接收消息。
关于 WebSockets 工作原理的完整介绍超出了本文档的范围。请参阅 RFC 6455、HTML5 的 WebSocket 章或 Web 上的许多介绍和教程。
请注意,如果 WebSocket 服务器运行在 Web 服务器(例如 nginx)后面,你可能需要将其配置为将 WebSocket 升级请求传递给 WebSocket 服务器。同样,如果应用程序运行在云环境中,请查阅云提供商有关 WebSocket 支持的说明。
HTTP 对比 WebSocket
尽管 WebSocket 设计为与 HTTP 兼容并以 HTTP 请求开始,但重要的是要理解这两种协议会导致非常不同的架构和应用程序编程模型。
在 HTTP 和 REST 中,应用程序被建模为许多 URL。要与应用程序交互,客户端以请求-响应方式访问这些 URL。服务器根据 HTTP URL、方法和头将请求路由到适当的处理器。
相比之下,在 WebSockets 中,初始连接通常只有一个 URL。随后,所有应用程序消息都在同一个 TCP 连接上传输。这指向一种完全不同的异步、事件驱动的消息传递架构。
WebSocket 也是一种低级传输协议,与 HTTP 不同,它不规定消息内容的任何语义。这意味着除非客户端和服务器就消息语义达成一致,否则无法路由或处理消息。
WebSocket 客户端和服务器可以通过 HTTP 握手请求上的 Sec-WebSocket-Protocol
头协商使用更高级的消息协议(例如 STOMP)。如果没有此头,他们需要制定自己的约定。
何时使用 WebSockets
WebSockets 可以使网页变得动态和交互。然而,在许多情况下,结合 AJAX 和 HTTP streaming 或 long polling 可以提供一个简单有效的解决方案。
例如,新闻、邮件和社交 feeds 需要动态更新,但每隔几分钟更新一次可能完全可以接受。另一方面,协作、游戏和金融应用程序需要更接近实时。
延迟本身不是决定性因素。如果消息量相对较低(例如,监控网络故障),HTTP streaming 或 polling 可以提供有效的解决方案。低延迟、高频率和高流量的组合是使用 WebSocket 的最佳场景。
还要记住,在 Internet 上,超出你控制范围的限制性代理可能会阻止 WebSocket 交互,原因可能是它们未配置为传递 Upgrade
头,或者它们关闭长时间看起来空闲的连接。这意味着在防火墙内部使用 WebSocket 进行内部应用程序比面向公众的应用程序更为直接。
WebSocket API
Spring Framework 提供了一个 WebSocket API,你可以用它来编写处理 WebSocket 消息的客户端和服务器端应用程序。
服务器端
要创建 WebSocket 服务器,首先可以创建一个 WebSocketHandler
。以下示例展示了如何创建
-
Java
-
Kotlin
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;
public class MyWebSocketHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
// ...
}
}
import org.springframework.web.reactive.socket.WebSocketHandler
import org.springframework.web.reactive.socket.WebSocketSession
class MyWebSocketHandler : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
// ...
}
}
然后你可以将其映射到一个 URL
-
Java
-
Kotlin
@Configuration
class WebConfig {
@Bean
public HandlerMapping handlerMapping() {
Map<String, WebSocketHandler> map = new HashMap<>();
map.put("/path", new MyWebSocketHandler());
int order = -1; // before annotated controllers
return new SimpleUrlHandlerMapping(map, order);
}
}
@Configuration
class WebConfig {
@Bean
fun handlerMapping(): HandlerMapping {
val map = mapOf("/path" to MyWebSocketHandler())
val order = -1 // before annotated controllers
return SimpleUrlHandlerMapping(map, order)
}
}
如果使用 WebFlux 配置,则无需进一步操作;如果未使用 WebFlux 配置,则需要声明一个 WebSocketHandlerAdapter
,如下所示
-
Java
-
Kotlin
@Configuration
class WebConfig {
// ...
@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter();
}
}
@Configuration
class WebConfig {
// ...
@Bean
fun handlerAdapter() = WebSocketHandlerAdapter()
}
WebSocketHandler
WebSocketHandler
的 handle
方法接受 WebSocketSession
并返回 Mono<Void>
,以指示会话的应用程序处理何时完成。会话通过两个流进行处理,一个用于入站消息,一个用于出站消息。下表描述了处理这些流的两个方法
WebSocketSession 方法 |
描述 |
---|---|
|
提供对入站消息流的访问,并在连接关闭时完成。 |
|
接受出站消息的源,写入消息,并返回一个 |
WebSocketHandler
必须将入站和出站流组合成一个统一的流,并返回一个反映该流完成的 Mono<Void>
。根据应用程序的要求,统一的流在以下情况完成
-
入站或出站消息流中的任一个完成时。
-
入站流完成(即连接关闭),而出站流是无限的。
-
在选定的点,通过
WebSocketSession
的close
方法。
当入站和出站消息流组合在一起时,无需检查连接是否打开,因为 Reactive Streams 会发出结束活动的信号。入站流接收完成或错误信号,出站流接收取消信号。
Handler 最基本的实现是处理入站流的实现。以下示例展示了此类实现
-
Java
-
Kotlin
class ExampleHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
return session.receive() (1)
.doOnNext(message -> {
// ... (2)
})
.concatMap(message -> {
// ... (3)
})
.then(); (4)
}
}
1 | 访问入站消息流。 |
2 | 对每条消息执行某些操作。 |
3 | 执行使用消息内容的嵌套异步操作。 |
4 | 返回一个在接收完成时完成的 Mono<Void> 。 |
class ExampleHandler : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
return session.receive() (1)
.doOnNext {
// ... (2)
}
.concatMap {
// ... (3)
}
.then() (4)
}
}
1 | 访问入站消息流。 |
2 | 对每条消息执行某些操作。 |
3 | 执行使用消息内容的嵌套异步操作。 |
4 | 返回一个在接收完成时完成的 Mono<Void> 。 |
对于嵌套的异步操作,在使用池化数据缓冲区(例如 Netty)的底层服务器上,你可能需要调用 message.retain() 。否则,在你有机会读取数据之前,数据缓冲区可能已经被释放。更多背景信息,请参阅 数据缓冲区和编解码器。 |
以下实现结合了入站和出站流
-
Java
-
Kotlin
class ExampleHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
Flux<WebSocketMessage> output = session.receive() (1)
.doOnNext(message -> {
// ...
})
.concatMap(message -> {
// ...
})
.map(value -> session.textMessage("Echo " + value)); (2)
return session.send(output); (3)
}
}
1 | 处理入站消息流。 |
2 | 创建出站消息,生成组合流。 |
3 | 返回一个 Mono<Void> ,在我们继续接收时不会完成。 |
class ExampleHandler : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
val output = session.receive() (1)
.doOnNext {
// ...
}
.concatMap {
// ...
}
.map { session.textMessage("Echo $it") } (2)
return session.send(output) (3)
}
}
1 | 处理入站消息流。 |
2 | 创建出站消息,生成组合流。 |
3 | 返回一个 Mono<Void> ,在我们继续接收时不会完成。 |
入站和出站流可以独立,仅在完成时汇合,如下例所示
-
Java
-
Kotlin
class ExampleHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
Mono<Void> input = session.receive() (1)
.doOnNext(message -> {
// ...
})
.concatMap(message -> {
// ...
})
.then();
Flux<String> source = ... ;
Mono<Void> output = session.send(source.map(session::textMessage)); (2)
return Mono.zip(input, output).then(); (3)
}
}
1 | 处理入站消息流。 |
2 | 发送出站消息。 |
3 | 汇合流并返回一个 Mono<Void> ,该 Mono 在任一流结束时完成。 |
class ExampleHandler : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
val input = session.receive() (1)
.doOnNext {
// ...
}
.concatMap {
// ...
}
.then()
val source: Flux<String> = ...
val output = session.send(source.map(session::textMessage)) (2)
return Mono.zip(input, output).then() (3)
}
}
1 | 处理入站消息流。 |
2 | 发送出站消息。 |
3 | 汇合流并返回一个 Mono<Void> ,该 Mono 在任一流结束时完成。 |
DataBuffer
DataBuffer
是 WebFlux 中字节缓冲区的表示。参考文档的 Spring Core 部分在 数据缓冲区和编解码器 中有更多关于此的内容。要理解的关键点是,在某些服务器(如 Netty)上,字节缓冲区是池化的并带有引用计数,消费后必须释放以避免内存泄漏。
在 Netty 上运行时,如果应用程序希望保留输入数据缓冲区以确保它们不被释放,则必须使用 DataBufferUtils.retain(dataBuffer)
,并在缓冲区被消费后使用 DataBufferUtils.release(dataBuffer)
。
握手
WebSocketHandlerAdapter
委托给 WebSocketService
。默认情况下,它是 HandshakeWebSocketService
的一个实例,它对 WebSocket 请求执行基本检查,然后使用当前服务器的 RequestUpgradeStrategy
。目前,内置支持 Reactor Netty、Tomcat、Jetty 和 Undertow。
HandshakeWebSocketService
公开了一个 sessionAttributePredicate
属性,允许设置一个 Predicate<String>
以从 WebSession
中提取属性并将其插入到 WebSocketSession
的属性中。
服务器配置
每个服务器的 RequestUpgradeStrategy
都公开了特定于底层 WebSocket 服务器引擎的配置。使用 WebFlux Java 配置时,你可以按照 WebFlux 配置 中的相应部分所示定制此类属性,否则如果未使用 WebFlux 配置,请使用以下内容
-
Java
-
Kotlin
@Configuration
class WebConfig {
@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter(webSocketService());
}
@Bean
public WebSocketService webSocketService() {
TomcatRequestUpgradeStrategy strategy = new TomcatRequestUpgradeStrategy();
strategy.setMaxSessionIdleTimeout(0L);
return new HandshakeWebSocketService(strategy);
}
}
@Configuration
class WebConfig {
@Bean
fun handlerAdapter() =
WebSocketHandlerAdapter(webSocketService())
@Bean
fun webSocketService(): WebSocketService {
val strategy = TomcatRequestUpgradeStrategy().apply {
setMaxSessionIdleTimeout(0L)
}
return HandshakeWebSocketService(strategy)
}
}
检查你的服务器的升级策略以了解可用的选项。目前,只有 Tomcat 和 Jetty 公开了此类选项。
CORS
配置 CORS 并限制对 WebSocket 端点访问的最简单方法是让你的 WebSocketHandler
实现 CorsConfigurationSource
并返回一个带有允许的来源、头和其他详细信息的 CorsConfiguration
。如果无法做到这一点,你还可以在 SimpleUrlHandler
上设置 corsConfigurations
属性,按 URL 模式指定 CORS 设置。如果两者都指定,它们将使用 CorsConfiguration
的 combine
方法进行组合。
客户端
Spring WebFlux 提供了一个 WebSocketClient
抽象,其实现包括 Reactor Netty、Tomcat、Jetty、Undertow 和标准 Java(即 JSR-356)。
Tomcat 客户端实际上是标准 Java 客户端的扩展,在 WebSocketSession 处理方面增加了一些额外功能,以利用 Tomcat 特定的 API 暂停接收消息以进行背压(back pressure)。 |
要启动 WebSocket 会话,你可以创建一个客户端实例并使用其 execute
方法
-
Java
-
Kotlin
WebSocketClient client = new ReactorNettyWebSocketClient();
URI url = new URI("ws://localhost:8080/path");
client.execute(url, session ->
session.receive()
.doOnNext(System.out::println)
.then());
val client = ReactorNettyWebSocketClient()
val url = URI("ws://localhost:8080/path")
client.execute(url) { session ->
session.receive()
.doOnNext(::println)
.then()
}
一些客户端,如 Jetty,实现了 Lifecycle
,在使用它们之前需要先停止和启动。所有客户端都具有与底层 WebSocket 客户端配置相关的构造函数选项。