高级元数据用法

到目前为止,我们已经讨论了 JobLauncherJobRepository 接口。它们共同代表了 Job 的简单启动以及 Batch 域对象的基本 CRUD 操作。

Job Repository
图 1. Job Repository

JobLauncher 使用 JobRepository 创建新的 JobExecution 对象并运行它们。JobStep 实现稍后会在 Job 运行期间使用同一个 JobRepository 对这些执行进行基本更新。基本操作足以应对简单的场景。然而,在包含数百个批处理 Job 和复杂调度要求的大型批处理环境中,需要更高级的元数据访问。

Job Repository Advanced
图 2. 高级 Job Repository 访问

JobExplorerJobOperator 接口将在接下来的章节中讨论,它们为查询和控制元数据提供了附加功能。

查询 Repository

在实现任何高级功能之前,最基本的需求是查询 Repository 中现有执行的能力。此功能由 JobExplorer 接口提供。

public interface JobExplorer {

    List<JobInstance> getJobInstances(String jobName, int start, int count);

    JobExecution getJobExecution(Long executionId);

    StepExecution getStepExecution(Long jobExecutionId, Long stepExecutionId);

    JobInstance getJobInstance(Long instanceId);

    List<JobExecution> getJobExecutions(JobInstance jobInstance);

    Set<JobExecution> findRunningJobExecutions(String jobName);
}

从其方法签名可以看出,JobExplorerJobRepository 的只读版本,与 JobRepository 类似,可以通过使用 Factory Bean 轻松配置。

  • Java

  • XML

以下示例展示了如何在 Java 中配置 JobExplorer

Java 配置
...
// This would reside in your DefaultBatchConfiguration extension
@Bean
public JobExplorer jobExplorer() throws Exception {
	JobExplorerFactoryBean factoryBean = new JobExplorerFactoryBean();
	factoryBean.setDataSource(this.dataSource);
	return factoryBean.getObject();
}
...

以下示例展示了如何在 XML 中配置 JobExplorer

XML 配置
<bean id="jobExplorer" class="org.spr...JobExplorerFactoryBean"
      p:dataSource-ref="dataSource" />

在本章前面,我们提到可以修改 JobRepository 的表前缀,以允许不同的版本或 Schema。由于 JobExplorer 使用相同的表,因此也需要设置前缀的功能。

  • Java

  • XML

以下示例展示了如何在 Java 中为 JobExplorer 设置表前缀

Java 配置
...
// This would reside in your DefaultBatchConfiguration extension
@Bean
public JobExplorer jobExplorer() throws Exception {
	JobExplorerFactoryBean factoryBean = new JobExplorerFactoryBean();
	factoryBean.setDataSource(this.dataSource);
	factoryBean.setTablePrefix("SYSTEM.");
	return factoryBean.getObject();
}
...

以下示例展示了如何在 XML 中为 JobExplorer 设置表前缀

XML 配置
<bean id="jobExplorer" class="org.spr...JobExplorerFactoryBean"
		p:tablePrefix="SYSTEM."/>

JobRegistry

JobRegistry(及其父接口 JobLocator)不是必需的,但如果你想跟踪上下文中可用的 Job,它会很有用。当 Job 在其他地方(例如子上下文中)创建时,它也适用于在应用程序上下文中集中收集 Job。你还可以使用自定义 JobRegistry 实现来操作已注册 Job 的名称和其他属性。框架仅提供了一种实现,它基于 Job 名称到 Job 实例的简单 Map。

  • Java

  • XML

使用 @EnableBatchProcessing 时,系统会为你提供一个 JobRegistry。以下示例展示了如何配置自己的 JobRegistry

...
// This is already provided via the @EnableBatchProcessing but can be customized via
// overriding the bean in the DefaultBatchConfiguration
@Override
@Bean
public JobRegistry jobRegistry() throws Exception {
	return new MapJobRegistry();
}
...

以下示例展示了如何在 XML 中定义 Job 时包含 JobRegistry

<bean id="jobRegistry" class="org.springframework.batch.core.configuration.support.MapJobRegistry" />

