Kafka 队列(共享消费者)

从 4.0 版本开始,Spring for Apache Kafka 通过共享消费者提供了对 Kafka 队列的支持,共享消费者是 Apache Kafka 4.0.0 的一部分,并实现了 KIP-932(Kafka 队列)。此功能目前处于早期访问阶段。

与传统消费者组相比,Kafka 队列启用了不同的消费模型。传统的基于分区的分配模型中,每个分区都独占分配给一个消费者,而共享消费者可以协作消费相同的分区,记录会在共享组的消费者之间进行分配。

共享消费者工厂

ShareConsumerFactory 负责创建共享消费者实例。Spring Kafka 提供了 DefaultShareConsumerFactory 实现。

配置

您可以像配置常规 ConsumerFactory 一样配置 DefaultShareConsumerFactory

@Bean
public ShareConsumerFactory<String, String> shareConsumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-share-group");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return new DefaultShareConsumerFactory<>(props);
}

构造函数选项

DefaultShareConsumerFactory 提供了多种构造函数选项

// Basic configuration
new DefaultShareConsumerFactory<>(configs);

// With deserializer suppliers
new DefaultShareConsumerFactory<>(configs, keyDeserializerSupplier, valueDeserializerSupplier);

// With deserializer instances
new DefaultShareConsumerFactory<>(configs, keyDeserializer, valueDeserializer, configureDeserializers);

反序列化器配置

您可以通过多种方式配置反序列化器

  1. 通过配置属性(推荐用于简单情况)

    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  2. 通过 Setters:

    factory.setKeyDeserializer(new StringDeserializer());
    factory.setValueDeserializer(new StringDeserializer());
  3. 通过 Suppliers(用于需要为每个消费者创建反序列化器的情况)

    factory.setKeyDeserializerSupplier(() -> new StringDeserializer());
    factory.setValueDeserializerSupplier(() -> new StringDeserializer());

如果您的反序列化器已完全配置且不应由工厂重新配置,请将 configureDeserializers 设置为 false

生命周期监听器

您可以添加监听器来监控共享消费者的生命周期

factory.addListener(new ShareConsumerFactory.Listener<String, String>() {
    @Override
    public void consumerAdded(String id, ShareConsumer<String, String> consumer) {
        // Called when a new consumer is created
        System.out.println("Consumer added: " + id);
    }

    @Override
    public void consumerRemoved(String id, ShareConsumer<String, String> consumer) {
        // Called when a consumer is closed
        System.out.println("Consumer removed: " + id);
    }
});

共享消息监听器容器

ShareKafkaMessageListenerContainer

ShareKafkaMessageListenerContainer 为共享消费者提供了一个容器,支持并发处理

@Bean
public ShareKafkaMessageListenerContainer<String, String> container(
        ShareConsumerFactory<String, String> shareConsumerFactory) {

    ContainerProperties containerProps = new ContainerProperties("my-topic");
    containerProps.setGroupId("my-share-group");

    ShareKafkaMessageListenerContainer<String, String> container =
        new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProps);

    container.setupMessageListener(new MessageListener<String, String>() {
        @Override
        public void onMessage(ConsumerRecord<String, String> record) {
            System.out.println("Received: " + record.value());
        }
    });

    return container;
}

容器属性

共享容器支持常规消费者可用的容器属性的子集

  • topics: 要订阅的主题名称数组

  • groupId: 共享组 ID

  • clientId: 消费者的客户端 ID

  • kafkaConsumerProperties: 额外的消费者属性

共享消费者不支持

  • 显式分区分配 (TopicPartitionOffset)

  • 主题模式

  • 手动偏移量管理

并发性

ShareKafkaMessageListenerContainer 通过在单个容器中创建多个消费者线程来支持并发处理。每个线程都运行自己的 ShareConsumer 实例,该实例参与同一个共享组。

与传统消费者组中并发涉及分区分配不同,共享消费者利用 Kafka 在代理上的记录级别分发。这意味着同一容器中的多个消费者线程作为共享组的一部分协同工作,Kafka 代理将记录分发到所有消费者实例。

并发性在应用程序实例之间是累加的

从共享组的角度来看,每个 ShareConsumer 实例都是一个独立的成员,无论它在哪里运行。在单个容器中设置 concurrency=3 会创建 3 个共享组成员。如果您运行多个具有相同共享组 ID 的应用程序实例,它们的所有消费者线程将合并到一个池中。

