批处理的领域语言
对于任何经验丰富的批处理架构师来说,Spring Batch 中使用的批处理总体概念应该是熟悉且舒适的。有 “Jobs” 和 “Steps”,以及开发者提供的处理单元 ItemReader
和 ItemWriter
。然而,由于 Spring 的模式、操作、模板、回调和惯用法,为以下方面提供了机会
-
显著改进了清晰的关注点分离原则的应用。
-
清晰划分的架构层以及作为接口提供的服务。
-
提供简单和默认的实现,以便快速采用和开箱即用。
-
显著增强了可扩展性。
下图是批处理参考架构的简化版本,该架构已使用了数十年。它概述了构成批处理领域语言的组件。这个架构框架是一个蓝图,已在过去几代平台(大型机上的 COBOL、Unix 上的 C 以及现在的任何地方的 Java)上通过数十年的实现得到验证。JCL 和 COBOL 开发者可能与 C、C# 和 Java 开发者一样熟悉这些概念。Spring Batch 提供了健壮、可维护系统中常见的分层、组件和技术服务的物理实现,这些系统用于构建从简单到复杂的批处理应用,并提供基础设施和扩展来满足非常复杂的处理需求。

前图强调了构成 Spring Batch 领域语言的关键概念。一个 Job
包含一个或多个 Step,每个 Step 恰好有一个 ItemReader
、一个 ItemProcessor
和一个 ItemWriter
。Job 需要通过 JobLauncher
启动,并且当前运行过程的元数据需要存储在 JobRepository
中。
Job
本节描述与批处理 Job 概念相关的原型。Job
是一个封装整个批处理过程的实体。与其他 Spring 项目一样,Job
可以通过 XML 配置文件或基于 Java 的配置进行连接。这种配置可能被称为“job 配置”。然而,Job
只是整个层次结构的顶部,如下图所示

在 Spring Batch 中,Job
仅仅是 Step
实例的容器。它将逻辑上属于同一流程的多个 Step 组合在一起,并允许配置所有 Step 的全局属性,例如可重启性。job 配置包含
-
job 的名称。
-
Step
实例的定义和排序。 -
job 是否可重启。
-
Java
-
XML
对于使用 Java 配置的用户,Spring Batch 以 SimpleJob
类形式提供了 Job
接口的默认实现,它在 Job
的基础上创建了一些标准功能。在使用基于 Java 的配置时,提供了一系列构建器用于实例化 Job
,如下例所示
@Bean
public Job footballJob(JobRepository jobRepository) {
return new JobBuilder("footballJob", jobRepository)
.start(playerLoad())
.next(gameLoad())
.next(playerSummarization())
.build();
}
对于使用 XML 配置的用户,Spring Batch 以 SimpleJob
类形式提供了 Job
接口的默认实现,它在 Job
的基础上创建了一些标准功能。然而,批处理命名空间抽象了直接实例化它的需要。取而代之的是,您可以使用 <job>
元素,如下例所示
<job id="footballJob">
<step id="playerload" next="gameLoad"/>
<step id="gameLoad" next="playerSummarization"/>
<step id="playerSummarization"/>
</job>
JobInstance
JobInstance
指的是逻辑 job 运行的概念。考虑一个应在一天结束时运行的批处理 job,例如前图中的 EndOfDay
Job
。只有一个 EndOfDay
job,但 Job
的每一次单独运行都必须单独跟踪。对于此 job,每天有一个逻辑 JobInstance
。例如,有 1 月 1 日的运行、1 月 2 日的运行,依此类推。如果 1 月 1 日的运行第一次失败,并在第二天再次运行,它仍然是 1 月 1 日的运行。(通常,这与它处理的数据也相对应,意味着 1 月 1 日的运行处理 1 月 1 日的数据)。因此,每个 JobInstance
可以有多次执行(JobExecution
在本章后面会更详细地讨论),并且在给定时间只能运行一个 JobInstance
(它对应于特定的 Job
和标识性 JobParameters
)。
JobInstance
的定义与要加载的数据没有任何关系。完全取决于 ItemReader
的实现来决定如何加载数据。例如,在 EndOfDay
场景中,数据中可能有一个列指示数据所属的 effective date
或 schedule date
。因此,1 月 1 日的运行仅加载 1 日的数据,而 1 月 2 日的运行仅使用 2 日的数据。由于这种决定很可能是一个业务决策,因此留给 ItemReader
来决定。然而,使用相同的 JobInstance
决定了是否使用之前执行的“状态”(即本章后面讨论的 ExecutionContext
)。使用新的 JobInstance
意味着“从头开始”,而使用现有实例通常意味着“从上次停止的地方继续”。
JobParameters
讨论了 JobInstance
以及它与 Job
的区别后,自然会问的问题是:“如何区分不同的 JobInstance
?”答案是:JobParameters
。一个 JobParameters
对象包含用于启动批处理 job 的一组参数。这些参数可用于标识,甚至在运行期间用作参考数据,如下图所示

