提供的 Advice 类

除了提供应用 AOP advice 类的通用机制外,Spring Integration 还提供了以下开箱即用的 advice 实现:

重试 Advice

重试 advice (`o.s.i.handler.advice.RequestHandlerRetryAdvice`) 利用了 Spring Retry 项目提供的丰富重试机制。`spring-retry` 的核心组件是 `RetryTemplate`,它允许配置复杂的重试场景,包括 `RetryPolicy` 和 `BackoffPolicy` 策略(有多种实现)以及 `RecoveryCallback` 策略,用于确定重试次数耗尽时要采取的操作。

无状态重试

无状态重试是指重试活动完全在 advice 中处理的情况。线程暂停(如果配置了)并重试该操作。

有状态重试

有状态重试是指重试状态在 advice 中管理,但抛出异常后由调用者重新提交请求的情况。有状态重试的一个例子是当我们需要消息发起方(例如 JMS)负责重新提交,而不是在当前线程上执行时。有状态重试需要某种机制来检测重复提交。

有关 `spring-retry` 的更多信息,请参阅该项目的 Javadoc 以及 Spring Batch 的参考文档,`spring-retry` 最初源于此。

默认的回退行为是不进行回退。重试会立即尝试。使用导致线程在尝试之间暂停的回退策略可能会导致性能问题,包括过多的内存使用和线程饥饿。在高并发环境中,应谨慎使用回退策略。

配置重试 Advice

本节的示例使用以下始终抛出异常的 ``

public class FailingService {

    public void service(String message) {
        throw new RuntimeException("error");
    }
}
简单无状态重试

默认的 `RetryTemplate` 具有 `SimpleRetryPolicy`,它会尝试三次。没有 `BackOffPolicy`,因此三次尝试是连续进行的,中间没有延迟。没有 `RecoveryCallback`,因此最终失败重试后会将异常抛给调用者。在 Spring Integration 环境中,可以使用入站端点上的 `error-channel` 来处理这个最终异常。以下示例使用了 `RetryTemplate` 并显示了其 `DEBUG` 输出

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice"/>
    </int:request-handler-advice-chain>
</int:service-activator>

DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
DEBUG [task-scheduler-2]Retry: count=0
DEBUG [task-scheduler-2]Checking for rethrow: count=1
DEBUG [task-scheduler-2]Retry: count=1
DEBUG [task-scheduler-2]Checking for rethrow: count=2
DEBUG [task-scheduler-2]Retry: count=2
DEBUG [task-scheduler-2]Checking for rethrow: count=3
DEBUG [task-scheduler-2]Retry failed last attempt: count=3
带 Recovery 的简单无状态重试

以下示例在前一个示例中添加了 `RecoveryCallback`,并使用 `ErrorMessageSendingRecoverer` 将 `ErrorMessage` 发送到通道

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice">
            <property name="recoveryCallback">
                <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer">
                    <constructor-arg ref="myErrorChannel" />
                </bean>
            </property>
        </bean>
    </int:request-handler-advice-chain>
</int:service-activator>

DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
DEBUG [task-scheduler-2]Retry: count=0
DEBUG [task-scheduler-2]Checking for rethrow: count=1
DEBUG [task-scheduler-2]Retry: count=1
DEBUG [task-scheduler-2]Checking for rethrow: count=2
DEBUG [task-scheduler-2]Retry: count=2
DEBUG [task-scheduler-2]Checking for rethrow: count=3
DEBUG [task-scheduler-2]Retry failed last attempt: count=3
DEBUG [task-scheduler-2]Sending ErrorMessage :failedMessage:[Payload=...]
带自定义策略和 Recovery 的无状态重试

为了更精细的控制,我们可以为 advice 提供一个定制的 `RetryTemplate`。本示例继续使用 `SimpleRetryPolicy`,但将尝试次数增加到四次。它还添加了 `ExponentialBackoffPolicy`,其中第一次重试等待一秒,第二次等待五秒,第三次等待 25 秒(总共四次尝试)。以下列表展示了示例及其 `DEBUG` 输出

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice">
            <property name="recoveryCallback">
                <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer">
                    <constructor-arg ref="myErrorChannel" />
                </bean>
            </property>
            <property name="retryTemplate" ref="retryTemplate" />
        </bean>
    </int:request-handler-advice-chain>
