Kafka Queues (Share Consumers)
Starting with version 4.0, Spring for Apache Kafka provides support for Kafka Queues through share consumers, which are part of Apache Kafka 4.0.0 and implement KIP-932 (Queues for Kafka). This feature is currently in early access.
Kafka Queues enable a different consumption model than traditional consumer groups. In the traditional partition-based assignment model, each partition is assigned exclusively to one consumer; share consumers, by contrast, can cooperatively consume from the same partitions, with records distributed among the consumers of a share group.
Share Consumer Factory
The ShareConsumerFactory is responsible for creating share consumer instances. Spring Kafka provides the DefaultShareConsumerFactory implementation.
Configuration
You can configure a DefaultShareConsumerFactory in the same way as a regular ConsumerFactory:
@Bean
public ShareConsumerFactory<String, String> shareConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-share-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultShareConsumerFactory<>(props);
}
Constructor Options
DefaultShareConsumerFactory offers several constructor options:
// Basic configuration
new DefaultShareConsumerFactory<>(configs);
// With deserializer suppliers
new DefaultShareConsumerFactory<>(configs, keyDeserializerSupplier, valueDeserializerSupplier);
// With deserializer instances
new DefaultShareConsumerFactory<>(configs, keyDeserializer, valueDeserializer, configureDeserializers);
Deserializer Configuration
You can configure deserializers in several ways:
1. Via configuration properties (recommended for simple cases):
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
2. Via setters:
factory.setKeyDeserializer(new StringDeserializer());
factory.setValueDeserializer(new StringDeserializer());
3. Via suppliers (for cases where a new deserializer must be created for each consumer):
factory.setKeyDeserializerSupplier(() -> new StringDeserializer());
factory.setValueDeserializerSupplier(() -> new StringDeserializer());
If your deserializers are fully configured and should not be reconfigured by the factory, set configureDeserializers to false.
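For example, pre-configured deserializer instances can be passed through the four-argument constructor shown above, with configureDeserializers set to false (a minimal sketch; the broker address and group ID values are illustrative):

```java
@Bean
public ShareConsumerFactory<String, String> preConfiguredShareConsumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-share-group");

    // Deserializers assumed to be fully configured elsewhere;
    // the factory is told not to call configure() on them again
    StringDeserializer keyDeserializer = new StringDeserializer();
    StringDeserializer valueDeserializer = new StringDeserializer();

    return new DefaultShareConsumerFactory<>(props, keyDeserializer, valueDeserializer, false);
}
```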
Lifecycle Listeners
You can add listeners to monitor the lifecycle of share consumers:
factory.addListener(new ShareConsumerFactory.Listener<String, String>() {
@Override
public void consumerAdded(String id, ShareConsumer<String, String> consumer) {
// Called when a new consumer is created
System.out.println("Consumer added: " + id);
}
@Override
public void consumerRemoved(String id, ShareConsumer<String, String> consumer) {
// Called when a consumer is closed
System.out.println("Consumer removed: " + id);
}
});
Share Message Listener Container
ShareKafkaMessageListenerContainer
ShareKafkaMessageListenerContainer provides a container for share consumers with support for concurrent processing:
@Bean
public ShareKafkaMessageListenerContainer<String, String> container(
ShareConsumerFactory<String, String> shareConsumerFactory) {
ContainerProperties containerProps = new ContainerProperties("my-topic");
containerProps.setGroupId("my-share-group");
ShareKafkaMessageListenerContainer<String, String> container =
new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProps);
container.setupMessageListener(new MessageListener<String, String>() {
@Override
public void onMessage(ConsumerRecord<String, String> record) {
System.out.println("Received: " + record.value());
}
});
return container;
}
Container Properties
Share containers support a subset of the container properties available to regular consumers:
- topics: array of topic names to subscribe to
- groupId: the share group ID
- clientId: the client ID for the consumers
- kafkaConsumerProperties: additional consumer properties
| Topic patterns and manual partition assignment are not supported by share consumers; topics must be subscribed to by name. |
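A sketch that combines these properties on a single ContainerProperties instance (the client id and the extra consumer property are illustrative values):

```java
ContainerProperties containerProps = new ContainerProperties("my-topic"); // topics to subscribe to
containerProps.setGroupId("my-share-group");   // share group ID
containerProps.setClientId("my-share-client"); // base client ID for the consumer threads

// Additional consumer properties passed through to the share consumers
Properties extraProps = new Properties();
extraProps.setProperty(ConsumerConfig.CLIENT_RACK_CONFIG, "rack-1");
containerProps.setKafkaConsumerProperties(extraProps);
```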
Concurrency
ShareKafkaMessageListenerContainer supports concurrent processing by creating multiple consumer threads within a single container. Each thread runs its own ShareConsumer instance that participates in the same share group.
Unlike traditional consumer groups, where concurrency involves partition assignment, share consumers take advantage of Kafka's record-level distribution on the broker. This means that multiple consumer threads in the same container work together as part of the share group, and the Kafka broker distributes records across all consumer instances.
| Concurrency is additive across application instances. From the share group's perspective, every consumer thread counts as a separate member of the group. For example, application instance 1 with a concurrency of 3 and application instance 2 with a concurrency of 3 give a total of 6 consumers in the share group. This means that setting concurrency on a single container also scales the total membership of the share group across all running instances. |
Configuring Concurrency Programmatically
@Bean
public ShareKafkaMessageListenerContainer<String, String> concurrentContainer(
ShareConsumerFactory<String, String> shareConsumerFactory) {
ContainerProperties containerProps = new ContainerProperties("my-topic");
containerProps.setGroupId("my-share-group");
ShareKafkaMessageListenerContainer<String, String> container =
new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProps);
// Set concurrency to create 5 consumer threads
container.setConcurrency(5);
container.setupMessageListener(new MessageListener<String, String>() {
@Override
public void onMessage(ConsumerRecord<String, String> record) {
System.out.println("Received on " + Thread.currentThread().getName() + ": " + record.value());
}
});
return container;
}
Configuring Concurrency Through the Factory
You can set a default concurrency at the factory level, which applies to all containers created by that factory:
@Bean
public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerContainerFactory(
ShareConsumerFactory<String, String> shareConsumerFactory) {
ShareKafkaListenerContainerFactory<String, String> factory =
new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
// Set default concurrency for all containers created by this factory
factory.setConcurrency(3);
return factory;
}
Per-Listener Concurrency
The concurrency setting can be overridden per listener by using the concurrency attribute:
@Component
public class ConcurrentShareListener {
@KafkaListener(
topics = "high-throughput-topic",
containerFactory = "shareKafkaListenerContainerFactory",
groupId = "my-share-group",
concurrency = "10" // Override factory default
)
public void listen(ConsumerRecord<String, String> record) {
// This listener will use 10 consumer threads
System.out.println("Processing: " + record.value());
}
}
Concurrency Considerations
- Thread safety: each consumer thread has its own ShareConsumer instance and manages its own acknowledgments independently
- Client IDs: each consumer thread receives a unique client ID with a numeric suffix (for example, myContainer-0, myContainer-1, and so on)
- Metrics: metrics from all consumer threads are aggregated and accessible through container.metrics()
- Lifecycle: all consumer threads start and stop together as a unit
- Work distribution: the Kafka broker handles record distribution across all consumer instances in the share group
- Explicit acknowledgment: each thread manages acknowledgment of its own records independently; unacknowledged records in one thread do not block other threads
Concurrency with Explicit Acknowledgment
Concurrency works seamlessly with explicit acknowledgment mode. Each consumer thread independently tracks and acknowledges its own records:
@KafkaListener(
topics = "order-queue",
containerFactory = "explicitShareKafkaListenerContainerFactory",
groupId = "order-processors",
concurrency = "5"
)
public void processOrder(ConsumerRecord<String, String> record, ShareAcknowledgment acknowledgment) {
try {
// Process the order
processOrderLogic(record.value());
acknowledgment.acknowledge(); // ACCEPT
}
catch (RetryableException e) {
acknowledgment.release(); // Will be redelivered
}
catch (Exception e) {
acknowledgment.reject(); // Permanent failure
}
}
| Record Acquisition and Distribution Behavior
Share consumers use a pull model in which each consumer thread calls poll() to fetch records, and the broker acquires (locks) the fetched records for that consumer for a limited time.
While records are acquired by one consumer, other consumers cannot access them. When the acquisition lock expires, unacknowledged records automatically return to the "Available" state and can be delivered to another consumer.
Implication for concurrency: how records are distributed across threads depends on polling behavior and acquisition-lock expiry rather than on static partition assignment, so individual threads may receive uneven shares of the traffic. |
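Following the same Admin API pattern used elsewhere in this chapter, the acquisition lock duration can be tuned per share group (a sketch, assuming the broker supports the KIP-932 group.share.record.lock.duration.ms group configuration; the 60-second value is illustrative):

```java
private void configureRecordLockDuration(String bootstrapServers, String groupId) throws Exception {
    Map<String, Object> adminProps = new HashMap<>();
    adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    try (Admin admin = Admin.create(adminProps)) {
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.GROUP, groupId);
        // How long fetched records stay locked to one consumer before
        // returning to the "Available" state (illustrative value: 60s)
        ConfigEntry lockDuration = new ConfigEntry("group.share.record.lock.duration.ms", "60000");
        Map<ConfigResource, Collection<AlterConfigOp>> configs = Map.of(
            configResource, List.of(new AlterConfigOp(lockDuration, AlterConfigOp.OpType.SET)));
        admin.incrementalAlterConfigs(configs).all().get();
    }
}
```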
Annotation-Driven Listeners
@KafkaListener with Share Consumers
You can use @KafkaListener with share consumers by configuring a ShareKafkaListenerContainerFactory:
@Configuration
@EnableKafka
public class ShareConsumerConfig {
@Bean
public ShareConsumerFactory<String, String> shareConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultShareConsumerFactory<>(props);
}
@Bean
public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerContainerFactory(
ShareConsumerFactory<String, String> shareConsumerFactory) {
return new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
}
}
Then use it in your listener:
@Component
public class ShareMessageListener {
@KafkaListener(
topics = "my-queue-topic",
containerFactory = "shareKafkaListenerContainerFactory",
groupId = "my-share-group"
)
public void listen(ConsumerRecord<String, String> record) {
System.out.println("Received from queue: " + record.value());
// Record is automatically acknowledged with ACCEPT
}
}
Share Group Offset Reset
Unlike regular consumer groups, share groups use a different configuration to control offset reset behavior. You can configure this programmatically:
private void configureShareGroup(String bootstrapServers, String groupId) throws Exception {
Map<String, Object> adminProps = new HashMap<>();
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
try (Admin admin = Admin.create(adminProps)) {
ConfigResource configResource = new ConfigResource(ConfigResource.Type.GROUP, groupId);
ConfigEntry configEntry = new ConfigEntry("share.auto.offset.reset", "earliest");
Map<ConfigResource, Collection<AlterConfigOp>> configs = Map.of(
configResource, List.of(new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET))
);
admin.incrementalAlterConfigs(configs).all().get();
}
}
Record Acknowledgment
Share consumers support two acknowledgment modes that control how records are acknowledged after processing.
Implicit Acknowledgment (Default)
In implicit mode, records are acknowledged automatically based on the processing outcome:
- Successful processing: the record is acknowledged as ACCEPT
- Processing error: the record is acknowledged as REJECT
@Bean
public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerContainerFactory(
ShareConsumerFactory<String, String> shareConsumerFactory) {
// Implicit mode is the default - no additional configuration needed
return new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
}
Explicit Acknowledgment
In explicit mode, the application must manually acknowledge each record using the provided ShareAcknowledgment.
There are two ways to configure explicit acknowledgment mode:
Option 1: Using the Kafka Client Configuration
@Bean
public ShareConsumerFactory<String, String> explicitShareConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, "explicit"); // Official Kafka client config
return new DefaultShareConsumerFactory<>(props);
}
Option 2: Using the Spring Container Configuration
@Bean
public ShareKafkaListenerContainerFactory<String, String> explicitShareKafkaListenerContainerFactory(
ShareConsumerFactory<String, String> shareConsumerFactory) {
ShareKafkaListenerContainerFactory<String, String> factory =
new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
// Configure acknowledgment mode at container factory level
// true means explicit acknowledgment is required
factory.getContainerProperties().setExplicitShareAcknowledgment(true);
return factory;
}
Acknowledgment Types
Share consumers support three acknowledgment types:
- ACCEPT: the record was processed successfully; mark it as completed
- RELEASE: temporary failure; make the record available for redelivery
- REJECT: permanent failure; do not retry
ShareAcknowledgment API
The ShareAcknowledgment interface provides the methods for explicit acknowledgment:
public interface ShareAcknowledgment {
void acknowledge();
void release();
void reject();
}
Listener Interfaces
Share consumers support dedicated listener interfaces for different use cases.
Basic Message Listener
For simple cases, use the standard MessageListener:
@KafkaListener(topics = "my-topic", containerFactory = "shareKafkaListenerContainerFactory")
public void listen(ConsumerRecord<String, String> record) {
System.out.println("Received: " + record.value());
// Automatically acknowledged in implicit mode
}
AcknowledgingShareConsumerAwareMessageListener
This interface provides access to the ShareConsumer instance and supports optional acknowledgment. The acknowledgment parameter may be null, depending on the container's acknowledgment mode.
Implicit mode example (acknowledgment is null):
@KafkaListener(
topics = "my-topic",
containerFactory = "shareKafkaListenerContainerFactory" // Implicit mode by default
)
public void listen(ConsumerRecord<String, String> record,
@Nullable ShareAcknowledgment acknowledgment,
ShareConsumer<?, ?> consumer) {
// In implicit mode, acknowledgment is null
System.out.println("Received: " + record.value());
// Access consumer metrics if needed
Map<MetricName, ? extends Metric> metrics = consumer.metrics();
// Record is auto-acknowledged as ACCEPT on success, REJECT on error
}
Explicit mode example (acknowledgment is non-null):
@Component
public class ExplicitAckListener {
@KafkaListener(
topics = "my-topic",
containerFactory = "explicitShareKafkaListenerContainerFactory"
)
public void listen(ConsumerRecord<String, String> record,
@Nullable ShareAcknowledgment acknowledgment,
ShareConsumer<?, ?> consumer) {
// In explicit mode, acknowledgment is non-null
try {
processRecord(record);
acknowledgment.acknowledge(); // ACCEPT
}
catch (RetryableException e) {
acknowledgment.release(); // Will be redelivered
}
catch (Exception e) {
acknowledgment.reject(); // Permanent failure
}
}
private void processRecord(ConsumerRecord<String, String> record) {
// Business logic here
}
}
Acknowledgment Constraints
In explicit acknowledgment mode, the container enforces important constraints:
- Poll blocking: subsequent polls are blocked until all records from the previous poll are acknowledged
- One-time acknowledgment: each record can only be acknowledged once
- Error handling: if processing throws an exception, the record is automatically acknowledged as REJECT
| In explicit mode, failing to acknowledge records blocks subsequent message processing. Always make sure records are acknowledged on all code paths. |
Acknowledgment Timeout Detection
To help identify missing acknowledgments, Spring Kafka provides configurable timeout detection. When a record is not acknowledged within the specified timeout, a warning is logged with details of the unacknowledged record:
@Bean
public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerContainerFactory(
ShareConsumerFactory<String, String> shareConsumerFactory) {
ShareKafkaListenerContainerFactory<String, String> factory =
new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
// Set acknowledgment timeout (default is 30 seconds)
factory.getContainerProperties().setShareAcknowledgmentTimeout(Duration.ofSeconds(30));
return factory;
}
When a record exceeds the timeout, you will see a warning like this:
WARN: Record not acknowledged within timeout (30 seconds). In explicit acknowledgment mode, you must call ack.acknowledge(), ack.release(), or ack.reject() for every record.
This feature helps developers quickly spot missing acknowledgment calls in their code, preventing the common "Spring Kafka stopped consuming new records" problem caused by forgotten acknowledgments.
Acknowledgment Examples
Mixed Acknowledgment Modes
@KafkaListener(topics = "order-processing", containerFactory = "explicitShareKafkaListenerContainerFactory")
public void processOrder(ConsumerRecord<String, String> record, ShareAcknowledgment acknowledgment) {
String orderId = record.key();
String orderData = record.value();
try {
if (isValidOrder(orderData)) {
if (processOrder(orderData)) {
acknowledgment.acknowledge(); // Success - ACCEPT
}
else {
acknowledgment.release(); // Temporary failure - retry later
}
}
else {
acknowledgment.reject(); // Invalid order - don't retry
}
}
catch (Exception e) {
// Exception automatically triggers REJECT
throw e;
}
}
Conditional Acknowledgment
@KafkaListener(topics = "data-validation", containerFactory = "explicitShareKafkaListenerContainerFactory")
public void validateData(ConsumerRecord<String, String> record, ShareAcknowledgment acknowledgment) {
ValidationResult result = validator.validate(record.value());
switch (result.getStatus()) {
case VALID:
acknowledgment.acknowledge(); // ACCEPT
break;
case INVALID_RETRYABLE:
acknowledgment.release(); // RELEASE - make available for redelivery
break;
case INVALID_PERMANENT:
acknowledgment.reject(); // REJECT - permanent failure
break;
}
}
Poison Message Protection and Delivery Counts
KIP-932 includes broker-side poison message protection to prevent unprocessable records from being redelivered endlessly.
How It Works
Each time a consumer in the share group acquires a record, the broker increments an internal delivery count. The first acquisition sets the delivery count to 1, and each subsequent acquisition increments it. When the delivery count reaches the configured limit (default: 5), the record moves to the Archived state and is no longer eligible for further delivery attempts.
Configuration
The maximum number of delivery attempts can be configured per share group through the Admin API:
private void configureMaxDeliveryAttempts(String bootstrapServers, String groupId) throws Exception {
Map<String, Object> adminProps = new HashMap<>();
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
try (Admin admin = Admin.create(adminProps)) {
ConfigResource configResource = new ConfigResource(ConfigResource.Type.GROUP, groupId);
// Default is 5, adjust based on your retry tolerance
ConfigEntry maxAttempts = new ConfigEntry("group.share.delivery.attempt.limit", "10");
Map<ConfigResource, Collection<AlterConfigOp>> configs = Map.of(
configResource, List.of(new AlterConfigOp(maxAttempts, AlterConfigOp.OpType.SET))
);
admin.incrementalAlterConfigs(configs).all().get();
}
}
| Delivery Counts Are Not Exposed to Applications
The delivery count is maintained internally by the broker and is not exposed to consumer applications. This is an intentional design decision in KIP-932: the delivery count is approximate and serves as a poison-message protection mechanism, not a precise redelivery counter. Applications cannot query or access this value through any API. For application-level retry logic, use the acknowledgment types (ACCEPT, RELEASE, REJECT) instead.
Once group.share.delivery.attempt.limit is reached, the record is archived and will not be delivered again. |
Retry Strategy Recommendations
@KafkaListener(topics = "orders", containerFactory = "explicitShareKafkaListenerContainerFactory")
public void processOrder(ConsumerRecord<String, String> record, ShareAcknowledgment ack) {
try {
// Attempt to process the order
orderService.process(record.value());
ack.acknowledge(); // ACCEPT - successfully processed
}
catch (TransientException e) {
// Temporary failure (network issue, service unavailable, etc.)
// Release the record for redelivery
// Broker will retry up to group.share.delivery.attempt.limit times
logger.warn("Transient error processing order, will retry: {}", e.getMessage());
ack.release(); // RELEASE - make available for retry
}
catch (ValidationException e) {
// Permanent semantic error (invalid data format, business rule violation, etc.)
// Do not retry - this record will never succeed
logger.error("Invalid order data, rejecting: {}", e.getMessage());
ack.reject(); // REJECT - permanent failure, do not retry
}
catch (Exception e) {
// Unknown error - typically safer to reject to avoid infinite loops
// But could also release if you suspect it might be transient
logger.error("Unexpected error processing order, rejecting: {}", e.getMessage());
ack.reject(); // REJECT - avoid poison message loops
}
}
The broker's poison message protection ensures that even if you always use RELEASE on errors, records are not retried endlessly; they are automatically archived once the delivery attempt limit is reached.