响应式流支持

Spring Integration 在框架的某些地方和不同方面提供对 Reactive Streams 交互的支持。我们将在必要时在此处讨论它们中的大多数,并提供指向目标章节的适当链接以获取详细信息。

前言

回顾一下,Spring Integration 扩展了 Spring 编程模型以支持众所周知的企业集成模式。Spring Integration 可以在基于 Spring 的应用程序中实现轻量级消息传递,并通过声明式适配器支持与外部系统的集成。Spring Integration 的主要目标是提供一个简单的模型来构建企业集成解决方案,同时保持可维护、可测试代码至关重要的关注点分离。在目标应用程序中使用诸如 messagechannelendpoint 等一等公民来实现此目标,这使我们能够构建一个集成流(管道),其中(在大多数情况下)一个端点将消息生成到一个通道中,供另一个端点使用。这样,我们就区分了集成交互模型和目标业务逻辑。这里至关重要的一部分是介于两者之间的通道:流行为取决于其实现,而不会影响端点。

另一方面,Reactive Streams 是一个用于异步流处理的标准,具有非阻塞反压。Reactive Streams 的主要目标是控制跨异步边界的流数据交换——例如将元素传递到另一个线程或线程池——同时确保接收方不必缓冲任意数量的数据。换句话说,反压是此模型中不可或缺的一部分,以便允许在线程之间进行调解的队列受到限制。Reactive Streams 实现(例如 Project Reactor)的目的是在流应用程序的整个处理图中保留这些优点和特性。Reactive Streams 库的最终目标是以透明且平滑的方式为目标应用程序提供类型、一组操作符和支持 API,就像使用可用的编程语言结构一样,但最终解决方案不像使用普通函数链调用那样必要。它分为两个阶段:定义和执行,这发生在稍后的订阅最终反应式发布者期间,并且对数据的需求从定义的底部推送到顶部,根据需要应用反压——我们请求尽可能多的事件我们现在可以处理。反应式应用程序看起来像一个 "stream" 或正如我们在 Spring Integration 术语中习惯的那样——"flow"。事实上,自 Java 9 起,Reactive Streams SPI 在 java.util.concurrent.Flow 类中呈现。

从这里可以看出,Spring Integration 流非常适合编写 Reactive Streams 应用程序,当我们在端点上应用一些反应式框架操作符时,但事实上问题更广泛,我们需要记住并非所有端点(例如 JdbcMessageHandler)都可以透明地处理在反应式流中。当然,Spring Integration 中对 Reactive Streams 支持的主要目标是允许整个过程完全反应式、按需启动并准备好反压。在通道适配器的目标协议和系统提供 Reactive Streams 交互模型之前,这是不可能的。在下面的章节中,我们将描述 Spring Integration 中提供了哪些组件和方法来开发保留集成流结构的反应式应用程序。

Spring Integration 中的所有 Reactive Streams 交互都使用 Project Reactor 类型(如 MonoFlux)实现。

消息网关

与 Reactive Streams 交互最简单的点是 @MessagingGateway,其中我们只需将网关方法的返回类型设为 Mono<?>,当对返回的 Mono 实例进行订阅时,网关方法调用背后的整个集成流将执行。有关更多信息,请参阅 Reactor Mono。框架内部对入站网关使用类似的 Mono 响应方法,该网关完全基于与 Reactive Streams 兼容的协议(有关更多信息,请参阅下面的 Reactive 通道适配器)。发送和接收操作包装在 Mono.deffer() 中,并在有可用时对 replyChannel 标头中的响应求值进行链接。这样,特定反应协议(例如 Netty)的入站组件将成为 Spring Integration 上执行的反应流的订阅者和发起者。如果请求有效负载是反应类型,最好在反应流定义中处理它,将进程推迟到发起者订阅。为此,处理程序方法还必须返回反应类型。有关更多信息,请参阅下一部分。

反应响应有效负载