</int:service-activator>

<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
    <property name="retryPolicy">
        <bean class="org.springframework.retry.policy.SimpleRetryPolicy">
            <property name="maxAttempts" value="4" />
        </bean>
    </property>
    <property name="backOffPolicy">
        <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
            <property name="initialInterval" value="1000" />
            <property name="multiplier" value="5.0" />
            <property name="maxInterval" value="60000" />
        </bean>
    </property>
</bean>

27.058 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=...]
27.071 DEBUG [task-scheduler-1]Retry: count=0
27.080 DEBUG [task-scheduler-1]Sleeping for 1000
28.081 DEBUG [task-scheduler-1]Checking for rethrow: count=1
28.081 DEBUG [task-scheduler-1]Retry: count=1
28.081 DEBUG [task-scheduler-1]Sleeping for 5000
33.082 DEBUG [task-scheduler-1]Checking for rethrow: count=2
33.082 DEBUG [task-scheduler-1]Retry: count=2
33.083 DEBUG [task-scheduler-1]Sleeping for 25000
58.083 DEBUG [task-scheduler-1]Checking for rethrow: count=3
58.083 DEBUG [task-scheduler-1]Retry: count=3
58.084 DEBUG [task-scheduler-1]Checking for rethrow: count=4
58.084 DEBUG [task-scheduler-1]Retry failed last attempt: count=4
58.086 DEBUG [task-scheduler-1]Sending ErrorMessage :failedMessage:[Payload=...]
无状态重试的命名空间支持

从 4.0 版本开始,得益于对重试 advice 的命名空间支持,前面的配置可以大大简化,示例如下

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <ref bean="retrier" />
    </int:request-handler-advice-chain>
</int:service-activator>

<int:handler-retry-advice id="retrier" max-attempts="4" recovery-channel="myErrorChannel">
    <int:exponential-back-off initial="1000" multiplier="5.0" maximum="60000" />
</int:handler-retry-advice>

在前面的示例中,advice 被定义为一个顶级 bean,以便可以在多个 `request-handler-advice-chain` 实例中使用。您也可以直接在链中定义 advice,示例如下

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <int:retry-advice id="retrier" max-attempts="4" recovery-channel="myErrorChannel">
            <int:exponential-back-off initial="1000" multiplier="5.0" maximum="60000" />
        </int:retry-advice>
    </int:request-handler-advice-chain>
</int:service-activator>

一个 `` 可以有一个 `` 或 `` 子元素,也可以没有子元素。没有子元素的 `` 不使用回退。如果没有 `recovery-channel`,则在重试次数耗尽时抛出异常。命名空间只能用于无状态重试。

对于更复杂的环境(自定义策略等),请使用普通的 `` 定义。

带 Recovery 的简单有状态重试

为了使重试有状态,我们需要为 advice 提供一个 `RetryStateGenerator` 实现。此类用于识别消息是否是重新提交的,以便 `RetryTemplate` 可以确定该消息当前的重试状态。框架提供了 `SpelExpressionRetryStateGenerator`,它使用 SpEL 表达式来确定消息标识符。本示例再次使用默认策略(三次尝试,无回退)。与无状态重试一样,这些策略也可以自定义。以下列表展示了示例及其 `DEBUG` 输出

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice">
            <property name="retryStateGenerator">
                <bean class="o.s.i.handler.advice.SpelExpressionRetryStateGenerator">
                    <constructor-arg value="headers['jms_messageId']" />
                </bean>
            </property>
            <property name="recoveryCallback">
                <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer">
                    <constructor-arg ref="myErrorChannel" />
                </bean>
            </property>
        </bean>
    </int:request-handler-advice-chain>
</int:service-activator>

