常见批处理模式

一些批处理作业可以完全从 Spring Batch 中的现成组件组装而成。例如,可以配置 ItemReaderItemWriter 实现来涵盖各种场景。但是,对于大多数情况,必须编写自定义代码。应用程序开发人员的主要 API 入口点是 TaskletItemReaderItemWriter 和各种监听器接口。大多数简单的批处理作业可以使用 Spring Batch ItemReader 的现成输入,但通常情况下,处理和写入中存在自定义问题,需要开发人员实现 ItemWriterItemProcessor

在本章中,我们提供了一些自定义业务逻辑中常见模式的示例。这些示例主要以监听器接口为特色。需要注意的是,如果合适,ItemReaderItemWriter 也可以实现监听器接口。

记录项目处理和失败

一个常见的用例是需要对步骤中的错误进行特殊处理,逐项处理,也许是记录到特殊通道或将记录插入数据库。面向块的 Step(从步骤工厂 Bean 创建)允许用户使用简单的 ItemReadListener 来实现此用例,以处理 read 上的错误,以及使用 ItemWriteListener 来处理 write 上的错误。以下代码片段说明了一个监听器,它记录读取和写入失败

public class ItemFailureLoggerListener extends ItemListenerSupport {

    private static Log logger = LogFactory.getLog("item.error");

    public void onReadError(Exception ex) {
        logger.error("Encountered error on read", e);
    }

    public void onWriteError(Exception ex, List<? extends Object> items) {
        logger.error("Encountered error on write", ex);
    }
}

实现此监听器后,必须将其注册到步骤。

  • Java

  • XML

以下示例展示了如何在 Java 中将监听器注册到步骤

Java 配置
@Bean
public Step simpleStep(JobRepository jobRepository) {
	return new StepBuilder("simpleStep", jobRepository)
				...
				.listener(new ItemFailureLoggerListener())
				.build();
}

以下示例展示了如何在 XML 中注册一个步骤监听器。

XML 配置
<step id="simpleStep">
...
<listeners>
    <listener>
        <bean class="org.example...ItemFailureLoggerListener"/>
    </listener>
</listeners>
</step>
如果您的监听器在 onError() 方法中执行任何操作,它必须位于将要回滚的事务中。如果您需要在 onError() 方法中使用事务性资源(例如数据库),请考虑向该方法添加声明式事务(有关详细信息,请参阅 Spring Core 参考指南),并将它的传播属性值设置为 REQUIRES_NEW

出于业务原因手动停止作业

Spring Batch 通过 JobOperator 接口提供了一个 stop() 方法,但这实际上是供操作员使用,而不是应用程序程序员使用。有时,从业务逻辑中停止作业执行更方便或更有意义。

最简单的方法是抛出一个 RuntimeException(既不会无限期重试也不会跳过)。例如,可以使用自定义异常类型,如下例所示

public class PoisonPillItemProcessor<T> implements ItemProcessor<T, T> {

    @Override
    public T process(T item) throws Exception {
        if (isPoisonPill(item)) {
            throw new PoisonPillException("Poison pill detected: " + item);
        }
        return item;
    }
}

另一种简单的方法是,从 ItemReader 返回 null 来停止步骤执行,如下例所示

public class EarlyCompletionItemReader implements ItemReader<T> {

    private ItemReader<T> delegate;

    public void setDelegate(ItemReader<T> delegate) { ... }

    public T read() throws Exception {
        T item = delegate.read();
        if (isEndItem(item)) {
            return null; // end the step here
        }
        return item;
    }

}

前面的示例实际上依赖于这样一个事实:CompletionPolicy 策略有一个默认实现,当要处理的项目为 null 时,它会发出一个完整的批处理信号。可以实现更复杂的完成策略,并通过 SimpleStepFactoryBean 将其注入到 Step 中。

  • Java

  • XML

以下示例展示了如何在 Java 中将完成策略注入到步骤中

Java 配置
@Bean
public Step simpleStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("simpleStep", jobRepository)
				.<String, String>chunk(new SpecialCompletionPolicy(), transactionManager)
				.reader(reader())
				.writer(writer())
				.build();
}

以下示例展示了如何在 XML 中将完成策略注入到步骤中

XML 配置
<step id="simpleStep">
    <tasklet>
        <chunk reader="reader" writer="writer" commit-interval="10"
               chunk-completion-policy="completionPolicy"/>
    </tasklet>
</step>

<bean id="completionPolicy" class="org.example...SpecialCompletionPolicy"/>

