数据库

与大多数企业应用程序样式一样,数据库是批处理的中心存储机制。但是,批处理与其他应用程序样式不同,因为它必须处理的数据集非常庞大。如果 SQL 语句返回 100 万行,结果集可能会将所有返回的结果保存在内存中,直到读取所有行。Spring Batch 为此问题提供了两种类型的解决方案

基于游标的 ItemReader 实现

使用数据库游标通常是大多数批处理开发人员的默认方法,因为它就是数据库针对“流式传输”关系数据问题所提供的解决方案。Java ResultSet 类本质上就是用于操作游标的面向对象机制。ResultSet 维护当前数据行的游标。对 ResultSet 调用 next 会将此游标移动到下一行。基于游标的 Spring Batch ItemReader 实现会在初始化时打开一个游标,并在每次调用 read 时将游标向前移动一行,返回一个可用于处理的映射对象。然后调用 close 方法以确保释放所有资源。Spring 核心 JdbcTemplate 通过使用回调模式完全映射 ResultSet 中的所有行并关闭来解决此问题,然后才将控制权返回给方法调用者。但是,在批处理中,必须等到该步骤完成。下图显示了基于游标的 ItemReader 的通用示意图。请注意,虽然示例中使用了 SQL(因为 SQL 非常流行),但任何技术都可以实现这种基本方法。

Cursor Example
图 1. 游标示例

此示例说明了基本模式。给定一个具有三个列的“FOO”表:IDNAMEBAR,选择所有 ID 大于 1 但小于 7 的行。这会将游标的开头(第 1 行)放在 ID 2 上。此行的结果应该是完全映射的 Foo 对象。再次调用 read() 会将游标移动到下一行,即 ID 为 3 的 Foo。每次 read 之后都会写出这些读取结果,从而允许对对象进行垃圾回收(假设没有实例变量维护对它们的引用)。

JdbcCursorItemReader

JdbcCursorItemReader 是基于游标技术的 JDBC 实现。它直接使用 ResultSet,并要求对从 DataSource 获取的连接运行 SQL 语句。以下数据库架构用作示例

CREATE TABLE CUSTOMER (
   ID BIGINT IDENTITY PRIMARY KEY,
   NAME VARCHAR(45),
   CREDIT FLOAT
);

许多人更喜欢对每一行使用一个域对象,因此以下示例使用 RowMapper 接口的一个实现来映射一个 CustomerCredit 对象

public class CustomerCreditRowMapper implements RowMapper<CustomerCredit> {

    public static final String ID_COLUMN = "id";
    public static final String NAME_COLUMN = "name";
    public static final String CREDIT_COLUMN = "credit";

    public CustomerCredit mapRow(ResultSet rs, int rowNum) throws SQLException {
        CustomerCredit customerCredit = new CustomerCredit();

        customerCredit.setId(rs.getInt(ID_COLUMN));
        customerCredit.setName(rs.getString(NAME_COLUMN));
        customerCredit.setCredit(rs.getBigDecimal(CREDIT_COLUMN));

        return customerCredit;
    }
}

因为 JdbcCursorItemReaderJdbcTemplate 共享关键接口,所以有必要了解一个使用 JdbcTemplate 读取此数据的示例,以便将其与 ItemReader 进行对比。对于此示例的目的,假设 CUSTOMER 数据库中有 1,000 行。第一个示例使用 JdbcTemplate

//For simplicity sake, assume a dataSource has already been obtained
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
List customerCredits = jdbcTemplate.query("SELECT ID, NAME, CREDIT from CUSTOMER",
                                          new CustomerCreditRowMapper());

在运行前一个代码片段后,customerCredits 列表包含 1,000 个 CustomerCredit 对象。在查询方法中,从 DataSource 获取一个连接,针对该连接运行提供的 SQL,并对 ResultSet 中的每一行调用 mapRow 方法。将其与 JdbcCursorItemReader 的方法进行对比,如下例所示

