编程模型的辅助功能
单个应用程序中的多个 Kafka Streams 处理器
绑定器允许在单个 Spring Cloud Stream 应用程序中拥有多个 Kafka Streams 处理器。您可以像下面这样创建一个应用程序。
@Bean
public java.util.function.Function<KStream<Object, String>, KStream<Object, String>> process() {
...
}
@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
...
}
@Bean
public java.util.function.BiFunction<KStream<Object, String>, KTable<Integer, String>, KStream<Object, String>> yetAnotherProcess() {
...
}
在这种情况下,绑定器将创建 3 个具有不同应用程序 ID 的独立 Kafka Streams 对象(更多信息请参见下文)。但是,如果您在应用程序中有多个处理器,则必须告诉 Spring Cloud Stream 哪些函数需要激活。以下是如何激活函数。
spring.cloud.function.definition: process;anotherProcess;yetAnotherProcess
如果您希望某些函数不要立即激活,可以将其从列表中移除。
当您拥有一个 Kafka Streams 处理器以及同一个应用程序中通过不同绑定器处理的其他类型的 Function
bean 时,也适用此规则(例如,基于普通 Kafka 消息通道绑定器的函数 bean)。
Kafka Streams 应用程序 ID
应用程序 ID 是您需要为 Kafka Streams 应用程序提供的必填属性。Spring Cloud Stream Kafka Streams 绑定器允许您通过多种方式配置此应用程序 ID。
如果您应用程序中只有一个处理器,则可以使用以下属性在绑定器级别设置它
spring.cloud.stream.kafka.streams.binder.applicationId
.
为了方便起见,如果您只有一个处理器,也可以使用 spring.application.name
作为属性来委托应用程序 ID。
如果您在应用程序中有多个 Kafka Streams 处理器,则需要为每个处理器设置应用程序 ID。在功能模型的情况下,您可以将其作为属性附加到每个函数。
例如,假设您有以下函数。
@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
...
}
以及
@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
...
}
然后,您可以使用以下绑定器级别属性为每个函数设置应用程序 ID。
spring.cloud.stream.kafka.streams.binder.functions.process.applicationId
以及
spring.cloud.stream.kafka.streams.binder.functions.anotherProcess.applicationId
对于基于函数的模型,这种在绑定级别设置应用程序 ID 的方法也适用。但是,如果您使用的是功能模型,则在绑定级别为每个函数设置应用程序 ID 的方法如上所示,会更加简单。
对于生产环境部署,强烈建议通过配置显式指定应用程序 ID。如果您正在自动扩展应用程序,这将尤其重要,在这种情况下,您需要确保每个实例都部署了相同的应用程序 ID。
如果应用程序未提供应用程序 ID,则绑定器将为您自动生成一个静态应用程序 ID。这在开发场景中很方便,因为它避免了显式提供应用程序 ID 的需要。以这种方式生成的应用程序 ID 在应用程序重新启动时将保持静态。在函数模型的情况下,生成的应用程序 ID 将是函数 Bean 名称后跟字面量applicationID
,例如,如果process
是函数 Bean 名称,则为process-applicationID
。
设置应用程序 ID 的总结
-
默认情况下,绑定器将为每个函数方法自动生成应用程序 ID。
-
如果您只有一个处理器,则可以使用
spring.kafka.streams.applicationId
、spring.application.name
或spring.cloud.stream.kafka.streams.binder.applicationId
。 -
如果您有多个处理器,则可以使用属性
spring.cloud.stream.kafka.streams.binder.functions.<function-name>.applicationId
为每个函数设置应用程序 ID。
使用函数式风格覆盖绑定器生成的默认绑定名称
默认情况下,绑定器在使用函数式风格时使用上面讨论的策略来生成绑定名称,即<function-bean-name>-<in>|<out>-[0..n],例如 process-in-0、process-out-0 等。如果您想覆盖这些绑定名称,可以通过指定以下属性来实现。
spring.cloud.stream.function.bindings.<default binding name>
。默认绑定名称是绑定器生成的原始绑定名称。
例如,假设您有以下函数。
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}
绑定器将生成名称为process-in-0
、process-in-1
和process-out-0
的绑定。现在,如果您想将它们更改为其他内容,也许是更具领域意义的绑定名称,则可以按照以下步骤操作。
spring.cloud.stream.function.bindings.process-in-0=users
spring.cloud.stream.function.bindings.process-in-0=regions
以及
spring.cloud.stream.function.bindings.process-out-0=clicks
之后,您必须在这些新的绑定名称上设置所有绑定级别的属性。
请记住,使用上面描述的函数式编程模型,在大多数情况下,遵循默认绑定名称是有意义的。您可能仍然想要执行此覆盖的唯一原因是,当您有大量配置属性并且想要将绑定映射到更具领域友好的内容时。
设置 Bootstrap 服务器配置
运行 Kafka Streams 应用程序时,必须提供 Kafka 代理服务器信息。如果您不提供此信息,则绑定器期望您在默认的localhost:9092
上运行代理。如果不是这种情况,则需要覆盖它。有几种方法可以做到这一点。
-
使用引导属性 -
spring.kafka.bootstrapServers
-
绑定器级别属性 -
spring.cloud.stream.kafka.streams.binder.brokers
对于绑定器级别属性,无论您是否使用通过常规 Kafka 绑定器提供的代理属性 - spring.cloud.stream.kafka.binder.brokers
,都没有关系。Kafka Streams 绑定器将首先检查是否设置了 Kafka Streams 绑定器特定的代理属性(spring.cloud.stream.kafka.streams.binder.brokers
),如果未找到,则查找spring.cloud.stream.kafka.binder.brokers
。