弹性:从错误和代理故障中恢复
Spring AMQP 提供的一些关键(也是最受欢迎)的高级功能与在协议错误或代理故障发生时的恢复和自动重连有关。我们已经在指南中看到了所有相关的组件,但在本节中将它们集中在一起并单独指出这些功能和恢复场景将会很有帮助。
主要的重连功能由 CachingConnectionFactory
本身启用。使用 RabbitAdmin
的自动声明功能通常也很有益。此外,如果你关心保证送达,可能还需要在 RabbitTemplate
和 SimpleMessageListenerContainer
中使用 channelTransacted
标志,并在 SimpleMessageListenerContainer
中使用 AcknowledgeMode.AUTO
(如果你自己处理确认,则使用手动模式)。
Exchange、Queue 和 Binding 的自动声明
RabbitAdmin
组件可以在启动时声明 exchange、queue 和 binding。它通过一个 ConnectionListener
延迟执行此操作。因此,如果代理在启动时不存在,也没有关系。第一次使用 Connection
(例如,通过发送消息)时,监听器会触发并应用 admin 功能。在监听器中进行自动声明的另一个好处是,如果连接因任何原因(例如,代理宕机、网络故障等)断开,当连接重新建立时,它们会再次被应用。
以这种方式声明的队列必须具有固定的名称——要么是显式声明的,要么是框架为 AnonymousQueue 实例生成的。匿名队列是不可持久化、排他的且自动删除的。 |
自动声明仅在 CachingConnectionFactory 的缓存模式为 CHANNEL (默认值)时执行。此限制存在的原因是排他队列和自动删除队列绑定到连接。 |
从 2.2.2 版本开始,RabbitAdmin
将检测 DeclarableCustomizer
类型的 bean,并在实际处理声明之前应用该功能。这很有用,例如,可以在框架内部对某个新参数(属性)提供一流支持之前,先对其进行设置。
@Bean
public DeclarableCustomizer customizer() {
return dec -> {
if (dec instanceof Queue && ((Queue) dec).getName().equals("my.queue")) {
dec.addArgument("some.new.queue.argument", true);
}
return dec;
};
}
在不直接访问 Declarable
bean 定义的项目中,这也很有用。
另请参阅 RabbitMQ 自动连接/拓扑恢复。
同步操作中的故障与重试选项
在使用 RabbitTemplate
进行同步操作序列时,如果丢失了与代理的连接(例如),Spring AMQP 会抛出 AmqpException
(通常是 AmqpIOException
,但不总是)。我们不会隐藏出现问题的事实,因此你必须能够捕获并响应此异常。如果你怀疑连接丢失(并且不是你的原因),最简单的做法是再次尝试该操作。你可以手动执行此操作,或者考虑使用 Spring Retry 来处理重试(命令式或声明式)。
Spring Retry 提供了一些 AOP 拦截器,并在指定重试参数(尝试次数、异常类型、退避算法等)方面提供了极大的灵活性。Spring AMQP 还提供了一些便利的工厂 bean,用于以方便的形式创建适用于 AMQP 用例的 Spring Retry 拦截器,它们带有强类型回调接口,你可以使用这些接口实现自定义恢复逻辑。有关更多详细信息,请参阅 StatefulRetryOperationsInterceptor
和 StatelessRetryOperationsInterceptor
的 Javadoc 和属性。如果不存在事务或事务在重试回调内部启动,则无状态重试是合适的。请注意,无状态重试比有状态重试更容易配置和分析,但如果存在必须回滚或肯定会回滚的正在进行的事务,则通常不适用。在事务中间断开连接应该与回滚具有相同的效果。因此,对于事务在堆栈更高层启动的重连,有状态重试通常是最佳选择。有状态重试需要一种机制来唯一标识消息。最简单的方法是让发送者在 MessageId
消息属性中放入一个唯一值。提供的消息转换器提供了执行此操作的选项:你可以将 createMessageIds
设置为 true
。否则,你可以将 MessageKeyGenerator
实现注入到拦截器中。密钥生成器必须为每条消息返回一个唯一密钥。在 2.0 版本之前,提供了 MissingMessageIdAdvice
。它允许没有 messageId
属性的消息恰好重试一次(忽略重试设置)。现在不再提供此 advice,因为从 spring-retry
1.2 版本开始,其功能已内置到拦截器和消息监听器容器中。
为了向后兼容,默认情况下(在一次重试后),带有 null 消息 ID 的消息对消费者来说被视为致命错误(消费者停止)。要复制 MissingMessageIdAdvice 提供的功能,可以在监听器容器上将 statefulRetryFatalWithNullMessageId 属性设置为 false 。通过此设置,消费者会继续运行,并且消息将被拒绝(在一次重试后)。它将被丢弃或路由到死信队列(如果已配置)。 |
从 1.3 版本开始,提供了一个构建器 API,用于通过 Java(在 @Configuration
类中)辅助组装这些拦截器。以下示例展示了如何执行此操作
@Bean
public StatefulRetryOperationsInterceptor interceptor() {
return RetryInterceptorBuilder.stateful()
.maxAttempts(5)
.backOffOptions(1000, 2.0, 10000) // initialInterval, multiplier, maxInterval
.build();
}
通过这种方式只能配置重试能力的一个子集。更高级的功能需要将 RetryTemplate
配置为 Spring bean。有关可用策略及其配置的完整信息,请参阅 Spring Retry Javadoc。
批量监听器的重试
不建议为批量监听器配置重试,除非该批次是由生产者在一个记录中创建的。有关消费者和生产者创建的批次的信息,请参阅批量消息。对于消费者创建的批次,框架不知道批次中的哪条消息导致了故障,因此在重试耗尽后无法恢复。对于生产者创建的批次,由于只有一条消息实际失败,因此可以恢复整个消息。应用程序可能希望通知自定义恢复器故障发生在批次的哪个位置,或许可以通过设置抛出异常的索引属性。
批量监听器的重试恢复器必须实现 MessageBatchRecoverer
。
消息监听器与异步情况
如果 MessageListener
因业务异常而失败,该异常由消息监听器容器处理,然后容器会返回继续监听下一条消息。如果故障是由连接断开(非业务异常)引起的,则为监听器收集消息的消费者必须被取消并重新启动。SimpleMessageListenerContainer
会无缝地处理这种情况,并记录日志表示正在重启监听器。实际上,它会无限循环,尝试重启消费者。只有当消费者行为非常糟糕时,它才会放弃。一个副作用是,如果在容器启动时代理宕机,它会一直尝试直到连接建立成功。
与协议错误和连接断开不同,业务异常处理可能需要更多的考虑和一些自定义配置,尤其是在使用事务或容器确认的情况下。在 2.8.x 之前,RabbitMQ 没有死信行为的定义。因此,默认情况下,因业务异常而被拒绝或回滚的消息可能会被无限次地重新投递。为了在客户端限制重新投递的次数,一种选择是在监听器的 advice 链中使用 StatefulRetryOperationsInterceptor
。该拦截器可以有一个恢复回调,用于实现自定义的死信操作——根据你的特定环境进行相应的处理。
另一种选择是将容器的 defaultRequeueRejected
属性设置为 false
。这将导致所有失败的消息被丢弃。当使用 RabbitMQ 2.8.x 或更高版本时,这也方便将消息投递到死信 exchange。
或者,你可以抛出 AmqpRejectAndDontRequeueException
。这样做可以阻止消息重新入队,无论 defaultRequeueRejected
属性设置如何。
从 2.1 版本开始,引入了 ImmediateRequeueAmqpException
,用于执行完全相反的逻辑:无论 defaultRequeueRejected
属性设置如何,消息都将被重新入队。
通常,这两种技术会结合使用。可以在 advice 链中使用 StatefulRetryOperationsInterceptor
,并配合一个抛出 AmqpRejectAndDontRequeueException
的 MessageRecoverer
。当所有重试都已耗尽时,会调用 MessageRecover
。RejectAndDontRequeueRecoverer
正是执行此操作的。默认的 MessageRecoverer
会处理错误的 message 并发出 WARN
级别的消息。
从 1.3 版本开始,提供了一个新的 RepublishMessageRecoverer
,允许在重试耗尽后发布失败的消息。
当恢复器处理最终异常时,消息会被确认,并且如果已配置,代理也不会将其发送到死信 exchange。
当在消费者端使用 RepublishMessageRecoverer 时,接收到的消息在 receivedDeliveryMode 消息属性中包含 deliveryMode 。在这种情况下,deliveryMode 可能为 null 。这意味着代理上的投递模式为 NON_PERSISTENT 。从 2.0 版本开始,你可以为 RepublishMessageRecoverer 配置要在重新发布的消息中设置的 deliveryMode ,如果它为 null 的话。默认情况下,它使用 MessageProperties 的默认值 - MessageDeliveryMode.PERSISTENT 。 |
以下示例展示了如何将 RepublishMessageRecoverer
设置为恢复器
@Bean
RetryOperationsInterceptor interceptor() {
return RetryInterceptorBuilder.stateless()
.maxAttempts(5)
.recoverer(new RepublishMessageRecoverer(amqpTemplate(), "something", "somethingelse"))
.build();
}
RepublishMessageRecoverer
在消息头部中发布消息,并附带额外信息,例如异常消息、堆栈跟踪、原始 exchange 和路由键。可以通过创建子类并重写 additionalHeaders()
来添加额外的头部。deliveryMode
(或任何其他属性)也可以在 additionalHeaders()
中更改,如下例所示
RepublishMessageRecoverer recoverer = new RepublishMessageRecoverer(amqpTemplate, "error") {
protected Map<? extends String, ? extends Object> additionalHeaders(Message message, Throwable cause) {
message.getMessageProperties()
.setDeliveryMode(message.getMessageProperties().getReceivedDeliveryMode());
return null;
}
};
从 2.0.5 版本开始,如果堆栈跟踪太大,可能会被截断;这是因为所有头部都必须适合单个帧。默认情况下,如果堆栈跟踪导致为其他头部留出的可用空间(“余量”)少于 20,000 字节,它将被截断。如果你需要更多或更少空间用于其他头部,可以通过设置恢复器的 frameMaxHeadroom
属性来调整此值。从 2.1.13 和 2.2.3 版本开始,异常消息也包含在此计算中,并且堆栈跟踪的量将使用以下算法最大化
-
如果仅堆栈跟踪就超出限制,则异常消息头部将被截断为 97 字节加上
…
,并且堆栈跟踪也会被截断。 -
如果堆栈跟踪很小,消息(加上
…
)将被截断以适应可用字节(但堆栈跟踪本身中的消息被截断为 97 字节加上…
)。
每当发生任何类型的截断时,原始异常都会被记录下来,以保留完整信息。评估在头部被增强后执行,因此像异常类型这样的信息可以在表达式中使用。
从 2.4.8 版本开始,错误 exchange 和路由键可以作为 SpEL 表达式提供,其中 Message
是评估的根对象。
从 2.3.3 版本开始,提供了一个新的子类 RepublishMessageRecovererWithConfirms
;它支持两种风格的发布者确认,并会在返回之前等待确认(如果未确认或消息被返回,则抛出异常)。
如果确认类型是 CORRELATED
,该子类还会检测消息是否被返回并抛出 AmqpMessageReturnedException
;如果发布被否定确认,它会抛出 AmqpNackReceivedException
。
如果确认类型是 SIMPLE
,该子类将在通道上调用 waitForConfirmsOrDie
方法。
有关确认和返回的更多信息,请参阅发布者确认和返回。
从 2.1 版本开始,添加了 ImmediateRequeueMessageRecoverer
,用于抛出 ImmediateRequeueAmqpException
,它通知监听器容器重新入队当前失败的消息。
Spring Retry 的异常分类
Spring Retry 在确定哪些异常可以触发重试方面具有极大的灵活性。默认配置会重试所有异常。考虑到用户异常被包装在 ListenerExecutionFailedException
中,我们需要确保分类器检查异常的原因。默认分类器仅查看顶层异常。
从 Spring Retry 1.0.3 开始,BinaryExceptionClassifier
有一个名为 traverseCauses
的属性(默认为 false
)。当设置为 true
时,它会遍历异常原因,直到找到匹配项或没有更多原因。
要将此分类器用于重试,可以使用一个通过构造函数创建的 SimpleRetryPolicy
,该构造函数接受最大尝试次数、Exception
实例的 Map
和布尔值 (traverseCauses
),然后将此策略注入到 RetryTemplate
中。
通过代理进行重试
从队列成为死信的消息在从 DLX 重新路由后,可以重新发布回该队列。这种重试行为通过 x-death
头部在代理端控制。有关此方法的更多信息,请参阅官方 RabbitMQ 文档。
另一种方法是从应用程序手动将失败的消息重新发布回原始 exchange。从 4.0
版本开始,RabbitMQ 代理不考虑客户端发送的 x-death
头部。实际上,客户端发送的任何 x-*
头部都会被忽略。
为了缓解 RabbitMQ 代理的这种新行为,Spring AMQP 从 3.2 版本开始引入了一个 retry_count
头部。当此头部不存在且服务器端 DLX 生效时,x-death.count
属性会被映射到此头部。当手动重新发布失败消息以进行重试时,retry_count
头部的值必须手动递增。有关更多信息,请参阅 MessageProperties.incrementRetryCount()
的 JavaDoc。
以下示例总结了通过代理进行手动重试的算法
@RabbitListener(queueNames = "some_queue")
public void rePublish(Message message) {
try {
// Process message
}
catch (Exception ex) {
Long retryCount = message.getMessageProperties().getRetryCount();
if (retryCount < 3) {
message.getMessageProperties().incrementRetryCount();
this.rabbitTemplate.send("", "some_queue", message);
}
else {
throw new ImmediateAcknowledgeAmqpException("Failed after 4 attempts");
}
}
}