高级元数据使用

到目前为止,我们已经讨论了JobLauncherJobRepository接口。它们共同代表了作业的简单启动和批处理域对象的简单 CRUD 操作

Job Repository
图 1. 作业存储库

JobLauncher 使用 JobRepository 创建新的 JobExecution 对象并运行它们。JobStep 实现随后在运行 Job 期间使用相同的 JobRepository 对相同的执行进行基本更新。基本操作足以满足简单的场景。但是,在拥有数百个批处理作业和复杂调度要求的大型批处理环境中,需要更高级的元数据访问权限。

Job Repository Advanced
图 2. 高级作业存储库访问

JobExplorerJobOperator 接口(将在接下来的部分中讨论)为查询和控制元数据添加了额外的功能。

查询存储库

在任何高级功能之前,最基本的需求是能够查询存储库以查找现有执行。此功能由 JobExplorer 接口提供。

public interface JobExplorer {

    List<JobInstance> getJobInstances(String jobName, int start, int count);

    JobExecution getJobExecution(Long executionId);

    StepExecution getStepExecution(Long jobExecutionId, Long stepExecutionId);

    JobInstance getJobInstance(Long instanceId);

    List<JobExecution> getJobExecutions(JobInstance jobInstance);

    Set<JobExecution> findRunningJobExecutions(String jobName);
}

从其方法签名可以明显看出,JobExplorerJobRepository 的只读版本,并且与 JobRepository 一样,它可以通过使用工厂 Bean 轻松配置。

  • Java

  • XML

以下示例展示了如何在 Java 中配置 JobExplorer

Java 配置
...
// This would reside in your DefaultBatchConfiguration extension
@Bean
public JobExplorer jobExplorer() throws Exception {
	JobExplorerFactoryBean factoryBean = new JobExplorerFactoryBean();
	factoryBean.setDataSource(this.dataSource);
	return factoryBean.getObject();
}
...

以下示例展示了如何在 XML 中配置 JobExplorer

XML 配置
<bean id="jobExplorer" class="org.spr...JobExplorerFactoryBean"
      p:dataSource-ref="dataSource" />

本章前面,我们注意到您可以修改 JobRepository 的表前缀以允许不同的版本或模式。由于 JobExplorer 使用相同的表,因此它也需要设置前缀的能力。

  • Java

  • XML

以下示例展示了如何在 Java 中为 JobExplorer 设置表前缀

Java 配置
...
// This would reside in your DefaultBatchConfiguration extension
@Bean
public JobExplorer jobExplorer() throws Exception {
	JobExplorerFactoryBean factoryBean = new JobExplorerFactoryBean();
	factoryBean.setDataSource(this.dataSource);
	factoryBean.setTablePrefix("SYSTEM.");
	return factoryBean.getObject();
}
...

以下示例展示了如何在 XML 中为 JobExplorer 设置表前缀

XML 配置
<bean id="jobExplorer" class="org.spr...JobExplorerFactoryBean"
		p:tablePrefix="SYSTEM."/>

JobRegistry

JobRegistry(及其父接口 JobLocator)不是必需的,但如果您想跟踪上下文中有哪些作业可用,它会很有用。它对于在作业在其他地方创建(例如,在子上下文)时,将作业集中收集到应用程序上下文中也很有用。您还可以使用自定义 JobRegistry 实现来操作注册的作业的名称和其他属性。框架只提供了一个实现,它基于从作业名称到作业实例的简单映射。

  • Java

  • XML

使用 @EnableBatchProcessing 时,会为您提供 JobRegistry。以下示例展示了如何配置您自己的 JobRegistry

...
// This is already provided via the @EnableBatchProcessing but can be customized via
// overriding the bean in the DefaultBatchConfiguration
@Override
@Bean
public JobRegistry jobRegistry() throws Exception {
	return new MapJobRegistry();
}
...

以下示例展示了如何在 XML 中定义的作业中包含 JobRegistry

