主题命名
重试主题和DLT通过在主主题后附加提供的值或默认值进行命名,然后附加该主题的延迟或索引。
示例
"my-topic" → "my-topic-retry-0", "my-topic-retry-1", …, "my-topic-dlt"
"my-other-topic" → "my-topic-myRetrySuffix-1000", "my-topic-myRetrySuffix-2000", …, "my-topic-myDltSuffix"
默认行为是为每次尝试创建单独的重试主题,并附加一个索引值:retry-0, retry-1, …, retry-n。因此,默认情况下,重试主题的数量是配置的maxAttempts减1。 |
您可以配置后缀,选择是附加尝试索引还是延迟,在使用固定退避时使用单个重试主题,以及在使用指数退避时为具有maxInterval的尝试使用单个重试主题。
重试主题和DLT后缀
您可以指定重试和DLT主题将使用的后缀。
@RetryableTopic(retryTopicSuffix = "-my-retry-suffix", dltTopicSuffix = "-my-dlt-suffix")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.retryTopicSuffix("-my-retry-suffix")
.dltTopicSuffix("-my-dlt-suffix")
.create(template);
}
| 默认后缀分别是重试主题的"-retry"和DLT的"-dlt"。 |
附加主题索引或延迟
您可以在后缀后面附加主题的索引或延迟值。
@RetryableTopic(topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.suffixTopicsWithIndexValues()
.create(template);
}
| 默认行为是使用延迟值作为后缀,但对于具有多个主题的固定延迟配置,主题会以主题索引作为后缀。 |
固定延迟重试的单个主题
如果您使用固定延迟策略,例如FixedBackOffPolicy或NoBackOffPolicy,您可以使用单个主题来实现非阻塞重试。此主题将以提供的或默认的后缀结尾,并且不会附加索引或延迟值。
以前的FixedDelayStrategy现在已弃用,可以替换为SameIntervalTopicReuseStrategy。 |
@RetryableTopic(backOff = @BackOff(2_000), sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.SINGLE_TOPIC)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackOff(3_000)
.maxAttempts(5)
.useSingleTopicForSameIntervals()
.create(template);
}
| 默认行为是为每次尝试创建单独的重试主题,并附加其索引值:retry-0, retry-1, … |
maxInterval 指数延迟的单个主题
如果您使用指数退避策略 (ExponentialBackOffPolicy),您可以使用单个重试主题来实现延迟为配置的maxInterval的尝试的非阻塞重试。
这个“最终”重试主题将以提供的或默认的后缀结尾,并且会附加索引或maxInterval值。
通过选择对具有maxInterval延迟的重试使用单个主题,配置一个长时间重试的指数重试策略可能变得更加可行,因为在这种方法中您不需要大量的(重试)主题。 |
从3.2版本开始,默认行为是在使用指数退避时,重用相同间隔的重试主题,重试主题会以延迟值作为后缀,最后一个重试主题会重用相同间隔(对应于maxInterval延迟)。
例如,当配置指数退避策略,initialInterval=1_000,multiplier=2,和maxInterval=16_000,为了持续重试一小时,需要将maxAttempts配置为229,默认情况下所需的重试主题将是
-
-retry-1000
-
-retry-2000
-
-retry-4000
-
-retry-8000
-
-retry-16000
当使用重试主题数量等于配置的maxAttempts减1的策略时,最后一个重试主题(对应于maxInterval延迟)将附加一个额外的索引,如下所示
-
-retry-1000
-
-retry-2000
-
-retry-4000
-
-retry-8000
-
-retry-16000-0
-
-retry-16000-1
-
-retry-16000-2
-
...
-
-retry-16000-224
如果需要多个主题,可以使用以下配置实现。
@RetryableTopic(attempts = 230,
backOff = @BackOff(delay = 1_000, multiplier = 2, maxDelay = 16_000),
sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.MULTIPLE_TOPICS)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.exponentialBackoff(1_000, 2, 16_000)
.maxAttempts(230)
.useSingleTopicForSameIntervals()
.create(template);
}
自定义命名策略
更复杂的命名策略可以通过注册一个实现RetryTopicNamesProviderFactory的bean来实现。默认实现是SuffixingRetryTopicNamesProviderFactory,可以通过以下方式注册不同的实现
@Override
protected RetryTopicComponentFactory createComponentFactory() {
return new RetryTopicComponentFactory() {
@Override
public RetryTopicNamesProviderFactory retryTopicNamesProviderFactory() {
return new CustomRetryTopicNamesProviderFactory();
}
};
}
例如,以下实现除了标准后缀外,还在重试/dlt主题名称中添加了一个前缀
public class CustomRetryTopicNamesProviderFactory implements RetryTopicNamesProviderFactory {
@Override
public RetryTopicNamesProvider createRetryTopicNamesProvider(
DestinationTopic.Properties properties) {
if (properties.isMainEndpoint()) {
return new SuffixingRetryTopicNamesProvider(properties);
}
else {
return new SuffixingRetryTopicNamesProvider(properties) {
@Override
public String getTopicName(String topic) {
return "my-prefix-" + super.getTopicName(topic);
}
};
}
}
}