MongoDB 支持

2.1 版本引入了对 MongoDB 的支持:一个“高性能、开源、面向文档的数据库”。

您需要在项目中包含此依赖项

  • Maven

  • Gradle

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mongodb</artifactId>
    <version>6.4.4</version>
</dependency>
compile "org.springframework.integration:spring-integration-mongodb:6.4.4"

要下载、安装和运行 MongoDB,请参阅 MongoDB 文档

连接到 MongoDB

阻塞还是响应式?

从 5.3 版本开始,Spring Integration 提供了对响应式 MongoDB 驱动程序的支持,以便在访问 MongoDB 时启用非阻塞 I/O。要启用响应式支持,请将 MongoDB 响应式流驱动程序添加到您的依赖项中

  • Maven

  • Gradle

<dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>mongodb-driver-reactivestreams</artifactId>
</dependency>
compile "org.mongodb:mongodb-driver-reactivestreams"

对于常规同步客户端,您需要将相应的驱动程序添加到依赖项中

  • Maven

  • Gradle

<dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>mongodb-driver-sync</artifactId>
</dependency>
compile "org.mongodb:mongodb-driver-sync"

在框架中,两者都是可选的 (optional),以更好地支持最终用户的选择。

要开始与 MongoDB 交互,您首先需要连接到它。Spring Integration 构建在另一个 Spring 项目 Spring Data MongoDB 提供的支持之上。它提供了名为 MongoDatabaseFactoryReactiveMongoDatabaseFactory 的工厂类,它们简化了与 MongoDB 客户端 API 的集成。

Spring Data 默认提供阻塞式 MongoDB 驱动程序,但您可以通过引入上述依赖项来选择使用响应式方式。

使用 MongoDatabaseFactory

要连接到 MongoDB,您可以使用 MongoDatabaseFactory 接口的一个实现。

以下示例展示了如何使用 SimpleMongoClientDatabaseFactory

  • Java

  • XML

MongoDatabaseFactory mongoDbFactory =
        new SimpleMongoClientDatabaseFactory(com.mongodb.client.MongoClients.create(), "test");
<bean id="mongoDbFactory" class="o.s.data.mongodb.core.SimpleMongoClientDatabaseFactory">
    <constructor-arg>
        <bean class="com.mongodb.client.MongoClients" factory-method="create"/>
    </constructor-arg>
    <constructor-arg value="test"/>
</bean>

SimpleMongoClientDatabaseFactory 接受两个参数:一个 MongoClient 实例和一个指定数据库名称的 String。如果您需要配置 hostport 等属性,可以使用底层 MongoClients 类提供的构造函数之一进行传递。有关如何配置 MongoDB 的更多信息,请参阅 Spring-Data-MongoDB 参考文档。

使用 ReactiveMongoDatabaseFactory

要使用响应式驱动程序连接到 MongoDB,您可以使用 ReactiveMongoDatabaseFactory 接口的一个实现。

以下示例展示了如何使用 SimpleReactiveMongoDatabaseFactory

  • Java

  • XML

ReactiveMongoDatabaseFactory mongoDbFactory =
        new SimpleReactiveMongoDatabaseFactory(com.mongodb.reactivestreams.client.MongoClients.create(), "test");
<bean id="mongoDbFactory" class="o.s.data.mongodb.core.SimpleReactiveMongoDatabaseFactory">
    <constructor-arg>
        <bean class="com.mongodb.reactivestreams.client.MongoClients" factory-method="create"/>
    </constructor-arg>
    <constructor-arg value="test"/>
</bean>

MongoDB 消息存储

正如《企业集成模式》(EIP) 一书中所述,消息存储 允许您持久化消息。如果可靠性是关注点,这在处理具有消息缓冲能力的组件(如 QueueChannelaggregatorresequencer 等)时非常有用。在 Spring Integration 中,MessageStore 策略也为 EIP 中描述的 存根(claim check) 模式提供了基础。

Spring Integration 的 MongoDB 模块提供了 MongoDbMessageStore,它是 MessageStore 策略(主要用于存根模式)和 MessageGroupStore 策略(主要用于聚合器和重排序器模式)的实现。

以下示例配置了一个 MongoDbMessageStore 以使用 QueueChannel 和一个 aggregator

<bean id="mongoDbMessageStore" class="o.s.i.mongodb.store.MongoDbMessageStore">
    <constructor-arg ref="mongoDbFactory"/>
