异常处理

本节介绍在使用 Spring for Apache Kafka 时可能出现的各种异常的处理方式。

监听器错误处理器

从 2.0 版本开始,@KafkaListener 注解新增了一个属性:errorHandler

您可以使用 errorHandler 提供 KafkaListenerErrorHandler 实现的 bean 名称。这个函数式接口有一个方法,如下所示:

@FunctionalInterface
public interface KafkaListenerErrorHandler {

    Object handleError(Message<?> message, ListenerExecutionFailedException exception) throws Exception;

}

您可以访问消息转换器生成的 spring-messaging Message<?> 对象以及监听器抛出的异常(该异常被包装在 ListenerExecutionFailedException 中)。错误处理器可以抛出原始异常或新异常,这些异常会被抛到容器。错误处理器返回的任何值都会被忽略。

从 2.7 版本开始,您可以在 MessagingMessageConverterBatchMessagingMessageConverter 上设置 rawRecordHeader 属性,这将导致原始的 ConsumerRecord 被添加到已转换的 Message<?>KafkaHeaders.RAW_DATA 头部中。例如,如果您希望在监听器错误处理器中使用 DeadLetterPublishingRecoverer,这非常有用。它可能用于请求/回复场景,在这种场景下,您希望在几次重试后,将失败结果发送给发送方,并在死信主题中捕获失败的记录。

@Bean
public KafkaListenerErrorHandler eh(DeadLetterPublishingRecoverer recoverer) {
    return (msg, ex) -> {
        if (msg.getHeaders().get(KafkaHeaders.DELIVERY_ATTEMPT, Integer.class) > 9) {
            recoverer.accept(msg.getHeaders().get(KafkaHeaders.RAW_DATA, ConsumerRecord.class), ex);
            return "FAILED";
        }
        throw ex;
    };
}

它有一个子接口(ConsumerAwareListenerErrorHandler),通过以下方法可以访问 consumer 对象:

Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer);

另一个子接口(ManualAckListenerErrorHandler)在使用手动 AckMode 时提供对 Acknowledgment 对象的访问。

Object handleError(Message<?> message, ListenerExecutionFailedException exception,
			Consumer<?, ?> consumer, @Nullable Acknowledgment ack);

在任何一种情况下,您都不应该对 consumer 执行 seek 操作,因为容器对此一无所知。

容器错误处理器

从 2.8 版本开始,遗留的 ErrorHandlerBatchErrorHandler 接口已被新的 CommonErrorHandler 取代。这些错误处理器可以处理 record 监听器和 batch 监听器中的错误,允许单个监听器容器工厂为两种类型的监听器创建容器。框架提供了 CommonErrorHandler 的实现来替代大多数遗留框架错误处理器实现。

有关将自定义遗留错误处理器实现迁移到 CommonErrorHandler 的信息,请参阅将自定义遗留错误处理器实现迁移到 CommonErrorHandler

使用事务时,默认情况下未配置错误处理器,以便异常可以回滚事务。事务容器的错误处理由 AfterRollbackProcessor 处理。如果在事务中使用自定义错误处理器,则必须抛出异常才能使事务回滚。

这个接口有一个默认方法 isAckAfterHandle(),容器会调用它来确定如果错误处理器返回而没有抛出异常,是否应该提交 offset(s);默认情况下它返回 true。

通常,框架提供的错误处理器在错误未被“处理”(例如,执行 seek 操作后)时会抛出异常。默认情况下,容器会在 ERROR 级别记录此类异常。所有框架错误处理器都扩展了 KafkaExceptionLogLevelAware,它允许您控制这些异常的日志级别。

/**
 * Set the level at which the exception thrown by this handler is logged.
 * @param logLevel the level (default ERROR).
 */
public void setLogLevel(KafkaException.Level logLevel) {
    ...
}

您可以指定一个全局错误处理器,用于容器工厂中的所有监听器。以下示例展示了如何进行设置:

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
        kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    ...
    factory.setCommonErrorHandler(myErrorHandler);
    ...
    return factory;
}

默认情况下,如果带注解的监听器方法抛出异常,该异常会抛到容器,并根据容器配置处理消息。

容器在调用错误处理器之前提交任何待处理的 offset 提交。

如果您使用 Spring Boot,只需将错误处理器添加为 @Bean,Boot 会将其添加到自动配置的工厂中。

退避处理器

DefaultErrorHandler 这样的错误处理器使用 BackOff 来确定重试投递前需要等待多长时间。从 2.9 版本开始,您可以配置自定义的 BackOffHandler。默认处理器仅暂停线程,直到退避时间过去(或容器停止)。框架还提供了 ContainerPausingBackOffHandler,它会暂停监听器容器,直到退避时间过去,然后恢复容器。当延迟时间长于 consumer 属性 max.poll.interval.ms 时,这非常有用。请注意,实际退避时间的精度会受到 pollTimeout 容器属性的影响。

