异常处理

本节描述了在使用 Spring for Apache Kafka 时如何处理可能出现的各种异常。

监听器错误处理器

从 2.0 版本开始,@KafkaListener 注解新增了一个属性:errorHandler

您可以使用 errorHandler 来提供 KafkaListenerErrorHandler 实现的 bean 名称。此函数式接口有一个方法,如下所示:

@FunctionalInterface
public interface KafkaListenerErrorHandler {

    Object handleError(Message<?> message, ListenerExecutionFailedException exception) throws Exception;

}

您可以访问消息转换器生成的 spring-messaging Message<?> 对象以及监听器抛出的异常,该异常封装在 ListenerExecutionFailedException 中。错误处理器可以抛出原始异常或新异常,该异常会被抛给容器。错误处理器返回的任何内容都会被忽略。

从 2.7 版本开始,您可以在 MessagingMessageConverterBatchMessagingMessageConverter 上设置 rawRecordHeader 属性,这会导致原始的 ConsumerRecord 被添加到转换后的 Message<?>KafkaHeaders.RAW_DATA 标头中。这很有用,例如,如果您希望在监听器错误处理器中使用 DeadLetterPublishingRecoverer。它可以在请求/回复场景中使用,在该场景中,您希望在多次重试后,将失败结果发送给发送者,并将失败的记录捕获到死信主题中。

@Bean
public KafkaListenerErrorHandler eh(DeadLetterPublishingRecoverer recoverer) {
    return (msg, ex) -> {
        if (msg.getHeaders().get(KafkaHeaders.DELIVERY_ATTEMPT, Integer.class) > 9) {
            recoverer.accept(msg.getHeaders().get(KafkaHeaders.RAW_DATA, ConsumerRecord.class), ex);
            return "FAILED";
        }
        throw ex;
    };
}

它有一个子接口(ConsumerAwareListenerErrorHandler),通过以下方法可以访问消费者对象:

Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer);

另一个子接口(ManualAckListenerErrorHandler)在使用手动 AckMode 时提供对 Acknowledgment 对象的访问。

Object handleError(Message<?> message, ListenerExecutionFailedException exception,
			Consumer<?, ?> consumer, @Nullable Acknowledgment ack);

在这两种情况下,您都不应该在消费者上执行任何寻址操作,因为容器将无法感知它们。

容器错误处理器

从 2.8 版本开始,旧的 ErrorHandlerBatchErrorHandler 接口已被新的 CommonErrorHandler 取代。这些错误处理器可以处理记录和批处理监听器的错误,允许单个监听器容器工厂为两种类型的监听器创建容器。提供了 CommonErrorHandler 的实现以替换大多数旧框架错误处理器实现。

有关将自定义旧错误处理器迁移到 CommonErrorHandler 的信息,请参见 将自定义旧错误处理器实现迁移到 CommonErrorHandler

当使用事务时,默认情况下不配置错误处理器,以便异常会回滚事务。事务容器的错误处理由 AfterRollbackProcessor 处理。如果在使用事务时提供自定义错误处理器,如果您希望回滚事务,它必须抛出异常。

该接口有一个默认方法 isAckAfterHandle(),容器调用它来确定如果错误处理器返回而不抛出异常,是否应提交偏移量;它默认返回 true。

通常,框架提供的错误处理器在错误未“处理”(例如,执行寻址操作后)时会抛出异常。默认情况下,此类异常由容器以 ERROR 级别记录。所有框架错误处理器都扩展了 KafkaExceptionLogLevelAware,它允许您控制这些异常的日志级别。

/**
 * Set the level at which the exception thrown by this handler is logged.
 * @param logLevel the level (default ERROR).
 */
public void setLogLevel(KafkaException.Level logLevel) {
    ...
}

您可以指定一个全局错误处理器,用于容器工厂中的所有监听器。以下示例演示了如何实现:

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
        kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    ...
    factory.setCommonErrorHandler(myErrorHandler);
    ...
    return factory;
}

默认情况下,如果带注解的监听器方法抛出异常,它会被抛给容器,并且消息将根据容器配置进行处理。

容器在调用错误处理器之前提交任何挂起的偏移量提交。

如果您正在使用 Spring Boot,您只需将错误处理器添加为 @Bean,Boot 将其添加到自动配置的工厂中。

退避处理器

诸如 DefaultErrorHandler 之类的错误处理器使用 BackOff 来确定在重试传递之前等待多长时间。从 2.9 版本开始,您可以配置自定义的 BackOffHandler。默认处理器只是暂停线程,直到退避时间过去(或容器停止)。框架还提供了 ContainerPausingBackOffHandler,它会暂停监听器容器,直到退避时间过去,然后恢复容器。当延迟时间长于 max.poll.interval.ms 消费者属性时,这很有用。请注意,实际退避时间的解析将受到 pollTimeout 容器属性的影响。

DefaultErrorHandler

这个新的错误处理器取代了 SeekToCurrentErrorHandlerRecoveringBatchErrorHandler,它们已经作为默认错误处理器存在了几个版本。一个不同之处是,对于批处理监听器(当抛出 BatchListenerFailedException 以外的异常时)的备用行为相当于 重试完整批次

从 2.9 版本开始,可以配置 DefaultErrorHandler 以提供与下面讨论的寻求未处理记录偏移量相同的语义,但实际上不进行寻求。相反,记录由监听器容器保留,并在错误处理器退出后(以及在执行一次暂停的 poll() 以保持消费者活动后;如果正在使用 非阻塞重试ContainerPausingBackOffHandler,暂停可能会持续多次轮询)重新提交给监听器。错误处理器向容器返回一个结果,指示当前失败的记录是否可以重新提交,或者它是否已恢复,然后将不再发送给监听器。要启用此模式,请将属性 seekAfterError 设置为 false

