有选择地手动启动 Kafka 流处理器
虽然上面列出的方法将通过 StreamsBuilderFactoryManager 无条件地将 auto start false
应用于应用程序中的所有 Kafka Streams 处理器,但通常需要仅不自动启动单独选择的 Kafka Streams 处理器。例如,让我们假设您的应用程序中有三个不同的函数(处理器),并且对于其中一个处理器,您不希望在应用程序启动时启动它。以下是一个这样的情况示例。
@Bean
public Function<KStream<?, ?>, KStream<?, ?>> process1() {
}
@Bean
public Consumer<KStream<?, ?>> process2() {
}
@Bean
public BiFunction<KStream<?, ?>, KTable<?, ?>, KStream<?, ?>> process3() {
}
在上述场景中,如果您将 spring.kafka.streams.auto-startup
设置为 false
,那么在应用程序启动期间,所有处理器都不会自动启动。在这种情况下,您必须通过在基础 StreamsBuilderFactoryManager
上调用 start()
来以编程方式启动它们,如上所述。但是,如果我们有一个用例仅选择性地禁用一个处理器,那么您必须在该处理器的各个绑定上设置 auto-startup
。让我们假设我们不希望我们的 process3
函数自动启动。这是一个 BiFunction
,有两个输入绑定 - process3-in-0
和 process3-in-1
。为了避免此处理器的自动启动,您可以选择其中任何一个输入绑定并在其上设置 auto-startup
。选择哪个绑定并不重要;如果您愿意,可以在两者上将 auto-startup
设置为 false
,但为了清晰起见,只需一个就足够了。因为它们共享相同的工厂 bean,所以您不必在两个绑定上都将 autoStartup 设置为 false,但这样做可能更有意义。
以下是您可以用来禁用此处理器的自动启动的 Spring Cloud Stream 属性。
spring.cloud.stream.bindings.process3-in-0.consumer.auto-startup: false
或
spring.cloud.stream.bindings.process3-in-1.consumer.auto-startup: false
然后,你可以使用 REST 端点或使用 BindingsEndpoint
API 手动启动处理器,如下所示。为此,你需要确保类路径上有 Spring Boot 执行器依赖项。
curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST https://127.0.0.1:8080/actuator/bindings/process3-in-0
或
@Autowired
BindingsEndpoint endpoint;
@Bean
public ApplicationRunner runner() {
return args -> {
endpoint.changeState("process3-in-0", State.STARTED);
};
}
请参阅参考文档中的此部分,以了解有关此机制的更多详细信息。
当按照本部分所述禁用 auto-startup 来控制绑定时,请注意,这仅适用于消费者绑定。换句话说,如果你使用生产者绑定 process3-out-0 ,它在禁用处理器的自动启动方面没有任何作用,尽管此生产者绑定使用与消费者绑定相同的 StreamsBuilderFactoryBean 。
|