过滤消息

在某些场景下,例如再平衡(rebalancing),已经处理过的消息可能会被重新投递。框架无法知道这样的消息是否已经被处理。这是应用层面的功能。这被称为幂等接收者模式,Spring Integration 提供了一个实现

Spring for Apache Kafka 项目也通过 FilteringMessageListenerAdapter 类提供了一些帮助,它可以包装你的 MessageListener。这个类接受一个 RecordFilterStrategy 的实现,你在其中实现 filter 方法来标记消息是否是重复的,是否应该被丢弃。它还有一个额外的属性 ackDiscarded,表示适配器是否应该确认(acknowledge)被丢弃的记录。默认值是 false

当你使用 @KafkaListener 时,在容器工厂上设置 RecordFilterStrategy(以及可选的 ackDiscarded),以便监听器被包装在适当的过滤适配器中。

此外,还提供了 FilteringBatchMessageListenerAdapter,用于你使用批量消息监听器的场景。

如果你的 @KafkaListener 接收的是 ConsumerRecords<?, ?> 而不是 List<ConsumerRecord<?, ?>>FilteringBatchMessageListenerAdapter 将被忽略,因为 ConsumerRecords 是不可变的。

从 2.8.4 版本开始,你可以通过监听器注解上的 filter 属性覆盖监听器容器工厂的默认 RecordFilterStrategy

@KafkaListener(id = "filtered", topics = "topic", filter = "differentFilter")
public void listen(Thing thing) {
    ...
}

从 3.3 版本开始,支持忽略因 RecordFilterStrategy 过滤而产生的空批次。在实现 RecordFilterStrategy 时,可以通过 ignoreEmptyBatch() 进行配置。默认设置是 false,表示即使所有 ConsumerRecord 都被过滤掉,KafkaListener 仍将被调用。

如果返回 true,当所有 ConsumerRecord 都被过滤掉时,KafkaListener 将不会被调用。然而,提交到 broker 的操作仍然会执行。

如果返回 false,当所有 ConsumerRecord 都被过滤掉时,KafkaListener 将会被调用

以下是一些示例。

public class IgnoreEmptyBatchRecordFilterStrategy implements RecordFilterStrategy {
    ...
    @Override
    public List<ConsumerRecord<String, String>> filterBatch(
            List<ConsumerRecord<String, String>> consumerRecords) {
        return List.of();
    }

    @Override
    public boolean ignoreEmptyBatch() {
        return true;
    }
};

// NOTE: ignoreEmptyBatchRecordFilterStrategy is bean name of IgnoreEmptyBatchRecordFilterStrategy instance.
@KafkaListener(id = "filtered", topics = "topic", filter = "ignoreEmptyBatchRecordFilterStrategy")
public void listen(List<Thing> things) {
    ...
}

在这种情况下,IgnoreEmptyBatchRecordFilterStrategy 总是返回空列表,并且 ignoreEmptyBatch() 的结果为 true。因此 KafkaListener#listen(…​) 将永远不会被调用。

public class NotIgnoreEmptyBatchRecordFilterStrategy implements RecordFilterStrategy {
    ...
    @Override
    public List<ConsumerRecord<String, String>> filterBatch(
            List<ConsumerRecord<String, String>> consumerRecords) {
        return List.of();
    }

    @Override
    public boolean ignoreEmptyBatch() {
        return false;
    }
};

// NOTE: notIgnoreEmptyBatchRecordFilterStrategy is bean name of NotIgnoreEmptyBatchRecordFilterStrategy instance.
@KafkaListener(id = "filtered", topics = "topic", filter = "notIgnoreEmptyBatchRecordFilterStrategy")
public void listen(List<Thing> things) {
    ...
}

然而,在这种情况下,IgnoreEmptyBatchRecordFilterStrategy 总是返回空列表,并且 ignoreEmptyBatch() 的结果为 false。因此 KafkaListener#listen(…​) 总是会被调用。