MongoDB 支持

2.1 版本引入了对 MongoDB 的支持:一个“高性能、开源、面向文档的数据库”。

您需要将此依赖项包含到您的项目中

  • Maven

  • Gradle

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mongodb</artifactId>
    <version>6.3.5</version>
</dependency>
compile "org.springframework.integration:spring-integration-mongodb:6.3.5"

要下载、安装和运行 MongoDB,请参阅 MongoDB 文档

连接到 MongoDB

阻塞式还是响应式?

从 5.3 版本开始,Spring Integration 提供了对响应式 MongoDB 驱动程序的支持,以便在访问 MongoDB 时启用非阻塞 I/O。要启用响应式支持,请将 MongoDB 响应式流驱动程序添加到您的依赖项中

  • Maven

  • Gradle

<dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>mongodb-driver-reactivestreams</artifactId>
</dependency>
compile "org.mongodb:mongodb-driver-reactivestreams"

对于常规同步客户端,您需要将其相应的驱动程序添加到依赖项中

  • Maven

  • Gradle

<dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>mongodb-driver-sync</artifactId>
</dependency>
compile "org.mongodb:mongodb-driver-sync"

两者在框架中都是可选的,以便更好地支持最终用户的选择。

要开始与 MongoDB 交互,您首先需要连接到它。Spring Integration 基于另一个 Spring 项目 Spring Data MongoDB 提供的支持。它提供了名为MongoDatabaseFactoryReactiveMongoDatabaseFactory 的工厂类,简化了与 MongoDB 客户端 API 的集成。

Spring Data 默认提供阻塞式 MongoDB 驱动程序,但您可以通过包含上述依赖项来选择响应式使用。

使用MongoDatabaseFactory

要连接到 MongoDB,您可以使用MongoDatabaseFactory 接口的实现。

以下示例显示了如何使用SimpleMongoClientDatabaseFactory

  • Java

  • XML

MongoDatabaseFactory mongoDbFactory =
        new SimpleMongoClientDatabaseFactory(com.mongodb.client.MongoClients.create(), "test");
<bean id="mongoDbFactory" class="o.s.data.mongodb.core.SimpleMongoClientDatabaseFactory">
    <constructor-arg>
        <bean class="com.mongodb.client.MongoClients" factory-method="create"/>
    </constructor-arg>
    <constructor-arg value="test"/>
</bean>

SimpleMongoClientDatabaseFactory 接受两个参数:一个MongoClient 实例和一个指定数据库名称的String。如果您需要配置hostport 等属性,则可以使用底层MongoClients 类提供的构造函数之一来传递这些属性。有关如何配置 MongoDB 的更多信息,请参阅 Spring Data MongoDB 参考文档。

使用ReactiveMongoDatabaseFactory

要使用响应式驱动程序连接到 MongoDB,您可以使用ReactiveMongoDatabaseFactory 接口的实现。

以下示例显示了如何使用SimpleReactiveMongoDatabaseFactory

  • Java

  • XML

ReactiveMongoDatabaseFactory mongoDbFactory =
        new SimpleReactiveMongoDatabaseFactory(com.mongodb.reactivestreams.client.MongoClients.create(), "test");
<bean id="mongoDbFactory" class="o.s.data.mongodb.core.SimpleReactiveMongoDatabaseFactory">
    <constructor-arg>
        <bean class="com.mongodb.reactivestreams.client.MongoClients" factory-method="create"/>
    </constructor-arg>
    <constructor-arg value="test"/>
</bean>

MongoDB 消息存储

如《企业集成模式》(EIP) 书中所述,消息存储 允许您持久化消息。在处理具有缓冲消息能力的组件(QueueChannelaggregatorresequencer 等)时,这样做可能很有用,如果可靠性是一个问题。在 Spring Integration 中,MessageStore 策略也为 凭证检查 模式奠定了基础,该模式也在 EIP 中进行了描述。

Spring Integration 的 MongoDB 模块提供了MongoDbMessageStore,它是MessageStore 策略(主要由凭证检查模式使用)和MessageGroupStore 策略(主要由聚合器和重新排序器模式使用)的实现。

以下示例配置了MongoDbMessageStore 以使用QueueChannelaggregator

