数据库

与大多数企业应用风格一样,数据库是 Batch 的主要存储机制。然而,Batch 与其他应用风格不同,因为它必须处理的数据集规模庞大。如果一个 SQL 语句返回一百万行,结果集可能会将所有返回的结果都保存在内存中,直到所有行都被读取。Spring Batch 为这个问题提供了两种解决方案:

基于游标的 ItemReader 实现

使用数据库游标通常是大多数 Batch 开发人员的默认方法,因为这是数据库解决关系型数据“流式传输”问题的方案。Java 的 ResultSet 类本质上是一种面向对象的机制,用于操作游标。一个 ResultSet 维护一个指向当前数据行的游标。在 ResultSet 上调用 next 会将此游标移动到下一行。Spring Batch 基于游标的 ItemReader 实现会在初始化时打开一个游标,并在每次调用 read 时将游标向前移动一行,返回一个映射的对象,可用于处理。然后调用 close 方法以确保释放所有资源。Spring Core 的 JdbcTemplate 通过使用回调模式来解决这个问题,它在返回控制权给方法调用者之前完全映射 ResultSet 中的所有行并关闭。然而,在 Batch 中,这必须等到 Step 完成。下图显示了基于游标的 ItemReader 如何工作的通用图示。请注意,虽然示例使用了 SQL(因为 SQL 广为人知),但任何技术都可以实现基本方法。

Cursor Example
图 1. 游标示例

此示例说明了基本模式。给定一个具有三列:IDNAMEBAR 的“FOO”表,选择所有 ID 大于 1 但小于 7 的行。这将游标的起始位置(第 1 行)置于 ID 为 2 的行。这一行的结果应该是一个完全映射的 Foo 对象。再次调用 read() 会将游标移到下一行,即 ID 为 3 的 Foo。这些 read 的结果在每次 read 之后被写出,允许对象被垃圾回收(假设没有实例变量维护对其的引用)。

JdbcCursorItemReader

JdbcCursorItemReader 是基于游标技术的 JDBC 实现。它直接与 ResultSet 工作,需要一个 SQL 语句来针对从 DataSource 获取的连接运行。以下数据库 Schema 用作示例:

CREATE TABLE CUSTOMER (
   ID BIGINT IDENTITY PRIMARY KEY,
   NAME VARCHAR(45),
   CREDIT FLOAT
);

许多人倾向于为每一行使用一个领域对象,因此以下示例使用 RowMapper 接口的实现来映射 CustomerCredit 对象:

public class CustomerCreditRowMapper implements RowMapper<CustomerCredit> {

    public static final String ID_COLUMN = "id";
    public static final String NAME_COLUMN = "name";
    public static final String CREDIT_COLUMN = "credit";

    public CustomerCredit mapRow(ResultSet rs, int rowNum) throws SQLException {
        CustomerCredit customerCredit = new CustomerCredit();

        customerCredit.setId(rs.getInt(ID_COLUMN));
        customerCredit.setName(rs.getString(NAME_COLUMN));
        customerCredit.setCredit(rs.getBigDecimal(CREDIT_COLUMN));

        return customerCredit;
    }
}

因为 JdbcCursorItemReaderJdbcTemplate 共享关键接口,所以查看如何使用 JdbcTemplate 读取这些数据的示例很有用,以便将其与 ItemReader 进行对比。为了本示例的目的,假设 CUSTOMER 数据库中有 1,000 行。第一个示例使用 JdbcTemplate

//For simplicity sake, assume a dataSource has already been obtained
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
List customerCredits = jdbcTemplate.query("SELECT ID, NAME, CREDIT from CUSTOMER",
                                          new CustomerCreditRowMapper());

运行上述代码片段后,customerCredits 列表包含 1,000 个 CustomerCredit 对象。在 query 方法中,从 DataSource 获取连接,针对该连接运行提供的 SQL,并为 ResultSet 中的每一行调用 mapRow 方法。将其与 JdbcCursorItemReader 的方法进行对比,如下例所示:

JdbcCursorItemReader itemReader = new JdbcCursorItemReader();
itemReader.setDataSource(dataSource);
itemReader.setSql("SELECT ID, NAME, CREDIT from CUSTOMER");
itemReader.setRowMapper(new CustomerCreditRowMapper());
int counter = 0;
ExecutionContext executionContext = new ExecutionContext();
itemReader.open(executionContext);
Object customerCredit = new Object();
while(customerCredit != null){
    customerCredit = itemReader.read();
    counter++;
}
itemReader.close();

