批处理的领域语言

对于任何经验丰富的批处理架构师来说,Spring Batch 中使用的批处理总体概念都应该是熟悉和舒适的。存在“作业”和“步骤”,以及开发人员提供的称为 ItemReaderItemWriter 的处理单元。但是,由于 Spring 模式、操作、模板、回调和习惯用法,因此存在以下机会

  • 显著提高对明确关注点分离的遵守。

  • 清晰界定的架构层和作为接口提供的服务。

  • 简单且默认的实现,允许开箱即用地快速采用和易用性。

  • 显著增强的可扩展性。

下图是批处理参考架构的简化版本,该架构已使用了几十年。它概述了构成批处理领域语言的组件。此架构框架是一个蓝图,已通过在过去几代平台(大型机上的 COBOL、Unix 上的 C,以及现在的任何地方的 Java)上的数十次实施得到验证。JCL 和 COBOL 开发人员可能与 C、C# 和 Java 开发人员一样熟悉这些概念。Spring Batch 提供了在健壮、可维护的系统中通常发现的层、组件和技术服务的物理实现,这些系统用于解决从简单到复杂的批处理应用程序的创建,以及解决非常复杂的处理需求的基础设施和扩展。

Figure 2.1: Batch Stereotypes
图 1. 批处理原型

上图突出显示了构成 Spring Batch 领域语言的关键概念。一个 Job 包含一个到多个步骤,每个步骤正好有一个 ItemReader、一个 ItemProcessor 和一个 ItemWriter。作业需要启动(使用 JobLauncher),并且需要存储有关当前正在运行的流程的元数据(在 JobRepository 中)。

作业

本节描述与批处理作业概念相关的原型。一个 Job 是一个封装整个批处理过程的实体。与其他 Spring 项目一样,Job 使用 XML 配置文件或基于 Java 的配置连接在一起。此配置可能被称为“作业配置”。但是,Job 只是整体层次结构的顶层,如下面的图所示

Job Hierarchy
图 2. 作业层次结构

在 Spring Batch 中,Job 只是一个 Step 实例的容器。它组合了在流程中逻辑上属于一起的多个步骤,并允许配置全局适用于所有步骤的属性,例如可重启性。作业配置包含

  • 作业的名称。

  • Step 实例的定义和排序。

  • 作业是否可重启。

  • Java

  • XML

对于使用 Java 配置的用户,Spring Batch 以 SimpleJob 类的方式提供了 Job 接口的默认实现,该类在 Job 之上创建了一些标准功能。当使用基于 Java 的配置时,将提供一系列构建器用于实例化 Job,如下面的示例所示

@Bean
public Job footballJob(JobRepository jobRepository) {
    return new JobBuilder("footballJob", jobRepository)
                     .start(playerLoad())
                     .next(gameLoad())
                     .next(playerSummarization())
                     .build();
}

对于使用 XML 配置的用户,Spring Batch 以 SimpleJob 类的方式提供了 Job 接口的默认实现,该类在 Job 之上创建了一些标准功能。但是,批处理命名空间隐藏了直接实例化它的需要。相反,您可以使用 <job> 元素,如下面的示例所示

<job id="footballJob">
    <step id="playerload" next="gameLoad"/>
    <step id="gameLoad" next="playerSummarization"/>
    <step id="playerSummarization"/>
</job>

JobInstance

JobInstance 代表一个逻辑作业运行的概念。考虑一个应该在每天结束时运行一次的批处理作业,例如前面图表中的 EndOfDay Job。只有一个 EndOfDay 作业,但 Job 的每次单独运行都必须单独跟踪。在此作业的情况下,每天有一个逻辑 JobInstance。例如,有一个 1 月 1 日的运行,一个 1 月 2 日的运行,依此类推。如果 1 月 1 日的运行第一次失败并在第二天再次运行,它仍然是 1 月 1 日的运行。(通常,这也与它正在处理的数据相对应,这意味着 1 月 1 日的运行处理 1 月 1 日的数据)。因此,每个 JobInstance 可以有多个执行(JobExecution 在本章后面详细讨论),并且在给定时间只能运行一个 JobInstance(对应于特定的 Job 和标识 JobParameters)。

