类上的 @KafkaListener
当您在类级别使用 @KafkaListener 时,您必须在方法级别指定 @KafkaHandler。如果此类或其子类的任何方法上没有 @KafkaHandler,框架将拒绝此类配置。@KafkaHandler 注解是方法明确和简洁目的所必需的。否则,很难在没有额外限制的情况下决定此方法或其他方法。
当消息传递时,转换后的消息载荷类型用于确定要调用的方法。以下示例展示了如何实现此目的
@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {
@KafkaHandler
public void listen(String foo) {
...
}
@KafkaHandler
public void listen(Integer bar) {
...
}
@KafkaHandler(isDefault = true)
public void listenDefault(Object object) {
...
}
}
从 2.1.3 版本开始,您可以将 @KafkaHandler 方法指定为默认方法,如果与其他方法不匹配,则调用此方法。最多只能指定一个此类方法。当使用 @KafkaHandler 方法时,载荷必须已经转换为域对象(以便可以执行匹配)。使用自定义反序列化器、JacksonJsonDeserializer 或将 TypePrecedence 设置为 TYPE_ID 的 JacksonJsonMessageConverter。有关更多信息,请参阅序列化、反序列化和消息转换。
由于 Spring 解析方法参数的方式存在一些限制,默认的 @KafkaHandler 无法接收离散的标头;它必须使用 消费者记录元数据 中讨论的 ConsumerRecordMetadata。 |
例如:
@KafkaHandler(isDefault = true)
public void listenDefault(Object object, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
...
}
如果对象是 String,这将不起作用;topic 参数也将获得对 object 的引用。
如果您在默认方法中需要有关记录的元数据,请使用此方法
@KafkaHandler(isDefault = true)
void listen(Object in, @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata meta) {
String topic = meta.topic();
...
}
此外,这也将不起作用。topic 被解析为 payload。
@KafkaHandler(isDefault = true)
public void listenDefault(String payload, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
// payload.equals(topic) is True.
...
}
如果存在默认方法中需要离散自定义标头的用例,请使用此方法
@KafkaHandler(isDefault = true)
void listenDefault(String payload, @Headers Map<String, Object> headers) {
Object myValue = headers.get("MyCustomHeader");
...
}