R2DBC 支持

Spring Integration 提供通道适配器,用于通过 R2DBC 驱动程序以响应式方式访问数据库来接收和发送消息。

你需要将此依赖添加到你的项目

  • Maven

  • Gradle

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-r2dbc</artifactId>
    <version>6.4.4</version>
</dependency>
compile "org.springframework.integration:spring-integration-r2dbc:6.4.4"

R2DBC 入站通道适配器

R2dbcMessageSource 是一个基于 R2dbcEntityOperations 的可轮询的 MessageSource 实现,根据 expectSingleResult 选项,生成以 FluxMono 作为 Payload 的消息,用于从数据库获取数据。SELECT 查询可以是静态提供的,也可以基于 SpEL 表达式,该表达式在每次 receive() 调用时都会进行评估。R2dbcMessageSource.SelectCreator 作为评估上下文的根对象存在,允许使用 StatementMapper.SelectSpec 流式 API。默认情况下,此通道适配器将 SELECT 中的记录映射到 LinkedCaseInsensitiveMap 实例。可以通过提供 payloadType 选项进行自定义,底层将基于 this.r2dbcEntityOperations.getConverter() 使用 EntityRowMapperupdateSql 是可选的,用于在数据库中标记已读记录,以便后续轮询时跳过。UPDATE 操作可以提供一个 BiFunction<DatabaseClient.GenericExecuteSpec, ?, DatabaseClient.GenericExecuteSpec>,以便根据 SELECT 结果中的记录将值绑定到 UPDATE 中。

此通道适配器的典型配置可能如下所示

@Bean
@InboundChannelAdapter("fromR2dbcChannel")
public R2dbcMessageSource r2dbcMessageSourceSelectMany() {
    R2dbcMessageSource r2dbcMessageSource = new R2dbcMessageSource(this.r2dbcEntityTemplate,
            "SELECT * FROM person WHERE name='Name'");
    r2dbcMessageSource.setPayloadType(Person.class);
    r2dbcMessageSource.setUpdateSql("UPDATE Person SET name='SomeOtherName' WHERE id = :id");
    r2dbcMessageSource.setBindFunction(
				(DatabaseClient.GenericExecuteSpec bindSpec, Person o) -> bindSpec.bind("id", o.getId()));}
    return r2dbcMessageSource;
}

使用 Java DSL,此通道适配器的配置如下所示

@Bean
IntegrationFlow r2dbcDslFlow(R2dbcEntityTemplate r2dbcEntityTemplate) {
    return IntegrationFlow
        .from(R2dbc.inboundChannelAdapter(r2dbcEntityTemplate,
            (selectCreator) ->
                    selectCreator.createSelect("person")
                        .withProjection("*")
                        .withCriteria(Criteria.where("id").is(1)))
                    .expectSingleResult(true)
                    .payloadType(Person.class)
                    .updateSql("UPDATE Person SET id='2' where id = :id")
                    .bindFunction((DatabaseClient.GenericExecuteSpec bindSpec, Person o) ->
                            bindSpec.bind("id", o.getId())),
            e -> e.poller(p -> p.fixedDelay(100)))
        .handle((p, h) -> p)
        .channel(MessageChannels.flux())
        .get();
}

R2DBC 出站通道适配器

R2dbcMessageHandler 是一个 ReactiveMessageHandler 实现,用于使用提供的 R2dbcEntityOperations 在数据库中执行 INSERT(默认)、UPDATEDELETE 查询。R2dbcMessageHandler.Type 可以静态配置,或通过针对请求消息的 SpEL 表达式进行配置。要执行的查询可以基于 tableNamevaluescriteria 表达式选项,或者(如果未提供 tableName)整个消息 Payload 被视为一个 org.springframework.data.relational.core.mapping.Table 实体来执行 SQL。包 org.springframework.data.relational.core.query 在 SpEL 评估上下文中注册为导入,以便直接访问 Criteria 流式 API,该 API 用于 UPDATEDELETE 查询。valuesExpression 用于 INSERTUPDATE,并且必须针对请求消息评估为 Map,其中包含要对目标表进行更改的列-值对。

此通道适配器的典型配置可能如下所示

@Bean
@ServiceActivator(inputChannel = "toR2dbcChannel")
public R2dbcMessageHandler r2dbcMessageHandler(R2dbcEntityTemplate r2dbcEntityTemplate) {
    R2dbcMessageHandler messageHandler = new R2dbcMessageHandler(r2dbcEntityTemplate)
    messageHandler.setValuesExpression(new FunctionExpression<Message<?>>(Message::getPayload));
    messageHandler.setQueryType(R2dbcMessageHandler.Type.UPDATE);
    messageHandler.setCriteriaExpression(
        EXPRESSION_PARSER.parseExpression("T(Criteria).where('id).is(headers.personId)));
    return messageHandler;
}

使用 Java DSL,此通道适配器的配置如下所示

.handleReactive(R2dbc.outboundChannelAdapter(r2dbcEntityTemplate)
        .queryType(R2dbcMessageHandler.Type.UPDATE)
        .tableNameExpression("payload.class.simpleName")
        .criteria((message) -> Criteria.where("id").is(message.getHeaders().get("personId")))
        .values("{age:36}"))