RSocket
本节介绍 Spring 框架对 RSocket 协议的支持。
概述
RSocket 是一种应用程序协议,用于通过 TCP、WebSocket 和其他字节流传输进行多路复用双工通信,使用以下交互模型之一
-
请求-响应
— 发送一条消息并接收一条回复。 -
请求-流
— 发送一条消息并接收消息流作为回复。 -
通道
— 双向发送消息流。 -
即发即忘
— 发送单向消息。
一旦建立初始连接,"客户端"与"服务器"的区分就消失了,因为双方变得对称,并且双方都可以发起上述交互之一。这就是为什么在协议调用中,参与方被称为"请求者"和"响应者",而上述交互被称为"请求流"或简称为"请求"。
这些是 RSocket 协议的关键特性和优势
-
响应式流语义跨越网络边界 — 对于诸如
请求流
和通道
之类的流式请求,背压信号在请求者和响应者之间传递,允许请求者在源头减缓响应者的速度,从而减少对网络层拥塞控制的依赖,以及对网络层或任何级别的缓冲的需求。 -
请求节流 — 此功能在
LEASE
帧之后被称为"租赁",该帧可以从每端发送以限制另一端在给定时间内允许的请求总数。租赁会定期续期。 -
会话恢复 — 这专为连接丢失而设计,需要维护一些状态。状态管理对应用程序来说是透明的,并且与背压配合良好,背压可以在可能的情况下停止生产者,并减少所需的状态量。
-
大型消息的碎片化和重新组装。
-
保持活动(心跳)。
RSocket 在多种语言中都有实现。该Java 库建立在Project Reactor和Reactor Netty之上,用于传输。这意味着来自应用程序中响应式流发布者的信号会透明地通过 RSocket 跨网络传播。
协议
RSocket 的优势之一是它在网络上有明确定义的行为,并且有一个易于阅读的规范以及一些协议扩展。因此,建议阅读规范,独立于语言实现和更高级别的框架 API。本节提供一个简洁的概述,以建立一些上下文。
连接
最初,客户端通过某些低级流传输(如 TCP 或 WebSocket)连接到服务器,并向服务器发送SETUP
帧以设置连接参数。
服务器可能会拒绝SETUP
帧,但通常在发送(对于客户端)和接收(对于服务器)之后,双方都可以开始发出请求,除非SETUP
指示使用租赁语义来限制请求数量,在这种情况下,双方必须等待来自另一端的LEASE
帧才能允许发出请求。
发出请求
一旦建立连接,双方都可以通过以下帧之一发起请求:REQUEST_RESPONSE
、REQUEST_STREAM
、REQUEST_CHANNEL
或REQUEST_FNF
。每个帧都包含从请求者到响应者的一个消息。
响应方可以返回包含响应消息的PAYLOAD
帧,在REQUEST_CHANNEL
的情况下,请求方也可以发送包含更多请求消息的PAYLOAD
帧。
当请求涉及消息流,例如Request-Stream
和Channel
时,响应方必须尊重请求方的需求信号。需求以消息数量表示。初始需求在REQUEST_STREAM
和REQUEST_CHANNEL
帧中指定。后续需求通过REQUEST_N
帧发出信号。
双方也可以通过METADATA_PUSH
帧发送元数据通知,这些通知不属于任何单个请求,而是属于整个连接。
消息格式
RSocket 消息包含数据和元数据。元数据可用于发送路由、安全令牌等。数据和元数据可以采用不同的格式。每种格式的 MIME 类型在SETUP
帧中声明,并适用于给定连接上的所有请求。
虽然所有消息都可以包含元数据,但通常元数据(例如路由)是按请求的,因此只包含在请求的第一个消息中,即包含以下帧之一:REQUEST_RESPONSE
、REQUEST_STREAM
、REQUEST_CHANNEL
或REQUEST_FNF
。
协议扩展定义了应用程序中使用的常见元数据格式
Java 实现
RSocket 的Java 实现基于Project Reactor。TCP 和 WebSocket 的传输基于Reactor Netty。作为 Reactive Streams 库,Reactor 简化了协议实现的工作。对于应用程序来说,使用Flux
和Mono
以及声明式操作符和透明反压支持是自然的选择。
RSocket Java 中的 API 故意保持最小化和基础。它专注于协议功能,并将应用程序编程模型(例如 RPC 代码生成与其他模型)留作更高层次、独立的关注点。
主要契约 io.rsocket.RSocket 使用 Mono
表示单个消息的承诺,Flux
表示消息流,io.rsocket.Payload
表示带有数据和元数据访问权限的实际消息,来建模四种请求交互类型。RSocket
契约对称使用。对于请求,应用程序将获得一个 RSocket
来执行请求。对于响应,应用程序实现 RSocket
来处理请求。
这不是一个完整的介绍。在大多数情况下,Spring 应用程序不需要直接使用它的 API。但是,独立于 Spring 查看或试验 RSocket 可能很重要。RSocket Java 存储库包含许多 示例应用程序,演示了它的 API 和协议功能。
Spring 支持
spring-messaging
模块包含以下内容
-
RSocketRequester — 通过
io.rsocket.RSocket
进行请求的流畅 API,带有数据和元数据编码/解码。 -
带注释的响应器 — 用于响应的
@MessageMapping
和@RSocketExchange
注释的处理程序方法。 -
RSocket 接口 — RSocket 服务声明为带有
@RSocketExchange
方法的 Java 接口,用于作为请求者或响应者。
spring-web
模块包含 Encoder
和 Decoder
实现,例如 Jackson CBOR/JSON 和 Protobuf,RSocket 应用程序可能需要这些实现。它还包含 PathPatternParser
,可以将其插入以进行高效的路由匹配。
Spring Boot 2.2 支持在 TCP 或 WebSocket 上建立 RSocket 服务器,包括在 WebFlux 服务器中通过 WebSocket 公开 RSocket 的选项。还提供客户端支持和自动配置 RSocketRequester.Builder
和 RSocketStrategies
。有关更多详细信息,请参阅 Spring Boot 参考中的 RSocket 部分。
Spring Security 5.2 提供 RSocket 支持。
Spring Integration 5.2 提供入站和出站网关,用于与 RSocket 客户端和服务器交互。有关更多详细信息,请参阅 Spring Integration 参考手册。
Spring Cloud Gateway 支持 RSocket 连接。
RSocketRequester
RSocketRequester
提供一个流畅的 API 来执行 RSocket 请求,接受和返回数据和元数据的对象,而不是低级数据缓冲区。它可以对称使用,用于从客户端发出请求,以及从服务器发出请求。
客户端请求者
在客户端获取 RSocketRequester
需要连接到服务器,这涉及发送包含连接设置的 RSocket SETUP
帧。RSocketRequester
提供了一个构建器,可以帮助准备 io.rsocket.core.RSocketConnector
,包括 SETUP
帧的连接设置。
这是使用默认设置连接的最基本方式
-
Java
-
Kotlin
RSocketRequester requester = RSocketRequester.builder().tcp("localhost", 7000);
URI url = URI.create("https://example.org:8080/rsocket");
RSocketRequester requester = RSocketRequester.builder().webSocket(url);
val requester = RSocketRequester.builder().tcp("localhost", 7000)
URI url = URI.create("https://example.org:8080/rsocket");
val requester = RSocketRequester.builder().webSocket(url)
以上代码不会立即连接。当发出请求时,会透明地建立共享连接并使用它。
连接设置
RSocketRequester.Builder
提供以下选项来定制初始 SETUP
帧
-
dataMimeType(MimeType)
— 设置连接上数据的 MIME 类型。 -
metadataMimeType(MimeType)
— 设置连接上元数据的 MIME 类型。 -
setupData(Object)
— 要包含在SETUP
中的数据。 -
setupRoute(String, Object…)
— 要包含在SETUP
中的元数据中的路由。 -
setupMetadata(Object, MimeType)
— 要包含在SETUP
中的其他元数据。
对于数据,默认 MIME 类型是从第一个配置的 Decoder
中推断出来的。对于元数据,默认 MIME 类型是 复合元数据,它允许每个请求包含多个元数据值和 MIME 类型对。通常情况下,这两个都不需要更改。
SETUP
帧中的数据和元数据是可选的。在服务器端,可以使用 @ConnectMapping 方法来处理连接的开始和 SETUP
帧的内容。元数据可用于连接级别的安全性。
策略
RSocketRequester.Builder
接受 RSocketStrategies
来配置请求者。您需要使用它来提供数据和元数据值的编码器和解码器。默认情况下,只注册了来自 spring-core
的 String
、byte[]
和 ByteBuffer
的基本编解码器。添加 spring-web
可以访问更多可以注册的编解码器,如下所示
-
Java
-
Kotlin
RSocketStrategies strategies = RSocketStrategies.builder()
.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
.build();
RSocketRequester requester = RSocketRequester.builder()
.rsocketStrategies(strategies)
.tcp("localhost", 7000);
val strategies = RSocketStrategies.builder()
.encoders { it.add(Jackson2CborEncoder()) }
.decoders { it.add(Jackson2CborDecoder()) }
.build()
val requester = RSocketRequester.builder()
.rsocketStrategies(strategies)
.tcp("localhost", 7000)
RSocketStrategies
旨在重复使用。在某些情况下,例如同一个应用程序中的客户端和服务器,最好在 Spring 配置中声明它。
客户端响应者
RSocketRequester.Builder
可用于配置对来自服务器的请求的响应者。
您可以使用带注释的处理程序来处理基于服务器上相同基础设施的客户端响应,但以编程方式注册,如下所示
-
Java
-
Kotlin
RSocketStrategies strategies = RSocketStrategies.builder()
.routeMatcher(new PathPatternRouteMatcher()) (1)
.build();
SocketAcceptor responder =
RSocketMessageHandler.responder(strategies, new ClientHandler()); (2)
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> connector.acceptor(responder)) (3)
.tcp("localhost", 7000);
1 | 如果存在 spring-web ,请使用 PathPatternRouteMatcher 进行高效的路由匹配。 |
2 | 从带有 @MessageMapping 和/或 @ConnectMapping 方法的类创建响应程序。 |
3 | 注册响应程序。 |
val strategies = RSocketStrategies.builder()
.routeMatcher(PathPatternRouteMatcher()) (1)
.build()
val responder =
RSocketMessageHandler.responder(strategies, new ClientHandler()); (2)
val requester = RSocketRequester.builder()
.rsocketConnector { it.acceptor(responder) } (3)
.tcp("localhost", 7000)
1 | 如果存在 spring-web ,请使用 PathPatternRouteMatcher 进行高效的路由匹配。 |
2 | 从带有 @MessageMapping 和/或 @ConnectMapping 方法的类创建响应程序。 |
3 | 注册响应程序。 |
请注意,以上仅是为编程方式注册客户端响应程序而设计的快捷方式。对于客户端响应程序位于 Spring 配置中的其他场景,您仍然可以将 RSocketMessageHandler
声明为 Spring bean,然后按如下方式应用
-
Java
-
Kotlin
ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> connector.acceptor(handler.responder()))
.tcp("localhost", 7000);
import org.springframework.beans.factory.getBean
val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()
val requester = RSocketRequester.builder()
.rsocketConnector { it.acceptor(handler.responder()) }
.tcp("localhost", 7000)
对于上述情况,您可能还需要在 RSocketMessageHandler
中使用 setHandlerPredicate
切换到不同的策略来检测客户端响应程序,例如基于自定义注释(如 @RSocketClientResponder
)而不是默认的 @Controller
。这在客户端和服务器或同一应用程序中的多个客户端的场景中是必要的。
另请参见 带注释的响应程序,以详细了解编程模型。
高级
RSocketRequesterBuilder
提供了一个回调,用于公开底层的 io.rsocket.core.RSocketConnector
,以便进一步配置保持活动间隔、会话恢复、拦截器等选项。您可以按如下方式在该级别配置选项
-
Java
-
Kotlin
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> {
// ...
})
.tcp("localhost", 7000);
val requester = RSocketRequester.builder()
.rsocketConnector {
//...
}
.tcp("localhost", 7000)
服务器请求程序
要从服务器向已连接的客户端发出请求,只需从服务器获取已连接客户端的请求程序即可。
在 带注释的响应程序 中,@ConnectMapping
和 @MessageMapping
方法支持 RSocketRequester
参数。使用它来访问连接的请求程序。请记住,@ConnectMapping
方法本质上是 SETUP
帧的处理程序,必须在请求开始之前处理。因此,最开始的请求必须与处理分离。例如
-
Java
-
Kotlin
@ConnectMapping
Mono<Void> handle(RSocketRequester requester) {
requester.route("status").data("5")
.retrieveFlux(StatusReport.class)
.subscribe(bar -> { (1)
// ...
});
return ... (2)
}
1 | 异步启动请求,独立于处理。 |
2 | 执行处理并返回完成 Mono<Void> 。 |
@ConnectMapping
suspend fun handle(requester: RSocketRequester) {
GlobalScope.launch {
requester.route("status").data("5").retrieveFlow<StatusReport>().collect { (1)
// ...
}
}
/// ... (2)
}
1 | 异步启动请求,独立于处理。 |
2 | 在挂起函数中执行处理。 |
请求
-
Java
-
Kotlin
ViewBox viewBox = ... ;
Flux<AirportLocation> locations = requester.route("locate.radars.within") (1)
.data(viewBox) (2)
.retrieveFlux(AirportLocation.class); (3)
1 | 指定要包含在请求消息元数据中的路由。 |
2 | 为请求消息提供数据。 |
3 | 声明预期的响应。 |
val viewBox: ViewBox = ...
val locations = requester.route("locate.radars.within") (1)
.data(viewBox) (2)
.retrieveFlow<AirportLocation>() (3)
1 | 指定要包含在请求消息元数据中的路由。 |
2 | 为请求消息提供数据。 |
3 | 声明预期的响应。 |
交互类型根据输入和输出的基数隐式确定。上面的示例是 Request-Stream
,因为发送一个值并接收一个值流。在大多数情况下,您不需要考虑这一点,只要输入和输出的选择与 RSocket 交互类型以及响应程序期望的输入和输出类型匹配即可。唯一一个无效组合的示例是多对一。
data(Object)
方法还接受任何 Reactive Streams Publisher
,包括 Flux
和 Mono
,以及在 ReactiveAdapterRegistry
中注册的任何其他值生产者。对于产生相同类型值的 Flux
等多值 Publisher
,请考虑使用重载的 data
方法之一,以避免对每个元素进行类型检查和 Encoder
查找
data(Object producer, Class<?> elementClass);
data(Object producer, ParameterizedTypeReference<?> elementTypeRef);
data(Object)
步骤是可选的。对于不发送数据的请求,请跳过此步骤。
-
Java
-
Kotlin
Mono<AirportLocation> location = requester.route("find.radar.EWR"))
.retrieveMono(AirportLocation.class);
import org.springframework.messaging.rsocket.retrieveAndAwait
val location = requester.route("find.radar.EWR")
.retrieveAndAwait<AirportLocation>()
如果使用 复合元数据(默认值)并且值受注册的 Encoder
支持,则可以添加额外的元数据值。例如
-
Java
-
Kotlin
String securityToken = ... ;
ViewBox viewBox = ... ;
MimeType mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0");
Flux<AirportLocation> locations = requester.route("locate.radars.within")
.metadata(securityToken, mimeType)
.data(viewBox)
.retrieveFlux(AirportLocation.class);
import org.springframework.messaging.rsocket.retrieveFlow
val requester: RSocketRequester = ...
val securityToken: String = ...
val viewBox: ViewBox = ...
val mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0")
val locations = requester.route("locate.radars.within")
.metadata(securityToken, mimeType)
.data(viewBox)
.retrieveFlow<AirportLocation>()
对于 Fire-and-Forget
,请使用返回 Mono<Void>
的 send()
方法。请注意,Mono
仅表示消息已成功发送,而不表示它已处理。
对于 Metadata-Push
,请使用具有 Mono<Void>
返回值的 sendMetadata()
方法。
带注释的响应器
RSocket 响应器可以实现为 @MessageMapping
和 @ConnectMapping
方法。@MessageMapping
方法处理单个请求,而 @ConnectMapping
方法处理连接级事件(设置和元数据推送)。带注释的响应器在对称方式下受支持,用于从服务器端响应和从客户端响应。
服务器响应器
要在服务器端使用带注释的响应器,请将 RSocketMessageHandler
添加到您的 Spring 配置中,以检测具有 @MessageMapping
和 @ConnectMapping
方法的 @Controller
bean。
-
Java
-
Kotlin
@Configuration
static class ServerConfig {
@Bean
public RSocketMessageHandler rsocketMessageHandler() {
RSocketMessageHandler handler = new RSocketMessageHandler();
handler.routeMatcher(new PathPatternRouteMatcher());
return handler;
}
}
@Configuration
class ServerConfig {
@Bean
fun rsocketMessageHandler() = RSocketMessageHandler().apply {
routeMatcher = PathPatternRouteMatcher()
}
}
然后通过 Java RSocket API 启动 RSocket 服务器,并将 RSocketMessageHandler
插入响应器,如下所示
-
Java
-
Kotlin
ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);
CloseableChannel server =
RSocketServer.create(handler.responder())
.bind(TcpServerTransport.create("localhost", 7000))
.block();
import org.springframework.beans.factory.getBean
val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()
val server = RSocketServer.create(handler.responder())
.bind(TcpServerTransport.create("localhost", 7000))
.awaitSingle()
RSocketMessageHandler
默认支持 复合 和 路由 元数据。如果您需要切换到不同的 mime 类型或注册其他元数据 mime 类型,可以设置其 MetadataExtractor。
您需要设置支持元数据和数据格式所需的 Encoder
和 Decoder
实例。您可能需要 spring-web
模块来实现编解码器。
默认情况下,SimpleRouteMatcher
用于通过 AntPathMatcher
匹配路由。我们建议插入来自 spring-web
的 PathPatternRouteMatcher
,以实现高效的路由匹配。RSocket 路由可以是分层的,但不是 URL 路径。默认情况下,这两个路由匹配器都配置为使用 "." 作为分隔符,并且没有像 HTTP URL 那样进行 URL 解码。
RSocketMessageHandler
可以通过 RSocketStrategies
进行配置,如果您需要在同一进程中的客户端和服务器之间共享配置,这可能很有用。
-
Java
-
Kotlin
@Configuration
static class ServerConfig {
@Bean
public RSocketMessageHandler rsocketMessageHandler() {
RSocketMessageHandler handler = new RSocketMessageHandler();
handler.setRSocketStrategies(rsocketStrategies());
return handler;
}
@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
.routeMatcher(new PathPatternRouteMatcher())
.build();
}
}
@Configuration
class ServerConfig {
@Bean
fun rsocketMessageHandler() = RSocketMessageHandler().apply {
rSocketStrategies = rsocketStrategies()
}
@Bean
fun rsocketStrategies() = RSocketStrategies.builder()
.encoders { it.add(Jackson2CborEncoder()) }
.decoders { it.add(Jackson2CborDecoder()) }
.routeMatcher(PathPatternRouteMatcher())
.build()
}
客户端响应器
客户端上的带注释的响应器需要在 RSocketRequester.Builder
中配置。有关详细信息,请参阅 客户端响应器。
@MessageMapping
-
Java
-
Kotlin
@Controller
public class RadarsController {
@MessageMapping("locate.radars.within")
public Flux<AirportLocation> radars(MapRequest request) {
// ...
}
}
@Controller
class RadarsController {
@MessageMapping("locate.radars.within")
fun radars(request: MapRequest): Flow<AirportLocation> {
// ...
}
}
上面的@MessageMapping
方法响应具有路由“locate.radars.within”的请求流交互。它支持灵活的方法签名,可以选择使用以下方法参数
方法参数 | 描述 |
---|---|
|
请求的有效负载。这可以是异步类型的具体值,例如 注意:使用该注释是可选的。如果方法参数不是简单类型,也不是任何其他支持的参数,则假定它是预期的有效负载。 |
|
用于向远程端发出请求的请求器。 |
|
从路由中提取的值,基于映射模式中的变量,例如 |
|
注册用于提取的元数据值,如MetadataExtractor中所述。 |
|
所有注册用于提取的元数据值,如MetadataExtractor中所述。 |
返回值应为一个或多个对象,这些对象将被序列化为响应有效负载。这可以是异步类型,例如Mono
或Flux
,具体值,或者void
或无值异步类型,例如Mono<Void>
。
@MessageMapping
方法支持的RSocket交互类型由输入(即@Payload
参数)和输出的基数决定,其中基数是指以下含义
基数 | 描述 |
---|---|
1 |
显式值,或单值异步类型,例如 |
多个 |
多值异步类型,例如 |
0 |
对于输入,这意味着方法没有 对于输出,这是 |
下表显示了所有输入和输出基数组合以及相应的交互类型
输入基数 | 输出基数 | 交互类型 |
---|---|---|
0, 1 |
0 |
Fire-and-Forget, Request-Response |
0, 1 |
1 |
Request-Response |
0, 1 |
多个 |
Request-Stream |
多个 |
0, 1, 多个 |
Request-Channel |
@RSocketExchange
作为@MessageMapping
的替代方案,您也可以使用@RSocketExchange
方法处理请求。此类方法声明在RSocket接口上,可以通过RSocketServiceProxyFactory
用作请求器,或由响应器实现。
例如,要作为响应器处理请求
-
Java
-
Kotlin
public interface RadarsService {
@RSocketExchange("locate.radars.within")
Flux<AirportLocation> radars(MapRequest request);
}
@Controller
public class RadarsController implements RadarsService {
public Flux<AirportLocation> radars(MapRequest request) {
// ...
}
}
interface RadarsService {
@RSocketExchange("locate.radars.within")
fun radars(request: MapRequest): Flow<AirportLocation>
}
@Controller
class RadarsController : RadarsService {
override fun radars(request: MapRequest): Flow<AirportLocation> {
// ...
}
}
@RSocketExhange
和 @MessageMapping
之间存在一些差异,因为前者需要适合请求者和响应者使用。例如,虽然 @MessageMapping
可以声明为处理任意数量的路由,并且每个路由可以是一个模式,但 @RSocketExchange
必须声明为具有单个具体路由。在支持的方法参数方面也存在细微差异,有关支持参数的列表,请参见 @MessageMapping 和 RSocket 接口。
@RSocketExchange
可以在类型级别使用,以指定给定 RSocket 服务接口的所有路由的通用前缀。
@ConnectMapping
@ConnectMapping
处理 RSocket 连接开始时的 SETUP
帧,以及通过 METADATA_PUSH
帧进行的任何后续元数据推送通知,即 io.rsocket.RSocket
中的 metadataPush(Payload)
。
@ConnectMapping
方法支持与 @MessageMapping 相同的参数,但基于 SETUP
和 METADATA_PUSH
帧的元数据和数据。@ConnectMapping
可以具有一个模式来缩小处理范围,以针对元数据中具有路由的特定连接,或者如果未声明任何模式,则所有连接都匹配。
@ConnectMapping
方法不能返回数据,并且必须声明为 void
或 Mono<Void>
作为返回值。如果处理对新连接返回错误,则拒绝连接。处理不能被延迟以对连接的 RSocketRequester
发出请求。有关详细信息,请参见 服务器请求者。
MetadataExtractor
响应者必须解释元数据。 复合元数据 允许独立格式化的元数据值(例如,用于路由、安全、跟踪),每个值都有自己的 MIME 类型。应用程序需要一种方法来配置要支持的元数据 MIME 类型,以及一种方法来访问提取的值。
MetadataExtractor
是一种契约,用于获取序列化元数据并返回解码的名称-值对,然后可以通过名称(例如,通过带注释的处理程序方法中的 @Header
)访问这些对。
DefaultMetadataExtractor
可以被赋予 Decoder
实例来解码元数据。开箱即用,它内置支持 "message/x.rsocket.routing.v0",它将其解码为 String
并保存在 "route" 键下。对于任何其他 MIME 类型,您都需要提供一个 Decoder
并注册 MIME 类型,如下所示
-
Java
-
Kotlin
DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders);
extractor.metadataToExtract(fooMimeType, Foo.class, "foo");
import org.springframework.messaging.rsocket.metadataToExtract
val extractor = DefaultMetadataExtractor(metadataDecoders)
extractor.metadataToExtract<Foo>(fooMimeType, "foo")
复合元数据非常适合组合独立的元数据值。但是,请求者可能不支持复合元数据,或者可能选择不使用它。为此,DefaultMetadataExtractor
可能需要自定义逻辑来将解码后的值映射到输出映射。以下是一个使用 JSON 作为元数据的示例
-
Java
-
Kotlin
DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders);
extractor.metadataToExtract(
MimeType.valueOf("application/vnd.myapp.metadata+json"),
new ParameterizedTypeReference<Map<String,String>>() {},
(jsonMap, outputMap) -> {
outputMap.putAll(jsonMap);
});
import org.springframework.messaging.rsocket.metadataToExtract
val extractor = DefaultMetadataExtractor(metadataDecoders)
extractor.metadataToExtract<Map<String, String>>(MimeType.valueOf("application/vnd.myapp.metadata+json")) { jsonMap, outputMap ->
outputMap.putAll(jsonMap)
}
当通过RSocketStrategies
配置MetadataExtractor
时,您可以让RSocketStrategies.Builder
使用配置的解码器创建提取器,并简单地使用回调来自定义注册,如下所示
-
Java
-
Kotlin
RSocketStrategies strategies = RSocketStrategies.builder()
.metadataExtractorRegistry(registry -> {
registry.metadataToExtract(fooMimeType, Foo.class, "foo");
// ...
})
.build();
import org.springframework.messaging.rsocket.metadataToExtract
val strategies = RSocketStrategies.builder()
.metadataExtractorRegistry { registry: MetadataExtractorRegistry ->
registry.metadataToExtract<Foo>(fooMimeType, "foo")
// ...
}
.build()
RSocket 接口
Spring 框架允许您使用@RSocketExchange
方法将 RSocket 服务定义为 Java 接口。您可以将此类接口传递给RSocketServiceProxyFactory
以创建一个代理,该代理通过RSocketRequester执行请求。您也可以实现该接口作为响应者来处理请求。
首先使用@RSocketExchange
方法创建接口
interface RadarService {
@RSocketExchange("radars")
Flux<AirportLocation> getRadars(@Payload MapRequest request);
// more RSocket exchange methods...
}
现在您可以创建一个代理,当调用方法时执行请求
RSocketRequester requester = ... ;
RSocketServiceProxyFactory factory = RSocketServiceProxyFactory.builder(requester).build();
RadarService service = factory.createClient(RadarService.class);
您还可以实现该接口来处理请求作为响应者。请参见带注释的响应者。
方法参数
带注释的 RSocket 交换方法支持具有以下方法参数的灵活方法签名
方法参数 | 描述 |
---|---|
|
添加一个路由变量以传递给 |
|
设置请求的输入有效负载。这可以是一个具体的值,或任何可以通过 |
|
输入有效负载中元数据条目的值。这可以是任何 |
|
元数据条目的 |
返回值
带注释的 RSocket 交换方法支持返回值,这些返回值可以是具体的值,或任何可以通过ReactiveAdapterRegistry
适应 Reactive Streams Publisher
的值生产者。
默认情况下,具有同步(阻塞)方法签名的 RSocket 服务方法的行为取决于底层 RSocket ClientTransport
的响应超时设置以及 RSocket 保持活动设置。RSocketServiceProxyFactory.Builder
确实公开了 blockTimeout
选项,该选项还允许您配置阻塞响应的最大时间,但我们建议在 RSocket 级别配置超时值以获得更多控制。