高级元数据用法
到目前为止,我们已经讨论了 JobLauncher
和 JobRepository
接口。它们共同代表了 Job 的简单启动以及 Batch 域对象的基本 CRUD 操作。

JobLauncher
使用 JobRepository
创建新的 JobExecution
对象并运行它们。Job
和 Step
实现稍后会在 Job
运行期间使用同一个 JobRepository
对这些执行进行基本更新。基本操作足以应对简单的场景。然而,在包含数百个批处理 Job 和复杂调度要求的大型批处理环境中,需要更高级的元数据访问。

JobExplorer
和 JobOperator
接口将在接下来的章节中讨论,它们为查询和控制元数据提供了附加功能。
查询 Repository
在实现任何高级功能之前,最基本的需求是查询 Repository 中现有执行的能力。此功能由 JobExplorer
接口提供。
public interface JobExplorer {
List<JobInstance> getJobInstances(String jobName, int start, int count);
JobExecution getJobExecution(Long executionId);
StepExecution getStepExecution(Long jobExecutionId, Long stepExecutionId);
JobInstance getJobInstance(Long instanceId);
List<JobExecution> getJobExecutions(JobInstance jobInstance);
Set<JobExecution> findRunningJobExecutions(String jobName);
}
从其方法签名可以看出,JobExplorer
是 JobRepository
的只读版本,与 JobRepository
类似,可以通过使用 Factory Bean 轻松配置。
-
Java
-
XML
以下示例展示了如何在 Java 中配置 JobExplorer
...
// This would reside in your DefaultBatchConfiguration extension
@Bean
public JobExplorer jobExplorer() throws Exception {
JobExplorerFactoryBean factoryBean = new JobExplorerFactoryBean();
factoryBean.setDataSource(this.dataSource);
return factoryBean.getObject();
}
...
以下示例展示了如何在 XML 中配置 JobExplorer
<bean id="jobExplorer" class="org.spr...JobExplorerFactoryBean"
p:dataSource-ref="dataSource" />
在本章前面,我们提到可以修改 JobRepository
的表前缀,以允许不同的版本或 Schema。由于 JobExplorer
使用相同的表,因此也需要设置前缀的功能。
-
Java
-
XML
以下示例展示了如何在 Java 中为 JobExplorer
设置表前缀
...
// This would reside in your DefaultBatchConfiguration extension
@Bean
public JobExplorer jobExplorer() throws Exception {
JobExplorerFactoryBean factoryBean = new JobExplorerFactoryBean();
factoryBean.setDataSource(this.dataSource);
factoryBean.setTablePrefix("SYSTEM.");
return factoryBean.getObject();
}
...
以下示例展示了如何在 XML 中为 JobExplorer
设置表前缀
<bean id="jobExplorer" class="org.spr...JobExplorerFactoryBean"
p:tablePrefix="SYSTEM."/>
JobRegistry
JobRegistry
(及其父接口 JobLocator
)不是必需的,但如果你想跟踪上下文中可用的 Job,它会很有用。当 Job 在其他地方(例如子上下文中)创建时,它也适用于在应用程序上下文中集中收集 Job。你还可以使用自定义 JobRegistry
实现来操作已注册 Job 的名称和其他属性。框架仅提供了一种实现,它基于 Job 名称到 Job 实例的简单 Map。
-
Java
-
XML
使用 @EnableBatchProcessing
时,系统会为你提供一个 JobRegistry
。以下示例展示了如何配置自己的 JobRegistry
...
// This is already provided via the @EnableBatchProcessing but can be customized via
// overriding the bean in the DefaultBatchConfiguration
@Override
@Bean
public JobRegistry jobRegistry() throws Exception {
return new MapJobRegistry();
}
...
以下示例展示了如何在 XML 中定义 Job 时包含 JobRegistry
<bean id="jobRegistry" class="org.springframework.batch.core.configuration.support.MapJobRegistry" />
你可以通过以下方式之一填充 JobRegistry
:使用 Bean Post Processor,或者使用 Smart Initializing Singleton,或者使用 Registrar Lifecycle Component。接下来的章节将描述这些机制。
JobRegistryBeanPostProcessor
这是一个 Bean Post Processor,可以在 Job 创建时注册它们。
-
Java
-
XML
以下示例展示了如何在 Java 中定义 Job 时包含 JobRegistryBeanPostProcessor
@Bean
public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor(JobRegistry jobRegistry) {
JobRegistryBeanPostProcessor postProcessor = new JobRegistryBeanPostProcessor();
postProcessor.setJobRegistry(jobRegistry);
return postProcessor;
}
以下示例展示了如何在 XML 中定义 Job 时包含 JobRegistryBeanPostProcessor
<bean id="jobRegistryBeanPostProcessor" class="org.spr...JobRegistryBeanPostProcessor">
<property name="jobRegistry" ref="jobRegistry"/>
</bean>
虽然并非严格必要,但示例中的 Post Processor 已被赋予一个 id
,以便可以将其包含在子上下文(例如,作为父 Bean 定义)中,并使在那里创建的所有 Job 也自动注册。
已弃用
从版本 5.2 开始, |
JobRegistrySmartInitializingSingleton
这是一个 SmartInitializingSingleton
,它会在 Job 注册表中注册所有 Singleton Job。
-
Java
-
XML
以下示例展示了如何在 Java 中定义 JobRegistrySmartInitializingSingleton
@Bean
public JobRegistrySmartInitializingSingleton jobRegistrySmartInitializingSingleton(JobRegistry jobRegistry) {
return new JobRegistrySmartInitializingSingleton(jobRegistry);
}
以下示例展示了如何在 XML 中定义 JobRegistrySmartInitializingSingleton
<bean class="org.springframework.batch.core.configuration.support.JobRegistrySmartInitializingSingleton">
<property name="jobRegistry" ref="jobRegistry" />
</bean>
AutomaticJobRegistrar
这是一个生命周期组件,它创建子上下文并在 Job 创建时从这些上下文中注册 Job。这样做的一个优点是,虽然子上下文中的 Job 名称在注册表中仍然需要全局唯一,但它们的依赖项可以具有“自然”名称。例如,你可以创建一组 XML 配置文件,每个文件只有一个 Job,但它们都对具有相同 Bean 名称(例如 reader
)的 ItemReader
有不同的定义。如果所有这些文件都导入到同一个上下文中,Reader 的定义就会冲突并相互覆盖,但有了 Automatic Registrar,这就可以避免。这使得集成从应用程序不同模块贡献的 Job 变得更加容易。
-
Java
-
XML
以下示例展示了如何在 Java 中定义 Job 时包含 AutomaticJobRegistrar
@Bean
public AutomaticJobRegistrar registrar() {
AutomaticJobRegistrar registrar = new AutomaticJobRegistrar();
registrar.setJobLoader(jobLoader());
registrar.setApplicationContextFactories(applicationContextFactories());
registrar.afterPropertiesSet();
return registrar;
}
以下示例展示了如何在 XML 中定义 Job 时包含 AutomaticJobRegistrar
<bean class="org.spr...AutomaticJobRegistrar">
<property name="applicationContextFactories">
<bean class="org.spr...ClasspathXmlApplicationContextsFactoryBean">
<property name="resources" value="classpath*:/config/job*.xml" />
</bean>
</property>
<property name="jobLoader">
<bean class="org.spr...DefaultJobLoader">
<property name="jobRegistry" ref="jobRegistry" />
</bean>
</property>
</bean>
该 Registrar 有两个强制属性:一个 ApplicationContextFactory
数组(在前面的示例中从方便的 Factory Bean 创建)和一个 JobLoader
。JobLoader
负责管理子上下文的生命周期并在 JobRegistry
中注册 Job。
ApplicationContextFactory
负责创建子上下文。最常见的用法是(如前面的示例所示)使用 ClassPathXmlApplicationContextFactory
。该 Factory 的一个特点是,默认情况下,它会将父上下文的一些配置复制到子上下文。因此,例如,只要子上下文的配置与父上下文相同,你就不需要在子上下文中重新定义 PropertyPlaceholderConfigurer
或 AOP 配置。
你可以将 AutomaticJobRegistrar
与 JobRegistryBeanPostProcessor
结合使用(前提是你也使用了 DefaultJobLoader
)。例如,如果在主父上下文和子位置中都定义了 Job,这可能是可取的。
JobOperator
如前所述,JobRepository
提供了元数据的 CRUD 操作,而 JobExplorer
提供了元数据的只读操作。然而,当这些操作结合使用以执行常见的监控任务时(例如停止、重启或汇总 Job,这通常由批处理操作员完成),它们最有益处。Spring Batch 在 JobOperator
接口中提供了这些类型的操作。
public interface JobOperator {
List<Long> getExecutions(long instanceId) throws NoSuchJobInstanceException;
List<Long> getJobInstances(String jobName, int start, int count)
throws NoSuchJobException;
Set<Long> getRunningExecutions(String jobName) throws NoSuchJobException;
String getParameters(long executionId) throws NoSuchJobExecutionException;
Long start(String jobName, String parameters)
throws NoSuchJobException, JobInstanceAlreadyExistsException;
Long restart(long executionId)
throws JobInstanceAlreadyCompleteException, NoSuchJobExecutionException,
NoSuchJobException, JobRestartException;
Long startNextInstance(String jobName)
throws NoSuchJobException, JobParametersNotFoundException, JobRestartException,
JobExecutionAlreadyRunningException, JobInstanceAlreadyCompleteException;
boolean stop(long executionId)
throws NoSuchJobExecutionException, JobExecutionNotRunningException;
String getSummary(long executionId) throws NoSuchJobExecutionException;
Map<Long, String> getStepExecutionSummaries(long executionId)
throws NoSuchJobExecutionException;
Set<String> getJobNames();
}
前面的操作代表了来自许多不同接口的方法,例如 JobLauncher
、JobRepository
、JobExplorer
和 JobRegistry
。因此,提供的 JobOperator
实现(SimpleJobOperator
)具有许多依赖项。
-
Java
-
XML
以下示例展示了 Java 中 SimpleJobOperator
的典型 Bean 定义
/**
* All injected dependencies for this bean are provided by the @EnableBatchProcessing
* infrastructure out of the box.
*/
@Bean
public SimpleJobOperator jobOperator(JobExplorer jobExplorer,
JobRepository jobRepository,
JobRegistry jobRegistry,
JobLauncher jobLauncher) {
SimpleJobOperator jobOperator = new SimpleJobOperator();
jobOperator.setJobExplorer(jobExplorer);
jobOperator.setJobRepository(jobRepository);
jobOperator.setJobRegistry(jobRegistry);
jobOperator.setJobLauncher(jobLauncher);
return jobOperator;
}
以下示例展示了 XML 中 SimpleJobOperator
的典型 Bean 定义
<bean id="jobOperator" class="org.spr...SimpleJobOperator">
<property name="jobExplorer">
<bean class="org.spr...JobExplorerFactoryBean">
<property name="dataSource" ref="dataSource" />
</bean>
</property>
<property name="jobRepository" ref="jobRepository" />
<property name="jobRegistry" ref="jobRegistry" />
<property name="jobLauncher" ref="jobLauncher" />
</bean>
从版本 5.0 开始,@EnableBatchProcessing
注解会自动在应用程序上下文中注册一个 Job Operator Bean。
如果你在 Job Repository 上设置了表前缀,请不要忘记在 Job Explorer 上也设置。 |
JobParametersIncrementer
JobOperator
中的大多数方法都是不言自明的,你可以在 接口的 Javadoc 中找到更详细的解释。然而,startNextInstance
方法值得注意。此方法总是启动 Job
的一个新实例。如果在 JobExecution
中存在严重问题并且需要从头开始重新启动 Job
,这会非常有用。与 JobLauncher
不同(JobLauncher
需要一个新的 JobParameters
对象来触发新的 JobInstance
),如果参数与之前任何一组参数不同,startNextInstance
方法会使用与 Job
关联的 JobParametersIncrementer
来强制 Job
创建一个新实例。
public interface JobParametersIncrementer {
JobParameters getNext(JobParameters parameters);
}
JobParametersIncrementer
的契约是,给定一个 JobParameters 对象,它通过递增其中包含的任何必要值来返回“下一个” JobParameters
对象。此策略很有用,因为框架无法知道对 JobParameters
进行哪些更改会使其成为“下一个”实例。例如,如果 JobParameters
中唯一的值是日期,并且应该创建下一个实例,那么该值应该递增一天还是一周(例如,如果 Job 是每周运行的)?同样也可以用于任何有助于标识 Job
的数值,如下面的示例所示
public class SampleIncrementer implements JobParametersIncrementer {
public JobParameters getNext(JobParameters parameters) {
if (parameters==null || parameters.isEmpty()) {
return new JobParametersBuilder().addLong("run.id", 1L).toJobParameters();
}
long id = parameters.getLong("run.id",1L) + 1;
return new JobParametersBuilder().addLong("run.id", id).toJobParameters();
}
}
在此示例中,键为 run.id
的值用于区分 JobInstances
。如果传入的 JobParameters
为 null,则可以假定该 Job
之前从未运行过,因此可以返回其初始状态。否则,则获取旧值,递增一,并返回。
-
Java
-
XML
对于在 Java 中定义的 Job,你可以通过 Builder 中提供的 incrementer
方法将递增器与 Job
关联,如下所示
@Bean
public Job footballJob(JobRepository jobRepository) {
return new JobBuilder("footballJob", jobRepository)
.incrementer(sampleIncrementer())
...
.build();
}
对于在 XML 中定义的 Job,你可以通过命名空间中的 incrementer
属性将递增器与 Job
关联,如下所示
<job id="footballJob" incrementer="sampleIncrementer">
...
</job>
停止 Job
JobOperator
最常见的用例之一是优雅地停止 Job
Set<Long> executions = jobOperator.getRunningExecutions("sampleJob");
jobOperator.stop(executions.iterator().next());
关机不是立即的,因为无法强制立即关机,特别是如果执行当前位于框架无法控制的开发人员代码中,例如业务服务。但是,一旦控制权返回给框架,它会将当前 StepExecution
的状态设置为 BatchStatus.STOPPED
,保存它,并在完成之前对 JobExecution
进行相同的操作。
中止 Job
状态为 FAILED
的 Job 执行可以重新启动(如果 Job
是可重新启动的)。状态为 ABANDONED
的 Job 执行无法由框架重新启动。ABANDONED
状态也用于 Step 执行中,以便在重新启动的 Job 执行中将其标记为可跳过。如果一个 Job 正在运行并且遇到一个在之前失败的 Job 执行中被标记为 ABANDONED
的 Step,它将(根据 Job Flow 定义和 Step 执行的 Exit Status 确定)继续执行下一个 Step。
如果进程死亡(kill -9
或服务器故障),Job 当然没有在运行,但 JobRepository
无法知道,因为在进程死亡之前没有人告诉它。你必须手动告诉它你知道执行失败了或者应该被视为已中止(将其状态更改为 FAILED
或 ABANDONED
)。这是一个业务决策,无法自动化。只有当 Job 是可重新启动的且你知道重新启动数据有效时,才将其状态更改为 FAILED
。