DefaultErrorHandler

这个新的错误处理器取代了 SeekToCurrentErrorHandlerRecoveringBatchErrorHandler,它们在之前的几个版本中一直是默认的错误处理器。一个区别是,对于 batch 监听器(当抛出 BatchListenerFailedException 以外的异常时),回退行为等同于重试完整的批次

从 2.9 版本开始,DefaultErrorHandler 可以配置为提供与 seek 未处理记录 offset 相同的语义(如下所述),但实际上并不执行 seek 操作。相反,记录由监听器容器保留,并在错误处理器退出后(以及执行单次暂停的 poll() 以保持 consumer 活跃后;如果使用了非阻塞重试ContainerPausingBackOffHandler,暂停可能会持续多次 poll)重新提交给监听器。错误处理器向容器返回一个结果,指示当前失败的记录是否可以重新提交,或者是否已恢复且不会再次发送给监听器。要启用此模式,请将属性 seekAfterError 设置为 false

错误处理器可以恢复(跳过)持续失败的记录。默认情况下,在失败十次后,失败的记录会被记录日志(在 ERROR 级别)。您可以为处理器配置一个自定义的 recoverer (BiConsumer) 和一个 BackOff,用于控制投递尝试次数和每次尝试之间的延迟。使用 FixedBackOff 并设置 FixedBackOff.UNLIMITED_ATTEMPTS 会导致(实际上是)无限重试。以下示例配置了失败三次后进行恢复:

DefaultErrorHandler errorHandler =
    new DefaultErrorHandler((record, exception) -> {
        // recover after 3 failures, with no back off - e.g. send to a dead-letter topic
    }, new FixedBackOff(0L, 2L));

要使用此处理器的自定义实例配置监听器容器,请将其添加到容器工厂。

例如,使用 @KafkaListener 容器工厂,您可以按如下方式添加 DefaultErrorHandler

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setAckMode(AckMode.RECORD);
    factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000L, 2L)));
    return factory;
}

对于 record 监听器,这将重试投递最多 2 次(共 3 次尝试),每次间隔 1 秒,而不是默认配置(FixedBackOff(0L, 9))。重试耗尽后,失败只会被记录日志。

例如,如果 poll 返回六条记录(分区 0、1、2 各两条),并且监听器在第四条记录上抛出异常,容器会通过提交前三条消息的 offset 来确认它们。DefaultErrorHandler 会 seek 到分区 1 的 offset 1 和分区 2 的 offset 0。下一次 poll() 会返回三条未处理的记录。

如果 AckModeBATCH,容器会在调用错误处理器之前提交前两个分区的 offset。

对于 batch 监听器,监听器必须抛出 BatchListenerFailedException 来指示批次中的哪些记录失败了。

事件序列如下:

  • 提交索引之前的记录的 offset。

  • 如果重试次数未耗尽,则执行 seek 操作,以便所有剩余记录(包括失败的记录)都将重新投递。

  • 如果重试次数耗尽,尝试恢复失败的记录(默认仅记录日志),并执行 seek 操作,以便剩余记录(不包括失败的记录)将重新投递。已恢复记录的 offset 会被提交。

  • 如果重试次数耗尽且恢复失败,则执行 seek 操作,就像重试次数未耗尽一样。

从 2.9 版本开始,DefaultErrorHandler 可以配置为提供与 seek 未处理记录 offset 相同的语义(如上所述),但实际上并不执行 seek 操作。相反,错误处理器创建一个新的 ConsumerRecords<?, ?>,其中只包含未处理的记录,然后将这些记录提交给监听器(在执行单次暂停的 poll() 以保持 consumer 活跃后)。要启用此模式,请将属性 seekAfterError 设置为 false

默认的 recoverer 在重试次数耗尽后记录失败的记录。您可以使用自定义的 recoverer,或者使用框架提供的 recoverer,例如 DeadLetterPublishingRecoverer

使用 POJO batch 监听器(例如 List<Thing>)时,如果您没有完整的 consumer record 可以添加到异常中,只需添加失败记录的索引即可:

@KafkaListener(id = "recovering", topics = "someTopic")
public void listen(List<Thing> things) {
    for (int i = 0; i < things.size(); i++) {
        try {
            process(things.get(i));
        }
        catch (Exception e) {
            throw new BatchListenerFailedException("Failed to process", i);
        }
    }
}

当容器配置为 AckMode.MANUAL_IMMEDIATE 时,错误处理器可以配置为提交已恢复记录的 offset;将 commitRecovered 属性设置为 true

另请参阅发布死信记录

使用事务时,DefaultAfterRollbackProcessor 提供了类似的功能。请参阅回滚后处理器