错误处理器可以恢复(跳过)持续失败的记录。默认情况下,在十次失败后,失败的记录会被记录(以 ERROR 级别)。您可以使用自定义的恢复器(BiConsumer)和控制传递尝试和每次尝试之间延迟的 BackOff 来配置处理器。使用 FixedBackOffFixedBackOff.UNLIMITED_ATTEMPTS 会导致(实际上)无限重试。以下示例配置了三次尝试后的恢复:

DefaultErrorHandler errorHandler =
    new DefaultErrorHandler((record, exception) -> {
        // recover after 3 failures, with no back off - e.g. send to a dead-letter topic
    }, new FixedBackOff(0L, 2L));

要为监听器容器配置此处理器的自定义实例,请将其添加到容器工厂。

例如,对于 @KafkaListener 容器工厂,您可以按如下方式添加 DefaultErrorHandler

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setAckMode(AckMode.RECORD);
    factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000L, 2L)));
    return factory;
}

对于记录监听器,这将重试传递最多 2 次(3 次传递尝试),退避时间为 1 秒,而不是默认配置(FixedBackOff(0L, 9))。重试耗尽后,失败只会记录日志。

例如,如果 poll 返回六条记录(分区 0、1、2 各两条),并且监听器在第四条记录上抛出异常,则容器通过提交其偏移量来确认前三条消息。DefaultErrorHandler 寻求分区 1 的偏移量 1 和分区 2 的偏移量 0。下一次 poll() 返回三条未处理的记录。

如果 AckModeBATCH,则容器在调用错误处理器之前提交前两个分区的偏移量。

对于批处理监听器,监听器必须抛出 BatchListenerFailedException,指示批处理中的哪些记录失败。

事件序列是:

  • 提交索引之前的记录的偏移量。

  • 如果重试未耗尽,则执行寻求操作,以便所有剩余记录(包括失败记录)将重新传递。

  • 如果重试耗尽,尝试恢复失败记录(默认只记录日志),并执行寻求操作,以便剩余记录(不包括失败记录)将重新传递。已恢复记录的偏移量将被提交。

  • 如果重试耗尽且恢复失败,则像重试未耗尽一样执行寻求操作。

从 2.9 版本开始,可以配置 DefaultErrorHandler 以提供与上面讨论的寻求未处理记录偏移量相同的语义,但实际上不进行寻求。相反,错误处理器创建一个新的 ConsumerRecords<?, ?>,其中只包含未处理的记录,然后将其提交给监听器(在执行一次暂停的 poll() 以保持消费者活动后)。要启用此模式,请将属性 seekAfterError 设置为 false

默认的恢复器在重试耗尽后记录失败的记录。您可以使用自定义恢复器,或框架提供的恢复器,例如 DeadLetterPublishingRecoverer

当使用 POJO 批处理监听器(例如 List<Thing>),并且您没有完整的消费者记录要添加到异常中时,您只需添加失败记录的索引:

@KafkaListener(id = "recovering", topics = "someTopic")
public void listen(List<Thing> things) {
    for (int i = 0; i < things.size(); i++) {
        try {
            process(things.get(i));
        }
        catch (Exception e) {
            throw new BatchListenerFailedException("Failed to process", i);
        }
    }
}

当容器配置为 AckMode.MANUAL_IMMEDIATE 时,可以将错误处理器配置为提交已恢复记录的偏移量;将 commitRecovered 属性设置为 true

另请参见 发布死信记录

当使用事务时,DefaultAfterRollbackProcessor 提供了类似的功能。请参见 回滚后处理器

DefaultErrorHandler 认为某些异常是致命的,并跳过这些异常的重试;恢复器在第一次失败时被调用。默认情况下被认为是致命的异常有:

  • DeserializationException

  • MessageConversionException

  • ConversionException

  • MethodArgumentResolutionException

  • NoSuchMethodException

  • ClassCastException

因为这些异常不太可能在重试传递时得到解决。

您可以将更多异常类型添加到不可重试类别中,或者完全替换分类异常的映射。有关更多信息,请参见 DefaultErrorHandler.addNotRetryableException()DefaultErrorHandler.setClassifications() 的 Javadoc,以及 ExceptionMatcher

这是一个将 IllegalArgumentException 添加到不可重试异常的示例:

@Bean
public DefaultErrorHandler errorHandler(ConsumerRecordRecoverer recoverer) {
    DefaultErrorHandler handler = new DefaultErrorHandler(recoverer);
    handler.addNotRetryableExceptions(IllegalArgumentException.class);
    return handler;
}
DefaultErrorHandler 只处理继承自 RuntimeException 的异常。继承自 Error 的异常会完全绕过错误处理器,导致消费者立即终止,关闭 Kafka 连接,并跳过所有重试/恢复机制。这种关键的区别意味着应用程序可能会报告健康状态,尽管已终止的消费者不再处理消息。始终确保消息处理代码中抛出的异常明确地扩展自 RuntimeException 而不是 Error,以允许正确的错误处理。换句话说,如果应用程序抛出异常,请确保它扩展自 RuntimeException,而不是无意中继承自 Error。像 OutOfMemoryErrorIllegalAccessError 和应用程序无法控制的其他错误仍然被视为 Error,并且不会重试。

错误处理器可以配置一个或多个 RetryListener,接收重试和恢复进度的通知。从 2.8.10 版本开始,添加了批处理监听器的方法。

@FunctionalInterface
public interface RetryListener {

