配置

对于默认设置,通过将 @RetryableTopic 注解添加到 @KafkaListener 方法中来启用非阻塞重试。这是推荐且最简单的方法,因为它会自动配置所需的重试基础设施,并使用默认设置创建重试和 DLT 主题。

要导入非阻塞重试基础设施并将其组件公开为 bean,请使用 @EnableKafkaRetryTopic 注解 @Configuration 类。这使得该功能的组件能够被注入和在运行时查找,并作为高级和全局配置的基础。

如果添加了此注解,则无需再添加 @EnableKafka,因为 @EnableKafkaRetryTopic 已被 @EnableKafka 元注解。

对于高级和全局自定义,请在单个 @Configuration 类中扩展 RetryTopicConfigurationSupport 并覆盖相关方法。有关更多详细信息,请参阅配置全局设置和功能

默认情况下,重试主题的容器将与主容器具有相同的并发性。从 3.0 版本开始,您可以为重试容器设置不同的 concurrency(在注解上或在 RetryTopicConfigurationBuilder 中)。

仅使用上述两种全局配置方法之一(@EnableKafkaRetryTopic 或扩展 RetryTopicConfigurationSupport)。此外,只有一个 @Configuration 类应该扩展 RetryTopicConfigurationSupport

使用 @RetryableTopic 注解

要为 @KafkaListener 注解的方法配置重试主题和 DLT,您只需向其添加 @RetryableTopic 注解,Spring for Apache Kafka 将使用默认配置启动所有必要的主题和消费者。

@RetryableTopic(kafkaTemplate = "myRetryableTopicKafkaTemplate")
@KafkaListener(topics = "my-annotated-topic", groupId = "myGroupId")
public void processMessage(MyPojo message) {
    // ... message processing
}

自 3.2 起,@RetryableTopic 支持类上的 @KafkaListener 将是

@RetryableTopic(listenerContainerFactory = "my-retry-topic-factory")
@KafkaListener(topics = "my-annotated-topic")
public class ClassLevelRetryListener {

    @KafkaHandler
    public void processMessage(MyPojo message) {
        // ... message processing
    }

}

您可以通过使用 @DltHandler 注解标记同一类中的方法来处理 DLT 消息。如果没有提供 DltHandler 方法,则会创建一个默认消费者,该消费者仅记录消费情况。

@DltHandler
public void processMessage(MyPojo message) {
    // ... message processing, persistence, etc
}
如果您没有指定 kafkaTemplate 名称,则将查找名为 defaultRetryTopicKafkaTemplate 的 bean。如果找不到 bean,则会抛出异常。

从 3.0 版本开始,@RetryableTopic 注解可以用作自定义注解上的元注解;例如

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@RetryableTopic
static @interface MetaAnnotatedRetryableTopic {

    @AliasFor(attribute = "concurrency", annotation = RetryableTopic.class)
    String parallelism() default "3";

}

使用 RetryTopicConfiguration bean

您还可以通过在 @Configuration 注解的类中创建 RetryTopicConfiguration bean 来配置非阻塞重试支持。

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, Object> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .create(template);
}

这将为使用默认配置的 @KafkaListener 注解方法中的所有主题创建重试主题和 DLT,以及相应的消费者。需要 KafkaTemplate 实例用于消息转发。

为了更精细地控制每个主题的非阻塞重试处理方式,可以提供多个 RetryTopicConfiguration bean。

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .fixedBackOff(3000)
            .maxAttempts(5)
            .concurrency(1)
            .includeTopics(List.of("my-topic", "my-other-topic"))
            .create(template);
}

@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .exponentialBackoff(1000, 2, 5000)
            .maxAttempts(4)
            .excludeTopics(List.of("my-topic", "my-other-topic"))
            .retryOn(MyException.class)
            .create(template);
}
重试主题和 DLT 的消费者将被分配到一个消费者组,其组 ID 是您在 @KafkaListener 注解的 groupId 参数中提供的值与主题后缀的组合。如果您不提供任何值,它们都将属于同一个组,并且重试主题上的重新平衡将导致主主题上不必要的重新平衡。
如果消费者配置了 ErrorHandlingDeserializer 来处理反序列化异常,那么配置 KafkaTemplate 及其生产者使用能够同时处理普通对象和原始 byte[] 值(由反序列化异常引起)的序列化器非常重要。模板的通用值类型应为 Object。一种技术是使用 DelegatingByTypeSerializer;示例如下
@Bean
public ProducerFactory<String, Object> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
        new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
               MyNormalObject.class, new JsonSerializer<Object>())));
}