JdbcCursorItemReader itemReader = new JdbcCursorItemReader();
itemReader.setDataSource(dataSource);
itemReader.setSql("SELECT ID, NAME, CREDIT from CUSTOMER");
itemReader.setRowMapper(new CustomerCreditRowMapper());
int counter = 0;
ExecutionContext executionContext = new ExecutionContext();
itemReader.open(executionContext);
Object customerCredit = new Object();
while(customerCredit != null){
    customerCredit = itemReader.read();
    counter++;
}
itemReader.close();

在运行前一个代码片段后,计数器等于 1,000。如果上面的代码将返回的 customerCredit 放入一个列表,则结果将与 JdbcTemplate 示例完全相同。但是,ItemReader 的一大优势在于,它允许“流式传输”项目。可以调用一次 read 方法,项目可以由 ItemWriter 写出,然后可以使用 read 获取下一个项目。这允许分“块”读取和写入项目并定期提交,这是高性能批处理的本质。此外,可以轻松配置它以注入 Spring Batch Step

  • Java

  • XML

以下示例展示了如何在 Java 中将 ItemReader 注入到 Step

Java 配置
@Bean
public JdbcCursorItemReader<CustomerCredit> itemReader() {
	return new JdbcCursorItemReaderBuilder<CustomerCredit>()
			.dataSource(this.dataSource)
			.name("creditReader")
			.sql("select ID, NAME, CREDIT from CUSTOMER")
			.rowMapper(new CustomerCreditRowMapper())
			.build();

}

以下示例展示了如何在 XML 中将 ItemReader 注入到 Step

XML 配置
<bean id="itemReader" class="org.spr...JdbcCursorItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="sql" value="select ID, NAME, CREDIT from CUSTOMER"/>
    <property name="rowMapper">
        <bean class="org.springframework.batch.samples.domain.CustomerCreditRowMapper"/>
    </property>
</bean>

其他属性

因为在 Java 中打开游标有许多不同的选项,所以 JdbcCursorItemReader 上有许多属性可以设置,如下表所述

表 1. JdbcCursorItemReader 属性

ignoreWarnings

确定是否记录 SQLWarnings 或导致异常。默认值为 true(表示记录警告)。

fetchSize

ItemReader使用的ResultSet对象需要更多行时,给JDBC驱动程序一个提示,提示从数据库中提取的行数。默认情况下,不给出提示。

maxRows

设置底层ResultSet一次可以容纳的最大行数限制。

queryTimeout

设置驱动程序等待Statement对象运行的秒数。如果超过限制,将抛出DataAccessException。(有关详细信息,请查阅驱动程序供应商文档)。

verifyCursorPosition

由于ItemReader持有的相同ResultSet被传递给RowMapper,因此用户可以自己调用ResultSet.next(),这可能会导致读取器的内部计数出现问题。将此值设置为true会导致在RowMapper调用之后光标位置与调用之前不同时抛出异常。

saveState

指示是否应将读取器的状态保存在ItemStream#update(ExecutionContext)提供的ExecutionContext中。默认值为true

driverSupportsAbsolute

指示JDBC驱动程序是否支持在ResultSet上设置绝对行。建议将此设置为true,适用于支持ResultSet.absolute()的JDBC驱动程序,因为它可以提高性能,尤其是在处理大型数据集时某个步骤失败的情况下。默认为false

setUseSharedExtendedConnection

指示游标使用的连接是否应由所有其他处理使用,从而共享同一事务。如果将其设置为false,则游标将使用其自己的连接打开,并且不会参与为步骤处理的其余部分启动的任何事务。如果你将此标志设置为true,则必须将DataSource包装在ExtendedConnectionDataSourceProxy中,以防止在每次提交后关闭和释放连接。当你将此选项设置为true时,用于打开游标的语句将使用“READ_ONLY”和“HOLD_CURSORS_OVER_COMMIT”选项创建。这允许在事务开始和步骤处理中执行的提交上保持游标打开。要使用此功能,你需要一个支持此功能的数据库和一个支持JDBC 3.0或更高版本的JDBC驱动程序。默认为false

