Reactive Streams 支持

Spring Integration 在框架的某些地方以及从不同的方面提供了对 Reactive Streams 交互的支持。我们将在这里讨论其中的大部分内容,并在必要时提供指向目标章节的适当链接以获取详细信息。

前言

回顾一下,Spring Integration 扩展了 Spring 编程模型,以支持众所周知的企业集成模式(Enterprise Integration Patterns)。Spring Integration 使得在基于 Spring 的应用程序中进行轻量级消息传递成为可能,并通过声明式适配器支持与外部系统的集成。Spring Integration 的主要目标是提供一个简单的模型来构建企业集成解决方案,同时保持实现可维护、可测试代码所需的关注点分离。这个目标通过在目标应用程序中使用第一类公民(例如 messagechannelendpoint)来实现,这些公民允许我们构建一个集成流(管道),在其中(大多数情况下)一个端点将消息生成到通道中,供另一个端点消费。通过这种方式,我们区分了集成交互模型与目标业务逻辑。这里的关键部分是中间的通道:流的行为取决于其实现,而端点保持不变。

另一方面,Reactive Streams 是一个带有非阻塞背压的异步流处理标准。Reactive Streams 的主要目标是管理流数据跨异步边界的交换——例如将元素传递给另一个线程或线程池——同时确保接收端不会被迫缓存任意数量的数据。换句话说,背压是该模型不可或缺的一部分,以允许在线程之间进行调解的队列是有界的。Reactive Streams 实现(例如 Project Reactor)的目的是在流应用程序的整个处理图中保留这些优点和特性。Reactive Streams 库的最终目标是为目标应用程序提供类型、一组操作符和支持 API,使其尽可能透明和顺畅地利用现有编程语言结构,但最终解决方案并不像普通函数链调用那样是命令式的。它分为两个阶段:定义和执行,执行稍后会在订阅最终的响应式 publisher 时发生,并且数据需求从定义的底部向上推送,根据需要应用背压——我们请求尽可能多当前可以处理的事件。响应式应用程序看起来像一个 "流",或者像我们在 Spring Integration 术语中习惯称呼的 "流"。事实上,Java 9 以来的 Reactive Streams SPI 在 java.util.concurrent.Flow 类中体现。

从这里看来,Spring Integration 流确实非常适合编写 Reactive Streams 应用程序,因为我们在端点上应用了一些响应式框架操作符,但事实上问题要广泛得多,我们需要记住,并非所有端点(例如 JdbcMessageHandler)都可以透明地在响应式流中处理。当然,Spring Integration 对 Reactive Streams 支持的主要目标是允许整个过程完全响应式、按需启动且支持背压。这要等到通道适配器的目标协议和系统提供 Reactive Streams 交互模型才能实现。在下面的章节中,我们将描述 Spring Integration 中为开发响应式应用程序并保留集成流结构提供的组件和方法。

Spring Integration 中所有的 Reactive Streams 交互都使用 Project Reactor 类型实现,例如 MonoFlux

消息网关

与 Reactive Streams 交互的最简单点是 @MessagingGateway,在这里我们只需要将网关方法的返回类型设置为 Mono<?>——整个集成流在网关方法调用之后将在对返回的 Mono 实例进行订阅时执行。有关更多信息,请参阅 Reactor Mono。框架内部对完全基于 Reactive Streams 兼容协议的入站网关使用了类似的 Mono 回复方法(有关更多信息,请参阅下面的响应式通道适配器)。发送-接收操作被包装在 Mono.defer() 中,并在 replyChannel 头部可用时链式地评估回复。通过这种方式,特定响应式协议(例如 Netty)的入站组件将充当订阅者并启动在 Spring Integration 上执行的响应式流。如果请求载荷是响应式类型,最好在响应式流定义中处理它,将流程推迟到发起者订阅时。为此,处理程序方法也必须返回响应式类型。有关更多信息,请参阅下一节。

响应式回复载荷