DefaultErrorHandler 认为某些异常是致命的,会跳过此类异常的重试;recoverer 在第一次失败时就被调用。默认情况下,被认为是致命的异常包括:

  • DeserializationException

  • MessageConversionException

  • ConversionException

  • MethodArgumentResolutionException

  • NoSuchMethodException

  • ClassCastException

因为这些异常在重试投递时不太可能得到解决。

您可以将更多异常类型添加到不可重试类别中,或者完全替换已分类异常的映射。有关更多信息,请参阅 DefaultErrorHandler.addNotRetryableException()DefaultErrorHandler.setClassifications() 的 Javadoc,以及 spring-retryBinaryExceptionClassifier 的 Javadoc。

以下是将 IllegalArgumentException 添加到不可重试异常的示例:

@Bean
public DefaultErrorHandler errorHandler(ConsumerRecordRecoverer recoverer) {
    DefaultErrorHandler handler = new DefaultErrorHandler(recoverer);
    handler.addNotRetryableExceptions(IllegalArgumentException.class);
    return handler;
}

错误处理器可以配置一个或多个 RetryListener,接收重试和恢复进度的通知。从 2.8.10 版本开始,添加了用于 batch 监听器的方法。

@FunctionalInterface
public interface RetryListener {

    void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);

    default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
    }

    default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
    }

    default void failedDelivery(ConsumerRecords<?, ?> records, Exception ex, int deliveryAttempt) {
    }

    default void recovered(ConsumerRecords<?, ?> records, Exception ex) {
    }

	default void recoveryFailed(ConsumerRecords<?, ?> records, Exception original, Exception failure) {
	}

}

有关更多信息,请参阅 Javadoc。

如果 recoverer 失败(抛出异常),失败的记录将包含在 seek 操作中。从 2.5.5 版本开始,如果 recoverer 失败,BackOff 默认情况下会重置,并且在再次尝试恢复之前,重新投递会再次经历退避过程。在早期版本中,BackOff 不会重置,并在下一次失败时再次尝试恢复。要恢复到之前的行为,请将处理器的 resetStateOnRecoveryFailure 属性设置为 false

从 2.6 版本开始,您现在可以为处理器提供一个 BiFunction<ConsumerRecord<?, ?>, Exception, BackOff>,以便根据失败的记录和/或异常来确定使用的 BackOff

handler.setBackOffFunction((record, ex) -> { ... });

如果函数返回 null,则使用处理器的默认 BackOff

从 2.6.3 版本开始,将 resetStateOnExceptionChange 设置为 true,如果异常类型在失败之间发生变化,重试序列将重新启动(包括选择新的 BackOff,如果已配置)。默认情况下,不考虑异常类型。

从 2.9 版本开始,默认情况下此值为 true

另请参阅投递尝试次数头部

Batch 错误处理器中的转换错误

从 2.8 版本开始,使用 ByteArrayDeserializerBytesDeserializerStringDeserializer 以及 DefaultErrorHandlerMessageConverter 时,batch 监听器现在可以正确处理转换错误。发生转换错误时,payload 被设置为 null,并且会将反序列化异常添加到记录头部中,类似于 ErrorHandlingDeserializer。监听器中提供了一个 ConversionException 列表,因此监听器可以抛出 BatchListenerFailedException 来指示发生转换异常的第一个索引。

示例

@KafkaListener(id = "test", topics = "topic")
void listen(List<Thing> in, @Header(KafkaHeaders.CONVERSION_FAILURES) List<ConversionException> exceptions) {
    for (int i = 0; i < in.size(); i++) {
        Foo foo = in.get(i);
        if (foo == null && exceptions.get(i) != null) {
            throw new BatchListenerFailedException("Conversion error", exceptions.get(i), i);
        }
        process(foo);
    }
}

重试完整的批次

当 batch 监听器抛出 BatchListenerFailedException 以外的异常时,这现在是 DefaultErrorHandler 的回退行为。

无法保证在批次重新投递时,批次中的记录数量相同和/或重新投递的记录顺序相同。因此,很难为批次轻松维护重试状态。FallbackBatchErrorHandler 采用以下方法。如果 batch 监听器抛出非 BatchListenerFailedException 的异常,则从内存中的记录批次执行重试。为了避免在长时间的重试序列中发生 rebalance,错误处理器会暂停 consumer,在每次重试的退避睡眠之前进行 poll,然后再次调用监听器。如果/当重试耗尽时,会为批次中的每条记录调用 ConsumerRecordRecoverer。如果 recoverer 抛出异常,或者线程在其睡眠期间被中断,则批次记录将在下一次 poll 时重新投递。在退出之前,无论结果如何,consumer 都会恢复。

此机制不能用于事务。

