RSocket

本节介绍 Spring Framework 对 RSocket 协议的支持。

概览

RSocket 是一种应用程序协议,用于在 TCP、WebSocket 和其他字节流传输上进行多路复用、双向通信,支持以下交互模型之一:

  • Request-Response — 发送一条消息,接收一条响应。

  • Request-Stream — 发送一条消息,接收一个消息流。

  • Channel — 在两个方向上发送消息流。

  • Fire-and-Forget — 发送单向消息。

一旦建立初始连接,“客户端”与“服务器”的区别就消失了,因为双方变得对称,每一方都可以发起上述任一交互。这就是为什么在协议调用中,参与双方被称为“请求者”(requester)和“响应者”(responder),而上述交互被称为“请求流”或简称为“请求”。

以下是 RSocket 协议的主要特性和优势:

  • Reactive Streams 语义跨越网络边界 — 对于诸如 Request-StreamChannel 之类的流式请求,背压(back pressure)信号在请求者和响应者之间传递,允许请求者在源头减慢响应者,从而减少对网络层拥塞控制的依赖,并降低在网络层或任何层进行缓冲的需求。

  • 请求限流 — 此特性因 LEASE 帧而得名“租约”(Leasing),该帧可以由每一端发送,以限制另一端在给定时间内允许的总请求数。租约会定期续期。

  • 会话续订 — 这旨在应对连接丢失,并需要维护一些状态。状态管理对应用程序透明,并且与背压(back pressure)很好地协同工作,背压可以在可能时停止生产者,并减少所需的状态量。

  • 大消息的分段与重组。

  • 心跳保持(keepalive)。

RSocket 有多种语言的实现。其 Java 库构建于 Project Reactor 之上,并使用 Reactor Netty 作为传输层。这意味着应用程序中 Reactive Streams Publisher 发出的信号通过 RSocket 在网络上透明地传播。

协议

RSocket 的优势之一在于它在网络上传输时具有明确定义的行为,并且易于阅读其规范以及一些协议扩展。因此,建议阅读规范,而无需依赖语言实现和更高级别的框架 API。本节提供了一个简洁的概览,以建立一些背景知识。

连接

最初,客户端通过 TCP 或 WebSocket 等一些低级流式传输连接到服务器,并向服务器发送一个 SETUP 帧来设置连接参数。

服务器可能会拒绝 SETUP 帧,但通常在发送(客户端)和接收(服务器)之后,双方就可以开始发起请求,除非 SETUP 指示使用租约(leasing)语义来限制请求数量,在这种情况下,双方必须等待对方发送 LEASE 帧才能允许发起请求。

发起请求

连接建立后,双方都可以通过 REQUEST_RESPONSEREQUEST_STREAMREQUEST_CHANNELREQUEST_FNF 中的任一帧发起请求。这些帧中的每一个都从请求者向响应者携带一条消息。

响应者可以返回带有响应消息的 PAYLOAD 帧,并且在 REQUEST_CHANNEL 的情况下,请求者也可以发送带有更多请求消息的 PAYLOAD 帧。

当请求涉及消息流时(例如 Request-StreamChannel),响应者必须尊重请求者发出的需求信号。需求以消息数量表示。初始需求在 REQUEST_STREAMREQUEST_CHANNEL 帧中指定。后续需求通过 REQUEST_N 帧发出信号。

每一方也可以通过 METADATA_PUSH 帧发送元数据通知,这些通知不属于任何单个请求,而是针对整个连接。

消息格式

RSocket 消息包含数据和元数据。元数据可用于发送路由、安全令牌等。数据和元数据可以采用不同的格式。它们各自的 Mime 类型在 SETUP 帧中声明,并适用于给定连接上的所有请求。

虽然所有消息都可以包含元数据,但通常元数据(如路由)是针对每个请求的,因此仅包含在请求的第一条消息中,即与 REQUEST_RESPONSEREQUEST_STREAMREQUEST_CHANNELREQUEST_FNF 这些帧中的一个一起发送。

协议扩展定义了用于应用程序的常见元数据格式:

Java 实现

RSocket 的Java 实现构建于 Project Reactor 之上。TCP 和 WebSocket 的传输层构建于 Reactor Netty 之上。作为一个 Reactive Streams 库,Reactor 简化了实现协议的工作。对于应用程序而言,使用 FluxMono 并结合声明式操作符和透明的背压支持是自然而然的选择。

