空载荷和 'Tombstone' 记录的日志压缩

使用日志压缩 (Log Compaction) 时,您可以发送和接收包含 null 载荷的消息,以标识某个键已被删除。

您也可能由于其他原因收到 null 值,例如反序列化器 (Deserializer) 在无法反序列化值时可能返回 null

要使用 KafkaTemplate 发送 null 载荷,可以将 null 传递给 send() 方法的值参数。一个例外是 send(Message) 变体。由于 spring-messagingMessage 不能包含 null 载荷,您可以使用一个特殊的载荷类型 KafkaNull,框架会发送 null。为了方便起见,提供了静态的 KafkaNull.INSTANCE

当您使用消息监听器容器时,接收到的 ConsumerRecordvalue()null

要配置 @KafkaListener 来处理 null 载荷,必须使用 @Payload 注解并设置 required = false。如果它是压缩日志的 Tombstone 消息,通常您还需要获取键,以便您的应用能够确定哪个键被“删除”了。以下示例展示了这样的配置:

@KafkaListener(id = "deletableListener", topics = "myTopic")
public void listen(@Payload(required = false) String value, @Header(KafkaHeaders.RECEIVED_KEY) String key) {
    // value == null represents key deletion
}

当您在类级别使用 @KafkaListener 并且包含多个 @KafkaHandler 方法时,需要额外的配置。具体来说,您需要一个使用 KafkaNull 载荷的 @KafkaHandler 方法。以下示例展示了如何配置:

@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {

    @KafkaHandler
    public void listen(String cat) {
        ...
    }

    @KafkaHandler
    public void listen(Integer hat) {
        ...
    }

    @KafkaHandler
    public void delete(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_KEY) int key) {
        ...
    }

}

请注意,这里的参数是 null,而不是 KafkaNull

此功能需要使用 KafkaNullAwarePayloadArgumentResolver,框架在使用默认的 MessageHandlerMethodFactory 时会进行配置。如果使用自定义的 MessageHandlerMethodFactory,请参见 @KafkaListener 添加自定义 HandlerMethodArgumentResolver