批处理的领域语言
对于任何经验丰富的批处理架构师来说,Spring Batch 中使用的批处理总体概念应该会感到熟悉和自在。它有“作业(Jobs)”和“步骤(Steps)”,以及由开发者提供的处理单元,名为 ItemReader 和 ItemWriter。然而,由于 Spring 的模式、操作、模板、回调和惯用法,存在以下机遇:
-
显著改进对明确关注点分离的遵循。
-
清晰划分的架构层和以接口形式提供的服务。
-
简单且默认的实现,使得开箱即用,快速采用且易于使用。
-
显著增强的可扩展性。
以下图表是已使用了数十年的批处理参考架构的简化版本。它概述了构成批处理领域语言的组件。该架构框架是一个蓝图,经过数十年来在几代平台(大型机上的 COBOL、Unix 上的 C 以及现在的 Java anywhere)上的实现验证。JCL 和 COBOL 开发者可能会像 C、C# 和 Java 开发者一样对这些概念感到熟悉。Spring Batch 提供了对健壮、可维护系统中常见的层、组件和技术服务的物理实现,这些系统用于处理从简单到复杂的批处理应用程序的创建,并提供了基础设施和扩展以应对非常复杂的处理需求。
上图突出了构成 Spring Batch 领域语言的关键概念。一个 Job 有一个或多个步骤(Step),每个步骤都只有一个 ItemReader、一个可选的 ItemProcessor 和一个 ItemWriter。一个作业由 JobOperator 操作(启动、停止等),关于当前运行过程的元数据存储在 JobRepository 中并从中恢复。
作业(Job)
本节描述与批处理作业概念相关的原型。Job 是一个封装整个批处理过程的实体。与 Spring 的其他项目一样,Job 通过 XML 配置文件或基于 Java 的配置进行连接。此配置可以称为“作业配置”。然而,Job 仅仅是整个层次结构的顶部,如下图所示:
在 Spring Batch 中,Job 只是 Step 实例的容器。它将多个逻辑上属于同一流程的步骤组合在一起,并允许配置所有步骤的全局属性,例如可重新启动性。作业配置包含:
-
作业名称。
-
Step实例的定义和排序。 -
作业是否可重新启动。
-
Java
-
XML
对于使用 Java 配置的用户,Spring Batch 以 SimpleJob 类的形式提供了 Job 接口的默认实现,它在 Job 的基础上创建了一些标准功能。当使用基于 Java 的配置时,一组构建器可用于实例化 Job,如下例所示:
@Bean
public Job footballJob(JobRepository jobRepository) {
return new JobBuilder("footballJob", jobRepository)
.start(playerLoad())
.next(gameLoad())
.next(playerSummarization())
.build();
}
对于使用 XML 配置的用户,Spring Batch 以 SimpleJob 类的形式提供了 Job 接口的默认实现,它在 Job 的基础上创建了一些标准功能。但是,批处理命名空间抽象掉了直接实例化它的需要。相反,您可以使用 <job> 元素,如下例所示:
<job id="footballJob">
<step id="playerload" next="gameLoad"/>
<step id="gameLoad" next="playerSummarization"/>
<step id="playerSummarization"/>
</job>
JobInstance
JobInstance 指的是逻辑作业运行的概念。考虑一个批处理作业,它应该在一天结束时运行一次,例如上图中 EndOfDay Job。只有一个 EndOfDay 作业,但 Job 的每次单独运行都必须单独跟踪。在这种情况下,每天有一个逻辑 JobInstance。例如,有 1 月 1 日的运行,1 月 2 日的运行,依此类推。如果 1 月 1 日的运行第一次失败,并在第二天再次运行,它仍然是 1 月 1 日的运行。(通常,这也与它正在处理的数据相对应,这意味着 1 月 1 日的运行处理 1 月 1 日的数据)。因此,每个 JobInstance 可以有多次执行(JobExecution 在本章后面将更详细地讨论),并且在给定时间只能运行一个 JobInstance(它对应于特定的 Job 和标识性的 JobParameters)。
JobInstance 的定义与要加载的数据绝对无关。它完全由 ItemReader 的实现来决定如何加载数据。例如,在 EndOfDay 场景中,数据上可能有一个列指示数据所属的 有效日期 或 计划日期。因此,1 月 1 日的运行将只加载 1 日的数据,而 1 月 2 日的运行将只使用 2 日的数据。因为这个决定很可能是一个业务决策,所以它留给 ItemReader 来决定。但是,使用相同的 JobInstance 决定是否使用来自先前执行的“状态”(即 ExecutionContext,这将在本章后面讨论)。使用新的 JobInstance 意味着“从头开始”,而使用现有实例通常意味着“从上次中断的地方开始”。
JobParameters
讨论了 JobInstance 以及它与 Job 的区别之后,自然会问一个问题:“如何区分不同的 JobInstance?”答案是:JobParameters。JobParameters 对象包含一组用于启动批处理作业的参数。它们可以用于标识,甚至在运行期间用作参考数据,如下图所示:
在 Job Instance 部分的示例中,有两个实例,一个用于 1 月 1 日,另一个用于 1 月 2 日,实际上只有一个 Job,但它有两个 JobParameter 对象:一个以 01-01-2017 的作业参数启动,另一个以 01-02-2017 的参数启动。因此,契约可以定义为:JobInstance = Job + 标识性 JobParameters。这允许开发人员有效地控制 JobInstance 的定义方式,因为他们控制传入的参数。
并非所有作业参数都必须有助于 JobInstance 的识别。默认情况下,它们会这样做。但是,该框架也允许提交带有不有助于 JobInstance 身份的参数的 Job。 |
JobExecution
JobExecution 指的是单次尝试运行作业的技术概念。一次执行可能以失败或成功告终,但除非执行成功完成,否则与给定执行对应的 JobInstance 不会被视为已完成。以上面描述的 EndOfDay Job 为例,考虑一个 01-01-2017 的 JobInstance,它在第一次运行时失败了。如果它以与第一次运行相同的标识性作业参数(01-01-2017)再次运行,则会创建一个新的 JobExecution。但是,仍然只有一个 JobInstance。
Job 定义了一个作业是什么以及如何执行它,而 JobInstance 是一个纯粹的组织对象,用于将执行分组在一起,主要用于实现正确的重启语义。然而,JobExecution 是实际运行期间发生情况的主要存储机制,并包含许多必须受控和持久化的属性,如下表所示:
财产 |
定义 |
|
一个 |
|
一个 |
|
一个 |
|
|
|
一个 |
|
一个 |
|
包含在执行之间需要持久化的任何用户数据的“属性包”。 |
|
在 |
这些属性很重要,因为它们是持久化的,并且可以用来完全确定执行的状态。例如,如果 01-01 的 EndOfDay 作业在晚上 9:00 执行并在 9:30 失败,则批处理元数据表中会添加以下条目:
JOB_INST_ID |
JOB_NAME |
1 |
EndOfDayJob |
JOB_EXECUTION_ID |
TYPE_CD |
KEY_NAME |
DATE_VAL |
IDENTIFYING |
1 |
DATE |
schedule.Date |
2017-01-01 |
TRUE |
JOB_EXEC_ID |
JOB_INST_ID |
START_TIME |
END_TIME |
STATUS |
1 |
1 |
2017-01-01 21:00 |
2017-01-01 21:30 |
FAILED |
| 为了清晰和排版,列名可能已被缩写或删除。 |
现在作业已经失败,假设确定问题花了整晚时间,所以“批处理窗口”现在已经关闭。进一步假设窗口从晚上 9:00 开始,作业再次为 01-01 启动,从上次中断的地方开始,并在 9:30 成功完成。因为现在是第二天,所以 01-02 作业也必须运行,它紧接着在 9:31 启动,并在其正常的一小时时间 10:30 完成。一个 JobInstance 没有要求必须在另一个之后启动,除非两个作业有可能尝试访问相同的数据,从而导致数据库级别的锁定问题。完全由调度程序决定何时运行 Job。由于它们是独立的 JobInstances,Spring Batch 不会尝试阻止它们并发运行。(尝试在另一个 JobInstance 已经在运行时运行相同的 JobInstance 会导致抛出 JobExecutionAlreadyRunningException)。现在,JobInstance 和 JobParameters 表中应该会有一个额外的条目,并且 JobExecution 表中会有两个额外的条目,如下表所示:
JOB_INST_ID |
JOB_NAME |
1 |
EndOfDayJob |
2 |
EndOfDayJob |
JOB_EXECUTION_ID |
TYPE_CD |
KEY_NAME |
DATE_VAL |
IDENTIFYING |
1 |
DATE |
schedule.Date |
2017-01-01 00:00:00 |
TRUE |
2 |
DATE |
schedule.Date |
2017-01-01 00:00:00 |
TRUE |
3 |
DATE |
schedule.Date |
2017-01-02 00:00:00 |
TRUE |
JOB_EXEC_ID |
JOB_INST_ID |
START_TIME |
END_TIME |
STATUS |
1 |
1 |
2017-01-01 21:00 |
2017-01-01 21:30 |
FAILED |
2 |
1 |
2017-01-02 21:00 |
2017-01-02 21:30 |
COMPLETED |
3 |
2 |
2017-01-02 21:31 |
2017-01-02 22:29 |
COMPLETED |
| 为了清晰和排版,列名可能已被缩写或删除。 |
步骤(Step)
Step 是一个领域对象,它封装了批处理作业的一个独立的、顺序的阶段。因此,每个 Job 完全由一个或多个步骤组成。Step 包含定义和控制实际批处理所需的所有信息。这是一个必然模糊的描述,因为任何给定 Step 的内容都由编写 Job 的开发人员自行决定。一个 Step 可以像开发人员期望的那样简单或复杂。一个简单的 Step 可能会将数据从文件加载到数据库中,几乎不需要或根本不需要代码(取决于所使用的实现)。一个更复杂的 Step 可能具有在处理过程中应用的复杂业务规则。与 Job 一样,Step 具有与唯一的 JobExecution 相关联的单独 StepExecution,如下图所示:
StepExecution
StepExecution 表示执行 Step 的单次尝试。每次运行 Step 时都会创建一个新的 StepExecution,类似于 JobExecution。但是,如果一个步骤因其之前的步骤失败而未能执行,则不会为其持久化执行。只有当 Step 实际启动时,才会创建 StepExecution。
Step 执行由 StepExecution 类的对象表示。每次执行都包含对其相应步骤和 JobExecution 的引用以及与事务相关的数据,例如提交和回滚计数以及开始和结束时间。此外,每个步骤执行都包含一个 ExecutionContext,其中包含开发人员需要跨批处理运行持久化的任何数据,例如统计数据或重新启动所需的状态信息。下表列出了 StepExecution 的属性:
财产 |
定义 |
|
一个 |
|
一个 |
|
一个 |
|
指示执行结果的 |
|
包含在执行之间需要持久化的任何用户数据的“属性包”。 |
|
已成功读取的条目数。 |
|
已成功写入的条目数。 |
|
此执行已提交的事务数。 |
|
由 |
|
|
|
|
|
已由 |
|
|
ExecutionContext
ExecutionContext 表示由框架持久化和控制的键/值对集合,为开发人员提供了一个存储范围限定于 StepExecution 对象或 JobExecution 对象的持久化状态的地方。(对于熟悉 Quartz 的人来说,它与 JobDataMap 非常相似。)最好的使用示例是方便重新启动。以平面文件输入为例,在处理单个行时,框架在提交点定期持久化 ExecutionContext。这样做可以让 ItemReader 存储其状态,以防在运行期间发生致命错误,甚至停电。所有需要做的就是将当前读取的行数放入上下文中,如下例所示,其余的由框架完成:
executionContext.putLong(getKey(LINES_READ_COUNT), reader.getPosition());
以上述作业原型中的 EndOfDay 示例为例,假设有一个步骤 loadData,它将文件加载到数据库中。第一次运行失败后,元数据表将如下图所示:
JOB_INST_ID |
JOB_NAME |
1 |
EndOfDayJob |
JOB_INST_ID |
TYPE_CD |
KEY_NAME |
DATE_VAL |
1 |
DATE |
schedule.Date |
2017-01-01 |
JOB_EXEC_ID |
JOB_INST_ID |
START_TIME |
END_TIME |
STATUS |
1 |
1 |
2017-01-01 21:00 |
2017-01-01 21:30 |
FAILED |
STEP_EXEC_ID |
JOB_EXEC_ID |
STEP_NAME |
START_TIME |
END_TIME |
STATUS |
1 |
1 |
loadData |
2017-01-01 21:00 |
2017-01-01 21:30 |
FAILED |
STEP_EXEC_ID |
SHORT_CONTEXT |
1 |
{piece.count=40321} |
在上述情况中,Step 运行了 30 分钟并处理了 40,321 个“片段”,在此场景中代表文件中的行数。该值在每次提交之前由框架更新,并且可以包含与 ExecutionContext 中的条目对应的多行。在提交之前收到通知需要使用各种 StepListener 实现(或 ItemStream),这些将在本指南后面更详细地讨论。与前面的示例一样,假设作业在第二天重新启动。当它重新启动时,上次运行的 ExecutionContext 中的值将从数据库中重构。当 ItemReader 打开时,它可以检查上下文中是否有任何存储状态,并从中初始化自身,如下例所示:
if (executionContext.containsKey(getKey(LINES_READ_COUNT))) {
log.debug("Initializing for restart. Restart data is: " + executionContext);
long lineCount = executionContext.getLong(getKey(LINES_READ_COUNT));
LineReader reader = getReader();
Object record = "";
while (reader.getPosition() < lineCount && record != null) {
record = readLine();
}
}
在这种情况下,上述代码运行后,当前行数为 40,322,让 Step 可以从上次中断的地方重新开始。您还可以使用 ExecutionContext 存储运行本身的统计数据。例如,如果一个平面文件包含跨多行存在的要处理的订单,则可能需要存储已处理的订单数量(这与读取的行数大不相同),以便在 Step 结束时发送一封包含已处理订单总数的电子邮件。框架会为开发人员处理此存储,以将其正确限定在单个 JobInstance 中。要知道是否应该使用现有的 ExecutionContext 可能非常困难。例如,使用上面 EndOfDay 示例,当 01-01 运行第二次启动时,框架会识别它是同一个 JobInstance,并且在单个 Step 基础上,从数据库中提取 ExecutionContext,并将其(作为 StepExecution 的一部分)传递给 Step 本身。相反,对于 01-02 运行,框架会识别它是一个不同的实例,因此必须将一个空上下文传递给 Step。框架会为开发人员做出许多此类决定,以确保在正确的时间将状态提供给他们。还需要注意的是,在任何给定时间,每个 StepExecution 都恰好存在一个 ExecutionContext。ExecutionContext 的客户端应注意,因为这会创建一个共享键空间。因此,在放入值时应小心,以确保没有数据被覆盖。但是,Step 绝对不会在上下文中存储任何数据,因此不会对框架产生不利影响。
请注意,每个 JobExecution 至少有一个 ExecutionContext,每个 StepExecution 也有一个。例如,考虑以下代码片段:
ExecutionContext ecStep = stepExecution.getExecutionContext();
ExecutionContext ecJob = jobExecution.getExecutionContext();
//ecStep does not equal ecJob
正如注释中提到的,ecStep 不等于 ecJob。它们是两个不同的 ExecutionContexts。限定在 Step 范围内的 ExecutionContext 在 Step 的每个提交点保存,而限定在 Job 范围内的 ExecutionContext 在每次 Step 执行之间保存。
在 ExecutionContext 中,所有非瞬态条目都必须是 Serializable。执行上下文的正确序列化是步骤和作业重新启动能力的基础。如果您使用并非原生可序列化的键或值,则需要采用定制的序列化方法。未能序列化执行上下文可能会危及状态持久化过程,导致失败的作业无法正确恢复。 |
JobRepository
JobRepository 是所有前面提到的原型的持久化机制。它为 JobLauncher、Job 和 Step 实现提供 CRUD 操作。当 Job 首次启动时,会从仓库中获取 JobExecution。此外,在执行过程中,StepExecution 和 JobExecution 实现通过将其传递给仓库进行持久化。
-
Java
-
XML
使用 Java 配置时,@EnableBatchProcessing 注解将 JobRepository 作为自动配置的组件之一提供。
Spring Batch XML 命名空间支持使用 <job-repository> 标签配置 JobRepository 实例,如下例所示:
<job-repository id="jobRepository"/>
JobOperator
JobOperator 表示用于启动、停止和重新启动作业的简单接口,如下例所示:
public interface JobOperator {
JobExecution start(Job job, JobParameters jobParameters) throws Exception;
JobExecution startNextInstance(Job job) throws Exception;
boolean stop(JobExecution jobExecution) throws Exception;
JobExecution restart(JobExecution jobExecution) throws Exception;
JobExecution abandon(JobExecution jobExecution) throws Exception;
}
Job 以给定的一组 JobParameters 启动。实现应从 JobRepository 获取有效的 JobExecution 并执行 Job。
ItemReader
ItemReader 是一个抽象,表示一次一个地检索 Step 的输入。当 ItemReader 用尽了它可以提供的项目时,它通过返回 null 来指示这一点。您可以在 读取器和写入器 中找到有关 ItemReader 接口及其各种实现的更多详细信息。
ItemWriter
ItemWriter 是一个抽象,表示 Step 的输出,一次一批或一块项目。通常,ItemWriter 对它接下来应该接收的输入一无所知,只知道在其当前调用中传递的项目。您可以在 读取器和写入器 中找到有关 ItemWriter 接口及其各种实现的更多详细信息。
ItemProcessor
ItemProcessor 是一个抽象,表示对项的业务处理。ItemReader 读取一个项,ItemWriter 写入一个项,而 ItemProcessor 提供了一个访问点来转换或应用其他业务处理。如果在处理项时确定该项无效,返回 null 表示该项不应被写入。您可以在 读取器和写入器 中找到有关 ItemProcessor 接口的更多详细信息。
批处理命名空间
上述许多领域概念需要在 Spring ApplicationContext 中进行配置。虽然存在可在标准 bean 定义中使用的上述接口实现,但为了方便配置,提供了命名空间,如下例所示:
<beans:beans xmlns="http://www.springframework.org/schema/batch"
xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/batch
https://www.springframework.org/schema/batch/spring-batch.xsd">
<job id="ioSampleJob">
<step id="step1">
<tasklet>
<chunk reader="itemReader" writer="itemWriter" commit-interval="2"/>
</tasklet>
</step>
</job>
</beans:beans>
| 批处理 XML 命名空间自 Spring Batch 6.0 起已弃用,并将在 7.0 版本中移除。 |