RSocket 支持

RSocket Spring Integration 模块 (spring-integration-rsocket) 允许执行 RSocket 应用程序协议.

您需要将此依赖项包含到您的项目中

  • Maven

  • Gradle

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-rsocket</artifactId>
    <version>6.3.0</version>
</dependency>
compile "org.springframework.integration:spring-integration-rsocket:6.3.0"

此模块从版本 5.2 开始可用,基于 Spring Messaging 基础,并包含其 RSocket 组件实现,例如 RSocketRequesterRSocketMessageHandlerRSocketStrategies。有关 RSocket 协议、术语和组件的更多信息,请参见 Spring Framework RSocket 支持

在通过通道适配器启动集成流处理之前,我们需要在服务器和客户端之间建立 RSocket 连接。为此,Spring Integration RSocket 支持提供了 ServerRSocketConnectorClientRSocketConnector,它们是 AbstractRSocketConnector 的实现。

ServerRSocketConnector 根据提供的 io.rsocket.transport.ServerTransport 在主机和端口上公开一个侦听器,以接受来自客户端的连接。可以使用 setServerConfigurer() 自定义内部 RSocketServer 实例,以及可以配置的其他选项,例如 RSocketStrategiesMimeType,用于有效负载数据和标头元数据。当从客户端请求者提供 setupRoute 时(请参见下面的 ClientRSocketConnector),连接的客户端将作为 RSocketRequester 存储在由 clientRSocketKeyStrategy BiFunction<Map<String, Object>, DataBuffer, Object> 确定的键下。默认情况下,连接数据将用作键,作为使用 UTF-8 字符集转换为字符串的值。此类 RSocketRequester 注册表可用于应用程序逻辑中,以确定与之交互的特定客户端连接,或将相同消息发布到所有连接的客户端。当从客户端建立连接时,ServerRSocketConnector 会发出 RSocketConnectedEvent。这类似于 Spring Messaging 模块中 @ConnectMapping 注释提供的功能。映射模式 * 表示接受所有客户端路由。RSocketConnectedEvent 可用于通过 DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER 标头区分不同的路由。

典型的服务器配置可能如下所示

@Bean
public RSocketStrategies rsocketStrategies() {
    return RSocketStrategies.builder()
        .decoder(StringDecoder.textPlainOnly())
        .encoder(CharSequenceEncoder.allMimeTypes())
        .dataBufferFactory(new DefaultDataBufferFactory(true))
        .build();
}

@Bean
public ServerRSocketConnector serverRSocketConnector() {
    ServerRSocketConnector serverRSocketConnector = new ServerRSocketConnector("localhost", 0);
    serverRSocketConnector.setRSocketStrategies(rsocketStrategies());
    serverRSocketConnector.setMetadataMimeType(new MimeType("message", "x.rsocket.routing.v0"));
    serverRSocketConnector.setServerConfigurer((server) -> server.payloadDecoder(PayloadDecoder.ZERO_COPY));
    serverRSocketConnector.setClientRSocketKeyStrategy((headers, data) -> ""
                                    + headers.get(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER));
    return serverRSocketConnector;
}

@EventListener
public void onApplicationEvent(RSocketConnectedEvent event) {
	...
}

所有选项(包括 RSocketStrategies bean 和 @EventListener 用于 RSocketConnectedEvent)都是可选的。有关更多信息,请参见 ServerRSocketConnector JavaDocs。

从 5.2.1 版本开始,ServerRSocketMessageHandler 被提取到一个公共的顶级类中,以便与现有的 RSocket 服务器进行可能的连接。当 ServerRSocketConnector 使用 ServerRSocketMessageHandler 的外部实例时,它不会在内部创建 RSocket 服务器,而是将所有处理逻辑委托给提供的实例。此外,ServerRSocketMessageHandler 可以使用 messageMappingCompatible 标志进行配置,以处理 RSocket 控制器中的 @MessageMapping,完全取代标准 RSocketMessageHandler 提供的功能。这在混合配置中很有用,当经典的 @MessageMapping 方法与 RSocket 通道适配器一起出现在同一个应用程序中,并且应用程序中存在外部配置的 RSocket 服务器时。