<bean id="jobRegistry" class="org.springframework.batch.core.configuration.support.MapJobRegistry" />

您可以通过以下几种方式填充 JobRegistry:使用 Bean 后置处理器,或使用智能初始化单例,或使用注册器生命周期组件。接下来的部分将描述这些机制。

JobRegistryBeanPostProcessor

这是一个 Bean 后置处理器,它可以在创建所有作业时注册它们。

  • Java

  • XML

以下示例展示了如何在 Java 中定义的作业中包含 JobRegistryBeanPostProcessor

Java 配置
@Bean
public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor(JobRegistry jobRegistry) {
    JobRegistryBeanPostProcessor postProcessor = new JobRegistryBeanPostProcessor();
    postProcessor.setJobRegistry(jobRegistry);
    return postProcessor;
}

以下示例展示了如何在 XML 中定义的作业中包含 JobRegistryBeanPostProcessor

XML 配置
<bean id="jobRegistryBeanPostProcessor" class="org.spr...JobRegistryBeanPostProcessor">
    <property name="jobRegistry" ref="jobRegistry"/>
</bean>

虽然这不是严格必要的,但示例中的后置处理器被赋予了一个 id,以便它可以包含在子上下文(例如,作为父 Bean 定义)中,并导致在那里创建的所有作业也被自动注册。

从 5.1 版本开始,@EnableBatchProcessing 注解会在应用程序上下文中自动注册一个 jobRegistryBeanPostProcessor Bean。

JobRegistrySmartInitializingSingleton

这是一个 SmartInitializingSingleton,它在作业注册表中注册所有单例作业。

  • Java

  • XML

以下示例展示了如何在 Java 中定义 JobRegistrySmartInitializingSingleton

Java 配置
@Bean
public JobRegistrySmartInitializingSingleton jobRegistrySmartInitializingSingleton(JobRegistry jobRegistry) {
    return new JobRegistrySmartInitializingSingleton(jobRegistry);
}

以下示例展示了如何在 XML 中定义 JobRegistrySmartInitializingSingleton

XML 配置
<bean class="org.springframework.batch.core.configuration.support.JobRegistrySmartInitializingSingleton">
    <property name="jobRegistry" ref="jobRegistry" />
</bean>

AutomaticJobRegistrar

这是一个生命周期组件,它创建子上下文并在创建时从这些上下文中注册作业。这样做的一个优点是,虽然子上下文中的作业名称在注册表中仍然必须是全局唯一的,但它们的依赖关系可以具有“自然”名称。因此,例如,您可以创建一组 XML 配置文件,每个文件只有一个作业,但它们都具有不同的 ItemReader 定义,并且具有相同的 Bean 名称,例如 reader。如果所有这些文件都被导入到同一个上下文中,那么阅读器定义将发生冲突并相互覆盖,但是,使用自动注册器,可以避免这种情况。这使得集成来自应用程序不同模块的作业变得更容易。

  • Java

  • XML

以下示例展示了如何在 Java 中定义的作业中包含 AutomaticJobRegistrar

Java 配置
@Bean
public AutomaticJobRegistrar registrar() {

    AutomaticJobRegistrar registrar = new AutomaticJobRegistrar();
    registrar.setJobLoader(jobLoader());
    registrar.setApplicationContextFactories(applicationContextFactories());
    registrar.afterPropertiesSet();
    return registrar;

}

以下示例展示了如何在 XML 中定义的作业中包含 AutomaticJobRegistrar

XML 配置
<bean class="org.spr...AutomaticJobRegistrar">
   <property name="applicationContextFactories">
      <bean class="org.spr...ClasspathXmlApplicationContextsFactoryBean">
         <property name="resources" value="classpath*:/config/job*.xml" />
      </bean>
   </property>
   <property name="jobLoader">
      <bean class="org.spr...DefaultJobLoader">
         <property name="jobRegistry" ref="jobRegistry" />
      </bean>
   </property>
