使用 R2DBC 进行数据访问
R2DBC(“Reactive Relational Database Connectivity”,响应式关系型数据库连接)是一项由社区驱动的规范工作,旨在利用响应式模式标准化对 SQL 数据库的访问。
包层级结构
Spring Framework 的 R2DBC 抽象框架包含两个不同的包
-
core
:org.springframework.r2dbc.core
包包含DatabaseClient
类以及各种相关类。参见使用 R2DBC 核心类控制基本的 R2DBC 处理和错误处理。 -
connection
:org.springframework.r2dbc.connection
包包含一个用于便捷访问ConnectionFactory
的工具类,以及可用于测试和运行未经修改的 R2DBC 的各种简单ConnectionFactory
实现。参见控制数据库连接。
使用 R2DBC 核心类控制基本的 R2DBC 处理和错误处理
本节介绍如何使用 R2DBC 核心类控制基本的 R2DBC 处理,包括错误处理。它包含以下主题
使用 DatabaseClient
DatabaseClient
是 R2DBC 核心包中的中心类。它负责资源的创建和释放,有助于避免常见的错误,例如忘记关闭连接。它执行核心 R2DBC 工作流的基本任务(例如语句创建和执行),而应用程序代码只需提供 SQL 并提取结果。DatabaseClient
类
-
运行 SQL 查询
-
更新语句和存储过程调用
-
对
Result
实例执行迭代 -
捕获 R2DBC 异常并将其转换为在
org.springframework.dao
包中定义的通用、信息量更大的异常层级结构。(参见一致的异常层级结构。)
该客户端具有函数式、流式的 API,使用响应式类型进行声明式组合。
当你在代码中使用 DatabaseClient
时,只需实现 java.util.function
接口,为它们提供一个清晰定义的契约。给定由 DatabaseClient
类提供的 Connection
,一个 Function
回调会创建一个 Publisher
。提取 Row
结果的映射函数也是如此。
你可以在 DAO 实现中通过直接实例化并引用 ConnectionFactory
来使用 DatabaseClient
,也可以在 Spring IoC 容器中配置它,并将其作为 Bean 引用注入到 DAO 中。
创建 DatabaseClient
对象最简单的方法是通过一个静态工厂方法,如下所示
-
Java
-
Kotlin
DatabaseClient client = DatabaseClient.create(connectionFactory);
val client = DatabaseClient.create(connectionFactory)
ConnectionFactory 应始终在 Spring IoC 容器中配置为一个 Bean。 |
上述方法创建一个具有默认设置的 DatabaseClient
。
你还可以从 DatabaseClient.builder()
获取一个 Builder
实例。可以通过调用以下方法来自定义客户端
-
….bindMarkers(…)
:提供一个特定的BindMarkersFactory
来配置命名参数到数据库绑定标记的转换。 -
….executeFunction(…)
:设置ExecuteFunction
来控制Statement
对象的运行方式。 -
….namedParameters(false)
:禁用命名参数展开。默认启用。
方言通过 BindMarkersFactoryResolver 从 ConnectionFactory 解析,通常是通过检查 ConnectionFactoryMetadata 。通过在 META-INF/spring.factories 中注册实现 org.springframework.r2dbc.core.binding.BindMarkersFactoryResolver$BindMarkerFactoryProvider 接口的类,可以让 Spring 自动发现你的 BindMarkersFactory 。BindMarkersFactoryResolver 使用 Spring 的 SpringFactoriesLoader 从类路径中发现绑定标记提供者实现。 |
目前支持的数据库有
-
H2
-
MariaDB
-
Microsoft SQL Server
-
MySQL
-
Postgres
该类执行的所有 SQL 都会在 DEBUG
级别下,以对应客户端实例的完全限定类名(通常是 DefaultDatabaseClient
)作为类别进行日志记录。此外,每次执行都会在响应式序列中注册一个检查点,以帮助调试。
以下部分提供了一些 DatabaseClient
的使用示例。这些示例并非 DatabaseClient
暴露的所有功能的详尽列表。更多信息请参见相应的 javadoc。
执行语句
DatabaseClient
提供了运行语句的基本功能。以下示例展示了创建新表所需的最少但功能完整的代码
-
Java
-
Kotlin
Mono<Void> completion = client.sql("CREATE TABLE person (id VARCHAR(255) PRIMARY KEY, name VARCHAR(255), age INTEGER);")
.then();
client.sql("CREATE TABLE person (id VARCHAR(255) PRIMARY KEY, name VARCHAR(255), age INTEGER);")
.await()
DatabaseClient
旨在提供方便、流式的使用体验。它在执行规范的每个阶段都暴露了中间、继续和终端方法。上面的示例使用了 then()
方法来返回一个完成 Publisher
,该 Publisher 在查询(如果 SQL 查询包含多个语句,则为多个查询)完成后立即完成。
execute(…) 接受 SQL 查询字符串或查询 Supplier<String> ,以将实际查询创建推迟到执行时。 |
查询(SELECT
)
SQL 查询可以通过 Row
对象或受影响的行数返回值。DatabaseClient
可以根据执行的查询返回更新的行数或行本身。
以下查询从表中获取 id
和 name
列
-
Java
-
Kotlin
Mono<Map<String, Object>> first = client.sql("SELECT id, name FROM person")
.fetch().first();
val first = client.sql("SELECT id, name FROM person")
.fetch().awaitSingle()
以下查询使用绑定变量
-
Java
-
Kotlin
Mono<Map<String, Object>> first = client.sql("SELECT id, name FROM person WHERE first_name = :fn")
.bind("fn", "Joe")
.fetch().first();
val first = client.sql("SELECT id, name FROM person WHERE first_name = :fn")
.bind("fn", "Joe")
.fetch().awaitSingle()
你可能已经注意到上面示例中使用了 fetch()
。fetch()
是一个 continuation 操作符,允许你指定要消费多少数据。
调用 first()
返回结果中的第一行并丢弃剩余的行。可以使用以下操作符来消费数据
-
first()
返回整个结果的第一行。其 Kotlin Coroutine 变体对于非空返回值命名为awaitSingle()
,对于可选值命名为awaitSingleOrNull()
。 -
one()
精确返回一个结果,如果结果包含更多行则失败。使用 Kotlin Coroutines 时,对于恰好一个值使用awaitOne()
,如果值可能为null
则使用awaitOneOrNull()
。 -
all()
返回结果的所有行。使用 Kotlin Coroutines 时,请使用flow()
。 -
rowsUpdated()
返回受影响的行数(INSERT
/UPDATE
/DELETE
计数)。其 Kotlin Coroutine 变体命名为awaitRowsUpdated()
。
如果不指定进一步的映射详情,查询将返回表格结果,形式为 Map
,其键是不区分大小写的列名,映射到对应的列值。
你可以通过提供一个 Function<Row, T>
函数来控制结果映射,该函数会为每一行 Row
调用,以便它可以返回任意值(单个值、集合、map 和对象)。
以下示例提取 name
列并发出其值
-
Java
-
Kotlin
Flux<String> names = client.sql("SELECT name FROM person")
.map(row -> row.get("name", String.class))
.all();
val names = client.sql("SELECT name FROM person")
.map{ row: Row -> row.get("name", String.class) }
.flow()
或者,有一个映射到单个值的快捷方式
Flux<String> names = client.sql("SELECT name FROM person")
.mapValue(String.class)
.all();
或者你可以映射到一个具有 Bean 属性或 record 组件的结果对象
// assuming a name property on Person
Flux<Person> persons = client.sql("SELECT name FROM person")
.mapProperties(Person.class)
.all();
使用 DatabaseClient
进行更新(INSERT
、UPDATE
和 DELETE
)
修改语句的唯一区别是它们通常不返回表格数据,因此使用 rowsUpdated()
来消费结果。
以下示例展示了一个返回更新行数的 UPDATE
语句
-
Java
-
Kotlin
Mono<Integer> affectedRows = client.sql("UPDATE person SET first_name = :fn")
.bind("fn", "Joe")
.fetch().rowsUpdated();
val affectedRows = client.sql("UPDATE person SET first_name = :fn")
.bind("fn", "Joe")
.fetch().awaitRowsUpdated()
将值绑定到查询
典型的应用程序需要参数化的 SQL 语句来根据某些输入选择或更新行。这些通常是由 WHERE
子句约束的 SELECT
语句,或接受输入参数的 INSERT
和 UPDATE
语句。如果参数未正确转义,参数化语句存在 SQL 注入的风险。DatabaseClient
利用 R2DBC 的 bind
API 来消除查询参数的 SQL 注入风险。你可以使用 execute(…)。
操作符提供参数化的 SQL 语句,并将参数绑定到实际的 Statement
。然后,你的 R2DBC 驱动程序会使用预处理语句和参数替换来运行该语句。
参数绑定支持两种绑定策略
-
按索引,使用从零开始的参数索引。
-
按名称,使用占位符名称。
以下示例展示了查询的参数绑定
db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
.bind("id", "joe")
.bind("name", "Joe")
.bind("age", 34);
或者,你可以传入一个名称和值的 Map
Map<String, Object> params = new LinkedHashMap<>();
params.put("id", "joe");
params.put("name", "Joe");
params.put("age", 34);
db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
.bindValues(params);
或者你可以传入一个带有 Bean 属性或 record 组件的参数对象
// assuming id, name, age properties on Person
db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
.bindProperties(new Person("joe", "Joe", 34);
或者,你可以使用位置参数将值绑定到语句。索引从零开始。
db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
.bind(0, "joe")
.bind(1, "Joe")
.bind(2, 34);
如果你的应用程序绑定到许多参数,可以通过一次调用实现相同的目的
List<?> values = List.of("joe", "Joe", 34);
db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
.bindValues(values);
查询预处理器将命名的 Collection
参数展开成一系列绑定标记,以消除根据参数数量动态创建查询的需要。嵌套对象数组也会展开,以允许使用(例如)选择列表。
考虑以下查询
SELECT id, name, state FROM table WHERE (name, age) IN (('John', 35), ('Ann', 50))
上述查询可以参数化并按如下方式运行
-
Java
-
Kotlin
List<Object[]> tuples = new ArrayList<>();
tuples.add(new Object[] {"John", 35});
tuples.add(new Object[] {"Ann", 50});
client.sql("SELECT id, name, state FROM table WHERE (name, age) IN (:tuples)")
.bind("tuples", tuples);
val tuples: MutableList<Array<Any>> = ArrayList()
tuples.add(arrayOf("John", 35))
tuples.add(arrayOf("Ann", 50))
client.sql("SELECT id, name, state FROM table WHERE (name, age) IN (:tuples)")
.bind("tuples", tuples)
选择列表的使用取决于供应商。 |
以下示例展示了一个使用 IN
谓词的更简单变体
-
Java
-
Kotlin
client.sql("SELECT id, name, state FROM table WHERE age IN (:ages)")
.bind("ages", Arrays.asList(35, 50));
client.sql("SELECT id, name, state FROM table WHERE age IN (:ages)")
.bind("ages", arrayOf(35, 50))
R2DBC 本身不支持 Collection 类似的值。然而,在上面示例中展开给定的 List 对于 Spring 的 R2DBC 支持中的命名参数是可行的,例如用于 IN 子句中。但是,插入或更新数组类型列(例如在 Postgres 中)需要底层 R2DBC 驱动程序支持的数组类型:通常是 Java 数组,例如 String[] 来更新 text[] 列。不要将 Collection<String> 或类似类型作为数组参数传递。 |
语句过滤器
有时,你需要在实际运行 Statement
之前对其选项进行微调。为此,可以通过在 DatabaseClient
中注册一个 Statement
过滤器(StatementFilterFunction
),以便在语句执行过程中拦截和修改它们,示例如下
-
Java
-
Kotlin
client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
.filter((s, next) -> next.execute(s.returnGeneratedValues("id")))
.bind("name", …)
.bind("state", …);
client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
.filter { s: Statement, next: ExecuteFunction -> next.execute(s.returnGeneratedValues("id")) }
.bind("name", …)
.bind("state", …)
DatabaseClient
还暴露了一个简化的 filter(…)
重载方法,它接受一个 Function<Statement, Statement>
。
-
Java
-
Kotlin
client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
.filter(statement -> s.returnGeneratedValues("id"));
client.sql("SELECT id, name, state FROM table")
.filter(statement -> s.fetchSize(25));
client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
.filter { statement -> s.returnGeneratedValues("id") }
client.sql("SELECT id, name, state FROM table")
.filter { statement -> s.fetchSize(25) }
StatementFilterFunction
的实现允许对 Statement
进行过滤以及对 Result
对象进行过滤。
DatabaseClient
最佳实践
一旦配置完成,DatabaseClient
类的实例是线程安全的。这一点很重要,因为这意味着你可以配置一个 DatabaseClient
的单例实例,然后安全地将这个共享引用注入到多个 DAO(或 Repository)中。DatabaseClient
是有状态的,因为它维护着对 ConnectionFactory
的引用,但这种状态不是会话状态。
使用 DatabaseClient
类时的常见做法是在 Spring 配置文件中配置一个 ConnectionFactory
,然后将该共享的 ConnectionFactory
Bean 依赖注入到你的 DAO 类中。DatabaseClient
在 ConnectionFactory
的 setter 方法中创建。这使得 DAO 类似于以下示例
-
Java
-
Kotlin
public class R2dbcCorporateEventDao implements CorporateEventDao {
private DatabaseClient databaseClient;
public void setConnectionFactory(ConnectionFactory connectionFactory) {
this.databaseClient = DatabaseClient.create(connectionFactory);
}
// R2DBC-backed implementations of the methods on the CorporateEventDao follow...
}
class R2dbcCorporateEventDao(connectionFactory: ConnectionFactory) : CorporateEventDao {
private val databaseClient = DatabaseClient.create(connectionFactory)
// R2DBC-backed implementations of the methods on the CorporateEventDao follow...
}
显式配置的另一种选择是使用组件扫描和注解支持进行依赖注入。在这种情况下,你可以使用 @Component
注解类(使其成为组件扫描的候选),并使用 @Autowired
注解 ConnectionFactory
的 setter 方法。以下示例展示了如何实现
-
Java
-
Kotlin
@Component (1)
public class R2dbcCorporateEventDao implements CorporateEventDao {
private DatabaseClient databaseClient;
@Autowired (2)
public void setConnectionFactory(ConnectionFactory connectionFactory) {
this.databaseClient = DatabaseClient.create(connectionFactory); (3)
}
// R2DBC-backed implementations of the methods on the CorporateEventDao follow...
}
1 | 使用 @Component 注解类。 |
2 | 使用 @Autowired 注解 ConnectionFactory 的 setter 方法。 |
3 | 使用 ConnectionFactory 创建一个新的 DatabaseClient 。 |
@Component (1)
class R2dbcCorporateEventDao(connectionFactory: ConnectionFactory) : CorporateEventDao { (2)
private val databaseClient = DatabaseClient(connectionFactory) (3)
// R2DBC-backed implementations of the methods on the CorporateEventDao follow...
}
1 | 使用 @Component 注解类。 |
2 | ConnectionFactory 的构造函数注入。 |
3 | 使用 ConnectionFactory 创建一个新的 DatabaseClient 。 |
无论你选择使用(或不使用)上述哪种模板初始化风格,每次要运行 SQL 时创建 DatabaseClient
类的新实例通常是不必要的。一旦配置完成,DatabaseClient
实例是线程安全的。如果你的应用程序访问多个数据库,你可能需要多个 DatabaseClient
实例,这需要多个 ConnectionFactory
,从而需要多个配置不同的 DatabaseClient
实例。
检索自动生成的主键
INSERT
语句在向定义了自增或 identity 列的表中插入行时,可能会生成主键。要完全控制生成的列名,只需注册一个 StatementFilterFunction
来请求所需列的生成主键。
-
Java
-
Kotlin
Mono<Integer> generatedId = client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
.filter(statement -> s.returnGeneratedValues("id"))
.map(row -> row.get("id", Integer.class))
.first();
// generatedId emits the generated key once the INSERT statement has finished
val generatedId = client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
.filter { statement -> s.returnGeneratedValues("id") }
.map { row -> row.get("id", Integer.class) }
.awaitOne()
// generatedId emits the generated key once the INSERT statement has finished
控制数据库连接
本节包含
使用 ConnectionFactory
Spring 通过 ConnectionFactory
获取数据库的 R2DBC 连接。ConnectionFactory
是 R2DBC 规范的一部分,是驱动程序的通用入口点。它允许容器或框架向应用程序代码隐藏连接池和事务管理问题。作为开发人员,您无需了解如何连接到数据库的详细信息。这是设置 ConnectionFactory
的管理员的职责。您在开发和测试代码时很可能同时扮演这两个角色,但您不必一定了解生产数据源是如何配置的。
当您使用 Spring 的 R2DBC 层时,您可以使用第三方提供的连接池实现来配置您自己的连接池。一个流行的实现是 R2DBC Pool (r2dbc-pool
)。Spring 发行版中的实现仅用于测试目的,不提供连接池。
配置 ConnectionFactory
-
使用
ConnectionFactory
获取连接,就像您通常获取 R2DBCConnectionFactory
一样。 -
提供一个 R2DBC URL(请参阅您的驱动程序文档以获取正确的值)。
以下示例展示了如何配置 ConnectionFactory
-
Java
-
Kotlin
ConnectionFactory factory = ConnectionFactories.get("r2dbc:h2:mem:///test?options=DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE");
val factory = ConnectionFactories.get("r2dbc:h2:mem:///test?options=DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE");
使用 ConnectionFactoryUtils
ConnectionFactoryUtils
类是一个方便强大的辅助类,它提供了 static
方法用于从 ConnectionFactory
获取连接并在必要时关闭连接。
它支持订阅者 Context
绑定的连接,例如与 R2dbcTransactionManager
一起使用。
使用 SingleConnectionFactory
SingleConnectionFactory
类是 DelegatingConnectionFactory
接口的一个实现,它封装了一个不会在每次使用后关闭的单个 Connection
。
如果任何客户端代码假定连接是池化的(例如在使用持久化工具时),并调用 close
方法,则应将 suppressClose
属性设置为 true
。此设置会返回一个抑制关闭的代理,该代理封装了物理连接。请注意,您无法再将此代理转换为原生的 Connection
或类似对象。
SingleConnectionFactory
主要是一个测试类,如果您的 R2DBC 驱动程序允许,它也可以用于特定需求,例如流水线处理。与池化的 ConnectionFactory
不同,它总是重用同一个连接,避免了过度创建物理连接。
使用 TransactionAwareConnectionFactoryProxy
TransactionAwareConnectionFactoryProxy
是一个目标 ConnectionFactory
的代理。该代理封装了目标 ConnectionFactory
,以增加对 Spring 管理的事务的感知。
如果您使用的 R2DBC 客户端没有与 Spring 的 R2DBC 支持集成,则需要使用此类。在这种情况下,您仍然可以使用此客户端,同时让该客户端参与 Spring 管理的事务。通常更倾向于将 R2DBC 客户端与对 ConnectionFactoryUtils 的正确访问集成,以便进行资源管理。 |
有关更多详细信息,请参阅 TransactionAwareConnectionFactoryProxy
的 Javadoc。
使用 R2dbcTransactionManager
R2dbcTransactionManager
类是针对单个 R2DBC ConnectionFactory
的 ReactiveTransactionManager
实现。它将指定 ConnectionFactory
的 R2DBC Connection
绑定到订阅者 Context
,可能允许每个 ConnectionFactory
对应一个订阅者 Connection
。
应用程序代码需要通过 ConnectionFactoryUtils.getConnection(ConnectionFactory)
获取 R2DBC Connection
,而不是使用 R2DBC 标准的 ConnectionFactory.create()
。所有框架类(例如 DatabaseClient
)都隐式使用此策略。如果不与事务管理器一起使用,查找策略的行为与 ConnectionFactory.create()
完全相同,因此在任何情况下都可以使用。