<bean id="mongoDbMessageStore" class="o.s.i.mongodb.store.MongoDbMessageStore">
    <constructor-arg ref="mongoDbFactory"/>
</bean>

<int:channel id="somePersistentQueueChannel">
    <int:queue message-store="mongoDbMessageStore"/>
<int:channel>

<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
         message-store="mongoDbMessageStore"/>

前面的示例是一个简单的 bean 配置,它期望MongoDbFactory 作为构造函数参数。

MongoDbMessageStore 使用 Spring Data Mongo 映射机制将Message 扩展为具有所有嵌套属性的 Mongo 文档。当您需要访问payloadheaders 以进行审计或分析时,这很有用——例如,针对存储的消息。

MongoDbMessageStore 使用自定义的MappingMongoConverter 实现来将Message 实例存储为 MongoDB 文档,并且对Message 的属性(payloadheader 值)有一些限制。

从 5.1.6 版本开始,MongoDbMessageStore 可以配置自定义转换器,这些转换器将传播到内部的MappingMongoConverter 实现中。有关详细信息,请参阅MongoDbMessageStore.setCustomConverters(Object…​ customConverters) JavaDocs。

Spring Integration 3.0 引入了ConfigurableMongoDbMessageStore。它实现了MessageStoreMessageGroupStore 接口。此类可以接收MongoTemplate 作为构造函数参数,例如,您可以使用它来配置自定义WriteConcern。另一个构造函数需要MappingMongoConverterMongoDbFactory,这允许您为Message 实例及其属性提供一些自定义转换。请注意,默认情况下,ConfigurableMongoDbMessageStore 使用标准 Java 序列化将Message 实例写入和读取到 MongoDB(参见MongoDbMessageBytesConverter),并依赖于MongoTemplate 中其他属性的默认值。它从提供的MongoDbFactoryMappingMongoConverter 构建MongoTemplateConfigurableMongoDbMessageStore 存储的集合的默认名称是configurableStoreMessages。我们建议使用此实现来创建健壮灵活的解决方案,尤其是在消息包含复杂数据类型时。

从 6.0.8 版本开始,AbstractConfigurableMongoDbMessageStore 提供了一个setCreateIndexes(boolean)(默认为true)选项,可用于禁用自动索引创建。以下示例显示了如何声明一个 bean 并禁用自动索引创建

@Bean
public MongoDbChannelMessageStore mongoDbChannelMessageStore(MongoDatabaseFactory databaseFactory) {
    MongoDbChannelMessageStore mongoDbChannelMessageStore = new MongoDbChannelMessageStore(databaseFactory);
    mongoDbChannelMessageStore.setCreateIndexes(false);
    return mongoDbChannelMessageStore;
}

MongoDB 通道消息存储

4.0 版本引入了新的MongoDbChannelMessageStore。它是一个优化的MessageGroupStore,用于QueueChannel 实例。使用priorityEnabled = true,您可以在<int:priority-queue> 实例中使用它来实现持久化消息的优先级顺序轮询。优先级 MongoDB 文档字段从IntegrationMessageHeaderAccessor.PRIORITYpriority)消息标头填充。

此外,所有 MongoDB MessageStore 实例现在都有一个用于MessageGroup 文档的sequence 字段。sequence 值是来自同一集合的简单sequence 文档的$inc 操作的结果,该文档是按需创建的。sequence 字段用于poll 操作,以便在消息存储在同一毫秒内时提供先进先出 (FIFO) 消息顺序(如果已配置,则在优先级内)。

我们不建议对优先级和非优先级使用相同的MongoDbChannelMessageStore bean,因为priorityEnabled 选项适用于整个存储。但是,相同的collection 可以用于两种MongoDbChannelMessageStore 类型,因为来自存储的消息轮询已排序并使用索引。要配置这种情况,您可以从另一个扩展一个消息存储 bean,如下例所示
<bean id="channelStore" class="o.s.i.mongodb.store.MongoDbChannelMessageStore">
    <constructor-arg name="mongoDbFactory" ref="mongoDbFactory"/>
</bean>

<int:channel id="queueChannel">
    <int:queue message-store="store"/>
</int:channel>

<bean id="priorityStore" parent="channelStore">
    <property name="priorityEnabled" value="true"/>
