按顺序启动 @KafkaListener
s
一个常见的用例是在另一个监听器消费完主题中的所有记录后启动监听器。例如,您可能希望在处理来自其他主题的记录之前,将一个或多个压缩主题的内容加载到内存中。从 2.7.3 版本开始,引入了一个新的组件 ContainerGroupSequencer
。它使用 @KafkaListener
的 containerGroup
属性将容器分组在一起,并在当前组中的所有容器空闲时启动下一组中的容器。
最好用一个例子来说明。
@KafkaListener(id = "listen1", topics = "topic1", containerGroup = "g1", concurrency = "2")
public void listen1(String in) {
}
@KafkaListener(id = "listen2", topics = "topic2", containerGroup = "g1", concurrency = "2")
public void listen2(String in) {
}
@KafkaListener(id = "listen3", topics = "topic3", containerGroup = "g2", concurrency = "2")
public void listen3(String in) {
}
@KafkaListener(id = "listen4", topics = "topic4", containerGroup = "g2", concurrency = "2")
public void listen4(String in) {
}
@Bean
ContainerGroupSequencer sequencer(KafkaListenerEndpointRegistry registry) {
return new ContainerGroupSequencer(registry, 5000, "g1", "g2");
}
这里,我们在两个组 g1
和 g2
中有 4 个监听器。
在应用程序上下文初始化期间,排序器将提供的组中所有容器的 autoStartup
属性设置为 false
。它还为任何容器(尚未设置的容器)设置 idleEventInterval
为提供的 value(在本例中为 5000ms)。然后,当应用程序上下文启动排序器时,第一组中的容器将启动。当接收到 ListenerContainerIdleEvent
时,每个容器中的每个子容器都将停止。当 ConcurrentMessageListenerContainer
中的所有子容器停止时,父容器将停止。当一组中的所有容器都停止时,将启动下一组中的容器。组的数量或组中容器的数量没有限制。
默认情况下,最后一组(上面为 g2
)中的容器在空闲时不会停止。要修改此行为,请将排序器上的 stopLastGroupWhenIdle
设置为 true
。
顺便说一句,以前每个组中的容器都被添加到类型为 Collection<MessageListenerContainer>
的 bean 中,bean 名称是 containerGroup
。这些集合现在已弃用,取而代之的是类型为 ContainerGroup
的 bean,其 bean 名称是组名后缀为 .group
;在上例中,将会有 2 个 bean g1.group
和 g2.group
。Collection
bean 将在未来的版本中删除。