过滤消息
Spring for Apache Kafka 项目还通过 FilteringMessageListenerAdapter 类提供了一些帮助,该类可以包装您的 MessageListener。该类需要一个 RecordFilterStrategy 的实现,您在该实现中实现 filter 方法以指示消息是重复的并且应该被丢弃。它还有一个附加属性 ackDiscarded,指示适配器是否应该确认被丢弃的记录。默认情况下,它为 false。
当您使用 @KafkaListener 时,在容器工厂上设置 RecordFilterStrategy(并可选地设置 ackDiscarded),以便监听器被包装在适当的过滤适配器中。
此外,还提供了 FilteringBatchMessageListenerAdapter,用于当您使用批量消息监听器时。
如果您的 @KafkaListener 接收 ConsumerRecords<?, ?> 而不是 List<ConsumerRecord<?, ?>>,则 FilteringBatchMessageListenerAdapter 将被忽略,因为 ConsumerRecords 是不可变的。 |
从版本 2.8.4 开始,您可以通过使用监听器注解上的 filter 属性来覆盖监听器容器工厂的默认 RecordFilterStrategy。
@KafkaListener(id = "filtered", topics = "topic", filter = "differentFilter")
public void listen(Thing thing) {
...
}
从版本 3.3 开始,支持忽略由 RecordFilterStrategy 过滤产生的空批次。在实现 RecordFilterStrategy 时,可以通过 ignoreEmptyBatch() 进行配置。默认设置为 false,表示即使所有 ConsumerRecord 都被过滤掉,KafkaListener 仍将被调用。
如果返回 true,当所有 ConsumerRecord 都被过滤掉时,KafkaListener 将不会被调用。但是,对 broker 的提交仍将执行。
如果返回 false,当所有 ConsumerRecord 都被过滤掉时,KafkaListener 仍将被调用。
以下是一些示例。
public class IgnoreEmptyBatchRecordFilterStrategy implements RecordFilterStrategy {
...
@Override
public List<ConsumerRecord<String, String>> filterBatch(
List<ConsumerRecord<String, String>> consumerRecords) {
return List.of();
}
@Override
public boolean ignoreEmptyBatch() {
return true;
}
}
// NOTE: ignoreEmptyBatchRecordFilterStrategy is bean name of IgnoreEmptyBatchRecordFilterStrategy instance.
@KafkaListener(id = "filtered", topics = "topic", filter = "ignoreEmptyBatchRecordFilterStrategy")
public void listen(List<Thing> things) {
...
}
在这种情况下,IgnoreEmptyBatchRecordFilterStrategy 始终返回空列表,并且 ignoreEmptyBatch() 的结果返回 true。因此,KafkaListener#listen(…) 将永远不会被调用。
public class NotIgnoreEmptyBatchRecordFilterStrategy implements RecordFilterStrategy {
...
@Override
public List<ConsumerRecord<String, String>> filterBatch(
List<ConsumerRecord<String, String>> consumerRecords) {
return List.of();
}
@Override
public boolean ignoreEmptyBatch() {
return false;
}
}
// NOTE: notIgnoreEmptyBatchRecordFilterStrategy is bean name of NotIgnoreEmptyBatchRecordFilterStrategy instance.
@KafkaListener(id = "filtered", topics = "topic", filter = "notIgnoreEmptyBatchRecordFilterStrategy")
public void listen(List<Thing> things) {
...
}
然而,在这种情况下,IgnoreEmptyBatchRecordFilterStrategy 始终返回空列表,并且 ignoreEmptyBatch() 的结果返回 false。因此,KafkaListener#listen(…) 总是会被调用。