功能
大多数功能都适用于 @RetryableTopic 注解和 RetryTopicConfiguration bean。
退避配置
退避配置依赖于 Spring Retry 项目中的 BackOffPolicy 接口。
它包括
-
固定退避
-
指数退避
-
随机指数退避
-
均匀随机退避
-
无退避
-
自定义退避
@RetryableTopic(attempts = 5,
backOff = @BackOff(delay = 1000, multiplier = 2, maxDelay = 5000))
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackOff(3_000)
.maxAttempts(4)
.create(template);
}
您还可以提供 Spring Retry 的 SleepingBackOffPolicy 接口的自定义实现。
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.customBackoff(new MyCustomBackOffPolicy())
.maxAttempts(5)
.create(template);
}
默认的退避策略是 FixedBackOffPolicy,最大尝试次数为 3 次,间隔为 1000 毫秒。 |
ExponentialBackOffPolicy 的默认最大延迟为 30 秒。如果您的退避策略需要大于此值的延迟,请相应地调整 maxDelay 属性。 |
第一次尝试计入 maxAttempts,因此如果您提供 maxAttempts 值为 4,则将有原始尝试加 3 次重试。 |
全局超时
您可以设置重试过程的全局超时。如果达到该时间,下一次消费者抛出异常时,消息将直接进入 DLT,如果 DLT 不可用,则仅结束处理。
@RetryableTopic(backOff = @BackOff(2_000), timeout = 5_000)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackOff(2_000)
.timeoutAfter(5_000)
.create(template);
}
| 默认情况下未设置超时,也可以通过提供 -1 作为超时值来实现。 |
异常分类器
您可以指定要重试哪些异常,不重试哪些异常。您还可以将其设置为遍历原因以查找嵌套异常。
@RetryableTopic(include = {MyRetryException.class, MyOtherRetryException.class}, traversingCauses = "true")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
throw new RuntimeException(new MyRetryException()); // will retry
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.notRetryOn(MyDontRetryException.class)
.create(template);
}
| 默认行为是重试所有异常,而不遍历原因。 |
自 2.8.3 版本起,有一个全局致命异常列表,它将导致记录被发送到 DLT 而不进行任何重试。有关致命异常的默认列表,请参阅 DefaultErrorHandler。您可以通过覆盖扩展 RetryTopicConfigurationSupport 的 @Configuration 类中的 configureNonBlockingRetries 方法来向此列表添加或从中删除异常。有关更多信息,请参阅 配置全局设置和功能。
@Override
protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
nonBlockingFatalExceptions.add(MyNonBlockingException.class);
}
| 要禁用致命异常分类,只需清除提供的列表。 |
包含和排除主题
您可以通过 .includeTopic(String topic)、.includeTopics(Collection<String> topics)、.excludeTopic(String topic) 和 .excludeTopics(Collection<String> topics) 方法决定哪些主题将由 RetryTopicConfiguration bean 处理,哪些不处理。
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.includeTopics(List.of("my-included-topic", "my-other-included-topic"))
.create(template);
}
@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.excludeTopic("my-excluded-topic")
.create(template);
}
| 默认行为是包含所有主题。 |
主题自动创建
除非另有说明,否则框架将使用由 KafkaAdmin bean 消费的 NewTopic bean 自动创建所需的主题。您可以指定创建主题的分区数和复制因子,并且可以关闭此功能。从 3.0 版本开始,默认复制因子为 -1,表示使用 broker 默认值。如果您的 broker 版本早于 2.4,则需要设置一个明确的值。
| 请注意,如果您不使用 Spring Boot,则必须提供一个 KafkaAdmin bean 才能使用此功能。 |
@RetryableTopic(numPartitions = "2", replicationFactor = "3")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@RetryableTopic(autoCreateTopics = "false")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.autoCreateTopicsWith(2, 3)
.create(template);
}
@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.doNotAutoCreateRetryTopics()
.create(template);
}
| 默认情况下,主题以一个分区和 -1 的复制因子(表示使用 broker 默认值)自动创建。如果您的 broker 版本早于 2.4,则需要设置一个明确的值。 |
故障头管理
在考虑如何管理故障头(原始头和异常头)时,框架委托给 DeadLetterPublishingRecoverer 来决定是追加还是替换头。
默认情况下,它显式将 appendOriginalHeaders 设置为 false,并将 stripPreviousExceptionHeaders 留给 DeadLetterPublishingRecover 使用的默认值。
这意味着在默认配置下,只保留第一个“原始”和最后一个异常头。这是为了避免在涉及许多重试步骤时创建过大的消息(例如,由于堆栈跟踪头)。
有关更多信息,请参阅 管理死信记录头。
要重新配置框架以将不同设置用于这些属性,请通过覆盖扩展 RetryTopicConfigurationSupport 的 @Configuration 类中的 configureCustomizers 方法来配置 DeadLetterPublishingRecoverer 自定义器。有关更多详细信息,请参阅 配置全局设置和功能。
@Override
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
customizersConfigurer.customizeDeadLetterPublishingRecoverer(dlpr -> {
dlpr.setAppendOriginalHeaders(true);
dlpr.setStripPreviousExceptionHeaders(false);
});
}
从 2.8.4 版本开始,如果您希望添加自定义头(除了工厂添加的重试信息头),您可以向工厂添加一个 headersFunction - factory.setHeadersFunction((rec, ex) -> { ... })。
默认情况下,添加的任何头都将是累积的 - Kafka 头可以包含多个值。从 2.9.5 版本开始,如果函数返回的 Headers 包含类型为 DeadLetterPublishingRecoverer.SingleRecordHeader 的头,则该头的任何现有值将被删除,并且只保留新的单个值。
自定义 DeadLetterPublishingRecoverer
如 故障头管理 中所述,可以自定义框架创建的默认 DeadLetterPublishingRecoverer 实例。但是,对于某些用例,有必要子类化 DeadLetterPublishingRecoverer,例如覆盖 createProducerRecord() 以修改发送到重试(或死信)主题的内容。从 3.0.9 版本开始,您可以覆盖 RetryTopicConfigurationSupport.configureDeadLetterPublishingContainerFactory() 方法以提供 DeadLetterPublisherCreator 实例,例如
@Override
protected Consumer<DeadLetterPublishingRecovererFactory>
configureDeadLetterPublishingContainerFactory() {
return (factory) -> factory.setDeadLetterPublisherCreator(
(templateResolver, destinationResolver) ->
new CustomDLPR(templateResolver, destinationResolver));
}
建议您在构造自定义实例时使用提供的解析器。
基于抛出异常的消息路由到自定义 DLT
从 3.2.0 版本开始,可以根据在处理消息期间抛出的异常类型将消息路由到自定义 DLT。为此,需要指定路由。路由自定义包括附加目标的规范。目标反过来又包括两个设置:suffix 和 exceptions。当抛出 exceptions 中指定的异常类型时,包含 suffix 的 DLT 将被视为消息的目标主题,然后才考虑通用 DLT。使用注解或 RetryTopicConfiguration bean 进行配置的示例
@RetryableTopic(exceptionBasedDltRouting = {
@ExceptionBasedDltDestination(
suffix = "-deserialization", exceptions = {DeserializationException.class}
)}
)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.dltRoutingRules(Map.of("-deserialization", Set.of(DeserializationException.class)))
.create(template);
}
suffix 在自定义 DLT 名称中位于通用 dltTopicSuffix 之前。考虑所提供的示例,导致 DeserializationException 的消息将被路由到 my-annotated-topic-deserialization-dlt 而不是 my-annotated-topic-dlt。自定义 DLT 将按照 主题自动创建 中所述的相同规则创建。