批处理的领域语言

对于任何有经验的批处理架构师来说,Spring Batch 中使用的批处理总体概念应该很熟悉和舒适。有“作业”和“步骤”,以及开发人员提供的称为ItemReaderItemWriter的处理单元。但是,由于 Spring 模式、操作、模板、回调和习惯用法,存在以下机会

  • 显着提高对明确关注点分离的遵守。

  • 明确划分的架构层和作为接口提供的服务。

  • 提供简单且默认的实现,以便于快速采用和开箱即用。

  • 显著增强了可扩展性。

以下图表是经过数十年的使用,对批处理参考架构进行简化的版本。它概述了构成批处理领域语言的组件。此架构框架是一个蓝图,它已通过在过去几代平台(大型机上的 COBOL、Unix 上的 C,以及现在的 Java 任何地方)上的数十年的实现而得到验证。JCL 和 COBOL 开发人员可能与 C、C# 和 Java 开发人员一样熟悉这些概念。Spring Batch 为通常在健壮、可维护的系统中找到的层、组件和技术服务提供了物理实现,这些系统用于解决从简单到复杂的批处理应用程序的创建,并具有解决非常复杂的处理需求的基础设施和扩展。

Figure 2.1: Batch Stereotypes
图 1. 批处理模式

上面的图表突出了构成 Spring Batch 领域语言的关键概念。一个 Job 具有一个到多个步骤,每个步骤都恰好有一个 ItemReader、一个 ItemProcessor 和一个 ItemWriter。一个作业需要启动(使用 JobLauncher),并且有关当前运行进程的元数据需要存储(在 JobRepository 中)。

作业

本节描述与批处理作业概念相关的模式。一个 Job 是一个封装整个批处理过程的实体。与其他 Spring 项目一样,一个 Job 使用 XML 配置文件或基于 Java 的配置进行连接。此配置可能被称为“作业配置”。但是,Job 只是整个层次结构的顶端,如以下图表所示

Job Hierarchy
图 2. 作业层次结构

在 Spring Batch 中,一个 Job 只是一个 Step 实例的容器。它将逻辑上属于一个流程的多个步骤组合在一起,并允许配置对所有步骤全局有效的属性,例如可恢复性。作业配置包含

  • 作业的名称。

  • Step 实例的定义和排序。

  • 作业是否可恢复。

  • Java

  • XML

对于使用 Java 配置的用户,Spring Batch 提供了 Job 接口的默认实现,形式为 SimpleJob 类,它在 Job 之上创建了一些标准功能。在使用基于 Java 的配置时,会提供一系列构建器来实例化 Job,如下例所示

@Bean
public Job footballJob(JobRepository jobRepository) {
    return new JobBuilder("footballJob", jobRepository)
                     .start(playerLoad())
                     .next(gameLoad())
                     .next(playerSummarization())
                     .build();
}

对于使用 XML 配置的用户,Spring Batch 提供了 Job 接口的默认实现,形式为 SimpleJob 类,它在 Job 之上创建了一些标准功能。但是,批处理命名空间抽象了直接实例化它的需要。相反,您可以使用 <job> 元素,如下例所示

<job id="footballJob">
    <step id="playerload" next="gameLoad"/>
    <step id="gameLoad" next="playerSummarization"/>
    <step id="playerSummarization"/>
</job>

作业实例

JobInstance 代表一个逻辑上的作业运行的概念。考虑一个每天结束时运行一次的批处理作业,例如前面图表中的 EndOfDay Job。只有一个 EndOfDay 作业,但每个 Job 的单独运行必须单独跟踪。在这种情况下,每天有一个逻辑 JobInstance。例如,有一个 1 月 1 日运行、一个 1 月 2 日运行,等等。如果 1 月 1 日的运行第一次失败并在第二天再次运行,它仍然是 1 月 1 日的运行。(通常,这与它正在处理的数据相对应,这意味着 1 月 1 日的运行处理 1 月 1 日的数据)。因此,每个 JobInstance 可以有多次执行(JobExecution 将在本章后面详细讨论),并且在给定时间只能运行一个 JobInstance(它对应于特定的 Job 和标识 JobParameters)。