HibernateCursorItemReader

就像普通的 Spring 用户对是否使用 ORM 解决方案做出重要决定一样,这会影响他们是否使用 JdbcTemplateHibernateTemplate,Spring Batch 用户也有同样的选择。HibernateCursorItemReader 是游标技术的 Hibernate 实现。Hibernate 在批处理中的使用一直颇具争议。这在很大程度上是因为 Hibernate 最初是为支持在线应用程序样式而开发的。但是,这并不意味着它不能用于批处理。解决此问题的最简单方法是使用 StatelessSession 而不是标准会话。这会移除 Hibernate 使用的所有缓存和脏数据检查,而这可能会在批处理场景中造成问题。有关无状态和普通 Hibernate 会话之间差异的更多信息,请参阅特定 Hibernate 版本的文档。HibernateCursorItemReader 允许您声明一个 HQL 语句并传入一个 SessionFactory,它将以与 JdbcCursorItemReader 相同的基本方式每次调用返回一个项目。以下示例配置使用与 JDBC 读取器相同的“客户信用”示例

HibernateCursorItemReader itemReader = new HibernateCursorItemReader();
itemReader.setQueryString("from CustomerCredit");
//For simplicity sake, assume sessionFactory already obtained.
itemReader.setSessionFactory(sessionFactory);
itemReader.setUseStatelessSession(true);
int counter = 0;
ExecutionContext executionContext = new ExecutionContext();
itemReader.open(executionContext);
Object customerCredit = new Object();
while(customerCredit != null){
    customerCredit = itemReader.read();
    counter++;
}
itemReader.close();

此配置的 ItemReader 以与 JdbcCursorItemReader 描述完全相同的方式返回 CustomerCredit 对象,假设已为 Customer 表正确创建了 Hibernate 映射文件。“useStatelessSession”属性默认为 true,但已在此处添加以引起对打开或关闭它的功能的注意。还值得注意的是,底层游标的提取大小可以用 setFetchSize 属性设置。与 JdbcCursorItemReader 一样,配置非常简单。

  • Java

  • XML

以下示例展示如何在 Java 中注入 Hibernate ItemReader

Java 配置
@Bean
public HibernateCursorItemReader itemReader(SessionFactory sessionFactory) {
	return new HibernateCursorItemReaderBuilder<CustomerCredit>()
			.name("creditReader")
			.sessionFactory(sessionFactory)
			.queryString("from CustomerCredit")
			.build();
}

以下示例展示如何在 XML 中注入 Hibernate ItemReader

XML 配置
<bean id="itemReader"
      class="org.springframework.batch.item.database.HibernateCursorItemReader">
    <property name="sessionFactory" ref="sessionFactory" />
    <property name="queryString" value="from CustomerCredit" />
</bean>

StoredProcedureItemReader

有时需要使用存储过程获取游标数据。StoredProcedureItemReader 的工作方式类似于 JdbcCursorItemReader,不同之处在于,它不是运行查询来获取游标,而是运行返回游标的存储过程。存储过程可以通过三种不同的方式返回游标

  • 作为返回的 ResultSet(由 SQL Server、Sybase、DB2、Derby 和 MySQL 使用)。

  • 作为以输出参数返回的 ref-cursor(由 Oracle 和 PostgreSQL 使用)。

  • 作为存储函数调用的返回值。

  • Java

  • XML

以下 Java 示例配置使用与前面示例相同的“客户信用”示例

Java 配置
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
	StoredProcedureItemReader reader = new StoredProcedureItemReader();

	reader.setDataSource(dataSource);
	reader.setProcedureName("sp_customer_credit");
	reader.setRowMapper(new CustomerCreditRowMapper());

	return reader;
}

以下 XML 示例配置使用与前面示例相同的“客户信用”示例

