空载荷和 'Tombstone' 记录的日志压缩
使用日志压缩 (Log Compaction) 时,您可以发送和接收包含 null
载荷的消息,以标识某个键已被删除。
您也可能由于其他原因收到 null
值,例如反序列化器 (Deserializer) 在无法反序列化值时可能返回 null
。
要使用 KafkaTemplate
发送 null
载荷,可以将 null
传递给 send()
方法的值参数。一个例外是 send(Message>)
变体。由于 spring-messaging
的 Message>
不能包含 null
载荷,您可以使用一个特殊的载荷类型 KafkaNull
,框架会发送 null
。为了方便起见,提供了静态的 KafkaNull.INSTANCE
。
当您使用消息监听器容器时,接收到的 ConsumerRecord
的 value()
为 null
。
要配置 @KafkaListener
来处理 null
载荷,必须使用 @Payload
注解并设置 required = false
。如果它是压缩日志的 Tombstone 消息,通常您还需要获取键,以便您的应用能够确定哪个键被“删除”了。以下示例展示了这样的配置:
@KafkaListener(id = "deletableListener", topics = "myTopic")
public void listen(@Payload(required = false) String value, @Header(KafkaHeaders.RECEIVED_KEY) String key) {
// value == null represents key deletion
}
当您在类级别使用 @KafkaListener
并且包含多个 @KafkaHandler
方法时,需要额外的配置。具体来说,您需要一个使用 KafkaNull
载荷的 @KafkaHandler
方法。以下示例展示了如何配置:
@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {
@KafkaHandler
public void listen(String cat) {
...
}
@KafkaHandler
public void listen(Integer hat) {
...
}
@KafkaHandler
public void delete(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_KEY) int key) {
...
}
}
请注意,这里的参数是 null
,而不是 KafkaNull
。
参见 手动分配所有分区。 |
此功能需要使用 KafkaNullAwarePayloadArgumentResolver ,框架在使用默认的 MessageHandlerMethodFactory 时会进行配置。如果使用自定义的 MessageHandlerMethodFactory ,请参见 向 @KafkaListener 添加自定义 HandlerMethodArgumentResolver 。 |