</bean>

<int:channel id="priorityChannel">
    <int:priority-queue message-store="priorityStore"/>
</int:channel>

使用 AbstractConfigurableMongoDbMessageStore 并禁用自动索引创建

从 6.0.8 版本开始,AbstractConfigurableMongoDbMessageStore 实现了一个setCreateIndex(boolean) 方法,可用于禁用或启用(默认)自动索引创建。以下示例显示了如何声明一个 bean 并禁用自动索引创建

@Bean
public AbstractConfigurableMongoDbMessageStore mongoDbChannelMessageStore(MongoDatabaseFactory databaseFactory)
{
    AbstractConfigurableMongoDbMessageStore mongoDbChannelMessageStore = new MongoDbChannelMessageStore(databaseFactory);
    mongoDbChannelMessageStore.setCreateIndex(false);

    return mongoDbChannelMessageStore;
}

MongoDB 元数据存储

Spring Integration 4.2 引入了一种新的基于 MongoDB 的MetadataStore(参见 元数据存储)实现。您可以使用MongoDbMetadataStore 来维护跨应用程序重启的元数据状态。您可以将此新的MetadataStore 实现与以下适配器一起使用:

要指示这些适配器使用新的MongoDbMetadataStore,请声明一个 bean 名称为metadataStore 的 Spring bean。feed 入站通道适配器会自动获取并使用已声明的MongoDbMetadataStore。以下示例显示了如何声明一个名为metadataStore 的 bean

@Bean
public MetadataStore metadataStore(MongoDbFactory factory) {
    return new MongoDbMetadataStore(factory, "integrationMetadataStore");
}

MongoDbMetadataStore 还实现了ConcurrentMetadataStore,允许它在多个应用程序实例之间可靠地共享,其中只允许一个实例存储或修改键的值。由于 MongoDB 的保证,所有这些操作都是原子的。

MongoDB 入站通道适配器

MongoDB 入站通道适配器是一个轮询使用者,它从 MongoDB 读取数据并将其作为Message 负载发送。以下示例显示了如何配置 MongoDB 入站通道适配器

<int-mongodb:inbound-channel-adapter id="mongoInboundAdapter"
       channel="replyChannel"
       query="{'name' : 'Bob'}"
       entity-class="java.lang.Object"
       auto-startup="false">
		<int:poller fixed-rate="100"/>
</int-mongodb:inbound-channel-adapter>

