异常处理
本节描述了在使用 Spring for Apache Kafka 时如何处理可能出现的各种异常。
监听器错误处理器
从 2.0 版本开始,@KafkaListener 注解新增了一个属性:errorHandler。
您可以使用 errorHandler 来提供 KafkaListenerErrorHandler 实现的 bean 名称。此函数式接口有一个方法,如下所示:
@FunctionalInterface
public interface KafkaListenerErrorHandler {
Object handleError(Message<?> message, ListenerExecutionFailedException exception) throws Exception;
}
您可以访问消息转换器生成的 spring-messaging Message<?> 对象以及监听器抛出的异常,该异常封装在 ListenerExecutionFailedException 中。错误处理器可以抛出原始异常或新异常,该异常会被抛给容器。错误处理器返回的任何内容都会被忽略。
从 2.7 版本开始,您可以在 MessagingMessageConverter 和 BatchMessagingMessageConverter 上设置 rawRecordHeader 属性,这会导致原始的 ConsumerRecord 被添加到转换后的 Message<?> 的 KafkaHeaders.RAW_DATA 标头中。这很有用,例如,如果您希望在监听器错误处理器中使用 DeadLetterPublishingRecoverer。它可以在请求/回复场景中使用,在该场景中,您希望在多次重试后,将失败结果发送给发送者,并将失败的记录捕获到死信主题中。
@Bean
public KafkaListenerErrorHandler eh(DeadLetterPublishingRecoverer recoverer) {
return (msg, ex) -> {
if (msg.getHeaders().get(KafkaHeaders.DELIVERY_ATTEMPT, Integer.class) > 9) {
recoverer.accept(msg.getHeaders().get(KafkaHeaders.RAW_DATA, ConsumerRecord.class), ex);
return "FAILED";
}
throw ex;
};
}
它有一个子接口(ConsumerAwareListenerErrorHandler),通过以下方法可以访问消费者对象:
Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer);
另一个子接口(ManualAckListenerErrorHandler)在使用手动 AckMode 时提供对 Acknowledgment 对象的访问。
Object handleError(Message<?> message, ListenerExecutionFailedException exception,
Consumer<?, ?> consumer, @Nullable Acknowledgment ack);
在这两种情况下,您都不应该在消费者上执行任何寻址操作,因为容器将无法感知它们。
容器错误处理器
从 2.8 版本开始,旧的 ErrorHandler 和 BatchErrorHandler 接口已被新的 CommonErrorHandler 取代。这些错误处理器可以处理记录和批处理监听器的错误,允许单个监听器容器工厂为两种类型的监听器创建容器。提供了 CommonErrorHandler 的实现以替换大多数旧框架错误处理器实现。
有关将自定义旧错误处理器迁移到 CommonErrorHandler 的信息,请参见 将自定义旧错误处理器实现迁移到 CommonErrorHandler。
当使用事务时,默认情况下不配置错误处理器,以便异常会回滚事务。事务容器的错误处理由 AfterRollbackProcessor 处理。如果在使用事务时提供自定义错误处理器,如果您希望回滚事务,它必须抛出异常。
该接口有一个默认方法 isAckAfterHandle(),容器调用它来确定如果错误处理器返回而不抛出异常,是否应提交偏移量;它默认返回 true。
通常,框架提供的错误处理器在错误未“处理”(例如,执行寻址操作后)时会抛出异常。默认情况下,此类异常由容器以 ERROR 级别记录。所有框架错误处理器都扩展了 KafkaExceptionLogLevelAware,它允许您控制这些异常的日志级别。
/**
* Set the level at which the exception thrown by this handler is logged.
* @param logLevel the level (default ERROR).
*/
public void setLogLevel(KafkaException.Level logLevel) {
...
}
您可以指定一个全局错误处理器,用于容器工厂中的所有监听器。以下示例演示了如何实现:
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
...
factory.setCommonErrorHandler(myErrorHandler);
...
return factory;
}
默认情况下,如果带注解的监听器方法抛出异常,它会被抛给容器,并且消息将根据容器配置进行处理。
容器在调用错误处理器之前提交任何挂起的偏移量提交。
如果您正在使用 Spring Boot,您只需将错误处理器添加为 @Bean,Boot 将其添加到自动配置的工厂中。
退避处理器
诸如 DefaultErrorHandler 之类的错误处理器使用 BackOff 来确定在重试传递之前等待多长时间。从 2.9 版本开始,您可以配置自定义的 BackOffHandler。默认处理器只是暂停线程,直到退避时间过去(或容器停止)。框架还提供了 ContainerPausingBackOffHandler,它会暂停监听器容器,直到退避时间过去,然后恢复容器。当延迟时间长于 max.poll.interval.ms 消费者属性时,这很有用。请注意,实际退避时间的解析将受到 pollTimeout 容器属性的影响。
DefaultErrorHandler
这个新的错误处理器取代了 SeekToCurrentErrorHandler 和 RecoveringBatchErrorHandler,它们已经作为默认错误处理器存在了几个版本。一个不同之处是,对于批处理监听器(当抛出 BatchListenerFailedException 以外的异常时)的备用行为相当于 重试完整批次。
从 2.9 版本开始,可以配置 DefaultErrorHandler 以提供与下面讨论的寻求未处理记录偏移量相同的语义,但实际上不进行寻求。相反,记录由监听器容器保留,并在错误处理器退出后(以及在执行一次暂停的 poll() 以保持消费者活动后;如果正在使用 非阻塞重试 或 ContainerPausingBackOffHandler,暂停可能会持续多次轮询)重新提交给监听器。错误处理器向容器返回一个结果,指示当前失败的记录是否可以重新提交,或者它是否已恢复,然后将不再发送给监听器。要启用此模式,请将属性 seekAfterError 设置为 false。 |
错误处理器可以恢复(跳过)持续失败的记录。默认情况下,在十次失败后,失败的记录会被记录(以 ERROR 级别)。您可以使用自定义的恢复器(BiConsumer)和控制传递尝试和每次尝试之间延迟的 BackOff 来配置处理器。使用 FixedBackOff 和 FixedBackOff.UNLIMITED_ATTEMPTS 会导致(实际上)无限重试。以下示例配置了三次尝试后的恢复:
DefaultErrorHandler errorHandler =
new DefaultErrorHandler((record, exception) -> {
// recover after 3 failures, with no back off - e.g. send to a dead-letter topic
}, new FixedBackOff(0L, 2L));
要为监听器容器配置此处理器的自定义实例,请将其添加到容器工厂。
例如,对于 @KafkaListener 容器工厂,您可以按如下方式添加 DefaultErrorHandler:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(AckMode.RECORD);
factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000L, 2L)));
return factory;
}
对于记录监听器,这将重试传递最多 2 次(3 次传递尝试),退避时间为 1 秒,而不是默认配置(FixedBackOff(0L, 9))。重试耗尽后,失败只会记录日志。
例如,如果 poll 返回六条记录(分区 0、1、2 各两条),并且监听器在第四条记录上抛出异常,则容器通过提交其偏移量来确认前三条消息。DefaultErrorHandler 寻求分区 1 的偏移量 1 和分区 2 的偏移量 0。下一次 poll() 返回三条未处理的记录。
如果 AckMode 是 BATCH,则容器在调用错误处理器之前提交前两个分区的偏移量。
对于批处理监听器,监听器必须抛出 BatchListenerFailedException,指示批处理中的哪些记录失败。
事件序列是:
-
提交索引之前的记录的偏移量。
-
如果重试未耗尽,则执行寻求操作,以便所有剩余记录(包括失败记录)将重新传递。
-
如果重试耗尽,尝试恢复失败记录(默认只记录日志),并执行寻求操作,以便剩余记录(不包括失败记录)将重新传递。已恢复记录的偏移量将被提交。
-
如果重试耗尽且恢复失败,则像重试未耗尽一样执行寻求操作。
从 2.9 版本开始,可以配置 DefaultErrorHandler 以提供与上面讨论的寻求未处理记录偏移量相同的语义,但实际上不进行寻求。相反,错误处理器创建一个新的 ConsumerRecords<?, ?>,其中只包含未处理的记录,然后将其提交给监听器(在执行一次暂停的 poll() 以保持消费者活动后)。要启用此模式,请将属性 seekAfterError 设置为 false。 |
默认的恢复器在重试耗尽后记录失败的记录。您可以使用自定义恢复器,或框架提供的恢复器,例如 DeadLetterPublishingRecoverer。
当使用 POJO 批处理监听器(例如 List<Thing>),并且您没有完整的消费者记录要添加到异常中时,您只需添加失败记录的索引:
@KafkaListener(id = "recovering", topics = "someTopic")
public void listen(List<Thing> things) {
for (int i = 0; i < things.size(); i++) {
try {
process(things.get(i));
}
catch (Exception e) {
throw new BatchListenerFailedException("Failed to process", i);
}
}
}
当容器配置为 AckMode.MANUAL_IMMEDIATE 时,可以将错误处理器配置为提交已恢复记录的偏移量;将 commitRecovered 属性设置为 true。
另请参见 发布死信记录。
当使用事务时,DefaultAfterRollbackProcessor 提供了类似的功能。请参见 回滚后处理器。
DefaultErrorHandler 认为某些异常是致命的,并跳过这些异常的重试;恢复器在第一次失败时被调用。默认情况下被认为是致命的异常有:
-
DeserializationException -
MessageConversionException -
ConversionException -
MethodArgumentResolutionException -
NoSuchMethodException -
ClassCastException
因为这些异常不太可能在重试传递时得到解决。
您可以将更多异常类型添加到不可重试类别中,或者完全替换分类异常的映射。有关更多信息,请参见 DefaultErrorHandler.addNotRetryableException() 和 DefaultErrorHandler.setClassifications() 的 Javadoc,以及 ExceptionMatcher。
这是一个将 IllegalArgumentException 添加到不可重试异常的示例:
@Bean
public DefaultErrorHandler errorHandler(ConsumerRecordRecoverer recoverer) {
DefaultErrorHandler handler = new DefaultErrorHandler(recoverer);
handler.addNotRetryableExceptions(IllegalArgumentException.class);
return handler;
}
DefaultErrorHandler 只处理继承自 RuntimeException 的异常。继承自 Error 的异常会完全绕过错误处理器,导致消费者立即终止,关闭 Kafka 连接,并跳过所有重试/恢复机制。这种关键的区别意味着应用程序可能会报告健康状态,尽管已终止的消费者不再处理消息。始终确保消息处理代码中抛出的异常明确地扩展自 RuntimeException 而不是 Error,以允许正确的错误处理。换句话说,如果应用程序抛出异常,请确保它扩展自 RuntimeException,而不是无意中继承自 Error。像 OutOfMemoryError、IllegalAccessError 和应用程序无法控制的其他错误仍然被视为 Error,并且不会重试。 |
错误处理器可以配置一个或多个 RetryListener,接收重试和恢复进度的通知。从 2.8.10 版本开始,添加了批处理监听器的方法。
@FunctionalInterface
public interface RetryListener {
void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);
default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
}
default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
}
default void failedDelivery(ConsumerRecords<?, ?> records, Exception ex, int deliveryAttempt) {
}
default void recovered(ConsumerRecords<?, ?> records, Exception ex) {
}
default void recoveryFailed(ConsumerRecords<?, ?> records, Exception original, Exception failure) {
}
}
有关更多信息,请参见 JavaDocs。
如果恢复器失败(抛出异常),失败的记录将包含在寻址操作中。如果恢复器失败,默认情况下 BackOff 将被重置,并且重新传递将再次通过退避操作,然后再次尝试恢复。要在恢复失败后跳过重试,请将错误处理器的 resetStateOnRecoveryFailure 设置为 false。 |
您可以为错误处理器提供一个 BiFunction<ConsumerRecord<?, ?>, Exception, BackOff>,以便根据失败的记录和/或异常来确定要使用的 BackOff:
handler.setBackOffFunction((record, ex) -> { ... });
如果函数返回 null,则将使用处理器的默认 BackOff。
将 resetStateOnExceptionChange 设置为 true,如果异常类型在失败之间发生变化,则重试序列将重新启动(包括选择一个新的 BackOff,如果已配置)。当为 false(2.9 版本之前的默认值)时,不考虑异常类型。
从 2.9 版本开始,这现在默认为 true。
另请参见 传递尝试头。
批处理监听器与死信主题的错误处理
非阻塞重试 (@RetryableTopic 注解) 不支持批处理监听器。对于批处理监听器与死信主题功能,请使用 DefaultErrorHandler 和 DeadLetterPublishingRecoverer。 |
使用 BatchListenerFailedException
要指示批次中哪个特定记录失败,请抛出 BatchListenerFailedException:
@KafkaListener(id = "batch-listener", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<String, Order>> records) {
for (ConsumerRecord<String, Order> record : records) {
try {
process(record.value());
}
catch (Exception e) {
// Identifies the failed record for error handling
throw new BatchListenerFailedException("Failed to process", e, record);
}
}
}
对于没有 ConsumerRecord 的 POJO 批处理监听器,请改用索引:
@KafkaListener(id = "batch-listener", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<Order> orders) {
for (int i = 0; i < orders.size(); i++) {
try {
process(orders.get(i));
}
catch (Exception e) {
throw new BatchListenerFailedException("Failed to process", e, i);
}
}
}
为批处理监听器配置死信主题
在您的批处理监听器容器工厂上配置一个带有 DeadLetterPublishingRecoverer 的 DefaultErrorHandler:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Order> batchFactory(
ConsumerFactory<String, Order> consumerFactory,
KafkaTemplate<String, Order> kafkaTemplate) {
ConcurrentKafkaListenerContainerFactory<String, Order> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setBatchListener(true);
// Configure Dead Letter Publishing
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate,
(record, ex) -> new TopicPartition(record.topic() + "-dlt", record.partition()));
// Configure retries: 3 attempts with 1 second between each
DefaultErrorHandler errorHandler = new DefaultErrorHandler(recoverer,
new FixedBackOff(1000L, 2L)); // 2 retries = 3 total attempts
factory.setCommonErrorHandler(errorHandler);
return factory;
}
批处理错误处理的工作原理
当抛出 BatchListenerFailedException 时,DefaultErrorHandler 会:
-
提交失败记录之前所有记录的偏移量
-
根据
BackOff配置重试失败记录(以及后续记录) -
当重试耗尽时,发布到 DLT - 只有失败的记录被发送到 DLT
-
提交失败记录的偏移量并重新传递剩余记录进行处理
批处理包含 6 条记录,其中索引为 2 的记录失败的示例流程:
-
第一次尝试:记录 0、1 成功处理;记录 2 失败
-
容器提交记录 0、1 的偏移量
-
重试尝试 1:记录 2、3、4、5 被重试
-
重试尝试 2:记录 2、3、4、5 再次被重试
-
重试耗尽后:记录 2 发布到 DLT 并提交其偏移量
-
容器继续处理记录 3、4、5
跳过特定异常的重试
默认情况下,DefaultErrorHandler 会重试所有异常,除了致命异常(如 DeserializationException、MessageConversionException 等)。要跳过您自己的异常类型的重试,请使用异常分类配置错误处理器。
错误处理器检查 BatchListenerFailedException 的原因以确定是否应跳过重试:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Order> batchFactory(
ConsumerFactory<String, Order> consumerFactory,
KafkaTemplate<String, Order> kafkaTemplate) {
ConcurrentKafkaListenerContainerFactory<String, Order> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setBatchListener(true);
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate);
DefaultErrorHandler errorHandler = new DefaultErrorHandler(recoverer,
new FixedBackOff(1000L, 2L));
// Add custom exception types that should skip retries and go directly to DLT
errorHandler.addNotRetryableExceptions(ValidationException.class, InvalidFormatException.class);
factory.setCommonErrorHandler(errorHandler);
return factory;
}
现在在您的监听器中:
@KafkaListener(id = "batch-listener", topics = "orders", containerFactory = "batchFactory")
public void processOrders(List<ConsumerRecord<String, Order>> records) {
for (ConsumerRecord<String, Order> record : records) {
try {
process(record.value());
}
catch (DatabaseException e) {
// Will be retried 3 times (according to BackOff configuration)
throw new BatchListenerFailedException("Database error", e, record);
}
catch (ValidationException e) {
// Skips retries - goes directly to DLT
// (because ValidationException is configured as not retryable)
throw new BatchListenerFailedException("Validation failed", e, record);
}
}
}
错误处理器检查 BatchListenerFailedException 的原因(第二个参数)。如果原因被归类为不可重试,则记录会立即发送到 DLT,而不进行重试。 |
偏移量提交行为
了解偏移量提交对于批处理错误处理很重要:
-
AckMode.BATCH(批处理监听器最常见)
-
失败记录之前的偏移量在错误处理之前提交
-
失败记录的偏移量在成功恢复(DLT 发布)后提交
-
-
AckMode.MANUAL_IMMEDIATE:
-
设置
errorHandler.setCommitRecovered(true)以提交已恢复记录的偏移量 -
您在监听器中控制确认时机
-
手动确认示例
@KafkaListener(id = "manual-batch", topics = "myTopic", containerFactory = "manualBatchFactory")
public void listen(List<ConsumerRecord<String, Order>> records, Acknowledgment ack) {
for (ConsumerRecord<String, Order> record : records) {
try {
process(record.value());
}
catch (Exception e) {
throw new BatchListenerFailedException("Processing failed", e, record);
}
}
ack.acknowledge();
}
批处理错误处理器中的转换错误
从 2.8 版本开始,批处理监听器现在可以正确处理转换错误,当使用带有 ByteArrayDeserializer、BytesDeserializer 或 StringDeserializer 的 MessageConverter 和 DefaultErrorHandler 时。当发生转换错误时,有效负载被设置为 null,并且反序列化异常被添加到记录头中,类似于 ErrorHandlingDeserializer。监听器中会提供一个 ConversionException 列表,因此监听器可以抛出 BatchListenerFailedException,指示发生转换异常的第一个索引。
示例
@KafkaListener(id = "test", topics = "topic")
void listen(List<Thing> in, @Header(KafkaHeaders.CONVERSION_FAILURES) List<ConversionException> exceptions) {
for (int i = 0; i < in.size(); i++) {
Foo foo = in.get(i);
if (foo == null && exceptions.get(i) != null) {
throw new BatchListenerFailedException("Conversion error", exceptions.get(i), i);
}
process(foo);
}
}
批处理监听器中的反序列化错误
批处理监听器需要手动处理反序列化错误。与记录监听器不同,没有自动错误处理器可以检测并将反序列化失败路由到 DLT。您必须显式检查失败的记录并抛出 BatchListenerFailedException。 |
使用 ErrorHandlingDeserializer 可防止反序列化失败停止整个批次
@Bean
public ConsumerFactory<String, Order> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// Wrap your deserializer with ErrorHandlingDeserializer
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
return new DefaultKafkaConsumerFactory<>(props);
}
在您的监听器中,您必须手动检查 null 值,这些值表示反序列化失败
@KafkaListener(id = "batch-deser", topics = "orders", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<String, Order>> records) {
for (ConsumerRecord<String, Order> record : records) {
if (record.value() == null) {
// Deserialization failed - throw exception to send to DLT
throw new BatchListenerFailedException("Deserialization failed", record);
}
process(record.value());
}
}
当 DeadLetterPublishingRecoverer 将反序列化失败发布到 DLT 时
-
无法反序列化的原始
byte[]数据将作为记录值恢复 -
异常信息(类名、消息、堆栈跟踪)将添加到标准 DLT 异常头中
-
原始的
ErrorHandlingDeserializer异常头默认会被移除(设置 recoverer 的setRetainExceptionHeader(true)以保留它)
重试完整批次
对于批处理监听器,当监听器抛出 BatchListenerFailedException 以外的异常时,这现在是 DefaultErrorHandler 的备用行为。
无法保证当批次重新传递时,批次具有相同数量的记录和/或重新传递的记录顺序相同。因此,不可能轻易地维护批次的重试状态。FallbackBatchErrorHandler 采用以下方法。如果批处理监听器抛出不是 BatchListenerFailedException 的异常,则从内存中的记录批次执行重试。为了避免在扩展重试序列期间发生再平衡,错误处理器会暂停消费者,在每次重试时,在退避休眠之前对其进行轮询,并再次调用监听器。如果/当重试耗尽时,会为批次中的每条记录调用 ConsumerRecordRecoverer。如果恢复器抛出异常,或者线程在其休眠期间被中断,则记录批次将在下一次轮询时重新传递。在退出之前,无论结果如何,消费者都会恢复。
| 此机制不能与事务一起使用。 |
在等待 BackOff 间隔时,错误处理器将以短时休眠循环,直到达到所需的延迟,同时检查容器是否已停止,从而允许休眠在 stop() 后立即退出,而不是导致延迟。
容器停止错误处理器
如果监听器抛出异常,CommonContainerStoppingErrorHandler 将停止容器。对于记录监听器,当 AckMode 为 RECORD 时,已处理记录的偏移量将提交。对于记录监听器,当 AckMode 为任何手动值时,已确认记录的偏移量将提交。对于记录监听器,当 AckMode 为 BATCH 时,或对于批处理监听器,当容器重新启动时,整个批次将重播。
容器停止后,会抛出一个包装 ListenerExecutionFailedException 的异常。这是为了导致事务回滚(如果启用了事务)。
委派错误处理器
CommonDelegatingErrorHandler 可以根据异常类型委托给不同的错误处理器。例如,您可能希望为大多数异常调用 DefaultErrorHandler,或者为其他异常调用 CommonContainerStoppingErrorHandler。
所有委托必须共享相同的兼容属性(ackAfterHandle、seekAfterError…)。
通用错误处理器摘要
-
DefaultErrorHandler -
CommonContainerStoppingErrorHandler -
CommonDelegatingErrorHandler -
CommonLoggingErrorHandler -
CommonMixedErrorHandler
旧版错误处理器及其替代品
| 旧版错误处理器 | 替代 |
|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
无替代品,使用具有无限 |
|
|
|
无替代品,使用 |
将自定义旧版错误处理器实现迁移到 CommonErrorHandler
请参阅 CommonErrorHandler 中的 JavaDocs。
要替换 ErrorHandler 或 ConsumerAwareErrorHandler 实现,您应该实现 handleOne() 并让 seeksAfterHandle() 返回 false(默认)。您还应该实现 handleOtherException() 来处理在记录处理范围之外发生的异常(例如消费者错误)。
要替换 RemainingRecordsErrorHandler 实现,您应该实现 handleRemaining() 并重写 seeksAfterHandle() 以返回 true(错误处理器必须执行必要的寻求)。您还应该实现 handleOtherException() - 以处理在记录处理范围之外发生的异常(例如消费者错误)。
要替换任何 BatchErrorHandler 实现,您应该实现 handleBatch()。您还应该实现 handleOtherException() - 以处理在记录处理范围之外发生的异常(例如消费者错误)。
回滚后处理器
使用事务时,如果监听器抛出异常(并且错误处理器,如果存在,也抛出异常),事务将回滚。默认情况下,任何未处理的记录(包括失败的记录)将在下一次轮询时重新获取。这是通过在 DefaultAfterRollbackProcessor 中执行 seek 操作来实现的。对于批处理监听器,整个批处理记录都会被重新处理(容器不知道批处理中的哪个记录失败)。要修改此行为,您可以使用自定义 AfterRollbackProcessor 配置监听器容器。例如,对于基于记录的监听器,您可能希望跟踪失败的记录并在尝试一定次数后放弃,也许通过将其发布到死信主题。
从 2.2 版本开始,DefaultAfterRollbackProcessor 现在可以恢复(跳过)持续失败的记录。默认情况下,在十次失败后,失败的记录会被记录(以 ERROR 级别)。您可以配置处理器,使其具有自定义的恢复器(BiConsumer)和最大失败次数。将 maxFailures 属性设置为负数会导致无限重试。以下示例配置了三次尝试后的恢复:
AfterRollbackProcessor<String, String> processor =
new DefaultAfterRollbackProcessor((record, exception) -> {
// recover after 3 failures, with no back off - e.g. send to a dead-letter topic
}, new FixedBackOff(0L, 2L));
当您不使用事务时,可以通过配置 DefaultErrorHandler 来实现类似的功能。请参阅 容器错误处理器。
从 3.2 版本开始,恢复现在可以恢复(跳过)持续失败的整个批处理记录。将 ContainerProperties.setBatchRecoverAfterRollback(true) 设置为 true 以启用此功能。
| 默认行为是,批处理监听器无法进行恢复,因为框架不知道批处理中哪个记录持续失败。在这种情况下,应用程序监听器必须处理持续失败的记录。 |
另请参见 发布死信记录。
从 2.2.5 版本开始,DefaultAfterRollbackProcessor 可以在新事务中调用(在失败事务回滚后启动)。然后,如果您正在使用 DeadLetterPublishingRecoverer 发布失败记录,处理器将在原始主题/分区中将已恢复记录的偏移量发送到事务。要启用此功能,请在 DefaultAfterRollbackProcessor 上设置 commitRecovered 和 kafkaTemplate 属性。
如果恢复器失败(抛出异常),失败的记录将包含在寻址操作中。从 2.5.5 版本开始,如果恢复器失败,默认情况下 BackOff 将被重置,并且重新传递将再次通过退避操作,然后再次尝试恢复。在早期版本中,BackOff 未被重置,并在下次失败时重新尝试恢复。要恢复到以前的行为,请将处理器的 resetStateOnRecoveryFailure 属性设置为 false。 |
从 2.6 版本开始,您现在可以为处理器提供一个 BiFunction<ConsumerRecord<?, ?>, Exception, BackOff>,以便根据失败的记录和/或异常来确定要使用的 BackOff:
handler.setBackOffFunction((record, ex) -> { ... });
如果函数返回 null,则将使用处理器的默认 BackOff。
从 2.6.3 版本开始,将 resetStateOnExceptionChange 设置为 true,如果异常类型在失败之间发生变化,则重试序列将重新启动(包括选择一个新的 BackOff,如果已配置)。默认情况下,不考虑异常类型。
从 2.3.1 版本开始,类似于 DefaultErrorHandler,DefaultAfterRollbackProcessor 认为某些异常是致命的,并跳过这些异常的重试;恢复器在第一次失败时被调用。默认情况下被认为是致命的异常有:
-
DeserializationException -
MessageConversionException -
ConversionException -
MethodArgumentResolutionException -
NoSuchMethodException -
ClassCastException
因为这些异常不太可能在重试传递时得到解决。
您可以将更多异常类型添加到不可重试类别中,或者完全替换分类异常的映射。有关更多信息,请参见 DefaultAfterRollbackProcessor.setClassifications() 的 Javadoc,以及 ExceptionMatcher。
这是一个将 IllegalArgumentException 添加到不可重试异常的示例:
@Bean
public DefaultAfterRollbackProcessor errorHandler(BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer) {
DefaultAfterRollbackProcessor processor = new DefaultAfterRollbackProcessor(recoverer);
processor.addNotRetryableException(IllegalArgumentException.class);
return processor;
}
另请参见 传递尝试头。
使用当前的 kafka-clients,容器无法检测到 ProducerFencedException 是由再平衡引起的,还是由于超时或过期导致生产者的 transactional.id 被撤销。因为在大多数情况下,它是由再平衡引起的,容器不会调用 AfterRollbackProcessor(因为它不适合寻求分区,因为我们不再分配它们)。如果您确保超时足够大以处理每个事务并定期执行“空”事务(例如通过 ListenerContainerIdleEvent),您可以避免因超时和过期而导致的隔离。或者,您可以将 stopContainerWhenFenced 容器属性设置为 true,容器将停止,避免记录丢失。您可以消费 ConsumerStoppedEvent 并检查 Reason 属性的 FENCED 以检测此情况。由于事件还具有对容器的引用,您可以使用此事件重新启动容器。 |
从 2.7 版本开始,在等待 BackOff 间隔时,错误处理器将以短时休眠循环,直到达到所需的延迟,同时检查容器是否已停止,从而允许休眠在 stop() 后立即退出,而不是导致延迟。
从 2.7 版本开始,处理器可以配置一个或多个 RetryListener,接收重试和恢复进度的通知。
@FunctionalInterface
public interface RetryListener {
void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);
default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
}
default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
}
}
有关更多信息,请参见 JavaDocs。
传递尝试头
以下仅适用于记录监听器,不适用于批处理监听器。
从 2.5 版本开始,当使用实现了 DeliveryAttemptAware 的 ErrorHandler 或 AfterRollbackProcessor 时,可以启用添加 KafkaHeaders.DELIVERY_ATTEMPT 头(kafka_deliveryAttempt)到记录中。此头的值是一个从 1 开始递增的整数。当接收原始 ConsumerRecord<?, ?> 时,整数位于 byte[4] 中。
int delivery = ByteBuffer.wrap(record.headers()
.lastHeader(KafkaHeaders.DELIVERY_ATTEMPT).value())
.getInt();
当使用 @KafkaListener 和 JsonKafkaHeaderMapper 或 SimpleKafkaHeaderMapper 时,可以通过在监听器方法中添加 @Header(KafkaHeaders.DELIVERY_ATTEMPT) int delivery 作为参数来获取。
要启用此标头的填充,请将容器属性 deliveryAttemptHeader 设置为 true。默认情况下禁用此功能,以避免查找每个记录状态和添加标头(少量)的开销。
DefaultErrorHandler 和 DefaultAfterRollbackProcessor 支持此功能。
批处理监听器的传递尝试头
当使用 BatchListener 处理 ConsumerRecord 时,KafkaHeaders.DELIVERY_ATTEMPT 头可能以与 SingleRecordListener 不同的方式存在。
从 3.3 版本开始,如果您想在使用 BatchListener 时将 KafkaHeaders.DELIVERY_ATTEMPT 头注入到 ConsumerRecord 中,请将 DeliveryAttemptAwareRetryListener 设置为 ErrorHandler 中的 RetryListener。
请参阅以下代码。
final FixedBackOff fixedBackOff = new FixedBackOff(1, 10);
final DefaultErrorHandler errorHandler = new DefaultErrorHandler(fixedBackOff);
errorHandler.setRetryListeners(new DeliveryAttemptAwareRetryListener());
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setCommonErrorHandler(errorHandler);
然后,每当批处理未能完成时,DeliveryAttemptAwareRetryListener 将把 KafkaHeaders.DELIVERY_ATTMPT 头注入到 ConsumerRecord 中。
监听器信息头
在某些情况下,能够知道监听器正在哪个容器中运行是有用的。
从 2.8.4 版本开始,您现在可以设置监听器容器上的 listenerInfo 属性,或者在 @KafkaListener 注解上设置 info 属性。然后,容器会将此信息添加到所有传入消息的 KafkaListener.LISTENER_INFO 头中;它可以在记录拦截器、过滤器等中,或在监听器本身中使用。
@KafkaListener(id = "something", topics = "topic", filter = "someFilter",
info = "this is the something listener")
public void listen(@Payload Thing thing,
@Header(KafkaHeaders.LISTENER_INFO) String listenerInfo) {
...
}
当在 RecordInterceptor 或 RecordFilterStrategy 实现中使用时,头在消费者记录中作为字节数组,使用 KafkaListenerAnnotationBeanPostProcessor 的 charSet 属性进行转换。
头映射器在从消费者记录创建 MessageHeaders 时也会转换为 String,并且永远不会在出站记录上映射此头。
对于 POJO 批处理监听器,从 2.8.6 版本开始,头会复制到批处理的每个成员中,并且在转换后也作为单个 String 参数可用。
@KafkaListener(id = "list2", topics = "someTopic", containerFactory = "batchFactory",
info = "info for batch")
public void listen(List<Thing> list,
@Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
@Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
@Header(KafkaHeaders.OFFSET) List<Long> offsets,
@Header(KafkaHeaders.LISTENER_INFO) String info) {
...
}
如果批处理监听器有过滤器,并且过滤器导致空批处理,则需要向 @Header 参数添加 required = false,因为空批处理没有信息。 |
如果您收到 List<Message<Thing>>,则信息位于每个 Message<?> 的 KafkaHeaders.LISTENER_INFO 头中。
有关消费批次的更多信息,请参见 批处理监听器。
发布死信记录
当记录的失败次数达到最大值时,您可以为 DefaultErrorHandler 和 DefaultAfterRollbackProcessor 配置一个记录恢复器。框架提供了 DeadLetterPublishingRecoverer,它将失败的消息发布到另一个主题。恢复器需要一个 KafkaTemplate<Object, Object>,用于发送记录。您还可以选择配置一个 BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>,该函数用于解析目标主题和分区。
默认情况下,死信记录被发送到名为 <originalTopic>-dlt 的主题(原始主题名称后缀为 -dlt),并且发送到与原始记录相同的分区。因此,当您使用默认解析器时,死信主题必须至少具有与原始主题相同数量的分区。 |
如果返回的 TopicPartition 具有负分区,则不会在 ProducerRecord 中设置分区,因此分区由 Kafka 选择。从 2.2.4 版本开始,任何 ListenerExecutionFailedException(例如,当在 @KafkaListener 方法中检测到异常时抛出)都使用 groupId 属性进行了增强。这允许目标解析器除了 ConsumerRecord 中的信息外,还可以使用此信息来选择死信主题。
以下示例演示了如何连接自定义目标解析器:
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
(r, e) -> {
if (e instanceof FooException) {
return new TopicPartition(r.topic() + ".Foo.failures", r.partition());
}
else {
return new TopicPartition(r.topic() + ".other.failures", r.partition());
}
});
CommonErrorHandler errorHandler = new DefaultErrorHandler(recoverer, new FixedBackOff(0L, 2L));
发送到死信主题的记录将添加以下标头:
-
KafkaHeaders.DLT_EXCEPTION_FQCN: 异常类名(通常是ListenerExecutionFailedException,但也可以是其他)。 -
KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN: 异常原因类名,如果存在(自 2.8 版本起)。 -
KafkaHeaders.DLT_EXCEPTION_STACKTRACE: 异常堆栈跟踪。 -
KafkaHeaders.DLT_EXCEPTION_MESSAGE: 异常消息。 -
KafkaHeaders.DLT_KEY_EXCEPTION_FQCN: 异常类名(仅限键反序列化错误)。 -
KafkaHeaders.DLT_KEY_EXCEPTION_STACKTRACE: 异常堆栈跟踪(仅限键反序列化错误)。 -
KafkaHeaders.DLT_KEY_EXCEPTION_MESSAGE: 异常消息(仅限键反序列化错误)。 -
KafkaHeaders.DLT_ORIGINAL_TOPIC: 原始主题。 -
KafkaHeaders.DLT_ORIGINAL_PARTITION: 原始分区。 -
KafkaHeaders.DLT_ORIGINAL_OFFSET: 原始偏移量。 -
KafkaHeaders.DLT_ORIGINAL_TIMESTAMP: 原始时间戳。 -
KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE: 原始时间戳类型。 -
KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP: 原始消费者组,未能处理该记录(自 2.8 版本起)。
键异常仅由 DeserializationException 引起,因此没有 DLT_KEY_EXCEPTION_CAUSE_FQCN。
有两种机制可以添加更多标头。
-
子类化恢复器并重写
createProducerRecord()- 调用super.createProducerRecord()并添加更多标头。 -
提供一个
BiFunction来接收消费者记录和异常,返回一个Headers对象;其中的标头将被复制到最终的生产者记录;另请参阅 管理死信记录标头。使用setHeadersFunction()设置BiFunction。
第二种实现更简单,但第一种提供了更多可用信息,包括已组装的标准标头。
从 2.3 版本开始,当与 ErrorHandlingDeserializer 结合使用时,发布者会将死信生产者记录中的记录 value() 恢复为未能反序列化的原始值。以前,value() 为空,用户代码必须从消息头中解码 DeserializationException。此外,您可以向发布者提供多个 KafkaTemplate;例如,如果您想发布来自 DeserializationException 的 byte[] 以及使用与成功反序列化记录不同的序列化器的值,则可能需要这样做。这是一个配置发布者,使其具有使用 String 和 byte[] 序列化器的 KafkaTemplate 的示例:
@Bean
public DeadLetterPublishingRecoverer publisher(KafkaTemplate<?, ?> stringTemplate,
KafkaTemplate<?, ?> bytesTemplate) {
Map<Class<?>, KafkaOperations<?, ?>> templates = new LinkedHashMap<>();
templates.put(String.class, stringTemplate);
templates.put(byte[].class, bytesTemplate);
return new DeadLetterPublishingRecoverer(templates);
}
发布者使用映射键来查找适合要发布的 value() 的模板。建议使用 LinkedHashMap,以便按顺序检查键。
当发布 null 值时,如果有多个模板,恢复器将查找 Void 类的模板;如果不存在,将使用 values().iterator() 中的第一个模板。
自 2.7 版本起,您可以使用 setFailIfSendResultIsError 方法,以便在消息发布失败时抛出异常。您还可以使用 setWaitForSendResultTimeout 设置发送成功验证的超时时间。
如果恢复器失败(抛出异常),失败的记录将包含在寻址操作中。从 2.5.5 版本开始,如果恢复器失败,默认情况下 BackOff 将被重置,并且重新传递将再次通过退避操作,然后再次尝试恢复。在早期版本中,BackOff 未被重置,并在下次失败时重新尝试恢复。要恢复到以前的行为,请将错误处理器的 resetStateOnRecoveryFailure 属性设置为 false。 |
从 2.6.3 版本开始,将 resetStateOnExceptionChange 设置为 true,如果异常类型在失败之间发生变化,则重试序列将重新启动(包括选择一个新的 BackOff,如果已配置)。默认情况下,不考虑异常类型。
从 2.3 版本开始,恢复器也可以与 Kafka Streams 一起使用 - 有关更多信息,请参见 从反序列化异常中恢复。
ErrorHandlingDeserializer 将反序列化异常添加到头 ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER 和 ErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADER 中(使用 Java 序列化)。默认情况下,这些头不会保留在发布到死信主题的消息中。从 2.7 版本开始,如果键和值都未能反序列化,则两者的原始值都将填充到发送到 DLT 的记录中。
如果传入的记录相互依赖,但可能乱序到达,则将失败的记录重新发布到原始主题的尾部(一定次数),而不是直接发送到死信主题可能很有用。请参见 此 Stack Overflow 问题 以获取示例。
以下错误处理器配置将完全实现这一点:
@Bean
public ErrorHandler eh(KafkaOperations<String, String> template) {
return new DefaultErrorHandler(new DeadLetterPublishingRecoverer(template,
(rec, ex) -> {
org.apache.kafka.common.header.Header retries = rec.headers().lastHeader("retries");
if (retries == null) {
retries = new RecordHeader("retries", new byte[] { 1 });
rec.headers().add(retries);
}
else {
retries.value()[0]++;
}
return retries.value()[0] > 5
? new TopicPartition("topic-dlt", rec.partition())
: new TopicPartition("topic", rec.partition());
}), new FixedBackOff(0L, 0L));
}
从 2.7 版本开始,恢复器会检查目标解析器选择的分区是否存在。如果分区不存在,则 ProducerRecord 中的分区将设置为 null,允许 KafkaProducer 选择分区。您可以通过将 verifyPartition 属性设置为 false 来禁用此检查。
从 3.1 版本开始,将 logRecoveryRecord 属性设置为 true 将记录恢复记录和异常。
管理死信记录头
-
appendOriginalHeaders(默认true) -
stripPreviousExceptionHeaders(默认true,自 2.8 版本起)
Apache Kafka 支持具有相同名称的多个头;要获取“最新”值,可以使用 headers.lastHeader(headerName);要获取多个头的迭代器,可以使用 headers.headers(headerName).iterator()。
当重复发布失败的记录时,这些标头可能会增长(并最终因 RecordTooLargeException 导致发布失败);对于异常标头,特别是堆栈跟踪标头,更是如此。
这两个属性的原因是,虽然您可能只希望保留最新的异常信息,但您可能希望保留记录在每次失败时经过哪些主题的历史记录。
appendOriginalHeaders 应用于所有名为 ORIGINAL 的头,而 stripPreviousExceptionHeaders 应用于所有名为 EXCEPTION 的头。
从 2.8.4 版本开始,您现在可以控制哪些标准头将添加到输出记录中。请参阅 enum HeadersToAdd 以了解默认添加的(目前)10 个标准头的通用名称(这些不是实际的头名称,只是一个抽象;实际的头名称由 getHeaderNames() 方法设置,子类可以重写此方法)。
要排除头,请使用 excludeHeaders() 方法;例如,要禁止在头中添加异常堆栈跟踪,请使用:
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.excludeHeaders(HeaderNames.HeadersToAdd.EX_STACKTRACE);
此外,您可以通过添加 ExceptionHeadersCreator 来完全自定义异常头的添加;这也将禁用所有标准异常头。
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.setExceptionHeadersCreator((kafkaHeaders, exception, isKey, headerNames) -> {
kafkaHeaders.add(new RecordHeader(..., ...));
});
同样从 2.8.4 版本开始,您现在可以通过 addHeadersFunction 方法提供多个头函数。这允许应用额外的函数,即使已经注册了另一个函数,例如,在使用 非阻塞重试 时。
ExponentialBackOffWithMaxRetries 实现
Spring Framework 提供了许多 BackOff 实现。默认情况下,ExponentialBackOff 将无限期重试;要在一定次数的重试尝试后放弃,需要计算 maxElapsedTime。自 2.7.3 版本起,Spring for Apache Kafka 提供了 ExponentialBackOffWithMaxRetries,这是一个子类,它接收 maxRetries 属性并自动计算 maxElapsedTime,这更方便一些。
@Bean
DefaultErrorHandler handler() {
ExponentialBackOffWithMaxRetries bo = new ExponentialBackOffWithMaxRetries(6);
bo.setInitialInterval(1_000L);
bo.setMultiplier(2.0);
bo.setMaxInterval(10_000L);
return new DefaultErrorHandler(myRecoverer, bo);
}
这将分别在 1、2、4、8、10、10 秒后重试,然后调用恢复器。