另一种方法是在 StepExecution 中设置一个标志,该标志由框架中的 Step 实现检查,位于项目处理之间。要实现这种方法,我们需要访问当前的 StepExecution,这可以通过实现 StepListener 并将其注册到 Step 来实现。以下示例展示了一个设置标志的监听器

public class CustomItemWriter extends ItemListenerSupport implements StepListener {

    private StepExecution stepExecution;

    public void beforeStep(StepExecution stepExecution) {
        this.stepExecution = stepExecution;
    }

    public void afterRead(Object item) {
        if (isPoisonPill(item)) {
            stepExecution.setTerminateOnly();
       }
    }

}

当设置标志时,默认行为是步骤抛出一个 JobInterruptedException。可以通过 StepInterruptionPolicy 控制此行为。但是,唯一的选择是抛出或不抛出异常,因此这始终是作业的异常结束。

添加页脚记录

通常,在写入平面文件时,必须在所有处理完成后将“页脚”记录追加到文件的末尾。这可以通过使用 Spring Batch 提供的 FlatFileFooterCallback 接口来实现。FlatFileFooterCallback(及其对应项 FlatFileHeaderCallback)是 FlatFileItemWriter 的可选属性,可以添加到项目写入器中。

  • Java

  • XML

以下示例展示了如何在 Java 中使用 FlatFileHeaderCallbackFlatFileFooterCallback

Java 配置
@Bean
public FlatFileItemWriter<String> itemWriter(Resource outputResource) {
	return new FlatFileItemWriterBuilder<String>()
			.name("itemWriter")
			.resource(outputResource)
			.lineAggregator(lineAggregator())
			.headerCallback(headerCallback())
			.footerCallback(footerCallback())
			.build();
}

以下示例展示了如何在 XML 中使用 FlatFileHeaderCallbackFlatFileFooterCallback

XML 配置
<bean id="itemWriter" class="org.spr...FlatFileItemWriter">
    <property name="resource" ref="outputResource" />
    <property name="lineAggregator" ref="lineAggregator"/>
    <property name="headerCallback" ref="headerCallback" />
    <property name="footerCallback" ref="footerCallback" />
</bean>

尾部回调接口只有一个方法,该方法在需要写入尾部时被调用,如下面的接口定义所示。

public interface FlatFileFooterCallback {

    void writeFooter(Writer writer) throws IOException;

}

编写摘要尾部

涉及尾部记录的常见需求是在输出过程中聚合信息,并将这些信息追加到文件末尾。此尾部通常用作文件的摘要或提供校验和。

例如,如果批处理作业将 Trade 记录写入平面文件,并且需要将所有 Trades 的总金额放在尾部,则可以使用以下 ItemWriter 实现。

public class TradeItemWriter implements ItemWriter<Trade>,
                                        FlatFileFooterCallback {

    private ItemWriter<Trade> delegate;

    private BigDecimal totalAmount = BigDecimal.ZERO;

    public void write(Chunk<? extends Trade> items) throws Exception {
        BigDecimal chunkTotal = BigDecimal.ZERO;
        for (Trade trade : items) {
            chunkTotal = chunkTotal.add(trade.getAmount());
        }

        delegate.write(items);

        // After successfully writing all items
        totalAmount = totalAmount.add(chunkTotal);
    }

    public void writeFooter(Writer writer) throws IOException {
        writer.write("Total Amount Processed: " + totalAmount);
    }

    public void setDelegate(ItemWriter delegate) {...}
}

TradeItemWriter 存储一个 totalAmount 值,该值会随着写入的每个 Trade 项目的 amount 而增加。在处理完最后一个 Trade 后,框架会调用 writeFooter,该方法将 totalAmount 放入文件。请注意,write 方法使用了一个临时变量 chunkTotal,该变量存储了块中 Trade 金额的总和。这样做是为了确保如果在 write 方法中发生跳过,totalAmount 不会改变。只有在 write 方法结束时,一旦我们保证不会抛出异常,我们才会更新 totalAmount

为了调用 writeFooter 方法,TradeItemWriter(它实现了 FlatFileFooterCallback)必须作为 footerCallback 连接到 FlatFileItemWriter 中。

  • Java

  • XML

以下示例展示了如何在 Java 中连接 TradeItemWriter

Java 配置
@Bean
public TradeItemWriter tradeItemWriter() {
	TradeItemWriter itemWriter = new TradeItemWriter();

	itemWriter.setDelegate(flatFileItemWriter(null));

	return itemWriter;
}