RSocket Java 中的 API 故意保持最小和基础。它侧重于协议特性,并将应用程序编程模型(例如 RPC 代码生成或其他)作为更高层级的独立关注点。

主要的契约 io.rsocket.RSocket 建模了四种请求交互类型,其中 Mono 代表单个消息的承诺,Flux 代表消息流,而 io.rsocket.Payload 是实际消息,可作为字节缓冲区访问数据和元数据。RSocket 契约是对称使用的。对于请求方,应用程序获得一个 RSocket 来执行请求。对于响应方,应用程序实现 RSocket 来处理请求。

这并非详尽的介绍。大多数情况下,Spring 应用程序无需直接使用其 API。然而,独立于 Spring 查看或实验 RSocket 可能会很有价值。RSocket Java 仓库包含了许多示例应用程序,展示了其 API 和协议特性。

Spring 支持

spring-messaging 模块包含以下内容:

  • RSocketRequester — 用于通过 io.rsocket.RSocket 发起请求的流畅 API,支持数据和元数据编码/解码。

  • 注解响应者 — 使用 @MessageMapping@RSocketExchange 注解的处理器方法,用于响应请求。

  • RSocket 接口 — 将 RSocket 服务声明为带有 @RSocketExchange 方法的 Java 接口,可用作请求者或响应者。

spring-web 模块包含 EncoderDecoder 实现,例如 Jackson CBOR/JSON 和 Protobuf,这些是 RSocket 应用程序可能需要的。它还包含可以用于高效路由匹配的 PathPatternParser

Spring Boot 2.2 支持在 TCP 或 WebSocket 上搭建 RSocket 服务器,包括在 WebFlux 服务器中通过 WebSocket 暴露 RSocket 的选项。此外,还提供了客户端支持和针对 RSocketRequester.BuilderRSocketStrategies 的自动配置。有关更多详细信息,请参阅 Spring Boot 参考文档的RSocket 部分

Spring Security 5.2 提供 RSocket 支持。

Spring Integration 5.2 提供入站和出站网关,用于与 RSocket 客户端和服务器交互。有关更多详细信息,请参阅 Spring Integration 参考手册。

Spring Cloud Gateway 支持 RSocket 连接。

RSocketRequester

RSocketRequester 提供了一个流畅的 API 来执行 RSocket 请求,接受和返回用于数据和元数据的对象,而不是低级的字节缓冲区。它可以对称使用,既可用于从客户端发起请求,也可用于从服务器发起请求。

客户端请求者

在客户端获取 RSocketRequester 意味着连接到服务器,这涉及发送一个包含连接设置的 RSocket SETUP 帧。RSocketRequester 提供了一个构建器,用于帮助准备 io.rsocket.core.RSocketConnector,包括 SETUP 帧的连接设置。

这是使用默认设置进行连接的最基本方式:

  • Java

  • Kotlin

RSocketRequester requester = RSocketRequester.builder().tcp("localhost", 7000);

URI url = URI.create("https://example.org:8080/rsocket");
RSocketRequester requester = RSocketRequester.builder().webSocket(url);
val requester = RSocketRequester.builder().tcp("localhost", 7000)

URI url = URI.create("https://example.org:8080/rsocket");
val requester = RSocketRequester.builder().webSocket(url)

上述代码不会立即连接。当发起请求时,会透明地建立并使用一个共享连接。

连接设置

RSocketRequester.Builder 提供了以下方法来定制初始的 SETUP 帧:

  • dataMimeType(MimeType) — 设置连接上的数据 mime 类型。

  • metadataMimeType(MimeType) — 设置连接上的元数据 mime 类型。

  • setupData(Object) — 包含在 SETUP 中的数据。

  • setupRoute(String, Object…​) — 包含在 SETUP 的元数据中的路由。

  • setupMetadata(Object, MimeType) — 包含在 SETUP 中的其他元数据。

对于数据,默认的 mime 类型派生自第一个配置的 Decoder。对于元数据,默认的 mime 类型是复合元数据(composite metadata),它允许每个请求包含多个元数据值和 mime 类型对。通常,这两者都不需要更改。

