空载荷和“墓碑”记录的日志压缩

当您使用日志压缩时,可以发送和接收带有null载荷的消息,以标识键的删除。

您也可能因为其他原因收到null值,例如Deserializer在无法反序列化值时可能会返回null

要使用KafkaTemplate发送null载荷,您可以将null传递给send()方法的value参数。其中一个例外是send(Message<?> message)变体。由于spring-messagingMessage<?>不能有null载荷,您可以使用一种特殊的载荷类型,称为KafkaNull,框架会发送null。为方便起见,提供了静态的KafkaNull.INSTANCE

当您使用消息监听器容器时,收到的ConsumerRecordvalue()null

要配置@KafkaListener来处理null载荷,您必须使用@Payload注解并设置required = false。如果它是用于压缩日志的墓碑消息,您通常还需要键,以便您的应用程序可以确定哪个键被“删除”了。以下示例展示了这样的配置。

@KafkaListener(id = "deletableListener", topics = "myTopic")
public void listen(@Payload(required = false) String value, @Header(KafkaHeaders.RECEIVED_KEY) String key) {
    // value == null represents key deletion
}

当您使用类级别的@KafkaListener和多个@KafkaHandler方法时,需要一些额外的配置。具体来说,您需要一个带有KafkaNull载荷的@KafkaHandler方法。以下示例展示了如何配置一个。

@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {

    @KafkaHandler
    public void listen(String cat) {
        ...
    }

    @KafkaHandler
    public void listen(Integer hat) {
        ...
    }

    @KafkaHandler
    public void delete(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_KEY) int key) {
        ...
    }

}

请注意,参数是null,而不是KafkaNull

此功能需要使用KafkaNullAwarePayloadArgumentResolver,框架在使用默认MessageHandlerMethodFactory时将配置它。当使用自定义MessageHandlerMethodFactory时,请参阅@KafkaListener添加自定义HandlerMethodArgumentResolver
© . This site is unofficial and not affiliated with VMware.