如前面的配置所示,您可以使用inbound-channel-adapter 元素配置 MongoDB 入站通道适配器,并为各种属性提供值,例如:

  • query:JSON 查询(参见 MongoDB 查询

  • query-expression:一个 SpEL 表达式,它被计算为 JSON 查询字符串(如上面的query 属性)或o.s.data.mongodb.core.query.Query 的实例。与query 属性互斥。

  • entity-class:负载对象的类型。如果未提供,则返回com.mongodb.DBObject

  • collection-namecollection-name-expression:标识要使用的 MongoDB 集合的名称。

  • mongodb-factory:对o.s.data.mongodb.MongoDbFactory 实例的引用

  • mongo-template:对o.s.data.mongodb.core.MongoTemplate 实例的引用

  • 所有其他入站适配器共有的其他属性(例如“channel”)。

您不能同时设置mongo-templatemongodb-factory

前面的示例相对简单且静态,因为它具有query 的字面值并使用集合的默认名称。有时,您可能需要根据某些条件在运行时更改这些值。为此,请使用它们的-expression 等效项(query-expressioncollection-name-expression),其中提供的表达式可以是任何有效的 SpEL 表达式。

此外,您可能希望对从 MongoDB 读取的已成功处理的数据进行一些后处理。例如;您可能希望在处理文档后移动或删除它。您可以使用 Spring Integration 2.2 添加的事务同步功能来做到这一点,如下例所示

<int-mongodb:inbound-channel-adapter id="mongoInboundAdapter"
    channel="replyChannel"
    query-expression="new BasicQuery('{''name'' : ''Bob''}').limit(100)"
    entity-class="java.lang.Object"
    auto-startup="false">
        <int:poller fixed-rate="200" max-messages-per-poll="1">
            <int:transactional synchronization-factory="syncFactory"/>
        </int:poller>
</int-mongodb:inbound-channel-adapter>

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-commit
        expression="@documentCleaner.remove(#mongoTemplate, payload, headers.mongo_collectionName)"
        channel="someChannel"/>
</int:transaction-synchronization-factory>

<bean id="documentCleaner" class="thing1.thing2.DocumentCleaner"/>

<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager"/>

以下示例显示了前面示例中引用的DocumentCleaner

public class DocumentCleaner {
    public void remove(MongoOperations mongoOperations, Object target, String collectionName) {
        if (target instanceof List<?> documents){
            for (Object document : documents) {
                mongoOperations.remove(new BasicQuery(JSON.serialize(document)), collectionName);
            }
        }
    }
}

您可以使用transactional 元素声明您的轮询器是可进行事务处理的。此元素可以引用一个真实的事务管理器(例如,如果您的流程的其他部分调用 JDBC)。如果您没有“真实”事务,您可以使用o.s.i.transaction.PseudoTransactionManager 的实例,它是 Spring 的PlatformTransactionManager 的实现,并在没有实际事务时启用 Mongo 适配器的使用事务同步功能。

这样做不会使 MongoDB 本身成为事务性的。它允许在成功(提交)之前或之后以及失败(回滚)之后采取同步操作。

一旦您的轮询器具有事务性,您可以在o.s.i.transaction.TransactionSynchronizationFactory元素上设置transactional的一个实例。一个TransactionSynchronizationFactory创建一个TransactionSynchronization实例。为了方便起见,我们公开了一个基于SpEL的默认TransactionSynchronizationFactory,它允许您配置SpEL表达式,其执行与事务协调(同步)。支持提交前、提交后和回滚后事件的表达式,以及每个事件的通道,其中评估结果(如果有)将被发送。对于每个子元素,您可以指定expressionchannel属性。如果只有channel属性存在,则接收到的消息将作为特定同步场景的一部分发送到那里。如果只有expression属性存在,并且表达式的结果为非空值,则会生成一条消息,其有效负载为结果,并发送到默认通道(NullChannel),并显示在日志中(DEBUG级别)。如果您希望评估结果发送到特定通道,请添加channel属性。如果表达式的结果为空或无效,则不会生成任何消息。

有关事务同步的更多信息,请参阅事务同步

从5.5版本开始,MongoDbMessageSource可以使用updateExpression配置,该表达式必须计算为包含MongoDBupdate语法的Stringorg.springframework.data.mongodb.core.query.Update实例。它可以用作上述后处理程序的替代方法,它修改从集合中提取的实体,因此在下一个轮询周期中不会再次从集合中提取它们(假设更新更改了查询中使用的某些值)。当在集群中使用同一集合的多个MongoDbMessageSource实例时,仍然建议使用事务来实现执行隔离和数据一致性。

MongoDB变更流入站通道适配器

从5.3版本开始,spring-integration-mongodb模块引入了MongoDbChangeStreamMessageProducer——一个针对Spring Data ReactiveMongoOperations.changeStream(String, ChangeStreamOptions, Class) API的响应式MessageProducerSupport实现。此组件生成包含ChangeStreamEvent作为有效负载的消息的Flux,以及一些与变更流相关的标头(参见MongoHeaders)。建议将此MongoDbChangeStreamMessageProducerFluxMessageChannel组合使用作为outputChannel,用于按需订阅和下游事件消费。

此通道适配器的Java DSL配置可能如下所示

@Bean
IntegrationFlow changeStreamFlow(ReactiveMongoOperations mongoTemplate) {
    return IntegrationFlow.from(
            MongoDb.changeStreamInboundChannelAdapter(mongoTemplate)
                    .domainType(Person.class)
                    .collection("person")
                    .extractBody(false))
            .channel(MessageChannels.flux())
            .get();
}

MongoDbChangeStreamMessageProducer停止,或下游取消订阅,或MongoDB变更流产生OperationType.INVALIDATE时,Publisher将完成。通道适配器可以再次启动,并创建一个新的源数据Publisher,它会在MessageProducerSupport.subscribeToPublisher(Publisher<? extends Message<?>>)中自动订阅。如果需要从其他位置消费变更流事件,则可以在启动之间重新配置此通道适配器以获得新的选项。

有关Spring Data MongoDB中变更流支持的更多信息,请参阅文档

MongoDB出站通道适配器

MongoDB出站通道适配器允许您将消息有效负载写入MongoDB文档存储,如下例所示

<int-mongodb:outbound-channel-adapter id="fullConfigWithCollectionExpression"
	collection-name="myCollection"
	mongo-converter="mongoConverter"
	mongodb-factory="mongoDbFactory" />

如前面的配置所示,您可以使用outbound-channel-adapter元素配置MongoDB出站通道适配器,为各种属性提供值,例如:

  • collection-namecollection-name-expression:标识要使用的MongoDB集合的名称。

  • mongo-converter:指向o.s.data.mongodb.core.convert.MongoConverter实例的引用,该实例有助于将原始Java对象转换为JSON文档表示。

  • mongodb-factory:指向o.s.data.mongodb.MongoDbFactory实例的引用。

  • mongo-template:指向o.s.data.mongodb.core.MongoTemplate实例的引用。注意:不能同时设置mongo-templatemongodb-factory

  • 所有入站适配器共有的其他属性(例如“channel”)。

前面的示例相对简单且静态,因为它对collection-name具有文字值。有时,您可能需要根据某些条件在运行时更改此值。为此,请使用collection-name-expression,其中提供的表达式是任何有效的SpEL表达式。

MongoDB出站网关

5.0版本引入了MongoDB出站网关。它允许您通过向其请求通道发送消息来查询数据库。然后,网关将响应发送到回复通道。您可以使用消息有效负载和标头来指定查询和集合名称,如下例所示

  • Java DSL

  • Kotlin DSL

  • Java

  • XML

@SpringBootApplication
public class MongoDbJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MongoDbJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Autowired
    private MongoDbFactory;

    @Autowired
    private MongoConverter;


    @Bean
    public IntegrationFlow gatewaySingleQueryFlow() {
        return f -> f
                .handle(queryOutboundGateway())
                .channel(c -> c.queue("retrieveResults"));
    }

    private MongoDbOutboundGatewaySpec queryOutboundGateway() {
        return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
                .query("{name : 'Bob'}")
                .collectionNameFunction(m -> m.getHeaders().get("collection"))
                .expectSingleResult(true)
                .entityClass(Person.class);
    }

}
class MongoDbKotlinApplication {