例如: * 应用程序实例 1:concurrency=3 → 3 个共享组成员 * 应用程序实例 2:concurrency=3 → 3 个共享组成员 * 总计:6 个共享组成员可供代理分发记录

这意味着在单个容器中设置 concurrency=5 在操作上等同于运行 5 个独立的应用程序实例,每个实例的 concurrency=1(都使用相同的 group.id)。Kafka 代理对所有消费者实例一视同仁,并将记录分发到整个池中。

以编程方式配置并发性

@Bean
public ShareKafkaMessageListenerContainer<String, String> concurrentContainer(
        ShareConsumerFactory<String, String> shareConsumerFactory) {

    ContainerProperties containerProps = new ContainerProperties("my-topic");
    containerProps.setGroupId("my-share-group");

    ShareKafkaMessageListenerContainer<String, String> container =
        new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProps);

    // Set concurrency to create 5 consumer threads
    container.setConcurrency(5);

    container.setupMessageListener(new MessageListener<String, String>() {
        @Override
        public void onMessage(ConsumerRecord<String, String> record) {
            System.out.println("Received on " + Thread.currentThread().getName() + ": " + record.value());
        }
    });

    return container;
}

通过工厂配置并发性

您可以在工厂级别设置默认并发性,该设置适用于该工厂创建的所有容器

@Bean
public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerContainerFactory(
        ShareConsumerFactory<String, String> shareConsumerFactory) {

    ShareKafkaListenerContainerFactory<String, String> factory =
        new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);

    // Set default concurrency for all containers created by this factory
    factory.setConcurrency(3);

    return factory;
}

每个监听器的并发性

可以使用 concurrency 属性覆盖每个监听器的并发设置

@Component
public class ConcurrentShareListener {

    @KafkaListener(
        topics = "high-throughput-topic",
        containerFactory = "shareKafkaListenerContainerFactory",
        groupId = "my-share-group",
        concurrency = "10"  // Override factory default
    )
    public void listen(ConsumerRecord<String, String> record) {
        // This listener will use 10 consumer threads
        System.out.println("Processing: " + record.value());
    }
}

并发性注意事项

  • 线程安全:每个消费者线程都有自己的 ShareConsumer 实例并独立管理其自身的确认

  • 客户端 ID:每个消费者线程都会收到一个带有数字后缀的唯一客户端 ID(例如,myContainer-0myContainer-1 等)

  • 指标:来自所有消费者线程的指标会被聚合并通过 container.metrics() 访问

  • 生命周期:所有消费者线程作为一个单元同时启动和停止

  • 工作分配:Kafka 代理处理共享组中所有消费者实例的记录分发

  • 显式确认:每个线程独立管理其记录的确认;一个线程中未确认的记录不会阻塞其他线程

显式确认下的并发性

并发性与显式确认模式无缝协作。每个消费者线程独立跟踪并确认其自己的记录

@KafkaListener(
    topics = "order-queue",
    containerFactory = "explicitShareKafkaListenerContainerFactory",
    groupId = "order-processors",
    concurrency = "5"
)
public void processOrder(ConsumerRecord<String, String> record, ShareAcknowledgment acknowledgment) {
    try {
        // Process the order
        processOrderLogic(record.value());
        acknowledgment.acknowledge(); // ACCEPT
    }
    catch (RetryableException e) {
        acknowledgment.release(); // Will be redelivered
    }
    catch (Exception e) {
        acknowledgment.reject(); // Permanent failure
    }
}

记录获取和分发行为

共享消费者使用拉取模型,其中每个消费者线程调用 poll() 从代理获取记录。当消费者进行轮询时,代理的共享分区领导者

  • 选择处于“可用”状态的记录

  • 将它们移动到“已获取”状态,并带有时间限制的获取锁(默认为 30 秒,可通过 group.share.record.lock.duration.ms 配置)

  • 为了提高效率,倾向于返回完整的记录批次

  • max.poll.records 应用为软限制,这意味着即使超过此值,也会获取完整的记录批次

当记录被一个消费者获取时,其他消费者无法访问它们。当获取锁到期时,未确认的记录会自动返回“可用”状态,并可以交付给另一个消费者。

代理使用 group.share.partition.max.record.locks 限制每个分区可获取的记录数量。一旦达到此限制,后续的轮询将暂时不返回任何记录,直到锁过期。