    void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);

    default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
    }

    default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
    }

    default void failedDelivery(ConsumerRecords<?, ?> records, Exception ex, int deliveryAttempt) {
    }

    default void recovered(ConsumerRecords<?, ?> records, Exception ex) {
    }

	default void recoveryFailed(ConsumerRecords<?, ?> records, Exception original, Exception failure) {
	}

}

有关更多信息,请参见 JavaDocs。

如果恢复器失败(抛出异常),失败的记录将包含在寻址操作中。如果恢复器失败,默认情况下 BackOff 将被重置,并且重新传递将再次通过退避操作,然后再次尝试恢复。要在恢复失败后跳过重试,请将错误处理器的 resetStateOnRecoveryFailure 设置为 false

您可以为错误处理器提供一个 BiFunction<ConsumerRecord<?, ?>, Exception, BackOff>,以便根据失败的记录和/或异常来确定要使用的 BackOff

handler.setBackOffFunction((record, ex) -> { ... });

如果函数返回 null,则将使用处理器的默认 BackOff

resetStateOnExceptionChange 设置为 true,如果异常类型在失败之间发生变化,则重试序列将重新启动(包括选择一个新的 BackOff,如果已配置)。当为 false(2.9 版本之前的默认值)时,不考虑异常类型。

从 2.9 版本开始,这现在默认为 true

另请参见 传递尝试头

批处理监听器与死信主题的错误处理

非阻塞重试 (@RetryableTopic 注解) 不支持批处理监听器。对于批处理监听器与死信主题功能,请使用 DefaultErrorHandlerDeadLetterPublishingRecoverer

使用 BatchListenerFailedException

要指示批次中哪个特定记录失败,请抛出 BatchListenerFailedException

@KafkaListener(id = "batch-listener", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<String, Order>> records) {
    for (ConsumerRecord<String, Order> record : records) {
        try {
            process(record.value());
        }
        catch (Exception e) {
            // Identifies the failed record for error handling
            throw new BatchListenerFailedException("Failed to process", e, record);
        }
    }
}

对于没有 ConsumerRecord 的 POJO 批处理监听器,请改用索引:

@KafkaListener(id = "batch-listener", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<Order> orders) {
    for (int i = 0; i < orders.size(); i++) {
        try {
            process(orders.get(i));
        }
        catch (Exception e) {
            throw new BatchListenerFailedException("Failed to process", e, i);
        }
    }
}

为批处理监听器配置死信主题

在您的批处理监听器容器工厂上配置一个带有 DeadLetterPublishingRecovererDefaultErrorHandler

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Order> batchFactory(
        ConsumerFactory<String, Order> consumerFactory,
        KafkaTemplate<String, Order> kafkaTemplate) {

    ConcurrentKafkaListenerContainerFactory<String, Order> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setBatchListener(true);

    // Configure Dead Letter Publishing
    DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate,
            (record, ex) -> new TopicPartition(record.topic() + "-dlt", record.partition()));

    // Configure retries: 3 attempts with 1 second between each
    DefaultErrorHandler errorHandler = new DefaultErrorHandler(recoverer,
            new FixedBackOff(1000L, 2L)); // 2 retries = 3 total attempts

    factory.setCommonErrorHandler(errorHandler);
    return factory;
}

批处理错误处理的工作原理

当抛出 BatchListenerFailedException 时,DefaultErrorHandler 会:

  1. 提交失败记录之前所有记录的偏移量

  2. 根据 BackOff 配置重试失败记录(以及后续记录)

  3. 当重试耗尽时,发布到 DLT - 只有失败的记录被发送到 DLT

  4. 提交失败记录的偏移量并重新传递剩余记录进行处理

批处理包含 6 条记录,其中索引为 2 的记录失败的示例流程:

  • 第一次尝试:记录 0、1 成功处理;记录 2 失败

  • 容器提交记录 0、1 的偏移量

  • 重试尝试 1:记录 2、3、4、5 被重试

  • 重试尝试 2:记录 2、3、4、5 再次被重试

  • 重试耗尽后:记录 2 发布到 DLT 并提交其偏移量

  • 容器继续处理记录 3、4、5

跳过特定异常的重试

默认情况下,DefaultErrorHandler 会重试所有异常,除了致命异常(如 DeserializationExceptionMessageConversionException 等)。要跳过您自己的异常类型的重试,请使用异常分类配置错误处理器。

错误处理器检查 BatchListenerFailedException原因以确定是否应跳过重试:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Order> batchFactory(
        ConsumerFactory<String, Order> consumerFactory,
        KafkaTemplate<String, Order> kafkaTemplate) {

    ConcurrentKafkaListenerContainerFactory<String, Order> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setBatchListener(true);

    DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate);
    DefaultErrorHandler errorHandler = new DefaultErrorHandler(recoverer,
            new FixedBackOff(1000L, 2L));

    // Add custom exception types that should skip retries and go directly to DLT
    errorHandler.addNotRetryableExceptions(ValidationException.class, InvalidFormatException.class);

    factory.setCommonErrorHandler(errorHandler);
    return factory;
}

现在在您的监听器中:

@KafkaListener(id = "batch-listener", topics = "orders", containerFactory = "batchFactory")
public void processOrders(List<ConsumerRecord<String, Order>> records) {
    for (ConsumerRecord<String, Order> record : records) {
        try {
            process(record.value());
        }
        catch (DatabaseException e) {
            // Will be retried 3 times (according to BackOff configuration)
            throw new BatchListenerFailedException("Database error", e, record);
        }
        catch (ValidationException e) {
            // Skips retries - goes directly to DLT
            // (because ValidationException is configured as not retryable)
            throw new BatchListenerFailedException("Validation failed", e, record);
        }
    }
}
错误处理器检查 BatchListenerFailedException原因(第二个参数)。如果原因被归类为不可重试,则记录会立即发送到 DLT,而不进行重试。

