重复
RepeatTemplate
批处理就是重复性的操作,既可以是简单的优化,也可以是作业的一部分。为了制定和概括重复操作并提供迭代器框架,Spring Batch 提供了 RepeatOperations 接口。RepeatOperations 接口的定义如下:
public interface RepeatOperations {
RepeatStatus iterate(RepeatCallback callback) throws RepeatException;
}
回调是一个接口,其定义如下,允许您插入一些要重复的业务逻辑:
public interface RepeatCallback {
RepeatStatus doInIteration(RepeatContext context) throws Exception;
}
回调会重复执行,直到实现确定迭代应该结束为止。这些接口中的返回值是一个枚举值,可以是 RepeatStatus.CONTINUABLE 或 RepeatStatus.FINISHED。RepeatStatus 枚举向重复操作的调用者传达是否有任何工作剩余的信息。一般来说,RepeatOperations 的实现应该检查 RepeatStatus,并将其作为结束迭代决策的一部分。任何希望向调用者表明没有工作剩余的回调都可以返回 RepeatStatus.FINISHED。
RepeatOperations 最简单的通用实现是 RepeatTemplate。
RepeatTemplate template = new RepeatTemplate();
template.setCompletionPolicy(new SimpleCompletionPolicy(2));
template.iterate(new RepeatCallback() {
public RepeatStatus doInIteration(RepeatContext context) {
// Do stuff in batch...
return RepeatStatus.CONTINUABLE;
}
});
在前面的示例中,我们返回 RepeatStatus.CONTINUABLE,表示还有更多工作要做。回调也可以返回 RepeatStatus.FINISHED,向调用者表明没有工作剩余。有些迭代可能会因回调中正在进行的工作的内在考虑而终止。其他迭代实际上是无限循环(就回调而言),并且完成决策委托给外部策略,如前面示例所示。
完成策略
在 RepeatTemplate 中,iterate 方法中循环的终止由 CompletionPolicy 决定,它也是 RepeatContext 的工厂。RepeatTemplate 负责使用当前策略创建 RepeatContext,并在迭代的每个阶段将其传递给 RepeatCallback。在回调完成其 doInIteration 后,RepeatTemplate 必须调用 CompletionPolicy 以要求它更新其状态(将存储在 RepeatContext 中)。然后它询问策略迭代是否完成。
Spring Batch 提供了一些简单的通用 CompletionPolicy 实现。SimpleCompletionPolicy 允许执行固定次数(RepeatStatus.FINISHED 强制随时提前完成)。
用户可能需要实现自己的完成策略以进行更复杂的决策。例如,一个批处理窗口阻止批处理作业在在线系统使用后执行,这将需要一个自定义策略。
异常处理
如果在 RepeatCallback 内部抛出异常,RepeatTemplate 会咨询 ExceptionHandler,后者可以决定是否重新抛出异常。
以下列表显示了 ExceptionHandler 接口定义:
public interface ExceptionHandler {
void handleException(RepeatContext context, Throwable throwable)
throws Throwable;
}
一个常见的用例是计算给定类型异常的数量,并在达到限制时失败。为此,Spring Batch 提供了 SimpleLimitExceptionHandler 和一个稍微更灵活的 RethrowOnThresholdExceptionHandler。SimpleLimitExceptionHandler 具有一个限制属性和一个应该与当前异常进行比较的异常类型。还计算给定类型的所有子类。给定类型的异常将被忽略,直到达到限制,然后它们将被重新抛出。其他类型的异常总是被重新抛出。
SimpleLimitExceptionHandler 的一个重要可选属性是名为 useParent 的布尔标志。它默认为 false,因此限制仅在当前 RepeatContext 中计算。当设置为 true 时,限制会在嵌套迭代中的同级上下文之间保持(例如,步骤内的一组块)。
监听器
通常,能够为跨多个不同迭代的横切关注点接收额外的回调非常有用。为此,Spring Batch 提供了 RepeatListener 接口。RepeatTemplate 允许用户注册 RepeatListener 实现,并在迭代期间获得带有 RepeatContext 和 RepeatStatus(如果可用)的回调。
RepeatListener 接口的定义如下:
public interface RepeatListener {
void before(RepeatContext context);
void after(RepeatContext context, RepeatStatus result);
void open(RepeatContext context);
void onError(RepeatContext context, Throwable e);
void close(RepeatContext context);
}
open 和 close 回调在整个迭代之前和之后发生。before、after 和 onError 适用于单独的 RepeatCallback 调用。
请注意,当有多个监听器时,它们在一个列表中,因此存在顺序。在这种情况下,open 和 before 以相同的顺序调用,而 after、onError 和 close 以相反的顺序调用。
并行处理
RepeatOperations 的实现不限于按顺序执行回调。一些实现能够并行执行其回调非常重要。为此,Spring Batch 提供了 TaskExecutorRepeatTemplate,它使用 Spring TaskExecutor 策略运行 RepeatCallback。默认是使用 SynchronousTaskExecutor,其效果是在同一线程中执行整个迭代(与普通的 RepeatTemplate 相同)。
声明式迭代
有时,您知道每次发生时都希望重复一些业务处理。经典的例子是消息管道的优化。如果一批消息频繁到达,则处理它们比为每条消息承担单独事务的成本更有效率。Spring Batch 提供了一个 AOP 拦截器,它为此目的将方法调用包装在 RepeatOperations 对象中。RepeatOperationsInterceptor 执行被拦截的方法并根据提供的 RepeatTemplate 中的 CompletionPolicy 进行重复。
-
Java
-
XML
以下示例使用 Java 配置重复调用名为 processMessage 的服务方法(有关如何配置 AOP 拦截器的更多详细信息,请参阅Spring 用户指南):
@Bean
public MyService myService() {
ProxyFactory factory = new ProxyFactory(RepeatOperations.class.getClassLoader());
factory.setInterfaces(MyService.class);
factory.setTarget(new MyService());
MyService service = (MyService) factory.getProxy();
JdkRegexpMethodPointcut pointcut = new JdkRegexpMethodPointcut();
pointcut.setPatterns(".*processMessage.*");
RepeatOperationsInterceptor interceptor = new RepeatOperationsInterceptor();
((Advised) service).addAdvisor(new DefaultPointcutAdvisor(pointcut, interceptor));
return service;
}
以下示例显示了使用 Spring AOP 命名空间重复调用名为 processMessage 的服务方法的声明式迭代(有关如何配置 AOP 拦截器的更多详细信息,请参阅Spring 用户指南):
<aop:config>
<aop:pointcut id="transactional"
expression="execution(* com..*Service.processMessage(..))" />
<aop:advisor pointcut-ref="transactional"
advice-ref="retryAdvice" order="-1"/>
</aop:config>
<bean id="retryAdvice" class="org.spr...RepeatOperationsInterceptor"/>
前面的示例在拦截器内部使用默认的 RepeatTemplate。要更改策略、监听器和其他详细信息,您可以将 RepeatTemplate 实例注入到拦截器中。
如果被拦截方法返回 void,则拦截器始终返回 RepeatStatus.CONTINUABLE(因此如果 CompletionPolicy 没有有限的终点,则存在无限循环的危险)。否则,它返回 RepeatStatus.CONTINUABLE,直到被拦截方法的返回值是 null。此时,它返回 RepeatStatus.FINISHED。因此,目标方法内部的业务逻辑可以通过返回 null 或抛出由提供的 RepeatTemplate 中的 ExceptionHandler 重新抛出的异常来表示没有更多工作要做。