扩展和并行处理

许多批处理问题可以通过单线程、单进程作业来解决,因此在考虑更复杂的实现之前,始终建议先正确检查这是否满足您的需求。测量一个真实作业的性能,并首先查看最简单的实现是否满足您的需求。即使使用标准硬件,您也可以在不到一分钟的时间内读取和写入数百兆字节的文件。

当您准备好开始实现一个包含一些并行处理的作业时,Spring Batch 提供了一系列选项,本章将对此进行描述,尽管某些功能在其他地方有介绍。在较高层面上,并行处理有两种模式

  • 单进程、多线程

  • 多进程

这些也分解成以下类别

  • 多线程 Step(单进程)

  • 并行 Step(单进程)

  • Step 的远程分块(多进程)

  • Step 的分区(单进程或多进程)

首先,我们回顾单进程选项。然后,我们回顾多进程选项。

多线程 Step

启动并行处理最简单的方法是向 Step 配置中添加一个 TaskExecutor。

  • Java

  • XML

使用 Java 配置时,您可以向 Step 添加一个 TaskExecutor,如下例所示

Java 配置
@Bean
public TaskExecutor taskExecutor() {
    return new SimpleAsyncTaskExecutor("spring_batch");
}

@Bean
public Step sampleStep(TaskExecutor taskExecutor, JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("sampleStep", jobRepository)
				.<String, String>chunk(10, transactionManager)
				.reader(itemReader())
				.writer(itemWriter())
				.taskExecutor(taskExecutor)
				.build();
}

例如,您可以向 tasklet 添加一个属性,如下所示

<step id="loading">
    <tasklet task-executor="taskExecutor">...</tasklet>
</step>

在此示例中,taskExecutor 是对另一个 bean 定义的引用,该定义实现了 TaskExecutor 接口。TaskExecutor 是一个标准的 Spring 接口,因此请参阅 Spring 用户指南以了解可用实现的详细信息。最简单的多线程 TaskExecutor 是 SimpleAsyncTaskExecutor。

上述配置的结果是,Step 通过在单独的执行线程中读取、处理和写入每个项目块(每个提交间隔)来执行。请注意,这意味着项目处理没有固定的顺序,并且与单线程情况相比,一个块可能包含非连续的项目。除了任务执行器施加的任何限制(例如它是否由线程池支持)之外,tasklet 配置还具有一个节流限制(默认值:4)。您可能需要增加此限制以确保充分利用线程池。

  • Java

  • XML

使用 Java 配置时,构建器可以访问节流限制,如下所示

Java 配置
@Bean
public Step sampleStep(TaskExecutor taskExecutor, JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("sampleStep", jobRepository)
				.<String, String>chunk(10, transactionManager)
				.reader(itemReader())
				.writer(itemWriter())
				.taskExecutor(taskExecutor)
				.throttleLimit(20)
				.build();
}

例如,您可以增加节流限制,如下所示

<step id="loading"> <tasklet
    task-executor="taskExecutor"
    throttle-limit="20">...</tasklet>
</step>

另请注意,Step 中使用的任何池化资源(例如 DataSource)都可能会对并发性施加限制。请确保这些资源中的池至少与 Step 中所需的并发线程数量一样大。

节流限制弃用

从 v5.0 开始,节流限制已弃用,并且没有替代方案。如果您想替换默认 TaskExecutorRepeatTemplate 中的当前节流机制,则需要提供一个自定义的 RepeatOperations 实现(基于具有有界任务队列的 TaskExecutor),并使用 StepBuilder#stepOperations将其设置在 Step 上。

Java 配置
@Bean
public Step sampleStep(RepeatOperations customRepeatOperations, JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("sampleStep", jobRepository)
				.<String, String>chunk(10, transactionManager)
				.reader(itemReader())
				.writer(itemWriter())
				.stepOperations(customRepeatOperations)
				.build();
}

对于某些常见的批处理用例,使用多线程 Step 实现存在一些实际限制。Step 中的许多参与者(例如读取器和写入器)都是有状态的。如果状态未按线程隔离,则这些组件在多线程 Step 中不可用。特别是,Spring Batch 中的大多数读取器和写入器并非设计用于多线程使用。但是,可以使用无状态或线程安全的读取器和写入器,并且在Spring Batch 示例中有一个示例(称为 parallelJob),它展示了如何使用进程指示器(请参阅防止状态持久化)来跟踪数据库输入表中已处理的项目。

