容器工厂

@KafkaListener 注解中所述,ConcurrentKafkaListenerContainerFactory 用于为带有注解的方法创建容器。

从 2.2 版本开始,您可以使用同一个工厂创建任何 ConcurrentMessageListenerContainer。如果您想创建具有类似属性的多个容器,或者希望使用一些外部配置的工厂,例如 Spring Boot 自动配置提供的工厂,这可能会很有用。容器创建后,您可以进一步修改其属性,其中许多属性是通过使用 container.getContainerProperties() 设置的。以下示例配置了一个 ConcurrentMessageListenerContainer

@Bean
public ConcurrentMessageListenerContainer<String, String>(
        ConcurrentKafkaListenerContainerFactory<String, String> factory) {

    ConcurrentMessageListenerContainer<String, String> container =
        factory.createContainer("topic1", "topic2");
    container.setMessageListener(m -> { ... } );
    return container;
}
以这种方式创建的容器不会添加到端点注册中心。它们应该作为 @Bean 定义创建,以便注册到应用程序上下文中。

从 2.3.4 版本开始,您可以在工厂中添加一个 ContainerCustomizer,以便在容器创建和配置后进一步配置每个容器。

@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    ...
    factory.setContainerCustomizer(container -> { /* customize the container */ });
    return factory;
}

从 3.1 版本开始,还可以通过在 KafkaListener 注解上指定 'ContainerPostProcessor' 的 bean 名称来对单个监听器应用相同类型的自定义配置。

@Bean
public ContainerPostProcessor<String, String, AbstractMessageListenerContainer<String, String>> customContainerPostProcessor() {
    return container -> { /* customize the container */ };
}

...

@KafkaListener(..., containerPostProcessor="customContainerPostProcessor", ...)