DLT 策略
该框架提供了几种处理 DLT(死信主题)的策略。你可以提供一个 DLT 处理方法,使用默认的日志记录方法,或者完全不使用 DLT。此外,你还可以选择 DLT 处理失败时的行为。
DLT 处理方法
你可以指定用于处理主题 DLT 的方法,以及该处理失败时的行为。
为此,你可以在带有 @RetryableTopic
注解的类的某个方法上使用 @DltHandler
注解。请注意,该类中所有带有 @RetryableTopic
注解的方法将使用相同的方法处理 DLT。
@RetryableTopic
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@DltHandler
public void processDltMessage(MyPojo message) {
// ... message processing, persistence, etc
}
DLT 处理方法也可以通过 RetryTopicConfigurationBuilder.dltHandlerMethod(String, String)
方法提供,将应处理 DLT 消息的 bean 名称和方法名称作为参数传入。
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.dltHandlerMethod("myCustomDltProcessor", "processDltMessage")
.create(template);
}
@Component
public class MyCustomDltProcessor {
private final MyDependency myDependency;
public MyCustomDltProcessor(MyDependency myDependency) {
this.myDependency = myDependency;
}
public void processDltMessage(MyPojo message) {
// ... message processing, persistence, etc
}
}
如果没有提供 DLT 处理程序,则使用默认的 RetryTopicConfigurer.LoggingDltListenerHandlerMethod 。 |
从 2.8 版本开始,如果你完全不想在此应用程序中从 DLT 消费,包括通过默认处理程序(或者你希望延迟消费),则可以控制 DLT 容器是否启动,这独立于容器工厂的 autoStartup
属性。
使用 @RetryableTopic
注解时,将 autoStartDltHandler
属性设置为 false
;使用配置构建器时,使用 autoStartDltHandler(false)
。
之后你可以通过 KafkaListenerEndpointRegistry
启动 DLT 处理程序。
DLT 失败行为
如果 DLT 处理失败,有两种可用的行为:ALWAYS_RETRY_ON_ERROR
和 FAIL_ON_ERROR
。
前者会将记录转发回 DLT 主题,以便它不会阻塞其他 DLT 记录的处理。后者则在不转发消息的情况下结束 consumer 的执行。
@RetryableTopic(dltProcessingFailureStrategy =
DltStrategy.FAIL_ON_ERROR)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.dltHandlerMethod("myCustomDltProcessor", "processDltMessage")
.doNotRetryOnDltFailure()
.create(template);
}
默认行为是 ALWAYS_RETRY_ON_ERROR 。 |
从 2.8.3 版本开始,如果记录导致抛出致命异常(例如 DeserializationException ),ALWAYS_RETRY_ON_ERROR 将不会将记录路由回 DLT,因为通常此类异常总是会抛出。 |
被认为是致命的异常包括:
-
DeserializationException
-
MessageConversionException
-
ConversionException
-
MethodArgumentResolutionException
-
NoSuchMethodException
-
ClassCastException
你可以使用 DestinationTopicResolver
bean 上的方法向此列表添加或删除异常。
有关更多信息,请参见异常分类器。
配置无 DLT
框架还提供了不为主题配置 DLT 的可能性。在这种情况下,重试次数用尽后,处理简单地结束。
@RetryableTopic(dltProcessingFailureStrategy =
DltStrategy.NO_DLT)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.doNotConfigureDlt()
.create(template);
}