在类上使用 @KafkaListener
当你在类级别使用 @KafkaListener 时,你必须在方法级别指定 @KafkaHandler。消息投递时,会根据转换后的消息载荷类型来确定调用哪个方法。以下示例展示了如何做到这一点:
@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {
@KafkaHandler
public void listen(String foo) {
...
}
@KafkaHandler
public void listen(Integer bar) {
...
}
@KafkaHandler(isDefault = true)
public void listenDefault(Object object) {
...
}
}
从 2.1.3 版本开始,你可以将一个 @KafkaHandler 方法指定为默认方法,如果没有匹配到其他方法,则会调用该方法。最多只能指定一个默认方法。当使用 @KafkaHandler 方法时,载荷必须已经转换为领域对象(以便进行匹配)。请使用自定义反序列化器、JsonDeserializer 或将 TypePrecedence 设置为 TYPE_ID 的 JsonMessageConverter。有关更多信息,请参阅 序列化、反序列化和消息转换。
由于 Spring 解析方法参数的方式存在一些限制,默认的 @KafkaHandler 不能接收离散的消息头;它必须使用 ConsumerRecordMetadata,如 消费者记录元数据 中所述。 |
例如:
@KafkaHandler(isDefault = true)
public void listenDefault(Object object, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
...
}
如果对象是 String 类型,这将不起作用;topic 参数也将获得对 object 的引用。
如果你在默认方法中需要记录的元数据,请使用以下方式:
@KafkaHandler(isDefault = true)
void listen(Object in, @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata meta) {
String topic = meta.topic();
...
}