偏移量提交行为

了解偏移量提交对于批处理错误处理很重要:

  • AckMode.BATCH(批处理监听器最常见)

    • 失败记录之前的偏移量在错误处理之前提交

    • 失败记录的偏移量在成功恢复(DLT 发布)后提交

  • AckMode.MANUAL_IMMEDIATE:

    • 设置 errorHandler.setCommitRecovered(true) 以提交已恢复记录的偏移量

    • 您在监听器中控制确认时机

手动确认示例

@KafkaListener(id = "manual-batch", topics = "myTopic", containerFactory = "manualBatchFactory")
public void listen(List<ConsumerRecord<String, Order>> records, Acknowledgment ack) {
    for (ConsumerRecord<String, Order> record : records) {
        try {
            process(record.value());
        }
        catch (Exception e) {
            throw new BatchListenerFailedException("Processing failed", e, record);
        }
    }
    ack.acknowledge();
}

批处理错误处理器中的转换错误

从 2.8 版本开始,批处理监听器现在可以正确处理转换错误,当使用带有 ByteArrayDeserializerBytesDeserializerStringDeserializerMessageConverterDefaultErrorHandler 时。当发生转换错误时,有效负载被设置为 null,并且反序列化异常被添加到记录头中,类似于 ErrorHandlingDeserializer。监听器中会提供一个 ConversionException 列表,因此监听器可以抛出 BatchListenerFailedException,指示发生转换异常的第一个索引。

示例

@KafkaListener(id = "test", topics = "topic")
void listen(List<Thing> in, @Header(KafkaHeaders.CONVERSION_FAILURES) List<ConversionException> exceptions) {
    for (int i = 0; i < in.size(); i++) {
        Foo foo = in.get(i);
        if (foo == null && exceptions.get(i) != null) {
            throw new BatchListenerFailedException("Conversion error", exceptions.get(i), i);
        }
        process(foo);
    }
}

批处理监听器中的反序列化错误

批处理监听器需要手动处理反序列化错误。与记录监听器不同,没有自动错误处理器可以检测并将反序列化失败路由到 DLT。您必须显式检查失败的记录并抛出 BatchListenerFailedException

使用 ErrorHandlingDeserializer 可防止反序列化失败停止整个批次

@Bean
public ConsumerFactory<String, Order> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    // Wrap your deserializer with ErrorHandlingDeserializer
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
    props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());

    return new DefaultKafkaConsumerFactory<>(props);
}

在您的监听器中,您必须手动检查 null 值,这些值表示反序列化失败

@KafkaListener(id = "batch-deser", topics = "orders", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<String, Order>> records) {
    for (ConsumerRecord<String, Order> record : records) {
        if (record.value() == null) {
            // Deserialization failed - throw exception to send to DLT
            throw new BatchListenerFailedException("Deserialization failed", record);
        }
        process(record.value());
    }
}

DeadLetterPublishingRecoverer 将反序列化失败发布到 DLT 时

  • 无法反序列化的原始 byte[] 数据将作为记录值恢复

  • 异常信息(类名、消息、堆栈跟踪)将添加到标准 DLT 异常头中

  • 原始的 ErrorHandlingDeserializer 异常头默认会被移除(设置 recoverer 的 setRetainExceptionHeader(true) 以保留它)

重试完整批次

对于批处理监听器,当监听器抛出 BatchListenerFailedException 以外的异常时,这现在是 DefaultErrorHandler 的备用行为。

无法保证当批次重新传递时,批次具有相同数量的记录和/或重新传递的记录顺序相同。因此,不可能轻易地维护批次的重试状态。FallbackBatchErrorHandler 采用以下方法。如果批处理监听器抛出不是 BatchListenerFailedException 的异常,则从内存中的记录批次执行重试。为了避免在扩展重试序列期间发生再平衡,错误处理器会暂停消费者,在每次重试时,在退避休眠之前对其进行轮询,并再次调用监听器。如果/当重试耗尽时,会为批次中的每条记录调用 ConsumerRecordRecoverer。如果恢复器抛出异常,或者线程在其休眠期间被中断,则记录批次将在下一次轮询时重新传递。在退出之前,无论结果如何,消费者都会恢复。

此机制不能与事务一起使用。

在等待 BackOff 间隔时,错误处理器将以短时休眠循环,直到达到所需的延迟,同时检查容器是否已停止,从而允许休眠在 stop() 后立即退出,而不是导致延迟。

容器停止错误处理器

如果监听器抛出异常,CommonContainerStoppingErrorHandler 将停止容器。对于记录监听器,当 AckModeRECORD 时,已处理记录的偏移量将提交。对于记录监听器,当 AckMode 为任何手动值时,已确认记录的偏移量将提交。对于记录监听器,当 AckModeBATCH 时,或对于批处理监听器,当容器重新启动时,整个批次将重播。

容器停止后,会抛出一个包装 ListenerExecutionFailedException 的异常。这是为了导致事务回滚(如果启用了事务)。

委派错误处理器

CommonDelegatingErrorHandler 可以根据异常类型委托给不同的错误处理器。例如,您可能希望为大多数异常调用 DefaultErrorHandler,或者为其他异常调用 CommonContainerStoppingErrorHandler

所有委托必须共享相同的兼容属性(ackAfterHandleseekAfterError…​)。

日志错误处理器