你可以通过以下方式之一填充 JobRegistry:使用 Bean Post Processor,或者使用 Smart Initializing Singleton,或者使用 Registrar Lifecycle Component。接下来的章节将描述这些机制。

JobRegistryBeanPostProcessor

这是一个 Bean Post Processor,可以在 Job 创建时注册它们。

  • Java

  • XML

以下示例展示了如何在 Java 中定义 Job 时包含 JobRegistryBeanPostProcessor

Java 配置
@Bean
public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor(JobRegistry jobRegistry) {
    JobRegistryBeanPostProcessor postProcessor = new JobRegistryBeanPostProcessor();
    postProcessor.setJobRegistry(jobRegistry);
    return postProcessor;
}

以下示例展示了如何在 XML 中定义 Job 时包含 JobRegistryBeanPostProcessor

XML 配置
<bean id="jobRegistryBeanPostProcessor" class="org.spr...JobRegistryBeanPostProcessor">
    <property name="jobRegistry" ref="jobRegistry"/>
</bean>

虽然并非严格必要,但示例中的 Post Processor 已被赋予一个 id,以便可以将其包含在子上下文(例如,作为父 Bean 定义)中,并使在那里创建的所有 Job 也自动注册。

已弃用

从版本 5.2 开始,JobRegistryBeanPostProcessor 类已被弃用,取而代之的是 JobRegistrySmartInitializingSingleton,请参阅 JobRegistrySmartInitializingSingleton

JobRegistrySmartInitializingSingleton

这是一个 SmartInitializingSingleton,它会在 Job 注册表中注册所有 Singleton Job。

  • Java

  • XML

以下示例展示了如何在 Java 中定义 JobRegistrySmartInitializingSingleton

Java 配置
@Bean
public JobRegistrySmartInitializingSingleton jobRegistrySmartInitializingSingleton(JobRegistry jobRegistry) {
    return new JobRegistrySmartInitializingSingleton(jobRegistry);
}

以下示例展示了如何在 XML 中定义 JobRegistrySmartInitializingSingleton

XML 配置
<bean class="org.springframework.batch.core.configuration.support.JobRegistrySmartInitializingSingleton">
    <property name="jobRegistry" ref="jobRegistry" />
</bean>

AutomaticJobRegistrar

这是一个生命周期组件,它创建子上下文并在 Job 创建时从这些上下文中注册 Job。这样做的一个优点是,虽然子上下文中的 Job 名称在注册表中仍然需要全局唯一,但它们的依赖项可以具有“自然”名称。例如,你可以创建一组 XML 配置文件,每个文件只有一个 Job,但它们都对具有相同 Bean 名称(例如 reader)的 ItemReader 有不同的定义。如果所有这些文件都导入到同一个上下文中,Reader 的定义就会冲突并相互覆盖,但有了 Automatic Registrar,这就可以避免。这使得集成从应用程序不同模块贡献的 Job 变得更加容易。

  • Java

  • XML

以下示例展示了如何在 Java 中定义 Job 时包含 AutomaticJobRegistrar

Java 配置
@Bean
public AutomaticJobRegistrar registrar() {

    AutomaticJobRegistrar registrar = new AutomaticJobRegistrar();
    registrar.setJobLoader(jobLoader());
    registrar.setApplicationContextFactories(applicationContextFactories());
    registrar.afterPropertiesSet();
    return registrar;

}

以下示例展示了如何在 XML 中定义 Job 时包含 AutomaticJobRegistrar

XML 配置
<bean class="org.spr...AutomaticJobRegistrar">
   <property name="applicationContextFactories">
      <bean class="org.spr...ClasspathXmlApplicationContextsFactoryBean">
         <property name="resources" value="classpath*:/config/job*.xml" />
      </bean>
   </property>
   <property name="jobLoader">
      <bean class="org.spr...DefaultJobLoader">
         <property name="jobRegistry" ref="jobRegistry" />
      </bean>
   </property>
</bean>

