WebFlux 支持
WebFlux Spring Integration 模块(spring-integration-webflux
)允许以响应式方式执行 HTTP 请求和处理入站 HTTP 请求。
你需要将此依赖添加到你的项目中
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-webflux</artifactId>
<version>6.4.4</version>
</dependency>
compile "org.springframework.integration:spring-integration-webflux:6.4.4"
在非 Servlet 的服务器配置情况下,必须包含 io.projectreactor.netty:reactor-netty
依赖。
WebFlux 支持包含以下网关实现:WebFluxInboundEndpoint
和 WebFluxRequestExecutingMessageHandler
。该支持完全基于 Spring WebFlux 和 Project Reactor 的基础。有关更多信息,请参阅HTTP 支持,因为响应式 HTTP 组件和常规 HTTP 组件共享许多选项。
WebFlux Namespace 支持
Spring Integration 提供了 webflux
namespace 和相应的 Schema 定义。要将其包含在你的配置中,请在你的应用上下文配置文件中添加以下 namespace 声明
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-webflux="http://www.springframework.org/schema/integration/webflux"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/webflux
https://www.springframework.org/schema/integration/webflux/spring-integration-webflux.xsd">
...
</beans>
WebFlux 入站组件
从 5.0 版本开始,提供了 WebFluxInboundEndpoint
实现 WebHandler
。此组件类似于基于 MVC 的 HttpRequestHandlingEndpointSupport
,它们通过新提取的 BaseHttpInboundEndpoint
共享一些通用选项。它用于 Spring WebFlux 响应式环境(而非 MVC)。以下示例展示了 WebFlux 端点的简单实现
-
Java DSL
-
Kotlin DSL
-
Java
-
XML
@Bean
public IntegrationFlow inboundChannelAdapterFlow() {
return IntegrationFlow
.from(WebFlux.inboundChannelAdapter("/reactivePost")
.requestMapping(m -> m.methods(HttpMethod.POST))
.requestPayloadType(ResolvableType.forClassWithGenerics(Flux.class, String.class))
.statusCodeFunction(m -> HttpStatus.ACCEPTED))
.channel(c -> c.queue("storeChannel"))
.get();
}
@Bean
fun inboundChannelAdapterFlow() =
integrationFlow(
WebFlux.inboundChannelAdapter("/reactivePost")
.apply {
requestMapping { m -> m.methods(HttpMethod.POST) }
requestPayloadType(ResolvableType.forClassWithGenerics(Flux::class.java, String::class.java))
statusCodeFunction { m -> HttpStatus.ACCEPTED }
})
{
channel { queue("storeChannel") }
}
@Configuration
@EnableWebFlux
@EnableIntegration
public class ReactiveHttpConfiguration {
@Bean
public WebFluxInboundEndpoint simpleInboundEndpoint() {
WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
RequestMapping requestMapping = new RequestMapping();
requestMapping.setPathPatterns("/test");
endpoint.setRequestMapping(requestMapping);
endpoint.setRequestChannelName("serviceChannel");
return endpoint;
}
@ServiceActivator(inputChannel = "serviceChannel")
String service() {
return "It works!";
}
}
<int-webflux:inbound-gateway request-channel="requests" path="/sse">
<int-webflux:request-mapping produces="text/event-stream"/>
</int-webflux:inbound-gateway>
配置类似于(示例中提到的)HttpRequestHandlingEndpointSupport
,不同之处在于我们使用 @EnableWebFlux
将 WebFlux 基础设施添加到我们的集成应用中。此外,WebFluxInboundEndpoint
通过响应式 HTTP 服务器实现提供的背压(back-pressure)和按需能力,对下游流执行 sendAndReceive
操作。
回复部分也是非阻塞的,并且基于内部的 FutureReplyChannel ,该通道被平铺映射到回复 Mono ,用于按需解析。 |
你可以使用自定义的 ServerCodecConfigurer
、RequestedContentTypeResolver
甚至 ReactiveAdapterRegistry
配置 WebFluxInboundEndpoint
。后者提供了一种机制,你可以使用它以任何响应式类型返回回复:Reactor Flux
、RxJava Observable
、Flowable
等。通过这种方式,我们可以使用 Spring Integration 组件实现Server Sent Events 场景,示例如下
-
Java DSL
-
Kotlin DSL
-
Java
-
XML
@Bean
public IntegrationFlow sseFlow() {
return IntegrationFlow
.from(WebFlux.inboundGateway("/sse")
.requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
.handle((p, h) -> Flux.just("foo", "bar", "baz"))
.get();
}
@Bean
fun sseFlow() =
integrationFlow(
WebFlux.inboundGateway("/sse")
.requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
{
handle { (p, h) -> Flux.just("foo", "bar", "baz") }
}
@Bean
public WebFluxInboundEndpoint webfluxInboundGateway() {
WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
RequestMapping requestMapping = new RequestMapping();
requestMapping.setPathPatterns("/sse");
requestMapping.setProduces(MediaType.TEXT_EVENT_STREAM_VALUE);
endpoint.setRequestMapping(requestMapping);
endpoint.setRequestChannelName("requests");
return endpoint;
}
<int-webflux:inbound-channel-adapter id="reactiveFullConfig" channel="requests"
path="test1"
auto-startup="false"
phase="101"
request-payload-type="byte[]"
error-channel="errorChannel"
payload-expression="payload"
supported-methods="PUT"
status-code-expression="'202'"
header-mapper="headerMapper"
codec-configurer="codecConfigurer"
reactive-adapter-registry="reactiveAdapterRegistry"
requested-content-type-resolver="requestedContentTypeResolver">
<int-webflux:request-mapping headers="foo"/>
<int-webflux:cross-origin origin="foo" method="PUT"/>
<int-webflux:header name="foo" expression="'foo'"/>
</int-webflux:inbound-channel-adapter>
有关更多可能的配置选项,请参阅请求映射支持和跨源资源共享 (CORS) 支持。
当请求体为空或 payloadExpression
返回 null
时,请求参数(MultiValueMap<String, String>
)将用作目标消息的 payload
进行处理。
负载验证
从 5.2 版本开始,WebFluxInboundEndpoint
可以配置 Validator
。与HTTP 支持中的 MVC 验证不同,它用于在执行回退和 payloadExpression
函数之前,验证请求通过 HttpMessageReader
转换为的 Publisher
中的元素。框架无法假定构建最终 payload 后 Publisher
对象可能有多复杂。如果需要限制验证可见性仅针对最终 payload(或其 Publisher
元素),则验证应该放在下游而非 WebFlux 端点。更多信息请参阅 Spring WebFlux 文档。无效的 payload 会被 IntegrationWebExchangeBindException
(WebExchangeBindException
的扩展)拒绝,其中包含所有验证 Errors
。更多信息请参阅 Spring Framework 参考手册关于验证的部分。
WebFlux 出站组件
WebFluxRequestExecutingMessageHandler
(从 5.0 版本开始)的实现类似于 HttpRequestExecutingMessageHandler
。它使用 Spring Framework WebFlux 模块中的 WebClient
。要配置它,请定义一个类似于以下的 bean
-
Java DSL
-
Kotlin DSL
-
Java
-
XML
@Bean
public IntegrationFlow outboundReactive() {
return f -> f
.handle(WebFlux.<MultiValueMap<String, String>>outboundGateway(m ->
UriComponentsBuilder.fromUriString("http://localhost:8080/foo")
.queryParams(m.getPayload())
.build()
.toUri())
.httpMethod(HttpMethod.GET)
.expectedResponseType(String.class));
}
@Bean
fun outboundReactive() =
integrationFlow {
handle(
WebFlux.outboundGateway<MultiValueMap<String, String>>({ m ->
UriComponentsBuilder.fromUriString("http://localhost:8080/foo")
.queryParams(m.getPayload())
.build()
.toUri()
})
.httpMethod(HttpMethod.GET)
.expectedResponseType(String::class.java)
)
}
@ServiceActivator(inputChannel = "reactiveHttpOutRequest")
@Bean
public WebFluxRequestExecutingMessageHandler reactiveOutbound(WebClient client) {
WebFluxRequestExecutingMessageHandler handler =
new WebFluxRequestExecutingMessageHandler("http://localhost:8080/foo", client);
handler.setHttpMethod(HttpMethod.POST);
handler.setExpectedResponseType(String.class);
return handler;
}
<int-webflux:outbound-gateway id="reactiveExample1"
request-channel="requests"
url="http://localhost/test"
http-method-expression="headers.httpMethod"
extract-request-payload="false"
expected-response-type-expression="payload"
charset="UTF-8"
reply-timeout="1234"
reply-channel="replies"/>
<int-webflux:outbound-channel-adapter id="reactiveExample2"
url="http://localhost/example"
http-method="GET"
channel="requests"
charset="UTF-8"
extract-payload="false"
expected-response-type="java.lang.String"
order="3"
auto-startup="false"/>
WebClient
的 exchange()
操作返回一个 Mono<ClientResponse>
,该 Mono 通过几个 Mono.map()
步骤映射到 AbstractIntegrationMessageBuilder
,作为 WebFluxRequestExecutingMessageHandler
的输出。与 ReactiveChannel
作为 outputChannel
一起,Mono<ClientResponse>
的评估会延迟,直到下游订阅为止。否则,它被视为 async
模式,Mono 响应会被适配到 SettableListenableFuture
,用于从 WebFluxRequestExecutingMessageHandler
返回异步回复。输出消息的目标 payload 取决于 WebFluxRequestExecutingMessageHandler
的配置。setExpectedResponseType(Class<?>)
或 setExpectedResponseTypeExpression(Expression)
标识响应体元素转换的目标类型。如果 replyPayloadToFlux
设置为 true
,响应体将转换为一个 Flux
,其中每个元素具有提供的 expectedResponseType
,并且此 Flux
将作为 payload 发送到下游。之后,你可以使用拆分器以响应式方式迭代此 Flux
。
此外,除了 expectedResponseType
和 replyPayloadToFlux
属性之外,还可以将 BodyExtractor<?, ClientHttpResponse>
注入到 WebFluxRequestExecutingMessageHandler
中。它可用于低级别访问 ClientHttpResponse
,以及对主体和 HTTP 消息头转换进行更多控制。Spring Integration 提供 ClientHttpResponseBodyExtractor
作为身份函数,以便(下游)生成整个 ClientHttpResponse
以及任何其他可能的自定义逻辑。
从 5.2 版本开始,WebFluxRequestExecutingMessageHandler
支持响应式 Publisher
、Resource
和 MultiValueMap
类型作为请求消息 payload。内部使用相应的 BodyInserter
填充到 WebClient.RequestBodySpec
中。当 payload 是响应式 Publisher
时,配置的 publisherElementType
或 publisherElementTypeExpression
可用于确定 publisher 元素类型。表达式必须解析为 Class<?>
、解析为目标 Class<?>
的 String
或 ParameterizedTypeReference
。
从 5.5 版本开始,WebFluxRequestExecutingMessageHandler
暴露了一个 extractResponseBody
标志(默认为 true
),用于仅返回响应体,或者返回整个 ResponseEntity
作为回复消息 payload,这独立于提供的 expectedResponseType
或 replyPayloadToFlux
。如果 ResponseEntity
中不存在 body,则此标志被忽略并返回整个 ResponseEntity
。
有关更多可能的配置选项,请参阅HTTP 出站组件。
WebFlux 消息头映射
由于 WebFlux 组件完全基于 HTTP 协议,因此 HTTP 消息头映射没有区别。有关更多可能的选项和用于映射消息头的组件,请参阅HTTP 消息头映射。
WebFlux 请求属性
从 6.0 版本开始,WebFluxRequestExecutingMessageHandler
可以配置为通过 setAttributeVariablesExpression()
评估请求属性。此 SpEL 表达式必须在 Map
中进行评估。然后,此 Map 将传播到 WebClient.RequestBodySpec.attributes(Consumer<Map<String, Object>> attributesConsumer)
HTTP 请求配置回调。如果需要以键值对象的形式将信息从 Message
传递到请求,并且下游过滤器将访问这些属性以进行进一步处理,这将很有帮助。