扩展与并行处理

许多批处理问题可以使用单线程、单进程的 Job 来解决,因此在考虑更复杂的实现之前,始终应该仔细检查它们是否满足您的需求。首先测量一个实际 Job 的性能,看看最简单的实现是否满足您的需求。即使使用标准硬件,您也可以在不到一分钟的时间内读写一个几百兆字节的文件。

当您准备开始使用一些并行处理来实现 Job 时,Spring Batch 提供了多种选项,本章将对此进行介绍,尽管有些特性在其他地方已经涵盖。从高层次来看,并行处理有两种模式:

  • 单进程、多线程

  • 多进程

这些也可以细分为以下类别:

  • 多线程 Step(单进程)

  • 并行 Step(单进程)

  • Step 的远程 Chunking(多进程)

  • 分区 Step(单进程或多进程)

首先,我们回顾单进程选项。然后,我们回顾多进程选项。

多线程 Step

开始并行处理最简单的方法是将 TaskExecutor 添加到您的 Step 配置中。

  • Java

  • XML

使用 Java 配置时,可以将 TaskExecutor 添加到 Step 中,如下例所示:

Java 配置
@Bean
public TaskExecutor taskExecutor() {
    return new SimpleAsyncTaskExecutor("spring_batch");
}

@Bean
public Step sampleStep(TaskExecutor taskExecutor, JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("sampleStep", jobRepository)
				.<String, String>chunk(10, transactionManager)
				.reader(itemReader())
				.writer(itemWriter())
				.taskExecutor(taskExecutor)
				.build();
}

例如,您可以向 tasklet 添加属性,如下所示:

<step id="loading">
    <tasklet task-executor="taskExecutor">...</tasklet>
</step>

在此示例中,taskExecutor 是对实现 TaskExecutor 接口的另一个 bean 定义的引用。TaskExecutor 是一个标准的 Spring 接口,有关可用实现的详细信息,请参阅 Spring 用户指南。最简单的多线程 TaskExecutorSimpleAsyncTaskExecutor

前述配置的结果是,Step 通过在单独的执行线程中读取、处理和写入每个 item chunk(每个提交间隔)来执行。请注意,这意味着 item 的处理没有固定顺序,并且与单线程情况相比,一个 chunk 可能包含非连续的 item。除了任务执行器施加的任何限制(例如它是否由线程池支持)之外,tasklet 配置还有一个限制(默认值:4)。您可能需要增加此限制以确保线程池得到充分利用。

  • Java

  • XML

使用 Java 配置时,builder 提供了对限制的访问,如下所示:

Java 配置
@Bean
public Step sampleStep(TaskExecutor taskExecutor, JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("sampleStep", jobRepository)
				.<String, String>chunk(10, transactionManager)
				.reader(itemReader())
				.writer(itemWriter())
				.taskExecutor(taskExecutor)
				.throttleLimit(20)
				.build();
}

例如,您可以增加限制,如下所示:

<step id="loading"> <tasklet
    task-executor="taskExecutor"
    throttle-limit="20">...</tasklet>
</step>

另请注意,您的 Step 中使用的任何池化资源(例如 DataSource)可能会对并发性施加限制。确保这些资源中的池至少与 Step 中所需的并发线程数一样大。

限制弃用

从 v5.0 开始,限制已被弃用,没有替代方案。如果您想替换默认 TaskExecutorRepeatTemplate 中的当前限制机制,您需要提供一个自定义的 RepeatOperations 实现(基于带有有界任务队列的 TaskExecutor),并使用 StepBuilder#stepOperations 将其设置在 Step 上。

Java 配置
@Bean
public Step sampleStep(RepeatOperations customRepeatOperations, JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("sampleStep", jobRepository)
				.<String, String>chunk(10, transactionManager)
				.reader(itemReader())
				.writer(itemWriter())
				.stepOperations(customRepeatOperations)
				.build();
}