当生成回复的 MessageHandler 为回复消息返回响应式类型载荷时,它会以异步方式处理,并为 outputChannel 提供常规的 MessageChannel 实现(必须设置 asynctrue),并在输出通道是 ReactiveStreamsSubscribableChannel 实现(例如 FluxMessageChannel)时按需订阅进行展平。在使用标准的命令式 MessageChannel 的情况下,如果回复载荷是多值 publisher(有关更多信息,请参阅 ReactiveAdapter.isMultiValue()),它会被包装到 Mono.just() 中。结果是,该 Mono 必须在下游显式订阅,或者由下游的 FluxMessageChannel 展平。如果 outputChannelReactiveStreamsSubscribableChannel,则无需担心返回类型和订阅;所有内容都由框架内部平滑处理。

有关更多信息,请参阅 异步服务激活器

另请参阅 Kotlin Coroutines 以获取更多信息。

FluxMessageChannelReactiveStreamsConsumer

FluxMessageChannelMessageChannelPublisher<Message<?>> 的组合实现。一个 Flux,作为一个热源,在内部被创建,用于接收来自 send() 实现的入站消息。Publisher.subscribe() 的实现被委托给该内部 Flux。此外,为了按需上游消费,FluxMessageChannel 提供了 ReactiveStreamsSubscribableChannel 契约的实现。为该通道提供的任何上游 Publisher(例如,参阅下面的源轮询通道适配器和分割器)在该通道订阅就绪时会自动订阅。来自此委托 publisher 的事件被沉入上述内部 Flux 中。

FluxMessageChannel 的消费者必须是 org.reactivestreams.Subscriber 实例,以遵守 Reactive Streams 契约。幸运的是,Spring Integration 中所有的 MessageHandler 实现也都实现了 Project Reactor 的 CoreSubscriber。并且由于中间的 ReactiveStreamsConsumer 实现,整个集成流配置对目标开发者来说是透明的。在这种情况下,流的行为从命令式推模型变为响应式拉模型。ReactiveStreamsConsumer 还可以用于使用 IntegrationReactiveUtils 将任何 MessageChannel 转换为响应式源,从而使集成流部分响应式化。

有关更多信息,请参阅 FluxMessageChannel

从版本 5.5 开始,ConsumerEndpointSpec 引入了一个 reactive() 选项,使得流中的端点成为 ReactiveStreamsConsumer,而与输入通道无关。可选的 Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>> 可以提供,以便通过 Flux.transform() 操作(例如使用 publishOn()doOnNext()retry() 等)自定义输入通道的源 Flux。此功能通过所有消息传递注解(@ServiceActivator@Splitter 等)的 reactive() 属性表示为 @Reactive 子注解。

源轮询通道适配器

通常,SourcePollingChannelAdapter 依赖于由 TaskScheduler 启动的任务。根据提供的选项构建轮询触发器,用于定期调度任务以轮询目标数据或事件源。当 outputChannelReactiveStreamsSubscribableChannel 时,使用相同的 Trigger 来确定下一次执行时间,但不是调度任务,而是 SourcePollingChannelAdapter 基于 Flux.generate() 创建一个 Flux<Message<?>>,用于 nextExecutionTime 值,并基于前一步骤的持续时间使用 Mono.delay()。然后使用 Flux.flatMapMany() 轮询 maxMessagesPerPoll 并将它们沉入输出 Flux 中。此生成器 Flux 由提供的 ReactiveStreamsSubscribableChannel 订阅,遵循下游的背压。从版本 5.5 开始,当 maxMessagesPerPoll == 0 时,源根本不会被调用,并且 flatMapMany() 通过 Mono.empty() 结果立即完成,直到 maxMessagesPerPoll 稍后更改为非零值,例如通过控制总线(Control Bus)。通过这种方式,任何 MessageSource 实现都可以转换为响应式热源。

有关更多信息,请参阅 轮询消费者

事件驱动通道适配器