</bean>

<int:channel id="somePersistentQueueChannel">
    <int:queue message-store="mongoDbMessageStore"/>
<int:channel>

<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
         message-store="mongoDbMessageStore"/>

上述示例是一个简单的 Bean 配置,它期望一个 MongoDbFactory 作为构造函数参数。

MongoDbMessageStore 使用 Spring Data Mongo 映射机制将 Message 作为 Mongo 文档展开,包含所有嵌套属性。当您需要访问存储消息的 payloadheaders 进行审计或分析时,这非常有用。

MongoDbMessageStore 使用自定义的 MappingMongoConverter 实现将 Message 实例存储为 MongoDB 文档,并且对 Message 的属性(payloadheader 值)有一些限制。

从 5.1.6 版本开始,MongoDbMessageStore 可以配置自定义转换器,这些转换器会传播到内部的 MappingMongoConverter 实现中。有关更多信息,请参阅 MongoDbMessageStore.setCustomConverters(Object…​ customConverters) 的 JavaDocs。

Spring Integration 3.0 引入了 ConfigurableMongoDbMessageStore。它实现了 MessageStoreMessageGroupStore 接口。此类可以接受一个 MongoTemplate 作为构造函数参数,您可以使用它来配置自定义的 WriteConcern 等。另一个构造函数需要一个 MappingMongoConverter 和一个 MongoDbFactory,这允许您为 Message 实例及其属性提供一些自定义转换。请注意,默认情况下,ConfigurableMongoDbMessageStore 使用标准的 Java 序列化来向 MongoDB 写入和读取 Message 实例(参见 MongoDbMessageBytesConverter),并依赖 MongoTemplate 的其他属性的默认值。它从提供的 MongoDbFactoryMappingMongoConverter 构建一个 MongoTemplateConfigurableMongoDbMessageStore 存储的集合的默认名称是 configurableStoreMessages。我们建议在消息包含复杂数据类型时使用此实现来创建健壮且灵活的解决方案。

从 6.0.8 版本开始,AbstractConfigurableMongoDbMessageStore 提供了一个 setCreateIndexes(boolean) 选项(默认为 true),可用于禁用自动索引创建。以下示例展示了如何声明一个 Bean 并禁用自动索引创建

@Bean
public MongoDbChannelMessageStore mongoDbChannelMessageStore(MongoDatabaseFactory databaseFactory) {
    MongoDbChannelMessageStore mongoDbChannelMessageStore = new MongoDbChannelMessageStore(databaseFactory);
    mongoDbChannelMessageStore.setCreateIndexes(false);
    return mongoDbChannelMessageStore;
}

MongoDB 通道消息存储

4.0 版本引入了新的 MongoDbChannelMessageStore。它是一个优化的 MessageGroupStore,用于 QueueChannel 实例。通过设置 priorityEnabled = true,您可以在 <int:priority-queue> 实例中使用它,以实现持久化消息的优先级顺序轮询。MongoDB 文档中的 priority 字段填充自 IntegrationMessageHeaderAccessor.PRIORITY (priority) 消息头。

此外,所有 MongoDB MessageStore 实例现在都有一个用于 MessageGroup 文档的 sequence 字段。sequence 值是同一集合中一个简单 sequence 文档的 $inc 操作结果,该文档按需创建。sequence 字段用于 poll 操作,以在消息存储在同一毫秒内时提供先进先出(FIFO)的消息顺序(如果在配置中指定了优先级,则在优先级内排序)。

我们不建议对优先级和非优先级使用同一个 MongoDbChannelMessageStore Bean,因为 priorityEnabled 选项适用于整个存储。然而,同一个 collection 可以用于两种 MongoDbChannelMessageStore 类型,因为从存储中轮询消息是排序并使用索引的。要配置这种情况,您可以从另一个消息存储 Bean 扩展一个,如下面的示例所示
<bean id="channelStore" class="o.s.i.mongodb.store.MongoDbChannelMessageStore">
    <constructor-arg name="mongoDbFactory" ref="mongoDbFactory"/>
</bean>

<int:channel id="queueChannel">
    <int:queue message-store="store"/>
</int:channel>

<bean id="priorityStore" parent="channelStore">
    <property name="priorityEnabled" value="true"/>
</bean>

<int:channel id="priorityChannel">
    <int:priority-queue message-store="priorityStore"/>