在前例中,有两个实例,一个对应 1 月 1 日,另一个对应 1 月 2 日,实际上只有一个 Job
,但它有两个 JobParameter
对象:一个是用 job 参数 01-01-2017 启动的,另一个是用参数 01-02-2017 启动的。因此,契约可以定义为:JobInstance
= Job
+ 标识性 JobParameters
。这允许开发者有效控制 JobInstance
的定义方式,因为他们控制传入的参数。
并非所有 job 参数都必须用于标识 JobInstance 。默认情况下是这样的。然而,框架也允许提交带有不用于标识 JobInstance 的参数的 Job 。 |
JobExecution
JobExecution
指的是运行 Job 的单次尝试的技术概念。一次执行可能以失败或成功告终,但除非执行成功完成,否则对应的 JobInstance
不被认为是完成的。以前面描述的 EndOfDay
Job
为例,考虑 01-01-2017 的 JobInstance
,它在第一次运行时失败了。如果使用与第一次运行相同的标识性 job 参数 (01-01-2017) 再次运行,则会创建一个新的 JobExecution
。然而,仍然只有一个 JobInstance
。
Job
定义了 job 是什么以及如何执行,而 JobInstance
纯粹是一个组织对象,用于将执行组合在一起,主要是为了实现正确的重启语义。然而,JobExecution
是实际运行期间发生情况的主要存储机制,包含许多必须控制和持久化的属性,如下表所示
属性 |
定义 |
|
一个 |
|
一个 |
|
一个 |
|
|
|
一个 |
|
一个 |
|
包含需要在执行之间持久化的任何用户数据的“属性包”。 |
|
执行 |
这些属性很重要,因为它们会被持久化,并可用于完全确定执行的状态。例如,如果 01-01 的 EndOfDay
job 在晚上 9:00 执行,并在 9:30 失败,则会在批处理元数据表中记录以下条目
JOB_INST_ID |
JOB_NAME |
1 |
EndOfDayJob |
JOB_EXECUTION_ID |
TYPE_CD |
KEY_NAME |
DATE_VAL |
IDENTIFYING |
1 |
DATE |
schedule.Date |
2017-01-01 |
TRUE |
JOB_EXEC_ID |
JOB_INST_ID |
START_TIME |
END_TIME |
STATUS |
1 |
1 |
2017-01-01 21:00 |
2017-01-01 21:30 |
FAILED |
为了清晰和格式化,列名可能已被缩写或删除。 |
现在 job 已失败,假设确定问题花费了整个晚上,因此“批处理窗口”现已关闭。进一步假设窗口在晚上 9:00 开始,01-01 的 job 再次启动,从上次停止的地方继续,并在 9:30 成功完成。由于现在是第二天,01-02 的 job 也必须运行,它紧随其后在 9:31 启动,并在正常的一小时内于 10:30 完成。没有要求一个 JobInstance
必须在另一个之后启动,除非这两个 job 有可能尝试访问相同的数据,从而导致数据库级别的锁定问题。何时运行 Job
完全取决于调度器决定。由于它们是独立的 JobInstance
,Spring Batch 不会尝试阻止它们并发运行。(尝试在另一个 JobInstance
正在运行时运行同一个 JobInstance
会抛出 JobExecutionAlreadyRunningException
异常)。现在,在 JobInstance
和 JobParameters
表中都应该有一个额外的条目,在 JobExecution
表中应该有两个额外的条目,如下表所示
JOB_INST_ID |
JOB_NAME |
1 |
EndOfDayJob |
2 |
EndOfDayJob |
JOB_EXECUTION_ID |
TYPE_CD |
KEY_NAME |
DATE_VAL |
IDENTIFYING |
1 |
DATE |
schedule.Date |
2017-01-01 00:00:00 |
TRUE |
2 |
DATE |
schedule.Date |
2017-01-01 00:00:00 |
TRUE |
3 |
DATE |
schedule.Date |
2017-01-02 00:00:00 |
TRUE |
JOB_EXEC_ID |
JOB_INST_ID |
START_TIME |
END_TIME |
STATUS |
1 |
1 |
2017-01-01 21:00 |
2017-01-01 21:30 |
FAILED |
2 |
1 |
2017-01-02 21:00 |
2017-01-02 21:30 |
COMPLETED |
3 |
2 |
2017-01-02 21:31 |
2017-01-02 22:29 |
COMPLETED |
为了清晰和格式化,列名可能已被缩写或删除。 |
Step
Step
是一个领域对象,封装了批处理 job 的一个独立的、顺序的阶段。因此,每个 Job
完全由一个或多个 Step 组成。Step
包含了定义和控制实际批处理所需的所有信息。这是一个必然含糊的描述,因为任何给定 Step
的内容取决于编写 Job
的开发人员的决定。Step
可以像开发人员希望的那样简单或复杂。一个简单的 Step
可能将数据从文件加载到数据库,几乎不需要编写代码(取决于使用的实现)。更复杂的 Step
可能包含作为处理一部分应用的复杂业务规则。与 Job
一样,Step
有一个单独的 StepExecution
,它与一个唯一的 JobExecution
相关联,如下图所示

