配置 Step 以便重启

在“配置和运行 Job”一节中,讨论了重启 Job。重启对步骤有诸多影响,因此可能需要一些特定的配置。

设置启动次数限制

在许多场景中,您可能希望控制 Step 可以启动的次数。例如,您可能需要配置一个特定的 Step 使其只运行一次,因为它会使某些资源失效,必须手动修复后才能再次运行。这可以在步骤级别进行配置,因为不同的步骤可能有不同的要求。一个只能执行一次的 Step 可以与一个可以无限次运行的 Step 存在于同一个 Job 中。

  • Java

  • XML

以下代码片段显示了 Java 中的启动次数限制配置示例

Java 配置
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("step1", jobRepository)
				.<String, String>chunk(10, transactionManager)
				.reader(itemReader())
				.writer(itemWriter())
				.startLimit(1)
				.build();
}

以下代码片段显示了 XML 中的启动次数限制配置示例

XML 配置
<step id="step1">
    <tasklet start-limit="1">
        <chunk reader="itemReader" writer="itemWriter" commit-interval="10"/>
    </tasklet>
</step>

前面示例中显示的步骤只能运行一次。尝试再次运行它会导致抛出 StartLimitExceededException 异常。请注意,start-limit 的默认值为 Integer.MAX_VALUE

重启已完成的 Step

对于一个可重启的 Job,可能有一个或多个步骤应该始终运行,无论它们第一次是否成功。例如,可以是验证步骤或在处理前清理资源的 Step。在重启的 Job 的正常处理过程中,任何状态为 COMPLETED 的步骤(意味着它已经成功完成)都会被跳过。将 allow-start-if-complete 设置为 true 会覆盖此行为,以便该步骤始终运行。

  • Java

  • XML

以下代码片段显示了如何在 Java 中定义一个可重启的 Job

Java 配置
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("step1", jobRepository)
				.<String, String>chunk(10, transactionManager)
				.reader(itemReader())
				.writer(itemWriter())
				.allowStartIfComplete(true)
				.build();
}

以下代码片段显示了如何在 XML 中定义一个可重启的 Job

XML 配置
<step id="step1">
    <tasklet allow-start-if-complete="true">
        <chunk reader="itemReader" writer="itemWriter" commit-interval="10"/>
    </tasklet>
</step>

Step 重启配置示例

  • Java

  • XML

以下 Java 示例显示了如何配置 Job 以使其包含可重启的步骤

Java 配置
@Bean
public Job footballJob(JobRepository jobRepository, Step playerLoad, Step gameLoad, Step playerSummarization) {
	return new JobBuilder("footballJob", jobRepository)
				.start(playerLoad)
				.next(gameLoad)
				.next(playerSummarization)
				.build();
}

@Bean
public Step playerLoad(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("playerLoad", jobRepository)
			.<String, String>chunk(10, transactionManager)
			.reader(playerFileItemReader())
			.writer(playerWriter())
			.build();
}

@Bean
public Step gameLoad(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("gameLoad", jobRepository)
			.allowStartIfComplete(true)
			.<String, String>chunk(10, transactionManager)
			.reader(gameFileItemReader())
			.writer(gameWriter())
			.build();
}

@Bean
public Step playerSummarization(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("playerSummarization", jobRepository)
			.startLimit(2)
			.<String, String>chunk(10, transactionManager)
			.reader(playerSummarizationSource())
			.writer(summaryWriter())
			.build();
}

以下 XML 示例显示了如何配置 Job 以使其包含可重启的步骤

XML 配置
<job id="footballJob" restartable="true">
    <step id="playerload" next="gameLoad">
        <tasklet>
            <chunk reader="playerFileItemReader" writer="playerWriter"
                   commit-interval="10" />
        </tasklet>
    </step>
    <step id="gameLoad" next="playerSummarization">
        <tasklet allow-start-if-complete="true">
            <chunk reader="gameFileItemReader" writer="gameWriter"
                   commit-interval="10"/>
        </tasklet>
    </step>
    <step id="playerSummarization">
        <tasklet start-limit="2">
            <chunk reader="playerSummarizationSource" writer="summaryWriter"
                   commit-interval="10"/>
        </tasklet>
    </step>
</job>

前面的示例配置适用于一个 Job,该 Job 加载足球比赛信息并进行汇总。它包含三个步骤:playerLoadgameLoadplayerSummarizationplayerLoad 步骤从扁平文件加载球员信息,而 gameLoad 步骤加载比赛信息。最后一个步骤 playerSummarization 则根据提供的比赛汇总每个球员的统计数据。假设由 playerLoad 加载的文件只能加载一次,而 gameLoad 可以加载特定目录中找到的任何比赛文件,并在成功加载到数据库后将其删除。因此,playerLoad 步骤不包含额外的配置。它可以启动任意次数,如果完成则跳过。然而,gameLoad 步骤需要每次都运行,以防上次运行后添加了额外文件。它将 allow-start-if-complete 设置为 true,以确保始终启动。(假设比赛数据加载到的数据库表带有一个处理指示器,以确保汇总步骤可以正确找到新比赛)。汇总步骤是 Job 中最重要的步骤,配置了启动次数限制为 2。这很有用,因为如果该步骤持续失败,将返回新的退出码给控制 Job 执行的操作员,并且在人工干预之前无法再次启动。

此 Job 是本文档的一个示例,与 samples 项目中的 footballJob 不同。

本节的其余部分描述了 footballJob 示例的每次运行会发生什么。

运行 1

  1. playerLoad 运行并成功完成,向 PLAYERS 表添加 400 名球员。

  2. gameLoad 运行并处理 11 个比赛数据文件,将其内容加载到 GAMES 表中。

  3. playerSummarization 开始处理并在 5 分钟后失败。

运行 2

  1. playerLoad 不运行,因为它已经成功完成,并且 allow-start-if-completefalse(默认值)。

  2. gameLoad 再次运行并处理另外 2 个文件,也将其内容加载到 GAMES 表中(带有处理指示器,表明它们尚未被处理)。

  3. playerSummarization 开始处理所有剩余的比赛数据(使用处理指示器进行过滤),并在 30 分钟后再次失败。

运行 3

  1. playerLoad 不运行,因为它已经成功完成,并且 allow-start-if-completefalse(默认值)。

  2. gameLoad 再次运行并处理另外 2 个文件,也将其内容加载到 GAMES 表中(带有处理指示器,表明它们尚未被处理)。

  3. playerSummarization 未启动,作业立即被终止,因为这是 playerSummarization 的第三次执行,而其限制仅为 2。必须提高限制,或者将该 Job 作为新的 JobInstance 执行。