</int:channel>

使用 AbstractConfigurableMongoDbMessageStore 并禁用自动索引创建

从 6.0.8 版本开始,AbstractConfigurableMongoDbMessageStore 实现了一个 setCreateIndex(boolean) 方法,可用于禁用或启用(默认为启用)自动索引创建。以下示例展示了如何声明一个 Bean 并禁用自动索引创建

@Bean
public AbstractConfigurableMongoDbMessageStore mongoDbChannelMessageStore(MongoDatabaseFactory databaseFactory)
{
    AbstractConfigurableMongoDbMessageStore mongoDbChannelMessageStore = new MongoDbChannelMessageStore(databaseFactory);
    mongoDbChannelMessageStore.setCreateIndex(false);

    return mongoDbChannelMessageStore;
}

MongoDB 元数据存储

Spring Integration 4.2 引入了一个新的基于 MongoDB 的 MetadataStore(参见 元数据存储)实现。您可以使用 MongoDbMetadataStore 在应用程序重启后维护元数据状态。您可以将这个新的 MetadataStore 实现与以下适配器一起使用,例如

要指示这些适配器使用新的 MongoDbMetadataStore,请声明一个 Bean 名称为 metadataStore 的 Spring Bean。feed 入站通道适配器会自动拾取并使用声明的 MongoDbMetadataStore。以下示例展示了如何声明一个 Bean 名称为 metadataStore 的 Bean

@Bean
public MetadataStore metadataStore(MongoDbFactory factory) {
    return new MongoDbMetadataStore(factory, "integrationMetadataStore");
}

MongoDbMetadataStore 还实现了 ConcurrentMetadataStore,使其能够可靠地在多个应用程序实例之间共享,其中只有一个实例被允许存储或修改键的值。由于 MongoDB 的保证,所有这些操作都是原子性的。

MongoDB 入站通道适配器

MongoDB 入站通道适配器是一个轮询消费者,它从 MongoDB 读取数据并将其作为 Message 的 payload 发送。以下示例展示了如何配置 MongoDB 入站通道适配器

<int-mongodb:inbound-channel-adapter id="mongoInboundAdapter"
       channel="replyChannel"
       query="{'name' : 'Bob'}"
       entity-class="java.lang.Object"
       auto-startup="false">
		<int:poller fixed-rate="100"/>
</int-mongodb:inbound-channel-adapter>