</bean>

注册器有两个必填属性:一个 ApplicationContextFactory 数组(从前面的示例中的一个方便的工厂 Bean 创建)和一个 JobLoaderJobLoader 负责管理子上下文的生命周期并在 JobRegistry 中注册作业。

ApplicationContextFactory 负责创建子上下文。最常见的用法是(如前面的示例)使用 ClassPathXmlApplicationContextFactory。此工厂的一个特性是,默认情况下,它会将一些配置从父上下文复制到子上下文。因此,例如,您不需要在子上下文中重新定义 PropertyPlaceholderConfigurer 或 AOP 配置,前提是它应该与父上下文相同。

您可以将AutomaticJobRegistrarJobRegistryBeanPostProcessor结合使用(只要您也使用DefaultJobLoader)。例如,如果主父上下文和子位置中都定义了作业,则可能需要这样做。

JobOperator

如前所述,JobRepository提供元数据的CRUD操作,而JobExplorer提供元数据的只读操作。但是,这些操作在用于执行常见的监控任务(例如停止、重新启动或汇总作业)时最有用,这通常由批处理操作员完成。Spring Batch在JobOperator接口中提供了这些类型的操作

public interface JobOperator {

    List<Long> getExecutions(long instanceId) throws NoSuchJobInstanceException;

    List<Long> getJobInstances(String jobName, int start, int count)
          throws NoSuchJobException;

    Set<Long> getRunningExecutions(String jobName) throws NoSuchJobException;

    String getParameters(long executionId) throws NoSuchJobExecutionException;

    Long start(String jobName, String parameters)
          throws NoSuchJobException, JobInstanceAlreadyExistsException;

    Long restart(long executionId)
          throws JobInstanceAlreadyCompleteException, NoSuchJobExecutionException,
                  NoSuchJobException, JobRestartException;

    Long startNextInstance(String jobName)
          throws NoSuchJobException, JobParametersNotFoundException, JobRestartException,
                 JobExecutionAlreadyRunningException, JobInstanceAlreadyCompleteException;

    boolean stop(long executionId)
          throws NoSuchJobExecutionException, JobExecutionNotRunningException;

    String getSummary(long executionId) throws NoSuchJobExecutionException;

    Map<Long, String> getStepExecutionSummaries(long executionId)
          throws NoSuchJobExecutionException;

    Set<String> getJobNames();

}

前面的操作代表来自许多不同接口的方法,例如JobLauncherJobRepositoryJobExplorerJobRegistry。因此,提供的JobOperator实现(SimpleJobOperator)具有许多依赖项。

  • Java

  • XML

以下示例显示了Java中SimpleJobOperator的典型bean定义

 /**
  * All injected dependencies for this bean are provided by the @EnableBatchProcessing
  * infrastructure out of the box.
  */
 @Bean
 public SimpleJobOperator jobOperator(JobExplorer jobExplorer,
                                JobRepository jobRepository,
                                JobRegistry jobRegistry,
                                JobLauncher jobLauncher) {

	SimpleJobOperator jobOperator = new SimpleJobOperator();
	jobOperator.setJobExplorer(jobExplorer);
	jobOperator.setJobRepository(jobRepository);
	jobOperator.setJobRegistry(jobRegistry);
	jobOperator.setJobLauncher(jobLauncher);

	return jobOperator;
 }

以下示例显示了XML中SimpleJobOperator的典型bean定义

<bean id="jobOperator" class="org.spr...SimpleJobOperator">
    <property name="jobExplorer">
        <bean class="org.spr...JobExplorerFactoryBean">
            <property name="dataSource" ref="dataSource" />
        </bean>
    </property>
    <property name="jobRepository" ref="jobRepository" />
    <property name="jobRegistry" ref="jobRegistry" />
    <property name="jobLauncher" ref="jobLauncher" />
</bean>