ClientRSocketConnector 充当 RSocketRequester 的持有者,该 RSocketRequester 基于通过提供的 ClientTransport 连接的 RSocketRSocketConnector 可以使用提供的 RSocketConnectorConfigurer 进行自定义。setupRoute(带有可选的模板变量)和带有元数据的 setupData 也可以在此组件上配置。

典型的客户端配置可能如下所示

@Bean
public RSocketStrategies rsocketStrategies() {
    return RSocketStrategies.builder()
        .decoder(StringDecoder.textPlainOnly())
        .encoder(CharSequenceEncoder.allMimeTypes())
        .dataBufferFactory(new DefaultDataBufferFactory(true))
        .build();
}

@Bean
public ClientRSocketConnector clientRSocketConnector() {
    ClientRSocketConnector clientRSocketConnector =
            new ClientRSocketConnector("localhost", serverRSocketConnector().getBoundPort().block());
    clientRSocketConnector.setRSocketStrategies(rsocketStrategies());
    clientRSocketConnector.setSetupRoute("clientConnect/{user}");
    clientRSocketConnector.setSetupRouteVariables("myUser");
    return clientRSocketConnector;
}

大多数这些选项(包括 RSocketStrategies bean)都是可选的。请注意我们如何连接到在任意端口上本地启动的 RSocket 服务器。有关 setupData 用例,请参见 ServerRSocketConnector.clientRSocketKeyStrategy。另请参见 ClientRSocketConnector 及其 AbstractRSocketConnector 超类 JavaDocs 以获取更多信息。

ClientRSocketConnectorServerRSocketConnector 都负责将入站通道适配器映射到它们的 path 配置,以路由传入的 RSocket 请求。有关更多信息,请参见下一节。

RSocket 入站网关

The RSocketInboundGateway is responsible for receiving RSocket requests and producing responses (if any). It requires an array of path mapping which could be as patterns similar to MVC request mapping or @MessageMapping semantics. In addition, (since version 5.2.2), a set of interaction models (see RSocketInteractionModel) can be configured on the RSocketInboundGateway to restrict RSocket requests to this endpoint by the particular frame type. By default, all the interaction models are supported. Such a bean, according its IntegrationRSocketEndpoint implementation (extension of a ReactiveMessageHandler), is auto detected either by the ServerRSocketConnector or ClientRSocketConnector for a routing logic in the internal IntegrationRSocketMessageHandler for incoming requests. An AbstractRSocketConnector can be provided to the RSocketInboundGateway for explicit endpoint registration. This way, the auto-detection option is disabled on that AbstractRSocketConnector. The RSocketStrategies can also be injected into the RSocketInboundGateway or they are obtained from the provided AbstractRSocketConnector overriding any explicit injection. Decoders are used from those RSocketStrategies to decode a request payload according to the provided requestElementType. If an RSocketPayloadReturnValueHandler.RESPONSE_HEADER header is not provided in incoming the Message, the RSocketInboundGateway treats a request as a fireAndForget RSocket interaction model. In this case, an RSocketInboundGateway performs a plain send operation into the outputChannel. Otherwise, a MonoProcessor value from the RSocketPayloadReturnValueHandler.RESPONSE_HEADER header is used for sending a reply to the RSocket. For this purpose, an RSocketInboundGateway performs a sendAndReceiveMessageReactive operation on the outputChannel. The payload of the message to send downstream is always a Flux according to MessagingRSocket logic. When in a fireAndForget RSocket interaction model, the message has a plain converted payload. The reply payload could be a plain object or a Publisher - the RSocketInboundGateway converts both of them properly into an RSocket response according to the encoders provided in the RSocketStrategies.

