响应式流支持
Spring Integration 在框架的某些地方以及从不同方面提供了对 Reactive Streams 交互的支持。我们将在本文档中讨论其中大部分内容,并在必要时提供指向目标章节的相应链接以获取详细信息。
前言
概括地说,Spring Integration 扩展了 Spring 编程模型以支持众所周知的企业集成模式。Spring Integration 能够在基于 Spring 的应用程序中实现轻量级消息传递,并通过声明式适配器支持与外部系统的集成。Spring Integration 的主要目标是提供一个简单的模型来构建企业集成解决方案,同时保持关注点分离,这对于生成可维护、可测试的代码至关重要。通过在目标应用程序中使用诸如message
、channel
和endpoint
之类的头等公民,可以在目标应用程序中实现此目标,这使我们能够构建集成流(管道),其中(在大多数情况下)一个端点将消息发送到通道,以便另一个端点使用。通过这种方式,我们将集成交互模型与目标业务逻辑区分开来。这里至关重要的部分是中间的通道:流行为取决于其实现,而端点保持不变。
另一方面,Reactive Streams 是一种用于异步流处理的标准,具有非阻塞的反压机制。Reactive Streams 的主要目标是控制跨异步边界的流数据交换——例如将元素传递到另一个线程或线程池——同时确保接收方不会被迫缓冲任意数量的数据。换句话说,反压是此模型中不可或缺的一部分,以便允许在线程之间进行调解的队列具有边界。Reactive Streams 实现(例如 Project Reactor)的目的是在流应用程序的整个处理图中保留这些好处和特性。Reactive Streams 库的最终目标是以透明且流畅的方式为目标应用程序提供类型、运算符集和支持 API,就像使用可用的编程语言结构一样,但最终的解决方案不如使用普通的函数链调用那样具有命令性。它分为两个阶段:定义和执行,后者在稍后订阅最终的反应式发布者时发生,并且对数据的需求从定义的底部推送到顶部,根据需要应用反压——我们请求我们目前可以处理的尽可能多的事件。反应式应用程序看起来像一个"流"
,或者像我们在 Spring Integration 术语中习惯的那样——"流"
。事实上,自 Java 9 以来,Reactive Streams SPI 在java.util.concurrent.Flow
类中呈现。
从这里可以看出,当我们在端点上应用一些反应式框架运算符时,Spring Integration 流确实非常适合编写 Reactive Streams 应用程序,但实际上问题要广泛得多,我们需要记住并非所有端点(例如JdbcMessageHandler
)都可以透明地在线程中处理。当然,在 Spring Integration 中支持 Reactive Streams 的主要目标是允许整个过程完全反应式,按需启动并准备好反压。在目标通道适配器的协议和系统提供 Reactive Streams 交互模型之前,这将是不可能的。在下面的部分中,我们将描述 Spring Integration 中为开发反应式应用程序而提供的组件和方法,同时保留集成流结构。
Spring Integration 中的所有 Reactive Streams 交互都使用 Project Reactor 类型实现,例如Mono 和Flux 。 |
消息网关
与 Reactive Streams 交互的最简单点是@MessagingGateway
,我们只需将网关方法的返回类型设为Mono<?>
——当对返回的Mono
实例进行订阅时,将执行网关方法调用背后的整个集成流。有关更多信息,请参阅 Reactor Mono
。框架内部对入站网关使用类似的Mono
回复方法,这些网关完全基于与 Reactive Streams 兼容的协议(有关更多信息,请参阅下面的 反应式通道适配器)。发送和接收操作被包装到Mono.deffer()
中,并在replyChannel
标头可用时链接回复评估。通过这种方式,特定反应式协议(例如 Netty)的入站组件将作为 Spring Integration 上执行的反应式流的订阅者和发起者。如果请求有效负载是反应式类型,最好在反应式流定义中处理它,并将处理延迟到发起者订阅。为此,处理程序方法也必须返回反应式类型。有关更多信息,请参阅下一节。
反应式回复有效负载
当生成回复的MessageHandler
为回复消息返回反应式类型有效负载时,它将以异步方式使用为outputChannel
提供的常规MessageChannel
实现进行处理(async
必须设置为true
),并在输出通道为ReactiveStreamsSubscribableChannel
实现(例如FluxMessageChannel
)时使用按需订阅进行展平。对于标准的命令式MessageChannel
用例,如果回复有效负载是**多值**发布者(有关更多信息,请参阅ReactiveAdapter.isMultiValue()
),则将其包装到Mono.just()
中。因此,必须在下游显式订阅Mono
或由下游的FluxMessageChannel
展平。对于outputChannel
的ReactiveStreamsSubscribableChannel
,无需担心返回类型和订阅;框架内部会平滑地处理所有内容。
有关更多信息,请参阅 异步服务激活器。
另请参阅 Kotlin 协程以获取更多信息。
FluxMessageChannel
和ReactiveStreamsConsumer
FluxMessageChannel
是MessageChannel
和Publisher<Message<?>>
的组合实现。内部为接收来自send()
实现的传入消息创建了一个Flux
(作为热源)。Publisher.subscribe()
实现委托给该内部Flux
。此外,对于按需上游消费,FluxMessageChannel
为ReactiveStreamsSubscribableChannel
契约提供了实现。为此通道提供的任何上游Publisher
(例如,请参阅下面的源轮询通道适配器和拆分器)在该通道准备好订阅时会自动订阅。来自这些委托发布者的事件被发送到上面提到的内部Flux
中。
FluxMessageChannel
的使用者必须是org.reactivestreams.Subscriber
实例,以遵守 Reactive Streams 契约。幸运的是,Spring Integration 中的所有MessageHandler
实现也实现了来自 Project Reactor 的CoreSubscriber
。并且由于中间的ReactiveStreamsConsumer
实现,目标开发人员可以保持整个集成流配置的透明性。在这种情况下,流行为从命令式推送模型更改为反应式拉取模型。ReactiveStreamsConsumer
还可以用于使用IntegrationReactiveUtils
将任何MessageChannel
转换为反应式源,从而使集成流部分反应式。
有关更多信息,请参阅 FluxMessageChannel
。
从 5.5 版开始,ConsumerEndpointSpec
引入了reactive()
选项,以使流中的端点作为ReactiveStreamsConsumer
,而与输入通道无关。可以提供可选的Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>>
,以便通过Flux.transform()
操作(例如,使用publishOn()
、doOnNext()
、retry()
等)自定义来自输入通道的源Flux
。此功能表示为所有消息传递注释(@ServiceActivator
、@Splitter
等)的@Reactive
子注释,通过它们的reactive()
属性。
源轮询通道适配器
通常情况下,SourcePollingChannelAdapter
依赖于由 TaskScheduler
发起的任务。一个轮询触发器根据提供的选项构建,并用于定期调度一个任务来轮询数据或事件的目标源。当 outputChannel
是 ReactiveStreamsSubscribableChannel
时,使用相同的 Trigger
来确定下次执行时间,但不是调度任务,SourcePollingChannelAdapter
会基于 Flux.generate()
生成 Flux<Message<?>>
,用于 nextExecutionTime
值,并使用 Mono.delay()
来延迟上一步中的持续时间。然后使用 Flux.flatMapMany()
轮询 maxMessagesPerPoll
并将其发送到输出 Flux
中。此生成器 Flux
由提供的 ReactiveStreamsSubscribableChannel
订阅,并遵守下游的反压。从 5.5 版本开始,当 maxMessagesPerPoll == 0
时,源代码根本不会被调用,并且 flatMapMany()
会通过 Mono.empty()
结果立即完成,直到 maxMessagesPerPoll
在稍后时间更改为非零值,例如通过控制总线。这样,任何 MessageSource
实现都可以转换为反应式热源。
有关更多信息,请参见轮询消费者。
事件驱动通道适配器
MessageProducerSupport
是事件驱动通道适配器的基类,通常,其 sendMessage(Message<?>)
用作生产驱动程序 API 中的侦听器回调。当消息生产者实现构建消息的 Flux
而不是基于侦听器的方法时,此回调也可以轻松地插入到 doOnNext()
Reactor 运算符中。实际上,当消息生产者的 outputChannel
不是 ReactiveStreamsSubscribableChannel
时,框架中会执行此操作。但是,为了改善最终用户体验并允许更多反压就绪功能,MessageProducerSupport
提供了一个 subscribeToPublisher(Publisher<? extends Message<?>>)
API,在目标实现中使用,当 Publisher<Message<?>>>
是目标系统数据源时。通常,它在 doStart()
实现中使用,当目标驱动程序 API 被调用以获取源数据的 Publisher
时。建议将反应式 MessageProducerSupport
实现与 FluxMessageChannel
结合作为 outputChannel
,以便按需订阅和下游事件消耗。当对 Publisher
的订阅取消时,通道适配器进入停止状态。对这种通道适配器调用 stop()
会完成来自源 Publisher
的生产。通道适配器可以重新启动并自动订阅新创建的源 Publisher
。
消息源到反应式流
从 5.3 版本开始,提供了 ReactiveMessageSourceProducer
。它结合了提供的 MessageSource
和事件驱动的生产到配置的 outputChannel
中。在内部,它将 MessageSource
包装到重复重新订阅的 Mono
中,生成一个 Flux<Message<?>>
,以便在上面提到的 subscribeToPublisher(Publisher<? extends Message<?>>)
中订阅。此 Mono
的订阅使用 Schedulers.boundedElastic()
完成,以避免目标 MessageSource
中可能发生的阻塞。当消息源返回 null
(没有要拉取的数据)时,Mono
会转换为 repeatWhenEmpty()
状态,并根据订阅者上下文中 IntegrationReactiveUtils.DELAY_WHEN_EMPTY_KEY
Duration
条目延迟后续重新订阅。默认情况下,为 1 秒。如果 MessageSource
使用标头中的 IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK
信息生成消息,则在原始 Mono
的 doOnSuccess()
中确认(如果需要),并在 doOnError()
中拒绝,如果下游流抛出带有要拒绝的失败消息的 MessagingException
。此 ReactiveMessageSourceProducer
可用于任何用例,在这些用例中,轮询通道适配器的功能应转换为任何现有 MessageSource<?>
实现的反应式按需解决方案。
拆分器和聚合器
当 AbstractMessageSplitter
获取其逻辑的 Publisher
时,该过程会自然地遍历 Publisher
中的项目,将其映射为消息以发送到 outputChannel
。如果此通道是 ReactiveStreamsSubscribableChannel
,则按需从该通道订阅 Publisher
的 Flux
包装器,并且此拆分器的行为更像是 flatMap
Reactor 运算符,当我们将传入事件映射到多值输出 Publisher
时。当整个集成流在拆分器之前和之后使用 FluxMessageChannel
构建时,这最有意义,使 Spring Integration 配置与反应式流要求及其事件处理运算符保持一致。使用常规通道,Publisher
会转换为 Iterable
,用于标准的迭代和生成拆分逻辑。
FluxAggregatorMessageHandler
是特定反应式流逻辑实现的另一个示例,在 Project Reactor 方面可以将其视为 “反应式运算符”
。它基于 Flux.groupBy()
和 Flux.window()
(或 buffer()
)运算符。传入的消息被发送到在创建 FluxAggregatorMessageHandler
时启动的 Flux.create()
中,使其成为热源。此 Flux
按需由 ReactiveStreamsSubscribableChannel
订阅,或者当 outputChannel
不是反应式时,直接在 FluxAggregatorMessageHandler.start()
中订阅。当整个集成流在该组件之前和之后使用 FluxMessageChannel
构建时,此 MessageHandler
具有其强大功能,使整个逻辑能够准备好反压。
有关更多信息,请参见流和 Flux 拆分 和 Flux 聚合器。
Java DSL
Java DSL 中的 IntegrationFlow
可以从任何 Publisher
实例开始(参见 IntegrationFlow.from(Publisher<Message<T>>)
)。此外,使用 IntegrationFlowBuilder.toReactivePublisher()
运算符,IntegrationFlow
可以转换为反应式热源。在这两种情况下,内部都使用 FluxMessageChannel
;它可以根据其 ReactiveStreamsSubscribableChannel
契约订阅传入 Publisher
,并且它本身对于下游订阅者来说是 Publisher<Message<?>>
。通过动态 IntegrationFlow
注册,我们可以实现一个强大的逻辑,将反应式流与此集成流桥接到/从 Publisher
连接。
从 5.5.6 版本开始,存在 toReactivePublisher(boolean autoStartOnSubscribe)
运算符变体来控制返回的 Publisher<Message<?>>
后面的整个 IntegrationFlow
的生命周期。通常,对反应式发布者的订阅和消费发生在稍后的运行时阶段,而不是在反应式流组合期间,甚至不是在 ApplicationContext
启动期间。为了避免在 Publisher<Message<?>>
订阅点进行 IntegrationFlow
生命周期管理的样板代码并改善最终用户体验,引入了带有 autoStartOnSubscribe
标志的这个新运算符。它(如果为 true
)将 IntegrationFlow
及其组件标记为 autoStartup = false
,因此 ApplicationContext
不会自动启动流中的消息生产和消费。相反,IntegrationFlow
的 start()
从内部 Flux.doOnSubscribe()
启动。独立于 autoStartOnSubscribe
值,流从 Flux.doOnCancel()
和 Flux.doOnTerminate()
停止 - 如果没有要消费的消息,则生成消息没有意义。
对于完全相反的用例,当 IntegrationFlow
应该调用反应式流并在完成之后继续时,在 IntegrationFlowDefinition
中提供了 fluxTransform()
运算符。此时流转换为 FluxMessageChannel
,该通道传播到提供的 fluxFunction
中,在 Flux.transform()
运算符中执行。函数的结果被包装到 Mono<Message<?>>
中,以便扁平映射到输出 Flux
中,该输出 Flux
由另一个 FluxMessageChannel
订阅以进行下游流。
有关更多信息,请参见Java DSL 章节。
ReactiveMessageHandler
从 5.3 版本开始,框架本机支持 ReactiveMessageHandler
。此类型的消息处理程序专为反应式客户端设计,这些客户端返回反应式类型以按需订阅低级操作执行,并且不提供任何回复数据来继续反应式流组合。当 ReactiveMessageHandler
用于命令式集成流时,handleMessage()
的结果在返回后立即订阅,仅仅是因为此类流中没有反应式流组合来遵守反压。在这种情况下,框架会将此 ReactiveMessageHandler
包装到 ReactiveMessageHandlerAdapter
中 - MessageHandler
的普通实现。但是,当 ReactiveStreamsConsumer
参与流时(例如,当要消费的通道是 FluxMessageChannel
时),此类 ReactiveMessageHandler
会与整个反应式流组合使用 flatMap()
Reactor 运算符,以便在消费期间遵守反压。
ReactiveMongoDbStoringMessageHandler
是开箱即用的 ReactiveMessageHandler
实现之一,用于出站通道适配器。有关更多信息,请参见MongoDB 反应式通道适配器。
从 6.1 版本开始,IntegrationFlowDefinition
公开了方便的 handleReactive(ReactiveMessageHandler)
终端运算符。任何 ReactiveMessageHandler
实现(即使只是使用 Mono
API 的普通 lambda)都可以用于此运算符。框架会自动订阅返回的 Mono<Void>
。以下是此运算符可能的配置示例
@Bean
public IntegrationFlow wireTapFlow1() {
return IntegrationFlow.from("tappedChannel1")
.wireTap("tapChannel", wt -> wt.selector(m -> m.getPayload().equals("foo")))
.handleReactive((message) -> Mono.just(message).log().then());
}
此运算符的重载版本接受 Consumer<GenericEndpointSpec<ReactiveMessageHandlerAdapter>>
以自定义围绕提供的 ReactiveMessageHandler
的消费者端点。
此外,还提供了基于 ReactiveMessageHandlerSpec
的变体。在大多数情况下,它们用于特定于协议的通道适配器实现。请参阅下一节,其中包含指向具有相应反应式通道适配器的目标技术的链接。
反应式通道适配器
当用于集成的目标协议提供反应式流解决方案时,在 Spring Integration 中实现通道适配器变得非常简单。
入站、事件驱动的通道适配器实现是关于将请求(如果需要)包装到延迟的 Mono
或 Flux
中,并且仅当协议组件启动对侦听器方法返回的 Mono
的订阅时执行发送(以及生成回复,如果有)。这样,我们就有了在此组件中完全封装的反应式流解决方案。当然,输出通道上订阅的下游集成流应该遵守反应式流规范,并以按需、反压就绪的方式执行。
这并非总是由集成流中使用的 MessageHandler
处理器的性质(或当前实现)提供。当没有反应式实现时,可以使用线程池和队列或 FluxMessageChannel
(见上文)处理此限制,在集成端点之前和之后。
反应式**事件驱动**入站通道适配器的示例
public class CustomReactiveMessageProducer extends MessageProducerSupport {
private final CustomReactiveSource customReactiveSource;
public CustomReactiveMessageProducer(CustomReactiveSource customReactiveSource) {
Assert.notNull(customReactiveSource, "'customReactiveSource' must not be null");
this.customReactiveSource = customReactiveSource;
}
@Override
protected void doStart() {
Flux<Message<?>> messageFlux =
this.customReactiveSource
.map(event - >
MessageBuilder
.withPayload(event.getBody())
.setHeader(MyReactiveHeaders.SOURCE_NAME, event.getSourceName())
.build());
subscribeToPublisher(messageFlux);
}
}
用法如下所示
public class MainFlow {
@Autowired
private CustomReactiveMessageProducer customReactiveMessageProducer;
@Bean
public IntegrationFlow buildFlow() {
return IntegrationFlow.from(customReactiveMessageProducer)
.channel(outputChannel)
.get();
}
}
或者以声明方式
public class MainFlow {
@Bean
public IntegrationFlow buildFlow() {
return IntegrationFlow.from(new CustomReactiveMessageProducer(new CustomReactiveSource()))
.handle(outputChannel)
.get();
}
}
或者即使没有通道适配器,我们也可以始终以以下方式使用 Java DSL
public class MainFlow {
@Bean
public IntegrationFlow buildFlow() {
Flux<Message<?>> myFlux = this.customReactiveSource
.map(event ->
MessageBuilder
.withPayload(event.getBody())
.setHeader(MyReactiveHeaders.SOURCE_NAME, event.getSourceName())
.build());
return IntegrationFlow.from(myFlux)
.handle(outputChannel)
.get();
}
}
响应式出站通道适配器的实现是关于根据目标协议提供的响应式 API 启动(或继续)与外部系统交互的响应式流。入站有效负载本身可以是响应式类型,也可以是整个集成流的事件,该事件是顶层响应式流的一部分。如果我们处于单向、fire-and-forget 场景中,则可以立即订阅返回的响应式类型,或者将其向下游传播(请求-回复场景)以进行进一步的集成流或目标业务逻辑中的显式订阅,但仍然向下游保留响应式流语义。
响应式出站通道适配器的示例
public class CustomReactiveMessageHandler extends AbstractReactiveMessageHandler {
private final CustomEntityOperations customEntityOperations;
public CustomReactiveMessageHandler(CustomEntityOperations customEntityOperations) {
Assert.notNull(customEntityOperations, "'customEntityOperations' must not be null");
this.customEntityOperations = customEntityOperations;
}
@Override
protected Mono<Void> handleMessageInternal(Message<?> message) {
return Mono.fromSupplier(() -> message.getHeaders().get("queryType", Type.class))
.flatMap(mode -> {
switch (mode) {
case INSERT:
return handleInsert(message);
case UPDATE:
return handleUpdate(message);
default:
return Mono.error(new IllegalArgumentException());
}
}).then();
}
private Mono<Void> handleInsert(Message<?> message) {
return this.customEntityOperations.insert(message.getPayload())
.then();
}
private Mono<Void> handleUpdate(Message<?> message) {
return this.r2dbcEntityOperations.update(message.getPayload())
.then();
}
public enum Type {
INSERT,
UPDATE,
}
}
我们将能够使用这两个通道适配器
public class MainFlow {
@Autowired
private CustomReactiveMessageProducer customReactiveMessageProducer;
@Autowired
private CustomReactiveMessageHandler customReactiveMessageHandler;
@Bean
public IntegrationFlow buildFlow() {
return IntegrationFlow.from(customReactiveMessageProducer)
.transform(someOperation)
.handle(customReactiveMessageHandler)
.get();
}
}
目前,Spring Integration 为WebFlux、RSocket、MongoDb、R2DBC、ZeroMQ、GraphQL、Apache Cassandra提供了通道适配器(或网关)实现。Redis Stream Channel Adapters也是响应式的,并使用来自 Spring Data 的ReactiveStreamOperations
。更多响应式通道适配器即将推出,例如基于Spring for Apache Kafka中的ReactiveKafkaProducerTemplate
和ReactiveKafkaConsumerTemplate
的Apache Kafka Kafka。对于许多其他非响应式通道适配器,建议使用线程池以避免在响应式流处理期间阻塞。
响应式到命令式上下文传播
当Context Propagation库位于类路径上时,Project Reactor 可以获取ThreadLocal
值(例如Micrometer Observation或SecurityContextHolder
)并将它们存储到Subscriber
上下文中。相反的操作也是可能的,当我们需要填充日志记录 MDC 以进行跟踪或让从响应式流调用的服务从作用域恢复观察时。有关其上下文传播特殊运算符的更多信息,请参阅 Project Reactor 文档。如果我们的整个解决方案是单个响应式流组合,则存储和恢复上下文可以顺利工作,因为Subscriber
上下文从下游到组合(Flux
或Mono
)的开头都是可见的。但是,如果应用程序在不同的Flux
实例之间或在命令式处理和返回之间切换,则与Subscriber
绑定的上下文可能不可用。对于这种用例,Spring Integration 提供了一种额外的功能(从版本6.0.5
开始),可以将 Reactor ContextView
存储到从响应式流生成的IntegrationMessageHeaderAccessor.REACTOR_CONTEXT
消息头中,例如,当我们执行直接send()
操作时。然后,此标头用于FluxMessageChannel.subscribeTo()
以恢复此通道将要发出的Message
的 Reactor 上下文。目前,此标头是从WebFluxInboundEndpoint
和RSocketInboundGateway
组件填充的,但可以在执行响应式到命令式集成的任何解决方案中使用。填充此标头的逻辑如下
return requestMono
.flatMap((message) ->
Mono.deferContextual((context) ->
Mono.just(message)
.handle((messageToSend, sink) ->
send(messageWithReactorContextIfAny(messageToSend, context)))));
...
private Message<?> messageWithReactorContextIfAny(Message<?> message, ContextView context) {
if (!context.isEmpty()) {
return getMessageBuilderFactory()
.fromMessage(message)
.setHeader(IntegrationMessageHeaderAccessor.REACTOR_CONTEXT, context)
.build();
}
return message;
}
请注意,我们仍然需要使用handle()
运算符才能使 Reactor 从上下文中恢复ThreadLocal
值。即使它作为标头发送,框架也不能假设它是否将被恢复到下游的ThreadLocal
值。
要从另一个Flux
或Mono
组合上的Message
恢复上下文,可以执行此逻辑
Mono.just(message)
.handle((messageToHandle, sink) -> ...)
.contextWrite(StaticMessageHeaderAccessor.getReactorContext(message)));