CommonLoggingErrorHandler 只是记录异常;对于记录监听器,来自上次轮询的剩余记录会传递给监听器。对于批处理监听器,批处理中的所有记录都会被记录。

对记录和批处理监听器使用不同的通用错误处理器

如果您希望对记录和批处理监听器使用不同的错误处理策略,则提供 CommonMixedErrorHandler,允许为每种监听器类型配置特定的错误处理器。

通用错误处理器摘要

  • DefaultErrorHandler

  • CommonContainerStoppingErrorHandler

  • CommonDelegatingErrorHandler

  • CommonLoggingErrorHandler

  • CommonMixedErrorHandler

旧版错误处理器及其替代品

旧版错误处理器 替代

LoggingErrorHandler

CommonLoggingErrorHandler

BatchLoggingErrorHandler

CommonLoggingErrorHandler

ConditionalDelegatingErrorHandler

DelegatingErrorHandler

ConditionalDelegatingBatchErrorHandler

DelegatingErrorHandler

ContainerStoppingErrorHandler

CommonContainerStoppingErrorHandler

ContainerStoppingBatchErrorHandler

CommonContainerStoppingErrorHandler

SeekToCurrentErrorHandler

DefaultErrorHandler

SeekToCurrentBatchErrorHandler

无替代品,使用具有无限 BackOffDefaultErrorHandler

RecoveringBatchErrorHandler

DefaultErrorHandler

RetryingBatchErrorHandler

无替代品,使用 DefaultErrorHandler 并抛出 BatchListenerFailedException 以外的异常。

将自定义旧版错误处理器实现迁移到 CommonErrorHandler

请参阅 CommonErrorHandler 中的 JavaDocs。

要替换 ErrorHandlerConsumerAwareErrorHandler 实现,您应该实现 handleOne() 并让 seeksAfterHandle() 返回 false(默认)。您还应该实现 handleOtherException() 来处理在记录处理范围之外发生的异常(例如消费者错误)。

要替换 RemainingRecordsErrorHandler 实现,您应该实现 handleRemaining() 并重写 seeksAfterHandle() 以返回 true(错误处理器必须执行必要的寻求)。您还应该实现 handleOtherException() - 以处理在记录处理范围之外发生的异常(例如消费者错误)。

要替换任何 BatchErrorHandler 实现,您应该实现 handleBatch()。您还应该实现 handleOtherException() - 以处理在记录处理范围之外发生的异常(例如消费者错误)。

回滚后处理器

使用事务时,如果监听器抛出异常(并且错误处理器,如果存在,也抛出异常),事务将回滚。默认情况下,任何未处理的记录(包括失败的记录)将在下一次轮询时重新获取。这是通过在 DefaultAfterRollbackProcessor 中执行 seek 操作来实现的。对于批处理监听器,整个批处理记录都会被重新处理(容器不知道批处理中的哪个记录失败)。要修改此行为,您可以使用自定义 AfterRollbackProcessor 配置监听器容器。例如,对于基于记录的监听器,您可能希望跟踪失败的记录并在尝试一定次数后放弃,也许通过将其发布到死信主题。

从 2.2 版本开始,DefaultAfterRollbackProcessor 现在可以恢复(跳过)持续失败的记录。默认情况下,在十次失败后,失败的记录会被记录(以 ERROR 级别)。您可以配置处理器,使其具有自定义的恢复器(BiConsumer)和最大失败次数。将 maxFailures 属性设置为负数会导致无限重试。以下示例配置了三次尝试后的恢复:

AfterRollbackProcessor<String, String> processor =
    new DefaultAfterRollbackProcessor((record, exception) -> {
        // recover after 3 failures, with no back off - e.g. send to a dead-letter topic
    }, new FixedBackOff(0L, 2L));

当您不使用事务时,可以通过配置 DefaultErrorHandler 来实现类似的功能。请参阅 容器错误处理器

从 3.2 版本开始,恢复现在可以恢复(跳过)持续失败的整个批处理记录。将 ContainerProperties.setBatchRecoverAfterRollback(true) 设置为 true 以启用此功能。

默认行为是,批处理监听器无法进行恢复,因为框架不知道批处理中哪个记录持续失败。在这种情况下,应用程序监听器必须处理持续失败的记录。

另请参见 发布死信记录

从 2.2.5 版本开始,DefaultAfterRollbackProcessor 可以在新事务中调用(在失败事务回滚后启动)。然后,如果您正在使用 DeadLetterPublishingRecoverer 发布失败记录,处理器将在原始主题/分区中将已恢复记录的偏移量发送到事务。要启用此功能,请在 DefaultAfterRollbackProcessor 上设置 commitRecoveredkafkaTemplate 属性。

如果恢复器失败(抛出异常),失败的记录将包含在寻址操作中。从 2.5.5 版本开始,如果恢复器失败,默认情况下 BackOff 将被重置,并且重新传递将再次通过退避操作,然后再次尝试恢复。在早期版本中,BackOff 未被重置,并在下次失败时重新尝试恢复。要恢复到以前的行为,请将处理器的 resetStateOnRecoveryFailure 属性设置为 false

从 2.6 版本开始,您现在可以为处理器提供一个 BiFunction<ConsumerRecord<?, ?>, Exception, BackOff>,以便根据失败的记录和/或异常来确定要使用的 BackOff

handler.setBackOffFunction((record, ex) -> { ... });

如果函数返回 null,则将使用处理器的默认 BackOff

从 2.6.3 版本开始,将 resetStateOnExceptionChange 设置为 true,如果异常类型在失败之间发生变化,则重试序列将重新启动(包括选择一个新的 BackOff,如果已配置)。默认情况下,不考虑异常类型。