MessageProducerSupport 是事件驱动通道适配器的基类,通常其 sendMessage(Message<?>) 用作生成驱动程序 API 中的监听器回调。当消息生产者实现构建一个消息 Flux 而不是基于监听器的功能时,此回调也可以轻松地插入到 doOnNext() Reactor 操作符中。事实上,当消息生产者的 outputChannel 不是 ReactiveStreamsSubscribableChannel 时,框架就会这样做。然而,为了更好的最终用户体验,并允许更多支持背压的功能,MessageProducerSupport 提供了一个 subscribeToPublisher(Publisher<? extends Message<?>>) API,供目标实现用于当 Publisher<Message<?>>> 是目标系统的数据源时。通常,它用于 doStart() 实现中,当调用目标驱动程序 API 以获取源数据的 Publisher 时。建议将响应式 MessageProducerSupport 实现与 FluxMessageChannel 作为 outputChannel 结合使用,以实现按需订阅和下游事件消费。当对 Publisher 的订阅被取消时,通道适配器进入停止状态。对此通道适配器调用 stop() 会停止源 Publisher 的生产。通道适配器可以通过自动订阅新创建的源 Publisher 来重新启动。

从 Message Source 到 Reactive Streams

从版本 5.3 开始,提供了 ReactiveMessageSourceProducer。它是提供的 MessageSource 和到配置的 outputChannel 的事件驱动生产的组合。在内部,它将 MessageSource 包装到反复重新订阅的 Mono 中,生成一个 Flux<Message<?>>,用于在上述 subscribeToPublisher(Publisher<? extends Message<?>>) 中订阅。对此 Mono 的订阅是使用 Schedulers.boundedElastic() 完成的,以避免目标 MessageSource 中可能的阻塞。当消息源返回 null(没有数据可拉取)时,Mono 会进入 repeatWhenEmpty() 状态,并根据订阅者上下文中的 IntegrationReactiveUtils.DELAY_WHEN_EMPTY_KEY Duration 条目延迟进行后续的重新订阅。默认情况下,延迟为 1 秒。如果 MessageSource 在头部生成包含 IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK 信息的 message,则在原始 MonodoOnSuccess() 中(如果需要)进行确认,并且如果下游流抛出带有失败 message 的 MessagingException 需要拒绝,则在 doOnError() 中进行拒绝。此 ReactiveMessageSourceProducer 可用于任何需要将轮询通道适配器的功能转换为任何现有 MessageSource<?> 实现的响应式、按需解决方案的用例。

分割器与聚合器

AbstractMessageSplitter 收到一个 Publisher 作为其逻辑时,流程会自然地遍历 Publisher 中的项,将它们映射到消息并发送到 outputChannel。如果此通道是 ReactiveStreamsSubscribableChannel,则该通道会按需订阅 Publisher 的 Flux 包装器,并且此分割器行为更像是一个 flatMap Reactor 操作符,当我们把入站事件映射到多值输出 Publisher 时。这在整个集成流由分割器前后的 FluxMessageChannel 构建时最有意义,这样可以将 Spring Integration 配置与 Reactive Streams 的要求及其事件处理操作符对齐。对于常规通道,Publisher 会转换为 Iterable,用于标准的迭代和生产分割逻辑。

FluxAggregatorMessageHandler 是特定 Reactive Streams 逻辑实现的另一个示例,可以被视为 Project Reactor 中的 "响应式操作符"。它基于 Flux.groupBy()Flux.window()(或 buffer())操作符。当 FluxAggregatorMessageHandler 创建时,入站消息会沉入一个启动的 Flux.create() 中,使其成为一个热源。该 Flux 由 ReactiveStreamsSubscribableChannel 按需订阅,或者当 outputChannel 不是响应式时直接在 FluxAggregatorMessageHandler.start() 中订阅。当整个集成流由该组件前后的 FluxMessageChannel 构建时,此 MessageHandler 具有其强大之处,使整个逻辑支持背压。

有关更多信息,请参阅 Stream 和 Flux 分割Flux 聚合器

Java DSL