SETUP 帧中的数据和元数据是可选的。在服务器端,可以使用@ConnectMapping 方法来处理连接的开始和 SETUP 帧的内容。元数据可用于连接级别的安全性。

策略

RSocketRequester.Builder 接受 RSocketStrategies 来配置请求者。你需要使用它来提供用于数据和元数据值(反)序列化的编码器和解码器。默认情况下,只注册了 spring-core 中针对 Stringbyte[]ByteBuffer 的基本编解码器。添加 spring-web 将提供更多可以注册的编解码器,如下所示:

  • Java

  • Kotlin

RSocketStrategies strategies = RSocketStrategies.builder()
	.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
	.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
	.build();

RSocketRequester requester = RSocketRequester.builder()
	.rsocketStrategies(strategies)
	.tcp("localhost", 7000);
val strategies = RSocketStrategies.builder()
		.encoders { it.add(Jackson2CborEncoder()) }
		.decoders { it.add(Jackson2CborDecoder()) }
		.build()

val requester = RSocketRequester.builder()
		.rsocketStrategies(strategies)
		.tcp("localhost", 7000)

RSocketStrategies 被设计为可重用。在某些场景下,例如,客户端和服务器在同一个应用程序中,最好在 Spring 配置中声明它。

客户端响应者

RSocketRequester.Builder 可用于配置响应来自服务器的请求的响应者。

你可以使用注解处理器作为客户端响应者,基于与服务器端相同的底层架构,但通过编程方式注册,如下所示:

  • Java

  • Kotlin

RSocketStrategies strategies = RSocketStrategies.builder()
	.routeMatcher(new PathPatternRouteMatcher())  (1)
	.build();

SocketAcceptor responder =
	RSocketMessageHandler.responder(strategies, new ClientHandler()); (2)

RSocketRequester requester = RSocketRequester.builder()
	.rsocketConnector(connector -> connector.acceptor(responder)) (3)
	.tcp("localhost", 7000);
1 如果存在 spring-web,使用 PathPatternRouteMatcher 进行高效路由匹配。
2 从包含 @MessageMapping 和/或 @ConnectMapping 方法的类创建响应者。
3 注册响应者。
val strategies = RSocketStrategies.builder()
		.routeMatcher(PathPatternRouteMatcher())  (1)
		.build()

val responder =
	RSocketMessageHandler.responder(strategies, new ClientHandler()); (2)

val requester = RSocketRequester.builder()
		.rsocketConnector { it.acceptor(responder) } (3)
		.tcp("localhost", 7000)
1 如果存在 spring-web,使用 PathPatternRouteMatcher 进行高效路由匹配。
2 从包含 @MessageMapping 和/或 @ConnectMapping 方法的类创建响应者。
3 注册响应者。

注意,上述方法仅是为编程方式注册客户端响应者而设计的快捷方式。对于其他场景,如果客户端响应者在 Spring 配置中,你仍然可以将 RSocketMessageHandler 声明为 Spring Bean,然后按如下方式应用:

  • Java

  • Kotlin

ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);

RSocketRequester requester = RSocketRequester.builder()
	.rsocketConnector(connector -> connector.acceptor(handler.responder()))
	.tcp("localhost", 7000);
import org.springframework.beans.factory.getBean

val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()

val requester = RSocketRequester.builder()
		.rsocketConnector { it.acceptor(handler.responder()) }
		.tcp("localhost", 7000)

对于上述情况,你可能还需要在 RSocketMessageHandler 中使用 setHandlerPredicate 来切换检测客户端响应者的不同策略,例如,基于自定义注解 @RSocketClientResponder 而非默认的 @Controller。在客户端和服务器共存或同一个应用程序中有多个客户端的场景中,这是必需的。

另请参见注解响应者,了解更多关于编程模型的信息。

高级

RSocketRequesterBuilder 提供了一个回调,用于暴露底层的 io.rsocket.core.RSocketConnector,以便进一步配置 keepalive 间隔、会话续订、拦截器等选项。你可以在该级别按如下方式配置选项:

  • Java

  • Kotlin

RSocketRequester requester = RSocketRequester.builder()
	.rsocketConnector(connector -> {
		// ...
	})
	.tcp("localhost", 7000);
val requester = RSocketRequester.builder()
		.rsocketConnector {
			//...
		}
		.tcp("localhost", 7000)

服务器请求者