当生成响应的 MessageHandler 为响应消息返回反应类型有效负载时,它将使用为 outputChannel 提供的常规 MessageChannel 实现以异步方式处理(async 必须设置为 true),并在输出通道为 ReactiveStreamsSubscribableChannel 实现(例如 FluxMessageChannel)时使用按需订阅进行扁平化。使用标准命令式 MessageChannel 用例,如果响应有效负载是多值发布者(有关更多信息,请参阅 ReactiveAdapter.isMultiValue()),它将包装在 Mono.just() 中。因此,Mono 必须在后端显式订阅或由后端的 FluxMessageChannel 扁平化。对于 outputChannelReactiveStreamsSubscribableChannel,无需担心返回类型和订阅;框架内部会平稳处理所有内容。

有关更多信息,请参阅 异步服务激活器

有关更多信息,还请参阅 Kotlin 协程

FluxMessageChannelReactiveStreamsConsumer

FluxMessageChannelMessageChannelPublisher<Message<?>> 的组合实现。Flux 作为热源,在内部创建用于接收 send() 实现中传入消息。Publisher.subscribe() 实现委托给该内部 Flux。此外,对于按需上游消耗,FluxMessageChannelReactiveStreamsSubscribableChannel 契约提供实现。为此通道提供的任何上游 Publisher(例如,请参见下面的源轮询通道适配器和分流器)在该通道的订阅准备就绪时将自动订阅。来自这些委托发布者的事件将汇入上面提到的内部 Flux

FluxMessageChannel 的使用者必须是 org.reactivestreams.Subscriber 实例,以遵守 Reactive Streams 契约。幸运的是,Spring Integration 中的所有 MessageHandler 实现也实现了 Project Reactor 中的 CoreSubscriber。并且,由于介于两者之间的 ReactiveStreamsConsumer 实现,整个集成流配置对目标开发人员来说是透明的。在这种情况下,流行为从命令式推送模型更改为响应式拉取模型。ReactiveStreamsConsumer 还可以用于使用 IntegrationReactiveUtils 将任何 MessageChannel 转换为响应式源,从而使集成流部分响应式。

有关更多信息,请参见 FluxMessageChannel

从 5.5 版开始,ConsumerEndpointSpec 引入了 reactive() 选项,以将流中的端点作为 ReactiveStreamsConsumer,而与输入通道无关。可以提供可选的 Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>>,以通过 Flux.transform() 操作自定义来自输入通道的源 Flux,例如使用 publishOn()doOnNext()retry() 等。此功能表示为所有消息传递注释(@ServiceActivator@Splitter 等)的 @Reactive 子注释,通过其 reactive() 属性。

源轮询通道适配器

通常,SourcePollingChannelAdapter 依赖于由 TaskScheduler 发起的任务。轮询触发器根据提供的选项构建,并用于定期调度任务以轮询目标数据或事件源。当 outputChannelReactiveStreamsSubscribableChannel 时,使用相同的 Trigger 确定下一次执行时间,但 SourcePollingChannelAdapter 不会调度任务,而是基于 nextExecutionTime 值创建 Flux<Message<?>>,并基于前一步骤的持续时间创建 Mono.delay()。然后使用 Flux.flatMapMany() 轮询 maxMessagesPerPoll,并将它们汇入输出 Flux。此生成器 Flux 由提供的 ReactiveStreamsSubscribableChannel 订阅,以遵守下游背压。从 5.5 版开始,当 maxMessagesPerPoll == 0 时,根本不会调用源,并且 flatMapMany() 通过 Mono.empty() 结果立即完成,直到稍后将 maxMessagesPerPoll 更改为非零值,例如通过控制总线。通过这种方式,可以将任何 MessageSource 实现转换为响应式热源。

有关更多信息,请参见 轮询使用者

事件驱动的通道适配器