@Bean
public FlatFileItemWriter<String> flatFileItemWriter(Resource outputResource) {
	return new FlatFileItemWriterBuilder<String>()
			.name("itemWriter")
			.resource(outputResource)
			.lineAggregator(lineAggregator())
			.footerCallback(tradeItemWriter())
			.build();
}

以下示例展示了如何在 XML 中连接 TradeItemWriter

XML 配置
<bean id="tradeItemWriter" class="..TradeItemWriter">
    <property name="delegate" ref="flatFileItemWriter" />
</bean>

<bean id="flatFileItemWriter" class="org.spr...FlatFileItemWriter">
   <property name="resource" ref="outputResource" />
   <property name="lineAggregator" ref="lineAggregator"/>
   <property name="footerCallback" ref="tradeItemWriter" />
</bean>

目前为止,TradeItemWriter 的编写方式仅在 Step 不可重启的情况下才能正常运行。这是因为该类是有状态的(因为它存储了 totalAmount),但 totalAmount 并没有持久化到数据库中。因此,在重启的情况下无法检索它。为了使该类可重启,应实现 ItemStream 接口以及 openupdate 方法,如下例所示

public void open(ExecutionContext executionContext) {
    if (executionContext.containsKey("total.amount") {
        totalAmount = (BigDecimal) executionContext.get("total.amount");
    }
}

public void update(ExecutionContext executionContext) {
    executionContext.put("total.amount", totalAmount);
}

update 方法在将该对象持久化到数据库之前,将 totalAmount 的最新版本存储到 ExecutionContext 中。open 方法从 ExecutionContext 中检索任何现有的 totalAmount,并将其用作处理的起点,从而使 TradeItemWriter 能够在重启时从上次运行 Step 的位置继续执行。

驱动查询型 ItemReaders

关于读取器和写入器的章节中,讨论了使用分页进行数据库输入。许多数据库供应商(如 DB2)具有极其悲观的锁定策略,如果要读取的表也需要被在线应用程序的其他部分使用,则可能会导致问题。此外,对极其庞大的数据集打开游标可能会在某些供应商的数据库上造成问题。因此,许多项目更喜欢使用“驱动查询”方法来读取数据。这种方法通过迭代键而不是需要返回的整个对象来工作,如下图所示

Driving Query Job
图 1. 驱动查询作业

如您所见,上图所示的示例使用了与基于游标的示例中相同的“FOO”表。但是,它并没有选择整行,而只选择了 ID。因此,read 返回的不是 FOO 对象,而是一个 Integer。然后,可以使用此数字查询“详细信息”,即完整的 Foo 对象,如下图所示

Driving Query Example
图 2. 驱动查询示例

应使用 ItemProcessor 将从驱动查询中获得的键转换为完整的 Foo 对象。可以使用现有的 DAO 根据键查询完整的对象。

多行记录

虽然通常情况下,平面文件中的每条记录都只占一行,但常见的情况是,文件可能包含跨越多行且具有多种格式的记录。以下文件摘录显示了这种安排的示例

HEA;0013100345;2007-02-15
NCU;Smith;Peter;;T;20014539;F
BAD;;Oak Street 31/A;;Small Town;00235;IL;US
FOT;2;2;267.34

从以“HEA”开头的行到以“FOT”开头的行之间的所有内容都被视为一条记录。为了正确处理这种情况,必须考虑以下几点

  • ItemReader 必须一次读取多行记录的每一行,而不是一次读取一条记录,以便可以完整地将其传递给 ItemWriter

  • 每种行类型可能需要不同的标记化方式。

由于单个记录跨越多行,并且我们可能不知道有多少行,因此ItemReader必须小心地始终读取整个记录。为了做到这一点,应该实现一个自定义的ItemReader作为FlatFileItemReader的包装器。

  • Java

  • XML

以下示例展示了如何在 Java 中实现自定义的ItemReader

Java 配置
@Bean
public MultiLineTradeItemReader itemReader() {
	MultiLineTradeItemReader itemReader = new MultiLineTradeItemReader();

	itemReader.setDelegate(flatFileItemReader());

	return itemReader;
}

@Bean
public FlatFileItemReader flatFileItemReader() {
	FlatFileItemReader<Trade> reader = new FlatFileItemReaderBuilder<>()
			.name("flatFileItemReader")
			.resource(new ClassPathResource("data/iosample/input/multiLine.txt"))
			.lineTokenizer(orderFileTokenizer())
			.fieldSetMapper(orderFieldSetMapper())
			.build();
	return reader;
}

以下示例展示了如何在 XML 中实现自定义的ItemReader

XML 配置
<bean id="itemReader" class="org.spr...MultiLineTradeItemReader">
    <property name="delegate">
        <bean class="org.springframework.batch.item.file.FlatFileItemReader">
            <property name="resource" value="data/iosample/input/multiLine.txt" />
            <property name="lineMapper">
                <bean class="org.spr...DefaultLineMapper">
                    <property name="lineTokenizer" ref="orderFileTokenizer"/>
                    <property name="fieldSetMapper" ref="orderFieldSetMapper"/>
                </bean>
            </property>
        </bean>
    </property>
</bean>

为了确保每行都正确标记化,这对于固定长度的输入尤其重要,可以在委托的FlatFileItemReader上使用PatternMatchingCompositeLineTokenizer。有关更多详细信息,请参阅读者和作者章节中的FlatFileItemReader。然后,委托读者使用PassThroughFieldSetMapper为每行提供一个FieldSet,并将其返回给包装的ItemReader

  • Java

  • XML

以下示例展示了如何在 Java 中确保每行都正确标记化

Java 内容
@Bean
public PatternMatchingCompositeLineTokenizer orderFileTokenizer() {
	PatternMatchingCompositeLineTokenizer tokenizer =
			new PatternMatchingCompositeLineTokenizer();

	Map<String, LineTokenizer> tokenizers = new HashMap<>(4);

	tokenizers.put("HEA*", headerRecordTokenizer());
	tokenizers.put("FOT*", footerRecordTokenizer());
	tokenizers.put("NCU*", customerLineTokenizer());
	tokenizers.put("BAD*", billingAddressLineTokenizer());

	tokenizer.setTokenizers(tokenizers);

	return tokenizer;
}

以下示例展示了如何在 XML 中确保每行都正确标记化

XML 内容
<bean id="orderFileTokenizer" class="org.spr...PatternMatchingCompositeLineTokenizer">
    <property name="tokenizers">
        <map>
            <entry key="HEA*" value-ref="headerRecordTokenizer" />
            <entry key="FOT*" value-ref="footerRecordTokenizer" />
            <entry key="NCU*" value-ref="customerLineTokenizer" />
            <entry key="BAD*" value-ref="billingAddressLineTokenizer" />
        </map>
    </property>
</bean>

此包装器必须能够识别记录的结尾,以便它可以不断地调用其委托的read(),直到到达结尾。对于读取的每一行,包装器应该构建要返回的项目。一旦到达页脚,就可以返回该项目以传递给ItemProcessorItemWriter,如以下示例所示

private FlatFileItemReader<FieldSet> delegate;

public Trade read() throws Exception {
    Trade t = null;

    for (FieldSet line = null; (line = this.delegate.read()) != null;) {
        String prefix = line.readString(0);
        if (prefix.equals("HEA")) {
            t = new Trade(); // Record must start with header
        }
        else if (prefix.equals("NCU")) {
            Assert.notNull(t, "No header was found.");
            t.setLast(line.readString(1));
            t.setFirst(line.readString(2));
            ...
        }
        else if (prefix.equals("BAD")) {
            Assert.notNull(t, "No header was found.");
            t.setCity(line.readString(4));
            t.setState(line.readString(6));
          ...
        }
        else if (prefix.equals("FOT")) {
            return t; // Record must end with footer
        }
    }
    Assert.isNull(t, "No 'END' was found.");
    return null;
}

执行系统命令

许多批处理作业需要从批处理作业中调用外部命令。此类进程可以由调度程序单独启动,但会丢失有关运行的通用元数据的优势。此外,多步骤作业还需要拆分为多个作业。

由于这种需求非常普遍,Spring Batch 提供了一个用于调用系统命令的Tasklet实现。

  • Java

  • XML

以下示例展示了如何在 Java 中调用外部命令

Java 配置
@Bean
public SystemCommandTasklet tasklet() {
	SystemCommandTasklet tasklet = new SystemCommandTasklet();

	tasklet.setCommand("echo hello");
	tasklet.setTimeout(5000);

	return tasklet;
}

以下示例展示了如何在 XML 中调用外部命令

XML 配置
<bean class="org.springframework.batch.core.step.tasklet.SystemCommandTasklet">
    <property name="command" value="echo hello" />
    <!-- 5 second timeout for the command to complete -->
    <property name="timeout" value="5000" />
</bean>

处理没有找到输入时的步骤完成

在许多批处理场景中,在数据库或文件中找不到要处理的行并不算异常。Step 只是被认为没有找到工作,并以 0 个读取项完成。Spring Batch 中开箱即用的所有 ItemReader 实现都默认采用这种方法。如果即使有输入也无法写入任何内容(通常发生在文件命名错误或类似问题出现时),这可能会导致一些混淆。因此,应该检查元数据本身以确定框架找到了多少工作要处理。但是,如果找不到输入被认为是异常情况呢?在这种情况下,以编程方式检查元数据以查看是否有未处理的项目并导致失败是最佳解决方案。由于这是一个常见的用例,Spring Batch 提供了一个具有此功能的监听器,如 NoWorkFoundStepExecutionListener 类定义所示。

public class NoWorkFoundStepExecutionListener extends StepExecutionListenerSupport {

    public ExitStatus afterStep(StepExecution stepExecution) {
        if (stepExecution.getReadCount() == 0) {
            return ExitStatus.FAILED;
        }
        return null;
    }

}

前面的 StepExecutionListener 在 'afterStep' 阶段检查 StepExecutionreadCount 属性以确定是否没有读取任何项目。如果是这种情况,则返回退出代码 FAILED,表示 Step 应该失败。否则,返回 null,不会影响 Step 的状态。

将数据传递给未来的步骤

将信息从一个步骤传递到另一个步骤通常很有用。这可以通过 ExecutionContext 完成。问题是存在两个 ExecutionContexts:一个在 Step 级别,另一个在 Job 级别。Step ExecutionContext 只在步骤执行期间存在,而 Job ExecutionContext 在整个 Job 中存在。另一方面,Step ExecutionContext 在每次 Step 提交一个块时都会更新,而 Job ExecutionContext 仅在每个 Step 结束时更新。

这种分离的结果是,所有数据都必须在 Step 执行期间放置在 Step ExecutionContext 中。这样做可以确保数据在 Step 运行期间正确存储。如果数据存储到 Job ExecutionContext 中,那么它在 Step 执行期间不会被持久化。如果 Step 失败,该数据将丢失。

public class SavingItemWriter implements ItemWriter<Object> {
    private StepExecution stepExecution;

    public void write(Chunk<? extends Object> items) throws Exception {
        // ...

        ExecutionContext stepContext = this.stepExecution.getExecutionContext();
        stepContext.put("someKey", someObject);
    }

    @BeforeStep
    public void saveStepExecution(StepExecution stepExecution) {
        this.stepExecution = stepExecution;
    }
}

为了使数据可用于未来的 Steps,必须在步骤完成后将其“提升”到 Job ExecutionContext。Spring Batch 为此目的提供了 ExecutionContextPromotionListener。监听器必须配置与 ExecutionContext 中必须提升的数据相关的键。它还可以选择性地配置一个退出代码模式列表,这些模式应为此进行提升(COMPLETED 是默认值)。与所有监听器一样,它必须在 Step 上注册。

  • Java

  • XML

以下示例展示了如何在 Java 中将步骤提升到 Job ExecutionContext

Java 配置
@Bean
public Job job1(JobRepository jobRepository, Step step1, Step step2) {
	return new JobBuilder("job1", jobRepository)
				.start(step1)
				.next(step2)
				.build();
}

@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("step1", jobRepository)
				.<String, String>chunk(10, transactionManager)
				.reader(reader())
				.writer(savingWriter())
				.listener(promotionListener())
				.build();
}

