异常处理
本节介绍在使用 Spring for Apache Kafka 时可能出现的各种异常的处理方式。
监听器错误处理器
从 2.0 版本开始,@KafkaListener
注解新增了一个属性:errorHandler
。
您可以使用 errorHandler
提供 KafkaListenerErrorHandler
实现的 bean 名称。这个函数式接口有一个方法,如下所示:
@FunctionalInterface
public interface KafkaListenerErrorHandler {
Object handleError(Message<?> message, ListenerExecutionFailedException exception) throws Exception;
}
您可以访问消息转换器生成的 spring-messaging Message<?>
对象以及监听器抛出的异常(该异常被包装在 ListenerExecutionFailedException
中)。错误处理器可以抛出原始异常或新异常,这些异常会被抛到容器。错误处理器返回的任何值都会被忽略。
从 2.7 版本开始,您可以在 MessagingMessageConverter
和 BatchMessagingMessageConverter
上设置 rawRecordHeader
属性,这将导致原始的 ConsumerRecord
被添加到已转换的 Message<?>
的 KafkaHeaders.RAW_DATA
头部中。例如,如果您希望在监听器错误处理器中使用 DeadLetterPublishingRecoverer
,这非常有用。它可能用于请求/回复场景,在这种场景下,您希望在几次重试后,将失败结果发送给发送方,并在死信主题中捕获失败的记录。
@Bean
public KafkaListenerErrorHandler eh(DeadLetterPublishingRecoverer recoverer) {
return (msg, ex) -> {
if (msg.getHeaders().get(KafkaHeaders.DELIVERY_ATTEMPT, Integer.class) > 9) {
recoverer.accept(msg.getHeaders().get(KafkaHeaders.RAW_DATA, ConsumerRecord.class), ex);
return "FAILED";
}
throw ex;
};
}
它有一个子接口(ConsumerAwareListenerErrorHandler
),通过以下方法可以访问 consumer 对象:
Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer);
另一个子接口(ManualAckListenerErrorHandler
)在使用手动 AckMode
时提供对 Acknowledgment
对象的访问。
Object handleError(Message<?> message, ListenerExecutionFailedException exception,
Consumer<?, ?> consumer, @Nullable Acknowledgment ack);
在任何一种情况下,您都不应该对 consumer 执行 seek 操作,因为容器对此一无所知。
容器错误处理器
从 2.8 版本开始,遗留的 ErrorHandler
和 BatchErrorHandler
接口已被新的 CommonErrorHandler
取代。这些错误处理器可以处理 record 监听器和 batch 监听器中的错误,允许单个监听器容器工厂为两种类型的监听器创建容器。框架提供了 CommonErrorHandler
的实现来替代大多数遗留框架错误处理器实现。
有关将自定义遗留错误处理器实现迁移到 CommonErrorHandler
的信息,请参阅将自定义遗留错误处理器实现迁移到 CommonErrorHandler
。
使用事务时,默认情况下未配置错误处理器,以便异常可以回滚事务。事务容器的错误处理由 AfterRollbackProcessor
处理。如果在事务中使用自定义错误处理器,则必须抛出异常才能使事务回滚。
这个接口有一个默认方法 isAckAfterHandle()
,容器会调用它来确定如果错误处理器返回而没有抛出异常,是否应该提交 offset(s);默认情况下它返回 true。
通常,框架提供的错误处理器在错误未被“处理”(例如,执行 seek 操作后)时会抛出异常。默认情况下,容器会在 ERROR
级别记录此类异常。所有框架错误处理器都扩展了 KafkaExceptionLogLevelAware
,它允许您控制这些异常的日志级别。
/**
* Set the level at which the exception thrown by this handler is logged.
* @param logLevel the level (default ERROR).
*/
public void setLogLevel(KafkaException.Level logLevel) {
...
}
您可以指定一个全局错误处理器,用于容器工厂中的所有监听器。以下示例展示了如何进行设置:
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
...
factory.setCommonErrorHandler(myErrorHandler);
...
return factory;
}
默认情况下,如果带注解的监听器方法抛出异常,该异常会抛到容器,并根据容器配置处理消息。
容器在调用错误处理器之前提交任何待处理的 offset 提交。
如果您使用 Spring Boot,只需将错误处理器添加为 @Bean
,Boot 会将其添加到自动配置的工厂中。
退避处理器
像 DefaultErrorHandler
这样的错误处理器使用 BackOff
来确定重试投递前需要等待多长时间。从 2.9 版本开始,您可以配置自定义的 BackOffHandler
。默认处理器仅暂停线程,直到退避时间过去(或容器停止)。框架还提供了 ContainerPausingBackOffHandler
,它会暂停监听器容器,直到退避时间过去,然后恢复容器。当延迟时间长于 consumer 属性 max.poll.interval.ms
时,这非常有用。请注意,实际退避时间的精度会受到 pollTimeout
容器属性的影响。
DefaultErrorHandler
这个新的错误处理器取代了 SeekToCurrentErrorHandler
和 RecoveringBatchErrorHandler
,它们在之前的几个版本中一直是默认的错误处理器。一个区别是,对于 batch 监听器(当抛出 BatchListenerFailedException
以外的异常时),回退行为等同于重试完整的批次。
从 2.9 版本开始,DefaultErrorHandler 可以配置为提供与 seek 未处理记录 offset 相同的语义(如下所述),但实际上并不执行 seek 操作。相反,记录由监听器容器保留,并在错误处理器退出后(以及执行单次暂停的 poll() 以保持 consumer 活跃后;如果使用了非阻塞重试或 ContainerPausingBackOffHandler ,暂停可能会持续多次 poll)重新提交给监听器。错误处理器向容器返回一个结果,指示当前失败的记录是否可以重新提交,或者是否已恢复且不会再次发送给监听器。要启用此模式,请将属性 seekAfterError 设置为 false 。 |
错误处理器可以恢复(跳过)持续失败的记录。默认情况下,在失败十次后,失败的记录会被记录日志(在 ERROR
级别)。您可以为处理器配置一个自定义的 recoverer (BiConsumer
) 和一个 BackOff
,用于控制投递尝试次数和每次尝试之间的延迟。使用 FixedBackOff
并设置 FixedBackOff.UNLIMITED_ATTEMPTS
会导致(实际上是)无限重试。以下示例配置了失败三次后进行恢复:
DefaultErrorHandler errorHandler =
new DefaultErrorHandler((record, exception) -> {
// recover after 3 failures, with no back off - e.g. send to a dead-letter topic
}, new FixedBackOff(0L, 2L));
要使用此处理器的自定义实例配置监听器容器,请将其添加到容器工厂。
例如,使用 @KafkaListener
容器工厂,您可以按如下方式添加 DefaultErrorHandler
:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(AckMode.RECORD);
factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000L, 2L)));
return factory;
}
对于 record 监听器,这将重试投递最多 2 次(共 3 次尝试),每次间隔 1 秒,而不是默认配置(FixedBackOff(0L, 9)
)。重试耗尽后,失败只会被记录日志。
例如,如果 poll
返回六条记录(分区 0、1、2 各两条),并且监听器在第四条记录上抛出异常,容器会通过提交前三条消息的 offset 来确认它们。DefaultErrorHandler
会 seek 到分区 1 的 offset 1 和分区 2 的 offset 0。下一次 poll()
会返回三条未处理的记录。
如果 AckMode
是 BATCH
,容器会在调用错误处理器之前提交前两个分区的 offset。
对于 batch 监听器,监听器必须抛出 BatchListenerFailedException
来指示批次中的哪些记录失败了。
事件序列如下:
-
提交索引之前的记录的 offset。
-
如果重试次数未耗尽,则执行 seek 操作,以便所有剩余记录(包括失败的记录)都将重新投递。
-
如果重试次数耗尽,尝试恢复失败的记录(默认仅记录日志),并执行 seek 操作,以便剩余记录(不包括失败的记录)将重新投递。已恢复记录的 offset 会被提交。
-
如果重试次数耗尽且恢复失败,则执行 seek 操作,就像重试次数未耗尽一样。
从 2.9 版本开始,DefaultErrorHandler 可以配置为提供与 seek 未处理记录 offset 相同的语义(如上所述),但实际上并不执行 seek 操作。相反,错误处理器创建一个新的 ConsumerRecords<?, ?> ,其中只包含未处理的记录,然后将这些记录提交给监听器(在执行单次暂停的 poll() 以保持 consumer 活跃后)。要启用此模式,请将属性 seekAfterError 设置为 false 。 |
默认的 recoverer 在重试次数耗尽后记录失败的记录。您可以使用自定义的 recoverer,或者使用框架提供的 recoverer,例如 DeadLetterPublishingRecoverer
。
使用 POJO batch 监听器(例如 List<Thing>
)时,如果您没有完整的 consumer record 可以添加到异常中,只需添加失败记录的索引即可:
@KafkaListener(id = "recovering", topics = "someTopic")
public void listen(List<Thing> things) {
for (int i = 0; i < things.size(); i++) {
try {
process(things.get(i));
}
catch (Exception e) {
throw new BatchListenerFailedException("Failed to process", i);
}
}
}
当容器配置为 AckMode.MANUAL_IMMEDIATE
时,错误处理器可以配置为提交已恢复记录的 offset;将 commitRecovered
属性设置为 true
。
另请参阅发布死信记录。
使用事务时,DefaultAfterRollbackProcessor
提供了类似的功能。请参阅回滚后处理器。
DefaultErrorHandler
认为某些异常是致命的,会跳过此类异常的重试;recoverer 在第一次失败时就被调用。默认情况下,被认为是致命的异常包括:
-
DeserializationException
-
MessageConversionException
-
ConversionException
-
MethodArgumentResolutionException
-
NoSuchMethodException
-
ClassCastException
因为这些异常在重试投递时不太可能得到解决。
您可以将更多异常类型添加到不可重试类别中,或者完全替换已分类异常的映射。有关更多信息,请参阅 DefaultErrorHandler.addNotRetryableException()
和 DefaultErrorHandler.setClassifications()
的 Javadoc,以及 spring-retry
的 BinaryExceptionClassifier
的 Javadoc。
以下是将 IllegalArgumentException
添加到不可重试异常的示例:
@Bean
public DefaultErrorHandler errorHandler(ConsumerRecordRecoverer recoverer) {
DefaultErrorHandler handler = new DefaultErrorHandler(recoverer);
handler.addNotRetryableExceptions(IllegalArgumentException.class);
return handler;
}
错误处理器可以配置一个或多个 RetryListener
,接收重试和恢复进度的通知。从 2.8.10 版本开始,添加了用于 batch 监听器的方法。
@FunctionalInterface
public interface RetryListener {
void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);
default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
}
default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
}
default void failedDelivery(ConsumerRecords<?, ?> records, Exception ex, int deliveryAttempt) {
}
default void recovered(ConsumerRecords<?, ?> records, Exception ex) {
}
default void recoveryFailed(ConsumerRecords<?, ?> records, Exception original, Exception failure) {
}
}
有关更多信息,请参阅 Javadoc。
如果 recoverer 失败(抛出异常),失败的记录将包含在 seek 操作中。从 2.5.5 版本开始,如果 recoverer 失败,BackOff 默认情况下会重置,并且在再次尝试恢复之前,重新投递会再次经历退避过程。在早期版本中,BackOff 不会重置,并在下一次失败时再次尝试恢复。要恢复到之前的行为,请将处理器的 resetStateOnRecoveryFailure 属性设置为 false 。 |
从 2.6 版本开始,您现在可以为处理器提供一个 BiFunction<ConsumerRecord<?, ?>, Exception, BackOff>
,以便根据失败的记录和/或异常来确定使用的 BackOff
:
handler.setBackOffFunction((record, ex) -> { ... });
如果函数返回 null
,则使用处理器的默认 BackOff
。
从 2.6.3 版本开始,将 resetStateOnExceptionChange
设置为 true
,如果异常类型在失败之间发生变化,重试序列将重新启动(包括选择新的 BackOff
,如果已配置)。默认情况下,不考虑异常类型。
从 2.9 版本开始,默认情况下此值为 true
。
另请参阅投递尝试次数头部。
Batch 错误处理器中的转换错误
从 2.8 版本开始,使用 ByteArrayDeserializer
、BytesDeserializer
或 StringDeserializer
以及 DefaultErrorHandler
的 MessageConverter
时,batch 监听器现在可以正确处理转换错误。发生转换错误时,payload 被设置为 null
,并且会将反序列化异常添加到记录头部中,类似于 ErrorHandlingDeserializer
。监听器中提供了一个 ConversionException
列表,因此监听器可以抛出 BatchListenerFailedException
来指示发生转换异常的第一个索引。
示例
@KafkaListener(id = "test", topics = "topic")
void listen(List<Thing> in, @Header(KafkaHeaders.CONVERSION_FAILURES) List<ConversionException> exceptions) {
for (int i = 0; i < in.size(); i++) {
Foo foo = in.get(i);
if (foo == null && exceptions.get(i) != null) {
throw new BatchListenerFailedException("Conversion error", exceptions.get(i), i);
}
process(foo);
}
}
重试完整的批次
当 batch 监听器抛出 BatchListenerFailedException
以外的异常时,这现在是 DefaultErrorHandler
的回退行为。
无法保证在批次重新投递时,批次中的记录数量相同和/或重新投递的记录顺序相同。因此,很难为批次轻松维护重试状态。FallbackBatchErrorHandler
采用以下方法。如果 batch 监听器抛出非 BatchListenerFailedException
的异常,则从内存中的记录批次执行重试。为了避免在长时间的重试序列中发生 rebalance,错误处理器会暂停 consumer,在每次重试的退避睡眠之前进行 poll,然后再次调用监听器。如果/当重试耗尽时,会为批次中的每条记录调用 ConsumerRecordRecoverer
。如果 recoverer 抛出异常,或者线程在其睡眠期间被中断,则批次记录将在下一次 poll 时重新投递。在退出之前,无论结果如何,consumer 都会恢复。
此机制不能用于事务。 |
在等待 BackOff
间隔期间,错误处理器会循环短暂睡眠,直到达到期望的延迟,同时检查容器是否已停止,以便睡眠能在 stop()
后尽快退出,而不是导致延迟。
停止容器错误处理器
如果监听器抛出异常,CommonContainerStoppingErrorHandler
会停止容器。对于 record 监听器,当 AckMode
是 RECORD
时,已处理记录的 offset 会被提交。对于 record 监听器,当 AckMode
是任何手动值时,已确认记录的 offset 会被提交。对于 record 监听器,当 AckMode
是 BATCH
,或者对于 batch 监听器,容器重新启动时会重新处理整个批次。
容器停止后,会抛出一个包装了 ListenerExecutionFailedException
的异常。这是为了使事务回滚(如果启用了事务)。
委托错误处理器
CommonDelegatingErrorHandler
可以根据异常类型委托给不同的错误处理器。例如,您可能希望对大多数异常调用 DefaultErrorHandler
,而对其他异常调用 CommonContainerStoppingErrorHandler
。
所有委托必须共享相同的兼容属性(ackAfterHandle
, seekAfterError
等)。
记录日志错误处理器
CommonLoggingErrorHandler
只记录异常日志;对于 record 监听器,前一次 poll 中剩余的记录会传递给监听器。对于 batch 监听器,批次中的所有记录都会被记录日志。
对 Record 和 Batch 监听器使用不同的通用错误处理器
如果您希望对 record 和 batch 监听器采用不同的错误处理策略,框架提供了 CommonMixedErrorHandler
,它允许为每种监听器类型配置特定的错误处理器。
通用错误处理器总结
-
DefaultErrorHandler
-
CommonContainerStoppingErrorHandler
-
CommonDelegatingErrorHandler
-
CommonLoggingErrorHandler
-
CommonMixedErrorHandler
遗留错误处理器及其替代方案
遗留错误处理器 | 替代方案 |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
没有替代方案,使用配置了无限 |
|
|
|
没有替代方案,使用 |
将自定义遗留错误处理器实现迁移到 CommonErrorHandler
请参阅 CommonErrorHandler
中的 Javadoc。
要替换 ErrorHandler
或 ConsumerAwareErrorHandler
实现,您应该实现 handleOne()
并让 seeksAfterHandle()
返回 false
(默认值)。您还应该实现 handleOtherException()
来处理在记录处理范围之外发生的异常(例如 consumer 错误)。
要替换 RemainingRecordsErrorHandler
实现,您应该实现 handleRemaining()
并重写 seeksAfterHandle()
以返回 true
(错误处理器必须执行必要的 seek 操作)。您还应该实现 handleOtherException()
- 以处理在记录处理范围之外发生的异常(例如 consumer 错误)。
要替换任何 BatchErrorHandler
实现,您应该实现 handleBatch()
。您还应该实现 handleOtherException()
- 以处理在记录处理范围之外发生的异常(例如 consumer 错误)。
回滚后处理器
使用事务时,如果监听器抛出异常(并且如果存在错误处理器,它也抛出异常),事务将回滚。默认情况下,任何未处理的记录(包括失败的记录)会在下一次 poll 时重新获取。这是通过在 DefaultAfterRollbackProcessor
中执行 seek 操作实现的。对于 batch 监听器,整个批次记录会重新处理(容器不知道批次中哪个记录失败了)。要修改此行为,您可以为监听器容器配置自定义的 AfterRollbackProcessor
。例如,对于基于 record 的监听器,您可能希望跟踪失败的记录并在尝试一定次数后放弃,或许通过将其发布到死信主题。
从 2.2 版本开始,DefaultAfterRollbackProcessor
现在可以恢复(跳过)持续失败的记录。默认情况下,在失败十次后,失败的记录会被记录日志(在 ERROR
级别)。您可以为处理器配置自定义的 recoverer (BiConsumer
) 和最大失败次数。将 maxFailures
属性设置为负数会导致无限重试。以下示例配置了失败三次后进行恢复:
AfterRollbackProcessor<String, String> processor =
new DefaultAfterRollbackProcessor((record, exception) -> {
// recover after 3 failures, with no back off - e.g. send to a dead-letter topic
}, new FixedBackOff(0L, 2L));
当您不使用事务时,可以通过配置 DefaultErrorHandler
实现类似功能。请参阅容器错误处理器。
从 3.2 版本开始,现在可以恢复(跳过)持续失败的整个批次记录。设置 ContainerProperties.setBatchRecoverAfterRollback(true)
来启用此功能。
默认行为下,对于 batch 监听器无法进行恢复,因为框架不知道批次中哪个记录持续失败。在这种情况下,应用程序监听器必须处理持续失败的记录。 |
另请参阅发布死信记录。
从 2.2.5 版本开始,可以在新事务中调用 DefaultAfterRollbackProcessor
(在失败事务回滚后启动)。然后,如果您使用 DeadLetterPublishingRecoverer
发布失败的记录,处理器会将已恢复记录在原始 topic/partition 中的 offset 发送到该事务。要启用此功能,请在 DefaultAfterRollbackProcessor
上设置 commitRecovered
和 kafkaTemplate
属性。
如果 recoverer 失败(抛出异常),失败的记录将包含在 seek 操作中。从 2.5.5 版本开始,如果 recoverer 失败,BackOff 默认情况下会重置,并且在再次尝试恢复之前,重新投递会再次经历退避过程。在早期版本中,BackOff 不会重置,并在下一次失败时再次尝试恢复。要恢复到之前的行为,请将处理器的 resetStateOnRecoveryFailure 属性设置为 false 。 |
从 2.6 版本开始,您现在可以为处理器提供一个 BiFunction<ConsumerRecord<?, ?>, Exception, BackOff>
,以便根据失败的记录和/或异常来确定使用的 BackOff
:
handler.setBackOffFunction((record, ex) -> { ... });
如果函数返回 null
,则使用处理器的默认 BackOff
。
从 2.6.3 版本开始,将 resetStateOnExceptionChange
设置为 true
,如果异常类型在失败之间发生变化,重试序列将重新启动(包括选择新的 BackOff
,如果已配置)。默认情况下,不考虑异常类型。
从 2.3.1 版本开始,与 DefaultErrorHandler
类似,DefaultAfterRollbackProcessor
认为某些异常是致命的,会跳过此类异常的重试;recoverer 在第一次失败时就被调用。默认情况下,被认为是致命的异常包括:
-
DeserializationException
-
MessageConversionException
-
ConversionException
-
MethodArgumentResolutionException
-
NoSuchMethodException
-
ClassCastException
因为这些异常在重试投递时不太可能得到解决。
您可以将更多异常类型添加到不可重试类别中,或者完全替换已分类异常的映射。有关更多信息,请参阅 DefaultAfterRollbackProcessor.setClassifications()
的 Javadoc,以及 spring-retry
的 BinaryExceptionClassifier
的 Javadoc。
以下是将 IllegalArgumentException
添加到不可重试异常的示例:
@Bean
public DefaultAfterRollbackProcessor errorHandler(BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer) {
DefaultAfterRollbackProcessor processor = new DefaultAfterRollbackProcessor(recoverer);
processor.addNotRetryableException(IllegalArgumentException.class);
return processor;
}
另请参阅投递尝试次数头部。
使用当前的 kafka-clients ,容器无法检测到 ProducerFencedException 是由 rebalance 引起的,还是由于超时或过期导致 producer 的 transactional.id 被撤销。因为在大多数情况下,它是由 rebalance 引起的,容器不会调用 AfterRollbackProcessor (因为对分区执行 seek 操作不合适,因为我们不再被分配这些分区)。如果您确保超时时间足够长以处理每个事务,并定期执行“空”事务(例如,通过 ListenerContainerIdleEvent ),则可以避免因超时和过期导致的 fencing。或者,您可以将 stopContainerWhenFenced 容器属性设置为 true ,容器将停止,避免记录丢失。您可以消费 ConsumerStoppedEvent 并检查其 Reason 属性是否为 FENCED 来检测此情况。由于事件还包含对容器的引用,您可以使用此事件重新启动容器。 |
从 2.7 版本开始,在等待 BackOff
间隔期间,错误处理器会循环短暂睡眠,直到达到期望的延迟,同时检查容器是否已停止,以便睡眠能在 stop()
后尽快退出,而不是导致延迟。
从 2.7 版本开始,处理器可以配置一个或多个 RetryListener
,接收重试和恢复进度的通知。
@FunctionalInterface
public interface RetryListener {
void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);
default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
}
default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
}
}
有关更多信息,请参阅 Javadoc。
投递尝试次数头部
以下内容仅适用于 record 监听器,不适用于 batch 监听器。
从 2.5 版本开始,使用实现 DeliveryAttemptAware
的 ErrorHandler
或 AfterRollbackProcessor
时,可以将 KafkaHeaders.DELIVERY_ATTEMPT
头部 (kafka_deliveryAttempt
) 添加到记录中。此头部的值是一个从 1 开始递增的整数。接收原始 ConsumerRecord<?, ?>
时,该整数存储在 byte[4]
中。
int delivery = ByteBuffer.wrap(record.headers()
.lastHeader(KafkaHeaders.DELIVERY_ATTEMPT).value())
.getInt();
使用 @KafkaListener
并配合 DefaultKafkaHeaderMapper
或 SimpleKafkaHeaderMapper
时,可以通过向监听器方法添加参数 @Header(KafkaHeaders.DELIVERY_ATTEMPT) int delivery
来获取该头部。
要启用此头部的填充,请将容器属性 deliveryAttemptHeader
设置为 true
。默认情况下禁用此功能,以避免查找每条记录的状态并添加头部带来的(较小的)开销。
DefaultErrorHandler
和 DefaultAfterRollbackProcessor
支持此功能。
Batch 监听器的投递尝试次数头部
使用 BatchListener
处理 ConsumerRecord
时,与 SingleRecordListener
相比,KafkaHeaders.DELIVERY_ATTEMPT
头部可能以不同的方式存在。
从 3.3 版本开始,如果您希望在使用 BatchListener
时将 KafkaHeaders.DELIVERY_ATTEMPT
头部注入到 ConsumerRecord
中,请将 DeliveryAttemptAwareRetryListener
设置为 ErrorHandler
中的 RetryListener
。
请参考以下代码。
final FixedBackOff fixedBackOff = new FixedBackOff(1, 10);
final DefaultErrorHandler errorHandler = new DefaultErrorHandler(fixedBackOff);
errorHandler.setRetryListeners(new DeliveryAttemptAwareRetryListener());
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setCommonErrorHandler(errorHandler);
然后,每当批次未能完成时,DeliveryAttemptAwareRetryListener
会将 KafkaHeaders.DELIVERY_ATTMPT
头部注入到 ConsumerRecord
中。
监听器信息头部
在某些情况下,能够知道监听器运行在哪个容器中很有用。
从 2.8.4 版本开始,您现在可以在监听器容器上设置 listenerInfo
属性,或在 @KafkaListener
注解上设置 info
属性。然后,容器会将此信息添加到所有入站消息的 KafkaListener.LISTENER_INFO
头部中;此头部可用于 record 拦截器、过滤器等,或在监听器本身中使用。
@KafkaListener(id = "something", topics = "topic", filter = "someFilter",
info = "this is the something listener")
public void listen(@Payload Thing thing,
@Header(KafkaHeaders.LISTENER_INFO) String listenerInfo) {
...
}
在 RecordInterceptor
或 RecordFilterStrategy
实现中使用时,头部以字节数组形式存在于 consumer record 中,使用 KafkaListenerAnnotationBeanPostProcessor
的 charSet
属性进行转换。
头部映射器在从 consumer record 创建 MessageHeaders
时也会转换为 String
,并且永远不会在出站记录上映射此头部。
对于 POJO batch 监听器,从 2.8.6 版本开始,头部会复制到批次中的每个成员,并在转换后也作为单个 String
参数可用。
@KafkaListener(id = "list2", topics = "someTopic", containerFactory = "batchFactory",
info = "info for batch")
public void listen(List<Thing> list,
@Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
@Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
@Header(KafkaHeaders.OFFSET) List<Long> offsets,
@Header(KafkaHeaders.LISTENER_INFO) String info) {
...
}
如果 batch 监听器有过滤器且过滤结果为空批次,您需要在 @Header 参数中添加 required = false ,因为空批次不提供该信息。 |
如果您接收到 List<Message<Thing>>
,则信息位于每个 Message<?>
的 KafkaHeaders.LISTENER_INFO
头部中。
有关消费批次的更多信息,请参阅Batch 监听器。
发布死信记录
您可以使用记录恢复器配置 DefaultErrorHandler
和 DefaultAfterRollbackProcessor
,以便在记录达到最大失败次数时进行处理。该框架提供了 DeadLetterPublishingRecoverer
,它将失败的消息发布到另一个主题。此恢复器需要一个 KafkaTemplate<Object, Object>
,用于发送记录。您还可以选择性地配置一个 BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>
,它会被调用来解析目标主题和分区。
默认情况下,死信记录会被发送到名为 <originalTopic>-dlt 的主题(原始主题名称后附加 -dlt ),并发送到与原始记录相同的分区。因此,当您使用默认解析器时,死信主题必须至少拥有与原始主题一样多的分区。 |
如果返回的 TopicPartition
的分区为负数,则在 ProducerRecord
中不会设置分区,分区将由 Kafka 选择。从版本 2.2.4 开始,任何 ListenerExecutionFailedException
(例如,在 @KafkaListener
方法中检测到异常时抛出)都增强了 groupId
属性。这使得目标解析器除了使用 ConsumerRecord
中的信息外,还可以利用此属性来选择死信主题。
以下示例展示了如何配置一个自定义目标解析器
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
(r, e) -> {
if (e instanceof FooException) {
return new TopicPartition(r.topic() + ".Foo.failures", r.partition());
}
else {
return new TopicPartition(r.topic() + ".other.failures", r.partition());
}
});
CommonErrorHandler errorHandler = new DefaultErrorHandler(recoverer, new FixedBackOff(0L, 2L));
发送到死信主题的记录会增强以下头部(headers)
-
KafkaHeaders.DLT_EXCEPTION_FQCN
:异常类名(通常是ListenerExecutionFailedException
,但也可以是其他)。 -
KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN
:异常原因的类名,如果存在的话(从版本 2.8 开始)。 -
KafkaHeaders.DLT_EXCEPTION_STACKTRACE
:异常的堆栈跟踪。 -
KafkaHeaders.DLT_EXCEPTION_MESSAGE
:异常消息。 -
KafkaHeaders.DLT_KEY_EXCEPTION_FQCN
:异常类名(仅限键反序列化错误)。 -
KafkaHeaders.DLT_KEY_EXCEPTION_STACKTRACE
:异常的堆栈跟踪(仅限键反序列化错误)。 -
KafkaHeaders.DLT_KEY_EXCEPTION_MESSAGE
:异常消息(仅限键反序列化错误)。 -
KafkaHeaders.DLT_ORIGINAL_TOPIC
:原始主题。 -
KafkaHeaders.DLT_ORIGINAL_PARTITION
:原始分区。 -
KafkaHeaders.DLT_ORIGINAL_OFFSET
:原始偏移量。 -
KafkaHeaders.DLT_ORIGINAL_TIMESTAMP
:原始时间戳。 -
KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE
:原始时间戳类型。 -
KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP
:未能处理该记录的原始消费者组(从版本 2.8 开始)。
键异常仅由 DeserializationException
引起,因此没有 DLT_KEY_EXCEPTION_CAUSE_FQCN
。
有两种机制可以添加更多头部。
-
子类化恢复器并覆盖
createProducerRecord()
方法 - 调用super.createProducerRecord()
并添加更多头部。 -
提供一个
BiFunction
来接收消费者记录和异常,并返回一个Headers
对象;这些头部将复制到最终的生产者记录中;另请参阅 管理死信记录头部。使用setHeadersFunction()
方法设置此BiFunction
。
第二种实现起来更简单,但第一种提供了更多可用信息,包括已经组装好的标准头部。
从版本 2.3 开始,当与 ErrorHandlingDeserializer
一起使用时,发布器将在死信生产者记录中恢复记录的 value()
为原始的、未能反序列化的值。在此之前,value()
是 null,用户代码必须从消息头部中解码 DeserializationException
。此外,您可以向发布器提供多个 KafkaTemplate
;例如,如果您想发布 DeserializationException
中的 byte[]
,以及使用与成功反序列化的记录不同的序列化器发布值,这可能就需要这样做。以下是使用 String
和 byte[]
序列化器配置发布器的示例
@Bean
public DeadLetterPublishingRecoverer publisher(KafkaTemplate<?, ?> stringTemplate,
KafkaTemplate<?, ?> bytesTemplate) {
Map<Class<?>, KafkaTemplate<?, ?>> templates = new LinkedHashMap<>();
templates.put(String.class, stringTemplate);
templates.put(byte[].class, bytesTemplate);
return new DeadLetterPublishingRecoverer(templates);
}
发布器使用 Map 的键来查找适用于即将发布的值(value()
)的模板。建议使用 LinkedHashMap
,以便按顺序检查键。
发布 null
值且存在多个模板时,恢复器将查找适用于 Void
类的模板;如果不存在,将使用 values().iterator()
中的第一个模板。
从 2.7 版本开始,您可以使用 setFailIfSendResultIsError
方法,以便在消息发布失败时抛出异常。您还可以使用 setWaitForSendResultTimeout
方法设置发送器成功验证的超时时间。
如果恢复器失败(抛出异常),失败的记录将包含在 seeks 操作中。从版本 2.5.5 开始,如果恢复器失败,默认情况下将重置 BackOff ,并且在再次尝试恢复之前,重新投递将再次经历回退过程。在早期版本中,BackOff 未被重置,并在下次失败时再次尝试恢复。要恢复到先前的行为,请将错误处理器的 resetStateOnRecoveryFailure 属性设置为 false 。 |
从 2.6.3 版本开始,将 resetStateOnExceptionChange
设置为 true
,如果异常类型在失败之间发生变化,重试序列将重新启动(包括选择新的 BackOff
,如果已配置)。默认情况下,不考虑异常类型。
从版本 2.3 开始,恢复器也可以用于 Kafka Streams - 有关更多信息,请参阅 从反序列化异常中恢复。
ErrorHandlingDeserializer
在头部 ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER
和 ErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADER
中添加反序列化异常(使用 Java 序列化)。默认情况下,这些头部不会保留在发布到死信主题的消息中。从版本 2.7 开始,如果键和值都反序列化失败,则两者在发送到 DLT 的记录中都会填充原始值。
如果入站记录相互依赖,但可能乱序到达,将失败的记录重新发布到原始主题的末尾(若干次)可能比直接将其发送到死信主题更有用。有关示例,请参阅 此 Stack Overflow 问题。
以下错误处理器配置将完全实现这一点
@Bean
public ErrorHandler eh(KafkaOperations<String, String> template) {
return new DefaultErrorHandler(new DeadLetterPublishingRecoverer(template,
(rec, ex) -> {
org.apache.kafka.common.header.Header retries = rec.headers().lastHeader("retries");
if (retries == null) {
retries = new RecordHeader("retries", new byte[] { 1 });
rec.headers().add(retries);
}
else {
retries.value()[0]++;
}
return retries.value()[0] > 5
? new TopicPartition("topic-dlt", rec.partition())
: new TopicPartition("topic", rec.partition());
}), new FixedBackOff(0L, 0L));
}
从版本 2.7 开始,恢复器检查目标解析器选择的分区是否存在。如果分区不存在,ProducerRecord
中的分区将设置为 null
,允许 KafkaProducer
选择分区。您可以通过将 verifyPartition
属性设置为 false
来禁用此检查。
从版本 3.1 开始,将 logRecoveryRecord
属性设置为 true
将记录恢复记录和异常。
管理死信记录头部
-
appendOriginalHeaders
(默认true
) -
stripPreviousExceptionHeaders
(自版本 2.8 起默认true
)
Apache Kafka 支持多个同名头部;要获取“最新”值,您可以使用 headers.lastHeader(headerName)
;要获取多个头部的迭代器,请使用 headers.headers(headerName).iterator()
。
当重复发布失败的记录时,这些头部可能会增长(并最终由于 RecordTooLargeException
导致发布失败);异常头部尤其如此,特别是堆栈跟踪头部。
这两个属性的原因是,虽然您可能只想保留最后的异常信息,但您可能希望保留记录在每次失败时经过哪些主题的历史记录。
appendOriginalHeaders
应用于所有名为 ORIGINAL
的头部,而 stripPreviousExceptionHeaders
应用于所有名为 EXCEPTION
的头部。
从版本 2.8.4 开始,您现在可以控制将哪些标准头部添加到输出记录中。请参阅 enum HeadersToAdd
以了解默认添加的(当前)10 个标准头部的通用名称(这些不是实际的头部名称,只是一个抽象;实际的头部名称由 getHeaderNames()
方法设置,子类可以覆盖此方法)。
要排除头部,请使用 excludeHeaders()
方法;例如,要禁止在头部中添加异常堆栈跟踪,请使用
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.excludeHeaders(HeaderNames.HeadersToAdd.EX_STACKTRACE);
此外,您可以通过添加一个 ExceptionHeadersCreator
来完全自定义异常头部的添加;这也会禁用所有标准异常头部。
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.setExceptionHeadersCreator((kafkaHeaders, exception, isKey, headerNames) -> {
kafkaHeaders.add(new RecordHeader(..., ...));
});
同样从版本 2.8.4 开始,您现在可以通过 addHeadersFunction
方法提供多个头部函数。这允许应用额外的函数,即使已经注册了另一个函数,例如在使用 非阻塞重试 时。
另请参阅 使用非阻塞重试进行失败头部管理。
ExponentialBackOffWithMaxRetries
实现
Spring Framework 提供了许多 BackOff
实现。默认情况下,ExponentialBackOff
会无限重试;要在若干次重试尝试后放弃,需要计算 maxElapsedTime
。从版本 2.7.3 开始,Spring for Apache Kafka 提供了 ExponentialBackOffWithMaxRetries
,它是 ExponentialBackOff
的一个子类,接收 maxRetries
属性并自动计算 maxElapsedTime
,这会更方便一些。
@Bean
DefaultErrorHandler handler() {
ExponentialBackOffWithMaxRetries bo = new ExponentialBackOffWithMaxRetries(6);
bo.setInitialInterval(1_000L);
bo.setMultiplier(2.0);
bo.setMaxInterval(10_000L);
return new DefaultErrorHandler(myRecoverer, bo);
}
这将在 1、2、4、8、10、10 秒后重试,然后调用恢复器。