强制消费者再平衡

Kafka 客户端现在支持触发强制再平衡的选项。从版本 3.1.2 开始,Spring for Apache Kafka 提供了通过消息监听器容器在 Kafka 消费者上调用此 API 的选项。调用此 API 时,只是通知 Kafka 消费者触发强制再平衡;实际的再平衡只会在下一次 poll() 操作中发生。如果已经有再平衡正在进行中,调用强制再平衡是无效操作(NO-OP)。调用者必须等待当前再平衡完成才能再次调用。更多详情请参阅 enforceRebalance 的 javadocs。

以下代码片段展示了使用消息监听器容器强制执行再平衡的要点。

@KafkaListener(id = "my.id", topics = "my-topic")
void listen(ConsumerRecord<String, String> in) {
    System.out.println("From KafkaListener: " + in);
}

@Bean
public ApplicationRunner runner(KafkaTemplate<String, Object> template, KafkaListenerEndpointRegistry registry) {
    return args -> {
        final MessageListenerContainer listenerContainer = registry.getListenerContainer("my.id");
        System.out.println("Enforcing a rebalance");
        Thread.sleep(5_000);
        listenerContainer.enforceRebalance();
        Thread.sleep(5_000);
    };
}

如上面的代码所示,应用使用 KafkaListenerEndpointRegistry 来获取消息监听器容器的访问权限,然后调用其上的 enforceRebalance API。当在监听器容器上调用 enforceRebalance 时,它会将调用委托给底层的 Kafka 消费者。Kafka 消费者将在下一次 poll() 操作中触发再平衡。