Spring Batch 提供了一些 ItemWriterItemReader 的实现。通常,Javadoc 中会说明它们是否线程安全,或者在并发环境中如何避免问题。如果 Javadoc 中没有相关信息,您可以检查实现以查看是否存在任何状态。如果读取器不是线程安全的,您可以使用提供的 SynchronizedItemStreamReader 装饰它,或者在您自己的同步委托中使用它。您可以同步对 read() 的调用,并且,只要处理和写入是块中最昂贵的部分,您的步骤仍然可能比在单线程配置中完成得快得多。

并行步骤

只要需要并行化的应用程序逻辑可以拆分成不同的职责并分配给各个步骤,它就可以在一个进程中并行化。并行步骤执行易于配置和使用。

  • Java

  • XML

当使用 Java 配置时,并行执行步骤 (step1,step2)step3 非常简单,如下所示

Java 配置
@Bean
public Job job(JobRepository jobRepository) {
    return new JobBuilder("job", jobRepository)
        .start(splitFlow())
        .next(step4())
        .build()        //builds FlowJobBuilder instance
        .build();       //builds Job instance
}

@Bean
public Flow splitFlow() {
    return new FlowBuilder<SimpleFlow>("splitFlow")
        .split(taskExecutor())
        .add(flow1(), flow2())
        .build();
}

@Bean
public Flow flow1() {
    return new FlowBuilder<SimpleFlow>("flow1")
        .start(step1())
        .next(step2())
        .build();
}

@Bean
public Flow flow2() {
    return new FlowBuilder<SimpleFlow>("flow2")
        .start(step3())
        .build();
}

@Bean
public TaskExecutor taskExecutor() {
    return new SimpleAsyncTaskExecutor("spring_batch");
}

例如,并行执行步骤 (step1,step2)step3 非常简单,如下所示

<job id="job1">
    <split id="split1" task-executor="taskExecutor" next="step4">
        <flow>
            <step id="step1" parent="s1" next="step2"/>
            <step id="step2" parent="s2"/>
        </flow>
        <flow>
            <step id="step3" parent="s3"/>
        </flow>
    </split>
    <step id="step4" parent="s4"/>
</job>

<beans:bean id="taskExecutor" class="org.spr...SimpleAsyncTaskExecutor"/>

可配置的任务执行器用于指定哪个 TaskExecutor 实现应该执行各个流程。默认值为 SyncTaskExecutor,但需要异步 TaskExecutor 来并行运行步骤。请注意,作业确保拆分中的每个流程都完成,然后再聚合退出状态并进行转换。

有关更多详细信息,请参阅有关 拆分流程 的部分。

远程分块

在远程分块中,Step 处理过程分布在多个进程中,通过某些中间件相互通信。下图显示了该模式

Remote Chunking
图 1. 远程分块

管理器组件是一个单一进程,而工作者是多个远程进程。如果管理器不是瓶颈,则此模式效果最佳,因此处理必须比读取项更昂贵(在实践中通常如此)。

管理器是 Spring Batch Step 的一个实现,其中 ItemWriter 被替换为一个通用版本,该版本知道如何将项目块作为消息发送到中间件。工作者是正在使用的任何中间件的标准侦听器(例如,对于 JMS,它们将是 MesssageListener 实现),它们的作用是通过使用标准 ItemWriterItemProcessor 加上 ItemWriter 来处理项目块,通过 ChunkProcessor 接口。使用此模式的优点之一是读取器、处理器和写入器组件是现成的(与步骤的本地执行中使用的相同)。项目被动态地划分,并且工作通过中间件共享,因此,如果所有侦听器都是渴望的消费者,则负载平衡是自动的。

中间件必须是持久的,具有保证的交付和每条消息的单个消费者。JMS 是显而易见的选择,但在网格计算和共享内存产品空间中也存在其他选项(例如 JavaSpaces)。