在等待 BackOff 间隔期间,错误处理器会循环短暂睡眠,直到达到期望的延迟,同时检查容器是否已停止,以便睡眠能在 stop() 后尽快退出,而不是导致延迟。

停止容器错误处理器

如果监听器抛出异常,CommonContainerStoppingErrorHandler 会停止容器。对于 record 监听器,当 AckModeRECORD 时,已处理记录的 offset 会被提交。对于 record 监听器,当 AckMode 是任何手动值时,已确认记录的 offset 会被提交。对于 record 监听器,当 AckModeBATCH,或者对于 batch 监听器,容器重新启动时会重新处理整个批次。

容器停止后,会抛出一个包装了 ListenerExecutionFailedException 的异常。这是为了使事务回滚(如果启用了事务)。

委托错误处理器

CommonDelegatingErrorHandler 可以根据异常类型委托给不同的错误处理器。例如,您可能希望对大多数异常调用 DefaultErrorHandler,而对其他异常调用 CommonContainerStoppingErrorHandler

所有委托必须共享相同的兼容属性(ackAfterHandle, seekAfterError 等)。

记录日志错误处理器

CommonLoggingErrorHandler 只记录异常日志;对于 record 监听器,前一次 poll 中剩余的记录会传递给监听器。对于 batch 监听器,批次中的所有记录都会被记录日志。

对 Record 和 Batch 监听器使用不同的通用错误处理器

如果您希望对 record 和 batch 监听器采用不同的错误处理策略,框架提供了 CommonMixedErrorHandler,它允许为每种监听器类型配置特定的错误处理器。

通用错误处理器总结

  • DefaultErrorHandler

  • CommonContainerStoppingErrorHandler

  • CommonDelegatingErrorHandler

  • CommonLoggingErrorHandler

  • CommonMixedErrorHandler

遗留错误处理器及其替代方案

遗留错误处理器 替代方案

LoggingErrorHandler

CommonLoggingErrorHandler

BatchLoggingErrorHandler

CommonLoggingErrorHandler

ConditionalDelegatingErrorHandler

DelegatingErrorHandler

ConditionalDelegatingBatchErrorHandler

DelegatingErrorHandler

ContainerStoppingErrorHandler

CommonContainerStoppingErrorHandler

ContainerStoppingBatchErrorHandler

CommonContainerStoppingErrorHandler

SeekToCurrentErrorHandler

DefaultErrorHandler

SeekToCurrentBatchErrorHandler

没有替代方案,使用配置了无限 BackOffDefaultErrorHandler

RecoveringBatchErrorHandler

DefaultErrorHandler

RetryingBatchErrorHandler

没有替代方案,使用 DefaultErrorHandler 并抛出 BatchListenerFailedException 以外的异常。

将自定义遗留错误处理器实现迁移到 CommonErrorHandler

请参阅 CommonErrorHandler 中的 Javadoc。

要替换 ErrorHandlerConsumerAwareErrorHandler 实现,您应该实现 handleOne() 并让 seeksAfterHandle() 返回 false(默认值)。您还应该实现 handleOtherException() 来处理在记录处理范围之外发生的异常(例如 consumer 错误)。

要替换 RemainingRecordsErrorHandler 实现,您应该实现 handleRemaining() 并重写 seeksAfterHandle() 以返回 true(错误处理器必须执行必要的 seek 操作)。您还应该实现 handleOtherException() - 以处理在记录处理范围之外发生的异常(例如 consumer 错误)。

要替换任何 BatchErrorHandler 实现,您应该实现 handleBatch()。您还应该实现 handleOtherException() - 以处理在记录处理范围之外发生的异常(例如 consumer 错误)。

回滚后处理器

使用事务时,如果监听器抛出异常(并且如果存在错误处理器,它也抛出异常),事务将回滚。默认情况下,任何未处理的记录(包括失败的记录)会在下一次 poll 时重新获取。这是通过在 DefaultAfterRollbackProcessor 中执行 seek 操作实现的。对于 batch 监听器,整个批次记录会重新处理(容器不知道批次中哪个记录失败了)。要修改此行为,您可以为监听器容器配置自定义的 AfterRollbackProcessor。例如,对于基于 record 的监听器,您可能希望跟踪失败的记录并在尝试一定次数后放弃,或许通过将其发布到死信主题。

从 2.2 版本开始,DefaultAfterRollbackProcessor 现在可以恢复(跳过)持续失败的记录。默认情况下,在失败十次后,失败的记录会被记录日志(在 ERROR 级别)。您可以为处理器配置自定义的 recoverer (BiConsumer) 和最大失败次数。将 maxFailures 属性设置为负数会导致无限重试。以下示例配置了失败三次后进行恢复:

AfterRollbackProcessor<String, String> processor =
    new DefaultAfterRollbackProcessor((record, exception) -> {
        // recover after 3 failures, with no back off - e.g. send to a dead-letter topic
    }, new FixedBackOff(0L, 2L));

