WebFlux 支持

WebFlux Spring Integration 模块(spring-integration-webflux)允许以响应式方式执行 HTTP 请求和处理入站 HTTP 请求。

你需要将此依赖添加到你的项目中

  • Maven

  • Gradle

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-webflux</artifactId>
    <version>6.4.4</version>
</dependency>
compile "org.springframework.integration:spring-integration-webflux:6.4.4"

在非 Servlet 的服务器配置情况下,必须包含 io.projectreactor.netty:reactor-netty 依赖。

WebFlux 支持包含以下网关实现:WebFluxInboundEndpointWebFluxRequestExecutingMessageHandler。该支持完全基于 Spring WebFluxProject Reactor 的基础。有关更多信息,请参阅HTTP 支持,因为响应式 HTTP 组件和常规 HTTP 组件共享许多选项。

WebFlux Namespace 支持

Spring Integration 提供了 webflux namespace 和相应的 Schema 定义。要将其包含在你的配置中,请在你的应用上下文配置文件中添加以下 namespace 声明

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:int-webflux="http://www.springframework.org/schema/integration/webflux"
  xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    https://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    https://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/webflux
    https://www.springframework.org/schema/integration/webflux/spring-integration-webflux.xsd">
    ...
</beans>

WebFlux 入站组件

从 5.0 版本开始,提供了 WebFluxInboundEndpoint 实现 WebHandler。此组件类似于基于 MVC 的 HttpRequestHandlingEndpointSupport,它们通过新提取的 BaseHttpInboundEndpoint 共享一些通用选项。它用于 Spring WebFlux 响应式环境(而非 MVC)。以下示例展示了 WebFlux 端点的简单实现

  • Java DSL

  • Kotlin DSL

  • Java

  • XML

@Bean
public IntegrationFlow inboundChannelAdapterFlow() {
    return IntegrationFlow
        .from(WebFlux.inboundChannelAdapter("/reactivePost")
            .requestMapping(m -> m.methods(HttpMethod.POST))
            .requestPayloadType(ResolvableType.forClassWithGenerics(Flux.class, String.class))
            .statusCodeFunction(m -> HttpStatus.ACCEPTED))
        .channel(c -> c.queue("storeChannel"))
        .get();
}
@Bean
fun inboundChannelAdapterFlow() =
    integrationFlow(
        WebFlux.inboundChannelAdapter("/reactivePost")
            .apply {
                requestMapping { m -> m.methods(HttpMethod.POST) }
                requestPayloadType(ResolvableType.forClassWithGenerics(Flux::class.java, String::class.java))
                statusCodeFunction { m -> HttpStatus.ACCEPTED }
            })
    {
        channel { queue("storeChannel") }
    }
@Configuration
@EnableWebFlux
@EnableIntegration
public class ReactiveHttpConfiguration {

    @Bean
    public WebFluxInboundEndpoint simpleInboundEndpoint() {
        WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
        RequestMapping requestMapping = new RequestMapping();
        requestMapping.setPathPatterns("/test");
        endpoint.setRequestMapping(requestMapping);
        endpoint.setRequestChannelName("serviceChannel");
        return endpoint;
    }

    @ServiceActivator(inputChannel = "serviceChannel")
    String service() {
        return "It works!";
    }

}
<int-webflux:inbound-gateway request-channel="requests" path="/sse">
    <int-webflux:request-mapping produces="text/event-stream"/>
</int-webflux:inbound-gateway>

配置类似于(示例中提到的)HttpRequestHandlingEndpointSupport,不同之处在于我们使用 @EnableWebFlux 将 WebFlux 基础设施添加到我们的集成应用中。此外,WebFluxInboundEndpoint 通过响应式 HTTP 服务器实现提供的背压(back-pressure)和按需能力,对下游流执行 sendAndReceive 操作。

回复部分也是非阻塞的,并且基于内部的 FutureReplyChannel,该通道被平铺映射到回复 Mono,用于按需解析。

你可以使用自定义的 ServerCodecConfigurerRequestedContentTypeResolver 甚至 ReactiveAdapterRegistry 配置 WebFluxInboundEndpoint。后者提供了一种机制,你可以使用它以任何响应式类型返回回复:Reactor Flux、RxJava ObservableFlowable 等。通过这种方式,我们可以使用 Spring Integration 组件实现Server Sent Events 场景,示例如下

  • Java DSL

  • Kotlin DSL

  • Java

  • XML

