重平衡监听器

ContainerProperties 有一个名为 consumerRebalanceListener 的属性,它接受 Kafka 客户端 ConsumerRebalanceListener 接口的实现。如果未提供此属性,容器会配置一个日志监听器,用于在 INFO 级别记录重平衡事件。该框架还添加了一个子接口 ConsumerAwareRebalanceListener。以下列表显示了 ConsumerAwareRebalanceListener 接口的定义

public interface ConsumerAwareRebalanceListener extends ConsumerRebalanceListener {

    void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

    void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

    void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

    void onPartitionsLost(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

}

请注意,当分区被撤销时有两个回调。第一个会立即调用。第二个会在任何待处理的偏移量提交后调用。如果您希望在某些外部仓库中维护偏移量,这会非常有用,如下例所示

containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {

    @Override
    public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        // acknowledge any pending Acknowledgments (if using manual acks)
    }

    @Override
    public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        // ...
        store(consumer.position(partition));
        // ...
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // ...
        consumer.seek(partition, offsetTracker.getOffset() + 1);
        // ...
    }
});
从版本 2.4 开始,新增了一个方法 onPartitionsLost()(类似于 ConsumerRebalanceLister 中同名的方法)。ConsumerRebalanceLister 的默认实现仅调用 onPartitionsRevokedConsumerAwareRebalanceListener 的默认实现不做任何事情。当为监听器容器提供自定义监听器(任一类型)时,重要的是您的实现不要在 onPartitionsLost 中调用 onPartitionsRevoked。如果您实现了 ConsumerRebalanceListener,您应该覆盖默认方法。这是因为监听器容器在调用您的实现上的方法后,将从其 onPartitionsLost 的实现中调用其自身的 onPartitionsRevoked。如果您的实现委托给默认行为,那么每次 Consumer 调用容器监听器上的该方法时,onPartitionsRevoked 将被调用两次。