该 Registrar 有两个强制属性:一个 ApplicationContextFactory 数组(在前面的示例中从方便的 Factory Bean 创建)和一个 JobLoaderJobLoader 负责管理子上下文的生命周期并在 JobRegistry 中注册 Job。

ApplicationContextFactory 负责创建子上下文。最常见的用法是(如前面的示例所示)使用 ClassPathXmlApplicationContextFactory。该 Factory 的一个特点是,默认情况下,它会将父上下文的一些配置复制到子上下文。因此,例如,只要子上下文的配置与父上下文相同,你就不需要在子上下文中重新定义 PropertyPlaceholderConfigurer 或 AOP 配置。

你可以将 AutomaticJobRegistrarJobRegistryBeanPostProcessor 结合使用(前提是你也使用了 DefaultJobLoader)。例如,如果在主父上下文和子位置中都定义了 Job,这可能是可取的。

JobOperator

如前所述,JobRepository 提供了元数据的 CRUD 操作,而 JobExplorer 提供了元数据的只读操作。然而,当这些操作结合使用以执行常见的监控任务时(例如停止、重启或汇总 Job,这通常由批处理操作员完成),它们最有益处。Spring Batch 在 JobOperator 接口中提供了这些类型的操作。

public interface JobOperator {

    List<Long> getExecutions(long instanceId) throws NoSuchJobInstanceException;

    List<Long> getJobInstances(String jobName, int start, int count)
          throws NoSuchJobException;

    Set<Long> getRunningExecutions(String jobName) throws NoSuchJobException;

    String getParameters(long executionId) throws NoSuchJobExecutionException;

    Long start(String jobName, String parameters)
          throws NoSuchJobException, JobInstanceAlreadyExistsException;

    Long restart(long executionId)
          throws JobInstanceAlreadyCompleteException, NoSuchJobExecutionException,
                  NoSuchJobException, JobRestartException;

    Long startNextInstance(String jobName)
          throws NoSuchJobException, JobParametersNotFoundException, JobRestartException,
                 JobExecutionAlreadyRunningException, JobInstanceAlreadyCompleteException;

    boolean stop(long executionId)
          throws NoSuchJobExecutionException, JobExecutionNotRunningException;

    String getSummary(long executionId) throws NoSuchJobExecutionException;

    Map<Long, String> getStepExecutionSummaries(long executionId)
          throws NoSuchJobExecutionException;

    Set<String> getJobNames();

}

前面的操作代表了来自许多不同接口的方法,例如 JobLauncherJobRepositoryJobExplorerJobRegistry。因此,提供的 JobOperator 实现(SimpleJobOperator)具有许多依赖项。

  • Java

  • XML

以下示例展示了 Java 中 SimpleJobOperator 的典型 Bean 定义

 /**
  * All injected dependencies for this bean are provided by the @EnableBatchProcessing
  * infrastructure out of the box.
  */
 @Bean
 public SimpleJobOperator jobOperator(JobExplorer jobExplorer,
                                JobRepository jobRepository,
                                JobRegistry jobRegistry,
                                JobLauncher jobLauncher) {

	SimpleJobOperator jobOperator = new SimpleJobOperator();
	jobOperator.setJobExplorer(jobExplorer);
	jobOperator.setJobRepository(jobRepository);
	jobOperator.setJobRegistry(jobRegistry);
	jobOperator.setJobLauncher(jobLauncher);

	return jobOperator;
 }

以下示例展示了 XML 中 SimpleJobOperator 的典型 Bean 定义

<bean id="jobOperator" class="org.spr...SimpleJobOperator">
    <property name="jobExplorer">
        <bean class="org.spr...JobExplorerFactoryBean">
            <property name="dataSource" ref="dataSource" />
        </bean>
    </property>
    <property name="jobRepository" ref="jobRepository" />
    <property name="jobRegistry" ref="jobRegistry" />
    <property name="jobLauncher" ref="jobLauncher" />
</bean>

从版本 5.0 开始,@EnableBatchProcessing 注解会自动在应用程序上下文中注册一个 Job Operator Bean。

