提供的 Advice 类
除了提供应用 AOP 建议类的通用机制外,Spring Integration 还提供了以下开箱即用的建议实现:
-
RequestHandlerRetryAdvice(在重试建议中描述) -
RequestHandlerCircuitBreakerAdvice(在断路器建议中描述) -
ExpressionEvaluatingRequestHandlerAdvice(在表达式建议中描述) -
RateLimiterRequestHandlerAdvice(在速率限制器建议中描述) -
CacheRequestHandlerAdvice(在缓存建议中描述) -
ReactiveRequestHandlerAdvice(在响应式建议中描述) -
ContextHolderRequestHandlerAdvice(在上下文持有者建议中描述) -
LockRequestHandlerAdvice(在锁定建议中描述)
重试建议
重试建议(o.s.i.handler.advice.RequestHandlerRetryAdvice)利用了 Spring Framework 中重试支持提供的丰富重试机制。此建议的核心组件是 RetryTemplate,它允许配置复杂的重试场景,包括 RetryPolicy 以及 RecoveryCallback 策略,以确定重试耗尽时要采取的操作。
- 无状态重试
-
无状态重试是指重试活动完全在建议中处理的情况。线程会暂停(如果配置如此),然后重试该操作。
- 有状态重试
-
有状态重试是指重试状态在建议中管理,但会抛出异常并且调用者重新提交请求的情况。有状态重试的一个例子是,我们希望消息发起者(例如 JMS)负责重新提交,而不是在当前线程上执行。有状态重试需要某种机制来检测重试的提交。为此,
RequestHandlerRetryAdvice公开了stateKeyFunction、newMessagePredicate和stateCacheSize属性。其中后两个属性仅在提供了第一个属性时才有意义。本质上,stateKeyFunction是将RequestHandlerRetryAdvice逻辑从无状态切换到有状态的指示器。newMessagePredicate的含义是根据要处理的消息刷新键的现有重试状态。stateCacheSize默认为100,当有更多新的重试状态到来时,较旧的条目会从缓存中删除。也许这些旧消息不再从上游流重新传递,例如,消息代理根据其重新传递策略将这些消息死信。
| 默认的退避行为是不退避。重试会立即尝试。使用导致线程在尝试之间暂停的退避策略可能会导致性能问题,包括过多的内存使用和线程饥饿。在高并发环境中,应谨慎使用退避策略。 |
配置重试建议
本节中的示例使用以下始终抛出异常的 @ServiceActivator
public class FailingService {
@ServiceActivator(inputChannel = "input", adviceChain = "retryAdvice")
public void service(String message) {
throw new RuntimeException("error");
}
}
- 简单的无状态重试
-
默认的
RetryPolicy是尝试三次,加上对目标MessageHandler的原始调用。默认情况下没有退避,因此三次尝试会背靠背地进行,尝试之间没有延迟。没有RecoveryCallback,因此在最终重试失败后,结果是将异常抛给调用者。在 Spring Integration 环境中,此最终异常可以通过在入站端点上使用error-channel来处理。以下示例使用RequestHandlerRetryAdvice的默认配置@Bean RequestHandlerRetryAdvice retryAdvice() { return new RequestHandlerRetryAdvice(); } - 带恢复功能的简单无状态重试
-
以下示例在前一个示例中添加了一个
RecoveryCallback,并使用ErrorMessageSendingRecoverer将ErrorMessage发送到一个通道@Bean RequestHandlerRetryAdvice retryAdvice(MessageChannel recoveryChannel) { RequestHandlerRetryAdvice requestHandlerRetryAdvice = new RequestHandlerRetryAdvice(); requestHandlerRetryAdvice.setRecoveryCallback(new ErrorMessageSendingRecoverer(recoveryChannel)); return requestHandlerRetryAdvice; } - 具有自定义策略和恢复功能的无状态重试
-
为了更复杂,可以为
RequestHandlerRetryAdvice提供自定义的RetryPolicy。此示例继续使用简单的RetryPolicy,但将尝试次数增加到四次。它还添加了一个ExponentialBackoff,其中第一次重试等待一秒,第二次等待五秒,第三次等待 25 秒(总共四次尝试)。以下清单显示了此类配置的示例@Bean RequestHandlerRetryAdvice retryAdvice() { RequestHandlerRetryAdvice requestHandlerRetryAdvice = new RequestHandlerRetryAdvice(); requestHandlerRetryAdvice.setRecoveryCallback(new ErrorMessageSendingRecoverer(recoveryChannel())); RetryPolicy retryPolicy = RetryPolicy.builder() .maxRetries(4) .delay(Duration.ofSeconds(1)) .multiplier(5.0) .maxDelay(Duration.ofMinutes(1)) .build(); requestHandlerRetryAdvice.setRetryPolicy(retryPolicy); return requestHandlerRetryAdvice; } - 无状态重试的命名空间支持
-
以下示例演示了如何使用 Spring Integration XML 命名空间及其自定义标签配置
RequestHandlerRetryAdvice<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <ref bean="retrier" /> </int:request-handler-advice-chain> </int:service-activator> <int:handler-retry-advice id="retrier" max-retries="4" recovery-channel="myErrorChannel"> <int:exponential-back-off initial="1000" multiplier="5.0" maximum="60000" /> </int:handler-retry-advice>在前面的示例中,建议被定义为顶级 bean,以便可以在多个
request-handler-advice-chain实例中使用。您也可以直接在链中定义建议,如下例所示<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <int:retry-advice id="retrier" max-retries="4" recovery-channel="myErrorChannel"> <int:exponential-back-off initial="1000" multiplier="5.0" maximum="60000" /> </int:retry-advice> </int:request-handler-advice-chain> </int:service-activator><handler-retry-advice>可以有一个<fixed-back-off>或<exponential-back-off>子元素,也可以没有子元素。没有子元素的<handler-retry-advice>不使用退避。如果没有recovery-channel,则在重试耗尽时抛出异常。命名空间只能用于无状态重试。对于更复杂的环境(自定义策略等),请使用正常的
<bean>定义。 - 带恢复功能的简单有状态重试
-
要使重试有状态,必须为
RequestHandlerRetryAdvice实例提供Function<Message<?>, Object> stateKeyFunction。此函数用于将消息识别为重新提交,以便RequestHandlerRetryAdvice可以确定此消息的当前重试状态。有状态重试背后的想法是不阻塞当前线程,而是缓存此消息的重试状态并将MessageHandler失败重新抛回给调用者。通常,这适用于能够重新提交(或重新传递)事件的消息发起者,例如,使用nack的 RabbitMQ 或具有 seek 功能的 Apache Kafka 等消息代理;或者在消费回滚后的 JMS。如果还没有缓存状态(或者Predicate<Message<?>> newMessagePredicate为该消息返回true),则MessageHandler调用被视为第一次调用,在其失败时,基于BackOffExecution的内部RetryState将缓存到上述键下。在下一条消息到达时,缓存状态为Thread.sleep()提供一个退避间隔,然后尝试调用MessageHandler。如果此退避间隔等于BackOffExecution.STOP(例如,已达到maxAttempts),则表示此消息不再重试:整个重试周期被视为已耗尽,并将相应的RetryException抛回给调用者,或者在提供了RecoveryCallback时用于调用RecoveryCallback。通常,异常处理逻辑和退避执行与无状态行为类似,唯一的区别是线程不会在所有maxAttempts期间阻塞。由消息发起者来重新传递消息以进行下一次重试调用。
断路器建议
断路器模式的一般思想是,如果服务当前不可用,则不要浪费时间(和资源)尝试使用它。o.s.i.handler.advice.RequestHandlerCircuitBreakerAdvice 实现了此模式。当断路器处于关闭状态时,端点尝试调用服务。如果一定数量的连续尝试失败,断路器将进入打开状态。当处于打开状态时,新请求“快速失败”,并且在一段时间过期之前不会尝试调用服务。
当时间到期时,断路器被设置为半开状态。在此状态下,即使只有一次尝试失败,断路器也会立即进入打开状态。如果尝试成功,断路器将进入关闭状态,在这种情况下,它不会再次进入打开状态,直到再次发生配置的连续失败次数。任何成功的尝试都会将状态重置为零失败,以确定断路器何时可能再次进入打开状态。
通常,此建议可能用于外部服务,其中可能需要一些时间才能失败,例如尝试建立网络连接时超时。
RequestHandlerCircuitBreakerAdvice 有两个属性:threshold 和 halfOpenAfter。threshold 属性表示断路器进入打开状态所需的连续失败次数。它默认为 5。halfOpenAfter 属性表示在上次失败后,断路器等待多长时间才尝试另一个请求。默认值为 1000 毫秒。
以下示例配置了一个断路器并显示了其 DEBUG 和 ERROR 输出
<int:service-activator input-channel="input" ref="failer" method="service">
<int:request-handler-advice-chain>
<bean class="o.s.i.handler.advice.RequestHandlerCircuitBreakerAdvice">
<property name="threshold" value="2" />
<property name="halfOpenAfter" value="12000" />
</bean>
</int:request-handler-advice-chain>
</int:service-activator>
05.617 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=...]
05.638 ERROR [task-scheduler-1]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
10.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
10.600 ERROR [task-scheduler-2]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
15.598 DEBUG [task-scheduler-3]preSend on channel 'input', message: [Payload=...]
15.599 ERROR [task-scheduler-3]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
20.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
20.598 ERROR [task-scheduler-2]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
25.598 DEBUG [task-scheduler-5]preSend on channel 'input', message: [Payload=...]
25.601 ERROR [task-scheduler-5]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
30.598 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=foo...]
30.599 ERROR [task-scheduler-1]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
在前面的示例中,阈值设置为 2,halfOpenAfter 设置为 12 秒。每 5 秒到达一个新请求。前两次尝试调用了服务。第三次和第四次尝试失败,并抛出异常,表明断路器已打开。第五个请求被尝试,因为该请求是在上次失败后 15 秒进行的。第六次尝试立即失败,因为断路器立即进入打开状态。
表达式求值建议
最后一个提供的建议类是 o.s.i.handler.advice.ExpressionEvaluatingRequestHandlerAdvice。此建议比其他两个建议更通用。它提供了一种机制,用于评估发送到端点的原始入站消息上的表达式。成功或失败后,可以使用单独的表达式进行评估。可选地,包含评估结果和输入消息的消息可以发送到消息通道。
此建议的典型用例可能是与 <ftp:outbound-channel-adapter/> 结合使用,也许在传输成功时将文件移动到某个目录,或者在失败时移动到另一个目录
该建议具有在成功时设置表达式、在失败时设置表达式以及各自通道的属性。对于成功的情况,发送到 successChannel 的消息是 AdviceMessage,其有效负载是表达式求值的结果。一个附加属性,称为 inputMessage,包含发送给处理程序的原始消息。发送到 failureChannel 的消息(当处理程序抛出异常时)是 ErrorMessage,其有效负载为 MessageHandlingExpressionEvaluatingAdviceException。与所有 MessagingException 实例一样,此有效负载具有 failedMessage 和 cause 属性,以及一个名为 evaluationResult 的附加属性,其中包含表达式求值的结果。
从版本 5.1.3 开始,如果配置了通道但未提供表达式,则使用默认表达式求值为消息的 payload。 |
默认情况下,当在建议范围内抛出异常时,在评估任何 failureExpression 之后,该异常会抛给调用者。如果您希望抑制抛出异常,请将 trapException 属性设置为 true。以下建议演示了如何使用 Java DSL 配置 advice
@SpringBootApplication
public class EerhaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(EerhaApplication.class, args);
MessageChannel in = context.getBean("advised.input", MessageChannel.class);
in.send(new GenericMessage<>("good"));
in.send(new GenericMessage<>("bad"));
context.close();
}
@Bean
public IntegrationFlow advised() {
return f -> f.<String>handle((payload, headers) -> {
if (payload.equals("good")) {
return null;
}
else {
throw new RuntimeException("some failure");
}
}, c -> c.advice(expressionAdvice()));
}
@Bean
public Advice expressionAdvice() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setSuccessChannelName("success.input");
advice.setOnSuccessExpressionString("payload + ' was successful'");
advice.setFailureChannelName("failure.input");
advice.setOnFailureExpressionString(
"payload + ' was bad, with reason: ' + #exception.cause.message");
advice.setTrapException(true);
return advice;
}
@Bean
public IntegrationFlow success() {
return f -> f.handle(System.out::println);
}
@Bean
public IntegrationFlow failure() {
return f -> f.handle(System.out::println);
}
}
速率限制器建议
速率限制器建议(RateLimiterRequestHandlerAdvice)可确保端点不会因请求而过载。当超出速率限制时,请求将进入阻塞状态。
此建议的典型用例可能是外部服务提供商不允许每分钟的请求数超过 n。
RateLimiterRequestHandlerAdvice 实现完全基于 Resilience4j 项目,并且需要注入 RateLimiter 或 RateLimiterConfig。也可以使用默认值和/或自定义名称进行配置。
以下示例配置了一个速率限制器建议,每 1 秒请求一次
@Bean
public RateLimiterRequestHandlerAdvice rateLimiterRequestHandlerAdvice() {
return new RateLimiterRequestHandlerAdvice(RateLimiterConfig.custom()
.limitRefreshPeriod(Duration.ofSeconds(1))
.limitForPeriod(1)
.build());
}
@ServiceActivator(inputChannel = "requestChannel", outputChannel = "resultChannel",
adviceChain = "rateLimiterRequestHandlerAdvice")
public String handleRequest(String payload) {
...
}
缓存建议
从版本 5.2 开始,引入了 CacheRequestHandlerAdvice。它基于 Spring Framework 中的缓存抽象,并与 @Caching 注解族提供的概念和功能保持一致。内部逻辑基于 CacheAspectSupport 扩展,其中缓存操作的代理是在 AbstractReplyProducingMessageHandler.RequestHandler.handleRequestMessage 方法周围完成的,以请求 Message<?> 作为参数。此建议可以使用 SpEL 表达式或 Function 配置以评估缓存键。请求 Message<?> 可作为 SpEL 评估上下文的根对象,或作为 Function 输入参数。默认情况下,请求消息的 payload 用于缓存键。CacheRequestHandlerAdvice 必须使用 cacheNames 进行配置,当默认缓存操作是 CacheableOperation 时,或者使用一组任意的 CacheOperation。每个 CacheOperation 都可以单独配置,也可以共享选项,例如 CacheManager、CacheResolver 和 CacheErrorHandler,可以从 CacheRequestHandlerAdvice 配置中重用。此配置功能类似于 Spring Framework 的 @CacheConfig 和 @Caching 注解组合。如果未提供 CacheManager,则默认情况下从 CacheAspectSupport 中的 BeanFactory 解析单个 bean。
以下示例配置了两个具有不同缓存操作集的建议
@Bean
public CacheRequestHandlerAdvice cacheAdvice() {
CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice(TEST_CACHE);
cacheRequestHandlerAdvice.setKeyExpressionString("payload");
return cacheRequestHandlerAdvice;
}
@Transformer(inputChannel = "transformerChannel", outputChannel = "nullChannel", adviceChain = "cacheAdvice")
public Object transform(Message<?> message) {
...
}
@Bean
public CacheRequestHandlerAdvice cachePutAndEvictAdvice() {
CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice();
cacheRequestHandlerAdvice.setKeyExpressionString("payload");
CachePutOperation.Builder cachePutBuilder = new CachePutOperation.Builder();
cachePutBuilder.setCacheName(TEST_PUT_CACHE);
CacheEvictOperation.Builder cacheEvictBuilder = new CacheEvictOperation.Builder();
cacheEvictBuilder.setCacheName(TEST_CACHE);
cacheRequestHandlerAdvice.setCacheOperations(cachePutBuilder.build(), cacheEvictBuilder.build());
return cacheRequestHandlerAdvice;
}
@ServiceActivator(inputChannel = "serviceChannel", outputChannel = "nullChannel",
adviceChain = "cachePutAndEvictAdvice")
public Message<?> service(Message<?> message) {
...
}