对并发性的影响

  • 每个消费者线程独立轮询,每次轮询可能获取不同数量的记录

  • 记录在线程间的分发取决于轮询时间和批次可用性

  • 多个线程增加了可用于获取记录的消费者池

  • 在消息量少或分区单一的情况下,记录可能会集中在少数线程上

  • 对于长时间运行的工作负载,分发往往更均匀

配置

  • 每个线程独立轮询和处理记录

  • 确认约束适用于每个线程(一个线程中未确认的记录不会阻塞其他线程)

  • 并发设置必须大于 0,并且在容器运行时不能更改

注解驱动的监听器

带有共享消费者的 @KafkaListener

您可以通过配置 ShareKafkaListenerContainerFactory 来使用带有共享消费者的 @KafkaListener

@Configuration
@EnableKafka
public class ShareConsumerConfig {

    @Bean
    public ShareConsumerFactory<String, String> shareConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultShareConsumerFactory<>(props);
    }

    @Bean
    public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerContainerFactory(
            ShareConsumerFactory<String, String> shareConsumerFactory) {
        return new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
    }
}

然后在您的监听器中使用它

@Component
public class ShareMessageListener {

    @KafkaListener(
        topics = "my-queue-topic",
        containerFactory = "shareKafkaListenerContainerFactory",
        groupId = "my-share-group"
    )
    public void listen(ConsumerRecord<String, String> record) {
        System.out.println("Received from queue: " + record.value());
        // Record is automatically acknowledged with ACCEPT
    }
}

共享组偏移量重置

与常规消费者组不同,共享组使用不同的配置来控制偏移量重置行为。您可以通过编程方式配置此项

private void configureShareGroup(String bootstrapServers, String groupId) throws Exception {
    Map<String, Object> adminProps = new HashMap<>();
    adminProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

    try (Admin admin = Admin.create(adminProps)) {
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.GROUP, groupId);
        ConfigEntry configEntry = new ConfigEntry("share.auto.offset.reset", "earliest");

        Map<ConfigResource, Collection<AlterConfigOp>> configs = Map.of(
            configResource, List.of(new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET))
        );

        admin.incrementalAlterConfigs(configs).all().get();
    }
}

记录确认

共享消费者支持两种确认模式,控制记录在处理后如何确认。

隐式确认(默认)

在隐式模式下,记录根据处理结果自动确认

成功处理:记录被确认为 ACCEPT 处理错误:记录被确认为 REJECT

@Bean
public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerContainerFactory(
    ShareConsumerFactory<String, String> shareConsumerFactory) {
    // Implicit mode is the default - no additional configuration needed
    return new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
}

显式确认

在显式模式下,应用程序必须使用提供的 ShareAcknowledgment 手动确认每个记录。

有两种方式配置显式确认模式

选项 1:使用 Kafka 客户端配置

@Bean
public ShareConsumerFactory<String, String> explicitShareConsumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, "explicit"); // Official Kafka client config
    return new DefaultShareConsumerFactory<>(props);
}

选项 2:使用 Spring 容器配置

@Bean
public ShareKafkaListenerContainerFactory<String, String> explicitShareKafkaListenerContainerFactory(
    ShareConsumerFactory<String, String> shareConsumerFactory) {

    ShareKafkaListenerContainerFactory<String, String> factory =
        new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);

    // Configure acknowledgment mode at container factory level
    // true means explicit acknowledgment is required
    factory.getContainerProperties().setExplicitShareAcknowledgment(true);

    return factory;
}

配置优先级

当两种配置方法都使用时,Spring Kafka 遵循以下优先级顺序(从高到低)

  1. 容器属性containerProperties.setExplicitShareAcknowledgment(true/false)

  2. 消费者配置ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG("implicit" 或 "explicit")

  3. 默认值false(隐式确认)

确认类型

共享消费者支持三种确认类型

ACCEPT: Record processed successfully, mark as completed
RELEASE: Temporary failure, make record available for redelivery
REJECT: Permanent failure, do not retry

ShareAcknowledgment API

ShareAcknowledgment 接口提供了显式确认的方法

public interface ShareAcknowledgment {
    void acknowledge();
    void release();
    void reject();
}

监听器接口

共享消费者支持针对不同用例的专用监听器接口

基本消息监听器

对于简单情况,使用标准 MessageListener

@KafkaListener(topics = "my-topic", containerFactory = "shareKafkaListenerContainerFactory")
public void listen(ConsumerRecord<String, String> record) {
    System.out.println("Received: " + record.value());
    // Automatically acknowledged in implicit mode
}

