重复

RepeatTemplate

批处理涉及到重复的操作,无论是作为简单的优化还是作为作业的一部分。为了策略化和泛化重复过程,并提供类似于迭代器框架的功能,Spring Batch 提供了 RepeatOperations 接口。RepeatOperations 接口定义如下:

public interface RepeatOperations {

    RepeatStatus iterate(RepeatCallback callback) throws RepeatException;

}

回调是一个接口,定义如下,它允许您插入一些需要重复的业务逻辑:

public interface RepeatCallback {

    RepeatStatus doInIteration(RepeatContext context) throws Exception;

}

回调会重复执行,直到实现判断迭代应该结束。这些接口的返回值是一个枚举值,可以是 RepeatStatus.CONTINUABLERepeatStatus.FINISHEDRepeatStatus 枚举向重复操作的调用者传达是否还有工作需要完成的信息。一般来说,RepeatOperations 的实现应该检查 RepeatStatus 并将其作为结束迭代决策的一部分。任何希望向调用者表明没有剩余工作的回调都可以返回 RepeatStatus.FINISHED

RepeatOperations 最简单的通用实现是 RepeatTemplate

RepeatTemplate template = new RepeatTemplate();

template.setCompletionPolicy(new SimpleCompletionPolicy(2));

template.iterate(new RepeatCallback() {

    public RepeatStatus doInIteration(RepeatContext context) {
        // Do stuff in batch...
        return RepeatStatus.CONTINUABLE;
    }

});

在前面的示例中,我们返回 RepeatStatus.CONTINUABLE,以表明还有更多工作要做。回调也可以返回 RepeatStatus.FINISHED,向调用者表明没有剩余工作。有些迭代可以根据回调中正在进行的工作本身的考量而终止。其他迭代(就回调而言)实际上是无限循环,完成决策委托给外部策略,如前面示例所示的情况。

RepeatContext

RepeatCallback 的方法参数是一个 RepeatContext。许多回调会忽略该上下文。但是,如有必要,您可以将其用作属性包,用于在迭代期间存储瞬时数据。在 iterate 方法返回后,上下文就不再存在了。

如果存在嵌套迭代,RepeatContext 会有一个父上下文。父上下文有时用于存储需要在 iterate 调用之间共享的数据。例如,如果您想计算迭代中某个事件发生的次数并在后续调用中记住它,就可以使用父上下文。

RepeatStatus

RepeatStatus 是 Spring Batch 使用的一个枚举,用于指示处理是否已完成。它有两个可能的 RepeatStatus 值:

表 1. RepeatStatus 属性

描述

CONTINUABLE

还有更多工作要做。

FINISHED

不应再进行重复。

您可以使用 RepeatStatus 中的 and() 方法将 RepeatStatus 值与逻辑 AND 运算结合起来。其效果是对可继续 (continuable) 标志进行逻辑 AND 运算。换句话说,如果任一状态是 FINISHED,则结果是 FINISHED

完成策略

RepeatTemplate 内部,iterate 方法中循环的终止由 CompletionPolicy 决定,CompletionPolicy 也是 RepeatContext 的工厂。RepeatTemplate 负责使用当前策略创建 RepeatContext,并在迭代的每个阶段将其传递给 RepeatCallback。在回调完成其 doInIteration 后,RepeatTemplate 必须调用 CompletionPolicy 以要求它更新其状态(该状态将存储在 RepeatContext 中)。然后,它会询问策略迭代是否已完成。

Spring Batch 提供了一些简单的通用 CompletionPolicy 实现。SimpleCompletionPolicy 允许执行固定次数(在任何时候 RepeatStatus.FINISHED 都会强制提前完成)。

用户可能需要实现自己的完成策略来处理更复杂的决策。例如,一个批处理窗口,一旦在线系统投入使用就阻止批处理作业执行,这就需要自定义策略。

异常处理

如果在 RepeatCallback 内部抛出异常,RepeatTemplate 会咨询 ExceptionHandler,后者可以决定是否重新抛出异常。

以下清单显示了 ExceptionHandler 接口定义:

public interface ExceptionHandler {

    void handleException(RepeatContext context, Throwable throwable)
        throws Throwable;

}

一个常见的用例是计算给定类型的异常数量,并在达到限制时失败。为此,Spring Batch 提供了 SimpleLimitExceptionHandler 和稍灵活一些的 RethrowOnThresholdExceptionHandlerSimpleLimitExceptionHandler 有一个 limit 属性和一个应与当前异常进行比较的异常类型。提供的类型的所有子类也会被计数。给定类型的异常在达到限制之前会被忽略,然后重新抛出。其他类型的异常始终会被重新抛出。