JobInstance 的定义与要加载的数据绝对没有关系。完全由 ItemReader 实现来决定如何加载数据。例如,在 EndOfDay 场景中,数据上可能有一列指示数据所属的 有效日期计划日期。因此,1 月 1 日的运行将只加载第 1 天的数据,而 1 月 2 日的运行将只使用第 2 天的数据。由于此确定很可能是一个业务决策,因此由 ItemReader 决定。但是,使用相同的 JobInstance 决定是否使用先前执行的“状态”(即 ExecutionContext,将在本章后面讨论)。使用新的 JobInstance 表示“从头开始”,而使用现有实例通常表示“从上次中断的地方开始”。

JobParameters

在讨论了 JobInstance 及其与 Job 的区别之后,自然会问:“如何区分一个 JobInstance 与另一个?”答案是:JobParametersJobParameters 对象保存一组用于启动批处理作业的参数。它们可以用于识别,甚至可以在运行期间用作参考数据,如下面的图片所示

Job Parameters
图 3. 作业参数

在前面的示例中,有两个实例,一个用于 1 月 1 日,另一个用于 1 月 2 日,实际上只有一个 Job,但它有两个 JobParameter 对象:一个以 01-01-2017 的作业参数启动,另一个以 01-02-2017 的参数启动。因此,可以将约定定义为:JobInstance = Job + 标识 JobParameters。这允许开发人员有效地控制 JobInstance 的定义方式,因为他们控制传入的参数。

并非所有作业参数都需要有助于 JobInstance 的识别。默认情况下,它们会这样做。但是,框架也允许提交不有助于 JobInstance 身份识别的参数的 Job

JobExecution

JobExecution 指的是运行作业的单个尝试的技术概念。执行可能以失败或成功结束,但除非执行成功完成,否则与给定执行相对应的 JobInstance 不被认为已完成。以前面描述的 EndOfDay Job 为例,考虑一个 01-01-2017 的 JobInstance,它在第一次运行时失败了。如果使用与第一次运行相同的标识作业参数(01-01-2017)再次运行它,则会创建一个新的 JobExecution。但是,仍然只有一个 JobInstance

Job 定义作业是什么以及如何执行,而 JobInstance 是一个纯粹的组织对象,用于将执行分组在一起,主要目的是启用正确的重启语义。但是,JobExecution 是实际运行过程中发生情况的主要存储机制,并且包含许多必须控制和持久化的属性,如下表所示

表 1. JobExecution 属性

属性

定义

状态

一个 BatchStatus 对象,指示执行的状态。在运行时,它是 BatchStatus#STARTED。如果失败,则为 BatchStatus#FAILED。如果成功完成,则为 BatchStatus#COMPLETED

startTime

一个 java.time.LocalDateTime,表示执行开始时的当前系统时间。如果作业尚未启动,则此字段为空。

endTime

一个 java.time.LocalDateTime,表示执行完成时的当前系统时间,无论其是否成功。如果作业尚未完成,则此字段为空。

exitStatus

ExitStatus,指示运行的结果。它非常重要,因为它包含返回给调用者的退出代码。有关更多详细信息,请参见第 5 章。如果作业尚未完成,则此字段为空。

createTime

一个 java.time.LocalDateTime,表示 JobExecution 首次持久化时的当前系统时间。作业可能尚未启动(因此没有启动时间),但它始终具有 createTime,框架需要此属性来管理作业级 ExecutionContexts

lastUpdated

一个 java.time.LocalDateTime,表示 JobExecution 最后一次持久化的时间。如果作业尚未启动,则此字段为空。

executionContext

“属性包”,包含在执行之间需要持久化的任何用户数据。

failureExceptions

在执行 Job 期间遇到的异常列表。如果在 Job 失败期间遇到多个异常,这些异常可能很有用。

这些属性很重要,因为它们会持久化,并且可用于完全确定执行的状态。例如,如果 01-01 的 EndOfDay 作业在晚上 9:00 执行并在 9:30 失败,则在批处理元数据表中会创建以下条目

表 2. BATCH_JOB_INSTANCE

JOB_INST_ID

JOB_NAME

1

EndOfDayJob

表 3. BATCH_JOB_EXECUTION_PARAMS

JOB_EXECUTION_ID

TYPE_CD

KEY_NAME

DATE_VAL

IDENTIFYING

1

DATE

schedule.Date

2017-01-01

TRUE

表 4. BATCH_JOB_EXECUTION

JOB_EXEC_ID

JOB_INST_ID

START_TIME