从 2.3.1 版本开始,类似于 DefaultErrorHandlerDefaultAfterRollbackProcessor 认为某些异常是致命的,并跳过这些异常的重试;恢复器在第一次失败时被调用。默认情况下被认为是致命的异常有:

  • DeserializationException

  • MessageConversionException

  • ConversionException

  • MethodArgumentResolutionException

  • NoSuchMethodException

  • ClassCastException

因为这些异常不太可能在重试传递时得到解决。

您可以将更多异常类型添加到不可重试类别中,或者完全替换分类异常的映射。有关更多信息,请参见 DefaultAfterRollbackProcessor.setClassifications() 的 Javadoc,以及 ExceptionMatcher

这是一个将 IllegalArgumentException 添加到不可重试异常的示例:

@Bean
public DefaultAfterRollbackProcessor errorHandler(BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer) {
    DefaultAfterRollbackProcessor processor = new DefaultAfterRollbackProcessor(recoverer);
    processor.addNotRetryableException(IllegalArgumentException.class);
    return processor;
}

另请参见 传递尝试头

使用当前的 kafka-clients,容器无法检测到 ProducerFencedException 是由再平衡引起的,还是由于超时或过期导致生产者的 transactional.id 被撤销。因为在大多数情况下,它是由再平衡引起的,容器不会调用 AfterRollbackProcessor(因为它不适合寻求分区,因为我们不再分配它们)。如果您确保超时足够大以处理每个事务并定期执行“空”事务(例如通过 ListenerContainerIdleEvent),您可以避免因超时和过期而导致的隔离。或者,您可以将 stopContainerWhenFenced 容器属性设置为 true,容器将停止,避免记录丢失。您可以消费 ConsumerStoppedEvent 并检查 Reason 属性的 FENCED 以检测此情况。由于事件还具有对容器的引用,您可以使用此事件重新启动容器。

从 2.7 版本开始,在等待 BackOff 间隔时,错误处理器将以短时休眠循环,直到达到所需的延迟,同时检查容器是否已停止,从而允许休眠在 stop() 后立即退出,而不是导致延迟。

从 2.7 版本开始,处理器可以配置一个或多个 RetryListener,接收重试和恢复进度的通知。

@FunctionalInterface
public interface RetryListener {

    void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);

    default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
    }

    default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
    }

}

有关更多信息,请参见 JavaDocs。

传递尝试头

以下仅适用于记录监听器,不适用于批处理监听器。

从 2.5 版本开始,当使用实现了 DeliveryAttemptAwareErrorHandlerAfterRollbackProcessor 时,可以启用添加 KafkaHeaders.DELIVERY_ATTEMPT 头(kafka_deliveryAttempt)到记录中。此头的值是一个从 1 开始递增的整数。当接收原始 ConsumerRecord<?, ?> 时,整数位于 byte[4] 中。

int delivery = ByteBuffer.wrap(record.headers()
    .lastHeader(KafkaHeaders.DELIVERY_ATTEMPT).value())
    .getInt();

当使用 @KafkaListenerJsonKafkaHeaderMapperSimpleKafkaHeaderMapper 时,可以通过在监听器方法中添加 @Header(KafkaHeaders.DELIVERY_ATTEMPT) int delivery 作为参数来获取。

要启用此标头的填充,请将容器属性 deliveryAttemptHeader 设置为 true。默认情况下禁用此功能,以避免查找每个记录状态和添加标头(少量)的开销。

DefaultErrorHandlerDefaultAfterRollbackProcessor 支持此功能。

批处理监听器的传递尝试头

当使用 BatchListener 处理 ConsumerRecord 时,KafkaHeaders.DELIVERY_ATTEMPT 头可能以与 SingleRecordListener 不同的方式存在。

从 3.3 版本开始,如果您想在使用 BatchListener 时将 KafkaHeaders.DELIVERY_ATTEMPT 头注入到 ConsumerRecord 中,请将 DeliveryAttemptAwareRetryListener 设置为 ErrorHandler 中的 RetryListener

请参阅以下代码。

final FixedBackOff fixedBackOff = new FixedBackOff(1, 10);
final DefaultErrorHandler errorHandler = new DefaultErrorHandler(fixedBackOff);
errorHandler.setRetryListeners(new DeliveryAttemptAwareRetryListener());

ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setCommonErrorHandler(errorHandler);

然后,每当批处理未能完成时,DeliveryAttemptAwareRetryListener 将把 KafkaHeaders.DELIVERY_ATTMPT 头注入到 ConsumerRecord 中。

监听器信息头

在某些情况下,能够知道监听器正在哪个容器中运行是有用的。

从 2.8.4 版本开始,您现在可以设置监听器容器上的 listenerInfo 属性,或者在 @KafkaListener 注解上设置 info 属性。然后,容器会将此信息添加到所有传入消息的 KafkaListener.LISTENER_INFO 头中;它可以在记录拦截器、过滤器等中,或在监听器本身中使用。

@KafkaListener(id = "something", topics = "topic", filter = "someFilter",
        info = "this is the something listener")
public void listen(@Payload Thing thing,
        @Header(KafkaHeaders.LISTENER_INFO) String listenerInfo) {
    ...
}

当在 RecordInterceptorRecordFilterStrategy 实现中使用时,头在消费者记录中作为字节数组,使用 KafkaListenerAnnotationBeanPostProcessorcharSet 属性进行转换。

头映射器在从消费者记录创建 MessageHeaders 时也会转换为 String,并且永远不会在出站记录上映射此头。

对于 POJO 批处理监听器,从 2.8.6 版本开始,头会复制到批处理的每个成员中,并且在转换后也作为单个 String 参数可用。