Java DSL 中的 IntegrationFlow 可以从任何 Publisher 实例开始(请参阅 IntegrationFlow.from(Publisher<Message<T>>))。此外,使用 IntegrationFlowBuilder.toReactivePublisher() 操作符,IntegrationFlow 可以转换为响应式热源。在这两种情况下,内部都使用 FluxMessageChannel;它可以根据其 ReactiveStreamsSubscribableChannel 契约订阅入站 Publisher,并且它本身就是一个 Publisher<Message<?>>,供下游订阅者使用。通过动态 IntegrationFlow 注册,我们可以实现将 Reactive Streams 与此集成流连接到/来自 Publisher 的强大逻辑。

从版本 5.5.6 开始,引入了一个 toReactivePublisher(boolean autoStartOnSubscribe) 操作符变体,用于控制返回的 Publisher<Message<?>> 背后的整个 IntegrationFlow 的生命周期。通常,对响应式 publisher 的订阅和消费发生在后期的运行时阶段,而不是在响应式流组合期间,甚至不是在 ApplicationContext 启动期间。为了避免在 Publisher<Message<?>> 订阅点对 IntegrationFlow 进行生命周期管理的样板代码,并为了更好的最终用户体验,引入了这个带有 autoStartOnSubscribe 标志的新操作符。如果设置为 true,它会将 IntegrationFlow 及其组件标记为 autoStartup = false,这样 ApplicationContext 就不会自动启动流中的消息生产和消费。相反,IntegrationFlowstart() 是从内部的 Flux.doOnSubscribe() 启动的。无论 autoStartOnSubscribe 的值如何,流都会从 Flux.doOnCancel()Flux.doOnTerminate() 停止——如果没有任何东西可以消费消息,生产消息就没有意义。

对于完全相反的用例,当 IntegrationFlow 需要调用响应式流并在完成后继续时,IntegrationFlowDefinition 中提供了 fluxTransform() 操作符。此时的流被转换为 FluxMessageChannel,该通道被传播到提供的 fluxFunction 中,并在 Flux.transform() 操作符中执行。函数的返回值被包装在 Mono<Message<?>> 中,用于平铺映射到输出 Flux,该 Flux 由另一个 FluxMessageChannel 订阅,以进行下游流。

有关更多信息,请参阅 Java DSL 章节

ReactiveMessageHandler

从版本 5.3 开始,框架原生支持 ReactiveMessageHandler。这种类型的消息处理器专为响应式客户端设计,它返回一个响应式类型,用于按需订阅以执行低级别操作,并且不提供任何回复数据来继续响应式流组合。当在命令式集成流中使用 ReactiveMessageHandler 时,handleMessage() 的结果会在返回后立即订阅,仅仅因为在此类流中没有响应式流组合来遵守背压。在这种情况下,框架将此 ReactiveMessageHandler 包装到 ReactiveMessageHandlerAdapter 中——一个简单的 MessageHandler 实现。然而,当流中包含 ReactiveStreamsConsumer 时(例如,当消费的通道是 FluxMessageChannel 时),此类 ReactiveMessageHandler 会与整个响应式流一起组合,使用 flatMap() Reactor 操作符,以在消费期间遵守背压。

开箱即用的 ReactiveMessageHandler 实现之一是用于出站通道适配器的 ReactiveMongoDbStoringMessageHandler。有关更多信息,请参阅 MongoDB 响应式通道适配器

从版本 6.1 开始,IntegrationFlowDefinition 暴露了一个方便的 handleReactive(ReactiveMessageHandler) 终端操作符。任何 ReactiveMessageHandler 实现(甚至仅仅是一个使用 Mono API 的简单 lambda 表达式)都可以用于此操作符。框架会自动订阅返回的 Mono<Void>。以下是此操作符可能配置的一个简单示例:

@Bean
public IntegrationFlow wireTapFlow1() {
    return IntegrationFlow.from("tappedChannel1")
            .wireTap("tapChannel", wt -> wt.selector(m -> m.getPayload().equals("foo")))
            .handleReactive((message) -> Mono.just(message).log().then());
}

此操作符的重载版本接受 Consumer<GenericEndpointSpec<ReactiveMessageHandlerAdapter>>,以自定义围绕提供的 ReactiveMessageHandler 的消费者端点。