JobInstance 的定义与要加载的数据完全无关。完全由 ItemReader 实现来决定如何加载数据。例如,在 EndOfDay 场景中,数据可能有一列指示数据所属的 有效日期计划日期。因此,1 月 1 日的运行只会加载来自 1 日的数据,而 1 月 2 日的运行只会使用来自 2 日的数据。由于这种确定很可能是一个业务决策,因此由 ItemReader 来决定。但是,使用相同的 JobInstance 决定是否使用先前执行的“状态”(即 ExecutionContext,将在本章后面讨论)。使用新的 JobInstance 意味着“从头开始”,而使用现有实例通常意味着“从上次中断的地方开始”。

JobParameters

在讨论了 JobInstance 及其与 Job 的区别后,自然会问:“如何区分一个 JobInstance 与另一个?” 答案是:JobParametersJobParameters 对象保存一组用于启动批处理作业的参数。它们可以用于识别,甚至可以在运行期间用作参考数据,如下图所示

Job Parameters
图 3. 作业参数

在前面的示例中,有两个实例,一个用于 1 月 1 日,另一个用于 1 月 2 日,实际上只有一个 Job,但它有两个 JobParameter 对象:一个使用 01-01-2017 的作业参数启动,另一个使用 01-02-2017 的参数启动。因此,可以将合同定义为:JobInstance = Job + 标识 JobParameters。这允许开发人员有效地控制 JobInstance 的定义方式,因为他们控制着传递的参数。

并非所有作业参数都需要为识别 JobInstance 做出贡献。默认情况下,它们会这样做。但是,框架还允许提交具有不影响 JobInstance 身份的参数的 Job

JobExecution

JobExecution 指的是运行作业的单次尝试的技术概念。执行可能会以失败或成功结束,但与给定执行相对应的 JobInstance 只有在执行成功完成时才被认为是完整的。以之前描述的 EndOfDay Job 为例,考虑一个 2017-01-01 的 JobInstance,它在第一次运行时失败了。如果它再次使用与第一次运行相同的标识作业参数(2017-01-01)运行,则会创建一个新的 JobExecution。但是,仍然只有一个 JobInstance

Job 定义了作业是什么以及如何执行,而 JobInstance 是一个纯粹的组织对象,用于将执行分组在一起,主要用于启用正确的重启语义。但是,JobExecution 是实际运行过程中发生的事情的主要存储机制,并且包含许多必须控制和持久化的属性,如下表所示。

表 1. JobExecution 属性

属性

定义

状态

一个 BatchStatus 对象,指示执行的状态。在运行时,它是 BatchStatus#STARTED。如果失败,它是 BatchStatus#FAILED。如果成功完成,它是 BatchStatus#COMPLETED

startTime

一个 java.time.LocalDateTime,表示执行开始时的当前系统时间。如果作业尚未开始,则此字段为空。

endTime

一个 java.time.LocalDateTime,表示执行完成时的当前系统时间,无论它是否成功。如果作业尚未完成,则该字段为空。

exitStatus

ExitStatus,指示运行的结果。它非常重要,因为它包含返回给调用者的退出代码。有关更多详细信息,请参见第 5 章。如果作业尚未完成,则该字段为空。

createTime

一个 java.time.LocalDateTime,表示 JobExecution 首次持久化时的当前系统时间。作业可能尚未开始(因此没有开始时间),但它始终具有 createTime,这是框架管理作业级 ExecutionContexts 所必需的。

lastUpdated

表示上次JobExecution持久化的时间的java.time.LocalDateTime。如果作业尚未开始,则此字段为空。

executionContext

包含任何需要在执行之间持久化的用户数据的“属性包”。

failureExceptions

执行Job期间遇到的异常列表。如果Job失败期间遇到多个异常,这些异常可能会有用。

这些属性很重要,因为它们是持久化的,可以用来完全确定执行的状态。例如,如果01-01的EndOfDay作业在晚上9:00执行,并在9:30失败,则在批处理元数据表中会进行以下条目

表 2. BATCH_JOB_INSTANCE

JOB_INST_ID

JOB_NAME

1

EndOfDayJob

表 3. BATCH_JOB_EXECUTION_PARAMS

JOB_EXECUTION_ID

TYPE_CD

KEY_NAME

DATE_VAL

IDENTIFYING

1

DATE

schedule.Date

2017-01-01

TRUE

表 4. BATCH_JOB_EXECUTION

JOB_EXEC_ID

