编程模型的辅助功能

单个应用中的多个 Kafka Streams 处理器

绑定器允许在单个 Spring Cloud Stream 应用中包含多个 Kafka Streams 处理器。你可以拥有一个如下所示的应用。

@Bean
public java.util.function.Function<KStream<Object, String>, KStream<Object, String>> process() {
   ...
}

@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
  ...
}

@Bean
public java.util.function.BiFunction<KStream<Object, String>, KTable<Integer, String>, KStream<Object, String>> yetAnotherProcess() {
   ...
}

在这种情况下,绑定器将创建 3 个具有不同应用 ID 的独立 Kafka Streams 对象(稍后详细介绍)。但是,如果你的应用中有多个处理器,则必须告知 Spring Cloud Stream 需要激活哪些函数。以下是如何激活这些函数的方法。

spring.cloud.function.definition: process;anotherProcess;yetAnotherProcess

如果你希望某些函数不会立即激活,可以将其从此列表中移除。

当你在同一个应用中拥有一个 Kafka Streams 处理器以及通过不同绑定器处理的其他类型的 Function bean 时(例如,基于常规 Kafka Message Channel 绑定器的函数 bean),情况也是如此。

Kafka Streams 应用 ID

应用 ID 是 Kafka Streams 应用必须提供的属性。Spring Cloud Stream Kafka Streams 绑定器允许你通过多种方式配置此应用 ID。

如果应用中只有一个处理器,则可以使用以下属性在绑定器级别进行设置。

spring.cloud.stream.kafka.streams.binder.applicationId.

为方便起见,如果只有一个处理器,你也可以使用 spring.application.name 作为属性来委派应用 ID。

如果应用中有多个 Kafka Streams 处理器,则需要为每个处理器设置应用 ID。在使用函数式模型的情况下,可以将其作为属性附加到每个函数上。

例如,假设你有以下函数。

@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
   ...
}

@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
  ...
}

然后,可以使用以下绑定器级别属性为每个函数设置应用 ID。

spring.cloud.stream.kafka.streams.binder.functions.process.applicationId

spring.cloud.stream.kafka.streams.binder.functions.anotherProcess.applicationId

对于基于函数的模型,在绑定级别设置应用 ID 的方法也有效。但是,如果你使用函数式模型,如上所示在绑定器级别为每个函数进行设置会容易得多。

对于生产环境部署,强烈建议通过配置显式指定应用 ID。如果你正在对应用进行自动扩缩容,这一点尤其重要,在这种情况下,你需要确保部署的每个实例都具有相同的应用 ID。

如果应用未提供应用 ID,则绑定器将为你自动生成一个静态应用 ID。这在开发场景中非常方便,因为它避免了显式提供应用 ID 的需要。以这种方式生成的应用 ID 在应用重启后将保持静态。在使用函数式模型的情况下,生成的应用 ID 将是函数 bean 名称后跟字面值 applicationID,例如,如果函数 bean 名称是 process,则生成的应用 ID 将是 process-applicationID

设置应用 ID 摘要

  • 默认情况下,绑定器将为每个函数方法自动生成应用 ID。

  • 如果只有一个处理器,则可以使用 spring.kafka.streams.applicationIdspring.application.namespring.cloud.stream.kafka.streams.binder.applicationId

  • 如果拥有多个处理器,则可以使用属性 spring.cloud.stream.kafka.streams.binder.functions.<function-name>.applicationId 为每个函数设置应用 ID。

使用函数式风格覆盖绑定器生成的默认绑定名称

默认情况下,在使用函数式风格时,绑定器使用上述策略生成绑定名称,即 <function-bean-name>-<in>|<out>-[0..n],例如 process-in-0, process-out-0 等。如果你想覆盖这些绑定名称,可以通过指定以下属性来实现。

spring.cloud.stream.function.bindings.<default binding name>。默认绑定名称是绑定器生成的原始绑定名称。

例如,假设你有这个函数。

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}

绑定器将生成名称为 process-in-0, process-in-1process-out-0 的绑定。现在,如果你想将它们完全更改为其他名称,例如更具领域特定性的绑定名称,则可以如下进行。

spring.cloud.stream.function.bindings.process-in-0=users

spring.cloud.stream.function.bindings.process-in-0=regions

spring.cloud.stream.function.bindings.process-out-0=clicks

之后,你必须在这些新的绑定名称上设置所有绑定级别的属性。

请记住,使用上述函数式编程模型时,在大多数情况下遵循默认绑定名称是合理的。你可能仍然希望进行此覆盖的唯一原因是,当你有大量配置属性并且希望将绑定映射到更具领域友好性的名称时。

设置 bootstrap server 配置

运行 Kafka Streams 应用时,必须提供 Kafka broker 服务器信息。如果你不提供此信息,绑定器会假定你在默认地址 localhost:9092 上运行 broker。如果情况不是这样,则需要覆盖它。有几种方法可以实现。

  • 使用 boot 属性 - spring.kafka.bootstrapServers

  • 绑定器级别属性 - spring.cloud.stream.kafka.streams.binder.brokers

关于绑定器级别属性,无论你是否使用通过常规 Kafka 绑定器提供的 broker 属性 - spring.cloud.stream.kafka.binder.brokers,都没有关系。Kafka Streams 绑定器会首先检查是否设置了 Kafka Streams 绑定器特定的 broker 属性 (spring.cloud.stream.kafka.streams.binder.brokers),如果未找到,则会查找 spring.cloud.stream.kafka.binder.brokers