提供的建议类
除了提供应用 AOP 增强类的通用机制外,Spring Integration 还提供以下开箱即用的增强实现。
-
RequestHandlerRetryAdvice
(在 重试增强 中描述) -
RequestHandlerCircuitBreakerAdvice
(在 断路器增强 中描述) -
ExpressionEvaluatingRequestHandlerAdvice
(在 表达式增强 中描述) -
RateLimiterRequestHandlerAdvice
(在 限速增强 中描述) -
CacheRequestHandlerAdvice
(在 缓存增强 中描述) -
ReactiveRequestHandlerAdvice
(在 响应式增强 中描述) -
ContextHolderRequestHandlerAdvice
(在 上下文持有者增强 中描述)
重试增强
重试增强(o.s.i.handler.advice.RequestHandlerRetryAdvice
)利用了 Spring Retry 项目提供的丰富的重试机制。spring-retry
的核心组件是 RetryTemplate
,它允许配置复杂的重试场景,包括 RetryPolicy
和 BackoffPolicy
策略(以及许多实现),以及 RecoveryCallback
策略,用于确定在重试耗尽时采取的操作。
- 无状态重试
-
无状态重试是指重试活动完全在增强内部处理的情况。线程会暂停(如果配置了这样做)并重试操作。
- 有状态重试
-
有状态重试是指重试状态在增强内部管理,但会抛出异常,并由调用者重新提交请求的情况。有状态重试的一个例子是,当我们希望消息发起者(例如,JMS)负责重新提交,而不是在当前线程上执行时。有状态重试需要一些机制来检测重新提交的请求。
有关 spring-retry
的更多信息,请参阅 该项目的 Javadoc 以及 Spring Batch 的参考文档,spring-retry
就是起源于此。
默认的回退行为是不回退。重试会立即尝试。使用导致线程在尝试之间暂停的回退策略可能会导致性能问题,包括过度使用内存和线程饥饿。在高流量环境中,应谨慎使用回退策略。 |
配置重试建议
本节中的示例使用以下始终抛出异常的<service-activator>
public class FailingService {
public void service(String message) {
throw new RuntimeException("error");
}
}
- 简单无状态重试
-
默认的
RetryTemplate
具有一个SimpleRetryPolicy
,它尝试三次。没有BackOffPolicy
,因此三次尝试是背靠背进行的,尝试之间没有延迟。没有RecoveryCallback
,因此结果是在最终失败的重试发生后将异常抛给调用者。在 Spring Integration 环境中,此最终异常可能通过在入站端点上使用error-channel
来处理。以下示例使用RetryTemplate
并显示其DEBUG
输出<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice"/> </int:request-handler-advice-chain> </int:service-activator> DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...] DEBUG [task-scheduler-2]Retry: count=0 DEBUG [task-scheduler-2]Checking for rethrow: count=1 DEBUG [task-scheduler-2]Retry: count=1 DEBUG [task-scheduler-2]Checking for rethrow: count=2 DEBUG [task-scheduler-2]Retry: count=2 DEBUG [task-scheduler-2]Checking for rethrow: count=3 DEBUG [task-scheduler-2]Retry failed last attempt: count=3
- 具有恢复的简单无状态重试
-
以下示例在前面的示例中添加了
RecoveryCallback
,并使用ErrorMessageSendingRecoverer
将ErrorMessage
发送到通道<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice"> <property name="recoveryCallback"> <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer"> <constructor-arg ref="myErrorChannel" /> </bean> </property> </bean> </int:request-handler-advice-chain> </int:service-activator> DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...] DEBUG [task-scheduler-2]Retry: count=0 DEBUG [task-scheduler-2]Checking for rethrow: count=1 DEBUG [task-scheduler-2]Retry: count=1 DEBUG [task-scheduler-2]Checking for rethrow: count=2 DEBUG [task-scheduler-2]Retry: count=2 DEBUG [task-scheduler-2]Checking for rethrow: count=3 DEBUG [task-scheduler-2]Retry failed last attempt: count=3 DEBUG [task-scheduler-2]Sending ErrorMessage :failedMessage:[Payload=...]
- 具有自定义策略和恢复的无状态重试
-
为了更复杂,我们可以为建议提供一个自定义的
RetryTemplate
。此示例继续使用SimpleRetryPolicy
,但将尝试次数增加到四次。它还添加了ExponentialBackoffPolicy
,其中第一次重试等待一秒,第二次等待五秒,第三次等待 25 秒(总共四次尝试)。以下清单显示了示例及其DEBUG
输出<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice"> <property name="recoveryCallback"> <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer"> <constructor-arg ref="myErrorChannel" /> </bean> </property> <property name="retryTemplate" ref="retryTemplate" /> </bean> </int:request-handler-advice-chain> </int:service-activator> <bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate"> <property name="retryPolicy"> <bean class="org.springframework.retry.policy.SimpleRetryPolicy"> <property name="maxAttempts" value="4" /> </bean> </property> <property name="backOffPolicy"> <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy"> <property name="initialInterval" value="1000" /> <property name="multiplier" value="5.0" /> <property name="maxInterval" value="60000" /> </bean> </property> </bean> 27.058 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=...] 27.071 DEBUG [task-scheduler-1]Retry: count=0 27.080 DEBUG [task-scheduler-1]Sleeping for 1000 28.081 DEBUG [task-scheduler-1]Checking for rethrow: count=1 28.081 DEBUG [task-scheduler-1]Retry: count=1 28.081 DEBUG [task-scheduler-1]Sleeping for 5000 33.082 DEBUG [task-scheduler-1]Checking for rethrow: count=2 33.082 DEBUG [task-scheduler-1]Retry: count=2 33.083 DEBUG [task-scheduler-1]Sleeping for 25000 58.083 DEBUG [task-scheduler-1]Checking for rethrow: count=3 58.083 DEBUG [task-scheduler-1]Retry: count=3 58.084 DEBUG [task-scheduler-1]Checking for rethrow: count=4 58.084 DEBUG [task-scheduler-1]Retry failed last attempt: count=4 58.086 DEBUG [task-scheduler-1]Sending ErrorMessage :failedMessage:[Payload=...]
- 无状态重试的命名空间支持
-
从 4.0 版本开始,由于对重试建议的命名空间支持,前面的配置可以大大简化,如下例所示
<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <ref bean="retrier" /> </int:request-handler-advice-chain> </int:service-activator> <int:handler-retry-advice id="retrier" max-attempts="4" recovery-channel="myErrorChannel"> <int:exponential-back-off initial="1000" multiplier="5.0" maximum="60000" /> </int:handler-retry-advice>
在前面的示例中,建议被定义为顶级 bean,以便它可以在多个
request-handler-advice-chain
实例中使用。您也可以直接在链中定义建议,如下例所示<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <int:retry-advice id="retrier" max-attempts="4" recovery-channel="myErrorChannel"> <int:exponential-back-off initial="1000" multiplier="5.0" maximum="60000" /> </int:retry-advice> </int:request-handler-advice-chain> </int:service-activator>
<handler-retry-advice>
可以具有<fixed-back-off>
或<exponential-back-off>
子元素,或者没有子元素。没有子元素的<handler-retry-advice>
不使用任何回退。如果没有recovery-channel
,则在重试耗尽时抛出异常。命名空间只能与无状态重试一起使用。对于更复杂的环境(自定义策略等),请使用正常的
<bean>
定义。 - 具有恢复的简单有状态重试
-
为了使重试有状态,我们需要为建议提供一个
RetryStateGenerator
实现。此类用于识别消息是否为重新提交,以便RetryTemplate
可以确定此消息的当前重试状态。框架提供了一个SpelExpressionRetryStateGenerator
,它使用 SpEL 表达式来确定消息标识符。此示例再次使用默认策略(三次尝试,没有回退)。与无状态重试一样,这些策略可以自定义。以下清单显示了示例及其DEBUG
输出<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice"> <property name="retryStateGenerator"> <bean class="o.s.i.handler.advice.SpelExpressionRetryStateGenerator"> <constructor-arg value="headers['jms_messageId']" /> </bean> </property> <property name="recoveryCallback"> <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer"> <constructor-arg ref="myErrorChannel" /> </bean> </property> </bean> </int:request-handler-advice-chain> </int:service-activator> 24.351 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...] 24.368 DEBUG [Container#0-1]Retry: count=0 24.387 DEBUG [Container#0-1]Checking for rethrow: count=1 24.387 DEBUG [Container#0-1]Rethrow in retry for policy: count=1 24.387 WARN [Container#0-1]failure occurred in gateway sendAndReceive org.springframework.integration.MessagingException: Failed to invoke handler ... Caused by: java.lang.RuntimeException: foo ... 24.391 DEBUG [Container#0-1]Initiating transaction rollback on application exception ... 25.412 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...] 25.412 DEBUG [Container#0-1]Retry: count=1 25.413 DEBUG [Container#0-1]Checking for rethrow: count=2 25.413 DEBUG [Container#0-1]Rethrow in retry for policy: count=2 25.413 WARN [Container#0-1]failure occurred in gateway sendAndReceive org.springframework.integration.MessagingException: Failed to invoke handler ... Caused by: java.lang.RuntimeException: foo ... 25.414 DEBUG [Container#0-1]Initiating transaction rollback on application exception ... 26.418 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...] 26.418 DEBUG [Container#0-1]Retry: count=2 26.419 DEBUG [Container#0-1]Checking for rethrow: count=3 26.419 DEBUG [Container#0-1]Rethrow in retry for policy: count=3 26.419 WARN [Container#0-1]failure occurred in gateway sendAndReceive org.springframework.integration.MessagingException: Failed to invoke handler ... Caused by: java.lang.RuntimeException: foo ... 26.420 DEBUG [Container#0-1]Initiating transaction rollback on application exception ... 27.425 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...] 27.426 DEBUG [Container#0-1]Retry failed last attempt: count=3 27.426 DEBUG [Container#0-1]Sending ErrorMessage :failedMessage:[Payload=...]
如果您将前面的示例与无状态示例进行比较,您会发现,使用有状态重试,异常会在每次失败时抛出给调用者。
- 重试的异常分类
-
Spring Retry 在确定哪些异常可以触发重试方面具有很大的灵活性。默认配置会重试所有异常,异常分类器会查看顶层异常。如果您将其配置为仅在
MyException
上重试,而您的应用程序抛出了一个SomeOtherException
,其原因是MyException
,则不会发生重试。从 Spring Retry 1.0.3 开始,
BinaryExceptionClassifier
具有一个名为traverseCauses
的属性(默认值为false
)。当为true
时,它会遍历异常原因,直到找到匹配项或遍历完所有原因。要将此分类器用于重试,请使用一个
SimpleRetryPolicy
,它使用接受最大尝试次数、Exception
对象的Map
和traverseCauses
布尔值的构造函数创建。然后,您可以将此策略注入到RetryTemplate
中。
在这种情况下,需要 traverseCauses ,因为用户异常可能会包装在 MessagingException 中。
|
断路器建议
断路器模式的基本思想是,如果服务当前不可用,不要浪费时间(和资源)尝试使用它。o.s.i.handler.advice.RequestHandlerCircuitBreakerAdvice
实现了这种模式。当断路器处于关闭状态时,端点尝试调用服务。如果一定数量的连续尝试失败,断路器将进入打开状态。当它处于打开状态时,新的请求会“快速失败”,并且不会尝试调用服务,直到一段时间过去。
当这段时间过去后,断路器将设置为半打开状态。处于此状态时,如果即使只有一次尝试失败,断路器也会立即进入打开状态。如果尝试成功,断路器将进入关闭状态,在这种情况下,它不会再次进入打开状态,直到配置的连续失败次数再次发生。任何成功的尝试都会将状态重置为零失败,以便确定断路器何时可能再次进入打开状态。
通常,此建议可能用于外部服务,因为这些服务可能需要一些时间才能失败(例如,尝试建立网络连接的超时)。
RequestHandlerCircuitBreakerAdvice
具有两个属性:threshold
和 halfOpenAfter
。threshold
属性表示断路器进入打开状态之前需要发生的连续失败次数。它默认为 5
。halfOpenAfter
属性表示断路器在最后一次失败后等待的时间,然后再尝试另一个请求。默认值为 1000 毫秒。
以下示例配置了一个断路器,并展示了它的DEBUG
和ERROR
输出
<int:service-activator input-channel="input" ref="failer" method="service">
<int:request-handler-advice-chain>
<bean class="o.s.i.handler.advice.RequestHandlerCircuitBreakerAdvice">
<property name="threshold" value="2" />
<property name="halfOpenAfter" value="12000" />
</bean>
</int:request-handler-advice-chain>
</int:service-activator>
05.617 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=...]
05.638 ERROR [task-scheduler-1]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
10.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
10.600 ERROR [task-scheduler-2]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
15.598 DEBUG [task-scheduler-3]preSend on channel 'input', message: [Payload=...]
15.599 ERROR [task-scheduler-3]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
20.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
20.598 ERROR [task-scheduler-2]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
25.598 DEBUG [task-scheduler-5]preSend on channel 'input', message: [Payload=...]
25.601 ERROR [task-scheduler-5]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
30.598 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=foo...]
30.599 ERROR [task-scheduler-1]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
在前面的示例中,阈值设置为2
,halfOpenAfter
设置为12
秒。每5秒就会有一个新的请求到达。前两次尝试调用了服务。第三次和第四次尝试失败,并抛出了一个异常,表明断路器处于打开状态。第五次请求尝试成功,因为距离上次失败已经过去了15秒。第六次尝试立即失败,因为断路器立即进入打开状态。
表达式评估建议
最后提供的建议类是o.s.i.handler.advice.ExpressionEvaluatingRequestHandlerAdvice
。此建议比其他两个建议更通用。它提供了一种机制来评估发送到端点的原始入站消息上的表达式。在成功或失败之后,都可以评估单独的表达式。可选地,可以将包含评估结果的消息以及输入消息发送到消息通道。
此建议的典型用例可能是使用<ftp:outbound-channel-adapter/>
,也许是将文件移动到一个目录(如果传输成功)或另一个目录(如果失败)。
该建议具有属性来设置成功时的表达式、失败时的表达式以及每个表达式的相应通道。对于成功的情况,发送到successChannel
的消息是AdviceMessage
,其有效负载是表达式评估的结果。另一个名为inputMessage
的属性包含发送到处理程序的原始消息。发送到failureChannel
的消息(当处理程序抛出异常时)是ErrorMessage
,其有效负载为MessageHandlingExpressionEvaluatingAdviceException
。与所有MessagingException
实例一样,此有效负载具有failedMessage
和cause
属性,以及一个名为evaluationResult
的附加属性,其中包含表达式评估的结果。
从 5.1.3 版本开始,如果配置了通道但没有提供表达式,则使用默认表达式来评估消息的payload 。
|
当在建议范围内抛出异常时,默认情况下,该异常会在评估任何failureExpression
之后抛出给调用者。如果您希望抑制抛出异常,请将trapException
属性设置为true
。以下建议展示了如何使用 Java DSL 配置advice
@SpringBootApplication
public class EerhaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(EerhaApplication.class, args);
MessageChannel in = context.getBean("advised.input", MessageChannel.class);
in.send(new GenericMessage<>("good"));
in.send(new GenericMessage<>("bad"));
context.close();
}
@Bean
public IntegrationFlow advised() {
return f -> f.<String>handle((payload, headers) -> {
if (payload.equals("good")) {
return null;
}
else {
throw new RuntimeException("some failure");
}
}, c -> c.advice(expressionAdvice()));
}
@Bean
public Advice expressionAdvice() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setSuccessChannelName("success.input");
advice.setOnSuccessExpressionString("payload + ' was successful'");
advice.setFailureChannelName("failure.input");
advice.setOnFailureExpressionString(
"payload + ' was bad, with reason: ' + #exception.cause.message");
advice.setTrapException(true);
return advice;
}
@Bean
public IntegrationFlow success() {
return f -> f.handle(System.out::println);
}
@Bean
public IntegrationFlow failure() {
return f -> f.handle(System.out::println);
}
}
速率限制建议
速率限制建议(RateLimiterRequestHandlerAdvice
)允许确保端点不会被请求过载。当速率限制被突破时,请求将进入阻塞状态。
此建议的一个典型用例可能是外部服务提供商不允许每分钟超过n
个请求。
RateLimiterRequestHandlerAdvice
实现完全基于 Resilience4j 项目,需要注入 RateLimiter
或 RateLimiterConfig
。也可以配置默认值和/或自定义名称。
以下示例配置了一个速率限制器建议,每秒一个请求
@Bean
public RateLimiterRequestHandlerAdvice rateLimiterRequestHandlerAdvice() {
return new RateLimiterRequestHandlerAdvice(RateLimiterConfig.custom()
.limitRefreshPeriod(Duration.ofSeconds(1))
.limitForPeriod(1)
.build());
}
@ServiceActivator(inputChannel = "requestChannel", outputChannel = "resultChannel",
adviceChain = "rateLimiterRequestHandlerAdvice")
public String handleRequest(String payload) {
...
}
缓存建议
从 5.2 版本开始,引入了 CacheRequestHandlerAdvice
。它基于 Spring Framework 中的缓存抽象,并与 @Caching
注释系列提供的概念和功能保持一致。内部逻辑基于 CacheAspectSupport
扩展,其中缓存操作的代理是在 AbstractReplyProducingMessageHandler.RequestHandler.handleRequestMessage
方法周围完成的,请求 Message<?>
作为参数。此建议可以配置为使用 SpEL 表达式或 Function
来评估缓存键。请求 Message<?>
可用作 SpEL 评估上下文的根对象,或作为 Function
输入参数。默认情况下,请求消息的 payload
用于缓存键。CacheRequestHandlerAdvice
必须配置 cacheNames
,当默认缓存操作是 CacheableOperation
时,或者配置一组任意 CacheOperation
。每个 CacheOperation
可以单独配置或共享选项,例如 CacheManager
、CacheResolver
和 CacheErrorHandler
,可以从 CacheRequestHandlerAdvice
配置中重复使用。此配置功能类似于 Spring Framework 的 @CacheConfig
和 @Caching
注释组合。如果没有提供 CacheManager
,则默认情况下会从 CacheAspectSupport
中的 BeanFactory
解析单个 bean。
以下示例配置了两个具有不同缓存操作集的建议
@Bean
public CacheRequestHandlerAdvice cacheAdvice() {
CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice(TEST_CACHE);
cacheRequestHandlerAdvice.setKeyExpressionString("payload");
return cacheRequestHandlerAdvice;
}
@Transformer(inputChannel = "transformerChannel", outputChannel = "nullChannel", adviceChain = "cacheAdvice")
public Object transform(Message<?> message) {
...
}
@Bean
public CacheRequestHandlerAdvice cachePutAndEvictAdvice() {
CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice();
cacheRequestHandlerAdvice.setKeyExpressionString("payload");
CachePutOperation.Builder cachePutBuilder = new CachePutOperation.Builder();
cachePutBuilder.setCacheName(TEST_PUT_CACHE);
CacheEvictOperation.Builder cacheEvictBuilder = new CacheEvictOperation.Builder();
cacheEvictBuilder.setCacheName(TEST_CACHE);
cacheRequestHandlerAdvice.setCacheOperations(cachePutBuilder.build(), cacheEvictBuilder.build());
return cacheRequestHandlerAdvice;
}
@ServiceActivator(inputChannel = "serviceChannel", outputChannel = "nullChannel",
adviceChain = "cachePutAndEvictAdvice")
public Message<?> service(Message<?> message) {
...
}