空载荷和 'Tombstone' 记录的日志压缩
使用日志压缩 (Log Compaction) 时,您可以发送和接收包含 null 载荷的消息,以标识某个键已被删除。
您也可能由于其他原因收到 null 值,例如反序列化器 (Deserializer) 在无法反序列化值时可能返回 null。
要使用 KafkaTemplate 发送 null 载荷,可以将 null 传递给 send() 方法的值参数。一个例外是 send(Message>) 变体。由于 spring-messaging 的 Message> 不能包含 null 载荷,您可以使用一个特殊的载荷类型 KafkaNull,框架会发送 null。为了方便起见,提供了静态的 KafkaNull.INSTANCE。
当您使用消息监听器容器时,接收到的 ConsumerRecord 的 value() 为 null。
要配置 @KafkaListener 来处理 null 载荷,必须使用 @Payload 注解并设置 required = false。如果它是压缩日志的 Tombstone 消息,通常您还需要获取键,以便您的应用能够确定哪个键被“删除”了。以下示例展示了这样的配置:
@KafkaListener(id = "deletableListener", topics = "myTopic")
public void listen(@Payload(required = false) String value, @Header(KafkaHeaders.RECEIVED_KEY) String key) {
// value == null represents key deletion
}
当您在类级别使用 @KafkaListener 并且包含多个 @KafkaHandler 方法时,需要额外的配置。具体来说,您需要一个使用 KafkaNull 载荷的 @KafkaHandler 方法。以下示例展示了如何配置:
@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {
@KafkaHandler
public void listen(String cat) {
...
}
@KafkaHandler
public void listen(Integer hat) {
...
}
@KafkaHandler
public void delete(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_KEY) int key) {
...
}
}
请注意,这里的参数是 null,而不是 KafkaNull。
| 参见 手动分配所有分区。 |
此功能需要使用 KafkaNullAwarePayloadArgumentResolver,框架在使用默认的 MessageHandlerMethodFactory 时会进行配置。如果使用自定义的 MessageHandlerMethodFactory,请参见 向 @KafkaListener 添加自定义 HandlerMethodArgumentResolver。 |