出站分区支持

Kafka Streams 处理器通常将处理后的输出发送到出站 Kafka 主题。如果出站主题已分区,并且处理器需要将出站数据发送到特定分区,则应用需要提供一个类型为 StreamPartitioner 的 bean。有关更多详细信息,请参阅 StreamPartitioner。让我们看一些示例。

这是我们已经多次看到的同一个处理器,

@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> process() {

    ...
}

这是输出绑定目标

spring.cloud.stream.bindings.process-out-0.destination: outputTopic

如果主题 outputTopic 有 4 个分区,如果您不提供分区策略,Kafka Streams 将使用默认分区策略,这可能不是您根据特定用例想要的结果。假设您希望将任何与 spring 匹配的键发送到分区 0,将 cloud 发送到分区 1,将 stream 发送到分区 2,以及将其他所有内容发送到分区 3。您需要在应用中执行以下操作。

@Bean
public StreamPartitioner<String, WordCount> streamPartitioner() {
    return (t, k, v, n) -> {
        if (k.equals("spring")) {
            return 0;
        }
        else if (k.equals("cloud")) {
            return 1;
        }
        else if (k.equals("stream")) {
            return 2;
        }
        else {
            return 3;
        }
    };
}

这是一个基础实现,但是,您可以访问记录的键/值、主题名称和总分区数。因此,如有需要,您可以实现复杂的分区策略。

您还需要在应用配置中提供此 bean 名称。

spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.streamPartitionerBeanName: streamPartitioner

应用中的每个输出主题都需要像这样单独配置。