当您不使用事务时,可以通过配置 DefaultErrorHandler 实现类似功能。请参阅容器错误处理器

从 3.2 版本开始,现在可以恢复(跳过)持续失败的整个批次记录。设置 ContainerProperties.setBatchRecoverAfterRollback(true) 来启用此功能。

默认行为下,对于 batch 监听器无法进行恢复,因为框架不知道批次中哪个记录持续失败。在这种情况下,应用程序监听器必须处理持续失败的记录。

另请参阅发布死信记录

从 2.2.5 版本开始,可以在新事务中调用 DefaultAfterRollbackProcessor(在失败事务回滚后启动)。然后,如果您使用 DeadLetterPublishingRecoverer 发布失败的记录,处理器会将已恢复记录在原始 topic/partition 中的 offset 发送到该事务。要启用此功能,请在 DefaultAfterRollbackProcessor 上设置 commitRecoveredkafkaTemplate 属性。

如果 recoverer 失败(抛出异常),失败的记录将包含在 seek 操作中。从 2.5.5 版本开始,如果 recoverer 失败,BackOff 默认情况下会重置,并且在再次尝试恢复之前,重新投递会再次经历退避过程。在早期版本中,BackOff 不会重置,并在下一次失败时再次尝试恢复。要恢复到之前的行为,请将处理器的 resetStateOnRecoveryFailure 属性设置为 false

从 2.6 版本开始,您现在可以为处理器提供一个 BiFunction<ConsumerRecord<?, ?>, Exception, BackOff>,以便根据失败的记录和/或异常来确定使用的 BackOff

handler.setBackOffFunction((record, ex) -> { ... });

如果函数返回 null,则使用处理器的默认 BackOff

从 2.6.3 版本开始,将 resetStateOnExceptionChange 设置为 true,如果异常类型在失败之间发生变化,重试序列将重新启动(包括选择新的 BackOff,如果已配置)。默认情况下,不考虑异常类型。

从 2.3.1 版本开始,与 DefaultErrorHandler 类似,DefaultAfterRollbackProcessor 认为某些异常是致命的,会跳过此类异常的重试;recoverer 在第一次失败时就被调用。默认情况下,被认为是致命的异常包括:

  • DeserializationException

  • MessageConversionException

  • ConversionException

  • MethodArgumentResolutionException

  • NoSuchMethodException

  • ClassCastException

因为这些异常在重试投递时不太可能得到解决。

您可以将更多异常类型添加到不可重试类别中,或者完全替换已分类异常的映射。有关更多信息,请参阅 DefaultAfterRollbackProcessor.setClassifications() 的 Javadoc,以及 spring-retryBinaryExceptionClassifier 的 Javadoc。

以下是将 IllegalArgumentException 添加到不可重试异常的示例:

@Bean
public DefaultAfterRollbackProcessor errorHandler(BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer) {
    DefaultAfterRollbackProcessor processor = new DefaultAfterRollbackProcessor(recoverer);
    processor.addNotRetryableException(IllegalArgumentException.class);
    return processor;
}

另请参阅投递尝试次数头部

使用当前的 kafka-clients,容器无法检测到 ProducerFencedException 是由 rebalance 引起的,还是由于超时或过期导致 producer 的 transactional.id 被撤销。因为在大多数情况下,它是由 rebalance 引起的,容器不会调用 AfterRollbackProcessor(因为对分区执行 seek 操作不合适,因为我们不再被分配这些分区)。如果您确保超时时间足够长以处理每个事务,并定期执行“空”事务(例如,通过 ListenerContainerIdleEvent),则可以避免因超时和过期导致的 fencing。或者,您可以将 stopContainerWhenFenced 容器属性设置为 true,容器将停止,避免记录丢失。您可以消费 ConsumerStoppedEvent 并检查其 Reason 属性是否为 FENCED 来检测此情况。由于事件还包含对容器的引用,您可以使用此事件重新启动容器。

从 2.7 版本开始,在等待 BackOff 间隔期间,错误处理器会循环短暂睡眠,直到达到期望的延迟,同时检查容器是否已停止,以便睡眠能在 stop() 后尽快退出,而不是导致延迟。

从 2.7 版本开始,处理器可以配置一个或多个 RetryListener,接收重试和恢复进度的通知。

@FunctionalInterface
public interface RetryListener {

    void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);

    default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
    }

    default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
    }

}

有关更多信息,请参阅 Javadoc。

投递尝试次数头部

以下内容仅适用于 record 监听器,不适用于 batch 监听器。

从 2.5 版本开始,使用实现 DeliveryAttemptAwareErrorHandlerAfterRollbackProcessor 时,可以将 KafkaHeaders.DELIVERY_ATTEMPT 头部 (kafka_deliveryAttempt) 添加到记录中。此头部的值是一个从 1 开始递增的整数。接收原始 ConsumerRecord<?, ?> 时,该整数存储在 byte[4] 中。

