通过消息启动批处理作业

使用核心 Spring Batch API 启动批处理作业时,您基本上有两种选择

  • 从命令行,使用 CommandLineJobRunner

  • 以编程方式,使用 JobOperator.start()JobLauncher.run()

例如,您可能希望在使用 shell 脚本调用批处理作业时使用 CommandLineJobRunner。或者,您可以直接使用 JobOperator(例如,在将 Spring Batch 用作 Web 应用程序的一部分时)。但是,更复杂的使用场景呢?也许您需要轮询远程 (S)FTP 服务器以检索批处理作业的数据,或者您的应用程序必须同时支持多个不同的数据源。例如,您可能不仅从 Web 接收数据文件,还从 FTP 和其他来源接收数据文件。也许在调用 Spring Batch 之前需要对输入文件进行额外的转换。

因此,使用 Spring Integration 及其众多适配器来执行批处理作业将更加强大。例如,您可以使用文件入站通道适配器监控文件系统中的目录,并在输入文件到达时启动批处理作业。此外,您可以创建使用多个不同适配器的 Spring Integration 流,通过仅使用配置轻松地从多个来源同时提取数据以供您的批处理作业使用。使用 Spring Integration 实现所有这些场景都很容易,因为它允许对JobLauncher进行解耦的、事件驱动的执行。

Spring Batch Integration 提供了JobLaunchingMessageHandler类,您可以使用它来启动批处理作业。JobLaunchingMessageHandler的输入由 Spring Integration 消息提供,该消息的有效负载类型为JobLaunchRequest。此类是围绕要启动的Job以及启动批处理作业所需的JobParameters的包装器。

下图显示了启动批处理作业所需的典型 Spring Integration 消息流。 EIP(企业集成模式)网站提供了消息图标及其描述的完整概述。

Launch Batch Job
图 1. 启动批处理作业

将文件转换为 JobLaunchRequest

以下示例将文件转换为JobLaunchRequest

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.integration.launch.JobLaunchRequest;
import org.springframework.integration.annotation.Transformer;
import org.springframework.messaging.Message;

import java.io.File;

public class FileMessageToJobRequest {
    private Job job;
    private String fileParameterName;

    public void setFileParameterName(String fileParameterName) {
        this.fileParameterName = fileParameterName;
    }

    public void setJob(Job job) {
        this.job = job;
    }

    @Transformer
    public JobLaunchRequest toRequest(Message<File> message) {
        JobParametersBuilder jobParametersBuilder =
            new JobParametersBuilder();

        jobParametersBuilder.addString(fileParameterName,
            message.getPayload().getAbsolutePath());

        return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
    }
}

JobExecution 响应

当批处理作业正在执行时,将返回一个JobExecution实例。您可以使用此实例来确定执行的状态。如果能够成功创建JobExecution,则始终会返回它,无论实际执行是否成功。

JobExecution实例的返回方式的具体行为取决于提供的TaskExecutor。如果使用同步(单线程)TaskExecutor实现,则仅在作业完成后返回JobExecution响应。当使用异步TaskExecutor时,JobExecution实例会立即返回。然后,您可以获取JobExecution实例的id(使用JobExecution.getJobId())并使用JobExplorer查询JobRepository以获取作业的更新状态。有关更多信息,请参阅查询存储库

Spring Batch Integration 配置

考虑一个需要创建文件入站通道适配器来监听提供的目录中的 CSV 文件、将它们传递给转换器(FileMessageToJobRequest)、通过作业启动网关启动作业以及使用日志记录通道适配器记录JobExecution输出的情况。

  • Java

  • XML

以下示例展示了如何在 Java 中配置这种常见情况

Java 配置
@Bean
public FileMessageToJobRequest fileMessageToJobRequest() {
    FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
    fileMessageToJobRequest.setFileParameterName("input.file.name");
    fileMessageToJobRequest.setJob(personJob());
    return fileMessageToJobRequest;
}

@Bean
public JobLaunchingGateway jobLaunchingGateway() {
    TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
    jobLauncher.setJobRepository(jobRepository);
    jobLauncher.setTaskExecutor(new SyncTaskExecutor());
    JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher);

    return jobLaunchingGateway;
}

@Bean
public IntegrationFlow integrationFlow(JobLaunchingGateway jobLaunchingGateway) {
    return IntegrationFlow.from(Files.inboundAdapter(new File("/tmp/myfiles")).
                    filter(new SimplePatternFileListFilter("*.csv")),
            c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1))).
            transform(fileMessageToJobRequest()).
            handle(jobLaunchingGateway).
            log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload").
            get();
}

以下示例展示了如何在 XML 中配置这种常见情况

XML 配置
<int:channel id="inboundFileChannel"/>
<int:channel id="outboundJobRequestChannel"/>
<int:channel id="jobLaunchReplyChannel"/>

<int-file:inbound-channel-adapter id="filePoller"
    channel="inboundFileChannel"
    directory="file:/tmp/myfiles/"
    filename-pattern="*.csv">
  <int:poller fixed-rate="1000"/>
</int-file:inbound-channel-adapter>

<int:transformer input-channel="inboundFileChannel"
    output-channel="outboundJobRequestChannel">
  <bean class="io.spring.sbi.FileMessageToJobRequest">
    <property name="job" ref="personJob"/>
    <property name="fileParameterName" value="input.file.name"/>
  </bean>
</int:transformer>

<batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
    reply-channel="jobLaunchReplyChannel"/>

<int:logging-channel-adapter channel="jobLaunchReplyChannel"/>

示例 ItemReader 配置

现在我们正在轮询文件并启动作业,我们需要配置我们的 Spring Batch ItemReader(例如)以使用在名为“input.file.name”的作业参数定义的位置找到的文件,如下面的 bean 配置所示

  • Java

  • XML

以下 Java 示例展示了必要的 bean 配置

Java 配置
@Bean
@StepScope
public ItemReader sampleReader(@Value("#{jobParameters[input.file.name]}") String resource) {
...
    FlatFileItemReader flatFileItemReader = new FlatFileItemReader();
    flatFileItemReader.setResource(new FileSystemResource(resource));
...
    return flatFileItemReader;
}

以下 XML 示例展示了必要的 bean 配置

XML 配置
<bean id="itemReader" class="org.springframework.batch.item.file.FlatFileItemReader"
    scope="step">
  <property name="resource" value="file://#{jobParameters['input.file.name']}"/>
    ...
</bean>

前面示例中的主要关注点是将 #{jobParameters['input.file.name']} 的值注入为 Resource 属性值,并将 ItemReader bean 设置为具有步骤范围。将 bean 设置为具有步骤范围利用了延迟绑定支持,这允许访问 jobParameters 变量。