XML 配置
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="procedureName" value="sp_customer_credit"/>
    <property name="rowMapper">
        <bean class="org.springframework.batch.samples.domain.CustomerCreditRowMapper"/>
    </property>
</bean>

前面的示例依赖于存储过程提供一个 ResultSet 作为返回的结果(前面选项 1)。

如果存储过程返回一个 ref-cursor(选项 2),那么我们需要提供返回的 ref-cursor 的输出参数的位置。

  • Java

  • XML

以下示例演示如何使用 Java 中的第一个参数作为 ref-cursor

Java 配置
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
	StoredProcedureItemReader reader = new StoredProcedureItemReader();

	reader.setDataSource(dataSource);
	reader.setProcedureName("sp_customer_credit");
	reader.setRowMapper(new CustomerCreditRowMapper());
	reader.setRefCursorPosition(1);

	return reader;
}

以下示例演示如何使用 XML 中的第一个参数作为 ref-cursor

XML 配置
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="procedureName" value="sp_customer_credit"/>
    <property name="refCursorPosition" value="1"/>
    <property name="rowMapper">
        <bean class="org.springframework.batch.samples.domain.CustomerCreditRowMapper"/>
    </property>
</bean>

如果游标是从存储函数返回的(选项 3),我们需要将属性 "function" 设置为 true。其默认值为 false

  • Java

  • XML

以下示例演示在 Java 中将属性设置为 true

Java 配置
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
	StoredProcedureItemReader reader = new StoredProcedureItemReader();

	reader.setDataSource(dataSource);
	reader.setProcedureName("sp_customer_credit");
	reader.setRowMapper(new CustomerCreditRowMapper());
	reader.setFunction(true);

	return reader;
}

以下示例演示在 XML 中将属性设置为 true

XML 配置
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="procedureName" value="sp_customer_credit"/>
    <property name="function" value="true"/>
    <property name="rowMapper">
        <bean class="org.springframework.batch.samples.domain.CustomerCreditRowMapper"/>
    </property>
</bean>

在所有这些情况下,我们需要定义一个 RowMapper 以及一个 DataSource 和实际的过程名称。

如果存储过程或函数采用参数,则必须使用 parameters 属性声明并设置这些参数。以下示例针对 Oracle 声明了三个参数。第一个参数是返回 ref-cursor 的 out 参数,第二个和第三个参数是采用类型为 INTEGER 的值的输入参数。

  • Java

  • XML

以下示例演示如何在 Java 中使用参数

