配置
从 2.9 版本开始,对于默认配置,应在 @Configuration
注解的类中使用 @EnableKafkaRetryTopic
注解。这使得该功能能够正确启动,并允许注入该功能的一些组件以便在运行时查找。
如果您添加此注解,则无需再添加 @EnableKafka ,因为 @EnableKafkaRetryTopic 是通过元注解方式包含 @EnableKafka 的。 |
此外,从该版本开始,为了对该功能的组件和全局特性进行更高级的配置,应在 @Configuration
类中继承 RetryTopicConfigurationSupport
类,并重写相应的方法。更多详情请参考 配置全局设置和特性。
默认情况下,重试 topic 的容器与主容器具有相同的并发度。从 3.0 版本开始,您可以为重试容器设置不同的 concurrency
(可以在注解上设置,也可以在 RetryTopicConfigurationBuilder
中设置)。
只能使用上述技术中的一种,并且只能有一个 @Configuration 类继承 RetryTopicConfigurationSupport 。 |
使用 @RetryableTopic
注解
要为 @KafkaListener
注解的方法配置重试 topic 和 DLT,只需在该方法上添加 @RetryableTopic
注解,Spring for Apache Kafka 将会使用默认配置启动所有必要的 topic 和消费者。
@RetryableTopic(kafkaTemplate = "myRetryableTopicKafkaTemplate")
@KafkaListener(topics = "my-annotated-topic", groupId = "myGroupId")
public void processMessage(MyPojo message) {
// ... message processing
}
从 3.2 开始,对类上 @KafkaListener
的 @RetryableTopic
支持为
@RetryableTopic(listenerContainerFactory = "my-retry-topic-factory")
@KafkaListener(topics = "my-annotated-topic")
public class ClassLevelRetryListener {
@KafkaHandler
public void processMessage(MyPojo message) {
// ... message processing
}
}
您可以在同一个类中指定一个方法来处理 DLT 消息,只需使用 @DltHandler
注解标记该方法。如果没有提供 DltHandler 方法,则会创建一个默认消费者,该消费者仅记录消费情况。
@DltHandler
public void processMessage(MyPojo message) {
// ... message processing, persistence, etc
}
如果您未指定 kafkaTemplate 名称,将查找名称为 defaultRetryTopicKafkaTemplate 的 bean。如果未找到 bean,则会抛出异常。 |
从 3.0 版本开始,@RetryableTopic
注解可以用作自定义注解的元注解;例如
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@RetryableTopic
static @interface MetaAnnotatedRetryableTopic {
@AliasFor(attribute = "concurrency", annotation = RetryableTopic.class)
String parallelism() default "3";
}
使用 RetryTopicConfiguration
bean
您还可以通过在 @Configuration
注解的类中创建 RetryTopicConfiguration
bean 来配置非阻塞重试支持。
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, Object> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.create(template);
}
这将为所有使用 @KafkaListener
注解的方法中的 topic 创建重试 topic 和一个 DLT,以及相应的消费者,并使用默认配置。KafkaTemplate
实例是消息转发所必需的。
为了更精细地控制如何处理每个 topic 的非阻塞重试,可以提供多个 RetryTopicConfiguration
bean。
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackOff(3000)
.maxAttempts(5)
.concurrency(1)
.includeTopics(List.of("my-topic", "my-other-topic"))
.create(template);
}
@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.exponentialBackoff(1000, 2, 5000)
.maxAttempts(4)
.excludeTopics(List.of("my-topic", "my-other-topic"))
.retryOn(MyException.class)
.create(template);
}
重试 topic 和 DLT 的消费者将被分配到一个消费者组,其 group id 是您在 @KafkaListener 注解的 groupId 参数中提供的值与 topic 后缀的组合。如果您未提供任何值,它们将都属于同一个组,并且重试 topic 上的重新平衡将导致主 topic 上不必要的重新平衡。 |
如果消费者配置了 ErrorHandlingDeserializer 来处理反序列化异常,那么配置 KafkaTemplate 及其生产者使用一个能够处理普通对象以及由反序列化异常产生的原始 byte[] 值的序列化器是很重要的。模板的泛型值类型应该是 Object 。一种技术是使用 DelegatingByTypeSerializer ;示例如下 |
@Bean
public ProducerFactory<String, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
MyNormalObject.class, new JsonSerializer<Object>())));
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
可以在同一个 topic 上使用多个 @KafkaListener 注解,无论是否手动分配分区,并且可以结合非阻塞重试,但对于给定的 topic 只会使用一个配置。最好使用单个 RetryTopicConfiguration bean 来配置此类 topic;如果在同一个 topic 上使用了多个 @RetryableTopic 注解,它们都应该具有相同的值,否则其中一个将被应用于该 topic 的所有监听器,而其他注解的值将被忽略。 |
配置全局设置和特性
自 2.9 起,已移除了之前通过 bean 覆盖来配置组件的方法(由于前述 API 的实验性,此更改未进行废弃提示)。这不会改变使用 RetryTopicConfiguration
bean 的方法,仅影响基础设施组件的配置。现在,应在一个(单个)@Configuration
类中继承 RetryTopicConfigurationSupport
类,并重写相应的方法。示例如下
@EnableKafka
@Configuration
public class MyRetryTopicConfiguration extends RetryTopicConfigurationSupport {
@Override
protected void configureBlockingRetries(BlockingRetriesConfigurer blockingRetries) {
blockingRetries
.retryOn(MyBlockingRetriesException.class, MyOtherBlockingRetriesException.class)
.backOff(new FixedBackOff(3000, 3));
}
@Override
protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
nonBlockingFatalExceptions.add(MyNonBlockingException.class);
}
@Override
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
// Use the new 2.9 mechanism to avoid re-fetching the same records after a pause
customizersConfigurer.customizeErrorHandler(eh -> {
eh.setSeekAfterError(false);
});
}
}
当使用这种配置方法时,不应使用 @EnableKafkaRetryTopic 注解,以防止因 bean 重复导致上下文启动失败。请改为使用简单的 @EnableKafka 注解。 |
当 autoCreateTopics
为 true 时,主 topic 和重试 topic 将使用指定的分区数和副本因子创建。从 3.0 版本开始,默认的副本因子为 -1
,表示使用 broker 的默认设置。如果您的 broker 版本早于 2.4,则需要设置一个明确的值。要覆盖特定 topic(例如主 topic 或 DLT)的这些值,只需添加一个带有所需属性的 NewTopic
@Bean
;这将覆盖自动创建属性。
默认情况下,记录使用接收到的原始记录的分区发布到重试 topic。如果重试 topic 的分区少于主 topic,您应该适当地配置框架;示例如下。 |
@EnableKafka
@Configuration
public class Config extends RetryTopicConfigurationSupport {
@Override
protected Consumer<DeadLetterPublishingRecovererFactory> configureDeadLetterPublishingContainerFactory() {
return dlprf -> dlprf.setPartitionResolver((cr, nextTopic) -> null);
}
...
}
函数的参数是消费者记录和下一个 topic 的名称。您可以返回一个特定的分区号,或者返回 null
表示应由 KafkaProducer
决定分区。
默认情况下,当记录通过重试 topic 转换时,重试头(尝试次数、时间戳)的所有值都会被保留。从 2.9.6 版本开始,如果您只想保留这些头的最后一个值,请使用上面所示的 configureDeadLetterPublishingContainerFactory()
方法将工厂的 retainAllRetryHeaderValues
属性设置为 false
。
查找 RetryTopicConfiguration
尝试提供 RetryTopicConfiguration
的实例,可以通过 @RetryableTopic
注解创建,或者在没有注解时从 bean 容器中获取。
如果在容器中找到了 bean,会进行检查以确定提供的 topic 是否应该由任何此类实例处理。
如果提供了 @RetryableTopic
注解,则会查找带有 @DltHandler
注解的方法。
从 3.2 开始,提供了新的 API 来在 @RetryableTopic
注解在类上时创建 RetryTopicConfiguration
@Bean
public RetryTopicConfiguration myRetryTopic() {
RetryTopicConfigurationProvider provider = new RetryTopicConfigurationProvider(beanFactory);
return provider.findRetryConfigurationFor(topics, null, AnnotatedClass.class, bean);
}
@RetryableTopic
public static class AnnotatedClass {
// NoOps
}