手动启动 Kafka Streams 处理器

Spring Cloud Stream Kafka Streams Binder 在 Spring for Apache Kafka 的 StreamsBuilderFactoryBean 之上提供了一个名为 StreamsBuilderFactoryManager 的抽象。这个管理器 API 用于控制基于 Binder 的应用中每个处理器对应的多个 StreamsBuilderFactoryBean。因此,在使用 Binder 时,如果您想手动控制应用中各个 StreamsBuilderFactoryBean 对象的自动启动,您需要使用 StreamsBuilderFactoryManager。您可以使用属性 spring.kafka.streams.auto-startup 并将其设置为 false 来关闭处理器的自动启动。然后,在应用中,您可以使用如下所示的方式,通过 StreamsBuilderFactoryManager 来启动处理器。

@Bean
public ApplicationRunner runner(StreamsBuilderFactoryManager sbfm) {
    return args -> {
        sbfm.start();
    };
}

当您希望应用在主线程中启动,并让 Kafka Streams 处理器独立启动时,此功能非常有用。例如,当您有一个大型状态存储需要恢复时,如果处理器像默认情况一样正常启动,这可能会阻塞您的应用启动。如果您正在使用某种存活探针机制(例如在 Kubernetes 上),它可能会认为应用已宕机并尝试重启。为了纠正这种情况,您可以将 spring.kafka.streams.auto-startup 设置为 false 并遵循上述方法。

请记住,当使用 Spring Cloud Stream Binder 时,您不是直接处理来自 Spring for Apache Kafka 的 StreamsBuilderFactoryBean,而是 StreamsBuilderFactoryManager,因为 StreamsBuilderFactoryBean 对象是由 Binder 内部管理的。