    fun main(args: Array<String>) = runApplication<MongoDbKotlinApplication>(*args)

    @Autowired
    lateinit var mongoDbFactory: MongoDatabaseFactory

    @Autowired
    lateinit var mongoConverter: MongoConverter

    @Bean
    fun gatewaySingleQueryFlow() =
    integrationFlow {
        handle(queryOutboundGateway())
        channel { queue("retrieveResults") }
    }

    private fun queryOutboundGateway(): MongoDbOutboundGatewaySpec {
        return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
            .query("{name : 'Bob'}")
            .collectionNameFunction<Any> { m -> m.headers["collection"] as String }
            .expectSingleResult(true)
            .entityClass(Person::class.java)
    }

}
@SpringBootApplication
public class MongoDbJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MongoDbJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Autowired
    private MongoDbFactory mongoDbFactory;

    @Bean
    @ServiceActivator(inputChannel = "requestChannel")
    public MessageHandler mongoDbOutboundGateway() {
        MongoDbOutboundGateway gateway = new MongoDbOutboundGateway(this.mongoDbFactory);
        gateway.setCollectionNameExpressionString("'myCollection'");
        gateway.setQueryExpressionString("'{''name'' : ''Bob''}'");
        gateway.setExpectSingleResult(true);
        gateway.setEntityClass(Person.class);
        gateway.setOutputChannelName("replyChannel");
        return gateway;
    }

    @Bean
    @ServiceActivator(inputChannel = "replyChannel")
    public MessageHandler handler() {
        return message -> System.out.println(message.getPayload());
    }
}
<int-mongodb:outbound-gateway id="gatewayQuery"
    mongodb-factory="mongoDbFactory"
    mongo-converter="mongoConverter"
    query="{firstName: 'Bob'}"
    collection-name="myCollection"
    request-channel="in"
    reply-channel="out"
    entity-class="org.springframework.integration.mongodb.test.entity$Person"/>