@Bean
public ExecutionContextPromotionListener promotionListener() {
	ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();

	listener.setKeys(new String[] {"someKey"});

	return listener;
}

以下示例展示了如何在 XML 中将步骤提升到 Job ExecutionContext

XML 配置
<job id="job1">
    <step id="step1">
        <tasklet>
            <chunk reader="reader" writer="savingWriter" commit-interval="10"/>
        </tasklet>
        <listeners>
            <listener ref="promotionListener"/>
        </listeners>
    </step>

    <step id="step2">
       ...
    </step>
</job>

<beans:bean id="promotionListener" class="org.spr....ExecutionContextPromotionListener">
    <beans:property name="keys">
        <list>
            <value>someKey</value>
        </list>
    </beans:property>
</beans:bean>

最后,必须从 Job ExecutionContext 中检索保存的值,如以下示例所示

public class RetrievingItemWriter implements ItemWriter<Object> {
    private Object someObject;

    public void write(Chunk<? extends Object> items) throws Exception {
        // ...
    }

    @BeforeStep
    public void retrieveInterstepData(StepExecution stepExecution) {
        JobExecution jobExecution = stepExecution.getJobExecution();
        ExecutionContext jobContext = jobExecution.getExecutionContext();
        this.someObject = jobContext.get("someKey");
    }
}