高级元数据使用
到目前为止,我们已经讨论了 JobLauncher
和 JobRepository
接口。它们一起表示作业的简单启动和批处理领域对象的 CRUD 基本操作。
JobLauncher
使用 JobRepository
创建新的 JobExecution
对象并运行它们。Job
和 Step
实现随后在运行 Job
期间使用相同的 JobRepository
对相同的执行进行基本更新。对于简单的场景,基本操作就足够了。但是,在拥有数百个批处理作业和复杂调度需求的大型批处理环境中,需要更高级的元数据访问。
接下来几节将讨论 JobExplorer
和 JobOperator
接口,它们增加了查询和控制元数据的附加功能。
查询存储库
在任何高级功能之前,最基本的需求是能够查询存储库中现有的执行。此功能由 JobExplorer
接口提供。
public interface JobExplorer {
List<JobInstance> getJobInstances(String jobName, int start, int count);
JobExecution getJobExecution(Long executionId);
StepExecution getStepExecution(Long jobExecutionId, Long stepExecutionId);
JobInstance getJobInstance(Long instanceId);
List<JobExecution> getJobExecutions(JobInstance jobInstance);
Set<JobExecution> findRunningJobExecutions(String jobName);
}
从其方法签名可以看出,JobExplorer
是 JobRepository
的只读版本,并且与 JobRepository
一样,它可以通过使用工厂 bean 来轻松配置。
-
Java
-
XML
以下示例显示如何在 Java 中配置 JobExplorer
...
// This would reside in your DefaultBatchConfiguration extension
@Bean
public JobExplorer jobExplorer() throws Exception {
JobExplorerFactoryBean factoryBean = new JobExplorerFactoryBean();
factoryBean.setDataSource(this.dataSource);
return factoryBean.getObject();
}
...
以下示例显示如何在 XML 中配置 JobExplorer
<bean id="jobExplorer" class="org.spr...JobExplorerFactoryBean"
p:dataSource-ref="dataSource" />
在本章前面,我们注意到可以修改 JobRepository
的表前缀以允许不同的版本或模式。由于 JobExplorer
使用相同的表,因此它也需要设置前缀的能力。
-
Java
-
XML
以下示例显示如何在 Java 中设置 JobExplorer
的表前缀
...
// This would reside in your DefaultBatchConfiguration extension
@Bean
public JobExplorer jobExplorer() throws Exception {
JobExplorerFactoryBean factoryBean = new JobExplorerFactoryBean();
factoryBean.setDataSource(this.dataSource);
factoryBean.setTablePrefix("SYSTEM.");
return factoryBean.getObject();
}
...
以下示例显示如何在 XML 中设置 JobExplorer
的表前缀
<bean id="jobExplorer" class="org.spr...JobExplorerFactoryBean"
p:tablePrefix="SYSTEM."/>
JobRegistry
JobRegistry
(及其父接口 JobLocator
)不是强制性的,但如果您想跟踪上下文中有哪些作业可用,它会很有用。当作业在其他地方创建(例如,在子上下文中)时,它也可用于集中收集应用程序上下文中的作业。您还可以使用自定义 JobRegistry
实现来操作已注册作业的名称和其他属性。框架只提供了一个实现,它基于从作业名称到作业实例的简单映射。
-
Java
-
XML
使用 @EnableBatchProcessing
时,会为您提供一个 JobRegistry
。以下示例显示如何配置您自己的 JobRegistry
...
// This is already provided via the @EnableBatchProcessing but can be customized via
// overriding the bean in the DefaultBatchConfiguration
@Override
@Bean
public JobRegistry jobRegistry() throws Exception {
return new MapJobRegistry();
}
...
以下示例显示如何为在 XML 中定义的作业包含 JobRegistry
<bean id="jobRegistry" class="org.springframework.batch.core.configuration.support.MapJobRegistry" />
您可以通过以下方式之一填充 JobRegistry
:使用 bean 后处理器,或使用智能初始化单例,或使用注册器生命周期组件。接下来的部分将描述这些机制。
JobRegistryBeanPostProcessor
这是一个 bean 后处理器,可以在创建所有作业时注册它们。
-
Java
-
XML
以下示例显示如何为在 Java 中定义的作业包含 JobRegistryBeanPostProcessor
@Bean
public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor(JobRegistry jobRegistry) {
JobRegistryBeanPostProcessor postProcessor = new JobRegistryBeanPostProcessor();
postProcessor.setJobRegistry(jobRegistry);
return postProcessor;
}
以下示例显示如何为在 XML 中定义的作业包含 JobRegistryBeanPostProcessor
<bean id="jobRegistryBeanPostProcessor" class="org.spr...JobRegistryBeanPostProcessor">
<property name="jobRegistry" ref="jobRegistry"/>
</bean>
虽然不是严格必需的,但示例中的后处理器已赋予 id
,以便它可以包含在子上下文(例如,作为父 bean 定义)中,并导致在那里创建的所有作业也自动注册。
从 5.1 版本开始,@EnableBatchProcessing
注解会在应用程序上下文中自动注册 jobRegistryBeanPostProcessor
bean。
JobRegistrySmartInitializingSingleton
这是一个 SmartInitializingSingleton
,它在作业注册表中注册所有单例作业。
-
Java
-
XML
以下示例显示如何在 Java 中定义 JobRegistrySmartInitializingSingleton
@Bean
public JobRegistrySmartInitializingSingleton jobRegistrySmartInitializingSingleton(JobRegistry jobRegistry) {
return new JobRegistrySmartInitializingSingleton(jobRegistry);
}
以下示例显示如何在 XML 中定义 JobRegistrySmartInitializingSingleton
<bean class="org.springframework.batch.core.configuration.support.JobRegistrySmartInitializingSingleton">
<property name="jobRegistry" ref="jobRegistry" />
</bean>
AutomaticJobRegistrar
这是一个生命周期组件,它创建子上下文并在创建这些上下文中的作业时注册这些作业。这样做的一个优点是,虽然子上下文中的作业名称仍然必须在注册表中全局唯一,但它们的依赖项可以具有“自然”名称。因此,例如,您可以创建一组 XML 配置文件,每个文件只有一个作业,但所有文件都对具有相同 bean 名称(例如 reader
)的 ItemReader
具有不同的定义。如果所有这些文件都被导入到同一个上下文中,读取器定义将会冲突并相互覆盖,但是,使用自动注册器,可以避免这种情况。这使得集成来自应用程序不同模块的作业更容易。
-
Java
-
XML
以下示例显示如何为在 Java 中定义的作业包含 AutomaticJobRegistrar
@Bean
public AutomaticJobRegistrar registrar() {
AutomaticJobRegistrar registrar = new AutomaticJobRegistrar();
registrar.setJobLoader(jobLoader());
registrar.setApplicationContextFactories(applicationContextFactories());
registrar.afterPropertiesSet();
return registrar;
}
以下示例显示如何为在 XML 中定义的作业包含 AutomaticJobRegistrar
<bean class="org.spr...AutomaticJobRegistrar">
<property name="applicationContextFactories">
<bean class="org.spr...ClasspathXmlApplicationContextsFactoryBean">
<property name="resources" value="classpath*:/config/job*.xml" />
</bean>
</property>
<property name="jobLoader">
<bean class="org.spr...DefaultJobLoader">
<property name="jobRegistry" ref="jobRegistry" />
</bean>
</property>
</bean>
注册器有两个强制属性:一个 ApplicationContextFactory
数组(在前面的示例中由方便的工厂 bean 创建)和一个 JobLoader
。JobLoader
负责管理子上下文的生命周期并在 JobRegistry
中注册作业。
ApplicationContextFactory
负责创建子上下文。最常见的用法(如前面的示例所示)是使用ClassPathXmlApplicationContextFactory
。此工厂的一个特性是,默认情况下,它会将一些配置从父上下文复制到子上下文。因此,例如,如果子上下文中的配置应该与父上下文相同,则无需在子上下文中重新定义PropertyPlaceholderConfigurer
或AOP配置。
您可以将AutomaticJobRegistrar
与JobRegistryBeanPostProcessor
结合使用(只要您也使用DefaultJobLoader
)。例如,如果主父上下文和子位置中都定义了作业,则可能需要这样做。
JobOperator
如前所述,JobRepository
提供元数据的CRUD操作,JobExplorer
提供元数据的只读操作。但是,这些操作在组合使用以执行常见的监控任务(例如停止、重启或汇总作业,这通常由批处理操作员完成)时最为有用。Spring Batch 在JobOperator
接口中提供了这些类型的操作。
public interface JobOperator {
List<Long> getExecutions(long instanceId) throws NoSuchJobInstanceException;
List<Long> getJobInstances(String jobName, int start, int count)
throws NoSuchJobException;
Set<Long> getRunningExecutions(String jobName) throws NoSuchJobException;
String getParameters(long executionId) throws NoSuchJobExecutionException;
Long start(String jobName, String parameters)
throws NoSuchJobException, JobInstanceAlreadyExistsException;
Long restart(long executionId)
throws JobInstanceAlreadyCompleteException, NoSuchJobExecutionException,
NoSuchJobException, JobRestartException;
Long startNextInstance(String jobName)
throws NoSuchJobException, JobParametersNotFoundException, JobRestartException,
JobExecutionAlreadyRunningException, JobInstanceAlreadyCompleteException;
boolean stop(long executionId)
throws NoSuchJobExecutionException, JobExecutionNotRunningException;
String getSummary(long executionId) throws NoSuchJobExecutionException;
Map<Long, String> getStepExecutionSummaries(long executionId)
throws NoSuchJobExecutionException;
Set<String> getJobNames();
}
前面的操作表示来自许多不同接口的方法,例如JobLauncher
、JobRepository
、JobExplorer
和JobRegistry
。因此,提供的JobOperator
实现(SimpleJobOperator
)具有许多依赖项。
-
Java
-
XML
以下示例显示了Java中SimpleJobOperator
的典型bean定义。
/**
* All injected dependencies for this bean are provided by the @EnableBatchProcessing
* infrastructure out of the box.
*/
@Bean
public SimpleJobOperator jobOperator(JobExplorer jobExplorer,
JobRepository jobRepository,
JobRegistry jobRegistry,
JobLauncher jobLauncher) {
SimpleJobOperator jobOperator = new SimpleJobOperator();
jobOperator.setJobExplorer(jobExplorer);
jobOperator.setJobRepository(jobRepository);
jobOperator.setJobRegistry(jobRegistry);
jobOperator.setJobLauncher(jobLauncher);
return jobOperator;
}
以下示例显示了XML中SimpleJobOperator
的典型bean定义。
<bean id="jobOperator" class="org.spr...SimpleJobOperator">
<property name="jobExplorer">
<bean class="org.spr...JobExplorerFactoryBean">
<property name="dataSource" ref="dataSource" />
</bean>
</property>
<property name="jobRepository" ref="jobRepository" />
<property name="jobRegistry" ref="jobRegistry" />
<property name="jobLauncher" ref="jobLauncher" />
</bean>
从5.0版本开始,@EnableBatchProcessing
注解会在应用程序上下文中自动注册一个作业操作符bean。
如果在作业存储库上设置了表前缀,请不要忘记在作业资源管理器上也设置它。 |
JobParametersIncrementer
JobOperator
上的大多数方法是不言自明的,您可以在接口的Javadoc中找到更详细的解释。但是,startNextInstance
方法值得注意。此方法始终启动Job
的新实例。如果JobExecution
中存在严重问题并且需要从头开始重新启动Job
,这将非常有用。与JobLauncher
(需要一个触发新JobInstance
的新JobParameters
对象)不同,如果参数与任何以前的参数集不同,则startNextInstance
方法使用与Job
绑定的JobParametersIncrementer
强制Job
进入新实例。
public interface JobParametersIncrementer {
JobParameters getNext(JobParameters parameters);
}
JobParametersIncrementer
的约定是,给定一个JobParameters对象,它通过递增可能包含的任何必要值来返回“下一个”JobParameters
对象。此策略很有用,因为框架无法知道对JobParameters
的哪些更改使其成为“下一个”实例。例如,如果JobParameters
中唯一的值得是日期,并且应该创建下一个实例,那么该值应该增加一天还是一周(如果作业是每周一次)?对于任何有助于标识Job
的数值,也可以这么说,如下例所示。
public class SampleIncrementer implements JobParametersIncrementer {
public JobParameters getNext(JobParameters parameters) {
if (parameters==null || parameters.isEmpty()) {
return new JobParametersBuilder().addLong("run.id", 1L).toJobParameters();
}
long id = parameters.getLong("run.id",1L) + 1;
return new JobParametersBuilder().addLong("run.id", id).toJobParameters();
}
}
在此示例中,键为run.id
的值用于区分JobInstances
。如果传入的JobParameters
为空,则可以假设Job
以前从未运行过,因此可以返回其初始状态。但是,如果不是,则获取旧值,将其加1,然后返回。
-
Java
-
XML
对于在Java中定义的作业,您可以通过构建器中提供的incrementer
方法将递增器与Job
关联,如下所示。
@Bean
public Job footballJob(JobRepository jobRepository) {
return new JobBuilder("footballJob", jobRepository)
.incrementer(sampleIncrementer())
...
.build();
}
对于在XML中定义的作业,您可以通过命名空间中的incrementer
属性将递增器与Job
关联,如下所示。
<job id="footballJob" incrementer="sampleIncrementer">
...
</job>
停止作业
JobOperator
最常见的用例之一是优雅地停止作业。
Set<Long> executions = jobOperator.getRunningExecutions("sampleJob");
jobOperator.stop(executions.iterator().next());
关闭不是立即的,因为没有办法强制立即关闭,尤其是在当前执行在框架无法控制的开发人员代码中(例如业务服务)。但是,一旦控制权返回到框架,它就会将当前StepExecution
的状态设置为BatchStatus.STOPPED
,保存它,并在完成前对JobExecution
执行相同的操作。
中止作业
可以重新启动状态为FAILED
的作业执行(如果Job
是可重新启动的)。状态为ABANDONED
的作业执行无法由框架重新启动。ABANDONED
状态也用于步骤执行,以将其标记为在重新启动的作业执行中可跳过。如果作业正在运行并遇到在之前的失败作业执行中已标记为ABANDONED
的步骤,它将继续执行下一步(由作业流程定义和步骤执行退出状态确定)。
如果进程死亡(kill -9
或服务器故障),作业当然没有运行,但JobRepository
无法知道,因为在进程死亡之前没有人告诉它。您必须手动告诉它您知道执行已失败或应被视为已中止(将其状态更改为FAILED
或ABANDONED
)。这是一个业务决策,无法自动化。只有当它是可重新启动的并且您知道重新启动数据有效时,才将其状态更改为FAILED
。