空载荷和“墓碑”记录的日志压缩
当您使用日志压缩时,可以发送和接收带有null载荷的消息,以标识键的删除。
您也可能因为其他原因收到null值,例如Deserializer在无法反序列化值时可能会返回null。
要使用KafkaTemplate发送null载荷,您可以将null传递给send()方法的value参数。其中一个例外是send(Message<?> message)变体。由于spring-messaging的Message<?>不能有null载荷,您可以使用一种特殊的载荷类型,称为KafkaNull,框架会发送null。为方便起见,提供了静态的KafkaNull.INSTANCE。
当您使用消息监听器容器时,收到的ConsumerRecord的value()为null。
要配置@KafkaListener来处理null载荷,您必须使用@Payload注解并设置required = false。如果它是用于压缩日志的墓碑消息,您通常还需要键,以便您的应用程序可以确定哪个键被“删除”了。以下示例展示了这样的配置。
@KafkaListener(id = "deletableListener", topics = "myTopic")
public void listen(@Payload(required = false) String value, @Header(KafkaHeaders.RECEIVED_KEY) String key) {
// value == null represents key deletion
}
当您使用类级别的@KafkaListener和多个@KafkaHandler方法时,需要一些额外的配置。具体来说,您需要一个带有KafkaNull载荷的@KafkaHandler方法。以下示例展示了如何配置一个。
@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {
@KafkaHandler
public void listen(String cat) {
...
}
@KafkaHandler
public void listen(Integer hat) {
...
}
@KafkaHandler
public void delete(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_KEY) int key) {
...
}
}
请注意,参数是null,而不是KafkaNull。
| 请参阅手动分配所有分区。 |
此功能需要使用KafkaNullAwarePayloadArgumentResolver,框架在使用默认MessageHandlerMethodFactory时将配置它。当使用自定义MessageHandlerMethodFactory时,请参阅向@KafkaListener添加自定义HandlerMethodArgumentResolver。 |