对于某些常见的批处理用例,使用多线程 Step 实现存在一些实际限制。Step 中的许多参与者(例如 reader 和 writer)都是有状态的。如果状态没有按线程隔离,则这些组件无法在多线程 Step 中使用。特别是,Spring Batch 中的大多数 reader 和 writer 都不是为多线程使用而设计的。但是,可以使用无状态或线程安全的 reader 和 writer,Spring Batch Samples 中有一个示例(名为 parallelJob),展示了如何使用进程指示器(参见防止状态持久化)来跟踪数据库输入表中已处理的 item。

Spring Batch 提供了一些 ItemWriterItemReader 的实现。通常,它们会在 Javadoc 中说明是否是线程安全的,或者在并发环境中如何避免问题。如果 Javadoc 中没有信息,您可以检查实现以查看是否存在任何状态。如果 reader 不是线程安全的,您可以使用提供的 SynchronizedItemStreamReader 对其进行装饰,或者在您自己的同步 delegator 中使用它。您可以同步对 read() 的调用,只要处理和写入是 chunk 中最耗时的部分,您的 Step 仍然可能比单线程配置更快地完成。

并行 Step

只要需要并行处理的应用程序逻辑可以分解为不同的职责并分配给单个 Step,就可以在单个进程中并行处理。并行 Step 执行易于配置和使用。

  • Java

  • XML

使用 Java 配置时,与 step3 并行执行 Step (step1,step2) 非常简单,如下所示:

Java 配置
@Bean
public Job job(JobRepository jobRepository) {
    return new JobBuilder("job", jobRepository)
        .start(splitFlow())
        .next(step4())
        .build()        //builds FlowJobBuilder instance
        .build();       //builds Job instance
}

@Bean
public Flow splitFlow() {
    return new FlowBuilder<SimpleFlow>("splitFlow")
        .split(taskExecutor())
        .add(flow1(), flow2())
        .build();
}

@Bean
public Flow flow1() {
    return new FlowBuilder<SimpleFlow>("flow1")
        .start(step1())
        .next(step2())
        .build();
}

@Bean
public Flow flow2() {
    return new FlowBuilder<SimpleFlow>("flow2")
        .start(step3())
        .build();
}

@Bean
public TaskExecutor taskExecutor() {
    return new SimpleAsyncTaskExecutor("spring_batch");
}

例如,与 step3 并行执行 Step (step1,step2) 非常简单,如下所示:

<job id="job1">
    <split id="split1" task-executor="taskExecutor" next="step4">
        <flow>
            <step id="step1" parent="s1" next="step2"/>
            <step id="step2" parent="s2"/>
        </flow>
        <flow>
            <step id="step3" parent="s3"/>
        </flow>
    </split>
    <step id="step4" parent="s4"/>
</job>

<beans:bean id="taskExecutor" class="org.spr...SimpleAsyncTaskExecutor"/>

可配置的任务执行器用于指定应执行各个流程的 TaskExecutor 实现。默认是 SyncTaskExecutor,但需要异步 TaskExecutor 才能并行运行 Step。请注意,Job 确保 split 中的每个流程在聚合退出状态并转换之前完成。

更多详细信息请参阅Split Flows 部分。

远程 Chunking

在远程 chunking 中,Step 处理被分割到多个进程中,这些进程通过某些中间件相互通信。下图展示了该模式:

Remote Chunking
图 1. 远程 Chunking

管理器组件是一个单进程,而 workers 是多个远程进程。如果管理器不是瓶颈,则此模式效果最佳,因此处理必须比读取 item 更耗时(实际情况通常如此)。

管理器是 Spring Batch Step 的实现,其 ItemWriter 被替换为一个通用版本,该版本知道如何将 item chunk 作为消息发送到中间件。workers 是正在使用的任何中间件的标准监听器(例如,对于 JMS,它们将是 MesssageListener 实现),它们的作用是通过 ChunkProcessor 接口,使用标准的 ItemWriterItemProcessorItemWriter 来处理 item chunk。使用此模式的一个优点是 reader、processor 和 writer 组件都是现成的(与 Step 的本地执行使用的相同)。item 是动态划分的,并且工作是通过中间件共享的,因此,如果监听器都是积极消费者,则负载均衡是自动的。

中间件必须是持久的,具有保证的传递和每个消息的单个消费者。JMS 是显而易见的候选方案,但在网格计算和共享内存产品领域存在其他选项(例如 JavaSpaces)。