您可以将以下属性与MongoDB出站网关一起使用:

  • collection-namecollection-name-expression:标识要使用的 MongoDB 集合的名称。

  • mongo-converter:指向o.s.data.mongodb.core.convert.MongoConverter实例的引用,该实例有助于将原始Java对象转换为JSON文档表示。

  • mongodb-factory:指向o.s.data.mongodb.MongoDbFactory实例的引用。

  • mongo-template:指向o.s.data.mongodb.core.MongoTemplate实例的引用。注意:不能同时设置mongo-templatemongodb-factory

  • entity-class:要传递给MongoTemplate中的find(..)findOne(..)方法的实体类的完全限定名称。如果没有提供此属性,则默认值为org.bson.Document

  • queryquery-expression:指定MongoDB查询。有关更多查询示例,请参阅MongoDB文档

  • collection-callback:指向org.springframework.data.mongodb.core.CollectionCallback实例的引用。从5.0.11版本开始,更推荐使用具有请求消息上下文的o.s.i.mongodb.outbound.MessageCollectionCallback实例。有关更多信息,请参阅其Javadoc。

作为queryquery-expression属性的替代方法,您可以使用collectionCallback属性作为对MessageCollectionCallback函数接口实现的引用来指定其他数据库操作。以下示例指定计数操作

private MongoDbOutboundGatewaySpec collectionCallbackOutboundGateway() {
    return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
            .collectionCallback((collection, requestMessage) -> collection.count())
            .collectionName("myCollection");
}

MongoDB响应式通道适配器

从5.3版本开始,提供了ReactiveMongoDbStoringMessageHandlerReactiveMongoDbMessageSource实现。它们基于Spring Data中的ReactiveMongoOperations,需要org.mongodb:mongodb-driver-reactivestreams依赖项。

ReactiveMongoDbStoringMessageHandlerReactiveMessageHandler的实现,当在集成流定义中涉及响应式流组合时,框架对其提供原生支持。有关更多信息,请参阅ReactiveMessageHandler

从配置角度来看,与许多其他标准通道适配器没有区别。例如,使用Java DSL,此类通道适配器可以使用如下方式:

@Bean
public IntegrationFlow reactiveMongoDbFlow(ReactiveMongoDatabaseFactory mongoDbFactory) {
    return f -> f
            .channel(MessageChannels.flux())
            .handle(MongoDb.reactiveOutboundChannelAdapter(mongoDbFactory));
}

在此示例中,我们将通过提供的ReactiveMongoDatabaseFactory连接到MongoDB,并将请求消息中的数据存储到名为data的默认集合中。实际操作将在内部创建的ReactiveStreamsConsumer中的响应式流组合中按需执行。

ReactiveMongoDbMessageSource是基于提供的ReactiveMongoDatabaseFactoryReactiveMongoOperations和MongoDB查询(或表达式)的AbstractMessageSource实现,根据expectSingleResult选项和预期的entityClass类型调用find()findOne()操作来转换查询结果。当生成的邮件有效负载中的Publisher(根据expectSingleResult选项为FluxMono)被订阅时,会按需执行查询和结果评估。当使用下游的拆分器和FluxMessageChannel时,框架可以自动订阅此有效负载(基本上是flatMap)。否则,目标应用程序有责任订阅下游端点中的轮询发布者。

使用Java DSL,可以像这样配置此通道适配器:

@Bean
public IntegrationFlow reactiveMongoDbFlow(ReactiveMongoDatabaseFactory mongoDbFactory) {
    return IntegrationFlow
            .from(MongoDb.reactiveInboundChannelAdapter(mongoDbFactory, "{'name' : 'Name'}")
                            .entityClass(Person.class),
                    c -> c.poller(Pollers.fixedDelay(1000)))
            .split()
            .channel(c -> c.flux("output"))
            .get();
}

从5.5版本开始,ReactiveMongoDbMessageSource可以使用updateExpression配置。它具有与阻塞式MongoDbMessageSource相同的功能。有关更多信息,请参阅MongoDB入站通道适配器AbstractMongoDbMessageSourceSpec JavaDocs。