从版本5.0开始,@EnableBatchProcessing注释会自动在应用程序上下文中注册一个作业操作员bean。

如果您在作业存储库上设置了表前缀,请不要忘记在作业资源管理器上也设置它。

JobParametersIncrementer

JobOperator上的大多数方法是不言自明的,您可以在接口的Javadoc中找到更详细的解释。但是,startNextInstance方法值得注意。此方法始终启动Job的新实例。如果JobExecution中存在严重问题,并且需要从头开始重新启动Job,这将非常有用。与JobLauncher(需要一个新的JobParameters对象来触发新的JobInstance)不同,如果参数与任何以前的参数集不同,则startNextInstance方法使用与Job绑定的JobParametersIncrementer强制Job进入新的实例

public interface JobParametersIncrementer {

    JobParameters getNext(JobParameters parameters);

}

JobParametersIncrementer的约定是,给定一个JobParameters对象,它通过递增其可能包含的任何必要值来返回“下一个”JobParameters对象。此策略很有用,因为框架无法知道对JobParameters的哪些更改使其成为“下一个”实例。例如,如果JobParameters中唯一的价值是日期,并且应该创建下一个实例,那么该价值应该增加一天还是一周(例如,如果作业是每周一次)?对于任何有助于识别Job的数值,也可以说相同的话,如下面的示例所示

public class SampleIncrementer implements JobParametersIncrementer {

    public JobParameters getNext(JobParameters parameters) {
        if (parameters==null || parameters.isEmpty()) {
            return new JobParametersBuilder().addLong("run.id", 1L).toJobParameters();
        }
        long id = parameters.getLong("run.id",1L) + 1;
        return new JobParametersBuilder().addLong("run.id", id).toJobParameters();
    }
}

在此示例中,具有run.id键的值用于区分JobInstances。如果传入的JobParameters为null,则可以假设该Job以前从未运行过,因此可以返回其初始状态。但是,如果不是,则获取旧值,将其递增1,然后返回。

  • Java

  • XML

对于在 Java 中定义的作业,您可以通过构建器中提供的 incrementer 方法将增量器与 Job 关联,如下所示

@Bean
public Job footballJob(JobRepository jobRepository) {
    return new JobBuilder("footballJob", jobRepository)
    				 .incrementer(sampleIncrementer())
    				 ...
                     .build();
}

对于在 XML 中定义的作业,您可以通过命名空间中的 incrementer 属性将增量器与 Job 关联,如下所示

<job id="footballJob" incrementer="sampleIncrementer">
    ...
</job>

停止作业

JobOperator 最常见的用例之一是优雅地停止作业

Set<Long> executions = jobOperator.getRunningExecutions("sampleJob");
jobOperator.stop(executions.iterator().next());

关闭不是立即的,因为没有办法强制立即关闭,尤其是在执行当前处于框架无法控制的开发人员代码中时,例如业务服务。但是,一旦控制权返回到框架,它就会将当前 StepExecution 的状态设置为 BatchStatus.STOPPED,保存它,并在完成之前对 JobExecution 执行相同的操作。

中止作业

状态为 FAILED 的作业执行可以重新启动(如果 Job 可重新启动)。状态为 ABANDONED 的作业执行无法由框架重新启动。ABANDONED 状态也用于步骤执行,以将它们标记为在重新启动的作业执行中可跳过。如果作业正在运行并遇到在先前失败的作业执行中标记为 ABANDONED 的步骤,它将继续执行到下一步(由作业流程定义和步骤执行退出状态确定)。

如果进程死亡(kill -9 或服务器故障),作业当然不会运行,但 JobRepository 无法知道,因为在进程死亡之前没有人告诉它。您必须手动告诉它,您知道执行已失败或应被视为中止(将其状态更改为 FAILEDABANDONED)。这是一个业务决策,无法自动化。仅当作业可重新启动且您知道重新启动数据有效时,才将状态更改为 FAILED