批处理的领域语言

对于任何经验丰富的批处理架构师来说,Spring Batch 中使用的批处理总体概念应该是熟悉且舒适的。有 “Jobs” 和 “Steps”,以及开发者提供的处理单元 ItemReaderItemWriter。然而,由于 Spring 的模式、操作、模板、回调和惯用法,为以下方面提供了机会

  • 显著改进了清晰的关注点分离原则的应用。

  • 清晰划分的架构层以及作为接口提供的服务。

  • 提供简单和默认的实现,以便快速采用和开箱即用。

  • 显著增强了可扩展性。

下图是批处理参考架构的简化版本,该架构已使用了数十年。它概述了构成批处理领域语言的组件。这个架构框架是一个蓝图,已在过去几代平台(大型机上的 COBOL、Unix 上的 C 以及现在的任何地方的 Java)上通过数十年的实现得到验证。JCL 和 COBOL 开发者可能与 C、C# 和 Java 开发者一样熟悉这些概念。Spring Batch 提供了健壮、可维护系统中常见的分层、组件和技术服务的物理实现,这些系统用于构建从简单到复杂的批处理应用,并提供基础设施和扩展来满足非常复杂的处理需求。

Figure 2.1: Batch Stereotypes
图 1. 批处理原型

前图强调了构成 Spring Batch 领域语言的关键概念。一个 Job 包含一个或多个 Step,每个 Step 恰好有一个 ItemReader、一个 ItemProcessor 和一个 ItemWriter。Job 需要通过 JobLauncher 启动,并且当前运行过程的元数据需要存储在 JobRepository 中。

Job

本节描述与批处理 Job 概念相关的原型。Job 是一个封装整个批处理过程的实体。与其他 Spring 项目一样,Job 可以通过 XML 配置文件或基于 Java 的配置进行连接。这种配置可能被称为“job 配置”。然而,Job 只是整个层次结构的顶部,如下图所示

Job Hierarchy
图 2. Job 层次结构

在 Spring Batch 中,Job 仅仅是 Step 实例的容器。它将逻辑上属于同一流程的多个 Step 组合在一起,并允许配置所有 Step 的全局属性,例如可重启性。job 配置包含

  • job 的名称。

  • Step 实例的定义和排序。

  • job 是否可重启。

  • Java

  • XML

对于使用 Java 配置的用户,Spring Batch 以 SimpleJob 类形式提供了 Job 接口的默认实现,它在 Job 的基础上创建了一些标准功能。在使用基于 Java 的配置时,提供了一系列构建器用于实例化 Job,如下例所示

@Bean
public Job footballJob(JobRepository jobRepository) {
    return new JobBuilder("footballJob", jobRepository)
                     .start(playerLoad())
                     .next(gameLoad())
                     .next(playerSummarization())
                     .build();
}

对于使用 XML 配置的用户,Spring Batch 以 SimpleJob 类形式提供了 Job 接口的默认实现,它在 Job 的基础上创建了一些标准功能。然而,批处理命名空间抽象了直接实例化它的需要。取而代之的是,您可以使用 <job> 元素,如下例所示

<job id="footballJob">
    <step id="playerload" next="gameLoad"/>
    <step id="gameLoad" next="playerSummarization"/>
    <step id="playerSummarization"/>
</job>

JobInstance

JobInstance 指的是逻辑 job 运行的概念。考虑一个应在一天结束时运行的批处理 job,例如前图中的 EndOfDay Job。只有一个 EndOfDay job,但 Job 的每一次单独运行都必须单独跟踪。对于此 job,每天有一个逻辑 JobInstance。例如,有 1 月 1 日的运行、1 月 2 日的运行,依此类推。如果 1 月 1 日的运行第一次失败,并在第二天再次运行,它仍然是 1 月 1 日的运行。(通常,这与它处理的数据也相对应,意味着 1 月 1 日的运行处理 1 月 1 日的数据)。因此,每个 JobInstance 可以有多次执行(JobExecution 在本章后面会更详细地讨论),并且在给定时间只能运行一个 JobInstance(它对应于特定的 Job 和标识性 JobParameters)。