END_TIME

STATUS

1

1

2017-01-01 21:00

2017-01-01 21:30

FAILED

为了清晰和格式化,列名可能已被缩写或删除。

现在作业已失败,假设需要整个晚上才能确定问题,因此“批处理窗口”现在已关闭。进一步假设窗口从晚上 9:00 开始,作业再次为 01-01 启动,从上次中断的地方开始,并在 9:30 成功完成。因为现在是第二天,所以必须运行 01-02 作业,它紧随其后在 9:31 启动,并在其正常的一小时时间内在 10:30 完成。除非两个作业可能尝试访问相同的数据,从而导致数据库级别的锁定问题,否则不需要一个 JobInstance 在另一个之后启动。完全由调度程序决定何时运行 Job。由于它们是单独的 JobInstances,因此 Spring Batch 不会尝试阻止它们同时运行。(尝试在另一个 JobInstance 正在运行时运行相同的 JobInstance 会导致抛出 JobExecutionAlreadyRunningException)。现在,JobInstanceJobParameters 表中应该都多了一个条目,JobExecution 表中也应该多两个条目,如下表所示

表 5. BATCH_JOB_INSTANCE

JOB_INST_ID

JOB_NAME

1

EndOfDayJob

2

EndOfDayJob

表 6. BATCH_JOB_EXECUTION_PARAMS

JOB_EXECUTION_ID

TYPE_CD

KEY_NAME

DATE_VAL

IDENTIFYING

1

DATE

schedule.Date

2017-01-01 00:00:00

TRUE

2

DATE

schedule.Date

2017-01-01 00:00:00

TRUE

3

DATE

schedule.Date

2017-01-02 00:00:00

TRUE

表 7. BATCH_JOB_EXECUTION

JOB_EXEC_ID

JOB_INST_ID

START_TIME

END_TIME

STATUS

1

1

2017-01-01 21:00

2017-01-01 21:30

FAILED

2

1

2017-01-02 21:00

2017-01-02 21:30

COMPLETED

3

2

2017-01-02 21:31

2017-01-02 22:29

COMPLETED

为了清晰和格式化,列名可能已被缩写或删除。

Step

Step 是一个域对象,封装了批处理作业的独立、顺序阶段。因此,每个 Job 完全由一个或多个步骤组成。Step 包含定义和控制实际批处理所需的所有信息。这是一个必要的模糊描述,因为任何给定 Step 的内容都由编写 Job 的开发人员自行决定。Step 可以像开发人员期望的那样简单或复杂。一个简单的 Step 可能将数据从文件加载到数据库中,需要很少或不需要代码(取决于使用的实现)。更复杂的 Step 可能具有作为处理的一部分应用的复杂业务规则。与 Job 一样,Step 有一个单独的 StepExecution,它与唯一的 JobExecution 相关联,如下面的图片所示

Figure 2.1: Job Hierarchy With Steps
图 4. 带有步骤的作业层次结构

StepExecution

StepExecution 表示执行 Step 的单个尝试。每次运行 Step 时都会创建一个新的 StepExecution,类似于 JobExecution。但是,如果步骤由于前面的步骤失败而无法执行,则不会为其持久化执行。仅当 Step 实际启动时才会创建 StepExecution

Step 执行由 StepExecution 类的对象表示。每个执行都包含对其相应步骤和 JobExecution 以及与事务相关的数据(例如提交和回滚计数以及开始和结束时间)的引用。此外,每个步骤执行都包含一个 ExecutionContext,其中包含开发人员需要在批处理运行之间持久化的任何数据,例如统计信息或重启所需的 state 信息。下表列出了 StepExecution 的属性

表 8. StepExecution 属性

属性

定义

状态

一个 BatchStatus 对象,指示执行的状态。在运行时,状态为 BatchStatus.STARTED。如果失败,状态为 BatchStatus.FAILED。如果成功完成,状态为 BatchStatus.COMPLETED

startTime

一个 java.time.LocalDateTime,表示执行开始时的当前系统时间。如果步骤尚未启动,则此字段为空。

endTime

一个 java.time.LocalDateTime,表示执行完成时的当前系统时间,无论其是否成功。如果步骤尚未退出,则此字段为空。

exitStatus

ExitStatus,指示执行的结果。它非常重要,因为它包含返回给调用者的退出代码。有关更多详细信息,请参见第 5 章。如果作业尚未退出,则此字段为空。

