编程模型辅助功能
单个应用程序中的多个 Kafka Streams 处理器
Binder 允许在单个 Spring Cloud Stream 应用程序中拥有多个 Kafka Streams 处理器。你可以拥有一个如下所示的应用程序。
@Bean
public java.util.function.Function<KStream<Object, String>, KStream<Object, String>> process() {
...
}
@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
...
}
@Bean
public java.util.function.BiFunction<KStream<Object, String>, KTable<Integer, String>, KStream<Object, String>> yetAnotherProcess() {
...
}
在这种情况下,binder 将创建 3 个独立的 Kafka Streams 对象,它们具有不同的应用程序 ID(下面将详细介绍)。但是,如果应用程序中有多个处理器,则必须告诉 Spring Cloud Stream 哪些函数需要被激活。以下是激活函数的方法。
spring.cloud.function.definition: process;anotherProcess;yetAnotherProcess
如果你希望某些函数不要立即激活,可以将其从列表中移除。
当你在同一个应用程序中拥有一个 Kafka Streams 处理器和通过不同 binder 处理的其他类型的 Function bean(例如,基于常规 Kafka 消息通道 binder 的函数 bean)时,这也是成立的。
Kafka Streams 应用程序 ID
应用程序 ID 是 Kafka Streams 应用程序必须提供的属性。Spring Cloud Stream Kafka Streams binder 允许你通过多种方式配置此应用程序 ID。
如果应用程序中只有一个处理器,则可以使用以下属性在 binder 级别设置它
spring.cloud.stream.kafka.streams.binder.applicationId.
为了方便,如果只有一个处理器,你还可以使用 spring.application.name 作为属性来委托应用程序 ID。
如果应用程序中有多个 Kafka Streams 处理器,则需要为每个处理器设置应用程序 ID。在函数式模型中,你可以将其作为属性附加到每个函数。
例如,假设你有以下函数。
@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
...
}
和
@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
...
}
然后你可以使用以下 binder 级别属性为每个函数设置应用程序 ID。
spring.cloud.stream.kafka.streams.binder.functions.process.applicationId
和
spring.cloud.stream.kafka.streams.binder.functions.anotherProcess.applicationId
对于基于函数的模型,在绑定级别设置应用程序 ID 的方法也适用。但是,如果你使用函数式模型,如我们上面所见,在 binder 级别为每个函数进行设置要容易得多。
对于生产部署,强烈建议通过配置明确指定应用程序 ID。如果你正在自动扩展应用程序,这一点尤其关键,在这种情况下,你需要确保每个实例都以相同的应用程序 ID 进行部署。
如果应用程序未提供应用程序 ID,则 binder 将为你自动生成一个静态应用程序 ID。这在开发场景中很方便,因为它避免了显式提供应用程序 ID 的需要。以这种方式生成的应用程序 ID 在应用程序重启时将是静态的。在函数式模型中,生成的应用程序 ID 将是函数 bean 名称后跟字面量 applicationID,例如,如果 process 是函数 bean 名称,则为 process-applicationID。
设置应用程序 ID 摘要
-
默认情况下,binder 将为每个函数方法自动生成应用程序 ID。
-
如果只有一个处理器,则可以使用
spring.kafka.streams.applicationId、spring.application.name或spring.cloud.stream.kafka.streams.binder.applicationId。 -
如果具有多个处理器,则可以通过属性
spring.cloud.stream.kafka.streams.binder.functions.<function-name>.applicationId为每个函数设置应用程序 ID。
使用函数式风格覆盖 binder 生成的默认绑定名称
默认情况下,当使用函数式风格时,binder 使用上述策略生成绑定名称,即 <function-bean-name>-<in>|<out>-[0..n],例如 process-in-0、process-out-0 等。如果你想覆盖这些绑定名称,可以通过指定以下属性来实现。
spring.cloud.stream.function.bindings.<default binding name>。默认绑定名称是 binder 生成的原始绑定名称。
例如,假设你有以下函数。
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}
Binder 将生成名称为 process-in-0、process-in-1 和 process-out-0 的绑定。现在,如果你想将它们完全更改为其他名称,也许是更具领域特定性的绑定名称,那么可以按如下方式进行。
spring.cloud.stream.function.bindings.process-in-0=users
spring.cloud.stream.function.bindings.process-in-0=regions
和
spring.cloud.stream.function.bindings.process-out-0=clicks
之后,你必须在新绑定名称上设置所有绑定级别的属性。
请记住,对于上面描述的函数式编程模型,遵守默认绑定名称在大多数情况下都是有意义的。你可能仍然想要进行这种覆盖的唯一原因是当你拥有大量配置属性并且希望将绑定映射到更友好的领域名称时。
设置引导服务器配置
运行 Kafka Streams 应用程序时,必须提供 Kafka 代理服务器信息。如果你不提供此信息,则 binder 期望你正在默认的 localhost:9092 上运行代理。如果不是这种情况,则需要覆盖它。有几种方法可以做到这一点。
-
使用启动属性 -
spring.kafka.bootstrapServers -
Binder 级别属性 -
spring.cloud.stream.kafka.streams.binder.brokers
对于 binder 级别属性,无论你是否使用通过常规 Kafka binder 提供的代理属性 - spring.cloud.stream.kafka.binder.brokers,都没有关系。Kafka Streams binder 将首先检查是否设置了 Kafka Streams binder 特定的代理属性 (spring.cloud.stream.kafka.streams.binder.brokers),如果未找到,则会查找 spring.cloud.stream.kafka.binder.brokers。