MessageProducerSupport 是事件驱动的通道适配器的基类,通常,其 sendMessage(Message<?>) 用作生产驱动程序 API 中的侦听器回调。当消息生产者实现构建消息的 Flux 而不是基于侦听器的功能时,此回调还可以轻松插入到 doOnNext() Reactor 运算符中。事实上,当消息生产者的 outputChannel 不是 ReactiveStreamsSubscribableChannel 时,框架中会执行此操作。但是,为了改善最终用户体验,并允许更多反压就绪功能,MessageProducerSupport 提供 subscribeToPublisher(Publisher<? extends Message<?>>) API,当 Publisher<Message<?>>> 是目标系统的数据源时,可在目标实现中使用该 API。通常,当针对源数据的 Publisher 调用目标驱动程序 API 时,会从 doStart() 实现中使用它。建议将响应式 MessageProducerSupport 实现与 FluxMessageChannel 结合用作 outputChannel,以实现按需订阅和下游事件使用。当对 Publisher 的订阅被取消时,通道适配器将进入停止状态。对这样的通道适配器调用 stop() 将完成源 Publisher 的生产。通道适配器可以重新启动,并自动订阅新创建的源 Publisher

消息源到响应式流

从版本 5.3 开始,提供了 ReactiveMessageSourceProducer。它是提供的 MessageSource 和事件驱动的生产到已配置 outputChannel 的组合。在内部,它将 MessageSource 包装到重复重新订阅的 Mono 中,生成 Flux<Message<?>> 以在上面提到的 subscribeToPublisher(Publisher<? extends Message<?>>) 中订阅。此 Mono 的订阅使用 Schedulers.boundedElastic() 完成,以避免在目标 MessageSource 中可能发生的阻塞。当消息源返回 null(没有要提取的数据)时,Mono 会变成 repeatWhenEmpty() 状态,并 delay 后续重新订阅,该重新订阅基于订阅者上下文中的 IntegrationReactiveUtils.DELAY_WHEN_EMPTY_KEY Duration 条目。默认情况下,它为 1 秒。如果 MessageSource 使用标头中的 IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK 信息生成消息,则在原始 MonodoOnSuccess() 中确认(如果需要),如果下游流抛出带有要拒绝的失败消息的 MessagingException,则在 doOnError() 中拒绝。当轮询通道适配器的功能应转变为针对任何现有 MessageSource<?> 实现的响应式按需解决方案时,此 ReactiveMessageSourceProducer 可用于任何用例。

拆分器和聚合器

AbstractMessageSplitter 为其逻辑获取 Publisher 时,该过程自然会遍历 Publisher 中的项目,并将它们映射到要发送到 outputChannel 的消息中。如果此通道是 ReactiveStreamsSubscribableChannel,则 PublisherFlux 包装器将根据该通道按需订阅,并且此拆分器行为看起来更像 flatMap Reactor 运算符,当我们将传入事件映射到多值输出 Publisher 时。当整个集成流在拆分器之前和之后都使用 FluxMessageChannel 构建时,它最有意义,将 Spring Integration 配置与 Reactive Streams 要求及其事件处理运算符对齐。使用常规通道,将 Publisher 转换为 Iterable 以用于标准的迭代和生成拆分逻辑。

FluxAggregatorMessageHandler 是特定 Reactive Streams 逻辑实现的另一个示例,它可以被视为 Project Reactor 中的 “reactive 运算符”。它基于 Flux.groupBy()Flux.window()(或 buffer())运算符。传入消息被汇入在创建 FluxAggregatorMessageHandler 时启动的 Flux.create() 中,使其成为热源。此 FluxReactiveStreamsSubscribableChannel 按需订阅,或者当 outputChannel 不具有反应性时,直接在 FluxAggregatorMessageHandler.start() 中订阅。当整个集成流在该组件之前和之后都使用 FluxMessageChannel 构建时,此 MessageHandler 才具有其功能,从而使整个逻辑做好背压准备。

有关更多信息,请参阅 流和 Flux 拆分Flux 聚合器

Java DSL