@KafkaListener(id = "list2", topics = "someTopic", containerFactory = "batchFactory",
        info = "info for batch")
public void listen(List<Thing> list,
        @Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
        @Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
        @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
        @Header(KafkaHeaders.OFFSET) List<Long> offsets,
        @Header(KafkaHeaders.LISTENER_INFO) String info) {
            ...
}
如果批处理监听器有过滤器,并且过滤器导致空批处理,则需要向 @Header 参数添加 required = false,因为空批处理没有信息。

如果您收到 List<Message<Thing>>,则信息位于每个 Message<?>KafkaHeaders.LISTENER_INFO 头中。

有关消费批次的更多信息,请参见 批处理监听器

发布死信记录

当记录的失败次数达到最大值时,您可以为 DefaultErrorHandlerDefaultAfterRollbackProcessor 配置一个记录恢复器。框架提供了 DeadLetterPublishingRecoverer,它将失败的消息发布到另一个主题。恢复器需要一个 KafkaTemplate<Object, Object>,用于发送记录。您还可以选择配置一个 BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>,该函数用于解析目标主题和分区。

默认情况下,死信记录被发送到名为 <originalTopic>-dlt 的主题(原始主题名称后缀为 -dlt),并且发送到与原始记录相同的分区。因此,当您使用默认解析器时,死信主题必须至少具有与原始主题相同数量的分区。

如果返回的 TopicPartition 具有负分区,则不会在 ProducerRecord 中设置分区,因此分区由 Kafka 选择。从 2.2.4 版本开始,任何 ListenerExecutionFailedException(例如,当在 @KafkaListener 方法中检测到异常时抛出)都使用 groupId 属性进行了增强。这允许目标解析器除了 ConsumerRecord 中的信息外,还可以使用此信息来选择死信主题。

以下示例演示了如何连接自定义目标解析器:

DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
        (r, e) -> {
            if (e instanceof FooException) {
                return new TopicPartition(r.topic() + ".Foo.failures", r.partition());
            }
            else {
                return new TopicPartition(r.topic() + ".other.failures", r.partition());
            }
        });
CommonErrorHandler errorHandler = new DefaultErrorHandler(recoverer, new FixedBackOff(0L, 2L));

发送到死信主题的记录将添加以下标头:

  • KafkaHeaders.DLT_EXCEPTION_FQCN: 异常类名(通常是 ListenerExecutionFailedException,但也可以是其他)。

  • KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN: 异常原因类名,如果存在(自 2.8 版本起)。

  • KafkaHeaders.DLT_EXCEPTION_STACKTRACE: 异常堆栈跟踪。

  • KafkaHeaders.DLT_EXCEPTION_MESSAGE: 异常消息。

  • KafkaHeaders.DLT_KEY_EXCEPTION_FQCN: 异常类名(仅限键反序列化错误)。

  • KafkaHeaders.DLT_KEY_EXCEPTION_STACKTRACE: 异常堆栈跟踪(仅限键反序列化错误)。

  • KafkaHeaders.DLT_KEY_EXCEPTION_MESSAGE: 异常消息(仅限键反序列化错误)。

  • KafkaHeaders.DLT_ORIGINAL_TOPIC: 原始主题。

  • KafkaHeaders.DLT_ORIGINAL_PARTITION: 原始分区。

  • KafkaHeaders.DLT_ORIGINAL_OFFSET: 原始偏移量。

  • KafkaHeaders.DLT_ORIGINAL_TIMESTAMP: 原始时间戳。

  • KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE: 原始时间戳类型。

  • KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP: 原始消费者组,未能处理该记录(自 2.8 版本起)。

键异常仅由 DeserializationException 引起,因此没有 DLT_KEY_EXCEPTION_CAUSE_FQCN

有两种机制可以添加更多标头。

  1. 子类化恢复器并重写 createProducerRecord() - 调用 super.createProducerRecord() 并添加更多标头。

  2. 提供一个 BiFunction 来接收消费者记录和异常,返回一个 Headers 对象;其中的标头将被复制到最终的生产者记录;另请参阅 管理死信记录标头。使用 setHeadersFunction() 设置 BiFunction

第二种实现更简单,但第一种提供了更多可用信息,包括已组装的标准标头。

从 2.3 版本开始,当与 ErrorHandlingDeserializer 结合使用时,发布者会将死信生产者记录中的记录 value() 恢复为未能反序列化的原始值。以前,value() 为空,用户代码必须从消息头中解码 DeserializationException。此外,您可以向发布者提供多个 KafkaTemplate;例如,如果您想发布来自 DeserializationExceptionbyte[] 以及使用与成功反序列化记录不同的序列化器的值,则可能需要这样做。这是一个配置发布者,使其具有使用 Stringbyte[] 序列化器的 KafkaTemplate 的示例:

@Bean
public DeadLetterPublishingRecoverer publisher(KafkaTemplate<?, ?> stringTemplate,
        KafkaTemplate<?, ?> bytesTemplate) {
    Map<Class<?>, KafkaOperations<?, ?>> templates = new LinkedHashMap<>();
    templates.put(String.class, stringTemplate);
    templates.put(byte[].class, bytesTemplate);
    return new DeadLetterPublishingRecoverer(templates);
}

发布者使用映射键来查找适合要发布的 value() 的模板。建议使用 LinkedHashMap,以便按顺序检查键。

当发布 null 值时,如果有多个模板,恢复器将查找 Void 类的模板;如果不存在,将使用 values().iterator() 中的第一个模板。

自 2.7 版本起,您可以使用 setFailIfSendResultIsError 方法,以便在消息发布失败时抛出异常。您还可以使用 setWaitForSendResultTimeout 设置发送成功验证的超时时间。

