变更流

从 MongoDB 3.6 开始,变更流 允许应用程序在无需跟踪操作日志的情况下获取有关更改的通知。

变更流支持仅适用于副本集或分片集群。

变更流可以使用命令式和响应式 MongoDB Java 驱动程序进行使用。强烈建议使用响应式变体,因为它资源消耗更少。但是,如果您无法使用响应式 API,仍然可以通过使用 Spring 生态系统中已普遍存在的消息传递概念来获取变更事件。

可以在集合和数据库级别上进行观察,而数据库级别变体会发布数据库中所有集合的更改。在订阅数据库变更流时,请确保使用适合事件类型的类型,因为转换可能无法在不同实体类型之间正确应用。如有疑问,请使用Document

使用MessageListener 的变更流

使用同步驱动程序监听变更流会创建一个长时间运行的阻塞任务,需要委托给单独的组件。在这种情况下,我们需要首先创建一个MessageListenerContainer,它将成为运行特定SubscriptionRequest任务的主要入口点。Spring Data MongoDB 已经自带了一个默认实现,它在MongoTemplate上运行,并且能够为ChangeStreamRequest创建和运行Task实例。

以下示例展示了如何使用MessageListener实例与变更流一起使用。

示例 1. 使用MessageListener实例的变更流
MessageListenerContainer container = new DefaultMessageListenerContainer(template);
container.start();                                                                                              (1)

MessageListener<ChangeStreamDocument<Document>, User> listener = System.out::println;                           (2)
ChangeStreamRequestOptions options = new ChangeStreamRequestOptions("db", "user", ChangeStreamOptions.empty()); (3)

Subscription subscription = container.register(new ChangeStreamRequest<>(listener, options), User.class);       (4)

// ...

container.stop();                                                                                               (5)
1 启动容器会初始化资源并为已注册的SubscriptionRequest实例启动Task实例。启动后添加的请求会立即运行。
2 定义接收Message时调用的监听器。Message#getBody()会被转换为请求的域类型。使用Document接收未转换的原始结果。
3 设置要监听的集合,并通过ChangeStreamOptions提供其他选项。
4 注册请求。返回的Subscription可用于检查当前Task状态并取消它以释放资源。
5 不要忘记在确定不再需要容器后停止它。这样做会停止容器中所有正在运行的Task实例。

处理过程中的错误会传递给org.springframework.util.ErrorHandler。如果没有另行说明,默认情况下会应用一个追加日志的ErrorHandler
请使用register(request, body, errorHandler)提供其他功能。

响应式变更流

使用响应式 API 订阅变更流是处理流的更自然的方法。不过,基本构建块(如ChangeStreamOptions)保持不变。以下示例展示了如何使用发出ChangeStreamEvent的变更流。

示例 2. 发出ChangeStreamEvent的变更流
Flux<ChangeStreamEvent<User>> flux = reactiveTemplate.changeStream(User.class) (1)
    .watchCollection("people")
    .filter(where("age").gte(38))                                              (2)
    .listen();                                                                 (3)
1 事件目标类型,底层文档应该转换为该类型。省略此项以接收未转换的原始结果。
2 使用聚合管道或仅使用查询Criteria来过滤事件。
3 获取变更流事件的FluxChangeStreamEvent#getBody()会被转换为从 (2) 中请求的域类型。

恢复变更流

变更流可以恢复,并从您离开的地方继续发出事件。要恢复流,您需要提供恢复令牌或最后一个已知的服务器时间(以 UTC 为单位)。使用ChangeStreamOptions相应地设置值。

以下示例展示了如何使用服务器时间设置恢复偏移量。

示例 3. 恢复变更流
Flux<ChangeStreamEvent<User>> resumed = template.changeStream(User.class)
    .watchCollection("people")
    .resumeAt(Instant.now().minusSeconds(1)) (1)
    .listen();
1 您可以通过 getTimestamp 方法获取 ChangeStreamEvent 的服务器时间,或者使用 getResumeToken 公开的 resumeToken
在某些情况下,恢复变更流时,Instant 可能不够精确。为此,请使用 MongoDB 原生的 BsonTimestamp