此外,还提供了基于 ReactiveMessageHandlerSpec 的变体。在大多数情况下,它们用于特定协议的通道适配器实现。请参阅下一节,其中包含指向目标技术的链接及其相应的响应式通道适配器。

响应式通道适配器

当集成目标协议提供 Reactive Streams 解决方案时,在 Spring Integration 中实现通道适配器就变得简单直接。

入站、事件驱动的通道适配器实现是关于(如有必要)将请求包装成一个延迟的 MonoFlux,并且仅当协议组件发起对监听器方法返回的 Mono 的订阅时,才执行发送(并生成回复,如果需要)。通过这种方式,我们将响应式流解决方案精确地封装在这个组件中。当然,下游集成流在输出通道上订阅时应遵循 Reactive Streams 规范,并以按需、支持背压的方式执行。

这并不总是通过集成流中使用的 MessageHandler 处理器本身的性质(或当前实现)可用。当没有响应式实现时,可以使用线程池和队列或 FluxMessageChannel(见上文),在集成端点之前和之后处理此限制。

一个响应式事件驱动入站通道适配器的示例

public class CustomReactiveMessageProducer extends MessageProducerSupport {

    private final CustomReactiveSource customReactiveSource;

    public CustomReactiveMessageProducer(CustomReactiveSource customReactiveSource) {
        Assert.notNull(customReactiveSource, "'customReactiveSource' must not be null");
        this.customReactiveSource = customReactiveSource;
    }

    @Override
    protected void doStart() {
        Flux<Message<?>> messageFlux =
            this.customReactiveSource
                .map(event - >
                    MessageBuilder
                    .withPayload(event.getBody())
                    .setHeader(MyReactiveHeaders.SOURCE_NAME, event.getSourceName())
                    .build());

        subscribeToPublisher(messageFlux);
    }
}

用法如下所示

public class MainFlow {
  @Autowired
  private CustomReactiveMessageProducer customReactiveMessageProducer;

  @Bean
  public IntegrationFlow buildFlow() {
     return IntegrationFlow.from(customReactiveMessageProducer)
        .channel(outputChannel)
        .get();
  }
}

或者以声明方式

public class MainFlow {
  @Bean
  public IntegrationFlow buildFlow() {
     return IntegrationFlow.from(new CustomReactiveMessageProducer(new CustomReactiveSource()))
        .handle(outputChannel)
        .get();
  }
}

或者即使没有通道适配器,我们也可以始终使用 Java DSL,如下所示

public class MainFlow {
  @Bean
  public IntegrationFlow buildFlow() {
    Flux<Message<?>> myFlux = this.customReactiveSource
                .map(event ->
                    MessageBuilder
                    .withPayload(event.getBody())
                    .setHeader(MyReactiveHeaders.SOURCE_NAME, event.getSourceName())
                    .build());
     return IntegrationFlow.from(myFlux)
        .handle(outputChannel)
        .get();
  }
}

响应式出站通道适配器实现是关于根据目标协议提供的响应式 API,启动(或继续)与外部系统交互的响应式流。入站载荷本身可以是响应式类型,或者作为整个集成流的一个事件,该集成流是上层响应式流的一部分。返回的响应式类型如果是单向的、即发即弃的场景,可以立即订阅;或者,它会向下游传播(请求-回复场景),以进行进一步的集成流处理或目标业务逻辑中的显式订阅,但仍然保持响应式流的语义向下游传播。

一个响应式出站通道适配器的示例

public class CustomReactiveMessageHandler extends AbstractReactiveMessageHandler {

    private final CustomEntityOperations customEntityOperations;

    public CustomReactiveMessageHandler(CustomEntityOperations customEntityOperations) {
        Assert.notNull(customEntityOperations, "'customEntityOperations' must not be null");
        this.customEntityOperations = customEntityOperations;
    }

    @Override
    protected Mono<Void> handleMessageInternal(Message<?> message) {
        return Mono.fromSupplier(() -> message.getHeaders().get("queryType", Type.class))
                .flatMap(mode -> {
                    switch (mode) {
                        case INSERT:
                            return handleInsert(message);
                        case UPDATE:
                            return handleUpdate(message);
                        default:
                            return Mono.error(new IllegalArgumentException());
                    }
                }).then();
    }