int delivery = ByteBuffer.wrap(record.headers()
    .lastHeader(KafkaHeaders.DELIVERY_ATTEMPT).value())
    .getInt();

使用 @KafkaListener 并配合 DefaultKafkaHeaderMapperSimpleKafkaHeaderMapper 时,可以通过向监听器方法添加参数 @Header(KafkaHeaders.DELIVERY_ATTEMPT) int delivery 来获取该头部。

要启用此头部的填充,请将容器属性 deliveryAttemptHeader 设置为 true。默认情况下禁用此功能,以避免查找每条记录的状态并添加头部带来的(较小的)开销。

DefaultErrorHandlerDefaultAfterRollbackProcessor 支持此功能。

Batch 监听器的投递尝试次数头部

使用 BatchListener 处理 ConsumerRecord 时,与 SingleRecordListener 相比,KafkaHeaders.DELIVERY_ATTEMPT 头部可能以不同的方式存在。

从 3.3 版本开始,如果您希望在使用 BatchListener 时将 KafkaHeaders.DELIVERY_ATTEMPT 头部注入到 ConsumerRecord 中,请将 DeliveryAttemptAwareRetryListener 设置为 ErrorHandler 中的 RetryListener

请参考以下代码。

final FixedBackOff fixedBackOff = new FixedBackOff(1, 10);
final DefaultErrorHandler errorHandler = new DefaultErrorHandler(fixedBackOff);
errorHandler.setRetryListeners(new DeliveryAttemptAwareRetryListener());

ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setCommonErrorHandler(errorHandler);

然后,每当批次未能完成时,DeliveryAttemptAwareRetryListener 会将 KafkaHeaders.DELIVERY_ATTMPT 头部注入到 ConsumerRecord 中。

监听器信息头部

在某些情况下,能够知道监听器运行在哪个容器中很有用。

从 2.8.4 版本开始,您现在可以在监听器容器上设置 listenerInfo 属性,或在 @KafkaListener 注解上设置 info 属性。然后,容器会将此信息添加到所有入站消息的 KafkaListener.LISTENER_INFO 头部中;此头部可用于 record 拦截器、过滤器等,或在监听器本身中使用。

@KafkaListener(id = "something", topics = "topic", filter = "someFilter",
        info = "this is the something listener")
public void listen(@Payload Thing thing,
        @Header(KafkaHeaders.LISTENER_INFO) String listenerInfo) {
    ...
}

RecordInterceptorRecordFilterStrategy 实现中使用时,头部以字节数组形式存在于 consumer record 中,使用 KafkaListenerAnnotationBeanPostProcessorcharSet 属性进行转换。

头部映射器在从 consumer record 创建 MessageHeaders 时也会转换为 String,并且永远不会在出站记录上映射此头部。

对于 POJO batch 监听器,从 2.8.6 版本开始,头部会复制到批次中的每个成员,并在转换后也作为单个 String 参数可用。

@KafkaListener(id = "list2", topics = "someTopic", containerFactory = "batchFactory",
        info = "info for batch")
public void listen(List<Thing> list,
        @Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
        @Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
        @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
        @Header(KafkaHeaders.OFFSET) List<Long> offsets,
        @Header(KafkaHeaders.LISTENER_INFO) String info) {
            ...
}
如果 batch 监听器有过滤器且过滤结果为空批次,您需要在 @Header 参数中添加 required = false,因为空批次不提供该信息。

如果您接收到 List<Message<Thing>>,则信息位于每个 Message<?>KafkaHeaders.LISTENER_INFO 头部中。

有关消费批次的更多信息,请参阅Batch 监听器

发布死信记录

您可以使用记录恢复器配置 DefaultErrorHandlerDefaultAfterRollbackProcessor,以便在记录达到最大失败次数时进行处理。该框架提供了 DeadLetterPublishingRecoverer,它将失败的消息发布到另一个主题。此恢复器需要一个 KafkaTemplate<Object, Object>,用于发送记录。您还可以选择性地配置一个 BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>,它会被调用来解析目标主题和分区。

默认情况下,死信记录会被发送到名为 <originalTopic>-dlt 的主题(原始主题名称后附加 -dlt),并发送到与原始记录相同的分区。因此,当您使用默认解析器时,死信主题必须至少拥有与原始主题一样多的分区。

如果返回的 TopicPartition 的分区为负数,则在 ProducerRecord 中不会设置分区,分区将由 Kafka 选择。从版本 2.2.4 开始,任何 ListenerExecutionFailedException(例如,在 @KafkaListener 方法中检测到异常时抛出)都增强了 groupId 属性。这使得目标解析器除了使用 ConsumerRecord 中的信息外,还可以利用此属性来选择死信主题。

