拦截 Step
执行
与 Job
一样,在 Step
执行期间有许多事件,用户可能需要在这些事件中执行某些功能。例如,要写入需要页脚的平面文件,当 Step
完成时需要通知 ItemWriter
,以便可以写入页脚。这可以通过许多 Step
作用域的监听器之一来实现。
您可以通过 listeners
元素将实现 StepListener
扩展接口之一(但不包括该接口本身,因为它为空)的任何类应用于 step。listeners
元素在 step、tasklet 或 chunk 声明中有效。我们建议您在其功能适用的级别声明监听器,或者如果它具有多项功能(例如 StepExecutionListener
和 ItemReadListener
),则在它适用的最细粒度级别声明它。
-
Java
-
XML
以下示例展示了在 Java 中应用于 chunk 级别的监听器
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<String, String>chunk(10, transactionManager)
.reader(reader())
.writer(writer())
.listener(chunkListener())
.build();
}
以下示例展示了在 XML 中应用于 chunk 级别的监听器
<step id="step1">
<tasklet>
<chunk reader="reader" writer="writer" commit-interval="10"/>
<listeners>
<listener ref="chunkListener"/>
</listeners>
</tasklet>
</step>
如果使用命名空间 <step>
元素或其中一个 *StepFactoryBean
工厂,实现 StepListener
接口之一的 ItemReader
、ItemWriter
或 ItemProcessor
会自动注册到 Step
。这仅适用于直接注入到 Step
中的组件。如果监听器嵌套在另一个组件内部,则需要显式注册它(如之前在将 ItemStream
注册到 Step
中所述)。
除了 StepListener
接口,还提供了注解来解决相同的问题。普通的 Java 对象可以拥有带有这些注解的方法,然后这些方法会被转换为相应的 StepListener
类型。同时,对 chunk 组件的自定义实现(如 ItemReader
、ItemWriter
或 Tasklet
)进行注解也很常见。XML 解析器和构建器中的 listener
方法都会分析 <listener/>
元素和这些注解,因此您只需使用 XML 命名空间或构建器将监听器注册到 step 即可。
StepExecutionListener
StepExecutionListener
是用于 Step
执行的最通用的监听器。它允许在 Step
开始之前以及在它结束之后(无论正常结束还是失败)进行通知,如下例所示
public interface StepExecutionListener extends StepListener {
void beforeStep(StepExecution stepExecution);
ExitStatus afterStep(StepExecution stepExecution);
}
afterStep
的返回类型是 ExitStatus
,这使监听器有机会修改 Step
完成时返回的退出码。
与此接口对应的注解是
-
@BeforeStep
-
@AfterStep
ChunkListener
“chunk” 定义为在事务范围内处理的项。在每个提交间隔提交事务即是提交一个 chunk。您可以使用 ChunkListener
在 chunk 开始处理之前或 chunk 成功完成之后执行逻辑,如下面的接口定义所示
public interface ChunkListener extends StepListener {
void beforeChunk(ChunkContext context);
void afterChunk(ChunkContext context);
void afterChunkError(ChunkContext context);
}
beforeChunk
方法在事务启动后、ItemReader
开始读取之前调用。反之,afterChunk
在 chunk 提交后调用(如果发生回滚则完全不调用)。
与此接口对应的注解是
-
@BeforeChunk
-
@AfterChunk
-
@AfterChunkError
即使没有 chunk 声明,您也可以应用 ChunkListener
。TaskletStep
负责调用 ChunkListener
,因此它也适用于非面向项的 tasklet(在 tasklet 之前和之后调用)。
ChunkListener
不设计用于抛出 checked exception。错误必须在实现中处理,否则 step 将终止。
ItemReadListener
在之前讨论跳过逻辑时,曾提到记录跳过的记录可能会很有益处,以便以后处理它们。在读取错误的情况下,这可以通过 ItemReaderListener
来完成,如下面的接口定义所示
public interface ItemReadListener<T> extends StepListener {
void beforeRead();
void afterRead(T item);
void onReadError(Exception ex);
}
beforeRead
方法在每次调用 ItemReader
的 read 方法之前调用。afterRead
方法在每次成功调用 read 方法之后调用,并传递读取的项。如果在读取时发生错误,则调用 onReadError
方法。提供遇到的异常,以便可以记录下来。
与此接口对应的注解是
-
@BeforeRead
-
@AfterRead
-
@OnReadError
ItemProcessListener
与 ItemReadListener
类似,可以“监听”项的处理过程,如下面的接口定义所示
public interface ItemProcessListener<T, S> extends StepListener {
void beforeProcess(T item);
void afterProcess(T item, S result);
void onProcessError(T item, Exception e);
}
beforeProcess
方法在调用 ItemProcessor
的 process
方法之前调用,并接收待处理的项。afterProcess
方法在项成功处理后调用。如果在处理时发生错误,则调用 onProcessError
方法。提供遇到的异常和尝试处理的项,以便可以记录下来。
与此接口对应的注解是
-
@BeforeProcess
-
@AfterProcess
-
@OnProcessError
ItemWriteListener
您可以使用 ItemWriteListener
“监听”项的写入过程,如下面的接口定义所示
public interface ItemWriteListener<S> extends StepListener {
void beforeWrite(List<? extends S> items);
void afterWrite(List<? extends S> items);
void onWriteError(Exception exception, List<? extends S> items);
}
beforeWrite
方法在调用 ItemWriter
的 write
方法之前调用,并接收要写入的项列表。afterWrite
方法在项成功写入后调用,但在提交与 chunk 处理关联的事务之前。如果在写入时发生错误,则调用 onWriteError
方法。提供遇到的异常和尝试写入的项,以便可以记录下来。
与此接口对应的注解是
-
@BeforeWrite
-
@AfterWrite
-
@OnWriteError
SkipListener
ItemReadListener
、ItemProcessListener
和 ItemWriteListener
都提供了错误通知机制,但它们都不会告知您某条记录是否已被实际跳过。例如,即使某个项重试成功,onWriteError
也会被调用。因此,有一个单独的接口用于跟踪跳过的项,如下面的接口定义所示
public interface SkipListener<T,S> extends StepListener {
void onSkipInRead(Throwable t);
void onSkipInProcess(T item, Throwable t);
void onSkipInWrite(S item, Throwable t);
}
当读取时跳过某个项时,会调用 onSkipInRead
。值得注意的是,回滚可能会导致同一个项被多次注册为跳过。当写入时跳过某个项时,会调用 onSkipInWrite
。因为该项已被成功读取(未跳过),所以也会将该项本身作为参数提供。
与此接口对应的注解是
-
@OnSkipInRead
-
@OnSkipInWrite
-
@OnSkipInProcess