扩展和并行处理
许多批处理问题都可以通过单线程、单进程作业来解决,因此在考虑更复杂的实现之前,最好先正确检查是否满足您的需求。测量一个实际作业的性能,并首先查看最简单的实现是否满足您的需求。即使使用标准硬件,您也可以在不到一分钟的时间内读写一个数百兆字节的文件。
当您准备开始使用一些并行处理来实现作业时,Spring Batch 提供了一系列选项,这些选项在本章中进行了描述,尽管其他地方也介绍了一些功能。从高层次来看,有两种并行处理模式
-
单进程、多线程
-
多进程
这些还可以细分为以下类别
-
多线程步骤(单进程)
-
并行步骤(单进程)
-
步骤的远程分块(多进程)
-
对步骤进行分区(单进程或多进程)
首先,我们回顾一下单进程选项。然后,我们回顾一下多进程选项。
多线程步骤
开始并行处理的最简单方法是向步骤配置中添加一个 TaskExecutor
。
-
Java
-
XML
在使用 Java 配置时,您可以向步骤中添加一个 TaskExecutor
,如下例所示
@Bean
public TaskExecutor taskExecutor() {
return new SimpleAsyncTaskExecutor("spring_batch");
}
@Bean
public Step sampleStep(TaskExecutor taskExecutor, JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("sampleStep", jobRepository)
.<String, String>chunk(10, transactionManager)
.reader(itemReader())
.writer(itemWriter())
.taskExecutor(taskExecutor)
.build();
}
例如,您可以向 tasklet
添加一个属性,如下所示
<step id="loading">
<tasklet task-executor="taskExecutor">...</tasklet>
</step>
在此示例中,taskExecutor
是对实现 TaskExecutor
接口的另一个 Bean 定义的引用。TaskExecutor
是一个标准的 Spring 接口,因此请查阅 Spring 用户指南以了解可用实现的详细信息。最简单的多线程 TaskExecutor
是 SimpleAsyncTaskExecutor
。
上述配置的结果是,Step
通过在单独的执行线程中读取、处理和写入每一块项目(每个提交间隔)来执行。请注意,这意味着要处理的项目没有固定的顺序,并且与单线程情况相比,一个块可能包含非连续的项目。除了任务执行器施加的任何限制(例如它是否由线程池支持)之外,tasklet 配置还有一个限制限制(默认值:4)。您可能需要增加此限制以确保充分利用线程池。
-
Java
-
XML
在使用 Java 配置时,构建器提供对限制限制的访问,如下所示
@Bean
public Step sampleStep(TaskExecutor taskExecutor, JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("sampleStep", jobRepository)
.<String, String>chunk(10, transactionManager)
.reader(itemReader())
.writer(itemWriter())
.taskExecutor(taskExecutor)
.throttleLimit(20)
.build();
}
例如,您可以增加限制限制,如下所示
<step id="loading"> <tasklet
task-executor="taskExecutor"
throttle-limit="20">...</tasklet>
</step>
还要注意,步骤中使用的任何池化资源(例如 DataSource
)都可能对并发性施加限制。请务必确保这些资源中的池至少与步骤中所需的并发线程数一样大。
限制限制弃用
从 v5.0 开始,限制限制已弃用,且没有替代。如果您想在默认的 Java 配置
|
对于一些常见的批处理用例,使用多线程 Step
实现有一些实际限制。Step
中的许多参与者(例如读取器和写入器)是有状态的。如果状态未按线程隔离,则这些组件在多线程 Step
中不可用。特别是,大多数来自 Spring Batch 的读取器和写入器都不适用于多线程使用。但是,可以使用无状态或线程安全的读取器和写入器,并且在 Spring Batch Samples 中有一个示例(称为 parallelJob
),它展示了如何使用进程指示器(请参阅 防止状态持久性)来跟踪已在数据库输入表中处理的项目。
Spring Batch 提供了一些 ItemWriter
和 ItemReader
的实现。通常,它们在 Javadoc 中说明是否线程安全,或者在并发环境中避免出现问题需要做什么。如果 Javadoc 中没有信息,可以检查实现以查看是否有任何状态。如果读取器不是线程安全的,可以使用提供的 SynchronizedItemStreamReader
进行装饰,或在自己的同步委托中使用它。可以同步对 read()
的调用,并且只要处理和写入是块中最昂贵的部分,步骤仍然可以比在单线程配置中完成得快得多。
并行步骤
只要需要并行化的应用程序逻辑可以拆分为不同的职责并分配给各个步骤,就可以在单个进程中并行化。并行步骤执行易于配置和使用。
-
Java
-
XML
在使用 Java 配置时,用 step3
并行执行步骤 (step1,step2)
很简单,如下所示
@Bean
public Job job(JobRepository jobRepository) {
return new JobBuilder("job", jobRepository)
.start(splitFlow())
.next(step4())
.build() //builds FlowJobBuilder instance
.build(); //builds Job instance
}
@Bean
public Flow splitFlow() {
return new FlowBuilder<SimpleFlow>("splitFlow")
.split(taskExecutor())
.add(flow1(), flow2())
.build();
}
@Bean
public Flow flow1() {
return new FlowBuilder<SimpleFlow>("flow1")
.start(step1())
.next(step2())
.build();
}
@Bean
public Flow flow2() {
return new FlowBuilder<SimpleFlow>("flow2")
.start(step3())
.build();
}
@Bean
public TaskExecutor taskExecutor() {
return new SimpleAsyncTaskExecutor("spring_batch");
}
例如,用 step3
并行执行步骤 (step1,step2)
很简单,如下所示
<job id="job1">
<split id="split1" task-executor="taskExecutor" next="step4">
<flow>
<step id="step1" parent="s1" next="step2"/>
<step id="step2" parent="s2"/>
</flow>
<flow>
<step id="step3" parent="s3"/>
</flow>
</split>
<step id="step4" parent="s4"/>
</job>
<beans:bean id="taskExecutor" class="org.spr...SimpleAsyncTaskExecutor"/>
可配置的任务执行器用于指定哪个 TaskExecutor
实现应执行各个流。默认值为 SyncTaskExecutor
,但需要异步 TaskExecutor
来并行运行步骤。请注意,作业确保拆分中的每个流在聚合退出状态并转换之前完成。
有关更多详细信息,请参阅 拆分流 部分。
远程分块
在远程分块中,Step
处理跨多个进程拆分,通过一些中间件相互通信。下图显示了该模式
管理器组件是一个单一进程,工作进程是多个远程进程。如果管理器不是瓶颈,则此模式效果最佳,因此处理必须比读取项目更昂贵(这在实践中通常如此)。
管理器是 Spring Batch Step
的实现,其中 ItemWriter
被一个泛型版本替换,该版本知道如何将项目块作为消息发送到中间件。工作人员是用于任何正在使用的中间件的标准侦听器(例如,对于 JMS,它们将是 MesssageListener
实现),它们的作用是通过标准 ItemWriter
或 ItemProcessor
加上 ItemWriter
,通过 ChunkProcessor
接口来处理项目块。使用此模式的一个优点是,读取器、处理器和编写器组件是现成的(与用于步骤的本地执行相同)。项目被动态划分,工作通过中间件共享,因此,如果侦听器都是急切的使用者,则负载平衡是自动的。
中间件必须是持久的,具有保证的传递和每个消息的单个使用者。JMS 是显而易见的选择,但网格计算和共享内存产品空间中存在其他选项(如 JavaSpaces)。
有关更多详细信息,请参阅 Spring Batch 集成 - 远程分块 部分。
分区
Spring Batch 还提供了一个 SPI,用于对 Step
执行进行分区并远程执行它。在这种情况下,远程参与者是 Step
实例,这些实例可以很容易地配置和用于本地处理。下图显示了模式
Job
在左侧作为 Step
实例的序列运行,其中一个 Step
实例被标记为管理器。图片中的工作人员都是 Step
的相同实例,实际上可以取代管理器,从而对 Job
产生相同的结果。工作人员通常是远程服务,但也可以是本地执行线程。在此模式中,管理器发送给工作人员的消息不需要持久或保证传递。JobRepository
中的 Spring Batch 元数据确保每个工作人员仅针对每个 Job
执行一次。
Spring Batch 中的 SPI 包含 Step
的特殊实现(称为 PartitionStep
)和两个策略接口,需要针对特定环境实现。策略接口是 PartitionHandler
和 StepExecutionSplitter
,以下顺序图显示了它们的作用
在这种情况下,右侧的 Step
是“远程”工作人员,因此,可能有多个对象和或进程扮演此角色,并且 PartitionStep
显示为驱动执行。
-
Java
-
XML
以下示例显示了使用 Java 配置时的 PartitionStep
配置
@Bean
public Step step1Manager(JobRepository jobRepository) {
return new StepBuilder("step1.manager", jobRepository)
.<String, String>partitioner("step1", partitioner())
.step(step1())
.gridSize(10)
.taskExecutor(taskExecutor())
.build();
}
类似于多线程步骤的 throttleLimit
方法,gridSize
方法可防止任务执行器因单个步骤的请求而饱和。
以下示例显示了使用 XML 配置时的 PartitionStep
配置
<step id="step1.manager">
<partition step="step1" partitioner="partitioner">
<handler grid-size="10" task-executor="taskExecutor"/>
</partition>
</step>
类似于多线程步骤的 throttle-limit
属性,grid-size
属性可防止任务执行器因单个步骤的请求而饱和。
用于 Spring Batch 样本 的单元测试套件(请参阅 partition*Job.xml
配置)有一个简单的示例,您可以复制并扩展该示例。
Spring Batch 为名为 step1:partition0
等的分区创建步骤执行。为了保持一致性,许多人更愿意将管理器步骤称为 step1:manager
。您可以使用步骤的别名(通过指定 name
属性而不是 id
属性)。
PartitionHandler
PartitionHandler
是了解远程处理或网格环境结构的组件。它能够以某些特定于结构的格式(如 DTO)将 StepExecution
请求发送到远程 Step
实例。它不必知道如何拆分输入数据或如何聚合多个 Step
执行的结果。一般来说,它可能也不需要了解复原能力或故障转移,因为在许多情况下这些都是结构的特性。无论如何,Spring Batch 始终提供独立于结构的可重启性。失败的 Job
始终可以重新启动,在这种情况下,只有失败的 Steps
会重新执行。
PartitionHandler
接口可以针对各种结构类型拥有专门的实现,包括简单的 RMI 远程处理、EJB 远程处理、自定义 Web 服务、JMS、Java Spaces、共享内存网格(例如 Terracotta 或 Coherence)以及网格执行结构(例如 GridGain)。Spring Batch 不包含任何专有网格或远程处理结构的实现。
但是,Spring Batch 提供了 PartitionHandler
的一个有用实现,该实现使用 Spring 的 TaskExecutor
策略在执行的单独线程中本地执行 Step
实例。该实现称为 TaskExecutorPartitionHandler
。
-
Java
-
XML
您可以使用 Java 配置显式配置 TaskExecutorPartitionHandler
,如下所示
@Bean
public Step step1Manager(JobRepository jobRepository) {
return new StepBuilder("step1.manager", jobRepository)
.partitioner("step1", partitioner())
.partitionHandler(partitionHandler())
.build();
}
@Bean
public PartitionHandler partitionHandler() {
TaskExecutorPartitionHandler retVal = new TaskExecutorPartitionHandler();
retVal.setTaskExecutor(taskExecutor());
retVal.setStep(step1());
retVal.setGridSize(10);
return retVal;
}
对于使用之前显示的 XML 命名空间配置的步骤,TaskExecutorPartitionHandler
是默认值。你还可以按如下方式显式配置它
<step id="step1.manager">
<partition step="step1" handler="handler"/>
</step>
<bean class="org.spr...TaskExecutorPartitionHandler">
<property name="taskExecutor" ref="taskExecutor"/>
<property name="step" ref="step1" />
<property name="gridSize" value="10" />
</bean>
gridSize
属性确定要创建的单独步骤执行的数量,因此它可以与 TaskExecutor
中的线程池大小相匹配。或者,可以将其设置为大于可用线程数,这会使工作块变小。
TaskExecutorPartitionHandler
对于 IO 密集型 Step
实例很有用,例如复制大量文件或将文件系统复制到内容管理系统。它还可以通过提供一个作为远程调用代理的 Step
实现(例如使用 Spring Remoting)用于远程执行。
分区器
Partitioner
具有更简单的职责:仅为新步骤执行生成执行上下文作为输入参数(无需担心重新启动)。它有一个方法,如下面的接口定义所示
public interface Partitioner {
Map<String, ExecutionContext> partition(int gridSize);
}
此方法的返回值将每个步骤执行的唯一名称(String
)与 ExecutionContext
形式的输入参数关联起来。这些名称稍后在批处理元数据中显示为分区 StepExecutions
中的步骤名称。ExecutionContext
只是一个名称-值对的集合,因此它可能包含一系列主键、行号或输入文件的位置。然后,远程 Step
通常使用 #{…}
占位符(步骤范围内的延迟绑定)绑定到上下文输入,如下一节所示。
步骤执行的名称(Partitioner
返回的 Map
中的键)在 Job
的步骤执行中需要是唯一的,但没有其他特定要求。最简单的方法(并且对用户来说有意义的名称)是使用前缀+后缀命名约定,其中前缀是要执行的步骤的名称(它本身在 Job
中是唯一的),后缀只是一个计数器。框架中有一个 SimplePartitioner
使用此约定。
你可以使用一个名为 PartitionNameProvider
的可选接口来单独提供分区名称和分区本身。如果 Partitioner
实现此接口,则仅在重新启动时查询名称。如果分区开销很大,这可能是一个有用的优化。PartitionNameProvider
提供的名称必须与 Partitioner
提供的名称匹配。
将输入数据绑定到步骤
对于由 PartitionHandler
执行的步骤,拥有相同的配置并从 ExecutionContext
在运行时绑定其输入参数非常有效。这很容易使用 Spring Batch 的 StepScope 功能(在 延迟绑定 部分中有更详细的介绍)。例如,如果 Partitioner
使用称为 fileName
的属性键创建 ExecutionContext
实例,指向每个步骤调用的不同文件(或目录),则 Partitioner
输出可能类似于下表的内容
步骤执行名称(键) |
ExecutionContext(值) |
filecopy:partition0 |
fileName=/home/data/one |
filecopy:partition1 |
fileName=/home/data/two |
filecopy:partition2 |
fileName=/home/data/three |
然后,可以使用对执行上下文的后期绑定将文件名绑定到步骤。
-
Java
-
XML
以下示例演示如何在 Java 中定义后期绑定
@Bean
public MultiResourceItemReader itemReader(
@Value("#{stepExecutionContext['fileName']}/*") Resource [] resources) {
return new MultiResourceItemReaderBuilder<String>()
.delegate(fileReader())
.name("itemReader")
.resources(resources)
.build();
}
以下示例演示如何在 XML 中定义后期绑定
<bean id="itemReader" scope="step"
class="org.spr...MultiResourceItemReader">
<property name="resources" value="#{stepExecutionContext[fileName]}/*"/>
</bean>