如前述配置所示,您可以通过使用 inbound-channel-adapter 元素并为各种属性提供值来配置 MongoDB 入站通道适配器,例如

  • query: 一个 JSON 查询(参见 MongoDB 查询

  • query-expression: 一个 SpEL 表达式,评估结果为一个 JSON 查询字符串(如上面 query 属性所示)或 o.s.data.mongodb.core.query.Query 的一个实例。与 query 属性互斥。

  • entity-class: Payload 对象的类型。如果未提供,则返回一个 com.mongodb.DBObject

  • collection-namecollection-name-expression: 指定要使用的 MongoDB 集合的名称。

  • mongodb-factory: 对 o.s.data.mongodb.MongoDbFactory 实例的引用

  • mongo-template: 对 o.s.data.mongodb.core.MongoTemplate 实例的引用

  • 其他适用于所有入站适配器的通用属性(如 'channel')。

您不能同时设置 mongo-templatemongodb-factory

上述示例相对简单和静态,因为它对 query 使用字面值,并对 collection 使用默认名称。有时,您可能需要在运行时根据某些条件更改这些值。为此,可以使用它们的 -expression 等效项(query-expressioncollection-name-expression),其中提供的表达式可以是任何有效的 SpEL 表达式。

此外,您可能希望对从 MongoDB 读取并成功处理的数据进行一些后处理。例如,您可能希望在文档处理后移动或删除它。您可以通过使用 Spring Integration 2.2 添加的事务同步功能来实现这一点,如下面的示例所示

<int-mongodb:inbound-channel-adapter id="mongoInboundAdapter"
    channel="replyChannel"
    query-expression="new BasicQuery('{''name'' : ''Bob''}').limit(100)"
    entity-class="java.lang.Object"
    auto-startup="false">
        <int:poller fixed-rate="200" max-messages-per-poll="1">
            <int:transactional synchronization-factory="syncFactory"/>
        </int:poller>
</int-mongodb:inbound-channel-adapter>

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-commit
        expression="@documentCleaner.remove(#mongoTemplate, payload, headers.mongo_collectionName)"
        channel="someChannel"/>
</int:transaction-synchronization-factory>

<bean id="documentCleaner" class="thing1.thing2.DocumentCleaner"/>

<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager"/>

以下示例展示了前述示例中引用的 DocumentCleaner

public class DocumentCleaner {
    public void remove(MongoOperations mongoOperations, Object target, String collectionName) {
        if (target instanceof List<?> documents){
            for (Object document : documents) {
                mongoOperations.remove(new BasicQuery(JSON.serialize(document)), collectionName);
            }
        }
    }
}

您可以使用 transactional 元素将轮询器声明为事务性的。此元素可以引用一个真正的事务管理器(例如,如果您的流的其他部分调用 JDBC)。如果您没有“真正的”事务,您可以使用 o.s.i.transaction.PseudoTransactionManager 的实例,它是 Spring PlatformTransactionManager 的一个实现,并在没有实际事务时启用 Mongo 适配器的事务同步功能。

这样做并不会使 MongoDB 本身具有事务性。它允许在成功(提交)之前或之后,或在失败(回滚)之后进行操作同步。

一旦您的轮询器是事务性的,您可以在 transactional 元素上设置一个 o.s.i.transaction.TransactionSynchronizationFactory 的实例。TransactionSynchronizationFactory 创建一个 TransactionSynchronization 的实例。为了方便起见,我们公开了一个默认的基于 SpEL 的 TransactionSynchronizationFactory,它允许您配置 SpEL 表达式,其执行与事务协调(同步)。支持 before-commit、after-commit 和 after-rollback 事件的表达式,并为每个事件提供一个通道,评估结果(如果有)将发送到该通道。对于每个子元素,您可以指定 expressionchannel 属性。如果只存在 channel 属性,则接收到的消息将作为特定同步场景的一部分发送到那里。如果只存在 expression 属性且表达式结果为非空值,则会生成一个以结果为 payload 的消息并发送到默认通道(NullChannel),并出现在日志中(DEBUG 级别)。如果您希望评估结果发送到特定通道,请添加一个 channel 属性。如果表达式结果为 null 或 void,则不会生成消息。

有关事务同步的更多信息,请参阅 事务同步

从 5.5 版本开始,MongoDbMessageSource 可以配置一个 updateExpression,它必须评估为一个具有 MongoDB update 语法的 Stringorg.springframework.data.mongodb.core.query.Update 的一个实例。它可以用作上述后处理过程的替代方案,它修改从集合中取出的实体,以便在下一个轮询周期中不会再次从集合中取出它们(假设更新改变了查询中使用的某个值)。当同一个集合的多个 MongoDbMessageSource 实例在集群中使用时,仍然建议使用事务来实现执行隔离和数据一致性。

MongoDB 变更流入站通道适配器

从 5.3 版本开始,spring-integration-mongodb 模块引入了 MongoDbChangeStreamMessageProducer - 这是 Spring Data ReactiveMongoOperations.changeStream(String, ChangeStreamOptions, Class) API 的一个响应式 MessageProducerSupport 实现。该组件默认生成一个消息的 Flux 流,其中 payload 为 ChangeStreamEventbody,并包含一些变更流相关的 headers(参见 MongoHeaders)。建议将此 MongoDbChangeStreamMessageProducer 与一个 FluxMessageChannel 结合作为 outputChannel,用于按需订阅和下游事件消费。

此通道适配器的 Java DSL 配置可能如下所示

@Bean
IntegrationFlow changeStreamFlow(ReactiveMongoOperations mongoTemplate) {
    return IntegrationFlow.from(
            MongoDb.changeStreamInboundChannelAdapter(mongoTemplate)
                    .domainType(Person.class)
                    .collection("person")
                    .extractBody(false))
            .channel(MessageChannels.flux())
            .get();
}

MongoDbChangeStreamMessageProducer 停止,或下游取消订阅,或 MongoDB 变更流生成 OperationType.INVALIDATE 时,Publisher 会完成。通道适配器可以再次启动,并创建一个新的源数据 Publisher,并在 MessageProducerSupport.subscribeToPublisher(Publisher<? extends Message<?>>) 中自动订阅。如果在启动之间需要从其他位置消费变更流事件,可以重新配置此通道适配器的新选项。

有关 Spring Data MongoDB 中变更流支持的更多信息,请参阅文档

MongoDB 出站通道适配器

MongoDB 出站通道适配器允许您将消息 payload 写入 MongoDB 文档存储,如下面的示例所示

<int-mongodb:outbound-channel-adapter id="fullConfigWithCollectionExpression"
	collection-name="myCollection"
	mongo-converter="mongoConverter"
	mongodb-factory="mongoDbFactory" />

如前述配置所示,您可以通过使用 outbound-channel-adapter 元素并为各种属性提供值来配置 MongoDB 出站通道适配器,例如

  • collection-namecollection-name-expression: 指定要使用的 MongoDB 集合的名称。

  • mongo-converter: 对 o.s.data.mongodb.core.convert.MongoConverter 实例的引用,用于协助将原始 Java 对象转换为 JSON 文档表示形式。

  • mongodb-factory: 对 o.s.data.mongodb.MongoDbFactory 实例的引用。

  • mongo-template: 对 o.s.data.mongodb.core.MongoTemplate 实例的引用。注意:您不能同时设置 mongo-template 和 mongodb-factory。

  • 其他适用于所有入站适配器的通用属性(如 'channel')。

上述示例相对简单和静态,因为它对 collection-name 使用字面值。有时,您可能需要在运行时根据某些条件更改此值。为此,可以使用 collection-name-expression,其中提供的表达式可以是任何有效的 SpEL 表达式。

MongoDB 出站网关

5.0 版本引入了 MongoDB 出站网关。它允许您通过向其请求通道发送消息来查询数据库。然后网关将响应发送到回复通道。您可以使用消息 payload 和 headers 来指定查询和集合名称,如下面的示例所示

  • Java DSL

  • Kotlin DSL

  • Java

  • XML

@SpringBootApplication
public class MongoDbJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MongoDbJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Autowired
    private MongoDbFactory;

    @Autowired
    private MongoConverter;


    @Bean
    public IntegrationFlow gatewaySingleQueryFlow() {
        return f -> f
                .handle(queryOutboundGateway())
                .channel(c -> c.queue("retrieveResults"));
    }

    private MongoDbOutboundGatewaySpec queryOutboundGateway() {
        return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
                .query("{name : 'Bob'}")
                .collectionNameFunction(m -> m.getHeaders().get("collection"))
                .expectSingleResult(true)
                .entityClass(Person.class);
    }

}
class MongoDbKotlinApplication {