Java DSL 中的 IntegrationFlow 可以从任何 Publisher 实例开始(请参阅 IntegrationFlow.from(Publisher<Message<T>>))。此外,使用 IntegrationFlowBuilder.toReactivePublisher() 运算符,可以将 IntegrationFlow 变成一个反应性热源。在这两种情况下,内部都使用 FluxMessageChannel;它可以根据其 ReactiveStreamsSubscribableChannel 合同订阅入站 Publisher,并且它本身就是下游订阅者的 Publisher<Message<?>>。通过动态 IntegrationFlow 注册,我们可以实现将 Reactive Streams 与此集成流桥接到 Publisher 的强大逻辑。

从 5.5.6 版本开始,存在一个 toReactivePublisher(boolean autoStartOnSubscribe) 运算符变体,用于控制返回的 Publisher<Message<?>> 背后的整个 IntegrationFlow 的生命周期。通常,对反应式发布者的订阅和消费发生在后期的运行时阶段,而不是在反应式流组合期间,甚至 ApplicationContext 启动期间。为了避免在 Publisher<Message<?>> 订阅点处对 IntegrationFlow 的生命周期管理进行样板代码编写,并为了获得更好的最终用户体验,引入了带有 autoStartOnSubscribe 标志的这个新运算符。如果为 true,它将标记 IntegrationFlow 及其组件为 autoStartup = false,因此 ApplicationContext 不会自动启动流中消息的生产和消费。相反,IntegrationFlowstart() 由内部 Flux.doOnSubscribe() 启动。与 autoStartOnSubscribe 值无关,该流由 Flux.doOnCancel()Flux.doOnTerminate() 停止 - 如果没有内容消费消息,则生成消息没有意义。

对于完全相反的使用案例,当 IntegrationFlow 应该调用反应式流并在完成后继续时,IntegrationFlowDefinition 中提供了 fluxTransform() 运算符。此时流将变成一个 FluxMessageChannel,该通道传播到提供的 fluxFunction 中,在 Flux.transform() 运算符中执行。函数的结果被包装到 Mono<Message<?>> 中,以平面映射到输出 Flux,该 Flux 由另一个 FluxMessageChannel 订阅以用于下游流。

有关更多信息,请参阅 Java DSL 章节

ReactiveMessageHandler

从 5.3 版本开始,ReactiveMessageHandler 在框架中得到本机支持。此类消息处理程序专为反应式客户端设计,这些客户端返回反应式类型以进行低级操作执行的按需订阅,并且不提供任何回复数据来继续反应式流组合。当在命令式集成流中使用 ReactiveMessageHandler 时,handleMessage() 结果在返回后立即订阅,仅仅是因为此类流中没有反应式流组合来尊重反压。在这种情况下,框架将此 ReactiveMessageHandler 包装到 ReactiveMessageHandlerAdapter 中 - MessageHandler 的普通实现。但是,当 ReactiveStreamsConsumer 参与流时(例如,当要消费的通道是 FluxMessageChannel 时),此类 ReactiveMessageHandler 会与整个反应式流组合到 flatMap() Reactor 运算符中,以在消费期间尊重反压。

开箱即用的 ReactiveMessageHandler 实现之一是用于出站通道适配器的 ReactiveMongoDbStoringMessageHandler。有关更多信息,请参阅 MongoDB 反应式通道适配器

从 6.1 版本开始,IntegrationFlowDefinition 公开了方便的 handleReactive(ReactiveMessageHandler) 终端操作符。任何 ReactiveMessageHandler 实现(甚至只是使用 Mono API 的普通 lambda)都可以用于此操作符。框架自动订阅返回的 Mono<Void>。以下是此操作符可能配置的简单示例

@Bean
public IntegrationFlow wireTapFlow1() {
    return IntegrationFlow.from("tappedChannel1")
            .wireTap("tapChannel", wt -> wt.selector(m -> m.getPayload().equals("foo")))
            .handleReactive((message) -> Mono.just(message).log().then());
}

此操作符的重载版本接受 Consumer<GenericEndpointSpec<ReactiveMessageHandlerAdapter>> 以自定义围绕提供的 ReactiveMessageHandler 的使用者端点。