JOB_INST_ID

START_TIME

END_TIME

STATUS

1

1

2017-01-01 21:00

2017-01-01 21:30

FAILED

为了清晰和格式化,列名可能已被缩写或删除。

现在作业已经失败,假设整个晚上都在解决问题,所以“批处理窗口”现在已经关闭。进一步假设窗口从晚上9:00开始,作业再次为01-01启动,从它停止的地方开始,并在9:30成功完成。由于现在是第二天,所以必须运行01-02作业,它在9:31之后立即启动,并在正常的一小时时间内在10:30完成。没有要求一个JobInstance在另一个JobInstance之后启动,除非两个作业有可能尝试访问相同的数据,从而导致数据库级别的锁定问题。完全由调度程序决定何时运行Job。由于它们是独立的JobInstances,Spring Batch不会尝试阻止它们同时运行。(尝试在另一个JobInstance正在运行时运行同一个JobInstance会导致抛出JobExecutionAlreadyRunningException异常)。现在,JobInstanceJobParameters表中应该有一个额外的条目,JobExecution表中应该有两个额外的条目,如下表所示

表 5. BATCH_JOB_INSTANCE

JOB_INST_ID

JOB_NAME

1

EndOfDayJob

2

EndOfDayJob

表 6. BATCH_JOB_EXECUTION_PARAMS

JOB_EXECUTION_ID

TYPE_CD

KEY_NAME

DATE_VAL

IDENTIFYING

1

DATE

schedule.Date

2017-01-01 00:00:00

TRUE

2

DATE

schedule.Date

2017-01-01 00:00:00

TRUE

3

DATE

schedule.Date

2017-01-02 00:00:00

TRUE

表 7. BATCH_JOB_EXECUTION

JOB_EXEC_ID

JOB_INST_ID

START_TIME

END_TIME

STATUS

1

1

2017-01-01 21:00

2017-01-01 21:30

FAILED

2

1

2017-01-02 21:00

2017-01-02 21:30

已完成

3

2

2017-01-02 21:31

2017-01-02 22:29

已完成

为了清晰和格式化,列名可能已被缩写或删除。

步骤

一个Step是一个域对象,它封装了批处理作业的独立、顺序阶段。因此,每个Job完全由一个或多个步骤组成。一个Step包含定义和控制实际批处理所需的所有信息。这是一个必要的模糊描述,因为任何给定Step的内容由编写Job的开发人员自行决定。一个Step可以像开发人员想要的那么简单或复杂。一个简单的Step可能将数据从文件加载到数据库中,需要很少或不需要代码(取决于使用的实现)。一个更复杂的Step可能具有作为处理的一部分应用的复杂业务规则。与Job一样,Step也有一个单独的StepExecution,它与唯一的JobExecution相关联,如下图所示

Figure 2.1: Job Hierarchy With Steps
图 4. 带有步骤的作业层次结构

StepExecution

一个StepExecution代表执行Step的单次尝试。每次运行Step时都会创建一个新的StepExecution,类似于JobExecution。但是,如果一个步骤由于前面的步骤失败而无法执行,则不会为它持久化执行。只有当Step实际启动时才会创建StepExecution

Step执行由StepExecution类的对象表示。每次执行都包含对它对应的步骤和JobExecution以及事务相关数据的引用,例如提交和回滚计数以及开始和结束时间。此外,每个步骤执行都包含一个ExecutionContext,其中包含开发人员需要在批处理运行之间持久化的任何数据,例如统计信息或重新启动所需的狀態信息。下表列出了StepExecution的属性

表 8. StepExecution 属性

属性

定义

状态

一个BatchStatus对象,指示执行的状态。在运行时,状态为BatchStatus.STARTED。如果失败,状态为BatchStatus.FAILED。如果成功完成,状态为BatchStatus.COMPLETED

startTime

表示执行开始时的当前系统时间的 java.time.LocalDateTime。如果步骤尚未开始,则此字段为空。

endTime

表示执行完成时的当前系统时间的 java.time.LocalDateTime,无论执行是否成功。如果步骤尚未退出,则此字段为空。

exitStatus

表示执行结果的 ExitStatus。它是最重要的,因为它包含返回给调用者的退出代码。有关更多详细信息,请参见第 5 章。如果作业尚未退出,则此字段为空。

executionContext