executionContext

“属性包”,包含在执行之间需要持久化的任何用户数据。

readCount

已成功读取的项目数。

writeCount

已成功写入的项目数。

commitCount

为此执行提交的事务数。

rollbackCount

Step 控制的业务事务回滚的次数。

readSkipCount

read 失败的次数,导致跳过项目。

processSkipCount

process 失败的次数,导致跳过项目。

filterCount

ItemProcessor 已“过滤”的项目数。

writeSkipCount

write 失败的次数,导致跳过项目。

ExecutionContext

ExecutionContext 代表一组键值对的集合,这些键值对由框架持久化和控制,为开发者提供了一个存储持久化状态的地方,该状态的作用域限定在 StepExecution 对象或 JobExecution 对象。(对于熟悉 Quartz 的人来说,它非常类似于 JobDataMap。)最佳使用示例是促进重启。以平面文件输入为例,在处理单个行时,框架会在提交点定期持久化 ExecutionContext。这样做可以让 ItemReader 存储其状态,以防在运行期间发生致命错误,甚至停电。只需要将当前读取的行数放入上下文(如下例所示),框架会完成其余工作。

executionContext.putLong(getKey(LINES_READ_COUNT), reader.getPosition());

Job 构造型部分中的 EndOfDay 示例为例,假设有一个步骤 loadData 将文件加载到数据库中。在第一次运行失败后,元数据表将如下所示

表 9. BATCH_JOB_INSTANCE

JOB_INST_ID

JOB_NAME

1

EndOfDayJob

表 10. BATCH_JOB_EXECUTION_PARAMS

JOB_INST_ID

TYPE_CD

KEY_NAME

DATE_VAL

1

DATE

schedule.Date

2017-01-01

表 11. BATCH_JOB_EXECUTION

JOB_EXEC_ID

JOB_INST_ID

START_TIME

END_TIME

STATUS

1

1

2017-01-01 21:00

2017-01-01 21:30

FAILED

表 12. BATCH_STEP_EXECUTION

STEP_EXEC_ID

JOB_EXEC_ID

STEP_NAME

START_TIME

END_TIME

STATUS

1

1

loadData

2017-01-01 21:00

2017-01-01 21:30

FAILED

表 13. BATCH_STEP_EXECUTION_CONTEXT

STEP_EXEC_ID

SHORT_CONTEXT

1

{piece.count=40321}

在上述情况下,Step 运行了 30 分钟并处理了 40,321 个“片段”,在本例中,片段代表文件中的行。此值在每次提交前由框架更新,并且可以包含对应于 ExecutionContext 中条目的多行。在提交前收到通知需要使用各种 StepListener 实现(或 ItemStream)之一,这些实现将在本指南的后面部分详细讨论。与前面的示例一样,假设 Job 在第二天重新启动。当它重新启动时,上次运行的 ExecutionContext 中的值将从数据库中重建。当 ItemReader 打开时,它可以检查上下文是否有任何存储的状态,并从此处初始化自身,如下例所示。

if (executionContext.containsKey(getKey(LINES_READ_COUNT))) {
    log.debug("Initializing for restart. Restart data is: " + executionContext);

    long lineCount = executionContext.getLong(getKey(LINES_READ_COUNT));

    LineReader reader = getReader();

    Object record = "";
    while (reader.getPosition() < lineCount && record != null) {
        record = readLine();
    }
}

在这种情况下,在上述代码运行后,当前行数为 40,322,使 Step 可以从中断的地方重新开始。您还可以使用 ExecutionContext 来存储有关运行本身的需要持久化的统计信息。例如,如果一个平面文件包含跨多行的订单以供处理,则可能需要存储已处理的订单数量(这与读取的行数大不相同),以便在 Step 结束时发送一封电子邮件,邮件正文中包含已处理的订单总数。框架为开发者处理存储此信息,以将其正确地限定在单个 JobInstance 范围内。很难知道是否应该使用现有的 ExecutionContext。例如,使用上面的 EndOfDay 示例,当 01-01 运行第二次启动时,框架会识别出它是同一个 JobInstance,并在单个 Step 的基础上,从数据库中提取 ExecutionContext,并将其(作为 StepExecution 的一部分)传递给 Step 本身。相反,对于 01-02 运行,框架会识别出这是一个不同的实例,因此必须将空上下文传递给 Step。框架为开发者做出了许多此类判断,以确保在正确的时间向他们提供状态。还需要注意,在任何给定时间,每个 StepExecution 只有一个 ExecutionContextExecutionContext 的客户端应该小心,因为这会创建一个共享的键空间。因此,在放入值时应注意,以确保不会覆盖任何数据。但是,Step 在上下文中绝对不存储任何数据,因此无法对框架产生不利影响。