StepExecution
StepExecution
表示执行 Step
的单次尝试。每次运行 Step
时都会创建一个新的 StepExecution
,这与 JobExecution
类似。但是,如果一个 step 由于其之前的 step 失败而未能执行,则不会为其持久化执行记录。StepExecution
仅在其 Step
实际启动时创建。
Step
执行由 StepExecution
类的对象表示。每次执行都包含对其对应 step 和 JobExecution
的引用以及事务相关数据,例如提交和回滚计数以及开始和结束时间。此外,每次 step 执行都包含一个 ExecutionContext
,其中包含开发人员需要在批处理运行之间持久化的任何数据,例如统计信息或重启所需的状态信息。下表列出了 StepExecution
的属性
属性 |
定义 |
|
一个 |
|
一个 |
|
一个 |
|
|
|
包含需要在执行之间持久化的任何用户数据的“属性包”。 |
|
成功读取的项目数。 |
|
成功写入的项目数。 |
|
此执行已提交的事务数。 |
|
由 |
|
|
|
|
|
被 |
|
|
ExecutionContext
ExecutionContext
表示键/值对的集合,由框架持久化和控制,为开发人员提供一个存储与 StepExecution
对象或 JobExecution
对象作用域关联的持久状态的地方。(对于熟悉 Quartz 的人来说,它与 JobDataMap
非常相似。)最佳用法示例是辅助重启。以平面文件输入为例,在处理单行数据时,框架会在提交点定期持久化 ExecutionContext
。这样做可以让 ItemReader
在运行期间发生致命错误或甚至断电时存储其状态。所需做的就是将当前读取的行数放入上下文中,如下例所示,其余工作由框架完成
executionContext.putLong(getKey(LINES_READ_COUNT), reader.getPosition());
使用 Job
原型部分中的 EndOfDay
示例,假设有一个 step,名为 loadData
,它将文件加载到数据库中。第一次失败运行后,元数据表将如下例所示
JOB_INST_ID |
JOB_NAME |
1 |
EndOfDayJob |
JOB_INST_ID |
TYPE_CD |
KEY_NAME |
DATE_VAL |
1 |
DATE |
schedule.Date |
2017-01-01 |
JOB_EXEC_ID |
JOB_INST_ID |
START_TIME |
END_TIME |
STATUS |
1 |
1 |
2017-01-01 21:00 |
2017-01-01 21:30 |
FAILED |
STEP_EXEC_ID |
JOB_EXEC_ID |
STEP_NAME |
START_TIME |
END_TIME |
STATUS |
1 |
1 |
loadData |
2017-01-01 21:00 |
2017-01-01 21:30 |
FAILED |
STEP_EXEC_ID |
SHORT_CONTEXT |
1 |
{piece.count=40321} |
在前述情况下,Step
运行了 30 分钟,处理了 40,321 个“片段”,在此场景中表示文件中的行。该值由框架在每次提交之前更新,并可包含与 ExecutionContext
中的条目对应的多行。要在提交前收到通知,需要实现各种 StepListener
之一(或实现 ItemStream
),这将在本指南后面更详细地讨论。与前面的示例一样,假设 Job
在第二天重启。重启时,上次运行的 ExecutionContext
中的值会从数据库中重建。当 ItemReader
打开时,它可以检查上下文中是否有存储的状态,并从中初始化自身,如下例所示
if (executionContext.containsKey(getKey(LINES_READ_COUNT))) {
log.debug("Initializing for restart. Restart data is: " + executionContext);
long lineCount = executionContext.getLong(getKey(LINES_READ_COUNT));
LineReader reader = getReader();
Object record = "";
while (reader.getPosition() < lineCount && record != null) {
record = readLine();
}
}
在这种情况下,在前面的代码运行后,当前行是 40,322,允许 Step
从上次停止的地方再次开始。您还可以使用 ExecutionContext
来持久化关于运行本身的统计信息。例如,如果一个平面文件包含跨多行存在的待处理订单,则可能需要存储已处理的订单数(这与读取的行数大不相同),以便在 Step
结束时发送一封包含已处理订单总数的电子邮件。框架会为开发人员处理此信息的存储,以便将其正确地与单个 JobInstance
关联起来。很难知道是否应该使用现有的 ExecutionContext
。例如,使用上面的 EndOfDay
示例,当 01-01 的运行第二次再次开始时,框架会识别出它是同一个 JobInstance
,并在单个 Step
的基础上,从数据库中取出 ExecutionContext
,并将其(作为 StepExecution
的一部分)传递给 Step
本身。相反,对于 01-02 的运行,框架会识别出它是一个不同的实例,因此必须将一个空的 context 传递给 Step
。框架为开发人员做了许多此类判断,以确保在正确的时间将状态提供给他们。同样重要的是要注意,在任何给定时间,每个 StepExecution
恰好存在一个 ExecutionContext
。ExecutionContext
的客户端应谨慎,因为这会创建一个共享的键空间。因此,在存入值时应小心,确保数据不会被覆盖。然而,Step
在 context 中完全不存储数据,因此无法对框架产生不利影响。
注意,每个 JobExecution
至少有一个 ExecutionContext
,每个 StepExecution
也有一个。例如,考虑以下代码片段
ExecutionContext ecStep = stepExecution.getExecutionContext();
ExecutionContext ecJob = jobExecution.getExecutionContext();
//ecStep does not equal ecJob
如注释中所述,ecStep
不等于 ecJob
。它们是两个不同的 ExecutionContexts
。作用域为 Step
的 Context 在 Step
的每个提交点保存,而作用域为 Job 的 Context 在每次 Step
执行之间保存。
在 ExecutionContext 中,所有非 transient 的条目必须是 Serializable 。执行上下文的正确序列化是 steps 和 jobs 重启能力的基础。如果您使用的键或值不是 natively serializable,则需要采用定制的序列化方法。未能序列化执行上下文可能会危及状态持久化过程,导致失败的 jobs 无法正确恢复。 |
JobRepository
JobRepository
是前面提到的所有原型的持久化机制。它为 JobLauncher
、Job
和 Step
实现提供了 CRUD 操作。首次启动 Job
时,会从 repository 获取一个 JobExecution
。此外,在执行过程中,通过将 StepExecution
和 JobExecution
实现传递给 repository 来进行持久化。
-
Java
-
XML
使用 Java 配置时,@EnableBatchProcessing
注解提供了一个 JobRepository
作为自动配置的组件之一。
Spring Batch XML 命名空间支持使用 <job-repository>
标签配置 JobRepository
实例,如下例所示
<job-repository id="jobRepository"/>
JobLauncher
JobLauncher
表示一个简单的接口,用于使用给定的一组 JobParameters
启动 Job
,如下例所示
public interface JobLauncher {
public JobExecution run(Job job, JobParameters jobParameters)
throws JobExecutionAlreadyRunningException, JobRestartException,
JobInstanceAlreadyCompleteException, JobParametersInvalidException;
}
预期实现会从 JobRepository
获取有效的 JobExecution
并执行 Job
。
ItemReader
ItemReader
是一个抽象,表示为 Step
逐项检索输入。当 ItemReader
提供完所有项目时,它通过返回 null
来指示。关于 ItemReader
接口及其各种实现的更多详细信息,请参见Reader 和 Writer。
ItemWriter
ItemWriter
是一个抽象,表示 Step
的输出,每次一个批次或一个块的项目。通常,ItemWriter
不知道接下来应该接收什么输入,只知道当前调用中传递的项目。关于 ItemWriter
接口及其各种实现的更多详细信息,请参见Reader 和 Writer。
ItemProcessor
ItemProcessor
是一个抽象,表示项目的业务处理。ItemReader
读入一个项目,ItemWriter
写出一个项目,而 ItemProcessor
提供了一个访问点来转换或应用其他业务处理。如果在处理项目时确定该项目无效,返回 null
表示该项目不应被写出。关于 ItemProcessor
接口的更多详细信息,请参见Reader 和 Writer。
批处理命名空间
前面列出的许多领域概念需要在 Spring ApplicationContext
中进行配置。虽然可以使用上述接口的实现进行标准 bean 定义,但为了便于配置,提供了一个命名空间,如下例所示
<beans:beans xmlns="http://www.springframework.org/schema/batch"
xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/batch
https://www.springframework.org/schema/batch/spring-batch.xsd">
<job id="ioSampleJob">
<step id="step1">
<tasklet>
<chunk reader="itemReader" writer="itemWriter" commit-interval="2"/>
</tasklet>
</step>
</job>
</beans:beans>