@Bean
public IntegrationFlow sseFlow() {
    return IntegrationFlow
            .from(WebFlux.inboundGateway("/sse")
                    .requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
            .handle((p, h) -> Flux.just("foo", "bar", "baz"))
            .get();
}
@Bean
fun sseFlow() =
     integrationFlow(
            WebFlux.inboundGateway("/sse")
                       .requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
            {
                 handle { (p, h) -> Flux.just("foo", "bar", "baz") }
            }
@Bean
public WebFluxInboundEndpoint webfluxInboundGateway() {
    WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
    RequestMapping requestMapping = new RequestMapping();
    requestMapping.setPathPatterns("/sse");
    requestMapping.setProduces(MediaType.TEXT_EVENT_STREAM_VALUE);
    endpoint.setRequestMapping(requestMapping);
    endpoint.setRequestChannelName("requests");
    return endpoint;
}
<int-webflux:inbound-channel-adapter id="reactiveFullConfig" channel="requests"
                               path="test1"
                               auto-startup="false"
                               phase="101"
                               request-payload-type="byte[]"
                               error-channel="errorChannel"
                               payload-expression="payload"
                               supported-methods="PUT"
                               status-code-expression="'202'"
                               header-mapper="headerMapper"
                               codec-configurer="codecConfigurer"
                               reactive-adapter-registry="reactiveAdapterRegistry"
                               requested-content-type-resolver="requestedContentTypeResolver">
            <int-webflux:request-mapping headers="foo"/>
            <int-webflux:cross-origin origin="foo" method="PUT"/>
            <int-webflux:header name="foo" expression="'foo'"/>
</int-webflux:inbound-channel-adapter>

有关更多可能的配置选项,请参阅请求映射支持跨源资源共享 (CORS) 支持

当请求体为空或 payloadExpression 返回 null 时,请求参数(MultiValueMap<String, String>)将用作目标消息的 payload 进行处理。

负载验证

从 5.2 版本开始,WebFluxInboundEndpoint 可以配置 Validator。与HTTP 支持中的 MVC 验证不同,它用于在执行回退和 payloadExpression 函数之前,验证请求通过 HttpMessageReader 转换为的 Publisher 中的元素。框架无法假定构建最终 payload 后 Publisher 对象可能有多复杂。如果需要限制验证可见性仅针对最终 payload(或其 Publisher 元素),则验证应该放在下游而非 WebFlux 端点。更多信息请参阅 Spring WebFlux 文档。无效的 payload 会被 IntegrationWebExchangeBindExceptionWebExchangeBindException 的扩展)拒绝,其中包含所有验证 Errors。更多信息请参阅 Spring Framework 参考手册关于验证的部分。

WebFlux 出站组件

WebFluxRequestExecutingMessageHandler(从 5.0 版本开始)的实现类似于 HttpRequestExecutingMessageHandler。它使用 Spring Framework WebFlux 模块中的 WebClient。要配置它,请定义一个类似于以下的 bean

  • Java DSL

  • Kotlin DSL

  • Java

  • XML

@Bean
public IntegrationFlow outboundReactive() {
    return f -> f
        .handle(WebFlux.<MultiValueMap<String, String>>outboundGateway(m ->
                UriComponentsBuilder.fromUriString("http://localhost:8080/foo")
                        .queryParams(m.getPayload())
                        .build()
                        .toUri())
                .httpMethod(HttpMethod.GET)
                .expectedResponseType(String.class));
}
@Bean
fun outboundReactive() =
    integrationFlow {
        handle(
            WebFlux.outboundGateway<MultiValueMap<String, String>>({ m ->
                UriComponentsBuilder.fromUriString("http://localhost:8080/foo")
                    .queryParams(m.getPayload())
                    .build()
                    .toUri()
            })
                .httpMethod(HttpMethod.GET)
                .expectedResponseType(String::class.java)
        )
    }
@ServiceActivator(inputChannel = "reactiveHttpOutRequest")
@Bean
public WebFluxRequestExecutingMessageHandler reactiveOutbound(WebClient client) {
    WebFluxRequestExecutingMessageHandler handler =
        new WebFluxRequestExecutingMessageHandler("http://localhost:8080/foo", client);
    handler.setHttpMethod(HttpMethod.POST);
    handler.setExpectedResponseType(String.class);
    return handler;
}
<int-webflux:outbound-gateway id="reactiveExample1"
    request-channel="requests"
    url="http://localhost/test"
    http-method-expression="headers.httpMethod"
    extract-request-payload="false"
    expected-response-type-expression="payload"
    charset="UTF-8"
    reply-timeout="1234"
    reply-channel="replies"/>

<int-webflux:outbound-channel-adapter id="reactiveExample2"
    url="http://localhost/example"
    http-method="GET"
    channel="requests"
    charset="UTF-8"
    extract-payload="false"
    expected-response-type="java.lang.String"
    order="3"
    auto-startup="false"/>

WebClientexchange() 操作返回一个 Mono<ClientResponse>,该 Mono 通过几个 Mono.map() 步骤映射到 AbstractIntegrationMessageBuilder,作为 WebFluxRequestExecutingMessageHandler 的输出。与 ReactiveChannel 作为 outputChannel 一起,Mono<ClientResponse> 的评估会延迟,直到下游订阅为止。否则,它被视为 async 模式,Mono 响应会被适配到 SettableListenableFuture,用于从 WebFluxRequestExecutingMessageHandler 返回异步回复。输出消息的目标 payload 取决于 WebFluxRequestExecutingMessageHandler 的配置。setExpectedResponseType(Class<?>)setExpectedResponseTypeExpression(Expression) 标识响应体元素转换的目标类型。如果 replyPayloadToFlux 设置为 true,响应体将转换为一个 Flux,其中每个元素具有提供的 expectedResponseType,并且此 Flux 将作为 payload 发送到下游。之后,你可以使用拆分器以响应式方式迭代此 Flux

此外,除了 expectedResponseTypereplyPayloadToFlux 属性之外,还可以将 BodyExtractor<?, ClientHttpResponse> 注入到 WebFluxRequestExecutingMessageHandler 中。它可用于低级别访问 ClientHttpResponse,以及对主体和 HTTP 消息头转换进行更多控制。Spring Integration 提供 ClientHttpResponseBodyExtractor 作为身份函数,以便(下游)生成整个 ClientHttpResponse 以及任何其他可能的自定义逻辑。

从 5.2 版本开始,WebFluxRequestExecutingMessageHandler 支持响应式 PublisherResourceMultiValueMap 类型作为请求消息 payload。内部使用相应的 BodyInserter 填充到 WebClient.RequestBodySpec 中。当 payload 是响应式 Publisher 时,配置的 publisherElementTypepublisherElementTypeExpression 可用于确定 publisher 元素类型。表达式必须解析为 Class<?>、解析为目标 Class<?>StringParameterizedTypeReference

从 5.5 版本开始,WebFluxRequestExecutingMessageHandler 暴露了一个 extractResponseBody 标志(默认为 true),用于仅返回响应体,或者返回整个 ResponseEntity 作为回复消息 payload,这独立于提供的 expectedResponseTypereplyPayloadToFlux。如果 ResponseEntity 中不存在 body,则此标志被忽略并返回整个 ResponseEntity

有关更多可能的配置选项,请参阅HTTP 出站组件

WebFlux 消息头映射

由于 WebFlux 组件完全基于 HTTP 协议,因此 HTTP 消息头映射没有区别。有关更多可能的选项和用于映射消息头的组件,请参阅HTTP 消息头映射

WebFlux 请求属性

从 6.0 版本开始,WebFluxRequestExecutingMessageHandler 可以配置为通过 setAttributeVariablesExpression() 评估请求属性。此 SpEL 表达式必须在 Map 中进行评估。然后,此 Map 将传播到 WebClient.RequestBodySpec.attributes(Consumer<Map<String, Object>> attributesConsumer) HTTP 请求配置回调。如果需要以键值对象的形式将信息从 Message 传递到请求,并且下游过滤器将访问这些属性以进行进一步处理,这将很有帮助。