AcknowledgingShareConsumerAwareMessageListener

此接口提供对 ShareConsumer 实例的访问,并支持可选的确认。确认参数可为空,具体取决于容器的确认模式

隐式模式示例(确认为空)
@KafkaListener(
    topics = "my-topic",
    containerFactory = "shareKafkaListenerContainerFactory"  // Implicit mode by default
)
public void listen(ConsumerRecord<String, String> record,
                  @Nullable ShareAcknowledgment acknowledgment,
                  ShareConsumer<?, ?> consumer) {

    // In implicit mode, acknowledgment is null
    System.out.println("Received: " + record.value());

    // Access consumer metrics if needed
    Map<MetricName, ? extends Metric> metrics = consumer.metrics();

    // Record is auto-acknowledged as ACCEPT on success, REJECT on error
}
显式模式示例(确认非空)
@Component
public class ExplicitAckListener {
    @KafkaListener(
        topics = "my-topic",
        containerFactory = "explicitShareKafkaListenerContainerFactory"
    )
    public void listen(ConsumerRecord<String, String> record,
                      @Nullable ShareAcknowledgment acknowledgment,
                      ShareConsumer<?, ?> consumer) {

        // In explicit mode, acknowledgment is non-null
        try {
            processRecord(record);
            acknowledgment.acknowledge(); // ACCEPT
        }
		catch (RetryableException e) {
            acknowledgment.release(); // Will be redelivered
        }
		catch (Exception e) {
            acknowledgment.reject(); // Permanent failure
        }
    }

    private void processRecord(ConsumerRecord<String, String> record) {
        // Business logic here
    }
}

确认约束

在显式确认模式下,容器强制执行重要的约束

Poll Blocking: Subsequent polls are blocked until all records from the previous poll are acknowledged.
One-time Acknowledgment: Each record can only be acknowledged once.
Error Handling: If processing throws an exception, the record is automatically acknowledged as `REJECT`.
在显式模式下,如果未能确认记录,将阻塞后续的消息处理。请务必确保在所有代码路径中都确认记录。

确认超时检测

为了帮助识别缺失的确认,Spring Kafka 提供了可配置的超时检测。当记录在指定超时时间内未被确认时,会记录一条警告,其中包含未确认记录的详细信息。

@Bean
public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerContainerFactory(
    ShareConsumerFactory<String, String> shareConsumerFactory) {
    ShareKafkaListenerContainerFactory<String, String> factory =
        new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);

    // Set acknowledgment timeout (default is 30 seconds)
    factory.getContainerProperties().setShareAcknowledgmentTimeout(Duration.ofSeconds(30));

    return factory;
}

当记录超过超时时间时,您会看到如下警告

WARN: Record not acknowledged within timeout (30 seconds).
In explicit acknowledgment mode, you must call ack.acknowledge(), ack.release(),
or ack.reject() for every record.

此功能有助于开发人员快速识别代码中缺少确认调用的情况,从而防止因忘记确认而导致的常见问题“Spring Kafka 不再消费新记录”。

确认示例

混合确认模式

@KafkaListener(topics = "order-processing", containerFactory = "explicitShareKafkaListenerContainerFactory")
    public void processOrder(ConsumerRecord<String, String> record, ShareAcknowledgment acknowledgment) {
        String orderId = record.key();
        String orderData = record.value();
        try {
            if (isValidOrder(orderData)) {
                if (processOrder(orderData)) {
                    acknowledgment.acknowledge(); // Success - ACCEPT
                }
                else {
                    acknowledgment.release(); // Temporary failure - retry later
                }
            }
            else {
                acknowledgment.reject(); // Invalid order - don't retry
            }
        }
        catch (Exception e) {
            // Exception automatically triggers REJECT
            throw e;
        }
}

条件确认

@KafkaListener(topics = "data-validation", containerFactory = "explicitShareKafkaListenerContainerFactory")
public void validateData(ConsumerRecord<String, String> record, ShareAcknowledgment acknowledgment) {
    ValidationResult result = validator.validate(record.value());
    switch (result.getStatus()) {
        case VALID:
            acknowledgment.acknowledge(AcknowledgeType.ACCEPT);
            break;
        case INVALID_RETRYABLE:
            acknowledgment.acknowledge(AcknowledgeType.RELEASE);
            break;
        case INVALID_PERMANENT:
            acknowledgment.acknowledge(AcknowledgeType.REJECT);
            break;
    }
}