更多详细信息请参阅Spring Batch 集成 - 远程 Chunking 部分。

分区

Spring Batch 还提供了一个 SPI,用于对 Step 执行进行分区并在远程执行。在这种情况下,远程参与者是 Step 实例,它们也可以轻松地配置用于本地处理。下图展示了该模式:

Partitioning Overview
图 2. 分区

Job 在左侧作为一系列 Step 实例运行,其中一个 Step 实例被标记为管理器。图中的 workers 都是相同的 Step 实例,实际上可以取代管理器,从而为 Job 带来相同的结果。workers 通常是远程服务,但也可能是本地执行线程。管理器在此模式中发送给 workers 的消息不需要是持久的或具有保证的传递。JobRepository 中的 Spring Batch 元数据确保每个 worker 在每个 Job 执行中只执行一次。

Spring Batch 中的 SPI 由 Step 的一个特殊实现(称为 PartitionStep)和需要针对特定环境实现的两个策略接口组成。策略接口是 PartitionHandlerStepExecutionSplitter,以下时序图展示了它们的作用:

Partitioning SPI
图 3. 分区 SPI

在这种情况下,右侧的 Step 是“远程” worker,因此,可能有很多对象和/或进程扮演此角色,而 PartitionStep 显示为驱动执行。

  • Java

  • XML

以下示例展示了使用 Java 配置时的 PartitionStep 配置:

Java 配置
@Bean
public Step step1Manager(JobRepository jobRepository) {
    return new StepBuilder("step1.manager", jobRepository)
        .<String, String>partitioner("step1", partitioner())
        .step(step1())
        .gridSize(10)
        .taskExecutor(taskExecutor())
        .build();
}

与多线程 Step 的 throttleLimit 方法类似,gridSize 方法可防止任务执行器被单个 Step 的请求饱和。

以下示例展示了使用 XML 配置时的 PartitionStep 配置:

<step id="step1.manager">
    <partition step="step1" partitioner="partitioner">
        <handler grid-size="10" task-executor="taskExecutor"/>
    </partition>
</step>

与多线程 Step 的 throttle-limit 属性类似,grid-size 属性可防止任务执行器被单个 Step 的请求饱和。

Spring Batch Samples 的单元测试套件(参见 partition*Job.xml 配置)有一个简单的示例,您可以复制和扩展。

Spring Batch 为分区创建名为 step1:partition0 等等的 Step 执行。许多人更喜欢为了保持一致性而将管理器 Step 命名为 step1:manager。您可以使用 Step 的别名(通过指定 name 属性而不是 id 属性)。

PartitionHandler

PartitionHandler 是知道 remoting 或网格环境结构的组件。它能够将 StepExecution 请求发送到远程 Step 实例,并以某种特定于结构的格式包装,例如 DTO。它无需知道如何分割输入数据或如何聚合多个 Step 执行的结果。一般来说,它可能也无需了解弹性或故障转移,因为这些在许多情况下是结构的特性。无论如何,Spring Batch 总是提供独立于结构的重启能力。失败的 Job 总是可以重启,在这种情况下,只有失败的 Steps 会被重新执行。

PartitionHandler 接口可以针对各种结构类型拥有专门的实现,包括简单的 RMI remoting、EJB remoting、自定义 web service、JMS、Java Spaces、共享内存网格(如 Terracotta 或 Coherence)以及网格执行结构(如 GridGain)。Spring Batch 不包含任何专有网格或 remoting 结构的实现。

然而,Spring Batch 确实提供了 PartitionHandler 的一个有用实现,它使用 Spring 的 TaskExecutor 策略在本地的独立执行线程中执行 Step 实例。此实现称为 TaskExecutorPartitionHandler

  • Java

  • XML

您可以使用 Java 配置明确配置 TaskExecutorPartitionHandler,如下所示:

Java 配置
@Bean
public Step step1Manager(JobRepository jobRepository) {
    return new StepBuilder("step1.manager", jobRepository)
        .partitioner("step1", partitioner())
        .partitionHandler(partitionHandler())
        .build();
}