运行上述代码片段后,计数器等于 1,000。如果上述代码将返回的 customerCredit 放入列表中,结果将与 JdbcTemplate 示例完全相同。然而,ItemReader 的巨大优势在于它允许 Item 被“流式传输”。可以调用一次 read 方法,该 Item 可以由 ItemWriter 写出,然后可以通过 read 获取下一个 Item。这允许以“块”的形式进行 Item 的读取和写入,并定期提交,这是高性能 Batch 处理的精髓。此外,它可以很容易地配置注入到 Spring Batch Step 中。

  • Java

  • XML

以下示例展示了如何在 Java 中将 ItemReader 注入到 Step 中:

Java 配置
@Bean
public JdbcCursorItemReader<CustomerCredit> itemReader() {
	return new JdbcCursorItemReaderBuilder<CustomerCredit>()
			.dataSource(this.dataSource)
			.name("creditReader")
			.sql("select ID, NAME, CREDIT from CUSTOMER")
			.rowMapper(new CustomerCreditRowMapper())
			.build();

}

以下示例展示了如何在 XML 中将 ItemReader 注入到 Step 中:

XML 配置
<bean id="itemReader" class="org.spr...JdbcCursorItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="sql" value="select ID, NAME, CREDIT from CUSTOMER"/>
    <property name="rowMapper">
        <bean class="org.springframework.batch.samples.domain.CustomerCreditRowMapper"/>
    </property>
</bean>

附加属性

由于在 Java 中打开游标有许多不同的选项,JdbcCursorItemReader 上有许多可以设置的属性,如下表所述:

表 1. JdbcCursorItemReader 属性

ignoreWarnings

确定 SQLWarning 是否被记录或导致异常。默认为 true(意味着警告被记录)。

fetchSize

ItemReader 使用的 ResultSet 对象需要更多行时,向 JDBC 驱动程序提供一个提示,说明应该从数据库获取的行数。默认情况下,不提供提示。

maxRows

设置底层 ResultSet 在任何时候可以容纳的最大行数的限制。

queryTimeout

设置驱动程序等待 Statement 对象运行的秒数。如果超出限制,会抛出 DataAccessException。(详细信息请查阅驱动程序供应商文档)。

verifyCursorPosition

因为 ItemReader 持有的同一个 ResultSet 会传递给 RowMapper,所以用户自己调用 ResultSet.next() 是可能的,这可能会导致读取器内部计数出现问题。将此值设置为 true 会在调用 RowMapper 后游标位置与之前不同时抛出异常。

saveState

指示读取器的状态是否应在 ItemStream#update(ExecutionContext) 提供的 ExecutionContext 中保存。默认为 true

driverSupportsAbsolute

指示 JDBC 驱动程序是否支持在 ResultSet 上设置绝对行。建议支持 ResultSet.absolute() 的 JDBC 驱动程序将其设置为 true,因为它可能会提高性能,尤其是在处理大型数据集时 Step 失败的情况下。默认为 false

setUseSharedExtendedConnection

指示用于游标的连接是否应由所有其他处理使用,从而共享同一个事务。如果设置为 false,则游标会用自己的连接打开,不参与为 Step 的其余处理启动的任何事务。如果将此标志设置为 true,则必须将 DataSource 包装在 ExtendedConnectionDataSourceProxy 中,以防止连接在每次提交后被关闭和释放。当您将此选项设置为 true 时,用于打开游标的语句会使用 'READ_ONLY' 和 'HOLD_CURSORS_OVER_COMMIT' 两个选项创建。这允许在事务开始和 Step 处理中执行的提交之后保持游标打开。要使用此功能,您需要一个支持此功能的数据库以及支持 JDBC 3.0 或更高版本的 JDBC 驱动程序。默认为 false

StoredProcedureItemReader