24.351 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...]
24.368 DEBUG [Container#0-1]Retry: count=0
24.387 DEBUG [Container#0-1]Checking for rethrow: count=1
24.387 DEBUG [Container#0-1]Rethrow in retry for policy: count=1
24.387 WARN  [Container#0-1]failure occurred in gateway sendAndReceive
org.springframework.integration.MessagingException: Failed to invoke handler
...
Caused by: java.lang.RuntimeException: foo
...
24.391 DEBUG [Container#0-1]Initiating transaction rollback on application exception
...
25.412 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...]
25.412 DEBUG [Container#0-1]Retry: count=1
25.413 DEBUG [Container#0-1]Checking for rethrow: count=2
25.413 DEBUG [Container#0-1]Rethrow in retry for policy: count=2
25.413 WARN  [Container#0-1]failure occurred in gateway sendAndReceive
org.springframework.integration.MessagingException: Failed to invoke handler
...
Caused by: java.lang.RuntimeException: foo
...
25.414 DEBUG [Container#0-1]Initiating transaction rollback on application exception
...
26.418 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...]
26.418 DEBUG [Container#0-1]Retry: count=2
26.419 DEBUG [Container#0-1]Checking for rethrow: count=3
26.419 DEBUG [Container#0-1]Rethrow in retry for policy: count=3
26.419 WARN  [Container#0-1]failure occurred in gateway sendAndReceive
org.springframework.integration.MessagingException: Failed to invoke handler
...
Caused by: java.lang.RuntimeException: foo
...
26.420 DEBUG [Container#0-1]Initiating transaction rollback on application exception
...
27.425 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...]
27.426 DEBUG [Container#0-1]Retry failed last attempt: count=3
27.426 DEBUG [Container#0-1]Sending ErrorMessage :failedMessage:[Payload=...]

如果您将前面的示例与无状态示例进行比较,您会发现,在有状态重试中,每次失败时都会将异常抛给调用者。

重试的异常分类

Spring Retry 在确定哪些异常可以触发重试方面具有很大的灵活性。默认配置会对所有异常进行重试,并且异常分类器会查看顶级异常。如果您将其配置为仅在 `MyException` 上重试,并且您的应用程序抛出了一个 `SomeOtherException`,而其原因是 `MyException`,则不会发生重试。

从 Spring Retry 1.0.3 开始,`BinaryExceptionClassifier` 有一个名为 `traverseCauses` 的属性(默认为 `false`)。当设置为 `true` 时,它会遍历异常原因,直到找到匹配项或遍历完所有原因。

要将此分类器用于重试,请使用接受最大尝试次数、`Exception` 对象 `Map` 和 `traverseCauses` 布尔值的构造函数创建 `SimpleRetryPolicy`。然后您可以将此策略注入到 `RetryTemplate` 中。

在这种情况下需要 `traverseCauses`,因为用户异常可能被包装在 `MessagingException` 中。

熔断器 Advice

熔断器模式的基本思想是,如果服务当前不可用,就不要浪费时间(和资源)去尝试使用它。`o.s.i.handler.advice.RequestHandlerCircuitBreakerAdvice` 实现了此模式。当熔断器处于关闭状态时,端点尝试调用服务。如果连续失败达到一定次数,熔断器将进入打开状态。当处于打开状态时,新请求会“快速失败”,在一段时间过去之前不会尝试调用服务。

当该时间过去后,熔断器被设置为半开状态。在此状态下,即使只有一次尝试失败,熔断器也会立即进入打开状态。如果尝试成功,熔断器将进入关闭状态,在这种情况下,直到再次发生配置的连续失败次数之前,它不会再次进入打开状态。任何成功的尝试都会将失败状态重置为零,以确定何时熔断器可能再次进入打开状态。

通常,此 advice 可能用于外部服务,这些服务可能需要一些时间才会失败(例如尝试建立网络连接时的超时)。

`RequestHandlerCircuitBreakerAdvice` 有两个属性:`threshold` 和 `halfOpenAfter`。`threshold` 属性表示在熔断器打开之前需要发生的连续失败次数,默认为 `5`。`halfOpenAfter` 属性表示在最后一次失败后,熔断器在尝试下一个请求之前等待的时间,默认为 1000 毫秒。

以下示例配置了一个熔断器并显示其 `DEBUG` 和 `ERROR` 输出

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <bean class="o.s.i.handler.advice.RequestHandlerCircuitBreakerAdvice">
            <property name="threshold" value="2" />
            <property name="halfOpenAfter" value="12000" />
        </bean>
    </int:request-handler-advice-chain>
</int:service-activator>

05.617 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=...]
05.638 ERROR [task-scheduler-1]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
10.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
10.600 ERROR [task-scheduler-2]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
15.598 DEBUG [task-scheduler-3]preSend on channel 'input', message: [Payload=...]
15.599 ERROR [task-scheduler-3]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
20.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
20.598 ERROR [task-scheduler-2]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
25.598 DEBUG [task-scheduler-5]preSend on channel 'input', message: [Payload=...]
25.601 ERROR [task-scheduler-5]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
30.598 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=foo...]
30.599 ERROR [task-scheduler-1]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator

在前面的示例中,`threshold` 设置为 `2`,`halfOpenAfter` 设置为 `12` 秒。每隔 5 秒到达一个新请求。前两次尝试调用了服务。第三次和第四次失败并抛出异常,表明熔断器已打开。第五个请求被尝试是因为该请求发生在最后一次失败后 15 秒。第六次尝试立即失败,因为熔断器立即进入了打开状态。

表达式求值 Advice

最后一个提供的 advice 类是 `o.s.i.handler.advice.ExpressionEvaluatingRequestHandlerAdvice`。这个 advice 比其他两个更通用。它提供了一种机制,可以在发送到端点的原始入站消息上评估表达式。在成功或失败后,都可以评估单独的表达式。可选地,可以将包含评估结果以及输入消息的消息发送到消息通道。

此 advice 的典型用例可能是与 `` 一起使用,例如在传输成功时将文件移动到某个目录,或者在失败时移动到另一个目录

该 advice 具有属性,可以在成功时设置表达式,在失败时设置表达式,并为每个情况设置相应的通道。对于成功的情况,发送到 `successChannel` 的消息是 `AdviceMessage`,其有效负载是表达式评估的结果。一个名为 `inputMessage` 的附加属性包含发送到处理器的原始消息。发送到 `failureChannel` 的消息(当处理器抛出异常时)是 `ErrorMessage`,其有效负载为 `MessageHandlingExpressionEvaluatingAdviceException`。与所有 `MessagingException` 实例一样,此有效负载具有 `failedMessage` 和 `cause` 属性,以及一个名为 `evaluationResult` 的附加属性,其中包含表达式评估的结果。

从版本 5.1.3 开始,如果配置了通道但未提供表达式,则使用默认表达式对消息的 `payload` 进行求值。

默认情况下,当在 advice 范围内抛出异常时,在评估任何 `failureExpression` 后,该异常会抛给调用者。如果您希望抑制抛出异常,请将 `trapException` 属性设置为 `true`。以下 advice 显示了如何使用 Java DSL 配置 `advice`

@SpringBootApplication
public class EerhaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(EerhaApplication.class, args);
        MessageChannel in = context.getBean("advised.input", MessageChannel.class);
        in.send(new GenericMessage<>("good"));
        in.send(new GenericMessage<>("bad"));
        context.close();
    }

    @Bean
    public IntegrationFlow advised() {
        return f -> f.<String>handle((payload, headers) -> {
            if (payload.equals("good")) {
                return null;
            }
            else {
                throw new RuntimeException("some failure");
            }
        }, c -> c.advice(expressionAdvice()));
    }

    @Bean
    public Advice expressionAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setSuccessChannelName("success.input");
        advice.setOnSuccessExpressionString("payload + ' was successful'");
        advice.setFailureChannelName("failure.input");
        advice.setOnFailureExpressionString(
                "payload + ' was bad, with reason: ' + #exception.cause.message");
        advice.setTrapException(true);
        return advice;
    }

    @Bean
    public IntegrationFlow success() {
        return f -> f.handle(System.out::println);
    }

    @Bean
    public IntegrationFlow failure() {
        return f -> f.handle(System.out::println);
    }

}

限速器 Advice

限速器 advice (`RateLimiterRequestHandlerAdvice`) 可以确保端点不会因请求过多而过载。当超过限速时,请求将进入阻塞状态。

此 advice 的典型用例可能是外部服务提供商不允许每分钟超过 `n` 个请求的情况。

`RateLimiterRequestHandlerAdvice` 的实现完全基于 Resilience4j 项目,并且需要注入 `RateLimiter` 或 `RateLimiterConfig`。也可以配置默认值和/或自定义名称。

以下示例配置了一个限速器 advice,限制为每 1 秒一个请求

@Bean
public RateLimiterRequestHandlerAdvice rateLimiterRequestHandlerAdvice() {
    return new RateLimiterRequestHandlerAdvice(RateLimiterConfig.custom()
            .limitRefreshPeriod(Duration.ofSeconds(1))
            .limitForPeriod(1)
            .build());
}

@ServiceActivator(inputChannel = "requestChannel", outputChannel = "resultChannel",
		adviceChain = "rateLimiterRequestHandlerAdvice")
public String handleRequest(String payload) {
    ...
}

缓存 Advice

从 5.2 版本开始,引入了 `CacheRequestHandlerAdvice`。它基于 Spring Framework 中的缓存抽象,并与 `@Caching` 注解家族提供的概念和功能保持一致。其内部逻辑基于 `CacheAspectSupport` 扩展,在 `AbstractReplyProducingMessageHandler.RequestHandler.handleRequestMessage` 方法周围进行缓存操作的代理,并将请求 `Message` 作为参数。这个 advice 可以配置一个 SpEL 表达式或一个 `Function` 来计算缓存键。请求 `Message` 可以作为 SpEL 求值上下文的根对象,或作为 `Function` 的输入参数。默认情况下,使用请求消息的 `payload` 作为缓存键。当默认缓存操作为 `CacheableOperation` 时,`CacheRequestHandlerAdvice` 必须配置 `cacheNames`,或者配置任意一组 `CacheOperation`s。每个 `CacheOperation` 都可以单独配置,或者共享选项,例如 `CacheManager`、`CacheResolver` 和 `CacheErrorHandler`,这些可以从 `CacheRequestHandlerAdvice` 配置中重用。此配置功能类似于 Spring Framework 的 `@CacheConfig` 和 `@Caching` 注解组合。如果未提供 `CacheManager`,默认情况下会从 `CacheAspectSupport` 中的 `BeanFactory` 解析单个 bean。

以下示例配置了两个使用不同缓存操作集的 advice

@Bean
public CacheRequestHandlerAdvice cacheAdvice() {
    CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice(TEST_CACHE);
    cacheRequestHandlerAdvice.setKeyExpressionString("payload");
    return cacheRequestHandlerAdvice;
}

@Transformer(inputChannel = "transformerChannel", outputChannel = "nullChannel", adviceChain = "cacheAdvice")
public Object transform(Message<?> message) {
    ...
}

@Bean
public CacheRequestHandlerAdvice cachePutAndEvictAdvice() {
    CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice();
    cacheRequestHandlerAdvice.setKeyExpressionString("payload");
    CachePutOperation.Builder cachePutBuilder = new CachePutOperation.Builder();
    cachePutBuilder.setCacheName(TEST_PUT_CACHE);
    CacheEvictOperation.Builder cacheEvictBuilder = new CacheEvictOperation.Builder();
    cacheEvictBuilder.setCacheName(TEST_CACHE);
    cacheRequestHandlerAdvice.setCacheOperations(cachePutBuilder.build(), cacheEvictBuilder.build());
    return cacheRequestHandlerAdvice;
}

@ServiceActivator(inputChannel = "serviceChannel", outputChannel = "nullChannel",
    adviceChain = "cachePutAndEvictAdvice")
public Message<?> service(Message<?> message) {
    ...
}