有关更多详细信息,请参阅有关 Spring Batch 集成 - 远程分块 的部分。

分区

Spring Batch 还提供了一个用于对 Step 执行进行分区并在远程执行它的 SPI。在这种情况下,远程参与者是 Step 实例,这些实例也可能很容易配置和用于本地处理。下图显示了该模式

Partitioning Overview
图 2. 分区

Job 在左侧作为一系列 Step 实例运行,其中一个 Step 实例被标记为管理器。此图中的工作者都是 Step 的相同实例,实际上可以取代管理器,从而为 Job 产生相同的结果。工作者通常将是远程服务,但也可能是本地执行线程。在此模式下,管理器发送给工作者的消息不需要持久或具有保证的交付。JobRepository 中的 Spring Batch 元数据确保每个工作者在每个 Job 执行中只执行一次。

Spring Batch 中的 SPI 由一个特殊的 Step 实现(称为 PartitionStep)和两个需要针对特定环境实现的策略接口组成。策略接口是 PartitionHandlerStepExecutionSplitter,以下序列图显示了它们的作用

Partitioning SPI
图 3. 分区 SPI

在这种情况下,右侧的 Step 是“远程”工作者,因此,可能有许多对象或进程扮演此角色,并且 PartitionStep 显示驱动执行。

  • Java

  • XML

以下示例显示了使用 Java 配置时的 PartitionStep 配置

Java 配置
@Bean
public Step step1Manager(JobRepository jobRepository) {
    return new StepBuilder("step1.manager", jobRepository)
        .<String, String>partitioner("step1", partitioner())
        .step(step1())
        .gridSize(10)
        .taskExecutor(taskExecutor())
        .build();
}

类似于多线程步骤的 throttleLimit 方法,gridSize 方法可以防止任务执行器被来自单个步骤的请求饱和。

以下示例显示了使用 XML 配置时的 PartitionStep 配置

<step id="step1.manager">
    <partition step="step1" partitioner="partitioner">
        <handler grid-size="10" task-executor="taskExecutor"/>
    </partition>
</step>

类似于多线程步骤的 throttle-limit 属性,grid-size 属性可以防止任务执行器被来自单个步骤的请求饱和。

有关 Spring Batch 示例(请参阅 partition*Job.xml 配置)的单元测试套件提供了一个您可以复制和扩展的简单示例。

Spring Batch 为名为 step1:partition0 等的分区创建步骤执行。许多人更喜欢将管理器步骤命名为 step1:manager 以保持一致性。您可以为步骤使用别名(通过指定 name 属性而不是 id 属性)。

PartitionHandler

PartitionHandler 是了解远程或网格环境结构的组件。它能够将 StepExecution 请求发送到远程 Step 实例,并将其包装在某种特定于结构的格式(如 DTO)中。它不必知道如何拆分输入数据或如何聚合多个 Step 执行的结果。一般来说,它可能也不需要了解弹性或故障转移,因为在许多情况下,这些都是结构的功能。无论如何,Spring Batch 始终提供独立于结构的重启能力。失败的 Job 始终可以重新启动,在这种情况下,只会重新执行失败的 Steps

PartitionHandler 接口可以针对各种结构类型具有专门的实现,包括简单的 RMI 远程调用、EJB 远程调用、自定义 Web 服务、JMS、JavaSpaces、共享内存网格(如 Terracotta 或 Coherence)和网格执行结构(如 GridGain)。Spring Batch 不包含任何专有网格或远程调用结构的实现。

但是,Spring Batch 提供了一个有用的 PartitionHandler 实现,它使用来自 Spring 的 TaskExecutor 策略在单独的执行线程中本地执行 Step 实例。该实现称为 TaskExecutorPartitionHandler

  • Java

  • XML

您可以使用 Java 配置显式配置 TaskExecutorPartitionHandler,如下所示

Java 配置
@Bean
public Step step1Manager(JobRepository jobRepository) {
    return new StepBuilder("step1.manager", jobRepository)
        .partitioner("step1", partitioner())
        .partitionHandler(partitionHandler())
        .build();
}