    fun main(args: Array<String>) = runApplication<MongoDbKotlinApplication>(*args)

    @Autowired
    lateinit var mongoDbFactory: MongoDatabaseFactory

    @Autowired
    lateinit var mongoConverter: MongoConverter

    @Bean
    fun gatewaySingleQueryFlow() =
    integrationFlow {
        handle(queryOutboundGateway())
        channel { queue("retrieveResults") }
    }

    private fun queryOutboundGateway(): MongoDbOutboundGatewaySpec {
        return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
            .query("{name : 'Bob'}")
            .collectionNameFunction<Any> { m -> m.headers["collection"] as String }
            .expectSingleResult(true)
            .entityClass(Person::class.java)
    }

}
@SpringBootApplication
public class MongoDbJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MongoDbJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Autowired
    private MongoDbFactory mongoDbFactory;

    @Bean
    @ServiceActivator(inputChannel = "requestChannel")
    public MessageHandler mongoDbOutboundGateway() {
        MongoDbOutboundGateway gateway = new MongoDbOutboundGateway(this.mongoDbFactory);
        gateway.setCollectionNameExpressionString("'myCollection'");
        gateway.setQueryExpressionString("'{''name'' : ''Bob''}'");
        gateway.setExpectSingleResult(true);
        gateway.setEntityClass(Person.class);
        gateway.setOutputChannelName("replyChannel");
        return gateway;
    }

    @Bean
    @ServiceActivator(inputChannel = "replyChannel")
    public MessageHandler handler() {
        return message -> System.out.println(message.getPayload());
    }
}
<int-mongodb:outbound-gateway id="gatewayQuery"
    mongodb-factory="mongoDbFactory"
    mongo-converter="mongoConverter"
    query="{firstName: 'Bob'}"
    collection-name="myCollection"
    request-channel="in"
    reply-channel="out"
    entity-class="org.springframework.integration.mongodb.test.entity$Person"/>

