使用 Kafka Binder 进行分区
Apache Kafka 原生支持主题分区。
有时将数据发送到特定分区会很有优势——例如,当您希望严格按顺序处理消息时(某个特定客户的所有消息应发送到同一分区)。
以下示例展示了如何配置生产者和消费者端
@SpringBootApplication
public class KafkaPartitionProducerApplication {
private static final Random RANDOM = new Random(System.currentTimeMillis());
private static final String[] data = new String[] {
"foo1", "bar1", "qux1",
"foo2", "bar2", "qux2",
"foo3", "bar3", "qux3",
"foo4", "bar4", "qux4",
};
public static void main(String[] args) {
new SpringApplicationBuilder(KafkaPartitionProducerApplication.class)
.web(false)
.run(args);
}
@Bean
public Supplier<Message<?>> generate() {
return () -> {
String value = data[RANDOM.nextInt(data.length)];
System.out.println("Sending: " + value);
return MessageBuilder.withPayload(value)
.setHeader("partitionKey", value)
.build();
};
}
}
application.yml
spring:
cloud:
stream:
bindings:
generate-out-0:
destination: partitioned.topic
producer:
partition-key-expression: headers['partitionKey']
partition-count: 12
请务必记住,由于 Apache Kafka 原生支持分区,除非您像示例中一样使用自定义分区键,或者使用涉及载荷(payload)本身的表达式,否则无需依赖上述描述的 binder 分区机制。binder 提供的分区选择机制是为那些不支持原生分区的中间件技术设计的。请注意,在上面的示例中,我们使用了一个名为 partitionKey 的自定义键,它将是决定分区的关键因素,因此在这种情况下,使用 binder 分区是合适的。当使用原生 Kafka 分区时,即当您不提供 partition-key-expression 时,Apache Kafka 将选择一个分区,默认情况下,它将是记录键的哈希值对可用分区数取模的结果。要向出站记录添加键,请在 spring-messaging Message<?> 中将 KafkaHeaders.KEY 头部设置为所需的键值。默认情况下,如果没有提供记录键,Apache Kafka 将根据 Apache Kafka 文档中描述的逻辑选择一个分区。 |
必须为主题预置足够的分区,以实现所有消费者组所需的并发度。上述配置最多支持 12 个消费者实例(如果 concurrency 为 2,则支持 6 个;如果 concurrency 为 3,则支持 4 个,依此类推)。通常最好“过度配置”分区,以便未来能够增加消费者或并发度。 |
上述配置使用了默认分区策略(key.hashCode() % partitionCount )。根据键值的不同,这可能提供,也可能不提供一个均衡的算法。特别需要注意的是,这种分区策略与独立 Kafka 生产者(例如 Kafka Streams 使用的生产者)的默认策略不同,这意味着相同的键值在这些客户端生产时可能在分区之间产生不同的均衡结果。您可以通过使用 partitionSelectorExpression 或 partitionSelectorClass 属性来覆盖此默认设置。 |
由于分区由 Kafka 原生处理,消费者端无需特殊配置。Kafka 会在实例间分配分区。
Kafka 主题的分区数可能在运行时发生变化(例如由于管理任务)。之后计算出的分区将有所不同(例如,届时将使用新的分区)。从 Spring Cloud Stream 运行时版本 4.0.3 开始将支持分区数的变化。另请参见参数 'spring.kafka.producer.properties.metadata.max.age.ms' 来配置更新间隔。由于某些限制,不能使用引用消息 'payload' 的 'partition-key-expression',在这种情况下该机制将被禁用。此整体行为默认是禁用的,可以通过配置参数 'producer.dynamicPartitionUpdatesEnabled=true' 来启用。 |
以下 Spring Boot 应用监听 Kafka 流,并将每条消息发送到的分区 ID 打印(到控制台)出来
@SpringBootApplication
public class KafkaPartitionConsumerApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(KafkaPartitionConsumerApplication.class)
.web(WebApplicationType.NONE)
.run(args);
}
@Bean
public Consumer<Message<String>> listen() {
return message -> {
int partition = (int) message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION);
System.out.println(message + " received from partition " + partition);
};
}
}
application.yml
spring:
cloud:
stream:
bindings:
listen-in-0:
destination: partitioned.topic
group: myGroup
您可以根据需要添加实例。Kafka 会重新平衡分区分配。如果实例数量(或 实例数量 * 并发数
)超过分区数量,一些消费者将处于空闲状态。