JobInstance 的定义与要加载的数据没有任何关系。完全取决于 ItemReader 的实现来决定如何加载数据。例如,在 EndOfDay 场景中,数据中可能有一个列指示数据所属的 effective dateschedule date。因此,1 月 1 日的运行仅加载 1 日的数据,而 1 月 2 日的运行仅使用 2 日的数据。由于这种决定很可能是一个业务决策,因此留给 ItemReader 来决定。然而,使用相同的 JobInstance 决定了是否使用之前执行的“状态”(即本章后面讨论的 ExecutionContext)。使用新的 JobInstance 意味着“从头开始”,而使用现有实例通常意味着“从上次停止的地方继续”。

JobParameters

讨论了 JobInstance 以及它与 Job 的区别后,自然会问的问题是:“如何区分不同的 JobInstance?”答案是:JobParameters。一个 JobParameters 对象包含用于启动批处理 job 的一组参数。这些参数可用于标识,甚至在运行期间用作参考数据,如下图所示

Job Parameters
图 3. Job Parameters

在前例中,有两个实例,一个对应 1 月 1 日,另一个对应 1 月 2 日,实际上只有一个 Job,但它有两个 JobParameter 对象:一个是用 job 参数 01-01-2017 启动的,另一个是用参数 01-02-2017 启动的。因此,契约可以定义为:JobInstance = Job + 标识性 JobParameters。这允许开发者有效控制 JobInstance 的定义方式,因为他们控制传入的参数。

并非所有 job 参数都必须用于标识 JobInstance。默认情况下是这样的。然而,框架也允许提交带有不用于标识 JobInstance 的参数的 Job

JobExecution

JobExecution 指的是运行 Job 的单次尝试的技术概念。一次执行可能以失败或成功告终,但除非执行成功完成,否则对应的 JobInstance 不被认为是完成的。以前面描述的 EndOfDay Job 为例,考虑 01-01-2017 的 JobInstance,它在第一次运行时失败了。如果使用与第一次运行相同的标识性 job 参数 (01-01-2017) 再次运行,则会创建一个新的 JobExecution。然而,仍然只有一个 JobInstance

Job 定义了 job 是什么以及如何执行,而 JobInstance 纯粹是一个组织对象,用于将执行组合在一起,主要是为了实现正确的重启语义。然而,JobExecution 是实际运行期间发生情况的主要存储机制,包含许多必须控制和持久化的属性,如下表所示

表 1. JobExecution 属性

属性

定义

状态

一个 BatchStatus 对象,指示执行的状态。运行时为 BatchStatus#STARTED。如果失败,则为 BatchStatus#FAILED。如果成功完成,则为 BatchStatus#COMPLETED

startTime

一个 java.time.LocalDateTime 对象,表示执行开始时的当前系统时间。如果 job 尚未开始,此字段为空。

endTime

一个 java.time.LocalDateTime 对象,表示执行完成时的当前系统时间,无论是否成功。如果 job 尚未完成,此字段为空。

exitStatus

ExitStatus,表示运行结果。它非常重要,因为它包含返回给调用者的退出代码。更多详情请参阅第 5 章。如果 job 尚未完成,此字段为空。

createTime

一个 java.time.LocalDateTime 对象,表示 JobExecution 首次持久化时的当前系统时间。Job 可能尚未开始(因此没有开始时间),但它始终有一个 createTime,这是框架管理 job 级别 ExecutionContexts 所必需的。

lastUpdated

一个 java.time.LocalDateTime 对象,表示 JobExecution 最后一次持久化时的当前系统时间。如果 job 尚未开始,此字段为空。

executionContext

包含需要在执行之间持久化的任何用户数据的“属性包”。

failureExceptions

执行 Job 期间遇到的异常列表。如果在 Job 失败期间遇到不止一个异常,这些异常会很有用。

这些属性很重要,因为它们会被持久化,并可用于完全确定执行的状态。例如,如果 01-01 的 EndOfDay job 在晚上 9:00 执行,并在 9:30 失败,则会在批处理元数据表中记录以下条目

表 2. BATCH_JOB_INSTANCE

JOB_INST_ID

JOB_NAME

1

EndOfDayJob

表 3. BATCH_JOB_EXECUTION_PARAMS

JOB_EXECUTION_ID

TYPE_CD

KEY_NAME

DATE_VAL

IDENTIFYING

1

DATE

schedule.Date

2017-01-01

TRUE

表 4. BATCH_JOB_EXECUTION

JOB_EXEC_ID

JOB_INST_ID

START_TIME