从服务器向连接的客户端发起请求,只需从服务器获取与该连接客户端对应的请求者即可。

注解响应器 中,@ConnectMapping@MessageMapping 方法支持 RSocketRequester 参数。可以使用它来访问连接的请求器。请注意,@ConnectMapping 方法本质上是 SETUP 帧的处理程序,必须在请求开始之前进行处理。因此,最开始的请求必须与处理解耦。例如:

  • Java

  • Kotlin

@ConnectMapping
Mono<Void> handle(RSocketRequester requester) {
	requester.route("status").data("5")
		.retrieveFlux(StatusReport.class)
		.subscribe(bar -> { (1)
			// ...
		});
	return ... (2)
}
1 异步启动请求,独立于处理。
2 执行处理并返回完成的 Mono<Void>
@ConnectMapping
suspend fun handle(requester: RSocketRequester) {
	GlobalScope.launch {
		requester.route("status").data("5").retrieveFlow<StatusReport>().collect { (1)
			// ...
		}
	}
	/// ... (2)
}
1 异步启动请求,独立于处理。
2 在 suspend 函数中执行处理。

请求

一旦有了 客户端服务器端 请求器,就可以按如下方式发起请求:

  • Java

  • Kotlin

ViewBox viewBox = ... ;

Flux<AirportLocation> locations = requester.route("locate.radars.within") (1)
		.data(viewBox) (2)
		.retrieveFlux(AirportLocation.class); (3)
1 指定要包含在请求消息元数据中的路由。
2 为请求消息提供数据。
3 声明预期的响应。
val viewBox: ViewBox = ...

val locations = requester.route("locate.radars.within") (1)
		.data(viewBox) (2)
		.retrieveFlow<AirportLocation>() (3)
1 指定要包含在请求消息元数据中的路由。
2 为请求消息提供数据。
3 声明预期的响应。

交互类型是根据输入和输出的基数隐式确定的。上面的例子是 Request-Stream,因为它发送一个值并接收一个值的流。在大多数情况下,只要输入和输出的选择与 RSocket 交互类型以及响应器期望的输入和输出类型匹配,您就不需要考虑这个问题。唯一无效的组合示例是多对一。

data(Object) 方法也接受任何 Reactive Streams Publisher,包括 FluxMono,以及任何在 ReactiveAdapterRegistry 中注册的值生产者。对于像 Flux 这样生成相同类型值的多值 Publisher,考虑使用其中一个重载的 data 方法,以避免对每个元素进行类型检查和 Encoder 查找:

data(Object producer, Class<?> elementClass);
data(Object producer, ParameterizedTypeReference<?> elementTypeRef);

data(Object) 步骤是可选的。对于不发送数据的请求,可以跳过它:

  • Java

  • Kotlin

Mono<AirportLocation> location = requester.route("find.radar.EWR"))
	.retrieveMono(AirportLocation.class);
import org.springframework.messaging.rsocket.retrieveAndAwait

val location = requester.route("find.radar.EWR")
	.retrieveAndAwait<AirportLocation>()

如果使用 组合元数据(默认)并且这些值受注册的 Encoder 支持,则可以添加额外的元数据值。例如:

  • Java

  • Kotlin

String securityToken = ... ;
ViewBox viewBox = ... ;
MimeType mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0");

Flux<AirportLocation> locations = requester.route("locate.radars.within")
		.metadata(securityToken, mimeType)
		.data(viewBox)
		.retrieveFlux(AirportLocation.class);
import org.springframework.messaging.rsocket.retrieveFlow

val requester: RSocketRequester = ...

val securityToken: String = ...
val viewBox: ViewBox = ...
val mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0")

val locations = requester.route("locate.radars.within")
		.metadata(securityToken, mimeType)
		.data(viewBox)
		.retrieveFlow<AirportLocation>()

对于 Fire-and-Forget,使用返回 Mono<Void>send() 方法。请注意,Mono 仅表示消息已成功发送,而不表示已处理。

对于 Metadata-Push,使用返回 Mono<Void>sendMetadata() 方法。

注解响应器

RSocket 响应器可以作为 @MessageMapping@ConnectMapping 方法来实现。@MessageMapping 方法处理单个请求,而 @ConnectMapping 方法处理连接级别的事件(setup 和 metadata push)。注解响应器是双向支持的,用于服务器端响应和客户端响应。