从 5.3 版本开始,RSocketInboundGateway 中添加了一个 decodeFluxAsUnit 选项(默认值为 false)。默认情况下,传入的 Flux 会以每个事件单独解码的方式进行转换。这是目前 @MessageMapping 语义中存在的精确行为。为了恢复以前的行为或根据应用程序要求将整个 Flux 解码为单个单元,必须将 decodeFluxAsUnit 设置为 true。但是,目标解码逻辑取决于所选的 Decoder,例如,StringDecoder 需要在流中存在一个新行分隔符(默认情况下)以指示字节缓冲区结束。

有关如何配置 RSocketInboundGateway 端点以及如何处理下游有效负载的示例,请参见 使用 Java 配置 RSocket 端点

RSocket 出站网关

RSocketOutboundGateway 是一个 AbstractReplyProducingMessageHandler,用于向 RSocket 发送请求并根据 RSocket 回复(如果有)生成回复。低级 RSocket 协议交互委托给 RSocketRequester,该 RSocketRequester 从提供的 ClientRSocketConnector 或从服务器端请求消息中的 RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER 标头解析。服务器端的目标 RSocketRequester 可以从 RSocketConnectedEvent 解析,或者使用 ServerRSocketConnector.getClientRSocketRequester() API 解析,根据通过 ServerRSocketConnector.setClientRSocketKeyStrategy() 选择的某些业务键进行连接请求映射。有关更多信息,请参阅 ServerRSocketConnector JavaDocs。

发送请求的 route 必须显式配置(以及路径变量),或者通过针对请求消息计算的 SpEL 表达式配置。

RSocket 交互模型可以通过 RSocketInteractionModel 选项或相应的表达式设置提供。默认情况下,requestResponse 用于常见的网关用例。

当请求消息有效负载为 Publisher 时,可以提供 publisherElementType 选项来根据目标 RSocketRequester 中提供的 RSocketStrategies 编码其元素。此选项的表达式可以计算为 ParameterizedTypeReference。有关数据及其类型的更多信息,请参阅 RSocketRequester.RequestSpec.data() JavaDocs。

RSocket 请求还可以使用 metadata 进行增强。为此,可以在 RSocketOutboundGateway 上配置针对请求消息的 metadataExpression。此类表达式必须计算为 Map<Object, MimeType>

interactionModel 不是 fireAndForget 时,必须提供 expectedResponseType。默认情况下,它是 String.class。此选项的表达式可以计算为 ParameterizedTypeReference。有关回复数据及其类型的更多信息,请参阅 RSocketRequester.RetrieveSpec.retrieveMono()RSocketRequester.RetrieveSpec.retrieveFlux() JavaDocs。

来自 RSocketOutboundGateway 的回复 payload 是一个 Mono(即使对于 fireAndForget 交互模型,它也是 Mono<Void>),始终使该组件成为 async。这种 Mono 在产生到 outputChannel(对于常规通道)或按需由 FluxMessageChannel 处理之前被订阅。对于 requestStreamrequestChannel 交互模型的 Flux 响应也被包装到一个回复 Mono 中。它可以通过 FluxMessageChannel 使用直通服务激活器在向下游扁平化。

@ServiceActivator(inputChannel = "rsocketReplyChannel", outputChannel ="fluxMessageChannel")
public Flux<?> flattenRSocketResponse(Flux<?> payload) {
    return payload;
}

或者在目标应用程序逻辑中显式订阅。

预期响应类型也可以配置(或通过表达式评估)为 void,将此网关视为出站通道适配器。但是,outputChannel 仍然必须配置(即使它只是一个 NullChannel)以启动对返回的 Mono 的订阅。

有关如何配置 RSocketOutboundGateway 端点以及如何处理下游有效负载的示例,请参见 使用 Java 配置 RSocket 端点

RSocket 命名空间支持

Spring Integration 提供了一个 rsocket 命名空间和相应的模式定义。要在您的配置中包含它,请在您的应用程序上下文配置文件中添加以下命名空间声明

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:int-rsocket="http://www.springframework.org/schema/integration/rsocket"
  xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    https://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    https://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/rsocket
    https://www.springframework.org/schema/integration/rsocket/spring-integration-rsocket.xsd">
    ...
</beans>

入站

