重新平衡监听器

ContainerProperties 有一个名为 consumerRebalanceListener 的属性,它接受 Kafka 客户端 ConsumerRebalanceListener 接口的实现。如果未提供此属性,容器将配置一个日志监听器,以 INFO 级别记录重新平衡事件。该框架还添加了一个子接口 ConsumerAwareRebalanceListener。以下列表显示了 ConsumerAwareRebalanceListener 接口定义

public interface ConsumerAwareRebalanceListener extends ConsumerRebalanceListener {

    void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

    void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

    void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

    void onPartitionsLost(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

}

请注意,当分区被撤销时有两个回调。第一个会立即被调用。第二个在任何待处理的偏移量提交后被调用。如果您希望在某些外部存储库中维护偏移量,这会很有用,如以下示例所示

containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {

    @Override
    public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        // acknowledge any pending Acknowledgments (if using manual acks)
    }

    @Override
    public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        // ...
        store(consumer.position(partition));
        // ...
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // ...
        consumer.seek(partition, offsetTracker.getOffset() + 1);
        // ...
    }
});
从 2.4 版本开始,添加了一个新方法 onPartitionsLost()(类似于 ConsumerRebalanceLister 中同名的方法)。ConsumerRebalanceLister 的默认实现只是调用 onPartitionsRevokedConsumerAwareRebalanceListener 的默认实现不执行任何操作。当为监听器容器提供自定义监听器(任一类型)时,重要的是您的实现不要从 onPartitionsLost 调用 onPartitionsRevoked。如果您实现 ConsumerRebalanceListener,则应覆盖默认方法。这是因为监听器容器将在调用您的实现上的方法后,从其 onPartitionsLost 实现中调用其自身的 onPartitionsRevoked。如果您的实现委托给默认行为,那么每次 Consumer 在容器的监听器上调用该方法时,onPartitionsRevoked 将被调用两次。

Kafka 4.0 消费者重新平衡协议

Spring for Apache Kafka 4.0 支持 Apache Kafka 4.0 的新消费者重新平衡协议 (KIP-848),该协议通过服务器驱动的增量分区分配提高了性能。这减少了消费者组的重新平衡停机时间。

要启用新协议,请配置 group.protocol 属性

spring.kafka.consumer.properties.group.protocol=consumer

请记住,上述属性是 Spring Boot 属性。如果您不使用 Spring Boot,您可能需要手动设置它,如下所示。

或者,通过编程方式设置它

Map<String, Object> props = new HashMap<>();
props.put("group.protocol", "consumer");
ConsumerFactory<String, String> factory = new DefaultKafkaConsumerFactory<>(props);

新协议与 ConsumerAwareRebalanceListener 无缝协作。由于增量重新平衡,onPartitionsAssigned 可能会被多次调用,并带有较小的分区集,这与传统协议中典型的单次回调不同。

新协议使用服务器端分区分配,忽略通过 spring.kafka.consumer.partition-assignment-strategy 设置的客户端自定义分配器。如果检测到自定义分配器,将记录警告。要使用自定义分配器,请设置 group.protocol=classic(如果您未指定 group.protocol 的值,则这是默认值)。

© . This site is unofficial and not affiliated with VMware.