END_TIME

STATUS

1

1

2017-01-01 21:00

2017-01-01 21:30

FAILED

为了清晰和格式化,列名可能已被缩写或删除。

现在 job 已失败,假设确定问题花费了整个晚上,因此“批处理窗口”现已关闭。进一步假设窗口在晚上 9:00 开始,01-01 的 job 再次启动,从上次停止的地方继续,并在 9:30 成功完成。由于现在是第二天,01-02 的 job 也必须运行,它紧随其后在 9:31 启动,并在正常的一小时内于 10:30 完成。没有要求一个 JobInstance 必须在另一个之后启动,除非这两个 job 有可能尝试访问相同的数据,从而导致数据库级别的锁定问题。何时运行 Job 完全取决于调度器决定。由于它们是独立的 JobInstance,Spring Batch 不会尝试阻止它们并发运行。(尝试在另一个 JobInstance 正在运行时运行同一个 JobInstance 会抛出 JobExecutionAlreadyRunningException 异常)。现在,在 JobInstanceJobParameters 表中都应该有一个额外的条目,在 JobExecution 表中应该有两个额外的条目,如下表所示

表 5. BATCH_JOB_INSTANCE

JOB_INST_ID

JOB_NAME

1

EndOfDayJob

2

EndOfDayJob

表 6. BATCH_JOB_EXECUTION_PARAMS

JOB_EXECUTION_ID

TYPE_CD

KEY_NAME

DATE_VAL

IDENTIFYING

1

DATE

schedule.Date

2017-01-01 00:00:00

TRUE

2

DATE

schedule.Date

2017-01-01 00:00:00

TRUE

3

DATE

schedule.Date

2017-01-02 00:00:00

TRUE

表 7. BATCH_JOB_EXECUTION

JOB_EXEC_ID

JOB_INST_ID

START_TIME

END_TIME

STATUS

1

1

2017-01-01 21:00

2017-01-01 21:30

FAILED

2

1

2017-01-02 21:00

2017-01-02 21:30

COMPLETED

3

2

2017-01-02 21:31

2017-01-02 22:29

COMPLETED

为了清晰和格式化,列名可能已被缩写或删除。

Step

Step 是一个领域对象,封装了批处理 job 的一个独立的、顺序的阶段。因此,每个 Job 完全由一个或多个 Step 组成。Step 包含了定义和控制实际批处理所需的所有信息。这是一个必然含糊的描述,因为任何给定 Step 的内容取决于编写 Job 的开发人员的决定。Step 可以像开发人员希望的那样简单或复杂。一个简单的 Step 可能将数据从文件加载到数据库,几乎不需要编写代码(取决于使用的实现)。更复杂的 Step 可能包含作为处理一部分应用的复杂业务规则。与 Job 一样,Step 有一个单独的 StepExecution,它与一个唯一的 JobExecution 相关联,如下图所示

Figure 2.1: Job Hierarchy With Steps
图 4. 包含 Step 的 Job 层次结构

StepExecution

StepExecution 表示执行 Step 的单次尝试。每次运行 Step 时都会创建一个新的 StepExecution,这与 JobExecution 类似。但是,如果一个 step 由于其之前的 step 失败而未能执行,则不会为其持久化执行记录。StepExecution 仅在其 Step 实际启动时创建。

Step 执行由 StepExecution 类的对象表示。每次执行都包含对其对应 step 和 JobExecution 的引用以及事务相关数据,例如提交和回滚计数以及开始和结束时间。此外,每次 step 执行都包含一个 ExecutionContext,其中包含开发人员需要在批处理运行之间持久化的任何数据,例如统计信息或重启所需的状态信息。下表列出了 StepExecution 的属性

表 8. StepExecution 属性

属性

定义

状态

一个 BatchStatus 对象,指示执行的状态。运行时状态为 BatchStatus.STARTED。如果失败,状态为 BatchStatus.FAILED。如果成功完成,状态为 BatchStatus.COMPLETED

startTime

一个 java.time.LocalDateTime 对象,表示执行开始时的当前系统时间。如果 step 尚未开始,此字段为空。

endTime

一个 java.time.LocalDateTime 对象,表示执行完成时的当前系统时间,无论是否成功。如果 step 尚未退出,此字段为空。

exitStatus