包含任何需要在执行之间持久化的用户数据的“属性包”。

readCount

已成功读取的项目数量。

writeCount

已成功写入的项目数量。

commitCount

为本次执行提交的事务数量。

rollbackCount

Step 控制的业务事务回滚的次数。

readSkipCount

read 失败导致跳过项目的次数。

processSkipCount

process 失败导致跳过项目的次数。

filterCount

ItemProcessor “过滤”的项目数量。

writeSkipCount

write 失败导致跳过项目的次数。

ExecutionContext

ExecutionContext 代表一个键值对集合,这些键值对由框架持久化和控制,为开发人员提供了一个存储持久状态的地方,该状态的作用域为 StepExecution 对象或 JobExecution 对象。(对于熟悉 Quartz 的人来说,它与 JobDataMap 非常相似。)最佳使用示例是促进重启。以使用平面文件输入为例,在处理单个行时,框架会在提交点定期持久化 ExecutionContext。这样做可以让 ItemReader 存储其状态,以防运行过程中发生致命错误,甚至在断电的情况下也是如此。所需要做的就是将当前读取的行数放入上下文中,如下例所示,框架会完成剩下的工作

executionContext.putLong(getKey(LINES_READ_COUNT), reader.getPosition());

Job 类别部分中的 EndOfDay 示例为例,假设有一个步骤 loadData,它将文件加载到数据库中。在第一次运行失败后,元数据表将如下例所示

表 9. BATCH_JOB_INSTANCE

JOB_INST_ID

JOB_NAME

1

EndOfDayJob

表 10. BATCH_JOB_EXECUTION_PARAMS

JOB_INST_ID

TYPE_CD

KEY_NAME

DATE_VAL

1

DATE

schedule.Date

2017-01-01

表 11. BATCH_JOB_EXECUTION

JOB_EXEC_ID

JOB_INST_ID

START_TIME

END_TIME

STATUS

1

1

2017-01-01 21:00

2017-01-01 21:30

FAILED

表 12. BATCH_STEP_EXECUTION

STEP_EXEC_ID

JOB_EXEC_ID

STEP_NAME

START_TIME

END_TIME

STATUS

1

1

loadData

2017-01-01 21:00

2017-01-01 21:30

FAILED

表 13. BATCH_STEP_EXECUTION_CONTEXT

STEP_EXEC_ID

SHORT_CONTEXT

1

{piece.count=40321}

在上述情况下,Step 运行了 30 分钟,处理了 40,321 个“片段”,在本例中,这些片段代表文件中的行。此值在每次提交之前由框架更新,并且可以包含对应于 ExecutionContext 中条目的多行。在提交之前收到通知需要使用各种 StepListener 实现(或 ItemStream)之一,这些将在本指南的后面部分详细讨论。与前面的示例一样,假设 Job 在第二天重新启动。当它重新启动时,来自上次运行的 ExecutionContext 的值将从数据库中重新构建。当 ItemReader 打开时,它可以检查上下文是否有任何存储状态,并从那里初始化自身,如下例所示

if (executionContext.containsKey(getKey(LINES_READ_COUNT))) {
    log.debug("Initializing for restart. Restart data is: " + executionContext);

    long lineCount = executionContext.getLong(getKey(LINES_READ_COUNT));

    LineReader reader = getReader();

    Object record = "";
    while (reader.getPosition() < lineCount && record != null) {
        record = readLine();
    }
}

在这种情况下,在前面的代码运行之后,当前行是 40,322,让 Step 从它停止的地方重新开始。您还可以使用 ExecutionContext 来存储有关运行本身需要持久化的统计信息。例如,如果一个平面文件包含跨多行处理的订单,则可能需要存储已处理的订单数量(这与读取的行数大不相同),以便在 Step 结束时发送一封电子邮件,其中包含已处理的订单总数。框架负责为开发人员存储此信息,以便将其正确地与单个 JobInstance 关联。很难知道是否应该使用现有的 ExecutionContext。例如,使用上面的 EndOfDay 示例,当 01-01 运行第二次启动时,框架会识别出它是同一个 JobInstance,并且在单个 Step 基础上,从数据库中提取 ExecutionContext,并将其(作为 StepExecution 的一部分)传递给 Step 本身。相反,对于 01-02 运行,框架会识别出它是一个不同的实例,因此必须将一个空上下文传递给 Step。框架为开发人员做出了许多此类确定,以确保在正确的时间向他们提供状态。还需要注意的是,在任何给定时间,每个 StepExecution 只有一个 ExecutionContextExecutionContext 的客户端应该小心,因为这会创建一个共享的键空间。因此,在放入值时要小心,以确保不会覆盖任何数据。但是,Step 在上下文中绝对不存储任何数据,因此无法对框架产生负面影响。