以下示例展示了如何配置一个自定义目标解析器

DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
        (r, e) -> {
            if (e instanceof FooException) {
                return new TopicPartition(r.topic() + ".Foo.failures", r.partition());
            }
            else {
                return new TopicPartition(r.topic() + ".other.failures", r.partition());
            }
        });
CommonErrorHandler errorHandler = new DefaultErrorHandler(recoverer, new FixedBackOff(0L, 2L));

发送到死信主题的记录会增强以下头部(headers)

  • KafkaHeaders.DLT_EXCEPTION_FQCN:异常类名(通常是 ListenerExecutionFailedException,但也可以是其他)。

  • KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN:异常原因的类名,如果存在的话(从版本 2.8 开始)。

  • KafkaHeaders.DLT_EXCEPTION_STACKTRACE:异常的堆栈跟踪。

  • KafkaHeaders.DLT_EXCEPTION_MESSAGE:异常消息。

  • KafkaHeaders.DLT_KEY_EXCEPTION_FQCN:异常类名(仅限键反序列化错误)。

  • KafkaHeaders.DLT_KEY_EXCEPTION_STACKTRACE:异常的堆栈跟踪(仅限键反序列化错误)。

  • KafkaHeaders.DLT_KEY_EXCEPTION_MESSAGE:异常消息(仅限键反序列化错误)。

  • KafkaHeaders.DLT_ORIGINAL_TOPIC:原始主题。

  • KafkaHeaders.DLT_ORIGINAL_PARTITION:原始分区。

  • KafkaHeaders.DLT_ORIGINAL_OFFSET:原始偏移量。

  • KafkaHeaders.DLT_ORIGINAL_TIMESTAMP:原始时间戳。

  • KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE:原始时间戳类型。

  • KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP:未能处理该记录的原始消费者组(从版本 2.8 开始)。

键异常仅由 DeserializationException 引起,因此没有 DLT_KEY_EXCEPTION_CAUSE_FQCN

有两种机制可以添加更多头部。

  1. 子类化恢复器并覆盖 createProducerRecord() 方法 - 调用 super.createProducerRecord() 并添加更多头部。

  2. 提供一个 BiFunction 来接收消费者记录和异常,并返回一个 Headers 对象;这些头部将复制到最终的生产者记录中;另请参阅 管理死信记录头部。使用 setHeadersFunction() 方法设置此 BiFunction

第二种实现起来更简单,但第一种提供了更多可用信息,包括已经组装好的标准头部。

从版本 2.3 开始,当与 ErrorHandlingDeserializer 一起使用时,发布器将在死信生产者记录中恢复记录的 value() 为原始的、未能反序列化的值。在此之前,value() 是 null,用户代码必须从消息头部中解码 DeserializationException。此外,您可以向发布器提供多个 KafkaTemplate;例如,如果您想发布 DeserializationException 中的 byte[],以及使用与成功反序列化的记录不同的序列化器发布值,这可能就需要这样做。以下是使用 Stringbyte[] 序列化器配置发布器的示例

@Bean
public DeadLetterPublishingRecoverer publisher(KafkaTemplate<?, ?> stringTemplate,
        KafkaTemplate<?, ?> bytesTemplate) {
    Map<Class<?>, KafkaTemplate<?, ?>> templates = new LinkedHashMap<>();
    templates.put(String.class, stringTemplate);
    templates.put(byte[].class, bytesTemplate);
    return new DeadLetterPublishingRecoverer(templates);
}

发布器使用 Map 的键来查找适用于即将发布的值(value())的模板。建议使用 LinkedHashMap,以便按顺序检查键。

发布 null 值且存在多个模板时,恢复器将查找适用于 Void 类的模板;如果不存在,将使用 values().iterator() 中的第一个模板。

从 2.7 版本开始,您可以使用 setFailIfSendResultIsError 方法,以便在消息发布失败时抛出异常。您还可以使用 setWaitForSendResultTimeout 方法设置发送器成功验证的超时时间。

如果恢复器失败(抛出异常),失败的记录将包含在 seeks 操作中。从版本 2.5.5 开始,如果恢复器失败,默认情况下将重置 BackOff,并且在再次尝试恢复之前,重新投递将再次经历回退过程。在早期版本中,BackOff 未被重置,并在下次失败时再次尝试恢复。要恢复到先前的行为,请将错误处理器的 resetStateOnRecoveryFailure 属性设置为 false

从 2.6.3 版本开始,将 resetStateOnExceptionChange 设置为 true,如果异常类型在失败之间发生变化,重试序列将重新启动(包括选择新的 BackOff,如果已配置)。默认情况下,不考虑异常类型。