此外,还提供了基于 ReactiveMessageHandlerSpec 的变体。在大多数情况下,它们用于特定于协议的通道适配器实现。请参阅下一节,其中包含指向目标技术的链接以及相应的反应式通道适配器。

反应式通道适配器

当集成目标协议提供反应式流解决方案时,在 Spring Integration 中实现通道适配器变得很简单。

入站、事件驱动的通道适配器实现涉及将请求(如果需要)包装到延迟的 MonoFlux 中,并且仅当协议组件启动对从侦听器方法返回的 Mono 的订阅时才执行发送(并生成答复,如果有的话)。这样,我们就有了一个反应式流解决方案,该解决方案完全封装在此组件中。当然,订阅输出通道的下游集成流应遵守反应式流规范,并以按需、支持反压的方式执行。

这并不总是通过集成流中使用的 MessageHandler 处理器的性质(或当前实现)提供的。当没有反应式实现时,可以使用线程池和队列或 FluxMessageChannel(见上文)在集成端点之前和之后来处理此限制。

反应式事件驱动入站通道适配器的示例

public class CustomReactiveMessageProducer extends MessageProducerSupport {

    private final CustomReactiveSource customReactiveSource;

    public CustomReactiveMessageProducer(CustomReactiveSource customReactiveSource) {
        Assert.notNull(customReactiveSource, "'customReactiveSource' must not be null");
        this.customReactiveSource = customReactiveSource;
    }

    @Override
    protected void doStart() {
        Flux<Message<?>> messageFlux =
            this.customReactiveSource
                .map(event - >
                    MessageBuilder
                    .withPayload(event.getBody())
                    .setHeader(MyReactiveHeaders.SOURCE_NAME, event.getSourceName())
                    .build());

        subscribeToPublisher(messageFlux);
    }
}

用法如下所示

public class MainFlow {
  @Autowired
  private CustomReactiveMessageProducer customReactiveMessageProducer;

  @Bean
  public IntegrationFlow buildFlow() {
     return IntegrationFlow.from(customReactiveMessageProducer)
        .channel(outputChannel)
        .get();
  }
}

或以声明方式

public class MainFlow {
  @Bean
  public IntegrationFlow buildFlow() {
     return IntegrationFlow.from(new CustomReactiveMessageProducer(new CustomReactiveSource()))
        .handle(outputChannel)
        .get();
  }
}

或者,即使没有通道适配器,我们也可以始终按照以下方式使用 Java DSL

public class MainFlow {
  @Bean
  public IntegrationFlow buildFlow() {
    Flux<Message<?>> myFlux = this.customReactiveSource
                .map(event ->
                    MessageBuilder
                    .withPayload(event.getBody())
                    .setHeader(MyReactiveHeaders.SOURCE_NAME, event.getSourceName())
                    .build());
     return IntegrationFlow.from(myFlux)
        .handle(outputChannel)
        .get();
  }
}

反应式出站通道适配器实现涉及启动(或继续)反应式流,以根据目标协议提供的反应式 API 与外部系统进行交互。入站有效负载可以本身是反应式类型,也可以作为整个集成流的事件,该事件是顶部的反应式流的一部分。如果我们处于单向、即发即弃场景中,则可以立即订阅返回的反应式类型,或者将其向下游传播(请求-答复场景)以进行进一步的集成流或在目标业务逻辑中进行显式订阅,但仍然向下游保留反应式流语义。

反应式出站通道适配器的示例

public class CustomReactiveMessageHandler extends AbstractReactiveMessageHandler {

    private final CustomEntityOperations customEntityOperations;

    public CustomReactiveMessageHandler(CustomEntityOperations customEntityOperations) {
        Assert.notNull(customEntityOperations, "'customEntityOperations' must not be null");
        this.customEntityOperations = customEntityOperations;
    }

    @Override
    protected Mono<Void> handleMessageInternal(Message<?> message) {
        return Mono.fromSupplier(() -> message.getHeaders().get("queryType", Type.class))
                .flatMap(mode -> {
                    switch (mode) {
                        case INSERT:
                            return handleInsert(message);
                        case UPDATE:
                            return handleUpdate(message);
                        default:
                            return Mono.error(new IllegalArgumentException());
                    }
                }).then();
    }

