重复

RepeatTemplate

批处理涉及重复操作,无论是作为简单的优化还是作为作业的一部分。为了对重复进行策略化和泛化,并提供相当于迭代器框架的功能,Spring Batch 提供了 RepeatOperations 接口。RepeatOperations 接口具有以下定义

public interface RepeatOperations {

    RepeatStatus iterate(RepeatCallback callback) throws RepeatException;

}

回调是一个接口,如下定义所示,它允许您插入一些要重复的业务逻辑

public interface RepeatCallback {

    RepeatStatus doInIteration(RepeatContext context) throws Exception;

}

回调会重复执行,直到实现确定迭代应该结束。这些接口中的返回值是一个枚举值,可以是 RepeatStatus.CONTINUABLERepeatStatus.FINISHEDRepeatStatus 枚举向重复操作的调用方传达有关是否还有工作剩余的信息。一般来说,RepeatOperations 的实现应该检查 RepeatStatus 并将其用作结束迭代的决策的一部分。任何希望向调用方发出信号表示没有剩余工作的回调都可以返回 RepeatStatus.FINISHED

RepeatOperations 最简单的通用实现是 RepeatTemplate

RepeatTemplate template = new RepeatTemplate();

template.setCompletionPolicy(new SimpleCompletionPolicy(2));

template.iterate(new RepeatCallback() {

    public RepeatStatus doInIteration(RepeatContext context) {
        // Do stuff in batch...
        return RepeatStatus.CONTINUABLE;
    }

});

在前面的示例中,我们返回 RepeatStatus.CONTINUABLE,以表明还有更多工作要做。回调也可以返回 RepeatStatus.FINISHED,以向调用方发出信号表示没有剩余工作。某些迭代可以通过回调中正在执行的工作的内在考虑因素来终止。其他迭代实际上是无限循环(就回调而言),并且完成决策委托给外部策略,如前面的示例所示。

RepeatContext

RepeatCallback 的方法参数是 RepeatContext。许多回调都会忽略上下文。但是,如果需要,您可以将其用作属性包,以在迭代期间存储临时数据。iterate 方法返回后,上下文将不再存在。

如果正在进行嵌套迭代,则 RepeatContext 具有父上下文。父上下文偶尔用于存储需要在对 iterate 的调用之间共享的数据。例如,如果您想计算迭代中事件发生的次数并在后续调用中记住它,就会出现这种情况。

RepeatStatus

RepeatStatus 是 Spring Batch 使用的枚举,用于指示处理是否已完成。它有两个可能的 RepeatStatus

表 1. RepeatStatus 属性

描述

CONTINUABLE

还有更多工作要做。

FINISHED

不应再进行重复。

您可以使用 RepeatStatus 中的 and() 方法将 RepeatStatus 值与逻辑 AND 操作结合起来。这样做的效果是对可继续标志进行逻辑 AND 操作。换句话说,如果任一状态为 FINISHED,则结果为 FINISHED

完成策略

RepeatTemplate 内部,iterate 方法中循环的终止由 CompletionPolicy 确定,它也是 RepeatContext 的工厂。RepeatTemplate 负责使用当前策略创建 RepeatContext 并将其传递给每次迭代中的 RepeatCallback。在回调完成其 doInIteration 后,RepeatTemplate 必须调用 CompletionPolicy 以要求其更新其状态(这将存储在 RepeatContext 中)。然后,它询问策略迭代是否已完成。

Spring Batch 提供了一些简单的通用 CompletionPolicy 实现。SimpleCompletionPolicy 允许执行固定次数(使用 RepeatStatus.FINISHED 在任何时候强制提前完成)。

用户可能需要为更复杂的决策实现自己的完成策略。例如,防止批处理作业在联机系统正在使用时执行的批处理处理窗口将需要自定义策略。

异常处理

如果在 RepeatCallback 内部抛出异常,则 RepeatTemplate 会咨询 ExceptionHandler,它可以决定是否重新抛出异常。

以下清单显示了 ExceptionHandler 接口定义

public interface ExceptionHandler {

    void handleException(RepeatContext context, Throwable throwable)
        throws Throwable;

}

一个常见的用例是计算给定类型的异常次数,并在达到限制时失败。为此,Spring Batch 提供了 SimpleLimitExceptionHandler 和一个稍微灵活一些的 RethrowOnThresholdExceptionHandlerSimpleLimitExceptionHandler 具有一个限制属性和一个应与当前异常进行比较的异常类型。提供的类型的全部子类也会被计数。给定类型的异常会被忽略,直到达到限制,然后才会重新抛出。其他类型的异常始终会重新抛出。