请注意,每个 JobExecution 至少有一个 ExecutionContext,每个 StepExecution 也都有一个。例如,请考虑以下代码片段

ExecutionContext ecStep = stepExecution.getExecutionContext();
ExecutionContext ecJob = jobExecution.getExecutionContext();
//ecStep does not equal ecJob

如注释中所述,ecStep 不等于 ecJob。它们是两个不同的 ExecutionContext。作用域为 Step 的一个在 Step 的每个提交点保存,而作用域为 Job 的一个在每个 Step 执行之间保存。

ExecutionContext 中,所有非瞬态条目必须是 Serializable。执行上下文的正确序列化是步骤和作业重启功能的基础。如果您使用非原生可序列化的键或值,则需要采用定制的序列化方法。未能序列化执行上下文可能会危及状态持久化过程,导致失败的作业无法正确恢复。

JobRepository

JobRepository 是前面提到的所有构造型的持久化机制。它为 JobLauncherJobStep 实现提供 CRUD 操作。当一个 Job 首次启动时,会从存储库中获取一个 JobExecution。此外,在执行过程中,StepExecutionJobExecution 实现通过传递给存储库来持久化。

  • Java

  • XML

使用 Java 配置时,@EnableBatchProcessing 注解提供 JobRepository 作为自动配置的组件之一。

Spring Batch XML 命名空间支持使用 <job-repository> 标签配置 JobRepository 实例,如下例所示

<job-repository id="jobRepository"/>

JobLauncher

JobLauncher 代表一个简单的接口,用于使用给定的 JobParameters 集启动 Job,如下例所示

public interface JobLauncher {

public JobExecution run(Job job, JobParameters jobParameters)
            throws JobExecutionAlreadyRunningException, JobRestartException,
                   JobInstanceAlreadyCompleteException, JobParametersInvalidException;
}

预计实现将从 JobRepository 获取有效的 JobExecution 并执行 Job

ItemReader

ItemReader 是一个抽象,代表一次检索一个 Step 的输入。当 ItemReader 用尽它可以提供的项目时,它通过返回 null 来指示这一点。您可以在 读取器和写入器 中找到有关 ItemReader 接口及其各种实现的更多详细信息。

ItemWriter

ItemWriter 是一个抽象,代表一次一个批次或块地输出 Step。通常,ItemWriter 不了解它应该接收的下一个输入,只知道在其当前调用中传递的项目。您可以在 读取器和写入器 中找到有关 ItemWriter 接口及其各种实现的更多详细信息。

ItemProcessor

ItemProcessor 是一个抽象,它代表了对一项的业务处理。当 ItemReader 读取一项时,ItemWriter 写入一项,ItemProcessor 提供了一个访问点来转换或应用其他业务处理。如果在处理该项时,确定该项无效,则返回 null 表示该项不应被写入。您可以在 读者和作者 中找到有关 ItemProcessor 接口的更多详细信息。

批处理命名空间

前面列出的许多域概念需要在 Spring ApplicationContext 中配置。虽然您可以使用标准 bean 定义中提供的接口实现,但为了方便配置,提供了一个命名空间,如下例所示

<beans:beans xmlns="http://www.springframework.org/schema/batch"
xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
   http://www.springframework.org/schema/beans
   https://www.springframework.org/schema/beans/spring-beans.xsd
   http://www.springframework.org/schema/batch
   https://www.springframework.org/schema/batch/spring-batch.xsd">

<job id="ioSampleJob">
    <step id="step1">
        <tasklet>
            <chunk reader="itemReader" writer="itemWriter" commit-interval="2"/>
        </tasklet>
    </step>
</job>

</beans:beans>

只要声明了批处理命名空间,就可以使用它的任何元素。您可以在 配置和运行作业 中找到有关配置作业的更多信息。您可以在 配置步骤 中找到有关配置 Step 的更多信息。