扩展和并行处理
许多批处理问题可以通过单线程、单进程作业解决,因此在考虑更复杂的实现之前,最好先检查这是否能满足您的需求。衡量一个实际作业的性能,首先看看最简单的实现是否满足您的需求。即使使用标准硬件,您也可以在不到一分钟的时间内读写数百兆字节的文件。
当您准备开始实现带有并行处理的作业时,Spring Batch 提供了一系列选项,这些选项将在本章中描述,尽管某些功能在其他地方有所涉及。从高层次上看,并行处理有两种模式:
-
单进程,多线程
-
多进程
这些模式也分为以下几类:
-
多线程步骤(单进程)
-
并行步骤(单进程)
-
步骤的本地分块(单进程)
-
步骤的远程分块(多进程)
-
步骤分区(单进程或多进程)
-
远程步骤(多进程)
首先,我们回顾单进程选项。然后我们回顾多进程选项。
多线程步骤
启动并行处理最简单的方法是向您的 Step 配置添加一个 TaskExecutor。
-
Java
-
XML
当使用 Java 配置时,您可以向步骤添加一个 TaskExecutor,如下例所示:
@Bean
public TaskExecutor taskExecutor() {
return new SimpleAsyncTaskExecutor("spring_batch");
}
@Bean
public Step sampleStep(TaskExecutor taskExecutor, JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("sampleStep", jobRepository)
.<String, String>chunk(10).transactionManager(transactionManager)
.reader(itemReader())
.writer(itemWriter())
.taskExecutor(taskExecutor)
.build();
}
例如,您可以向 tasklet 添加一个属性,如下所示:
<step id="loading">
<tasklet task-executor="taskExecutor">...</tasklet>
</step>
在此示例中,taskExecutor 是对另一个实现 TaskExecutor 接口的 bean 定义的引用。TaskExecutor 是一个标准的 Spring 接口,因此请查阅 Spring 用户指南以了解可用实现的详细信息。最简单的多线程 TaskExecutor 是 SimpleAsyncTaskExecutor。
上述配置的结果是,Step 通过在单独的执行线程中读取、处理和写入每个项目块(每个提交间隔)来执行。请注意,这意味着项目的处理没有固定的顺序,并且与单线程情况相比,一个块可能包含非连续的项目。
另请注意,您的步骤中使用的任何池化资源(例如 DataSource)都可能限制并发。请确保这些资源中的池至少与步骤中所需的并发线程数一样大。
对于一些常见的批处理用例,使用多线程 Step 实现存在一些实际限制。Step 中的许多参与者(例如读取器和写入器)是有状态的。如果状态未按线程隔离,则这些组件无法在多线程 Step 中使用。特别是,Spring Batch 中的大多数读取器和写入器并非设计用于多线程使用。然而,可以使用无状态或线程安全的读取器和写入器,并且 Spring Batch 示例中有一个示例(名为 parallelJob)展示了如何使用进程指示器(请参阅防止状态持久化)来跟踪数据库输入表中已处理的项目。
Spring Batch 提供了一些 ItemWriter 和 ItemReader 的实现。通常,它们的 Javadoc 中会说明它们是否线程安全,或者在并发环境中如何避免问题。如果 Javadoc 中没有信息,您可以检查实现以查看是否存在任何状态。如果读取器不是线程安全的,您可以使用提供的 SynchronizedItemStreamReader 装饰它,或者在您自己的同步委托器中使用它。您可以同步对 read() 的调用,并且只要处理和写入是块中最昂贵的部分,您的步骤仍然可以比单线程配置更快地完成。
并行步骤
只要需要并行化的应用程序逻辑可以分解为不同的职责并分配给单独的步骤,它就可以在单个进程中并行化。并行步骤执行易于配置和使用。
-
Java
-
XML
当使用 Java 配置时,并行执行步骤 (step1,step2) 和 step3 非常简单,如下所示:
@Bean
public Job job(JobRepository jobRepository) {
return new JobBuilder("job", jobRepository)
.start(splitFlow())
.next(step4())
.build() //builds FlowJobBuilder instance
.build(); //builds Job instance
}
@Bean
public Flow splitFlow() {
return new FlowBuilder<SimpleFlow>("splitFlow")
.split(taskExecutor())
.add(flow1(), flow2())
.build();
}
@Bean
public Flow flow1() {
return new FlowBuilder<SimpleFlow>("flow1")
.start(step1())
.next(step2())
.build();
}
@Bean
public Flow flow2() {
return new FlowBuilder<SimpleFlow>("flow2")
.start(step3())
.build();
}
@Bean
public TaskExecutor taskExecutor() {
return new SimpleAsyncTaskExecutor("spring_batch");
}
例如,并行执行步骤 (step1,step2) 和 step3 非常简单,如下所示:
<job id="job1">
<split id="split1" task-executor="taskExecutor" next="step4">
<flow>
<step id="step1" parent="s1" next="step2"/>
<step id="step2" parent="s2"/>
</flow>
<flow>
<step id="step3" parent="s3"/>
</flow>
</split>
<step id="step4" parent="s4"/>
</job>
<beans:bean id="taskExecutor" class="org.spr...SimpleAsyncTaskExecutor"/>
可配置的任务执行器用于指定哪个 TaskExecutor 实现应该执行各个流。默认是 SyncTaskExecutor,但需要一个异步 TaskExecutor 才能并行运行这些步骤。请注意,作业确保拆分中的每个流都在聚合退出状态并进行转换之前完成。
有关更多详细信息,请参阅 拆分流部分。
本地分块
本地分块是 v6.0 中的一项新功能,它允许您在同一 JVM 中使用多个线程在本地并行处理项目块。当您有大量项目需要处理并希望利用多核处理器时,此功能特别有用。通过本地分块,您可以配置面向块的步骤,以使用多个线程并发处理项目块。每个线程将独立读取、处理和写入自己的项目块,而步骤将管理整体执行并提交结果。
此功能可通过使用 ChunkMessageChannelItemWriter 来实现,它是一个项目写入器,用于将块请求从 TaskExecutor 提交到本地工作器
@Bean
public ChunkTaskExecutorItemWriter<Vet> itemWriter(ChunkProcessor<Vet> chunkProcessor) {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(4);
taskExecutor.setThreadNamePrefix("worker-thread-");
taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
taskExecutor.afterPropertiesSet();
return new ChunkTaskExecutorItemWriter<>(chunkProcessor, taskExecutor);
}
ChunkMessageChannelItemWriter 需要一个 TaskExecutor 来并发处理块,以及一个 ChunkProcessor 来定义如何处理每个块。这是一个将每个项目块写入关系数据库表的块处理器示例:
@Bean
public ChunkProcessor<Vet> chunkProcessor(DataSource dataSource, TransactionTemplate transactionTemplate) {
String sql = "insert into vets (firstname, lastname) values (?, ?)";
JdbcBatchItemWriter<Vet> itemWriter = new JdbcBatchItemWriterBuilder<Vet>().dataSource(dataSource)
.sql(sql)
.itemPreparedStatementSetter((item, ps) -> {
ps.setString(1, item.firstname());
ps.setString(2, item.lastname());
})
.build();
return (chunk, contribution) -> transactionTemplate.executeWithoutResult(transactionStatus -> {
try {
itemWriter.write(chunk);
contribution.incrementWriteCount(chunk.size());
contribution.setExitStatus(ExitStatus.COMPLETED);
}
catch (Exception e) {
transactionStatus.setRollbackOnly();
contribution.setExitStatus(ExitStatus.FAILED.addExitDescription(e));
}
});
}
您可以在本地分块示例中找到这种扩展技术的示例。
远程分块
在远程分块中,Step 处理通过一些中间件在多个进程之间拆分。下图显示了该模式:
管理器组件是一个单进程,而工作器是多个远程进程。如果管理器不是瓶颈,此模式效果最佳,因此处理必须比读取项目更耗时(这在实践中通常是如此)。
管理器是 Spring Batch Step 的实现,其 ItemWriter 被替换为通用版本,该版本知道如何将项目块作为消息发送到中间件。工作器是所用中间件的标准侦听器(例如,对于 JMS,它们将是 MessageListener 实现),其作用是通过 ChunkProcessor 接口,使用标准 ItemWriter 或 ItemProcessor 以及 ItemWriter 来处理项目块。使用此模式的优点之一是读取器、处理器和写入器组件是现成的(与步骤的本地执行所使用的相同)。项目动态划分,并通过中间件共享工作,因此,如果侦听器都是急切的消费者,则负载均衡是自动的。
中间件必须是持久的,具有保证交付和每个消息的单个消费者。JMS 是显而易见的选择,但网格计算和共享内存产品领域也存在其他选项(例如 JavaSpaces)。
有关更多详细信息,请参阅 Spring Batch 集成 - 远程分块部分。
分区
Spring Batch 还提供了一个 SPI,用于分区 Step 执行并远程执行它。在这种情况下,远程参与者是 Step 实例,它们可以像配置和用于本地处理一样容易地进行配置和使用。下图显示了该模式:
Job 在左侧作为一系列 Step 实例运行,其中一个 Step 实例被标记为管理器。此图片中的工作器都是 Step 的相同实例,它们实际上可以取代管理器,从而为 Job 产生相同的结果。工作器通常是远程服务,但也可能是本地执行线程。在此模式中,管理器发送给工作器的消息不需要是持久的或具有保证交付。JobRepository 中的 Spring Batch 元数据确保每个工作器针对每个 Job 执行只执行一次。</p>
Spring Batch 中的 SPI 由 Step 的特殊实现(称为 PartitionStep)和两个需要为特定环境实现的策略接口组成。策略接口是 PartitionHandler 和 StepExecutionSplitter,以下序列图显示了它们的作用:
在这种情况下,右侧的 Step 是“远程”工作器,因此,可能有许多对象和/或进程扮演此角色,并且 PartitionStep 显示正在驱动执行。
-
Java
-
XML
以下示例显示了使用 Java 配置时的 PartitionStep 配置:
@Bean
public Step step1Manager(JobRepository jobRepository) {
return new StepBuilder("step1.manager", jobRepository)
.<String, String>partitioner("step1", partitioner())
.step(step1())
.gridSize(10)
.taskExecutor(taskExecutor())
.build();
}
类似于多线程步骤的 throttleLimit 方法,gridSize 方法阻止任务执行器被单个步骤的请求饱和。
以下示例显示了使用 XML 配置时的 PartitionStep 配置:
<step id="step1.manager">
<partition step="step1" partitioner="partitioner">
<handler grid-size="10" task-executor="taskExecutor"/>
</partition>
</step>
类似于多线程步骤的 throttle-limit 属性,grid-size 属性阻止任务执行器被单个步骤的请求饱和。
Spring Batch 示例的单元测试套件(参见 partition*Job.xml 配置)有一个简单的示例,您可以复制和扩展。
Spring Batch 为名为 step1:partition0 等的分区创建步骤执行。许多人为了保持一致性,更喜欢将管理器步骤命名为 step1:manager。您可以使用步骤的别名(通过指定 name 属性而不是 id 属性)。
PartitionHandler
PartitionHandler 是了解远程或网格环境结构(fabric)的组件。它能够将 StepExecution 请求发送到远程 Step 实例,这些请求被封装在某种特定于结构(fabric-specific)的格式中,例如 DTO。它不需要知道如何拆分输入数据,也不需要知道如何聚合多个 Step 执行的结果。一般来说,它可能也不需要知道弹性或故障转移,因为在许多情况下这些都是结构(fabric)的特性。无论如何,Spring Batch 始终提供独立于结构(fabric)的重启能力。失败的 Job 始终可以重新启动,在这种情况下,只有失败的 Steps 会被重新执行。
PartitionHandler 接口可以针对各种结构类型具有专门的实现,包括简单的 RMI 远程调用、EJB 远程调用、自定义 Web 服务、JMS、Java Spaces、共享内存网格(例如 Terracotta 或 Coherence)和网格执行结构(例如 GridGain)。Spring Batch 不包含任何专有网格或远程调用结构的实现。
然而,Spring Batch 确实提供了一个有用的 PartitionHandler 实现,该实现使用 Spring 的 TaskExecutor 策略在单独的执行线程中本地执行 Step 实例。该实现名为 TaskExecutorPartitionHandler。
-
Java
-
XML
您可以使用 Java 配置显式配置 TaskExecutorPartitionHandler,如下所示:
@Bean
public Step step1Manager(JobRepository jobRepository) {
return new StepBuilder("step1.manager", jobRepository)
.partitioner("step1", partitioner())
.partitionHandler(partitionHandler())
.build();
}
@Bean
public PartitionHandler partitionHandler() {
TaskExecutorPartitionHandler retVal = new TaskExecutorPartitionHandler();
retVal.setTaskExecutor(taskExecutor());
retVal.setStep(step1());
retVal.setGridSize(10);
return retVal;
}
TaskExecutorPartitionHandler 是使用前面显示的 XML 命名空间配置的步骤的默认值。您也可以显式配置它,如下所示:
<step id="step1.manager">
<partition step="step1" handler="handler"/>
</step>
<bean class="org.spr...TaskExecutorPartitionHandler">
<property name="taskExecutor" ref="taskExecutor"/>
<property name="step" ref="step1" />
<property name="gridSize" value="10" />
</bean>
gridSize 属性决定了要创建的独立步骤执行的数量,因此它可以与 TaskExecutor 中的线程池大小相匹配。或者,它可以设置为大于可用线程数,这会使工作块变小。
TaskExecutorPartitionHandler 对于 I/O 密集型 Step 实例非常有用,例如复制大量文件或将文件系统复制到内容管理系统中。它还可以通过提供一个作为远程调用代理的 Step 实现(例如使用 Spring Remoting)来用于远程执行。
分区器
Partitioner 具有更简单的职责:仅为新的步骤执行生成作为输入参数的执行上下文(无需担心重启)。它只有一个方法,如下面的接口定义所示:
public interface Partitioner {
Map<String, ExecutionContext> partition(int gridSize);
}
此方法的返回值将每个步骤执行的唯一名称(String)与 ExecutionContext 形式的输入参数相关联。这些名称稍后会在批处理元数据中作为分区 StepExecutions 中的步骤名称出现。ExecutionContext 只是一个名称-值对的集合,因此它可能包含一系列主键、行号或输入文件的位置。远程 Step 通常通过使用 #{…} 占位符(步骤范围内的后期绑定)绑定到上下文输入,如下一节所示。
步骤执行的名称(Partitioner 返回的 Map 中的键)需要在 Job 的步骤执行中是唯一的,但没有其他特定要求。最简单的方法(并使名称对用户有意义)是使用前缀+后缀命名约定,其中前缀是被执行步骤的名称(在 Job 中本身是唯一的),后缀只是一个计数器。框架中有一个 SimplePartitioner 使用此约定。
您可以使用一个可选接口 PartitionNameProvider 来提供分区名称,使其与分区本身分离。如果 Partitioner 实现了此接口,则在重新启动时仅查询名称。如果分区成本较高,这可能是一个有用的优化。PartitionNameProvider 提供的名称必须与 Partitioner 提供的名称匹配。
将输入数据绑定到步骤
由 PartitionHandler 执行的步骤具有相同的配置,并且它们的输入参数在运行时从 ExecutionContext 绑定,这是非常高效的。这很容易通过 Spring Batch 的 StepScope 功能实现(在后期绑定部分有更详细的介绍)。例如,如果 Partitioner 创建的 ExecutionContext 实例中有一个名为 fileName 的属性键,指向每个步骤调用不同的文件(或目录),则 Partitioner 的输出可能类似于下表的内容:
步骤执行名称(键) |
执行上下文(值) |
filecopy:partition0 |
fileName=/home/data/one |
filecopy:partition1 |
fileName=/home/data/two |
filecopy:partition2 |
fileName=/home/data/three |
然后,文件名可以通过后期绑定到执行上下文的方式绑定到步骤。
-
Java
-
XML
以下示例展示了如何在 Java 中定义后期绑定:
@Bean
public MultiResourceItemReader itemReader(
@Value("#{stepExecutionContext['fileName']}/*") Resource [] resources) {
return new MultiResourceItemReaderBuilder<String>()
.delegate(fileReader())
.name("itemReader")
.resources(resources)
.build();
}
以下示例展示了如何在 XML 中定义后期绑定:
<bean id="itemReader" scope="step"
class="org.spr...MultiResourceItemReader">
<property name="resources" value="#{stepExecutionContext[fileName]}/*"/>
</bean>
远程步骤执行
从 v6.0 开始,Spring Batch 提供了对远程步骤执行的支持,允许您在远程机器或集群上执行批处理作业的步骤。此功能对于大规模批处理场景特别有用,在这种场景中,您希望将工作负载分发到多个节点以提高性能和可扩展性。远程步骤执行由 RemoteStep 类提供,该类使用 Spring Integration 消息通道来实现本地作业执行环境与远程步骤执行器之间的通信。
RemoteStep 被配置为一个常规步骤,通过提供远程步骤名称和消息模板来向远程工作器发送步骤执行请求
@Bean
public Step step(MessagingTemplate messagingTemplate, JobRepository jobRepository) {
return new RemoteStep("step", "workerStep", jobRepository, messagingTemplate);
}
在工作器端,您需要定义要执行的远程步骤(此示例中为 workerStep),并配置一个 Spring Integration 流来拦截步骤执行请求并调用 StepExecutionRequestHandler
@Bean
public Step workerStep(JobRepository jobRepository, JdbcTransactionManager transactionManager) {
return new StepBuilder("workerStep", jobRepository)
// define step logic
.build();
}
/*
* Configure inbound flow (requests coming from the manager)
*/
@Bean
public DirectChannel requests() {
return new DirectChannel();
}
@Bean
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory, JobRepository jobRepository,
StepLocator stepLocator) {
StepExecutionRequestHandler stepExecutionRequestHandler = new StepExecutionRequestHandler();
stepExecutionRequestHandler.setJobRepository(jobRepository);
stepExecutionRequestHandler.setStepLocator(stepLocator);
return IntegrationFlow.from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("requests"))
.channel(requests())
.handle(stepExecutionRequestHandler, "handle")
.get();
}
@Bean
public StepLocator stepLocator(BeanFactory beanFactory) {
BeanFactoryStepLocator beanFactoryStepLocator = new BeanFactoryStepLocator();
beanFactoryStepLocator.setBeanFactory(beanFactory);
return beanFactoryStepLocator;
}
您可以在远程步骤示例中找到完整的示例。