RSocket 支持
RSocket Spring Integration 模块(spring-integration-rsocket
)支持执行 RSocket 应用协议。
您需要在项目中包含此依赖
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-rsocket</artifactId>
<version>6.4.4</version>
</dependency>
compile "org.springframework.integration:spring-integration-rsocket:6.4.4"
此模块从版本 5.2 开始提供,并基于 Spring Messaging 基础,其 RSocket 组件实现包括 RSocketRequester
、RSocketMessageHandler
和 RSocketStrategies
。有关 RSocket 协议、术语和组件的更多信息,请参阅 Spring Framework RSocket 支持。
在通过通道适配器启动集成流处理之前,我们需要在服务器和客户端之间建立 RSocket 连接。为此,Spring Integration RSocket 支持提供了 ServerRSocketConnector
和 ClientRSocketConnector
作为 AbstractRSocketConnector
的实现。
ServerRSocketConnector
根据提供的 io.rsocket.transport.ServerTransport
在主机和端口上公开一个监听器,用于接受来自客户端的连接。内部 RSocketServer
实例可以通过 setServerConfigurer()
进行自定义,还可以配置其他选项,例如用于有效载荷数据和头部元数据的 RSocketStrategies
和 MimeType
。当客户端请求者提供 setupRoute
(见下面的 ClientRSocketConnector
)时,连接的客户端会以 RSocketRequester
的形式存储,其键由 clientRSocketKeyStrategy
BiFunction<Map<String, Object>, DataBuffer, Object>
确定。默认情况下,连接数据被转换为 UTF-8 字符集的字符串作为键。此 RSocketRequester
注册表可在应用程序逻辑中用于确定特定客户端连接以与之交互,或将相同消息发布到所有连接的客户端。当客户端建立连接时,ServerRSocketConnector
会发出一个 RSocketConnectedEvent
。这类似于 Spring Messaging 模块中 @ConnectMapping
注解提供的功能。映射模式 *
意味着接受所有客户端路由。RSocketConnectedEvent
可用于通过 DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER
头部区分不同的路由。
典型的服务器配置可能如下所示
@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.decoder(StringDecoder.textPlainOnly())
.encoder(CharSequenceEncoder.allMimeTypes())
.dataBufferFactory(new DefaultDataBufferFactory(true))
.build();
}
@Bean
public ServerRSocketConnector serverRSocketConnector() {
ServerRSocketConnector serverRSocketConnector = new ServerRSocketConnector("localhost", 0);
serverRSocketConnector.setRSocketStrategies(rsocketStrategies());
serverRSocketConnector.setMetadataMimeType(new MimeType("message", "x.rsocket.routing.v0"));
serverRSocketConnector.setServerConfigurer((server) -> server.payloadDecoder(PayloadDecoder.ZERO_COPY));
serverRSocketConnector.setClientRSocketKeyStrategy((headers, data) -> ""
+ headers.get(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER));
return serverRSocketConnector;
}
@EventListener
public void onApplicationEvent(RSocketConnectedEvent event) {
...
}
所有选项,包括 RSocketStrategies
bean 和用于 RSocketConnectedEvent
的 @EventListener
都是可选的。更多信息请参阅 ServerRSocketConnector
的 JavaDocs。
从版本 5.2.1 开始,ServerRSocketMessageHandler
被提取为公共的顶级类,以便可能连接到现有的 RSocket 服务器。当 ServerRSocketConnector
提供了 ServerRSocketMessageHandler
的外部实例时,它不会在内部创建 RSocket 服务器,而只是将所有处理逻辑委托给提供的实例。此外,ServerRSocketMessageHandler
可以配置一个 messageMappingCompatible
标志,以便处理 RSocket 控制器的 @MessageMapping
,完全替代标准 RSocketMessageHandler
提供的功能。这在混合配置中可能非常有用,即经典 @MessageMapping
方法与 RSocket 通道适配器同时存在于同一个应用程序中,并且应用程序中存在外部配置的 RSocket 服务器。
ClientRSocketConnector
用作基于通过提供的 ClientTransport
连接的 RSocket
的 RSocketRequester
的持有者。RSocketConnector
可以通过提供的 RSocketConnectorConfigurer
进行自定义。在此组件上还可以配置 setupRoute
(带有可选的模板变量)以及带有元数据的 setupData
。
典型的客户端配置可能如下所示
@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.decoder(StringDecoder.textPlainOnly())
.encoder(CharSequenceEncoder.allMimeTypes())
.dataBufferFactory(new DefaultDataBufferFactory(true))
.build();
}
@Bean
public ClientRSocketConnector clientRSocketConnector() {
ClientRSocketConnector clientRSocketConnector =
new ClientRSocketConnector("localhost", serverRSocketConnector().getBoundPort().block());
clientRSocketConnector.setRSocketStrategies(rsocketStrategies());
clientRSocketConnector.setSetupRoute("clientConnect/{user}");
clientRSocketConnector.setSetupRouteVariables("myUser");
return clientRSocketConnector;
}
这些选项(包括 RSocketStrategies
bean)大多数是可选的。注意我们是如何连接到在任意端口上本地启动的 RSocket 服务器的。有关 setupData
用例,请参阅 ServerRSocketConnector.clientRSocketKeyStrategy
。有关更多信息,另请参阅 ClientRSocketConnector
及其超类 AbstractRSocketConnector
的 JavaDocs。
ClientRSocketConnector
和 ServerRSocketConnector
都负责将入站通道适配器映射到其 path
配置,以便路由传入的 RSocket 请求。更多信息请参阅下一节。
RSocket 入站网关
RSocketInboundGateway
负责接收 RSocket 请求并生成响应(如果有)。它需要一个 path
映射数组,这些映射可以是类似于 MVC 请求映射或 @MessageMapping
语义的模式。此外(从版本 5.2.2 开始),可以在 RSocketInboundGateway
上配置一组交互模型(参见 RSocketInteractionModel
),以按特定的帧类型限制 RSocket 请求到达此端点。默认情况下,支持所有交互模型。这样的 bean,根据其 IntegrationRSocketEndpoint
实现(ReactiveMessageHandler
的扩展),会被 ServerRSocketConnector
或 ClientRSocketConnector
自动检测到,用于内部 IntegrationRSocketMessageHandler
中处理传入请求的路由逻辑。可以向 RSocketInboundGateway
提供 AbstractRSocketConnector
以进行显式端点注册。这样,在该 AbstractRSocketConnector
上会禁用自动检测选项。RSocketStrategies
也可以注入到 RSocketInboundGateway
中,或者它们可以从提供的 AbstractRSocketConnector
获取,从而覆盖任何显式注入。解码器从这些 RSocketStrategies
中使用,根据提供的 requestElementType
解码请求负载。如果传入的 Message
中未提供 RSocketPayloadReturnValueHandler.RESPONSE_HEADER
头部,则 RSocketInboundGateway
将请求视为 fireAndForget
RSocket 交互模型。在这种情况下,RSocketInboundGateway
执行简单的 send
操作到 outputChannel
。否则,使用 RSocketPayloadReturnValueHandler.RESPONSE_HEADER
头部中的 MonoProcessor
值向 RSocket 发送回复。为此,RSocketInboundGateway
对 outputChannel
执行 sendAndReceiveMessageReactive
操作。要发送到下游的消息的 payload
始终是 Flux
,根据 MessagingRSocket
逻辑。当处于 fireAndForget
RSocket 交互模型时,消息具有简单的转换后的 payload
。回复的 payload
可以是一个普通对象或一个 Publisher
- RSocketInboundGateway
会根据 RSocketStrategies
中提供的编码器将它们适当地转换为 RSocket 响应。
从版本 5.3 开始,RSocketInboundGateway
添加了一个 decodeFluxAsUnit
选项(默认为 false
)。默认情况下,传入的 Flux
被转换为其每个事件单独解码。这与当前 @MessageMapping
语义下的行为完全一致。要恢复之前的行为或根据应用程序要求将整个 Flux
解码为一个单元,必须将 decodeFluxAsUnit
设置为 true
。然而,目标解码逻辑取决于选择的 Decoder
,例如 StringDecoder
需要流中存在换行符(默认)来指示字节缓冲区结束。
有关如何配置 RSocketInboundGateway
端点以及如何处理下游负载的示例,请参阅使用 Java 配置 RSocket 端点。
RSocket 出站网关
RSocketOutboundGateway
是一个 AbstractReplyProducingMessageHandler
,用于向 RSocket 发送请求并根据 RSocket 响应(如果有)生成回复。低级别 RSocket 协议交互委托给从提供的 ClientRSocketConnector
或从服务器端请求消息中的 RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER
头部解析的 RSocketRequester
。服务器端的目标 RSocketRequester
可以从 RSocketConnectedEvent
或通过 ServerRSocketConnector.setClientRSocketKeyStrategy()
为连接请求映射选择的某些业务键使用 ServerRSocketConnector.getClientRSocketRequester()
API 解析。更多信息请参阅 ServerRSocketConnector
的 JavaDocs。
发送请求的 route
必须显式配置( junto 与路径变量)或通过针对请求消息进行评估的 SpEL 表达式配置。
RSocket 交互模型可以通过 RSocketInteractionModel
选项或相应的表达式设置提供。默认情况下,对于常见网关用例,使用 requestResponse
。
当请求消息负载是 Publisher
时,可以提供一个 publisherElementType
选项,用于根据目标 RSocketRequester
中提供的 RSocketStrategies
编码其元素。此选项的表达式可以评估为 ParameterizedTypeReference
。有关数据及其类型的更多信息,请参阅 RSocketRequester.RequestSpec.data()
的 JavaDocs。
RSocket 请求还可以通过 metadata
进行增强。为此,可以在 RSocketOutboundGateway
上配置一个针对请求消息的 metadataExpression
。这样的表达式必须评估为 Map<Object, MimeType>
。
当 interactionModel
不是 fireAndForget
时,必须提供一个 expectedResponseType
。默认为 String.class
。此选项的表达式可以评估为 ParameterizedTypeReference
。有关回复数据及其类型的更多信息,请参阅 RSocketRequester.RetrieveSpec.retrieveMono()
和 RSocketRequester.RetrieveSpec.retrieveFlux()
的 JavaDocs。
RSocketOutboundGateway
的回复 payload
是一个 Mono
(即使对于 fireAndForget
交互模型也是 Mono<Void>
),这使得此组件始终是 async
的。对于常规通道,此 Mono
在生成到 outputChannel
之前被订阅,或者由 FluxMessageChannel
按需处理。对于 requestStream
或 requestChannel
交互模型的 Flux
响应,也会被包装到回复 Mono
中。它可以通过 FluxMessageChannel
和直通服务激活器在下游进行展平
@ServiceActivator(inputChannel = "rsocketReplyChannel", outputChannel ="fluxMessageChannel")
public Flux<?> flattenRSocketResponse(Flux<?> payload) {
return payload;
}
或在目标应用逻辑中显式订阅。
预期的响应类型也可以配置为(或通过表达式评估为)void
,将此网关视为出站通道适配器。但是,仍然必须配置 outputChannel
(即使只是 NullChannel
)以启动对返回的 Mono
的订阅。
有关如何配置 RSocketOutboundGateway
端点以及如何处理下游负载的示例,请参阅使用 Java 配置 RSocket 端点。
RSocket 命名空间支持
Spring Integration 提供了 rsocket
命名空间和相应的模式定义。要在配置中包含它,请在应用程序上下文配置文件中添加以下命名空间声明
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-rsocket="http://www.springframework.org/schema/integration/rsocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/rsocket
https://www.springframework.org/schema/integration/rsocket/spring-integration-rsocket.xsd">
...
</beans>
入站
要使用 XML 配置 Spring Integration RSocket 入站通道适配器,您需要使用 int-rsocket
命名空间中的适当 inbound-gateway
组件。以下示例显示了如何配置它
<int-rsocket:inbound-gateway id="inboundGateway"
path="testPath"
interaction-models="requestStream,requestChannel"
rsocket-connector="clientRSocketConnector"
request-channel="requestChannel"
rsocket-strategies="rsocketStrategies"
request-element-type="byte[]"/>
ClientRSocketConnector
和 ServerRSocketConnector
应配置为通用的 <bean>
定义。
出站
<int-rsocket:outbound-gateway id="outboundGateway"
client-rsocket-connector="clientRSocketConnector"
auto-startup="false"
interaction-model="fireAndForget"
route-expression="'testRoute'"
request-channel="requestChannel"
publisher-element-type="byte[]"
expected-response-type="java.util.Date"
metadata-expression="{'metadata': new org.springframework.util.MimeType('*')}"/>
有关所有这些 XML 属性的说明,请参阅 spring-integration-rsocket.xsd
。
使用 Java 配置 RSocket 端点
以下示例显示了如何使用 Java 配置 RSocket 入站端点
@Bean
public RSocketInboundGateway rsocketInboundGatewayRequestReply() {
RSocketInboundGateway rsocketInboundGateway = new RSocketInboundGateway("echo");
rsocketInboundGateway.setRequestChannelName("requestReplyChannel");
return rsocketInboundGateway;
}
@Transformer(inputChannel = "requestReplyChannel")
public Mono<String> echoTransformation(Flux<String> payload) {
return payload.next().map(String::toUpperCase);
}
此配置中假定存在 ClientRSocketConnector
或 ServerRSocketConnector
,这意味着对此类端点在“echo”路径上进行自动检测。请注意 @Transformer
的签名,它对 RSocket 请求进行完全响应式处理并生成响应式回复。
以下示例显示了如何使用 Java DSL 配置 RSocket 入站网关
@Bean
public IntegrationFlow rsocketUpperCaseFlow() {
return IntegrationFlow
.from(RSockets.inboundGateway("/uppercase")
.interactionModels(RSocketInteractionModel.requestChannel))
.<Flux<String>, Mono<String>>transform((flux) -> flux.next().map(String::toUpperCase))
.get();
}
此配置中假定存在 ClientRSocketConnector
或 ServerRSocketConnector
,这意味着对此类端点在“/uppercase”路径上进行自动检测,并且预期的交互模型为“request channel”。
以下示例显示了如何使用 Java 配置 RSocket 出站网关
@Bean
@ServiceActivator(inputChannel = "requestChannel", outputChannel = "replyChannel")
public RSocketOutboundGateway rsocketOutboundGateway() {
RSocketOutboundGateway rsocketOutboundGateway =
new RSocketOutboundGateway(
new FunctionExpression<Message<?>>((m) ->
m.getHeaders().get("route_header")));
rsocketOutboundGateway.setInteractionModelExpression(
new FunctionExpression<Message<?>>((m) -> m.getHeaders().get("rsocket_interaction_model")));
rsocketOutboundGateway.setClientRSocketConnector(clientRSocketConnector());
return rsocketOutboundGateway;
}
仅客户端需要 setClientRSocketConnector()
。在服务器端,请求消息中必须提供带有 RSocketRequester
值的 RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER
头部。
以下示例显示了如何使用 Java DSL 配置 RSocket 出站网关
@Bean
public IntegrationFlow rsocketUpperCaseRequestFlow(ClientRSocketConnector clientRSocketConnector) {
return IntegrationFlow
.from(Function.class)
.handle(RSockets.outboundGateway("/uppercase")
.interactionModel(RSocketInteractionModel.requestResponse)
.expectedResponseType(String.class)
.clientRSocketConnector(clientRSocketConnector))
.get();
}
有关如何在上述流程开始时使用提到的 Function
接口的更多信息,请参阅将 IntegrationFlow
用作网关。