Java 配置
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
	List<SqlParameter> parameters = new ArrayList<>();
	parameters.add(new SqlOutParameter("newId", OracleTypes.CURSOR));
	parameters.add(new SqlParameter("amount", Types.INTEGER);
	parameters.add(new SqlParameter("custId", Types.INTEGER);

	StoredProcedureItemReader reader = new StoredProcedureItemReader();

	reader.setDataSource(dataSource);
	reader.setProcedureName("spring.cursor_func");
	reader.setParameters(parameters);
	reader.setRefCursorPosition(1);
	reader.setRowMapper(rowMapper());
	reader.setPreparedStatementSetter(parameterSetter());

	return reader;
}

以下示例演示如何在 XML 中使用参数

XML 配置
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="procedureName" value="spring.cursor_func"/>
    <property name="parameters">
        <list>
            <bean class="org.springframework.jdbc.core.SqlOutParameter">
                <constructor-arg index="0" value="newid"/>
                <constructor-arg index="1">
                    <util:constant static-field="oracle.jdbc.OracleTypes.CURSOR"/>
                </constructor-arg>
            </bean>
            <bean class="org.springframework.jdbc.core.SqlParameter">
                <constructor-arg index="0" value="amount"/>
                <constructor-arg index="1">
                    <util:constant static-field="java.sql.Types.INTEGER"/>
                </constructor-arg>
            </bean>
            <bean class="org.springframework.jdbc.core.SqlParameter">
                <constructor-arg index="0" value="custid"/>
                <constructor-arg index="1">
                    <util:constant static-field="java.sql.Types.INTEGER"/>
                </constructor-arg>
            </bean>
        </list>
    </property>
    <property name="refCursorPosition" value="1"/>
    <property name="rowMapper" ref="rowMapper"/>
    <property name="preparedStatementSetter" ref="parameterSetter"/>
</bean>

除了参数声明之外,我们还需要指定一个 PreparedStatementSetter 实现,该实现为调用设置参数值。这与上面针对 JdbcCursorItemReader 的工作方式相同。在 其他属性 中列出的所有其他属性也适用于 StoredProcedureItemReader

分页 ItemReader 实现

使用数据库游标的替代方法是运行多个查询,其中每个查询都会获取结果的一部分。我们将此部分称为页面。每个查询都必须指定起始行号和我们希望在页面中返回的行数。

JdbcPagingItemReader

分页 ItemReader 的一种实现是 JdbcPagingItemReaderJdbcPagingItemReader 需要一个 PagingQueryProvider,负责提供用于检索构成页面的行的 SQL 查询。由于每个数据库都有自己的分页支持策略,因此我们需要针对每个支持的数据库类型使用不同的 PagingQueryProvider。还有 SqlPagingQueryProviderFactoryBean,它会自动检测正在使用的数据库并确定适当的 PagingQueryProvider 实现。这简化了配置,并且是推荐的最佳做法。

SqlPagingQueryProviderFactoryBean 要求您指定一个 select 子句和一个 from 子句。您还可以提供一个可选的 where 子句。这些子句和必需的 sortKey 用于构建 SQL 语句。

sortKey 上设置唯一键约束非常重要,以保证在执行之间不会丢失任何数据。

在打开读取器之后,它会以与任何其他 ItemReader 相同的基本方式在每次调用 read 时回传一个项目。当需要其他行时,分页会在后台发生。

  • Java

  • XML

以下 Java 示例配置使用与前面所示基于游标的 ItemReaders 相似的“客户信用”示例

Java 配置
@Bean
public JdbcPagingItemReader itemReader(DataSource dataSource, PagingQueryProvider queryProvider) {
	Map<String, Object> parameterValues = new HashMap<>();
	parameterValues.put("status", "NEW");

	return new JdbcPagingItemReaderBuilder<CustomerCredit>()
           				.name("creditReader")
           				.dataSource(dataSource)
           				.queryProvider(queryProvider)
           				.parameterValues(parameterValues)
           				.rowMapper(customerCreditMapper())
           				.pageSize(1000)
           				.build();
}

@Bean
public SqlPagingQueryProviderFactoryBean queryProvider() {
	SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();

	provider.setSelectClause("select id, name, credit");
	provider.setFromClause("from customer");
	provider.setWhereClause("where status=:status");
	provider.setSortKey("id");

	return provider;
}

以下 XML 示例配置使用与前面所示基于游标的 ItemReaders 相似的“客户信用”示例

XML 配置
<bean id="itemReader" class="org.spr...JdbcPagingItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="queryProvider">
        <bean class="org.spr...SqlPagingQueryProviderFactoryBean">
            <property name="selectClause" value="select id, name, credit"/>
            <property name="fromClause" value="from customer"/>
            <property name="whereClause" value="where status=:status"/>
            <property name="sortKey" value="id"/>
        </bean>
    </property>
    <property name="parameterValues">
        <map>
            <entry key="status" value="NEW"/>
        </map>
    </property>
    <property name="pageSize" value="1000"/>
    <property name="rowMapper" ref="customerMapper"/>
</bean>

此配置的 ItemReader 使用必须指定的 RowMapper 返回 CustomerCredit 对象。“pageSize”属性确定每次查询运行时从数据库读取的实体数量。

“parameterValues”属性可用于为查询指定参数值的 Map。如果在 where 子句中使用命名参数,则每个条目的键应与命名参数的名称匹配。如果使用传统的“?”占位符,则每个条目的键应为占位符的编号,从 1 开始。

JpaPagingItemReader

分页 ItemReader 的另一个实现是 JpaPagingItemReader。JPA 没有类似于 Hibernate StatelessSession 的概念,因此我们必须使用 JPA 规范提供的其他功能。由于 JPA 支持分页,因此在将 JPA 用于批处理时,这是一个自然选择。在读取每一页后,实体将分离,并清除持久性上下文,以便在处理该页后对实体进行垃圾回收。

JpaPagingItemReader 允许您声明一个 JPQL 语句,并传入一个 EntityManagerFactory。然后,它会以与任何其他 ItemReader 相同的基本方式在每次调用读取时回传一个项目。当需要其他实体时,分页会在后台发生。

  • Java

  • XML

以下 Java 示例配置使用与前面所示 JDBC 读取器相同的“客户信用”示例

Java 配置
@Bean
public JpaPagingItemReader itemReader() {
	return new JpaPagingItemReaderBuilder<CustomerCredit>()
           				.name("creditReader")
           				.entityManagerFactory(entityManagerFactory())
           				.queryString("select c from CustomerCredit c")
           				.pageSize(1000)
           				.build();
}

以下 XML 示例配置使用与前面所示 JDBC 读取器相同的“客户信用”示例

XML 配置
<bean id="itemReader" class="org.spr...JpaPagingItemReader">
    <property name="entityManagerFactory" ref="entityManagerFactory"/>
    <property name="queryString" value="select c from CustomerCredit c"/>
    <property name="pageSize" value="1000"/>
</bean>

此配置的 ItemReader 返回 CustomerCredit 对象的方式与上面为 JdbcPagingItemReader 描述的方式完全相同,假设 CustomerCredit 对象具有正确的 JPA 注释或 ORM 映射文件。“pageSize”属性确定每次查询执行时从数据库读取的实体数量。

数据库 ItemWriters

虽然平面文件和 XML 文件都有一个特定的 ItemWriter 实例,但在数据库世界中没有完全等效的实例。这是因为事务提供了所有所需的功能。ItemWriter 实现对于文件是必需的,因为它们必须表现得好像它们是事务性的,跟踪已写入的项目并在适当的时候刷新或清除。数据库不需要此功能,因为写入已包含在事务中。用户可以创建自己的实现 ItemWriter 接口的 DAO,或使用为通用处理问题编写的自定义 ItemWriter 中的一个。无论哪种方式,它们都应该毫无问题地工作。需要注意的一件事是通过批处理输出提供的性能和错误处理功能。在将 Hibernate 用作 ItemWriter 时最常见,但在使用 JDBC 批处理模式时也可能遇到相同的问题。批处理数据库输出没有任何固有缺陷,假设我们小心刷新并且数据中没有错误。但是,写入时发生的任何错误都可能造成混淆,因为无法知道哪个单独的项目导致了异常,甚至无法知道是否有任何单独的项目负责,如下图所示

Error On Flush
图 2. 刷新时出错

如果在写入之前对项目进行缓冲,则在提交之前刷新缓冲区之前不会引发任何错误。例如,假设每个块写入 20 个项目,并且第 15 个项目抛出 DataIntegrityViolationException。就 Step 而言,所有 20 个项目都已成功写入,因为在实际写入之前无法知道发生了错误。一旦调用 Session#flush(),缓冲区将被清空,并且会命中异常。此时,Step 无能为力。必须回滚事务。通常,此异常可能会导致跳过该项目(取决于跳过/重试策略),然后不会再次写入该项目。但是,在批处理场景中,无法知道哪个项目导致了问题。当故障发生时,整个缓冲区正在被写入。解决此问题的唯一方法是在每个项目后刷新,如下图所示

Error On Write
图 3. 写入时出错

这是一个常见的用例,尤其是在使用 Hibernate 时,并且 ItemWriter 实现的简单准则是对 write() 的每次调用进行刷新。这样做允许可靠地跳过项目,Spring Batch 在错误后在内部处理对 ItemWriter 的调用的粒度。