`@KafkaListener` 生命周期管理

`@KafkaListener` 注解创建的监听器容器不是应用上下文中的 bean。它们被注册到一个类型为 `KafkaListenerEndpointRegistry` 的基础设施 bean 中。这个 bean 由框架自动声明,并管理容器的生命周期;它会自动启动任何 `autoStartup` 设置为 `true` 的容器。所有容器工厂创建的所有容器必须位于相同的 `phase`。更多信息请参见 监听器容器自动启动。您可以通过注册表以编程方式管理生命周期。启动或停止注册表将启动或停止所有已注册的容器。或者,您可以使用容器的 `id` 属性获取对单个容器的引用。您可以在注解上设置 `autoStartup`,这将覆盖容器工厂中配置的默认设置。您可以从应用上下文中获取对该 bean 的引用(例如通过自动注入),以管理其注册的容器。以下示例展示了如何实现这一点:

@KafkaListener(id = "myContainer", topics = "myTopic", autoStartup = "false")
public void listen(...) { ... }
@Autowired
private KafkaListenerEndpointRegistry registry;

...

    this.registry.getListenerContainer("myContainer").start();

...

注册表只维护它管理的容器的生命周期;声明为 bean 的容器不受注册表管理,并且可以从应用上下文中获取。通过调用注册表的 `getListenerContainers()` 方法可以获取托管容器的集合。版本 2.2.5 增加了一个便利方法 `getAllListenerContainers()`,它返回所有容器的集合,包括注册表管理的容器和声明为 bean 的容器。返回的集合将包含任何已初始化的原型 bean,但不会初始化任何延迟加载的 bean 声明。

应用上下文刷新后注册的端点会立即启动,无论其 `autoStartup` 属性为何,这是为了遵守 `SmartLifecycle` 契约,其中 `autoStartup` 只在应用上下文初始化期间考虑。延迟注册的一个例子是原型作用域中包含 `@KafkaListener` 的 bean,其实例在上下文初始化后创建。从版本 2.8.7 开始,您可以将注册表的 `alwaysStartAfterRefresh` 属性设置为 `false`,然后容器的 `autoStartup` 属性将决定容器是否启动。

从 KafkaListenerEndpointRegistry 中检索 MessageListenerContainer

`KafkaListenerEndpointRegistry` 提供了多种检索 `MessageListenerContainer` 实例的方法,以适应不同的管理场景:

所有容器:对于涵盖所有监听器容器的操作,使用 `getListenerContainers()` 方法来获取一个完整的集合。

Collection<MessageListenerContainer> allContainers = registry.getListenerContainers();

按 ID 获取特定容器:要管理单个容器,可以使用 `getListenerContainer(String id)` 方法通过其 ID 进行检索。

MessageListenerContainer specificContainer = registry.getListenerContainer("myContainerId");

动态容器过滤:版本 3.2 中引入了两个重载的 `getListenerContainersMatching` 方法,可以实现更精细的容器选择。一个方法接受一个 `Predicate<String>` 作为参数,用于基于 ID 的过滤;另一个方法接受一个 `BiPredicate<String, MessageListenerContainer>` 作为参数,用于更高级的条件过滤,这些条件可能包括容器属性或状态。

// Prefix matching (Predicate<String>)
Collection<MessageListenerContainer> filteredContainers =
    registry.getListenerContainersMatching(id -> id.startsWith("productListener-retry-"));

// Regex matching (Predicate<String>)
Collection<MessageListenerContainer> regexFilteredContainers =
    registry.getListenerContainersMatching(myPattern::matches);

// Pre-built Set of IDs (Predicate<String>)
Collection<MessageListenerContainer> setFilteredContainers =
    registry.getListenerContainersMatching(myIdSet::contains);

// Advanced Filtering: ID prefix and running state (BiPredicate<String, MessageListenerContainer>)
Collection<MessageListenerContainer> advancedFilteredContainers =
    registry.getListenerContainersMatching(
        (id, container) -> id.startsWith("specificPrefix-") && container.isRunning()
    );

利用这些方法可以有效地管理和查询应用程序中的 `MessageListenerContainer` 实例。