主题命名

重试主题和 DLT 通过在主主题后附加提供或默认的值,然后根据该主题的延迟或索引进行命名。

示例

"my-topic" → "my-topic-retry-0", "my-topic-retry-1", …​, "my-topic-dlt"

"my-other-topic" → "my-topic-myRetrySuffix-1000", "my-topic-myRetrySuffix-2000", …​, "my-topic-myDltSuffix"

默认行为是为每次尝试创建单独的重试主题,并附加索引值:retry-0, retry-1, …​, retry-n。因此,默认情况下,重试主题的数量是配置的 `maxAttempts` 减 1。

您可以配置后缀,选择是附加尝试次数索引还是延迟值,在使用固定回退时使用单个重试主题,以及在使用指数回退时为具有最大间隔的尝试使用单个重试主题

重试主题和 DLT 后缀

您可以指定重试主题和 DLT 将使用的后缀。

@RetryableTopic(retryTopicSuffix = "-my-retry-suffix", dltTopicSuffix = "-my-dlt-suffix")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .retryTopicSuffix("-my-retry-suffix")
            .dltTopicSuffix("-my-dlt-suffix")
            .create(template);
}
默认后缀分别是 "-retry" 和 "-dlt",用于重试主题和 dlt。

附加主题的索引或延迟值

您可以在后缀后附加主题的索引或延迟值。

@RetryableTopic(topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .suffixTopicsWithIndexValues()
            .create(template);
    }
默认行为是使用延迟值作为后缀,但对于具有多个主题的固定延迟配置除外,在这种情况下,主题以后缀加上主题的索引进行命名。

固定延迟重试的单个主题

如果您使用固定延迟策略,例如 `FixedBackOffPolicy` 或 `NoBackOffPolicy`,您可以使用单个主题来实现非阻塞重试。这个主题将附加提供或默认的后缀,并且不会附加索引或延迟值。

之前的 `FixedDelayStrategy` 已被弃用,可以使用 `SameIntervalTopicReuseStrategy` 替代。
@RetryableTopic(backoff = @Backoff(2_000), fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .fixedBackOff(3_000)
            .maxAttempts(5)
            .useSingleTopicForFixedDelays()
            .create(template);
}
默认行为是为每次尝试创建单独的重试主题,并附加其索引值:retry-0, retry-1, …​

最大间隔指数延迟的单个主题

如果您使用指数回退策略(`ExponentialBackOffPolicy`),您可以使用单个重试主题来完成延迟是配置的 `maxInterval` 的尝试的非阻塞重试。

这个“最终”重试主题将附加提供或默认的后缀,并会附加索引或 `maxInterval` 值。

通过选择为具有 `maxInterval` 延迟的重试使用单个主题,配置一个长时间持续重试的指数重试策略可能会变得更可行,因为这种方法不需要大量的重试主题。

从 3.2 版本开始,默认行为是在使用指数回退时,对相同间隔重用重试主题;重试主题附加延迟值作为后缀,最后一个重试主题用于相同间隔(对应于 `maxInterval` 延迟)的重用。

例如,当配置指数回退,设置 `initialInterval=1_000`,`multiplier=2` 和 `maxInterval=16_000` 时,为了持续重试一小时,需要将 `maxAttempts` 配置为 229。默认情况下,所需的重试主题将是

  • -retry-1000

  • -retry-2000

  • -retry-4000

  • -retry-8000

  • -retry-16000

当使用重试主题数量等于配置的 `maxAttempts` 减 1 的策略时,最后一个重试主题(对应于 `maxInterval` 延迟)将附加额外的索引作为后缀,例如

  • -retry-1000

  • -retry-2000

  • -retry-4000

  • -retry-8000

  • -retry-16000-0

  • -retry-16000-1

  • -retry-16000-2

  • …​

  • -retry-16000-224

如果需要多个主题,可以使用以下配置来完成。

@RetryableTopic(attempts = 230,
    backoff = @Backoff(delay = 1_000, multiplier = 2, maxDelay = 16_000),
    sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.MULTIPLE_TOPICS)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .exponentialBackoff(1_000, 2, 16_000)
            .maxAttempts(230)
            .useSingleTopicForSameIntervals()
            .create(template);
}

自定义命名策略

通过注册实现 `RetryTopicNamesProviderFactory` 接口的 Bean,可以实现更复杂的命名策略。默认实现是 `SuffixingRetryTopicNamesProviderFactory`,可以通过以下方式注册不同的实现

@Override
protected RetryTopicComponentFactory createComponentFactory() {
    return new RetryTopicComponentFactory() {
        @Override
        public RetryTopicNamesProviderFactory retryTopicNamesProviderFactory() {
            return new CustomRetryTopicNamesProviderFactory();
        }
    };
}

例如,以下实现除了标准后缀外,还为重试/dlt 主题名称添加了前缀

public class CustomRetryTopicNamesProviderFactory implements RetryTopicNamesProviderFactory {

    @Override
    public RetryTopicNamesProvider createRetryTopicNamesProvider(
                DestinationTopic.Properties properties) {

        if (properties.isMainEndpoint()) {
            return new SuffixingRetryTopicNamesProvider(properties);
        }
        else {
            return new SuffixingRetryTopicNamesProvider(properties) {

                @Override
                public String getTopicName(String topic) {
                    return "my-prefix-" + super.getTopicName(topic);
                }

            };
        }
    }

}