批处理的领域语言
对于任何有经验的批处理架构师来说,Spring Batch 中使用的批处理总体概念应该很熟悉和舒适。有“作业”和“步骤”,以及开发人员提供的称为ItemReader
和ItemWriter
的处理单元。但是,由于 Spring 模式、操作、模板、回调和习惯用法,存在以下机会
-
显着提高对明确关注点分离的遵守。
-
明确划分的架构层和作为接口提供的服务。
-
提供简单且默认的实现,以便于快速采用和开箱即用。
-
显著增强了可扩展性。
以下图表是经过数十年的使用,对批处理参考架构进行简化的版本。它概述了构成批处理领域语言的组件。此架构框架是一个蓝图,它已通过在过去几代平台(大型机上的 COBOL、Unix 上的 C,以及现在的 Java 任何地方)上的数十年的实现而得到验证。JCL 和 COBOL 开发人员可能与 C、C# 和 Java 开发人员一样熟悉这些概念。Spring Batch 为通常在健壮、可维护的系统中找到的层、组件和技术服务提供了物理实现,这些系统用于解决从简单到复杂的批处理应用程序的创建,并具有解决非常复杂的处理需求的基础设施和扩展。
上面的图表突出了构成 Spring Batch 领域语言的关键概念。一个 Job
具有一个到多个步骤,每个步骤都恰好有一个 ItemReader
、一个 ItemProcessor
和一个 ItemWriter
。一个作业需要启动(使用 JobLauncher
),并且有关当前运行进程的元数据需要存储(在 JobRepository
中)。
作业
本节描述与批处理作业概念相关的模式。一个 Job
是一个封装整个批处理过程的实体。与其他 Spring 项目一样,一个 Job
使用 XML 配置文件或基于 Java 的配置进行连接。此配置可能被称为“作业配置”。但是,Job
只是整个层次结构的顶端,如以下图表所示
在 Spring Batch 中,一个 Job
只是一个 Step
实例的容器。它将逻辑上属于一个流程的多个步骤组合在一起,并允许配置对所有步骤全局有效的属性,例如可恢复性。作业配置包含
-
作业的名称。
-
Step
实例的定义和排序。 -
作业是否可恢复。
-
Java
-
XML
对于使用 Java 配置的用户,Spring Batch 提供了 Job
接口的默认实现,形式为 SimpleJob
类,它在 Job
之上创建了一些标准功能。在使用基于 Java 的配置时,会提供一系列构建器来实例化 Job
,如下例所示
@Bean
public Job footballJob(JobRepository jobRepository) {
return new JobBuilder("footballJob", jobRepository)
.start(playerLoad())
.next(gameLoad())
.next(playerSummarization())
.build();
}
对于使用 XML 配置的用户,Spring Batch 提供了 Job
接口的默认实现,形式为 SimpleJob
类,它在 Job
之上创建了一些标准功能。但是,批处理命名空间抽象了直接实例化它的需要。相反,您可以使用 <job>
元素,如下例所示
<job id="footballJob">
<step id="playerload" next="gameLoad"/>
<step id="gameLoad" next="playerSummarization"/>
<step id="playerSummarization"/>
</job>
作业实例
JobInstance
代表一个逻辑上的作业运行的概念。考虑一个每天结束时运行一次的批处理作业,例如前面图表中的 EndOfDay
Job
。只有一个 EndOfDay
作业,但每个 Job
的单独运行必须单独跟踪。在这种情况下,每天有一个逻辑 JobInstance
。例如,有一个 1 月 1 日运行、一个 1 月 2 日运行,等等。如果 1 月 1 日的运行第一次失败并在第二天再次运行,它仍然是 1 月 1 日的运行。(通常,这与它正在处理的数据相对应,这意味着 1 月 1 日的运行处理 1 月 1 日的数据)。因此,每个 JobInstance
可以有多次执行(JobExecution
将在本章后面详细讨论),并且在给定时间只能运行一个 JobInstance
(它对应于特定的 Job
和标识 JobParameters
)。
JobInstance
的定义与要加载的数据完全无关。完全由 ItemReader
实现来决定如何加载数据。例如,在 EndOfDay
场景中,数据可能有一列指示数据所属的 有效日期
或 计划日期
。因此,1 月 1 日的运行只会加载来自 1 日的数据,而 1 月 2 日的运行只会使用来自 2 日的数据。由于这种确定很可能是一个业务决策,因此由 ItemReader
来决定。但是,使用相同的 JobInstance
决定是否使用先前执行的“状态”(即 ExecutionContext
,将在本章后面讨论)。使用新的 JobInstance
意味着“从头开始”,而使用现有实例通常意味着“从上次中断的地方开始”。
JobParameters
在讨论了 JobInstance
及其与 Job
的区别后,自然会问:“如何区分一个 JobInstance
与另一个?” 答案是:JobParameters
。JobParameters
对象保存一组用于启动批处理作业的参数。它们可以用于识别,甚至可以在运行期间用作参考数据,如下图所示
在前面的示例中,有两个实例,一个用于 1 月 1 日,另一个用于 1 月 2 日,实际上只有一个 Job
,但它有两个 JobParameter
对象:一个使用 01-01-2017 的作业参数启动,另一个使用 01-02-2017 的参数启动。因此,可以将合同定义为:JobInstance
= Job
+ 标识 JobParameters
。这允许开发人员有效地控制 JobInstance
的定义方式,因为他们控制着传递的参数。
并非所有作业参数都需要为识别 JobInstance 做出贡献。默认情况下,它们会这样做。但是,框架还允许提交具有不影响 JobInstance 身份的参数的 Job 。
|
JobExecution
JobExecution
指的是运行作业的单次尝试的技术概念。执行可能会以失败或成功结束,但与给定执行相对应的 JobInstance
只有在执行成功完成时才被认为是完整的。以之前描述的 EndOfDay
Job
为例,考虑一个 2017-01-01 的 JobInstance
,它在第一次运行时失败了。如果它再次使用与第一次运行相同的标识作业参数(2017-01-01)运行,则会创建一个新的 JobExecution
。但是,仍然只有一个 JobInstance
。
Job
定义了作业是什么以及如何执行,而 JobInstance
是一个纯粹的组织对象,用于将执行分组在一起,主要用于启用正确的重启语义。但是,JobExecution
是实际运行过程中发生的事情的主要存储机制,并且包含许多必须控制和持久化的属性,如下表所示。
属性 |
定义 |
|
一个 |
|
一个 |
|
一个 |
|
|
|
一个 |
|
表示上次 |
|
包含任何需要在执行之间持久化的用户数据的“属性包”。 |
|
执行 |
这些属性很重要,因为它们是持久化的,可以用来完全确定执行的状态。例如,如果01-01的EndOfDay
作业在晚上9:00执行,并在9:30失败,则在批处理元数据表中会进行以下条目
JOB_INST_ID |
JOB_NAME |
1 |
EndOfDayJob |
JOB_EXECUTION_ID |
TYPE_CD |
KEY_NAME |
DATE_VAL |
IDENTIFYING |
1 |
DATE |
schedule.Date |
2017-01-01 |
TRUE |
JOB_EXEC_ID |
JOB_INST_ID |
START_TIME |
END_TIME |
STATUS |
1 |
1 |
2017-01-01 21:00 |
2017-01-01 21:30 |
FAILED |
为了清晰和格式化,列名可能已被缩写或删除。 |
现在作业已经失败,假设整个晚上都在解决问题,所以“批处理窗口”现在已经关闭。进一步假设窗口从晚上9:00开始,作业再次为01-01启动,从它停止的地方开始,并在9:30成功完成。由于现在是第二天,所以必须运行01-02作业,它在9:31之后立即启动,并在正常的一小时时间内在10:30完成。没有要求一个JobInstance
在另一个JobInstance
之后启动,除非两个作业有可能尝试访问相同的数据,从而导致数据库级别的锁定问题。完全由调度程序决定何时运行Job
。由于它们是独立的JobInstances
,Spring Batch不会尝试阻止它们同时运行。(尝试在另一个JobInstance
正在运行时运行同一个JobInstance
会导致抛出JobExecutionAlreadyRunningException
异常)。现在,JobInstance
和JobParameters
表中应该有一个额外的条目,JobExecution
表中应该有两个额外的条目,如下表所示
JOB_INST_ID |
JOB_NAME |
1 |
EndOfDayJob |
2 |
EndOfDayJob |
JOB_EXECUTION_ID |
TYPE_CD |
KEY_NAME |
DATE_VAL |
IDENTIFYING |
1 |
DATE |
schedule.Date |
2017-01-01 00:00:00 |
TRUE |
2 |
DATE |
schedule.Date |
2017-01-01 00:00:00 |
TRUE |
3 |
DATE |
schedule.Date |
2017-01-02 00:00:00 |
TRUE |
JOB_EXEC_ID |
JOB_INST_ID |
START_TIME |
END_TIME |
STATUS |
1 |
1 |
2017-01-01 21:00 |
2017-01-01 21:30 |
FAILED |
2 |
1 |
2017-01-02 21:00 |
2017-01-02 21:30 |
已完成 |
3 |
2 |
2017-01-02 21:31 |
2017-01-02 22:29 |
已完成 |
为了清晰和格式化,列名可能已被缩写或删除。 |
步骤
一个Step
是一个域对象,它封装了批处理作业的独立、顺序阶段。因此,每个Job
完全由一个或多个步骤组成。一个Step
包含定义和控制实际批处理所需的所有信息。这是一个必要的模糊描述,因为任何给定Step
的内容由编写Job
的开发人员自行决定。一个Step
可以像开发人员想要的那么简单或复杂。一个简单的Step
可能将数据从文件加载到数据库中,需要很少或不需要代码(取决于使用的实现)。一个更复杂的Step
可能具有作为处理的一部分应用的复杂业务规则。与Job
一样,Step
也有一个单独的StepExecution
,它与唯一的JobExecution
相关联,如下图所示
StepExecution
一个StepExecution
代表执行Step
的单次尝试。每次运行Step
时都会创建一个新的StepExecution
,类似于JobExecution
。但是,如果一个步骤由于前面的步骤失败而无法执行,则不会为它持久化执行。只有当Step
实际启动时才会创建StepExecution
。
Step
执行由StepExecution
类的对象表示。每次执行都包含对它对应的步骤和JobExecution
以及事务相关数据的引用,例如提交和回滚计数以及开始和结束时间。此外,每个步骤执行都包含一个ExecutionContext
,其中包含开发人员需要在批处理运行之间持久化的任何数据,例如统计信息或重新启动所需的狀態信息。下表列出了StepExecution
的属性
属性 |
定义 |
|
一个 |
|
表示执行开始时的当前系统时间的 |
|
表示执行完成时的当前系统时间的 |
|
表示执行结果的 |
|
包含任何需要在执行之间持久化的用户数据的“属性包”。 |
|
已成功读取的项目数量。 |
|
已成功写入的项目数量。 |
|
为本次执行提交的事务数量。 |
|
由 |
|
|
|
|
|
由 |
|
|
ExecutionContext
ExecutionContext
代表一个键值对集合,这些键值对由框架持久化和控制,为开发人员提供了一个存储持久状态的地方,该状态的作用域为 StepExecution
对象或 JobExecution
对象。(对于熟悉 Quartz 的人来说,它与 JobDataMap
非常相似。)最佳使用示例是促进重启。以使用平面文件输入为例,在处理单个行时,框架会在提交点定期持久化 ExecutionContext
。这样做可以让 ItemReader
存储其状态,以防运行过程中发生致命错误,甚至在断电的情况下也是如此。所需要做的就是将当前读取的行数放入上下文中,如下例所示,框架会完成剩下的工作
executionContext.putLong(getKey(LINES_READ_COUNT), reader.getPosition());
以 Job
类别部分中的 EndOfDay
示例为例,假设有一个步骤 loadData
,它将文件加载到数据库中。在第一次运行失败后,元数据表将如下例所示
JOB_INST_ID |
JOB_NAME |
1 |
EndOfDayJob |
JOB_INST_ID |
TYPE_CD |
KEY_NAME |
DATE_VAL |
1 |
DATE |
schedule.Date |
2017-01-01 |
JOB_EXEC_ID |
JOB_INST_ID |
START_TIME |
END_TIME |
STATUS |
1 |
1 |
2017-01-01 21:00 |
2017-01-01 21:30 |
FAILED |
STEP_EXEC_ID |
JOB_EXEC_ID |
STEP_NAME |
START_TIME |
END_TIME |
STATUS |
1 |
1 |
loadData |
2017-01-01 21:00 |
2017-01-01 21:30 |
FAILED |
STEP_EXEC_ID |
SHORT_CONTEXT |
1 |
{piece.count=40321} |
在上述情况下,Step
运行了 30 分钟,处理了 40,321 个“片段”,在本例中,这些片段代表文件中的行。此值在每次提交之前由框架更新,并且可以包含对应于 ExecutionContext
中条目的多行。在提交之前收到通知需要使用各种 StepListener
实现(或 ItemStream
)之一,这些将在本指南的后面部分详细讨论。与前面的示例一样,假设 Job
在第二天重新启动。当它重新启动时,来自上次运行的 ExecutionContext
的值将从数据库中重新构建。当 ItemReader
打开时,它可以检查上下文是否有任何存储状态,并从那里初始化自身,如下例所示
if (executionContext.containsKey(getKey(LINES_READ_COUNT))) {
log.debug("Initializing for restart. Restart data is: " + executionContext);
long lineCount = executionContext.getLong(getKey(LINES_READ_COUNT));
LineReader reader = getReader();
Object record = "";
while (reader.getPosition() < lineCount && record != null) {
record = readLine();
}
}
在这种情况下,在前面的代码运行之后,当前行是 40,322,让 Step
从它停止的地方重新开始。您还可以使用 ExecutionContext
来存储有关运行本身需要持久化的统计信息。例如,如果一个平面文件包含跨多行处理的订单,则可能需要存储已处理的订单数量(这与读取的行数大不相同),以便在 Step
结束时发送一封电子邮件,其中包含已处理的订单总数。框架负责为开发人员存储此信息,以便将其正确地与单个 JobInstance
关联。很难知道是否应该使用现有的 ExecutionContext
。例如,使用上面的 EndOfDay
示例,当 01-01 运行第二次启动时,框架会识别出它是同一个 JobInstance
,并且在单个 Step
基础上,从数据库中提取 ExecutionContext
,并将其(作为 StepExecution
的一部分)传递给 Step
本身。相反,对于 01-02 运行,框架会识别出它是一个不同的实例,因此必须将一个空上下文传递给 Step
。框架为开发人员做出了许多此类确定,以确保在正确的时间向他们提供状态。还需要注意的是,在任何给定时间,每个 StepExecution
只有一个 ExecutionContext
。ExecutionContext
的客户端应该小心,因为这会创建一个共享的键空间。因此,在放入值时要小心,以确保不会覆盖任何数据。但是,Step
在上下文中绝对不存储任何数据,因此无法对框架产生负面影响。
请注意,每个 JobExecution
至少有一个 ExecutionContext
,每个 StepExecution
也都有一个。例如,请考虑以下代码片段
ExecutionContext ecStep = stepExecution.getExecutionContext();
ExecutionContext ecJob = jobExecution.getExecutionContext();
//ecStep does not equal ecJob
如注释中所述,ecStep
不等于 ecJob
。它们是两个不同的 ExecutionContext
。作用域为 Step
的一个在 Step
的每个提交点保存,而作用域为 Job
的一个在每个 Step
执行之间保存。
在 ExecutionContext 中,所有非瞬态条目必须是 Serializable 。执行上下文的正确序列化是步骤和作业重启功能的基础。如果您使用非原生可序列化的键或值,则需要采用定制的序列化方法。未能序列化执行上下文可能会危及状态持久化过程,导致失败的作业无法正确恢复。
|
JobRepository
JobRepository
是前面提到的所有构造型的持久化机制。它为 JobLauncher
、Job
和 Step
实现提供 CRUD 操作。当一个 Job
首次启动时,会从存储库中获取一个 JobExecution
。此外,在执行过程中,StepExecution
和 JobExecution
实现通过传递给存储库来持久化。
-
Java
-
XML
使用 Java 配置时,@EnableBatchProcessing
注解提供 JobRepository
作为自动配置的组件之一。
Spring Batch XML 命名空间支持使用 <job-repository>
标签配置 JobRepository
实例,如下例所示
<job-repository id="jobRepository"/>
JobLauncher
JobLauncher
代表一个简单的接口,用于使用给定的 JobParameters
集启动 Job
,如下例所示
public interface JobLauncher {
public JobExecution run(Job job, JobParameters jobParameters)
throws JobExecutionAlreadyRunningException, JobRestartException,
JobInstanceAlreadyCompleteException, JobParametersInvalidException;
}
预计实现将从 JobRepository
获取有效的 JobExecution
并执行 Job
。
ItemReader
ItemReader
是一个抽象,代表一次检索一个 Step
的输入。当 ItemReader
用尽它可以提供的项目时,它通过返回 null
来指示这一点。您可以在 读取器和写入器 中找到有关 ItemReader
接口及其各种实现的更多详细信息。
ItemWriter
ItemWriter
是一个抽象,代表一次一个批次或块地输出 Step
。通常,ItemWriter
不了解它应该接收的下一个输入,只知道在其当前调用中传递的项目。您可以在 读取器和写入器 中找到有关 ItemWriter
接口及其各种实现的更多详细信息。
ItemProcessor
ItemProcessor
是一个抽象,它代表了对一项的业务处理。当 ItemReader
读取一项时,ItemWriter
写入一项,ItemProcessor
提供了一个访问点来转换或应用其他业务处理。如果在处理该项时,确定该项无效,则返回 null
表示该项不应被写入。您可以在 读者和作者 中找到有关 ItemProcessor
接口的更多详细信息。
批处理命名空间
前面列出的许多域概念需要在 Spring ApplicationContext
中配置。虽然您可以使用标准 bean 定义中提供的接口实现,但为了方便配置,提供了一个命名空间,如下例所示
<beans:beans xmlns="http://www.springframework.org/schema/batch"
xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/batch
https://www.springframework.org/schema/batch/spring-batch.xsd">
<job id="ioSampleJob">
<step id="step1">
<tasklet>
<chunk reader="itemReader" writer="itemWriter" commit-interval="2"/>
</tasklet>
</step>
</job>
</beans:beans>