错误处理
Apache Kafka Streams 提供了原生处理反序列化错误异常的能力。有关此支持的详细信息,请参阅此链接。开箱即用地,Apache Kafka Streams 提供了两种反序列化异常处理器 - LogAndContinueExceptionHandler
和 LogAndFailExceptionHandler
。顾名思义,前者将记录错误并继续处理下一条记录,而后者将记录错误并失败。LogAndFailExceptionHandler
是默认的反序列化异常处理器。
在 Binder 中处理反序列化异常
Kafka Streams Binder 允许使用以下属性指定上述反序列化异常处理器。
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndContinue
或
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndFail
除了上述两种反序列化异常处理器,Binder 还提供了第三种,用于将错误记录(毒丸消息)发送到 DLQ(死信队列)主题。启用此 DLQ 异常处理器的方法如下。
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: sendToDlq
设置上述属性后,所有反序列化错误的记录都会自动发送到 DLQ 主题。
您可以如下设置发布 DLQ 消息的主题名称。
您可以提供 DlqDestinationResolver
的实现,它是一个函数式接口。DlqDestinationResolver
接受 ConsumerRecord
和异常作为输入,然后允许指定一个主题名称作为输出。通过访问 Kafka ConsumerRecord
,可以在 BiFunction
的实现中检查头记录。
以下是提供 DlqDestinationResolver
实现的一个示例。
@Bean
public DlqDestinationResolver dlqDestinationResolver() {
return (rec, ex) -> {
if (rec.topic().equals("word1")) {
return "topic1-dlq";
}
else {
return "topic2-dlq";
}
};
}
在提供 DlqDestinationResolver
实现时,需要记住一件重要的事情是,Binder 中的 provisioner 不会自动为应用程序创建主题。这是因为 Binder 无法推断出实现可能发送到的所有 DLQ 主题的名称。因此,如果使用此策略提供 DLQ 名称,应用程序有责任确保这些主题事先创建。
如果 DlqDestinationResolver
在应用程序中作为 bean 存在,则其优先级更高。如果您不想遵循此方法,而是希望使用配置提供静态 DLQ 名称,则可以设置以下属性。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.dlqName: custom-dlq (Change the binding name accordingly)
如果设置了此属性,则错误记录将发送到主题 custom-dlq
。如果应用程序未使用上述任何一种策略,它将创建一个名为 error.<input-topic-name>.<application-id>
的 DLQ 主题。例如,如果您的绑定的目标主题是 inputTopic
,并且应用程序 ID 是 process-applicationId
,则默认的 DLQ 主题是 error.inputTopic.process-applicationId
。如果您打算启用 DLQ,始终建议为每个输入绑定显式创建一个 DLQ 主题。
每个输入消费者绑定的 DLQ
属性 spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler
适用于整个应用程序。这意味着如果同一个应用程序中有多个函数,则此属性将应用于所有这些函数。但是,如果单个 Processor 中有多个 Processor 或多个输入绑定,则可以使用 Binder 为每个输入消费者绑定提供的更细粒度的 DLQ 控制。
如果您有以下 Processor:
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}
并且您只想在第一个输入绑定上启用 DLQ,在第二个绑定上启用 skipAndContinue,那么可以在消费者上如下操作。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.deserializationExceptionHandler: sendToDlq
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.deserializationExceptionHandler: skipAndContinue
以这种方式设置反序列化异常处理器比在 Binder 级别设置具有更高的优先级。
DLQ 分区
默认情况下,记录使用与原始记录相同的分区发布到死信主题。这意味着死信主题必须至少有与原始记录相同数量的分区。
要更改此行为,请将 DlqPartitionFunction
实现作为 @Bean
添加到应用程序上下文中。只能存在一个此类 bean。该函数提供消费者组(在大多数情况下与应用程序 ID 相同)、失败的 ConsumerRecord
和异常。例如,如果您总是想路由到分区 0,您可以使用
@Bean
public DlqPartitionFunction partitionFunction() {
return (group, record, ex) -> 0;
}
如果将消费者绑定的 dlqPartitions 属性设置为 1(并且 Binder 的 minPartitionCount 等于 1 ),则无需提供 DlqPartitionFunction ;框架将始终使用分区 0。如果将消费者绑定的 dlqPartitions 属性设置为大于 1 的值(或 Binder 的 minPartitionCount 大于 1 ),则即使分区数量与原始主题相同,您也**必须**提供一个 DlqPartitionFunction bean。 |
使用 Kafka Streams Binder 的异常处理功能时需要记住几点。
-
属性
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler
适用于整个应用程序。这意味着如果同一个应用程序中有多个函数,则此属性将应用于所有这些函数。 -
反序列化的异常处理与原生反序列化和框架提供的消息转换一致。
在 Binder 中处理生产异常
与上述描述的反序列化异常处理器的支持不同,Binder 没有提供这种一流的机制来处理生产异常。但是,您仍然可以使用 StreamsBuilderFactoryBean
定制器配置生产异常处理器,有关更多详细信息,请参阅下面的后续部分。
运行时错误处理
处理应用程序代码中的错误,即业务逻辑执行中的错误时,通常由应用程序负责处理。因为 Kafka Streams Binder 没有办法干预应用程序代码。然而,为了让应用程序更容易一些,Binder 提供了一个方便的 RecordRecoverableProcessor
,您可以使用它来指定如何处理应用程序级别的错误。
考虑以下代码。
@Bean
public java.util.function.Function<KStream<String, String>, KStream<String, String>> process() {
return input -> input
.map(...);
}
如果上面您的 map
调用中的业务代码抛出异常,则您有责任处理该错误。这就是 RecordRecoverableProcessor
变得方便的地方。默认情况下,RecordRecoverableProcessor
将只记录错误并让应用程序继续。假设您想将失败的记录发布到 DLT,而不是在应用程序内部处理它。在这种情况下,您必须使用 RecordRecoverableProcessor
的自定义实现,称为 DltAwareProcessor
。您可以这样做:
@Bean
public java.util.function.Function<KStream<String, String>, KStream<String, String>> process(DltPublishingContext dltSenderContext) {
return input -> input
.process(() -> new DltAwareProcessor<>(record -> {
throw new RuntimeException("error");
}, "hello-dlt-1", dltPublishingContext));
}
原始 map
调用中的业务逻辑代码现在已作为 KStream#process
方法调用的一部分移动,该方法接受一个 ProcessorSupplier
。然后,我们传入自定义的 DltAwareProcessor
,它能够发布到 DLT。上面 DltAwareProcessor
的构造函数接受三个参数 - 一个接受输入记录并将业务逻辑操作作为 Function
主体一部分的 Function
,DLT 主题,以及最后的 DltPublishingContext
。当 Function
的 lambda 表达式抛出异常时,DltAwareProcessor
会将输入记录发送到 DLT。DltPublishingContext
为 DltAwareProcessor
提供必要的发布基础设施 bean。DltPublishingContext
由 Binder 自动配置,因此您可以直接将其注入到应用程序中。
如果您不希望 Binder 将失败的记录发布到 DLT,则必须直接使用 RecordRecoverableProcessor
而不是 DltAwareProcessor
。您可以提供自己的 recoverer 作为 BiConsumer
,它将输入 Record
和异常作为参数。假设一个场景,您不想将记录发送到 DLT,而只想简单地记录消息然后继续。下面是如何实现该目标的一个示例。
@Bean
public java.util.function.Function<KStream<String, String>, KStream<String, String>> process() {
return input -> input
.process(() -> new RecordRecoverableProcessor<>(record -> {
throw new RuntimeException("error");
},
(record, exception) -> {
// Handle the record
}));
}
在这种情况下,当记录失败时,RecordRecoverableProcessor
使用用户提供的 recoverer,它是一个 BiConsumer
,接受失败的记录和抛出的异常作为参数。
在 DltAwareProcessor 中处理记录键
使用 DltAwareProcessor
将失败记录发送到 DLT 时,如果您想将记录键发送到 DLT 主题,则需要在 DLT 绑定上设置正确的序列化器。这是因为 DltAwareProcessor
使用 StreamBridge
,它使用常规的 Kafka Binder(基于消息通道),默认情况下对键使用 ByteArraySerializer
。对于记录值,Spring Cloud Stream 会将有效载荷转换为适当的 byte[]
;但是,对于键,情况并非如此,因为它只是传递在头部接收到的内容作为键。如果您提供的键不是字节数组,则可能会导致类转换异常,为避免这种情况,您需要在 DLT 绑定上如下设置序列化器。
假设 DLT 目标是 hello-dlt-1
并且记录键是 String 数据类型。
spring.cloud.stream.kafka.bindings.hello-dlt-1.producer.configuration.key.serializer=org.apache.kafka.common.serialization.StringSerializer