毒药消息保护和递送计数

KIP-932 包含了代理端的毒药消息保护,以防止无法处理的记录被无休止地重新递送。

工作原理

每当共享组中的消费者获取记录时,代理都会递增内部递送计数。第一次获取将递送计数设置为 1,每次后续获取都会递增。当递送计数达到配置的限制(默认:5)时,记录会进入已归档状态,并且不再有资格进行额外的递送尝试。

配置

最大递送尝试次数可以通过 Admin API 为每个共享组配置

private void configureMaxDeliveryAttempts(String bootstrapServers, String groupId) throws Exception {
    Map<String, Object> adminProps = new HashMap<>();
    adminProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

    try (Admin admin = Admin.create(adminProps)) {
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.GROUP, groupId);

        // Default is 5, adjust based on your retry tolerance
        ConfigEntry maxAttempts = new ConfigEntry("group.share.delivery.attempt.limit", "10");

        Map<ConfigResource, Collection<AlterConfigOp>> configs = Map.of(
            configResource, List.of(new AlterConfigOp(maxAttempts, AlterConfigOp.OpType.SET))
        );

        admin.incrementalAlterConfigs(configs).all().get();
    }
}

递送计数不暴露给应用程序

递送计数由代理内部维护,并且不暴露给消费者应用程序。这是 KIP-932 中的有意设计决策。递送计数是近似值,用作毒药消息保护机制,而不是精确的重递送计数器。应用程序无法通过任何 API 查询或访问此值。

对于应用程序级别的重试逻辑,请使用确认类型

  • RELEASE - 使记录可用于重递送(计入递送计数)

  • REJECT - 标记为永久失败(不会导致重递送)

  • ACCEPT - 成功处理(不会导致重递送)

一旦达到 group.share.delivery.attempt.limit,代理会自动防止无休止的重递送,将记录移至已归档状态。

重试策略建议

@KafkaListener(topics = "orders", containerFactory = "explicitShareKafkaListenerContainerFactory")
public void processOrder(ConsumerRecord<String, String> record, ShareAcknowledgment ack) {
    try {
        // Attempt to process the order
        orderService.process(record.value());
        ack.acknowledge(); // ACCEPT - successfully processed
    }
    catch (TransientException e) {
        // Temporary failure (network issue, service unavailable, etc.)
        // Release the record for redelivery
        // Broker will retry up to group.share.delivery.attempt.limit times
        logger.warn("Transient error processing order, will retry: {}", e.getMessage());
        ack.release(); // RELEASE - make available for retry
    }
    catch (ValidationException e) {
        // Permanent semantic error (invalid data format, business rule violation, etc.)
        // Do not retry - this record will never succeed
        logger.error("Invalid order data, rejecting: {}", e.getMessage());
        ack.reject(); // REJECT - permanent failure, do not retry
    }
    catch (Exception e) {
        // Unknown error - typically safer to reject to avoid infinite loops
        // But could also release if you suspect it might be transient
        logger.error("Unexpected error processing order, rejecting: {}", e.getMessage());
        ack.reject(); // REJECT - avoid poison message loops
    }
}

代理的毒药消息保护确保即使您始终对错误使用 RELEASE,记录也不会无休止地重试。它们在达到递送尝试限制后会自动归档。

与常规消费者的区别

共享消费者与常规消费者在几个关键方面有所不同

  1. 无分区分配:共享消费者不能被分配特定分区

  2. 无主题模式:共享消费者不支持订阅主题模式

  3. 协作消费:同一共享组中的多个消费者可以同时消费同一分区

  4. 记录级别确认:支持带有 ACCEPTRELEASEREJECT 类型的显式确认

  5. 不同的组管理:共享组使用不同的协调器协议

  6. 无批处理:共享消费者单独处理记录,而不是批量处理

  7. 代理端重试管理:递送计数跟踪和毒药消息保护由代理管理,不暴露给应用程序

限制和注意事项

当前限制

  • 预览版:此功能处于预览模式,未来版本可能会更改

  • 无消息转换器:共享消费者尚不支持消息转换器

  • 无批处理监听器:共享消费者不支持批处理

  • 轮询约束:在显式确认模式下,未确认的记录会阻塞每个消费者线程中的后续轮询

© . This site is unofficial and not affiliated with VMware.