有时需要通过存储过程获取游标数据。StoredProcedureItemReader 的工作方式类似于 JdbcCursorItemReader,但不是运行查询来获取游标,而是运行一个返回游标的存储过程。存储过程可以通过三种不同方式返回游标:

  • 作为返回的 ResultSet(由 SQL Server、Sybase、DB2、Derby 和 MySQL 使用)。

  • 作为通过 out 参数返回的 ref-cursor(由 Oracle 和 PostgreSQL 使用)。

  • 作为存储函数调用的返回值。

  • Java

  • XML

以下 Java 示例配置使用与之前示例相同的“客户信用”示例:

Java 配置
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
	StoredProcedureItemReader reader = new StoredProcedureItemReader();

	reader.setDataSource(dataSource);
	reader.setProcedureName("sp_customer_credit");
	reader.setRowMapper(new CustomerCreditRowMapper());

	return reader;
}

以下 XML 示例配置使用与之前示例相同的“客户信用”示例:

XML 配置
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="procedureName" value="sp_customer_credit"/>
    <property name="rowMapper">
        <bean class="org.springframework.batch.samples.domain.CustomerCreditRowMapper"/>
    </property>
</bean>

前面的示例依赖存储过程提供一个 ResultSet 作为返回结果(前面提到的选项 1)。

如果存储过程返回一个 ref-cursor(选项 2),那么我们需要提供作为返回的 ref-cursor 的 out 参数的位置。

  • Java

  • XML

以下示例展示了如何在 Java 中处理第一个参数为 ref-cursor 的情况:

Java 配置
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
	StoredProcedureItemReader reader = new StoredProcedureItemReader();

	reader.setDataSource(dataSource);
	reader.setProcedureName("sp_customer_credit");
	reader.setRowMapper(new CustomerCreditRowMapper());
	reader.setRefCursorPosition(1);

	return reader;
}

以下示例展示了如何在 XML 中处理第一个参数为 ref-cursor 的情况:

XML 配置
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="procedureName" value="sp_customer_credit"/>
    <property name="refCursorPosition" value="1"/>
    <property name="rowMapper">
        <bean class="org.springframework.batch.samples.domain.CustomerCreditRowMapper"/>
    </property>
</bean>

如果游标是从存储函数返回的(选项 3),我们需要将属性“function”设置为 true。默认为 false

  • Java

  • XML

以下示例展示了如何在 Java 中将属性设置为 true

Java 配置
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
	StoredProcedureItemReader reader = new StoredProcedureItemReader();

	reader.setDataSource(dataSource);
	reader.setProcedureName("sp_customer_credit");
	reader.setRowMapper(new CustomerCreditRowMapper());
	reader.setFunction(true);

	return reader;
}

以下示例展示了如何在 XML 中将属性设置为 true

XML 配置
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="procedureName" value="sp_customer_credit"/>
    <property name="function" value="true"/>
    <property name="rowMapper">
        <bean class="org.springframework.batch.samples.domain.CustomerCreditRowMapper"/>
    </property>
</bean>

在所有这些情况下,我们需要定义一个 RowMapper,以及一个 DataSource 和实际的存储过程名称。

如果存储过程或函数接受参数,则必须使用 parameters 属性声明和设置它们。以下示例,适用于 Oracle,声明了三个参数。第一个是返回 ref-cursor 的 out 参数,第二和第三个是 in 参数,接受类型为 INTEGER 的值。

  • Java

  • XML

以下示例展示了如何在 Java 中处理参数:

