批处理

本节详细介绍了 Spring Cloud Task 与 Spring Batch 的集成。本节内容包括跟踪作业执行与任务执行之间的关联,以及通过 Spring Cloud Deployer 进行远程分区。

将作业执行与执行该作业的任务关联

Spring Boot 提供了在 Spring Boot Uber-jar 内执行批处理作业的功能。Spring Boot 对此功能的支持允许开发者在一次执行中运行多个批处理作业。Spring Cloud Task 提供了将作业执行与任务执行关联的能力,以便可以相互追溯。

Spring Cloud Task 通过使用 TaskBatchExecutionListener 实现此功能。默认情况下,在任何同时配置了 Spring Batch Job(通过在上下文中定义类型为 Job 的 bean)并且类路径上有 spring-cloud-task-batch jar 的上下文中,该监听器都会自动配置。该监听器会被注入到所有符合条件的作业中。

覆盖 TaskBatchExecutionListener

为了防止监听器被注入到当前上下文中的任何批处理作业中,您可以使用标准的 Spring Boot 机制禁用自动配置。

要仅将监听器注入到上下文中的特定作业中,请覆盖 batchTaskExecutionListenerBeanPostProcessor 并提供作业 bean ID 列表,如下例所示

public static TaskBatchExecutionListenerBeanPostProcessor batchTaskExecutionListenerBeanPostProcessor() {
	TaskBatchExecutionListenerBeanPostProcessor postProcessor =
		new TaskBatchExecutionListenerBeanPostProcessor();

	postProcessor.setJobNames(Arrays.asList(new String[] {"job1", "job2"}));

	return postProcessor;
}
您可以在 Spring Cloud Task 项目的 samples 模块中找到一个批处理示例应用,此处

远程分区

Spring Cloud Deployer 提供了在大多数云基础设施上启动基于 Spring Boot 的应用的功能。DeployerPartitionHandlerDeployerStepExecutionHandler 将工作步执行的启动委托给 Spring Cloud Deployer。

要配置 DeployerStepExecutionHandler,必须提供一个表示要执行的 Spring Boot Uber-jar 的 Resource、一个 TaskLauncherHandler 和一个 JobExplorer。您可以配置任何环境变量以及同时执行的最大 worker 数量、轮询结果的间隔(默认为 10 秒)和超时(默认为 -1 或无超时)。以下示例展示了如何配置此 PartitionHandler

@Bean
public PartitionHandler partitionHandler(TaskLauncher taskLauncher,
		JobExplorer jobExplorer) throws Exception {

	MavenProperties mavenProperties = new MavenProperties();
	mavenProperties.setRemoteRepositories(new HashMap<>(Collections.singletonMap("springRepo",
		new MavenProperties.RemoteRepository(repository))));

 	Resource resource =
		MavenResource.parse(String.format("%s:%s:%s",
				"io.spring.cloud",
				"partitioned-batch-job",
				"1.1.0.RELEASE"), mavenProperties);

	DeployerPartitionHandler partitionHandler =
		new DeployerPartitionHandler(taskLauncher, jobExplorer, resource, "workerStep");

	List<String> commandLineArgs = new ArrayList<>(3);
	commandLineArgs.add("--spring.profiles.active=worker");
	commandLineArgs.add("--spring.cloud.task.initialize.enable=false");
	commandLineArgs.add("--spring.batch.initializer.enabled=false");

	partitionHandler.setCommandLineArgsProvider(
		new PassThroughCommandLineArgsProvider(commandLineArgs));
	partitionHandler.setEnvironmentVariablesProvider(new NoOpEnvironmentVariablesProvider());
	partitionHandler.setMaxWorkers(2);
	partitionHandler.setApplicationName("PartitionedBatchJobTask");

	return partitionHandler;
}
向分区传递环境变量时,每个分区可能位于具有不同环境设置的不同机器上。因此,您应该只传递必需的环境变量。

请注意,在上面的示例中,我们将最大 worker 数量设置为 2。设置最大 worker 数量可以确定同时应该运行的最大分区数量。

