多绑定器,结合基于 Kafka Streams 的绑定器和常规 Kafka 绑定器
您的应用中可以同时包含基于常规 Kafka 绑定器的函数/消费者/供应者,以及基于 Kafka Streams 的处理器。但是,您不能在单个函数或消费者中混合使用它们两者。
以下是一个示例,展示了如何在同一个应用中同时使用这两种基于绑定器的组件。
@Bean
public Function<String, String> process() {
return s -> s;
}
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> kstreamProcess() {
return input -> input;
}
这是配置中的相关部分
spring.cloud.function.definition=process;kstreamProcess
spring.cloud.stream.bindings.process-in-0.destination=foo
spring.cloud.stream.bindings.process-out-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-in-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-out-0.destination=foobar
如果您的应用与上述示例相同,但需要处理两个不同的 Kafka 集群,事情会变得稍微复杂一些。例如,常规的 process
处理逻辑作用于 Kafka 集群 1 和集群 2(从集群 1 接收数据并发送到集群 2),而 Kafka Streams 处理器作用于 Kafka 集群 2。在这种情况下,您必须使用 Spring Cloud Stream 提供的多绑定器功能。
在这种场景下,您的配置可能会如下变化。
# multi binder configuration
spring.cloud.stream.binders.kafka1.type: kafka
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-1} #Replace kafkaCluster-1 with the approprate IP of the cluster
spring.cloud.stream.binders.kafka2.type: kafka
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2} #Replace kafkaCluster-2 with the approprate IP of the cluster
spring.cloud.stream.binders.kafka3.type: kstream
spring.cloud.stream.binders.kafka3.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2} #Replace kafkaCluster-2 with the approprate IP of the cluster
spring.cloud.function.definition=process;kstreamProcess
# From cluster 1 to cluster 2 with regular process function
spring.cloud.stream.bindings.process-in-0.destination=foo
spring.cloud.stream.bindings.process-in-0.binder=kafka1 # source from cluster 1
spring.cloud.stream.bindings.process-out-0.destination=bar
spring.cloud.stream.bindings.process-out-0.binder=kafka2 # send to cluster 2
# Kafka Streams processor on cluster 2
spring.cloud.stream.bindings.kstreamProcess-in-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-in-0.binder=kafka3
spring.cloud.stream.bindings.kstreamProcess-out-0.destination=foobar
spring.cloud.stream.bindings.kstreamProcess-out-0.binder=kafka3
请注意上述配置。我们有两种绑定器类型,但总共有 3 个绑定器:第一个是基于集群 1 的常规 Kafka 绑定器(kafka1
),然后是基于集群 2 的另一个 Kafka 绑定器(kafka2
),最后是 kstream
绑定器(kafka3
)。应用中的第一个处理器从 kafka1
接收数据并发布到 kafka2
,尽管这两个绑定器都基于常规 Kafka 绑定器,但它们连接的是不同的集群。第二个处理器是 Kafka Streams 处理器,它从 kafka3
消费数据,kafka3
与 kafka2
是同一个集群,但绑定器类型不同。
由于 Kafka Streams 绑定器家族中有三种不同的绑定器类型 - kstream
、ktable
和 globalktable
- 如果您的应用有多个基于这些绑定器的绑定,则需要显式地将其提供为绑定器类型。
例如,如果您有一个如下所示的处理器:
@Bean
public Function<KStream<Long, Order>,
Function<KTable<Long, Customer>,
Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> enrichOrder() {
...
}
那么,在多绑定器场景下,必须将其配置如下。请注意,这仅在您拥有真正的多绑定器场景时才需要,即单个应用中存在多个处理器处理多个集群的情况。在这种情况下,需要为绑定明确指定绑定器,以便与其他处理器的绑定器类型和集群区分开来。
spring.cloud.stream.binders.kafka1.type: kstream
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.binders.kafka2.type: ktable
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.binders.kafka3.type: globalktable
spring.cloud.stream.binders.kafka3.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.bindings.enrichOrder-in-0.binder=kafka1 #kstream
spring.cloud.stream.bindings.enrichOrder-in-1.binder=kafka2 #ktablr
spring.cloud.stream.bindings.enrichOrder-in-2.binder=kafka3 #globalktable
spring.cloud.stream.bindings.enrichOrder-out-0.binder=kafka1 #kstream
# rest of the configuration is omitted.