@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}
多个 @KafkaListener 注解可用于同一个主题,无论是否手动分区分配以及非阻塞重试,但对于给定主题只会使用一个配置。最好为这些主题的配置使用单个 RetryTopicConfiguration bean;如果同一个主题使用了多个 @RetryableTopic 注解,则它们都应具有相同的值,否则其中一个将应用于该主题的所有监听器,而其他注解的值将被忽略。

配置全局设置和功能

自 2.9 版本起,用于配置组件的旧的 bean 覆盖方法已被移除(由于前面提到的 API 的实验性,未弃用)。这不会改变 RetryTopicConfiguration bean 方法 - 只会改变基础设施组件的配置。现在,RetryTopicConfigurationSupport 类应该在(单个)@Configuration 类中扩展,并覆盖适当的方法。示例如下

@EnableKafka
@Configuration
public class MyRetryTopicConfiguration extends RetryTopicConfigurationSupport {

    @Override
    protected void configureBlockingRetries(BlockingRetriesConfigurer blockingRetries) {
        blockingRetries
                .retryOn(MyBlockingRetriesException.class, MyOtherBlockingRetriesException.class)
                .backOff(new FixedBackOff(3000, 3));
    }

    @Override
    protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
        nonBlockingFatalExceptions.add(MyNonBlockingException.class);
    }

    @Override
    protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
        // Use the new 2.9 mechanism to avoid re-fetching the same records after a pause
        customizersConfigurer.customizeErrorHandler(eh -> {
            eh.setSeekAfterError(false);
        });
    }

}
使用此配置方法时,不应使用 @EnableKafkaRetryTopic 注解,以防止因重复的 bean 导致上下文启动失败。请改用简单的 @EnableKafka 注解。

autoCreateTopics 为 true 时,主主题和重试主题将使用指定的分区数和复制因子创建。从 3.0 版本开始,默认复制因子为 -1,表示使用 broker 默认值。如果您的 broker 版本早于 2.4,则需要设置一个显式值。要覆盖特定主题(例如主主题或 DLT)的这些值,只需添加一个具有所需属性的 NewTopic @Bean;这将覆盖自动创建属性。

默认情况下,记录使用接收记录的原始分区发布到重试主题。如果重试主题的分区少于主主题,您应该适当配置框架;示例如下。
@EnableKafka
@Configuration
public class Config extends RetryTopicConfigurationSupport {

    @Override
    protected Consumer<DeadLetterPublishingRecovererFactory> configureDeadLetterPublishingContainerFactory() {
        return dlprf -> dlprf.setPartitionResolver((cr, nextTopic) -> null);
    }

    ...

}

函数的参数是消费者记录和下一个主题的名称。您可以返回特定的分区号,或返回 null 以指示 KafkaProducer 应确定分区。

默认情况下,当记录在重试主题之间转换时,重试头的所有值(尝试次数、时间戳)都会保留。从 2.9.6 版本开始,如果您只想保留这些头的最后一个值,请使用上面显示的 configureDeadLetterPublishingContainerFactory() 方法将工厂的 retainAllRetryHeaderValues 属性设置为 false

查找 RetryTopicConfiguration

尝试通过从 @RetryableTopic 注解创建实例或在没有可用注解时从 bean 容器创建实例来提供 RetryTopicConfiguration 的实例。

如果在容器中找到 bean,则会进行检查以确定所提供的主题是否应由任何此类实例处理。

如果提供了 @RetryableTopic 注解,则会查找 DltHandler 注解方法。

自 3.2 起,提供新的 API 以在类上使用 @RetryableTopic 注解时创建 RetryTopicConfiguration

@Bean
public RetryTopicConfiguration myRetryTopic() {
    RetryTopicConfigurationProvider provider = new RetryTopicConfigurationProvider(beanFactory);
    return provider.findRetryConfigurationFor(topics, null, AnnotatedClass.class, bean);
}

@RetryableTopic
public static class AnnotatedClass {
    // NoOps
}
© . This site is unofficial and not affiliated with VMware.