从版本 2.3 开始,恢复器也可以用于 Kafka Streams - 有关更多信息,请参阅 从反序列化异常中恢复

ErrorHandlingDeserializer 在头部 ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADERErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADER 中添加反序列化异常(使用 Java 序列化)。默认情况下,这些头部不会保留在发布到死信主题的消息中。从版本 2.7 开始,如果键和值都反序列化失败,则两者在发送到 DLT 的记录中都会填充原始值。

如果入站记录相互依赖,但可能乱序到达,将失败的记录重新发布到原始主题的末尾(若干次)可能比直接将其发送到死信主题更有用。有关示例,请参阅 此 Stack Overflow 问题

以下错误处理器配置将完全实现这一点

@Bean
public ErrorHandler eh(KafkaOperations<String, String> template) {
    return new DefaultErrorHandler(new DeadLetterPublishingRecoverer(template,
            (rec, ex) -> {
                org.apache.kafka.common.header.Header retries = rec.headers().lastHeader("retries");
                if (retries == null) {
                    retries = new RecordHeader("retries", new byte[] { 1 });
                    rec.headers().add(retries);
                }
                else {
                    retries.value()[0]++;
                }
                return retries.value()[0] > 5
                        ? new TopicPartition("topic-dlt", rec.partition())
                        : new TopicPartition("topic", rec.partition());
            }), new FixedBackOff(0L, 0L));
}

从版本 2.7 开始,恢复器检查目标解析器选择的分区是否存在。如果分区不存在,ProducerRecord 中的分区将设置为 null,允许 KafkaProducer 选择分区。您可以通过将 verifyPartition 属性设置为 false 来禁用此检查。

从版本 3.1 开始,将 logRecoveryRecord 属性设置为 true 将记录恢复记录和异常。

管理死信记录头部

参考上文的 发布死信记录DeadLetterPublishingRecoverer 有两个属性用于在头部已经存在时(例如,当重新处理失败的死信记录时,包括使用 非阻塞重试 时)管理头部。

  • appendOriginalHeaders (默认 true)

  • stripPreviousExceptionHeaders (自版本 2.8 起默认 true)

Apache Kafka 支持多个同名头部;要获取“最新”值,您可以使用 headers.lastHeader(headerName);要获取多个头部的迭代器,请使用 headers.headers(headerName).iterator()

当重复发布失败的记录时,这些头部可能会增长(并最终由于 RecordTooLargeException 导致发布失败);异常头部尤其如此,特别是堆栈跟踪头部。

这两个属性的原因是,虽然您可能只想保留最后的异常信息,但您可能希望保留记录在每次失败时经过哪些主题的历史记录。

appendOriginalHeaders 应用于所有名为 ORIGINAL 的头部,而 stripPreviousExceptionHeaders 应用于所有名为 EXCEPTION 的头部。

从版本 2.8.4 开始,您现在可以控制将哪些标准头部添加到输出记录中。请参阅 enum HeadersToAdd 以了解默认添加的(当前)10 个标准头部的通用名称(这些不是实际的头部名称,只是一个抽象;实际的头部名称由 getHeaderNames() 方法设置,子类可以覆盖此方法)。

要排除头部,请使用 excludeHeaders() 方法;例如,要禁止在头部中添加异常堆栈跟踪,请使用

DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.excludeHeaders(HeaderNames.HeadersToAdd.EX_STACKTRACE);

此外,您可以通过添加一个 ExceptionHeadersCreator 来完全自定义异常头部的添加;这也会禁用所有标准异常头部。

DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.setExceptionHeadersCreator((kafkaHeaders, exception, isKey, headerNames) -> {
    kafkaHeaders.add(new RecordHeader(..., ...));
});

同样从版本 2.8.4 开始,您现在可以通过 addHeadersFunction 方法提供多个头部函数。这允许应用额外的函数,即使已经注册了另一个函数,例如在使用 非阻塞重试 时。

ExponentialBackOffWithMaxRetries 实现

Spring Framework 提供了许多 BackOff 实现。默认情况下,ExponentialBackOff 会无限重试;要在若干次重试尝试后放弃,需要计算 maxElapsedTime。从版本 2.7.3 开始,Spring for Apache Kafka 提供了 ExponentialBackOffWithMaxRetries,它是 ExponentialBackOff 的一个子类,接收 maxRetries 属性并自动计算 maxElapsedTime,这会更方便一些。

@Bean
DefaultErrorHandler handler() {
    ExponentialBackOffWithMaxRetries bo = new ExponentialBackOffWithMaxRetries(6);
    bo.setInitialInterval(1_000L);
    bo.setMultiplier(2.0);
    bo.setMaxInterval(10_000L);
    return new DefaultErrorHandler(myRecoverer, bo);
}

这将在 1、2、4、8、10、10 秒后重试,然后调用恢复器。