Java 配置
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
	List<SqlParameter> parameters = new ArrayList<>();
	parameters.add(new SqlOutParameter("newId", OracleTypes.CURSOR));
	parameters.add(new SqlParameter("amount", Types.INTEGER);
	parameters.add(new SqlParameter("custId", Types.INTEGER);

	StoredProcedureItemReader reader = new StoredProcedureItemReader();

	reader.setDataSource(dataSource);
	reader.setProcedureName("spring.cursor_func");
	reader.setParameters(parameters);
	reader.setRefCursorPosition(1);
	reader.setRowMapper(rowMapper());
	reader.setPreparedStatementSetter(parameterSetter());

	return reader;
}

以下示例展示了如何在 XML 中处理参数:

XML 配置
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="procedureName" value="spring.cursor_func"/>
    <property name="parameters">
        <list>
            <bean class="org.springframework.jdbc.core.SqlOutParameter">
                <constructor-arg index="0" value="newid"/>
                <constructor-arg index="1">
                    <util:constant static-field="oracle.jdbc.OracleTypes.CURSOR"/>
                </constructor-arg>
            </bean>
            <bean class="org.springframework.jdbc.core.SqlParameter">
                <constructor-arg index="0" value="amount"/>
                <constructor-arg index="1">
                    <util:constant static-field="java.sql.Types.INTEGER"/>
                </constructor-arg>
            </bean>
            <bean class="org.springframework.jdbc.core.SqlParameter">
                <constructor-arg index="0" value="custid"/>
                <constructor-arg index="1">
                    <util:constant static-field="java.sql.Types.INTEGER"/>
                </constructor-arg>
            </bean>
        </list>
    </property>
    <property name="refCursorPosition" value="1"/>
    <property name="rowMapper" ref="rowMapper"/>
    <property name="preparedStatementSetter" ref="parameterSetter"/>
</bean>

除了参数声明之外,我们需要指定一个 PreparedStatementSetter 实现,用于设置调用的参数值。这与上面 JdbcCursorItemReader 的工作方式相同。在附加属性中列出的所有附加属性也适用于 StoredProcedureItemReader

基于分页的 ItemReader 实现

使用数据库游标的替代方案是运行多个查询,每个查询获取结果的一部分。我们将这部分称为一页。每个查询必须指定起始行号和我们希望在该页中返回的行数。

JdbcPagingItemReader

JdbcPagingItemReader 是基于分页的 ItemReader 的一个实现。JdbcPagingItemReader 需要一个 PagingQueryProvider,负责提供用于检索构成页的行的 SQL 查询。由于每个数据库都有自己的提供分页支持的策略,我们需要为每种支持的数据库类型使用不同的 PagingQueryProvider。还有一个 SqlPagingQueryProviderFactoryBean,它可以自动检测正在使用的数据库并确定合适的 PagingQueryProvider 实现。这简化了配置,是推荐的最佳实践。

SqlPagingQueryProviderFactoryBean 要求您指定一个 select 子句和一个 from 子句。您还可以提供一个可选的 where 子句。这些子句和必需的 sortKey 用于构建一个 SQL 语句。

sortKey 上具有唯一键约束很重要,以保证在执行之间不会丢失数据。

读取器打开后,它会像任何其他 ItemReader 一样,每次调用 read 都返回一个 Item。当需要更多行时,分页在后台发生。

  • Java

  • XML

以下 Java 示例配置使用与之前展示的基于游标的 ItemReaders 类似的“客户信用”示例:

Java 配置
@Bean
public JdbcPagingItemReader itemReader(DataSource dataSource, PagingQueryProvider queryProvider) {
	Map<String, Object> parameterValues = new HashMap<>();
	parameterValues.put("status", "NEW");

	return new JdbcPagingItemReaderBuilder<CustomerCredit>()
           				.name("creditReader")
           				.dataSource(dataSource)
           				.queryProvider(queryProvider)
           				.parameterValues(parameterValues)
           				.rowMapper(customerCreditMapper())
           				.pageSize(1000)
           				.build();
}

@Bean
public SqlPagingQueryProviderFactoryBean queryProvider() {
	SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();

	provider.setSelectClause("select id, name, credit");
	provider.setFromClause("from customer");
	provider.setWhereClause("where status=:status");
	provider.setSortKey("id");

	return provider;
}

以下 XML 示例配置使用与之前展示的基于游标的 ItemReaders 类似的“客户信用”示例:

XML 配置
<bean id="itemReader" class="org.spr...JdbcPagingItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="queryProvider">
        <bean class="org.spr...SqlPagingQueryProviderFactoryBean">
            <property name="selectClause" value="select id, name, credit"/>
            <property name="fromClause" value="from customer"/>
            <property name="whereClause" value="where status=:status"/>
            <property name="sortKey" value="id"/>
        </bean>
    </property>
    <property name="parameterValues">
        <map>
            <entry key="status" value="NEW"/>
        </map>
    </property>
    <property name="pageSize" value="1000"/>
    <property name="rowMapper" ref="customerMapper"/>
</bean>

此配置的 ItemReader 使用 RowMapper 返回 CustomerCredit 对象,RowMapper 必须指定。pageSize 属性确定在每次查询运行时从数据库读取的实体数量。

parameterValues 属性可用于指定参数值的 Map。如果在 where 子句中使用命名参数,则每个条目的键应该与命名参数的名称匹配。如果使用传统的“?”占位符,那么每个条目的键应该是指占位符的序号,从 1 开始。

JpaPagingItemReader

JpaPagingItemReader 是基于分页的 ItemReader 的另一个实现。JPA 没有类似于 Hibernate StatelessSession 的概念,所以我们必须使用 JPA 规范提供的其他特性。由于 JPA 支持分页,因此在使用 JPA 进行 Batch 处理时,这是一个自然的选择。读取每一页后,实体变为游离态,持久化上下文被清除,允许实体在页面处理完毕后被垃圾回收。

JpaPagingItemReader 允许您声明一个 JPQL 语句,并传入一个 EntityManagerFactory。然后它会像任何其他 ItemReader 一样,每次调用 read 都返回一个 Item。当需要更多实体时,分页在后台发生。

  • Java

  • XML

以下 Java 示例配置使用与之前展示的 JDBC 读取器相同的“客户信用”示例:

Java 配置
@Bean
public JpaPagingItemReader itemReader() {
	return new JpaPagingItemReaderBuilder<CustomerCredit>()
           				.name("creditReader")
           				.entityManagerFactory(entityManagerFactory())
           				.queryString("select c from CustomerCredit c")
           				.pageSize(1000)
           				.build();
}

以下 XML 示例配置使用与之前展示的 JDBC 读取器相同的“客户信用”示例:

XML 配置
<bean id="itemReader" class="org.spr...JpaPagingItemReader">
    <property name="entityManagerFactory" ref="entityManagerFactory"/>
    <property name="queryString" value="select c from CustomerCredit c"/>
    <property name="pageSize" value="1000"/>
</bean>

此配置的 ItemReader 以与上面 JdbcPagingItemReader 描述的完全相同的方式返回 CustomerCredit 对象,假设 CustomerCredit 对象具有正确的 JPA 注解或 ORM 映射文件。pageSize 属性确定在每次查询执行时从数据库读取的实体数量。

数据库 ItemWriter

虽然平面文件和 XML 文件都有特定的 ItemWriter 实例,但在数据库领域没有完全等价的。这是因为事务提供了所有需要的功能。文件需要 ItemWriter 实现,因为它们必须表现得像事务一样,跟踪已写入的 Item 并在适当的时间执行 flush 或 clear。数据库不需要此功能,因为写入操作已经包含在事务中。用户可以创建自己的实现 ItemWriter 接口的 DAO,或者使用来自自定义 ItemWriter 的一个,该自定义 ItemWriter 是为通用处理关注点编写的。无论哪种方式,它们都应该正常工作。需要注意的一点是批量写入输出提供的性能和错误处理能力。这在使用 hibernate 作为 ItemWriter 时最常见,但在使用 JDBC batch 模式时也可能出现相同问题。批量写入数据库输出本身没有缺陷,前提是我们小心地执行 flush 并且数据中没有错误。然而,写入时出现的任何错误都可能导致困惑,因为无法知道是哪个单独的 Item 导致了异常,甚至无法知道是否有任何单独的 Item 负责,如下图所示:

Error On Flush
图 2. Flush 错误

如果在写入前缓冲 Item,直到缓冲区在提交之前被 flush 之前不会抛出任何错误。例如,假设每个 chunk 写入 20 个 Item,第 15 个 Item 抛出了 DataIntegrityViolationException。就 Step 而言,所有 20 个 Item 都成功写入,因为直到它们实际写入时才知道发生了错误。一旦调用 Session#flush(),缓冲区被清空,异常被触发。此时,Step 无能为力。事务必须回滚。通常,此异常可能会导致 Item 被跳过(取决于跳过/重试策略),然后它不会再次写入。然而,在批量处理场景中,无法知道是哪个 Item 导致了问题。发生故障时,整个缓冲区正在写入。解决此问题的唯一方法是在每个 Item 之后进行 flush,如下图所示:

Error On Write
图 3. 写入错误

这是一个常见的用例,尤其是在使用 Hibernate 时,ItemWriter 实现的简单指导原则是在每次调用 write() 时执行 flush。这样做可以可靠地跳过 Item,由 Spring Batch 在内部处理发生错误后对 ItemWriter 的调用的粒度。