过滤消息
在某些场景下,例如再平衡(rebalancing),已经处理过的消息可能会被重新投递。框架无法知道这样的消息是否已经被处理。这是应用层面的功能。这被称为幂等接收者模式,Spring Integration 提供了一个实现。
Spring for Apache Kafka 项目也通过 FilteringMessageListenerAdapter
类提供了一些帮助,它可以包装你的 MessageListener
。这个类接受一个 RecordFilterStrategy
的实现,你在其中实现 filter
方法来标记消息是否是重复的,是否应该被丢弃。它还有一个额外的属性 ackDiscarded
,表示适配器是否应该确认(acknowledge)被丢弃的记录。默认值是 false
。
当你使用 @KafkaListener
时,在容器工厂上设置 RecordFilterStrategy
(以及可选的 ackDiscarded
),以便监听器被包装在适当的过滤适配器中。
此外,还提供了 FilteringBatchMessageListenerAdapter
,用于你使用批量消息监听器的场景。
如果你的 @KafkaListener 接收的是 ConsumerRecords<?, ?> 而不是 List<ConsumerRecord<?, ?>> ,FilteringBatchMessageListenerAdapter 将被忽略,因为 ConsumerRecords 是不可变的。 |
从 2.8.4 版本开始,你可以通过监听器注解上的 filter
属性覆盖监听器容器工厂的默认 RecordFilterStrategy
。
@KafkaListener(id = "filtered", topics = "topic", filter = "differentFilter")
public void listen(Thing thing) {
...
}
从 3.3 版本开始,支持忽略因 RecordFilterStrategy
过滤而产生的空批次。在实现 RecordFilterStrategy
时,可以通过 ignoreEmptyBatch()
进行配置。默认设置是 false
,表示即使所有 ConsumerRecord
都被过滤掉,KafkaListener
仍将被调用。
如果返回 true
,当所有 ConsumerRecord
都被过滤掉时,KafkaListener
将不会被调用。然而,提交到 broker 的操作仍然会执行。
如果返回 false
,当所有 ConsumerRecord
都被过滤掉时,KafkaListener
将会被调用。
以下是一些示例。
public class IgnoreEmptyBatchRecordFilterStrategy implements RecordFilterStrategy {
...
@Override
public List<ConsumerRecord<String, String>> filterBatch(
List<ConsumerRecord<String, String>> consumerRecords) {
return List.of();
}
@Override
public boolean ignoreEmptyBatch() {
return true;
}
};
// NOTE: ignoreEmptyBatchRecordFilterStrategy is bean name of IgnoreEmptyBatchRecordFilterStrategy instance.
@KafkaListener(id = "filtered", topics = "topic", filter = "ignoreEmptyBatchRecordFilterStrategy")
public void listen(List<Thing> things) {
...
}
在这种情况下,IgnoreEmptyBatchRecordFilterStrategy
总是返回空列表,并且 ignoreEmptyBatch()
的结果为 true
。因此 KafkaListener#listen(…)
将永远不会被调用。
public class NotIgnoreEmptyBatchRecordFilterStrategy implements RecordFilterStrategy {
...
@Override
public List<ConsumerRecord<String, String>> filterBatch(
List<ConsumerRecord<String, String>> consumerRecords) {
return List.of();
}
@Override
public boolean ignoreEmptyBatch() {
return false;
}
};
// NOTE: notIgnoreEmptyBatchRecordFilterStrategy is bean name of NotIgnoreEmptyBatchRecordFilterStrategy instance.
@KafkaListener(id = "filtered", topics = "topic", filter = "notIgnoreEmptyBatchRecordFilterStrategy")
public void listen(List<Thing> things) {
...
}
然而,在这种情况下,IgnoreEmptyBatchRecordFilterStrategy
总是返回空列表,并且 ignoreEmptyBatch()
的结果为 false
。因此 KafkaListener#listen(…)
总是会被调用。