服务器端响应器

要在服务器端使用注解响应器,将 RSocketMessageHandler 添加到 Spring 配置中,以检测包含 @MessageMapping@ConnectMapping 方法的 @Controller bean:

  • Java

  • Kotlin

@Configuration
static class ServerConfig {

	@Bean
	public RSocketMessageHandler rsocketMessageHandler() {
		RSocketMessageHandler handler = new RSocketMessageHandler();
		handler.routeMatcher(new PathPatternRouteMatcher());
		return handler;
	}
}
@Configuration
class ServerConfig {

	@Bean
	fun rsocketMessageHandler() = RSocketMessageHandler().apply {
		routeMatcher = PathPatternRouteMatcher()
	}
}

然后通过 Java RSocket API 启动 RSocket 服务器,并将 RSocketMessageHandler 用于响应器,如下所示:

  • Java

  • Kotlin

ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);

CloseableChannel server =
	RSocketServer.create(handler.responder())
		.bind(TcpServerTransport.create("localhost", 7000))
		.block();
import org.springframework.beans.factory.getBean

val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()

val server = RSocketServer.create(handler.responder())
		.bind(TcpServerTransport.create("localhost", 7000))
		.awaitSingle()

RSocketMessageHandler 默认支持 组合路由 元数据。如果需要切换到不同的 mime 类型或注册额外的元数据 mime 类型,可以设置其 MetadataExtractor

您需要设置支持的元数据和数据格式所需的 EncoderDecoder 实例。您可能需要 spring-web 模块来获取编解码器实现。

默认情况下,SimpleRouteMatcher 使用 AntPathMatcher 进行路由匹配。我们建议插入 spring-web 中的 PathPatternRouteMatcher 以实现高效的路由匹配。RSocket 路由可以是分层的,但不是 URL 路径。默认情况下,两个路由匹配器都配置使用 "." 作为分隔符,并且不像 HTTP URL 那样进行 URL 解码。

RSocketMessageHandler 可以通过 RSocketStrategies 进行配置,如果您需要在同一进程中在客户端和服务器之间共享配置,这可能会很有用:

  • Java

  • Kotlin

@Configuration
static class ServerConfig {

	@Bean
	public RSocketMessageHandler rsocketMessageHandler() {
		RSocketMessageHandler handler = new RSocketMessageHandler();
		handler.setRSocketStrategies(rsocketStrategies());
		return handler;
	}

	@Bean
	public RSocketStrategies rsocketStrategies() {
		return RSocketStrategies.builder()
			.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
			.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
			.routeMatcher(new PathPatternRouteMatcher())
			.build();
	}
}
@Configuration
class ServerConfig {

	@Bean
	fun rsocketMessageHandler() = RSocketMessageHandler().apply {
		rSocketStrategies = rsocketStrategies()
	}

	@Bean
	fun rsocketStrategies() = RSocketStrategies.builder()
			.encoders { it.add(Jackson2CborEncoder()) }
			.decoders { it.add(Jackson2CborDecoder()) }
			.routeMatcher(PathPatternRouteMatcher())
			.build()
}

客户端响应器

客户端的注解响应器需要在 RSocketRequester.Builder 中进行配置。详情请参阅 客户端响应器

@MessageMapping

一旦 服务器端客户端 响应器配置到位,@MessageMapping 方法就可以按如下方式使用:

  • Java

  • Kotlin

@Controller
public class RadarsController {

	@MessageMapping("locate.radars.within")
	public Flux<AirportLocation> radars(MapRequest request) {
		// ...
	}
}
@Controller
class RadarsController {

	@MessageMapping("locate.radars.within")
	fun radars(request: MapRequest): Flow<AirportLocation> {
		// ...
	}
}

上面的 @MessageMapping 方法响应路由为 "locate.radars.within" 的 Request-Stream 交互。它支持灵活的方法签名,可以选择使用以下方法参数:

方法参数 描述

@Payload

请求的 payload。这可以是具体的值,也可以是异步类型(如 MonoFlux)。

注意: 是否使用此注解是可选的。一个不是简单类型且不是其他受支持参数的方法参数,被假定为预期的 payload。

RSocketRequester

用于向远程端发起请求的请求器。

@DestinationVariable

