通过消息启动批处理 Job
使用核心 Spring Batch API 启动批处理 Job 时,您基本上有两种选择:
-
通过命令行,使用 `CommandLineJobRunner`
-
通过编程方式,使用 `JobOperator.start()` 或 `JobLauncher.run()`
例如,当您使用 shell 脚本调用批处理 Job 时,可能需要使用 `CommandLineJobRunner`。另外,您可以直接使用 `JobOperator`(例如,在将 Spring Batch 作为 web 应用程序的一部分使用时)。然而,对于更复杂的用例呢?也许您需要轮询远程 (S)FTP 服务器来检索批处理 Job 的数据,或者您的应用程序必须同时支持多个不同的数据源。例如,您可能不仅从 web 接收数据文件,还从 FTP 和其他来源接收。在调用 Spring Batch 之前,可能需要对输入文件进行额外的转换。
因此,使用 Spring Integration 及其众多适配器来执行批处理 Job 将更加强大。例如,您可以使用 *File Inbound Channel Adapter* 来监视文件系统中的目录,并在输入文件到达时立即启动批处理 Job。此外,您可以创建 Spring Integration 流程,这些流程使用多个不同的适配器,仅通过配置即可轻松地同时从多个源获取批处理 Job 的数据。使用 Spring Integration 实现所有这些场景都很容易,因为它允许 `JobLauncher` 解耦、事件驱动地执行。
Spring Batch Integration 提供了 `JobLaunchingMessageHandler` 类,可用于启动批处理 Job。`JobLaunchingMessageHandler` 的输入由 Spring Integration 消息提供,其有效载荷类型为 `JobLaunchRequest`。此类是待启动的 `Job` 和启动批处理 Job 所必需的 `JobParameters` 的包装器。
下图显示了启动批处理 Job 所需的典型 Spring Integration 消息流。EIP(企业集成模式)网站提供了消息图标及其描述的完整概述。

将 File 转换为 JobLaunchRequest
以下示例将文件转换为 `JobLaunchRequest`
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.integration.launch.JobLaunchRequest;
import org.springframework.integration.annotation.Transformer;
import org.springframework.messaging.Message;
import java.io.File;
public class FileMessageToJobRequest {
private Job job;
private String fileParameterName;
public void setFileParameterName(String fileParameterName) {
this.fileParameterName = fileParameterName;
}
public void setJob(Job job) {
this.job = job;
}
@Transformer
public JobLaunchRequest toRequest(Message<File> message) {
JobParametersBuilder jobParametersBuilder =
new JobParametersBuilder();
jobParametersBuilder.addString(fileParameterName,
message.getPayload().getAbsolutePath());
return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
}
}
JobExecution 响应
当批处理 Job 执行时,会返回一个 `JobExecution` 实例。您可以使用此实例来确定执行的状态。如果 `JobExecution` 能够成功创建,它总是会被返回,无论实际执行是否成功。
`JobExecution` 实例如何返回的确切行为取决于提供的 `TaskExecutor`。如果使用 `同步`(单线程)的 `TaskExecutor` 实现,则 `JobExecution` 响应仅在 Job 完成 `后` 返回。使用 `异步` 的 `TaskExecutor` 时,`JobExecution` 实例会立即返回。然后,您可以获取 `JobExecution` 实例的 `id`(通过 `JobExecution.getJobId()`),并使用 `JobExplorer` 向 `JobRepository` 查询 Job 的更新状态。更多信息,请参阅查询 Repository。
Spring Batch Integration 配置
考虑这样一种情况:有人需要创建一个文件 `inbound-channel-adapter` 来监听指定目录中的 CSV 文件,将其交给一个 transformer (`FileMessageToJobRequest`) 进行转换,通过 Job 启动网关启动 Job,并使用 `logging-channel-adapter` 记录 `JobExecution` 的输出。
-
Java
-
XML
以下示例展示了如何在 Java 中配置这个常见用例
@Bean
public FileMessageToJobRequest fileMessageToJobRequest() {
FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
fileMessageToJobRequest.setFileParameterName("input.file.name");
fileMessageToJobRequest.setJob(personJob());
return fileMessageToJobRequest;
}
@Bean
public JobLaunchingGateway jobLaunchingGateway() {
TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
jobLauncher.setJobRepository(jobRepository);
jobLauncher.setTaskExecutor(new SyncTaskExecutor());
JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher);
return jobLaunchingGateway;
}
@Bean
public IntegrationFlow integrationFlow(JobLaunchingGateway jobLaunchingGateway) {
return IntegrationFlow.from(Files.inboundAdapter(new File("/tmp/myfiles")).
filter(new SimplePatternFileListFilter("*.csv")),
c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1))).
transform(fileMessageToJobRequest()).
handle(jobLaunchingGateway).
log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload").
get();
}
以下示例展示了如何在 XML 中配置这个常见用例
<int:channel id="inboundFileChannel"/>
<int:channel id="outboundJobRequestChannel"/>
<int:channel id="jobLaunchReplyChannel"/>
<int-file:inbound-channel-adapter id="filePoller"
channel="inboundFileChannel"
directory="file:/tmp/myfiles/"
filename-pattern="*.csv">
<int:poller fixed-rate="1000"/>
</int-file:inbound-channel-adapter>
<int:transformer input-channel="inboundFileChannel"
output-channel="outboundJobRequestChannel">
<bean class="io.spring.sbi.FileMessageToJobRequest">
<property name="job" ref="personJob"/>
<property name="fileParameterName" value="input.file.name"/>
</bean>
</int:transformer>
<batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
reply-channel="jobLaunchReplyChannel"/>
<int:logging-channel-adapter channel="jobLaunchReplyChannel"/>
ItemReader 配置示例
既然我们正在轮询文件并启动 Job,就需要配置我们的 Spring Batch `ItemReader`(例如)来使用由名为 "input.file.name" 的 Job 参数定义的位置找到的文件,如下面的 bean 配置所示:
-
Java
-
XML
以下 Java 示例展示了所需的 bean 配置
@Bean
@StepScope
public ItemReader sampleReader(@Value("#{jobParameters[input.file.name]}") String resource) {
...
FlatFileItemReader flatFileItemReader = new FlatFileItemReader();
flatFileItemReader.setResource(new FileSystemResource(resource));
...
return flatFileItemReader;
}
以下 XML 示例展示了所需的 bean 配置
<bean id="itemReader" class="org.springframework.batch.item.file.FlatFileItemReader"
scope="step">
<property name="resource" value="file://#{jobParameters['input.file.name']}"/>
...
</bean>
前面示例中主要的关注点是将 `#{jobParameters['input.file.name']}` 的值注入到 Resource 属性中,并将 `ItemReader` bean 设置为 step 作用域。将 bean 设置为 step 作用域利用了后期绑定支持,这允许访问 `jobParameters` 变量。