SimpleLimitExceptionHandler 的一个重要的可选属性是名为 useParent 的布尔标志。默认情况下它为 false,因此限制仅在当前 RepeatContext 中计算。当设置为 true 时,限制会在嵌套迭代中的同级上下文中保留(例如,步骤内的一组块)。

监听器

通常,能够为跨多个不同迭代的横切关注点接收额外的回调非常有用。为此,Spring Batch 提供了RepeatListener接口。RepeatTemplate允许用户注册RepeatListener实现,并在迭代期间提供RepeatContextRepeatStatus的回调。

RepeatListener接口具有以下定义

public interface RepeatListener {
    void before(RepeatContext context);
    void after(RepeatContext context, RepeatStatus result);
    void open(RepeatContext context);
    void onError(RepeatContext context, Throwable e);
    void close(RepeatContext context);
}

openclose回调分别在整个迭代之前和之后调用。beforeafteronError适用于各个RepeatCallback调用。

请注意,当有多个监听器时,它们位于列表中,因此存在顺序。在这种情况下,openbefore按相同顺序调用,而afteronErrorclose则按相反顺序调用。

并行处理

RepeatOperations的实现不受限于按顺序执行回调。重要的是,某些实现能够并行执行其回调。为此,Spring Batch 提供了TaskExecutorRepeatTemplate,它使用 Spring 的TaskExecutor策略来运行RepeatCallback。默认情况下使用SynchronousTaskExecutor,其效果是在同一线程中执行整个迭代(与普通的RepeatTemplate相同)。

声明式迭代

有时,您知道某些业务处理每次发生时都需要重复。这方面的经典示例是消息管道的优化。如果一批消息频繁到达,则处理它们比为每条消息承担单独事务的成本更有效。Spring Batch 提供了一个AOP拦截器,用于将方法调用包装在RepeatOperations对象中以实现此目的。RepeatOperationsInterceptor执行被拦截的方法并根据提供的RepeatTemplate中的CompletionPolicy进行重复。

  • Java

  • XML

以下示例使用 Java 配置来重复对名为processMessage的方法的服务调用(有关如何配置 AOP 拦截器的更多详细信息,请参阅<<docs.spring.io/spring-framework/docs/current/reference/html/core.html#aop,Spring用户指南>>)。

@Bean
public MyService myService() {
	ProxyFactory factory = new ProxyFactory(RepeatOperations.class.getClassLoader());
	factory.setInterfaces(MyService.class);
	factory.setTarget(new MyService());

	MyService service = (MyService) factory.getProxy();
	JdkRegexpMethodPointcut pointcut = new JdkRegexpMethodPointcut();
	pointcut.setPatterns(".*processMessage.*");

	RepeatOperationsInterceptor interceptor = new RepeatOperationsInterceptor();

	((Advised) service).addAdvisor(new DefaultPointcutAdvisor(pointcut, interceptor));

	return service;
}

以下示例显示了使用 Spring AOP 命名空间来重复对名为processMessage的方法的服务调用的声明式迭代(有关如何配置 AOP 拦截器的更多详细信息,请参阅<<docs.spring.io/spring-framework/docs/current/reference/html/core.html#aop,Spring用户指南>>)。

<aop:config>
    <aop:pointcut id="transactional"
        expression="execution(* com..*Service.processMessage(..))" />
    <aop:advisor pointcut-ref="transactional"
        advice-ref="retryAdvice" order="-1"/>
</aop:config>

<bean id="retryAdvice" class="org.spr...RepeatOperationsInterceptor"/>

前面的示例在拦截器内部使用了默认的RepeatTemplate。要更改策略、监听器和其他详细信息,您可以将RepeatTemplate的实例注入拦截器。

如果被拦截的方法返回void,则拦截器始终返回RepeatStatus.CONTINUABLE(因此,如果CompletionPolicy没有有限的终点,则存在无限循环的风险)。否则,它会返回RepeatStatus.CONTINUABLE,直到被拦截方法的返回值为null。此时,它将返回RepeatStatus.FINISHED。因此,目标方法内部的业务逻辑可以通过返回null或抛出由提供的RepeatTemplate中的ExceptionHandler重新抛出的异常来发出不再有更多工作要做的信号。