请注意,每个 JobExecution 至少有一个 ExecutionContext,并且每个 StepExecution 也都有一个。例如,考虑以下代码片段

ExecutionContext ecStep = stepExecution.getExecutionContext();
ExecutionContext ecJob = jobExecution.getExecutionContext();
//ecStep does not equal ecJob

如注释中所述,ecStep 不等于 ecJob。它们是两个不同的 ExecutionContext。限定在 Step 范围内的那个会在 Step 的每个提交点保存,而限定在 Job 范围内的那个会在每个 Step 执行之间保存。

ExecutionContext 中,所有非瞬态条目都必须是 Serializable。执行上下文的正确序列化是步骤和作业重启功能的基础。如果您使用非原生可序列化的键或值,则需要采用定制的序列化方法。未能序列化执行上下文可能会危及状态持久化过程,导致失败的作业无法正确恢复。

JobRepository

JobRepository 是前面提到的所有构造型的持久化机制。它为 JobLauncherJobStep 实现提供了 CRUD 操作。当 Job 首次启动时,会从存储库中获取 JobExecution。此外,在执行过程中,StepExecutionJobExecution 实现会通过传递到存储库来持久化。

  • Java

  • XML

在使用 Java 配置时,@EnableBatchProcessing 注解会提供一个 JobRepository 作为自动配置的组件之一。

Spring Batch XML 命名空间支持使用 <job-repository> 标签配置 JobRepository 实例,如下例所示

<job-repository id="jobRepository"/>

JobLauncher

JobLauncher 代表一个简单的接口,用于使用给定的 JobParameters 集启动 Job,如下例所示

public interface JobLauncher {

public JobExecution run(Job job, JobParameters jobParameters)
            throws JobExecutionAlreadyRunningException, JobRestartException,
                   JobInstanceAlreadyCompleteException, JobParametersInvalidException;
}

预期实现从 JobRepository 获取有效的 JobExecution 并执行 Job

ItemReader

ItemReader 是一个抽象,表示一次读取一个项目的 Step 输入。当 ItemReader 用尽它可以提供的项目时,它会通过返回 null 来指示这一点。您可以在 读者和编写器 中找到有关 ItemReader 接口及其各种实现的更多详细信息。

ItemWriter

ItemWriter 是一个抽象,表示一次写入一批或一块项目的 Step 输出。通常,ItemWriter 不知道它应该接收哪个输入,只知道在其当前调用中传递的项目。您可以在 读者和编写器 中找到有关 ItemWriter 接口及其各种实现的更多详细信息。

ItemProcessor

ItemProcessor 是一个抽象,表示项目的业务处理。虽然 ItemReader 读取一个项目,而 ItemWriter 写入一个项目,但 ItemProcessor 提供了一个访问点来转换或应用其他业务处理。如果在处理项目时确定项目无效,则返回 null 表示不应写入该项目。您可以在 读者和编写器 中找到有关 ItemProcessor 接口的更多详细信息。

批处理命名空间

前面列出的许多领域概念都需要在 Spring ApplicationContext 中进行配置。虽然您可以使用上述接口的实现来在标准 bean 定义中使用,但为了方便配置,提供了一个命名空间,如下例所示

<beans:beans xmlns="http://www.springframework.org/schema/batch"
xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
   http://www.springframework.org/schema/beans
   https://www.springframework.org/schema/beans/spring-beans.xsd
   http://www.springframework.org/schema/batch
   https://www.springframework.org/schema/batch/spring-batch.xsd">

<job id="ioSampleJob">
    <step id="step1">
        <tasklet>
            <chunk reader="itemReader" writer="itemWriter" commit-interval="2"/>
        </tasklet>
    </step>
</job>

</beans:beans>

只要声明了批处理命名空间,就可以使用其任何元素。您可以在 配置和运行作业 中找到有关配置作业的更多信息。您可以在 配置步骤 中找到有关配置 Step 的更多信息。