特性
大多数特性对于 `@RetryableTopic` 注解和 `RetryTopicConfiguration` bean 都可用。
BackOff 配置
BackOff 配置依赖于 Spring Retry 项目中的 `BackOffPolicy` 接口。
包括
-
固定间隔 Back Off
-
指数间隔 Back Off
-
随机指数间隔 Back Off
-
均匀随机间隔 Back Off
-
无 Back Off
-
自定义 Back Off
@RetryableTopic(attempts = 5,
backoff = @Backoff(delay = 1000, multiplier = 2, maxDelay = 5000))
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackOff(3_000)
.maxAttempts(4)
.create(template);
}
你还可以提供 Spring Retry 的 `SleepingBackOffPolicy` 接口的自定义实现
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.customBackoff(new MyCustomBackOffPolicy())
.maxAttempts(5)
.create(template);
}
默认的 back off 策略是 `FixedBackOffPolicy`,最多重试 3 次,间隔 1000ms。 |
`ExponentialBackOffPolicy` 的默认最大延迟为 30 秒。如果你的 back off 策略需要大于此值的延迟,请相应调整 `maxDelay` 属性。 |
第一次尝试会计入 `maxAttempts`,因此如果你提供 `maxAttempts` 值为 4,则会有原始尝试加上 3 次重试。 |
全局超时
你可以为重试过程设置全局超时时间。如果达到该时间,消费者下次抛出异常时,消息将直接发送到 DLT,或者如果没有 DLT 可用,则直接结束处理。
@RetryableTopic(backoff = @Backoff(2_000), timeout = 5_000)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackOff(2_000)
.timeoutAfter(5_000)
.create(template);
}
默认情况下不设置超时,这也可以通过提供 -1 作为超时值来实现。 |
异常分类器
你可以指定哪些异常需要重试,哪些不需要。你还可以设置遍历原因以查找嵌套异常。
@RetryableTopic(include = {MyRetryException.class, MyOtherRetryException.class}, traversingCauses = "true")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
throw new RuntimeException(new MyRetryException()); // will retry
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.notRetryOn(MyDontRetryException.class)
.create(template);
}
默认行为是对所有异常进行重试,且不遍历原因。 |
自 2.8.3 版本起,有一个全局致命异常列表,这些异常会导致记录直接发送到 DLT 而不进行任何重试。请参阅 DefaultErrorHandler 查看默认的致命异常列表。你可以通过在继承 `RetryTopicConfigurationSupport` 的 `@Configuration` 类中覆盖 `configureNonBlockingRetries` 方法来向此列表添加或从中移除异常。更多信息请参阅 Configuring Global Settings and Features。
@Override
protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
nonBlockingFatalExceptions.add(MyNonBlockingException.class);
}
要禁用致命异常分类,只需清空提供的列表即可。 |
包含和排除主题
你可以通过 `.includeTopic(String topic)`、`.includeTopics(Collection<String> topics)`、`.excludeTopic(String topic)` 和 `.excludeTopics(Collection<String> topics)` 方法决定哪些主题将被 `RetryTopicConfiguration` bean 处理,哪些不被处理。
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.includeTopics(List.of("my-included-topic", "my-other-included-topic"))
.create(template);
}
@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.excludeTopic("my-excluded-topic")
.create(template);
}
默认行为是包含所有主题。 |
主题自动创建
除非另有说明,否则框架将使用 `KafkaAdmin` bean 消费的 `NewTopic` bean 自动创建所需的主题。你可以指定创建主题时的分区数和副本因子,并且可以关闭此功能。从 3.0 版本开始,默认副本因子为 -1,表示使用 broker 的默认设置。如果你的 broker 版本低于 2.4,则需要设置一个明确的值。
请注意,如果你不使用 Spring Boot,则必须提供一个 `KafkaAdmin` bean 才能使用此功能。 |
@RetryableTopic(numPartitions = "2", replicationFactor = "3")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@RetryableTopic(autoCreateTopics = "false")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.autoCreateTopicsWith(2, 3)
.create(template);
}
@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.doNotAutoCreateRetryTopics()
.create(template);
}
默认情况下,主题会自动创建,分区数为 1,副本因子为 -1(表示使用 broker 的默认设置)。如果你的 broker 版本低于 2.4,则需要设置一个明确的值。 |
失败头部管理
在考虑如何管理失败头部(原始头部和异常头部)时,框架委托给 `DeadLetterPublishingRecoverer` 来决定是附加还是替换头部。
默认情况下,它明确将 `appendOriginalHeaders` 设置为 `false`,并让 `stripPreviousExceptionHeaders` 使用 `DeadLetterPublishingRecover` 的默认设置。
这意味着在默认配置下,只保留第一个“原始”头部和最后一个异常头部。这是为了避免在涉及多次重试时创建过大的消息(例如由于堆栈跟踪头部)。
更多信息请参阅 管理死信记录头部。
要重新配置框架以便对这些属性使用不同的设置,请通过在继承 `RetryTopicConfigurationSupport` 的 `@Configuration` 类中覆盖 `configureCustomizers` 方法来配置 `DeadLetterPublishingRecoverer` 定制器。更多详细信息请参阅 配置全局设置和特性。
@Override
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
customizersConfigurer.customizeDeadLetterPublishingRecoverer(dlpr -> {
dlpr.setAppendOriginalHeaders(true);
dlpr.setStripPreviousExceptionHeaders(false);
});
}
从 2.8.4 版本开始,如果你希望添加自定义头部(除了工厂添加的重试信息头部),你可以向工厂添加一个 `headersFunction` - `factory.setHeadersFunction((rec, ex) -> { ... })`。
默认情况下,添加的任何头部都是累积的 - Kafka 头部可以包含多个值。从 2.9.5 版本开始,如果函数返回的 Headers 包含类型为 `DeadLetterPublishingRecoverer.SingleRecordHeader` 的头部,则该头部任何现有值都将被移除,只保留新的单个值。
自定义 DeadLetterPublishingRecoverer
如 失败头部管理 中所示,可以定制框架创建的默认 `DeadLetterPublishingRecoverer` 实例。然而,对于某些用例,需要继承 `DeadLetterPublishingRecoverer`,例如覆盖 `createProducerRecord()` 方法来修改发送到重试(或死信)主题的内容。从 3.0.9 版本开始,你可以覆盖 `RetryTopicConfigurationSupport.configureDeadLetterPublishingContainerFactory()` 方法来提供一个 `DeadLetterPublisherCreator` 实例,例如
@Override
protected Consumer<DeadLetterPublishingRecovererFactory>
configureDeadLetterPublishingContainerFactory() {
return (factory) -> factory.setDeadLetterPublisherCreator(
(templateResolver, destinationResolver) ->
new CustomDLPR(templateResolver, destinationResolver));
}
建议你在构建自定义实例时使用提供的解析器。
基于抛出的异常将消息路由到自定义 DLT
从 3.2.0 版本开始,可以根据处理消息过程中抛出的异常类型将消息路由到自定义 DLT。为此,需要指定路由。路由定制包括额外目标的规范。目标又包括两个设置:`suffix` 和 `exceptions`。当抛出 `exceptions` 中指定的异常类型时,包含 `suffix` 的 DLT 将被视为消息的目标主题,优先级高于通用 DLT。以下是使用注解或 `RetryTopicConfiguration` bean 进行配置的示例
@RetryableTopic(exceptionBasedDltRouting = {
@ExceptionBasedDltDestination(
suffix = "-deserialization", exceptions = {DeserializationException.class}
)}
)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.dltRoutingRules(Map.of("-deserialization", Set.of(DeserializationException.class)))
.create(template);
}
在自定义 DLT 名称中,`suffix` 出现在通用的 `dltTopicSuffix` 之前。考虑到提供的示例,导致 `DeserializationException` 的消息将被路由到 `my-annotated-topic-deserialization-dlt`,而不是 `my-annotated-topic-dlt`。自定义 DLT 的创建将遵循与 主题自动创建 中所述相同的规则。