暂停和恢复监听器容器

版本 2.1.3 为侦听器容器添加了 pause()resume() 方法。此前,您可以在 ConsumerAwareMessageListener 中暂停消费者,并通过侦听 ListenerContainerIdleEvent 来恢复它,该事件提供了对 Consumer 对象的访问。虽然您可以使用事件侦听器在空闲容器中暂停消费者,但在某些情况下,这并非线程安全,因为无法保证事件侦听器在消费者线程上调用。为了安全地暂停和恢复消费者,您应该使用侦听器容器上的 pauseresume 方法。pause() 在下一次 poll() 之前生效;resume() 在当前 poll() 返回之后生效。当容器暂停时,它会继续 poll() 消费者,避免在使用组管理时重新平衡,但它不会检索任何记录。有关更多信息,请参阅 Kafka 文档。

从版本 2.1.5 开始,您可以调用 isPauseRequested() 来查看是否已调用 pause()。但是,消费者可能尚未实际暂停。如果所有 Consumer 实例都已实际暂停,则 isConsumerPaused() 返回 true。

此外(同样从 2.1.5 开始),ConsumerPausedEventConsumerResumedEvent 实例会发布,其中容器作为 source 属性,涉及的 TopicPartition 实例作为 partitions 属性。

从版本 2.9 开始,一个新的容器属性 pauseImmediate,当设置为 true 时,会导致暂停在当前记录处理后立即生效。默认情况下,暂停会在处理完上次 poll 的所有记录后生效。请参阅 pauseImmediate

以下简单的 Spring Boot 应用程序通过使用容器注册表获取 @KafkaListener 方法的容器引用,并暂停或恢复其消费者以及接收相应的事件来演示

@SpringBootApplication
public class Application implements ApplicationListener<KafkaEvent> {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args).close();
    }

    @Override
    public void onApplicationEvent(KafkaEvent event) {
        System.out.println(event);
    }

    @Bean
    public ApplicationRunner runner(KafkaListenerEndpointRegistry registry,
            KafkaTemplate<String, String> template) {
        return args -> {
            template.send("pause.resume.topic", "thing1");
            Thread.sleep(10_000);
            System.out.println("pausing");
            registry.getListenerContainer("pause.resume").pause();
            Thread.sleep(10_000);
            template.send("pause.resume.topic", "thing2");
            Thread.sleep(10_000);
            System.out.println("resuming");
            registry.getListenerContainer("pause.resume").resume();
            Thread.sleep(10_000);
        };
    }

    @KafkaListener(id = "pause.resume", topics = "pause.resume.topic")
    public void listen(String in) {
        System.out.println(in);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("pause.resume.topic")
            .partitions(2)
            .replicas(1)
            .build();
    }

}

以下列表显示了前一个示例的结果

partitions assigned: [pause.resume.topic-1, pause.resume.topic-0]
thing1
pausing
ConsumerPausedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
resuming
ConsumerResumedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
thing2
© . This site is unofficial and not affiliated with VMware.