要使用 XML 配置 Spring Integration RSocket 入站通道适配器,您需要使用 int-rsocket 命名空间中的适当 inbound-gateway 组件。以下示例展示了如何配置它

<int-rsocket:inbound-gateway id="inboundGateway"
                             path="testPath"
                             interaction-models="requestStream,requestChannel"
                             rsocket-connector="clientRSocketConnector"
                             request-channel="requestChannel"
                             rsocket-strategies="rsocketStrategies"
                             request-element-type="byte[]"/>

ClientRSocketConnectorServerRSocketConnector 应该配置为通用的 <bean> 定义。

出站

<int-rsocket:outbound-gateway id="outboundGateway"
                              client-rsocket-connector="clientRSocketConnector"
                              auto-startup="false"
                              interaction-model="fireAndForget"
                              route-expression="'testRoute'"
                              request-channel="requestChannel"
                              publisher-element-type="byte[]"
                              expected-response-type="java.util.Date"
                              metadata-expression="{'metadata': new org.springframework.util.MimeType('*')}"/>

有关所有这些 XML 属性的描述,请参见 spring-integration-rsocket.xsd

使用 Java 配置 RSocket 端点

以下示例展示了如何使用 Java 配置 RSocket 入站端点

@Bean
public RSocketInboundGateway rsocketInboundGatewayRequestReply() {
    RSocketInboundGateway rsocketInboundGateway = new RSocketInboundGateway("echo");
    rsocketInboundGateway.setRequestChannelName("requestReplyChannel");
    return rsocketInboundGateway;
}

@Transformer(inputChannel = "requestReplyChannel")
public Mono<String> echoTransformation(Flux<String> payload) {
    return payload.next().map(String::toUpperCase);
}

此配置假设使用 ClientRSocketConnectorServerRSocketConnector,用于在“echo”路径上自动检测此类端点。请注意 @Transformer 签名及其对 RSocket 请求的完全响应式处理以及生成响应式回复。

以下示例展示了如何使用 Java DSL 配置 RSocket 入站网关

@Bean
public IntegrationFlow rsocketUpperCaseFlow() {
    return IntegrationFlow
        .from(RSockets.inboundGateway("/uppercase")
                   .interactionModels(RSocketInteractionModel.requestChannel))
        .<Flux<String>, Mono<String>>transform((flux) -> flux.next().map(String::toUpperCase))
        .get();
}

此配置假设使用 ClientRSocketConnectorServerRSocketConnector,用于在“/uppercase”路径上自动检测此类端点,并期望交互模型为“请求通道”。

以下示例展示了如何使用 Java 配置 RSocket 出站网关

@Bean
@ServiceActivator(inputChannel = "requestChannel", outputChannel = "replyChannel")
public RSocketOutboundGateway rsocketOutboundGateway() {
    RSocketOutboundGateway rsocketOutboundGateway =
            new RSocketOutboundGateway(
                    new FunctionExpression<Message<?>>((m) ->
                        m.getHeaders().get("route_header")));
    rsocketOutboundGateway.setInteractionModelExpression(
            new FunctionExpression<Message<?>>((m) -> m.getHeaders().get("rsocket_interaction_model")));
    rsocketOutboundGateway.setClientRSocketConnector(clientRSocketConnector());
    return rsocketOutboundGateway;
}

setClientRSocketConnector() 仅在客户端侧需要。在服务器端,必须在请求消息中提供带有 RSocketRequester 值的 RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER 标头。

以下示例展示了如何使用 Java DSL 配置 RSocket 出站网关

@Bean
public IntegrationFlow rsocketUpperCaseRequestFlow(ClientRSocketConnector clientRSocketConnector) {
    return IntegrationFlow
        .from(Function.class)
        .handle(RSockets.outboundGateway("/uppercase")
            .interactionModel(RSocketInteractionModel.requestResponse)
            .expectedResponseType(String.class)
            .clientRSocketConnector(clientRSocketConnector))
        .get();
}

有关如何在上述流程开头使用提到的 Function 接口的更多信息,请参见 IntegrationFlow 作为网关