拦截 Step 执行

就像 Job 一样,在 Step 执行期间也有许多事件,用户可能需要在这些事件中执行某些功能。例如,要写入需要页脚的平面文件,则需要在 Step 完成时通知 ItemWriter,以便可以写入页脚。这可以通过许多 Step 范围的监听器之一来实现。

您可以通过 listeners 元素将任何实现 StepListener 扩展之一的类(但不能实现该接口本身,因为它为空)应用于步骤。listeners 元素在步骤、tasklet 或块声明内有效。我们建议您在适用功能的级别声明监听器,或者如果它是多功能的(例如 StepExecutionListenerItemReadListener),则在适用功能的最细粒度级别声明它。

  • Java

  • XML

以下示例显示了在 Java 中块级别应用的监听器

Java 配置
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("step1", jobRepository)
				.<String, String>chunk(10, transactionManager)
				.reader(reader())
				.writer(writer())
				.listener(chunkListener())
				.build();
}

以下示例显示了在 XML 中块级别应用的监听器

XML 配置
<step id="step1">
    <tasklet>
        <chunk reader="reader" writer="writer" commit-interval="10"/>
        <listeners>
            <listener ref="chunkListener"/>
        </listeners>
    </tasklet>
</step>

如果使用命名空间 <step> 元素或 *StepFactoryBean 工厂之一,则本身实现 StepListener 接口之一的 ItemReaderItemWriterItemProcessor 会自动注册到 Step。这仅适用于直接注入 Step 的组件。如果监听器嵌套在另一个组件中,则需要显式注册它(如前面 ItemStream 注册到 Step 中所述)。

除了 StepListener 接口外,还提供了注释来解决相同的问题。普通旧 Java 对象可以具有带有这些注释的方法,然后将其转换为相应的 StepListener 类型。为块组件(如 ItemReaderItemWriterTasklet)的自定义实现添加注释也很常见。XML 解析器会为 <listener/> 元素分析这些注释,并与生成器中的 listener 方法一起注册,因此您只需使用 XML 命名空间或生成器即可将监听器注册到步骤。

StepExecutionListener

StepExecutionListener 代表 Step 执行的最通用监听器。它允许在 Step 开始之前和结束之后(无论是正常结束还是失败)发出通知,如下例所示

public interface StepExecutionListener extends StepListener {

    void beforeStep(StepExecution stepExecution);

    ExitStatus afterStep(StepExecution stepExecution);

}

ExitStatus 具有 afterStep 的返回类型,以使监听器有机会修改在 Step 完成时返回的退出代码。

与此接口相对应的注释为

  • @BeforeStep

  • @AfterStep

ChunkListener

“块”定义为在事务范围内处理的项目。在每个提交间隔提交事务都会提交一个块。您可以使用 ChunkListener 在块开始处理之前或块成功完成后执行逻辑,如下面的接口定义所示

public interface ChunkListener extends StepListener {

    void beforeChunk(ChunkContext context);
    void afterChunk(ChunkContext context);
    void afterChunkError(ChunkContext context);

}

beforeChunk 方法在事务启动后但 ItemReader 开始读取之前调用。相反,afterChunk 在块已提交后调用(如果发生回滚,则根本不调用)。

与此接口相对应的注释为

  • @BeforeChunk

  • @AfterChunk

  • @AfterChunkError

当没有块声明时,您可以应用 ChunkListenerTaskletStep 负责调用 ChunkListener,因此它也适用于非面向项目的 tasklet(在 tasklet 之前和之后调用)。

ItemReadListener

在之前讨论跳过逻辑时,曾提到记录跳过的记录可能很有用,以便以后处理它们。在读取错误的情况下,可以使用 ItemReaderListener 来完成此操作,如下面的接口定义所示

public interface ItemReadListener<T> extends StepListener {

    void beforeRead();
    void afterRead(T item);
    void onReadError(Exception ex);

}

beforeRead 方法在每次对 ItemReader 进行读取调用之前调用。afterRead 方法在每次成功读取调用后调用,并传递读取的项目。如果读取时出错,则会调用 onReadError 方法。提供遇到的异常,以便可以对其进行记录。

与此接口相对应的注释为

  • @BeforeRead

  • @AfterRead

  • @OnReadError

ItemProcessListener

ItemReadListener 一样,可以“监听”项目的处理,如下面的接口定义所示

public interface ItemProcessListener<T, S> extends StepListener {

    void beforeProcess(T item);
    void afterProcess(T item, S result);
    void onProcessError(T item, Exception e);

}

beforeProcess 方法在 ItemProcessor 上调用 process 之前调用,并传递要处理的项目。afterProcess 方法在项目成功处理后调用。如果处理时出错,则会调用 onProcessError 方法。提供遇到的异常和尝试处理的项目,以便可以对其进行记录。

与此接口相对应的注释为

  • @BeforeProcess

  • @AfterProcess

  • @OnProcessError

ItemWriteListener

您可以使用 ItemWriteListener “监听”项目的写入,如下面的接口定义所示

public interface ItemWriteListener<S> extends StepListener {

    void beforeWrite(List<? extends S> items);
    void afterWrite(List<? extends S> items);
    void onWriteError(Exception exception, List<? extends S> items);

}

beforeWrite 方法在 ItemWriterwrite 方法之前调用,并传递要写入的项目列表。afterWrite 方法在项目成功写入后但提交与块处理关联的事务之前调用。如果写入过程中发生错误,则调用 onWriteError 方法。会提供遇到的异常和尝试写入的项目,以便进行日志记录。

与此接口相对应的注释为

  • @BeforeWrite

  • @AfterWrite

  • @OnWriteError

SkipListener

ItemReadListenerItemProcessListenerItemWriteListener 都提供了用于接收错误通知的机制,但没有一个通知您记录实际上已被跳过。例如,即使项目被重试并成功,也会调用 onWriteError。因此,有一个单独的接口用于跟踪跳过的项目,如下面的接口定义所示

public interface SkipListener<T,S> extends StepListener {

    void onSkipInRead(Throwable t);
    void onSkipInProcess(T item, Throwable t);
    void onSkipInWrite(S item, Throwable t);

}

onSkipInRead 在读取时跳过项目时调用。需要注意的是,回滚可能会导致同一项目被多次注册为已跳过。onSkipInWrite 在写入时跳过项目时调用。因为项目已成功读取(并且未跳过),所以它还将项目本身作为参数提供。

与此接口相对应的注释为

  • @OnSkipInRead

  • @OnSkipInWrite

  • @OnSkipInProcess

SkipListeners 和事务

SkipListener 最常见的用例之一是记录跳过的项目,以便可以使用另一个批处理过程甚至人工过程来评估和修复导致跳过的错误。由于在许多情况下原始事务可能会回滚,因此 Spring Batch 做出以下两个保证

  • 每个项目仅调用一次相应的跳过方法(取决于错误发生的时间)。

  • SkipListener 始终在事务提交之前调用。这是为了确保监听器调用的任何事务性资源不会因 ItemWriter 中的失败而回滚。