    private Mono<Void> handleInsert(Message<?> message) {
        return this.customEntityOperations.insert(message.getPayload())
                .then();
    }

    private Mono<Void> handleUpdate(Message<?> message) {
        return this.r2dbcEntityOperations.update(message.getPayload())
                .then();
    }

    public enum Type {
        INSERT,
        UPDATE,
    }
}

我们将能够使用这两个通道适配器

public class MainFlow {

  @Autowired
  private CustomReactiveMessageProducer customReactiveMessageProducer;

  @Autowired
  private CustomReactiveMessageHandler customReactiveMessageHandler;

  @Bean
  public IntegrationFlow buildFlow() {
     return IntegrationFlow.from(customReactiveMessageProducer)
        .transform(someOperation)
        .handle(customReactiveMessageHandler)
        .get();
  }
}

目前,Spring Integration 为 WebFluxRSocketMongoDbR2DBCZeroMQGraphQLApache Cassandra 提供通道适配器(或网关)实现。 Redis 流通道适配器 也是响应式的,并使用 Spring Data 中的 ReactiveStreamOperations。更多响应式通道适配器即将推出,例如基于 Spring for Apache Kafka 中的 ReactiveKafkaProducerTemplateReactiveKafkaConsumerTemplateKafka 中的 Apache Kafka。对于许多其他非响应式通道适配器,建议使用线程池以避免在响应式流处理期间发生阻塞。

响应式到命令式上下文传播

上下文传播 库位于类路径中时,Project Reactor 可以获取 ThreadLocal 值(例如 Micrometer 观察SecurityContextHolder),并将它们存储到 Subscriber 上下文中。当我们需要为跟踪填充日志记录 MDC 或让从响应式流调用的服务从作用域中恢复观察时,也可以执行相反的操作。有关其用于上下文传播的特殊运算符,请参阅 Project Reactor 文档 中的更多信息。如果我们的整个解决方案是单个响应式流组合,则存储和恢复上下文会顺利进行,因为 Subscriber 上下文从下游到组合的开始(FluxMono)都是可见的。但是,如果应用程序在不同的 Flux 实例或命令式处理与返回之间切换,则与 Subscriber 绑定的上下文可能不可用。对于此类用例,Spring Integration 提供了一个附加功能(从版本 6.0.5 开始),用于将 Reactor ContextView 存储到从响应式流生成的 IntegrationMessageHeaderAccessor.REACTOR_CONTEXT 消息头中,例如当我们执行直接 send() 操作时。然后在 FluxMessageChannel.subscribeTo() 中使用此标头为该通道将要发出的 Message 恢复 Reactor 上下文。目前,此标头由 WebFluxInboundEndpointRSocketInboundGateway 组件填充,但可以在执行响应式到命令式集成的任何解决方案中使用。填充此标头的逻辑如下

return requestMono
        .flatMap((message) ->
                Mono.deferContextual((context) ->
                        Mono.just(message)
                                .handle((messageToSend, sink) ->
                                        send(messageWithReactorContextIfAny(messageToSend, context)))));
...

private Message<?> messageWithReactorContextIfAny(Message<?> message, ContextView context) {
    if (!context.isEmpty()) {
        return getMessageBuilderFactory()
                .fromMessage(message)
                .setHeader(IntegrationMessageHeaderAccessor.REACTOR_CONTEXT, context)
                .build();
    }
    return message;
}

请注意,我们仍然需要使用 handle() 运算符让 Reactor 从上下文中恢复 ThreadLocal 值。即使它作为标头发送,框架也不能假设它是否将要恢复到下游的 ThreadLocal 值。

要从另一个 FluxMono 组合中的 Message 还原上下文,可以执行此逻辑

Mono.just(message)
        .handle((messageToHandle, sink) -> ...)
        .contextWrite(StaticMessageHeaderAccessor.getReactorContext(message)));