暂停和恢复监听器容器

版本 2.1.3 为监听器容器添加了 pause()resume() 方法。以前,您可以在 ConsumerAwareMessageListener 中暂停消费者,并通过监听 ListenerContainerIdleEvent 来恢复它,ListenerContainerIdleEvent 提供了对 Consumer 对象的访问。虽然您可以使用事件监听器在空闲容器中暂停消费者,但在某些情况下,这并非线程安全,因为无法保证事件监听器在消费者线程上调用。要安全地暂停和恢复消费者,您应该使用监听器容器上的 pauseresume 方法。pause() 在下一次 poll() 之前生效;resume() 在当前 poll() 返回之后立即生效。当容器暂停时,它会继续 poll() 消费者,如果在进行组管理,则避免了重平衡,但它不会检索任何记录。有关更多信息,请参阅 Kafka 文档。

从版本 2.1.5 开始,您可以调用 isPauseRequested() 来查看是否已调用 pause()。但是,消费者可能实际上尚未暂停。如果所有 Consumer 实例都已实际暂停,isConsumerPaused() 返回 true。

此外(同样从 2.1.5 开始),会发布 ConsumerPausedEventConsumerResumedEvent 实例,其中容器作为 source 属性,涉及的 TopicPartition 实例在 partitions 属性中。

从版本 2.9 开始,一个新的容器属性 pauseImmediate,当设置为 true 时,会导致暂停在处理当前记录后立即生效。默认情况下,暂停在前一次 poll 的所有记录都处理完毕后生效。请参阅 pauseImmediate

以下简单的 Spring Boot 应用程序演示了如何使用容器注册表获取 `@KafkaListener` 方法的容器引用,并暂停或恢复其消费者以及接收相应的事件。

@SpringBootApplication
public class Application implements ApplicationListener<KafkaEvent> {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args).close();
    }

    @Override
    public void onApplicationEvent(KafkaEvent event) {
        System.out.println(event);
    }

    @Bean
    public ApplicationRunner runner(KafkaListenerEndpointRegistry registry,
            KafkaTemplate<String, String> template) {
        return args -> {
            template.send("pause.resume.topic", "thing1");
            Thread.sleep(10_000);
            System.out.println("pausing");
            registry.getListenerContainer("pause.resume").pause();
            Thread.sleep(10_000);
            template.send("pause.resume.topic", "thing2");
            Thread.sleep(10_000);
            System.out.println("resuming");
            registry.getListenerContainer("pause.resume").resume();
            Thread.sleep(10_000);
        };
    }

    @KafkaListener(id = "pause.resume", topics = "pause.resume.topic")
    public void listen(String in) {
        System.out.println(in);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("pause.resume.topic")
            .partitions(2)
            .replicas(1)
            .build();
    }

}

以下列表显示了前面示例的结果。

partitions assigned: [pause.resume.topic-1, pause.resume.topic-0]
thing1
pausing
ConsumerPausedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
resuming
ConsumerResumedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
thing2