您可以与 MongoDB 出站网关一起使用以下属性

  • collection-namecollection-name-expression: 指定要使用的 MongoDB 集合的名称。

  • mongo-converter: 对 o.s.data.mongodb.core.convert.MongoConverter 实例的引用,用于协助将原始 Java 对象转换为 JSON 文档表示形式。

  • mongodb-factory: 对 o.s.data.mongodb.MongoDbFactory 实例的引用。

  • mongo-template: 对 o.s.data.mongodb.core.MongoTemplate 实例的引用。注意:您不能同时设置 mongo-templatemongodb-factory

  • entity-class: 要传递给 MongoTemplate 中 find(..)findOne(..) 方法的实体类的完全限定名。如果未提供此属性,默认值为 org.bson.Document

  • queryquery-expression: 指定 MongoDB 查询。有关更多查询示例,请参阅 MongoDB 文档

  • collection-callback: 对 org.springframework.data.mongodb.core.CollectionCallback 实例的引用。自 5.0.11 版本起,推荐使用带有请求消息上下文的 o.s.i.mongodb.outbound.MessageCollectionCallback 实例。有关更多信息,请参阅其 Javadocs。注意:您不能同时拥有 collection-callback 和任何查询属性。

作为 queryquery-expression 属性的替代方案,您可以使用 collectionCallback 属性作为对 MessageCollectionCallback 函数式接口实现的引用来指定其他数据库操作。以下示例指定了一个计数操作

private MongoDbOutboundGatewaySpec collectionCallbackOutboundGateway() {
    return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
            .collectionCallback((collection, requestMessage) -> collection.count())
            .collectionName("myCollection");
}

MongoDB 响应式通道适配器

从 5.3 版本开始,提供了 ReactiveMongoDbStoringMessageHandlerReactiveMongoDbMessageSource 实现。它们基于 Spring Data 的 ReactiveMongoOperations,并需要 org.mongodb:mongodb-driver-reactivestreams 依赖项。

ReactiveMongoDbStoringMessageHandlerReactiveMessageHandler 的实现,当集成流定义涉及响应式流组合时,框架原生支持它。有关更多信息,请参阅 ReactiveMessageHandler

从配置角度来看,它与许多其他标准通道适配器没有区别。例如,使用 Java DSL,此类通道适配器可以这样使用

@Bean
public IntegrationFlow reactiveMongoDbFlow(ReactiveMongoDatabaseFactory mongoDbFactory) {
    return f -> f
            .channel(MessageChannels.flux())
            .handle(MongoDb.reactiveOutboundChannelAdapter(mongoDbFactory));
}

在此示例中,我们将通过提供的 ReactiveMongoDatabaseFactory 连接到 MongoDB,并将请求消息中的数据存储到默认名称为 data 的集合中。实际操作将根据内部创建的 ReactiveStreamsConsumer 中响应式流组合的需要执行。

ReactiveMongoDbMessageSource 是一个 AbstractMessageSource 实现,它基于所提供的 ReactiveMongoDatabaseFactoryReactiveMongoOperations 和 MongoDB 查询(或表达式),根据 expectSingleResult 选项调用 find()findOne() 操作,并使用预期的 entityClass 类型来转换查询结果。查询执行和结果评估是按需执行的,即当生成的消息的有效载荷中的 PublisherFluxMono,根据 expectSingleResult 选项)被订阅时。当在下游使用 splitter 和 FluxMessageChannel 时,框架可以自动订阅此类有效载荷(本质上是 flatMap)。否则,目标应用程序有责任在下游端点中订阅轮询的 Publisher

使用 Java DSL,这样的通道适配器可以配置为

@Bean
public IntegrationFlow reactiveMongoDbFlow(ReactiveMongoDatabaseFactory mongoDbFactory) {
    return IntegrationFlow
            .from(MongoDb.reactiveInboundChannelAdapter(mongoDbFactory, "{'name' : 'Name'}")
                            .entityClass(Person.class),
                    c -> c.poller(Pollers.fixedDelay(1000)))
            .split()
            .channel(c -> c.flux("output"))
            .get();
}

从版本 5.5 开始,ReactiveMongoDbMessageSource 可以配置 updateExpression。它具有与阻塞式 MongoDbMessageSource 相同的功能。参见 MongoDB 入站通道适配器AbstractMongoDbMessageSourceSpec JavaDocs 了解更多信息。