SimpleLimitExceptionHandler 的一个重要的可选属性是布尔标志 useParent。它默认为 false,因此限制仅在当前 RepeatContext 中计算。当设置为 true 时,限制会在嵌套迭代中的兄弟上下文之间保持(例如,一个 step 中的一组 chunk)。

监听器

通常,能够接收额外的回调来处理多个不同迭代中的横切关注点是非常有用的。为此,Spring Batch 提供了 RepeatListener 接口。RepeatTemplate 允许用户注册 RepeatListener 实现,并在迭代过程中,只要可用,就会向它们提供带有 RepeatContextRepeatStatus 的回调。

RepeatListener 接口定义如下:

public interface RepeatListener {
    void before(RepeatContext context);
    void after(RepeatContext context, RepeatStatus result);
    void open(RepeatContext context);
    void onError(RepeatContext context, Throwable e);
    void close(RepeatContext context);
}

openclose 回调分别发生在整个迭代之前和之后。beforeafteronError 适用于单独的 RepeatCallback 调用。

请注意,当存在多个监听器时,它们位于一个列表中,因此存在顺序。在这种情况下,openbefore 按照相同的顺序调用,而 afteronErrorclose 则按照相反的顺序调用。

并行处理

RepeatOperations 的实现不限于顺序执行回调。重要的是,一些实现能够并行执行其回调。为此,Spring Batch 提供了 TaskExecutorRepeatTemplate,它使用 Spring 的 TaskExecutor 策略来运行 RepeatCallback。默认是使用 SynchronousTaskExecutor,这会导致整个迭代在同一个线程中执行(与普通的 RepeatTemplate 相同)。

声明式迭代

有时,有些业务处理您知道每次发生时都希望重复执行。经典的例子是消息管道的优化。如果频繁到达一批消息,处理它们比为每条消息承担单独事务的成本更有效率。为此,Spring Batch 提供了一个 AOP 拦截器,它将方法调用包装在 RepeatOperations 对象中。RepeatOperationsInterceptor 执行被拦截的方法,并根据提供的 RepeatTemplate 中的 CompletionPolicy 进行重复。

  • Java

  • XML

以下示例使用 Java 配置重复调用名为 processMessage 的服务方法(有关如何配置 AOP 拦截器的更多详细信息,请参阅 Spring 用户指南):

@Bean
public MyService myService() {
	ProxyFactory factory = new ProxyFactory(RepeatOperations.class.getClassLoader());
	factory.setInterfaces(MyService.class);
	factory.setTarget(new MyService());

	MyService service = (MyService) factory.getProxy();
	JdkRegexpMethodPointcut pointcut = new JdkRegexpMethodPointcut();
	pointcut.setPatterns(".*processMessage.*");

	RepeatOperationsInterceptor interceptor = new RepeatOperationsInterceptor();

	((Advised) service).addAdvisor(new DefaultPointcutAdvisor(pointcut, interceptor));

	return service;
}

以下示例显示了使用 Spring AOP 命名空间的声明式迭代,用于重复调用名为 processMessage 的服务方法(有关如何配置 AOP 拦截器的更多详细信息,请参阅 Spring 用户指南):

<aop:config>
    <aop:pointcut id="transactional"
        expression="execution(* com..*Service.processMessage(..))" />
    <aop:advisor pointcut-ref="transactional"
        advice-ref="retryAdvice" order="-1"/>
</aop:config>

<bean id="retryAdvice" class="org.spr...RepeatOperationsInterceptor"/>

前面的示例在拦截器内部使用了默认的 RepeatTemplate。要更改策略、监听器和其他细节,您可以将一个 RepeatTemplate 实例注入到拦截器中。

如果被拦截的方法返回 void,则拦截器始终返回 RepeatStatus.CONTINUABLE(因此如果 CompletionPolicy 没有有限的终点,则存在无限循环的危险)。否则,它返回 RepeatStatus.CONTINUABLE,直到被拦截方法的返回值为 null。此时,它返回 RepeatStatus.FINISHED。因此,目标方法中的业务逻辑可以通过返回 null 或抛出提供的 RepeatTemplate 中的 ExceptionHandler 会重新抛出的异常来指示没有更多工作要做。