    private Mono<Void> handleInsert(Message<?> message) {
        return this.customEntityOperations.insert(message.getPayload())
                .then();
    }

    private Mono<Void> handleUpdate(Message<?> message) {
        return this.r2dbcEntityOperations.update(message.getPayload())
                .then();
    }

    public enum Type {
        INSERT,
        UPDATE,
    }
}

我们将能够同时使用这两种通道适配器

public class MainFlow {

  @Autowired
  private CustomReactiveMessageProducer customReactiveMessageProducer;

  @Autowired
  private CustomReactiveMessageHandler customReactiveMessageHandler;

  @Bean
  public IntegrationFlow buildFlow() {
     return IntegrationFlow.from(customReactiveMessageProducer)
        .transform(someOperation)
        .handle(customReactiveMessageHandler)
        .get();
  }
}

目前,Spring Integration 为 WebFlux, RSocket, MongoDb, R2DBC, ZeroMQ, GraphQL, Apache Cassandra 提供了通道适配器(或网关)实现。Redis Stream 通道适配器 也是响应式的,并使用 Spring Data 的 ReactiveStreamOperations。更多响应式通道适配器正在开发中,例如基于 Spring for Apache KafkaReactiveKafkaProducerTemplateReactiveKafkaConsumerTemplateApache Kafka 通道适配器等。对于许多其他非响应式通道适配器,建议使用线程池以避免在响应式流处理过程中阻塞。

响应式到命令式上下文传播

Context Propagation 库位于类路径上时,Project Reactor 可以获取 ThreadLocal 值(例如 Micrometer ObservationSecurityContextHolder),并将它们存储到 Subscriber 上下文。反向操作也是可能的,当我们需要填充日志 MDC 以进行跟踪,或者让我们在响应式流中调用的服务从作用域中恢复观察时。有关上下文传播的特殊操作符的更多信息,请参阅 Project Reactor 文档。如果我们的整个解决方案是一个单一的响应式流组合,存储和恢复上下文可以顺利进行,因为 Subscriber 上下文从下游向上游直到组合的开始(FluxMono)都是可见的。但是,如果应用程序在不同的 Flux 实例之间切换,或者切换到命令式处理再切换回来,那么与 Subscriber 绑定的上下文可能不可用。对于这种情况,Spring Integration 提供了一项额外功能(从版本 6.0.5 开始),可以将 Reactor ContextView 存储到响应式流生成的 IntegrationMessageHeaderAccessor.REACTOR_CONTEXT 消息头中,例如当我们执行直接的 send() 操作时。然后,此消息头会在 FluxMessageChannel.subscribeTo() 中使用,以恢复此通道将要发出的 Message 的 Reactor 上下文。目前,此消息头由 WebFluxInboundEndpointRSocketInboundGateway 组件填充,但可以在执行响应式到命令式集成的任何解决方案中使用。填充此消息头的逻辑如下:

return requestMono
        .flatMap((message) ->
                Mono.deferContextual((context) ->
                        Mono.just(message)
                                .handle((messageToSend, sink) ->
                                        send(messageWithReactorContextIfAny(messageToSend, context)))));
...

private Message<?> messageWithReactorContextIfAny(Message<?> message, ContextView context) {
    if (!context.isEmpty()) {
        return getMessageBuilderFactory()
                .fromMessage(message)
                .setHeader(IntegrationMessageHeaderAccessor.REACTOR_CONTEXT, context)
                .build();
    }
    return message;
}

请注意,我们仍然需要使用 handle() 操作符来使 Reactor 从上下文中恢复 ThreadLocal 值。即使它作为消息头发送,框架也无法假设它是否会在下游恢复到 ThreadLocal 值。

要在另一个 FluxMono 组合中从 Message 恢复上下文,可以执行以下逻辑:

Mono.just(message)
        .handle((messageToHandle, sink) -> ...)
        .contextWrite(StaticMessageHeaderAccessor.getReactorContext(message)));