根据映射模式中的变量从路由中提取的值,例如 @MessageMapping("find.radar.{id}")

@Header

按照 MetadataExtractor 中所述注册用于提取的元数据值。

@Headers Map<String, Object>

按照 MetadataExtractor 中所述注册用于提取的所有元数据值。

返回值预计为一个或多个对象,这些对象将被序列化为响应 payload。这可以是异步类型(如 MonoFlux),具体值,或者 void 或无值的异步类型(如 Mono<Void>)。

@MessageMapping 方法支持的 RSocket 交互类型取决于输入(即 @Payload 参数)和输出的基数,其中基数表示以下含义:

基数 描述

1

具体值,或单值异步类型,例如 Mono<T>

多值异步类型,例如 Flux<T>

0

对于输入,这意味着方法没有 @Payload 参数。

对于输出,这是 void 或无值的异步类型,例如 Mono<Void>

下表显示了所有输入和输出基数组合以及相应的交互类型:

输入基数 输出基数 交互类型

0, 1

0

Fire-and-Forget, Request-Response

0, 1

1

Request-Response

0, 1

Request-Stream

0, 1, 多

Request-Channel

@RSocketExchange

作为 @MessageMapping 的替代方案,您也可以使用 @RSocketExchange 方法处理请求。这些方法在 RSocket 接口 上声明,可以通过 RSocketServiceProxyFactory 用作请求器,或由响应器实现。

例如,作为响应器处理请求:

  • Java

  • Kotlin

public interface RadarsService {

	@RSocketExchange("locate.radars.within")
	Flux<AirportLocation> radars(MapRequest request);
}

@Controller
public class RadarsController implements RadarsService {

	public Flux<AirportLocation> radars(MapRequest request) {
		// ...
	}
}
interface RadarsService {

	@RSocketExchange("locate.radars.within")
	fun radars(request: MapRequest): Flow<AirportLocation>
}

@Controller
class RadarsController : RadarsService {

	override fun radars(request: MapRequest): Flow<AirportLocation> {
		// ...
	}
}

@RSocketExchange@MessageMapping 之间存在一些差异,因为前者需要保持适用于请求器和响应器。例如,虽然 @MessageMapping 可以声明用于处理任意数量的路由且每个路由可以是模式,但 @RSocketExchange 必须声明一个具体的单一路由。在支持的与元数据相关的方法参数方面也存在细微差异,请参阅 @MessageMappingRSocket 接口 查看支持参数的列表。

@RSocketExchange 可以用在类型级别,为给定 RSocket 服务接口的所有路由指定一个公共前缀。

@ConnectMapping

@ConnectMapping 在 RSocket 连接开始时处理 SETUP 帧,以及通过 METADATA_PUSH 帧进行的后续元数据推送通知,即 io.rsocket.RSocket 中的 metadataPush(Payload)

@ConnectMapping 方法支持与 @MessageMapping 相同的参数,但基于 SETUPMETADATA_PUSH 帧中的元数据和数据。@ConnectMapping 可以包含模式以将处理范围缩小到元数据中包含路由的特定连接,或者如果未声明模式,则匹配所有连接。

@ConnectMapping 方法不能返回数据,必须声明返回值为 voidMono<Void>。如果处理新连接时返回错误,则连接将被拒绝。处理不应被阻塞以等待向连接的 RSocketRequester 发起请求。详情请参阅 服务器端请求器

MetadataExtractor

响应器必须解释元数据。组合元数据 允许独立格式化的元数据值(例如,用于路由、安全、跟踪),每个值都有自己的 mime 类型。应用程序需要一种方法来配置支持的元数据 mime 类型,以及一种访问提取值的方法。

MetadataExtractor 是一个契约,用于接收序列化的元数据并返回解码后的名称-值对,这些名称-值对可以通过名称像头部一样访问,例如通过注解处理方法中的 @Header

DefaultMetadataExtractor 可以提供 Decoder 实例来解码元数据。它开箱即用地内置支持 "message/x.rsocket.routing.v0",它将其解码为 String 并保存在 "route" 键下。对于任何其他 mime 类型,您需要提供一个 Decoder 并按如下方式注册 mime 类型:

  • Java

  • Kotlin

DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders);
extractor.metadataToExtract(fooMimeType, Foo.class, "foo");
import org.springframework.messaging.rsocket.metadataToExtract