要执行的 Resource 应该是一个 Spring Boot Uber-jar,并且在当前上下文中将 DeployerStepExecutionHandler 配置为 CommandLineRunner。前面示例中列出的仓库应该是 Spring Boot Uber-jar 所在的远程仓库。管理器和 worker 都应该能够访问用作作业仓库和任务仓库的同一数据存储。底层基础设施启动 Spring Boot jar 并且 Spring Boot 启动 DeployerStepExecutionHandler 后,步处理器就会执行请求的 Step。以下示例展示了如何配置 DeployerStepExecutionHandler

@Bean
public DeployerStepExecutionHandler stepExecutionHandler(JobExplorer jobExplorer) {
	DeployerStepExecutionHandler handler =
		new DeployerStepExecutionHandler(this.context, jobExplorer, this.jobRepository);

	return handler;
}
您可以在 Spring Cloud Task 项目的 samples 模块中找到一个远程分区示例应用,此处

异步启动远程批处理分区

默认情况下,批处理分区是按顺序启动的。然而,在某些情况下这可能会影响性能,因为每次启动都会阻塞,直到资源(例如:在 Kubernetes 中配置 pod)被配置完成。在这些情况下,您可以向 DeployerPartitionHandler 提供一个 ThreadPoolTaskExecutor。这将根据 ThreadPoolTaskExecutor 的配置启动远程批处理分区。例如

	@Bean
	public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
		ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
		executor.setCorePoolSize(4);
		executor.setThreadNamePrefix("default_task_executor_thread");
		executor.setWaitForTasksToCompleteOnShutdown(true);
		executor.initialize();
		return executor;
	}

	@Bean
	public PartitionHandler partitionHandler(TaskLauncher taskLauncher, JobExplorer jobExplorer,
		TaskRepository taskRepository, ThreadPoolTaskExecutor executor) throws Exception {
		Resource resource = this.resourceLoader
			.getResource("maven://io.spring.cloud:partitioned-batch-job:2.2.0.BUILD-SNAPSHOT");

		DeployerPartitionHandler partitionHandler =
			new DeployerPartitionHandler(taskLauncher, jobExplorer, resource,
				"workerStep", taskRepository, executor);
	...
	}
由于使用了 ThreadPoolTaskExecutor 会留下一个活跃线程,因此应用程序不会终止,我们需要关闭上下文。要适当关闭应用程序,我们需要将 spring.cloud.task.closecontextEnabled 属性设置为 true

开发 Kubernetes 平台批处理分区应用的注意事项

  • 在 Kubernetes 平台上部署分区应用时,您必须使用 Spring Cloud Kubernetes Deployer 的以下依赖

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-deployer-kubernetes</artifactId>
    </dependency>
  • 任务应用及其分区的应用名称需要遵循以下正则表达式模式:[a-z0-9]([-a-z0-9]*[a-z0-9])。否则,会抛出异常。

批处理信息消息

Spring Cloud Task 提供了批处理作业发出信息消息的能力。“Spring Batch Events” 部分详细介绍了此功能。

批处理作业退出码

前面讨论的,Spring Cloud Task 应用支持记录任务执行的退出码。然而,在使用默认的 Batch/Boot 行为时,如果您在一个任务中运行 Spring Batch Job,无论批处理作业执行如何完成,任务的结果始终为零。请记住,任务是一个 Boot 应用,从任务返回的退出码与 Boot 应用相同。要覆盖此行为,并允许任务在批处理作业返回 FAILEDBatchStatus 时返回非零退出码,请将 spring.cloud.task.batch.fail-on-job-failure 设置为 true。此时退出码可以是 1(默认值),或者基于指定的 ExitCodeGenerator

此功能使用一个新的 ApplicationRunner,它取代了 Spring Boot 提供的那个。默认情况下,它的配置顺序与 Spring Boot 提供的相同。然而,如果您想自定义 ApplicationRunner 的运行顺序,可以通过设置 spring.cloud.task.batch.applicationRunnerOrder 属性来指定其顺序。要让您的任务根据批处理作业执行的结果返回退出码,您需要编写自己的 CommandLineRunner