@Bean
public PartitionHandler partitionHandler() {
    TaskExecutorPartitionHandler retVal = new TaskExecutorPartitionHandler();
    retVal.setTaskExecutor(taskExecutor());
    retVal.setStep(step1());
    retVal.setGridSize(10);
    return retVal;
}

对于使用前面显示的 XML 命名空间配置的步骤,TaskExecutorPartitionHandler 是默认值。您也可以显式配置它,如下所示

<step id="step1.manager">
    <partition step="step1" handler="handler"/>
</step>

<bean class="org.spr...TaskExecutorPartitionHandler">
    <property name="taskExecutor" ref="taskExecutor"/>
    <property name="step" ref="step1" />
    <property name="gridSize" value="10" />
</bean>

gridSize 属性确定要创建的单独步骤执行的数量,因此可以将其与 TaskExecutor 中线程池的大小相匹配。或者,可以将其设置为大于可用线程数,这使得工作块更小。

TaskExecutorPartitionHandler 对于 IO 密集型 Step 实例很有用,例如复制大量文件或将文件系统复制到内容管理系统中。它也可以用于远程执行,方法是提供一个 Step 实现,该实现是远程调用的代理(例如使用 Spring 远程调用)。

Partitioner

Partitioner 的职责更简单:仅生成执行上下文作为新步骤执行的输入参数(无需担心重新启动)。它有一个方法,如下面的接口定义所示

public interface Partitioner {
    Map<String, ExecutionContext> partition(int gridSize);
}

此方法的返回值将每个步骤执行的唯一名称(String)与 ExecutionContext 形式的输入参数关联起来。这些名称稍后在 Batch 元数据中显示为分区 StepExecutions 中的步骤名称。ExecutionContext 只是一个名称-值对的集合,因此它可能包含一系列主键、行号或输入文件的位置。然后,远程 Step 通常通过使用 #{…​} 占位符(步骤范围内的延迟绑定)绑定到上下文输入,如下一节所示。

步骤执行的名称(Partitioner 返回的 Map 中的键)在 Job 的步骤执行中必须是唯一的,但没有其他特定要求。执行此操作(并使名称对用户有意义)的最简单方法是使用前缀+后缀命名约定,其中前缀是要执行的步骤的名称(它本身在 Job 中是唯一的),后缀只是一个计数器。框架中有一个 SimplePartitioner 使用此约定。

您可以使用一个名为 PartitionNameProvider 的可选接口,以单独提供分区名称而不是分区本身。如果 Partitioner 实现此接口,则仅在重新启动时查询名称。如果分区成本很高,这可能是一个有用的优化。PartitionNameProvider 提供的名称必须与 Partitioner 提供的名称匹配。

将输入数据绑定到步骤

对于 PartitionHandler 执行的步骤具有相同的配置,并且它们的输入参数在运行时从 ExecutionContext 绑定,这是非常高效的。这可以通过 Spring Batch 的 StepScope 功能轻松实现(在有关 延迟绑定 的部分中进行了更详细的介绍)。例如,如果 Partitioner 创建具有名为 fileName 的属性键的 ExecutionContext 实例,并为每个步骤调用指向不同的文件(或目录),则 Partitioner 输出可能类似于以下表格内容

表 1. 针对目录处理的 Partitioner 提供的示例步骤执行名称到执行上下文

步骤执行名称(键)

ExecutionContext(值)

filecopy:partition0

fileName=/home/data/one

filecopy:partition1

fileName=/home/data/two

filecopy:partition2

fileName=/home/data/three

然后,可以通过使用延迟绑定到执行上下文来将文件名绑定到步骤。

  • Java

  • XML

以下示例显示了如何在 Java 中定义延迟绑定

Java 配置
@Bean
public MultiResourceItemReader itemReader(
	@Value("#{stepExecutionContext['fileName']}/*") Resource [] resources) {
	return new MultiResourceItemReaderBuilder<String>()
			.delegate(fileReader())
			.name("itemReader")
			.resources(resources)
			.build();
}

以下示例显示了如何在 XML 中定义延迟绑定

XML 配置
<bean id="itemReader" scope="step"
      class="org.spr...MultiResourceItemReader">
    <property name="resources" value="#{stepExecutionContext[fileName]}/*"/>
</bean>