如果恢复器失败(抛出异常),失败的记录将包含在寻址操作中。从 2.5.5 版本开始,如果恢复器失败,默认情况下 BackOff 将被重置,并且重新传递将再次通过退避操作,然后再次尝试恢复。在早期版本中,BackOff 未被重置,并在下次失败时重新尝试恢复。要恢复到以前的行为,请将错误处理器的 resetStateOnRecoveryFailure 属性设置为 false

从 2.6.3 版本开始,将 resetStateOnExceptionChange 设置为 true,如果异常类型在失败之间发生变化,则重试序列将重新启动(包括选择一个新的 BackOff,如果已配置)。默认情况下,不考虑异常类型。

从 2.3 版本开始,恢复器也可以与 Kafka Streams 一起使用 - 有关更多信息,请参见 从反序列化异常中恢复

ErrorHandlingDeserializer 将反序列化异常添加到头 ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADERErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADER 中(使用 Java 序列化)。默认情况下,这些头不会保留在发布到死信主题的消息中。从 2.7 版本开始,如果键和值都未能反序列化,则两者的原始值都将填充到发送到 DLT 的记录中。

如果传入的记录相互依赖,但可能乱序到达,则将失败的记录重新发布到原始主题的尾部(一定次数),而不是直接发送到死信主题可能很有用。请参见 此 Stack Overflow 问题 以获取示例。

以下错误处理器配置将完全实现这一点:

@Bean
public ErrorHandler eh(KafkaOperations<String, String> template) {
    return new DefaultErrorHandler(new DeadLetterPublishingRecoverer(template,
            (rec, ex) -> {
                org.apache.kafka.common.header.Header retries = rec.headers().lastHeader("retries");
                if (retries == null) {
                    retries = new RecordHeader("retries", new byte[] { 1 });
                    rec.headers().add(retries);
                }
                else {
                    retries.value()[0]++;
                }
                return retries.value()[0] > 5
                        ? new TopicPartition("topic-dlt", rec.partition())
                        : new TopicPartition("topic", rec.partition());
            }), new FixedBackOff(0L, 0L));
}

从 2.7 版本开始,恢复器会检查目标解析器选择的分区是否存在。如果分区不存在,则 ProducerRecord 中的分区将设置为 null,允许 KafkaProducer 选择分区。您可以通过将 verifyPartition 属性设置为 false 来禁用此检查。

从 3.1 版本开始,将 logRecoveryRecord 属性设置为 true 将记录恢复记录和异常。

管理死信记录头

参考上面的 发布死信记录DeadLetterPublishingRecoverer 有两个属性用于管理已存在的头(例如,当重新处理失败的死信记录时,包括使用 非阻塞重试 时)。

  • appendOriginalHeaders (默认 true)

  • stripPreviousExceptionHeaders (默认 true,自 2.8 版本起)

Apache Kafka 支持具有相同名称的多个头;要获取“最新”值,可以使用 headers.lastHeader(headerName);要获取多个头的迭代器,可以使用 headers.headers(headerName).iterator()

当重复发布失败的记录时,这些标头可能会增长(并最终因 RecordTooLargeException 导致发布失败);对于异常标头,特别是堆栈跟踪标头,更是如此。

这两个属性的原因是,虽然您可能只希望保留最新的异常信息,但您可能希望保留记录在每次失败时经过哪些主题的历史记录。

appendOriginalHeaders 应用于所有名为 ORIGINAL 的头,而 stripPreviousExceptionHeaders 应用于所有名为 EXCEPTION 的头。

从 2.8.4 版本开始,您现在可以控制哪些标准头将添加到输出记录中。请参阅 enum HeadersToAdd 以了解默认添加的(目前)10 个标准头的通用名称(这些不是实际的头名称,只是一个抽象;实际的头名称由 getHeaderNames() 方法设置,子类可以重写此方法)。

要排除头,请使用 excludeHeaders() 方法;例如,要禁止在头中添加异常堆栈跟踪,请使用:

DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.excludeHeaders(HeaderNames.HeadersToAdd.EX_STACKTRACE);

此外,您可以通过添加 ExceptionHeadersCreator 来完全自定义异常头的添加;这也将禁用所有标准异常头。

DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.setExceptionHeadersCreator((kafkaHeaders, exception, isKey, headerNames) -> {
    kafkaHeaders.add(new RecordHeader(..., ...));
});

同样从 2.8.4 版本开始,您现在可以通过 addHeadersFunction 方法提供多个头函数。这允许应用额外的函数,即使已经注册了另一个函数,例如,在使用 非阻塞重试 时。

另请参见 故障头管理非阻塞重试

ExponentialBackOffWithMaxRetries 实现

Spring Framework 提供了许多 BackOff 实现。默认情况下,ExponentialBackOff 将无限期重试;要在一定次数的重试尝试后放弃,需要计算 maxElapsedTime。自 2.7.3 版本起,Spring for Apache Kafka 提供了 ExponentialBackOffWithMaxRetries,这是一个子类,它接收 maxRetries 属性并自动计算 maxElapsedTime,这更方便一些。

@Bean
DefaultErrorHandler handler() {
    ExponentialBackOffWithMaxRetries bo = new ExponentialBackOffWithMaxRetries(6);
    bo.setInitialInterval(1_000L);
    bo.setMultiplier(2.0);
    bo.setMaxInterval(10_000L);
    return new DefaultErrorHandler(myRecoverer, bo);
}

这将分别在 1、2、4、8、10、10 秒后重试,然后调用恢复器。

© . This site is unofficial and not affiliated with VMware.