如果你在 Job Repository 上设置了表前缀,请不要忘记在 Job Explorer 上也设置。

JobParametersIncrementer

JobOperator 中的大多数方法都是不言自明的,你可以在 接口的 Javadoc 中找到更详细的解释。然而,startNextInstance 方法值得注意。此方法总是启动 Job 的一个新实例。如果在 JobExecution 中存在严重问题并且需要从头开始重新启动 Job,这会非常有用。与 JobLauncher 不同(JobLauncher 需要一个新的 JobParameters 对象来触发新的 JobInstance),如果参数与之前任何一组参数不同,startNextInstance 方法会使用与 Job 关联的 JobParametersIncrementer 来强制 Job 创建一个新实例。

public interface JobParametersIncrementer {

    JobParameters getNext(JobParameters parameters);

}

JobParametersIncrementer 的契约是,给定一个 JobParameters 对象,它通过递增其中包含的任何必要值来返回“下一个” JobParameters 对象。此策略很有用,因为框架无法知道对 JobParameters 进行哪些更改会使其成为“下一个”实例。例如,如果 JobParameters 中唯一的值是日期,并且应该创建下一个实例,那么该值应该递增一天还是一周(例如,如果 Job 是每周运行的)?同样也可以用于任何有助于标识 Job 的数值,如下面的示例所示

public class SampleIncrementer implements JobParametersIncrementer {

    public JobParameters getNext(JobParameters parameters) {
        if (parameters==null || parameters.isEmpty()) {
            return new JobParametersBuilder().addLong("run.id", 1L).toJobParameters();
        }
        long id = parameters.getLong("run.id",1L) + 1;
        return new JobParametersBuilder().addLong("run.id", id).toJobParameters();
    }
}

在此示例中,键为 run.id 的值用于区分 JobInstances。如果传入的 JobParameters 为 null,则可以假定该 Job 之前从未运行过,因此可以返回其初始状态。否则,则获取旧值,递增一,并返回。

  • Java

  • XML

对于在 Java 中定义的 Job,你可以通过 Builder 中提供的 incrementer 方法将递增器与 Job 关联,如下所示

@Bean
public Job footballJob(JobRepository jobRepository) {
    return new JobBuilder("footballJob", jobRepository)
    				 .incrementer(sampleIncrementer())
    				 ...
                     .build();
}

对于在 XML 中定义的 Job,你可以通过命名空间中的 incrementer 属性将递增器与 Job 关联,如下所示

<job id="footballJob" incrementer="sampleIncrementer">
    ...
</job>

停止 Job

JobOperator 最常见的用例之一是优雅地停止 Job

Set<Long> executions = jobOperator.getRunningExecutions("sampleJob");
jobOperator.stop(executions.iterator().next());

关机不是立即的,因为无法强制立即关机,特别是如果执行当前位于框架无法控制的开发人员代码中,例如业务服务。但是,一旦控制权返回给框架,它会将当前 StepExecution 的状态设置为 BatchStatus.STOPPED,保存它,并在完成之前对 JobExecution 进行相同的操作。

中止 Job

状态为 FAILED 的 Job 执行可以重新启动(如果 Job 是可重新启动的)。状态为 ABANDONED 的 Job 执行无法由框架重新启动。ABANDONED 状态也用于 Step 执行中,以便在重新启动的 Job 执行中将其标记为可跳过。如果一个 Job 正在运行并且遇到一个在之前失败的 Job 执行中被标记为 ABANDONED 的 Step,它将(根据 Job Flow 定义和 Step 执行的 Exit Status 确定)继续执行下一个 Step。

如果进程死亡(kill -9 或服务器故障),Job 当然没有在运行,但 JobRepository 无法知道,因为在进程死亡之前没有人告诉它。你必须手动告诉它你知道执行失败了或者应该被视为已中止(将其状态更改为 FAILEDABANDONED)。这是一个业务决策,无法自动化。只有当 Job 是可重新启动的且你知道重新启动数据有效时,才将其状态更改为 FAILED