ExitStatus,指示执行结果。它非常重要,因为它包含返回给调用者的退出代码。更多详情请参阅第 5 章。如果 job 尚未退出,此字段为空。

executionContext

包含需要在执行之间持久化的任何用户数据的“属性包”。

readCount

成功读取的项目数。

writeCount

成功写入的项目数。

commitCount

此执行已提交的事务数。

rollbackCount

Step 控制的业务事务回滚的次数。

readSkipCount

read 失败导致跳过项目的次数。

processSkipCount

process 失败导致跳过项目的次数。

filterCount

ItemProcessor“过滤”掉的项目数。

writeSkipCount

write 失败导致跳过项目的次数。

ExecutionContext

ExecutionContext 表示键/值对的集合,由框架持久化和控制,为开发人员提供一个存储与 StepExecution 对象或 JobExecution 对象作用域关联的持久状态的地方。(对于熟悉 Quartz 的人来说,它与 JobDataMap 非常相似。)最佳用法示例是辅助重启。以平面文件输入为例,在处理单行数据时,框架会在提交点定期持久化 ExecutionContext。这样做可以让 ItemReader 在运行期间发生致命错误或甚至断电时存储其状态。所需做的就是将当前读取的行数放入上下文中,如下例所示,其余工作由框架完成

executionContext.putLong(getKey(LINES_READ_COUNT), reader.getPosition());

使用 Job 原型部分中的 EndOfDay 示例,假设有一个 step,名为 loadData,它将文件加载到数据库中。第一次失败运行后,元数据表将如下例所示

表 9. BATCH_JOB_INSTANCE

JOB_INST_ID

JOB_NAME

1

EndOfDayJob

表 10. BATCH_JOB_EXECUTION_PARAMS

JOB_INST_ID

TYPE_CD

KEY_NAME

DATE_VAL

1

DATE

schedule.Date

2017-01-01

表 11. BATCH_JOB_EXECUTION

JOB_EXEC_ID

JOB_INST_ID

START_TIME

END_TIME

STATUS

1

1

2017-01-01 21:00

2017-01-01 21:30

FAILED

表 12. BATCH_STEP_EXECUTION

STEP_EXEC_ID

JOB_EXEC_ID

STEP_NAME

START_TIME

END_TIME

STATUS

1

1

loadData

2017-01-01 21:00

2017-01-01 21:30

FAILED

表 13. BATCH_STEP_EXECUTION_CONTEXT

STEP_EXEC_ID

SHORT_CONTEXT

1

{piece.count=40321}

在前述情况下,Step 运行了 30 分钟,处理了 40,321 个“片段”,在此场景中表示文件中的行。该值由框架在每次提交之前更新,并可包含与 ExecutionContext 中的条目对应的多行。要在提交前收到通知,需要实现各种 StepListener 之一(或实现 ItemStream),这将在本指南后面更详细地讨论。与前面的示例一样,假设 Job 在第二天重启。重启时,上次运行的 ExecutionContext 中的值会从数据库中重建。当 ItemReader 打开时,它可以检查上下文中是否有存储的状态,并从中初始化自身,如下例所示

if (executionContext.containsKey(getKey(LINES_READ_COUNT))) {
    log.debug("Initializing for restart. Restart data is: " + executionContext);

    long lineCount = executionContext.getLong(getKey(LINES_READ_COUNT));

    LineReader reader = getReader();

    Object record = "";
    while (reader.getPosition() < lineCount && record != null) {
        record = readLine();
    }
}

在这种情况下,在前面的代码运行后,当前行是 40,322,允许 Step 从上次停止的地方再次开始。您还可以使用 ExecutionContext 来持久化关于运行本身的统计信息。例如,如果一个平面文件包含跨多行存在的待处理订单,则可能需要存储已处理的订单数(这与读取的行数大不相同),以便在 Step 结束时发送一封包含已处理订单总数的电子邮件。框架会为开发人员处理此信息的存储,以便将其正确地与单个 JobInstance 关联起来。很难知道是否应该使用现有的 ExecutionContext。例如,使用上面的 EndOfDay 示例,当 01-01 的运行第二次再次开始时,框架会识别出它是同一个 JobInstance,并在单个 Step 的基础上,从数据库中取出 ExecutionContext,并将其(作为 StepExecution 的一部分)传递给 Step 本身。相反,对于 01-02 的运行,框架会识别出它是一个不同的实例,因此必须将一个空的 context 传递给 Step。框架为开发人员做了许多此类判断,以确保在正确的时间将状态提供给他们。同样重要的是要注意,在任何给定时间,每个 StepExecution 恰好存在一个 ExecutionContextExecutionContext 的客户端应谨慎,因为这会创建一个共享的键空间。因此,在存入值时应小心,确保数据不会被覆盖。然而,Step 在 context 中完全不存储数据,因此无法对框架产生不利影响。