@Bean
public PartitionHandler partitionHandler() {
    TaskExecutorPartitionHandler retVal = new TaskExecutorPartitionHandler();
    retVal.setTaskExecutor(taskExecutor());
    retVal.setStep(step1());
    retVal.setGridSize(10);
    return retVal;
}

TaskExecutorPartitionHandler 是使用前面显示的 XML namespace 配置的 Step 的默认设置。您也可以明确配置它,如下所示:

<step id="step1.manager">
    <partition step="step1" handler="handler"/>
</step>

<bean class="org.spr...TaskExecutorPartitionHandler">
    <property name="taskExecutor" ref="taskExecutor"/>
    <property name="step" ref="step1" />
    <property name="gridSize" value="10" />
</bean>

gridSize 属性决定了要创建的独立 Step 执行的数量,因此可以将其与 TaskExecutor 中线程池的大小相匹配。或者,可以将其设置为大于可用线程的数量,这会使工作块变小。

TaskExecutorPartitionHandler 对于 IO 密集型 Step 实例很有用,例如复制大量文件或将文件系统复制到内容管理系统中。它也可以通过提供作为远程调用的代理的 Step 实现(例如使用 Spring Remoting)来用于远程执行。

Partitioner

Partitioner 责任更简单:仅为新的 Step 执行生成执行上下文作为输入参数(无需担心重启)。它有一个方法,如下接口定义所示:

public interface Partitioner {
    Map<String, ExecutionContext> partition(int gridSize);
}

此方法的返回值将每个 Step 执行的唯一名称(String 类型)与 ExecutionContext 形式的输入参数相关联。这些名称随后在 Batch 元数据中作为分区 StepExecions 的 Step 名称出现。ExecutionContext 只是一个键值对的集合,因此它可能包含主键范围、行号或输入文件的位置。然后,远程 Step 通常通过使用 #{…​} 占位符(Step 范围内的后期绑定)绑定到上下文输入,如下一节所示。

Step 执行的名称(Partitioner 返回的 Map 中的 key)在 Job 的 Step 执行中必须是唯一的,但没有其他特定要求。最简单的方法(并使名称对用户有意义)是使用 prefix+suffix 命名约定,其中 prefix 是正在执行的 Step 的名称(该名称在 Job 中本身是唯一的),suffix 只是一个计数器。框架中有一个使用此约定的 SimplePartitioner

您可以使用一个可选接口 PartitionNameProvider 来独立于分区本身提供分区名称。如果 Partitioner 实现了此接口,则在重启时只查询名称。如果分区操作很耗时,这可以是一个有用的优化。PartitionNameProvider 提供的名称必须与 Partitioner 提供的名称匹配。

将输入数据绑定到 Step

对于由 PartitionHandler 执行的 Step,拥有相同的配置以及在运行时从 ExecutionContext 绑定其输入参数是非常高效的。使用 Spring Batch 的 StepScope 特性(更多详细信息请参见后期绑定部分)可以轻松实现这一点。例如,如果 Partitioner 创建带有属性键 fileNameExecutionContext 实例,并且该键指向每个 Step 调用不同的文件(或目录),则 Partitioner 的输出可能类似于下表的内容:

表 1. Partitioner 提供的面向目录处理的 Step 执行名称到执行上下文示例

Step 执行名称 (key)

ExecutionContext (value)

filecopy:partition0

fileName=/home/data/one

filecopy:partition1

fileName=/home/data/two

filecopy:partition2

fileName=/home/data/three

然后可以使用后期绑定到执行上下文将文件名绑定到 Step。

  • Java

  • XML

以下示例展示了如何在 Java 中定义后期绑定:

Java 配置
@Bean
public MultiResourceItemReader itemReader(
	@Value("#{stepExecutionContext['fileName']}/*") Resource [] resources) {
	return new MultiResourceItemReaderBuilder<String>()
			.delegate(fileReader())
			.name("itemReader")
			.resources(resources)
			.build();
}

以下示例展示了如何在 XML 中定义后期绑定:

XML 配置
<bean id="itemReader" scope="step"
      class="org.spr...MultiResourceItemReader">
    <property name="resources" value="#{stepExecutionContext[fileName]}/*"/>
</bean>