批处理
本节详细介绍了 Spring Cloud Task 与 Spring Batch 的集成。本节内容包括跟踪作业执行与任务执行之间的关联,以及通过 Spring Cloud Deployer 进行远程分区。
将作业执行与执行该作业的任务关联
Spring Boot 提供了在 Spring Boot Uber-jar 内执行批处理作业的功能。Spring Boot 对此功能的支持允许开发者在一次执行中运行多个批处理作业。Spring Cloud Task 提供了将作业执行与任务执行关联的能力,以便可以相互追溯。
Spring Cloud Task 通过使用 TaskBatchExecutionListener
实现此功能。默认情况下,在任何同时配置了 Spring Batch Job(通过在上下文中定义类型为 Job
的 bean)并且类路径上有 spring-cloud-task-batch
jar 的上下文中,该监听器都会自动配置。该监听器会被注入到所有符合条件的作业中。
覆盖 TaskBatchExecutionListener
为了防止监听器被注入到当前上下文中的任何批处理作业中,您可以使用标准的 Spring Boot 机制禁用自动配置。
要仅将监听器注入到上下文中的特定作业中,请覆盖 batchTaskExecutionListenerBeanPostProcessor
并提供作业 bean ID 列表,如下例所示
public static TaskBatchExecutionListenerBeanPostProcessor batchTaskExecutionListenerBeanPostProcessor() {
TaskBatchExecutionListenerBeanPostProcessor postProcessor =
new TaskBatchExecutionListenerBeanPostProcessor();
postProcessor.setJobNames(Arrays.asList(new String[] {"job1", "job2"}));
return postProcessor;
}
您可以在 Spring Cloud Task 项目的 samples 模块中找到一个批处理示例应用,此处。 |
远程分区
Spring Cloud Deployer 提供了在大多数云基础设施上启动基于 Spring Boot 的应用的功能。DeployerPartitionHandler
和 DeployerStepExecutionHandler
将工作步执行的启动委托给 Spring Cloud Deployer。
要配置 DeployerStepExecutionHandler
,必须提供一个表示要执行的 Spring Boot Uber-jar 的 Resource
、一个 TaskLauncherHandler
和一个 JobExplorer
。您可以配置任何环境变量以及同时执行的最大 worker 数量、轮询结果的间隔(默认为 10 秒)和超时(默认为 -1 或无超时)。以下示例展示了如何配置此 PartitionHandler
@Bean
public PartitionHandler partitionHandler(TaskLauncher taskLauncher,
JobExplorer jobExplorer) throws Exception {
MavenProperties mavenProperties = new MavenProperties();
mavenProperties.setRemoteRepositories(new HashMap<>(Collections.singletonMap("springRepo",
new MavenProperties.RemoteRepository(repository))));
Resource resource =
MavenResource.parse(String.format("%s:%s:%s",
"io.spring.cloud",
"partitioned-batch-job",
"1.1.0.RELEASE"), mavenProperties);
DeployerPartitionHandler partitionHandler =
new DeployerPartitionHandler(taskLauncher, jobExplorer, resource, "workerStep");
List<String> commandLineArgs = new ArrayList<>(3);
commandLineArgs.add("--spring.profiles.active=worker");
commandLineArgs.add("--spring.cloud.task.initialize.enable=false");
commandLineArgs.add("--spring.batch.initializer.enabled=false");
partitionHandler.setCommandLineArgsProvider(
new PassThroughCommandLineArgsProvider(commandLineArgs));
partitionHandler.setEnvironmentVariablesProvider(new NoOpEnvironmentVariablesProvider());
partitionHandler.setMaxWorkers(2);
partitionHandler.setApplicationName("PartitionedBatchJobTask");
return partitionHandler;
}
向分区传递环境变量时,每个分区可能位于具有不同环境设置的不同机器上。因此,您应该只传递必需的环境变量。 |
请注意,在上面的示例中,我们将最大 worker 数量设置为 2。设置最大 worker 数量可以确定同时应该运行的最大分区数量。
要执行的 Resource
应该是一个 Spring Boot Uber-jar,并且在当前上下文中将 DeployerStepExecutionHandler
配置为 CommandLineRunner
。前面示例中列出的仓库应该是 Spring Boot Uber-jar 所在的远程仓库。管理器和 worker 都应该能够访问用作作业仓库和任务仓库的同一数据存储。底层基础设施启动 Spring Boot jar 并且 Spring Boot 启动 DeployerStepExecutionHandler
后,步处理器就会执行请求的 Step
。以下示例展示了如何配置 DeployerStepExecutionHandler
@Bean
public DeployerStepExecutionHandler stepExecutionHandler(JobExplorer jobExplorer) {
DeployerStepExecutionHandler handler =
new DeployerStepExecutionHandler(this.context, jobExplorer, this.jobRepository);
return handler;
}
您可以在 Spring Cloud Task 项目的 samples 模块中找到一个远程分区示例应用,此处。 |
异步启动远程批处理分区
默认情况下,批处理分区是按顺序启动的。然而,在某些情况下这可能会影响性能,因为每次启动都会阻塞,直到资源(例如:在 Kubernetes 中配置 pod)被配置完成。在这些情况下,您可以向 DeployerPartitionHandler
提供一个 ThreadPoolTaskExecutor
。这将根据 ThreadPoolTaskExecutor
的配置启动远程批处理分区。例如
@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setThreadNamePrefix("default_task_executor_thread");
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.initialize();
return executor;
}
@Bean
public PartitionHandler partitionHandler(TaskLauncher taskLauncher, JobExplorer jobExplorer,
TaskRepository taskRepository, ThreadPoolTaskExecutor executor) throws Exception {
Resource resource = this.resourceLoader
.getResource("maven://io.spring.cloud:partitioned-batch-job:2.2.0.BUILD-SNAPSHOT");
DeployerPartitionHandler partitionHandler =
new DeployerPartitionHandler(taskLauncher, jobExplorer, resource,
"workerStep", taskRepository, executor);
...
}
由于使用了 ThreadPoolTaskExecutor 会留下一个活跃线程,因此应用程序不会终止,我们需要关闭上下文。要适当关闭应用程序,我们需要将 spring.cloud.task.closecontextEnabled 属性设置为 true 。 |
开发 Kubernetes 平台批处理分区应用的注意事项
-
在 Kubernetes 平台上部署分区应用时,您必须使用 Spring Cloud Kubernetes Deployer 的以下依赖
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-deployer-kubernetes</artifactId> </dependency>
-
任务应用及其分区的应用名称需要遵循以下正则表达式模式:
[a-z0-9]([-a-z0-9]*[a-z0-9])
。否则,会抛出异常。
批处理信息消息
Spring Cloud Task 提供了批处理作业发出信息消息的能力。“Spring Batch Events” 部分详细介绍了此功能。
批处理作业退出码
如前面讨论的,Spring Cloud Task 应用支持记录任务执行的退出码。然而,在使用默认的 Batch/Boot 行为时,如果您在一个任务中运行 Spring Batch Job,无论批处理作业执行如何完成,任务的结果始终为零。请记住,任务是一个 Boot 应用,从任务返回的退出码与 Boot 应用相同。要覆盖此行为,并允许任务在批处理作业返回 FAILED
的 BatchStatus 时返回非零退出码,请将 spring.cloud.task.batch.fail-on-job-failure
设置为 true
。此时退出码可以是 1(默认值),或者基于指定的 ExitCodeGenerator
)
此功能使用一个新的 ApplicationRunner
,它取代了 Spring Boot 提供的那个。默认情况下,它的配置顺序与 Spring Boot 提供的相同。然而,如果您想自定义 ApplicationRunner
的运行顺序,可以通过设置 spring.cloud.task.batch.applicationRunnerOrder
属性来指定其顺序。要让您的任务根据批处理作业执行的结果返回退出码,您需要编写自己的 CommandLineRunner
。