注意,每个 JobExecution 至少有一个 ExecutionContext,每个 StepExecution 也有一个。例如,考虑以下代码片段

ExecutionContext ecStep = stepExecution.getExecutionContext();
ExecutionContext ecJob = jobExecution.getExecutionContext();
//ecStep does not equal ecJob

如注释中所述,ecStep 不等于 ecJob。它们是两个不同的 ExecutionContexts。作用域为 Step 的 Context 在 Step 的每个提交点保存,而作用域为 Job 的 Context 在每次 Step 执行之间保存。

ExecutionContext 中,所有非 transient 的条目必须是 Serializable。执行上下文的正确序列化是 steps 和 jobs 重启能力的基础。如果您使用的键或值不是 natively serializable,则需要采用定制的序列化方法。未能序列化执行上下文可能会危及状态持久化过程,导致失败的 jobs 无法正确恢复。

JobRepository

JobRepository 是前面提到的所有原型的持久化机制。它为 JobLauncherJobStep 实现提供了 CRUD 操作。首次启动 Job 时,会从 repository 获取一个 JobExecution。此外,在执行过程中,通过将 StepExecutionJobExecution 实现传递给 repository 来进行持久化。

  • Java

  • XML

使用 Java 配置时,@EnableBatchProcessing 注解提供了一个 JobRepository 作为自动配置的组件之一。

Spring Batch XML 命名空间支持使用 <job-repository> 标签配置 JobRepository 实例,如下例所示

<job-repository id="jobRepository"/>

JobLauncher

JobLauncher 表示一个简单的接口,用于使用给定的一组 JobParameters 启动 Job,如下例所示

public interface JobLauncher {

public JobExecution run(Job job, JobParameters jobParameters)
            throws JobExecutionAlreadyRunningException, JobRestartException,
                   JobInstanceAlreadyCompleteException, JobParametersInvalidException;
}

预期实现会从 JobRepository 获取有效的 JobExecution 并执行 Job

ItemReader

ItemReader 是一个抽象,表示为 Step 逐项检索输入。当 ItemReader 提供完所有项目时,它通过返回 null 来指示。关于 ItemReader 接口及其各种实现的更多详细信息,请参见Reader 和 Writer

ItemWriter

ItemWriter 是一个抽象,表示 Step 的输出,每次一个批次或一个块的项目。通常,ItemWriter 不知道接下来应该接收什么输入,只知道当前调用中传递的项目。关于 ItemWriter 接口及其各种实现的更多详细信息,请参见Reader 和 Writer

ItemProcessor

ItemProcessor 是一个抽象,表示项目的业务处理。ItemReader 读入一个项目,ItemWriter 写出一个项目,而 ItemProcessor 提供了一个访问点来转换或应用其他业务处理。如果在处理项目时确定该项目无效,返回 null 表示该项目不应被写出。关于 ItemProcessor 接口的更多详细信息,请参见Reader 和 Writer

批处理命名空间

前面列出的许多领域概念需要在 Spring ApplicationContext 中进行配置。虽然可以使用上述接口的实现进行标准 bean 定义,但为了便于配置,提供了一个命名空间,如下例所示

<beans:beans xmlns="http://www.springframework.org/schema/batch"
xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
   http://www.springframework.org/schema/beans
   https://www.springframework.org/schema/beans/spring-beans.xsd
   http://www.springframework.org/schema/batch
   https://www.springframework.org/schema/batch/spring-batch.xsd">

<job id="ioSampleJob">
    <step id="step1">
        <tasklet>
            <chunk reader="itemReader" writer="itemWriter" commit-interval="2"/>
        </tasklet>
    </step>
</job>

</beans:beans>

只要声明了批处理命名空间,就可以使用其任何元素。关于配置 Job 的更多信息,请参见配置和运行 Job。关于配置 Step 的更多信息,请参见配置 Step