val extractor = DefaultMetadataExtractor(metadataDecoders)
extractor.metadataToExtract<Foo>(fooMimeType, "foo")

组合元数据很适合组合独立的元数据值。但是,请求器可能不支持组合元数据,或者可能选择不使用它。为此,DefaultMetadataExtractor 可能需要自定义逻辑将解码后的值映射到输出 Map。以下是使用 JSON 作为元数据的示例:

  • Java

  • Kotlin

DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders);
extractor.metadataToExtract(
	MimeType.valueOf("application/vnd.myapp.metadata+json"),
	new ParameterizedTypeReference<Map<String,String>>() {},
	(jsonMap, outputMap) -> {
		outputMap.putAll(jsonMap);
	});
import org.springframework.messaging.rsocket.metadataToExtract

val extractor = DefaultMetadataExtractor(metadataDecoders)
extractor.metadataToExtract<Map<String, String>>(MimeType.valueOf("application/vnd.myapp.metadata+json")) { jsonMap, outputMap ->
	outputMap.putAll(jsonMap)
}

通过 RSocketStrategies 配置 MetadataExtractor 时,可以让 RSocketStrategies.Builder 使用配置好的解码器创建 extractor,然后简单地使用回调来定制注册,如下所示:

  • Java

  • Kotlin

RSocketStrategies strategies = RSocketStrategies.builder()
	.metadataExtractorRegistry(registry -> {
		registry.metadataToExtract(fooMimeType, Foo.class, "foo");
		// ...
	})
	.build();
import org.springframework.messaging.rsocket.metadataToExtract

val strategies = RSocketStrategies.builder()
		.metadataExtractorRegistry { registry: MetadataExtractorRegistry ->
			registry.metadataToExtract<Foo>(fooMimeType, "foo")
			// ...
		}
		.build()

RSocket 接口

Spring Framework 允许您将 RSocket 服务定义为包含 @RSocketExchange 方法的 Java 接口。您可以将这样的接口传递给 RSocketServiceProxyFactory 以创建一个代理,该代理通过 RSocketRequester 执行请求。您也可以将该接口实现为处理请求的响应器。

首先创建包含 @RSocketExchange 方法的接口:

interface RadarService {

	@RSocketExchange("radars")
	Flux<AirportLocation> getRadars(@Payload MapRequest request);

	// more RSocket exchange methods...

}

现在您可以创建一个在调用方法时执行请求的代理:

RSocketRequester requester = ... ;
RSocketServiceProxyFactory factory = RSocketServiceProxyFactory.builder(requester).build();

RadarService service = factory.createClient(RadarService.class);

您还可以实现该接口作为响应器来处理请求。请参阅 注解响应器

方法参数

注解的 RSocket 交换方法支持灵活的方法签名,带有以下方法参数:

方法参数 描述

@DestinationVariable

添加路由变量以与 @RSocketExchange 注解中的路由一起传递给 RSocketRequester,以便展开路由中的模板占位符。该变量可以是 String 或任何 Object,然后通过 toString() 进行格式化。

@Payload

设置请求的输入 payload。这可以是具体值,或可以通过 ReactiveAdapterRegistry 转换为 Reactive Streams Publisher 的任何值生产者。除非 required 属性设置为 false,或参数标记为可选(由 MethodParameter#isOptional 确定),否则必须提供 payload。

Object,如果后跟 MimeType

输入 payload 中元数据条目的值。这可以是任何 Object,只要下一个参数是元数据条目的 MimeType。该值可以是具体值,或可以通过 ReactiveAdapterRegistry 转换为 Reactive Streams Publisher 的任何单值生产者。

MimeType

元数据条目的 MimeType。预期前面的方法参数是元数据值。

返回值

注解的 RSocket 交换方法支持具体值或可以通过 ReactiveAdapterRegistry 转换为 Reactive Streams Publisher 的任何值生产者作为返回值。

默认情况下,具有同步(阻塞)方法签名的 RSocket 服务方法的行为取决于底层 RSocket ClientTransport 的响应超时设置以及 RSocket keep-alive 设置。RSocketServiceProxyFactory.Builder 确实提供了一个 blockTimeout 选项,也允许您配置阻塞等待响应的最长时间,但我们建议在 RSocket 级别配置超时值以获得更多控制。