Kafka Binder 监听器容器定制器

Spring Cloud Stream 通过使用定制器 (customizers) 为消息监听器容器提供了强大的定制选项。本节涵盖了适用于 Kafka 的定制器接口:ListenerContainerCustomizer、其 Kafka 特定的扩展 KafkaListenerContainerCustomizer 以及专门的 ListenerContainerWithDlqAndRetryCustomizer

ListenerContainerCustomizer

ListenerContainerCustomizer 是 Spring Cloud Stream 中一个通用的接口,允许定制消息监听器容器。

目的

当你需要修改监听器容器的行为时,使用此定制器。

用法

要使用 ListenerContainerCustomizer,请在你的配置中创建一个实现此接口的 bean

@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> genericCustomizer() {
    return (container, destinationName, group) -> {
        // Customize the container here
    };
}

ListenerContainerCustomizer 接口定义了以下方法

void configure(C container, String destinationName, String group);
  • container: 要定制的消息监听器容器。

  • destinationName: 目标名称 (主题)。

  • group: 消费者组 ID。

KafkaListenerContainerCustomizer

KafkaListenerContainerCustomizer 接口扩展了 ListenerContainerCustomizer,用于修改监听器容器的行为,并提供对绑定特定的扩展 Kafka 消费者属性的访问。

目的

在定制监听器容器时,当你需要访问绑定特定的扩展 Kafka 消费者属性时,使用此定制器。

用法

要使用 KafkaListenerContainerCustomizer,请在你的配置中创建一个实现此接口的 bean

@Bean
public KafkaListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> kafkaCustomizer() {
    return (container, destinationName, group, properties) -> {
        // Customize the Kafka container here
    };
}

KafkaListenerContainerCustomizer 接口添加了以下方法

default void configureKafkaListenerContainer(
    C container,
    String destinationName,
    String group,
    ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties) {
        configure(container, destinationName, group);
}

此方法扩展了基础的 configure 方法,增加了一个附加参数

  • extendedConsumerProperties: 扩展的消费者属性,包括 Kafka 特定的属性。

ListenerContainerWithDlqAndRetryCustomizer

ListenerContainerWithDlqAndRetryCustomizer 接口为涉及死信队列 (DLQ) 和重试机制的场景提供了额外的定制选项。

目的

当你需要微调 DLQ 行为或为你的 Kafka 消费者实现自定义重试逻辑时,使用此定制器。

用法

要使用 ListenerContainerWithDlqAndRetryCustomizer,请在你的配置中创建一个实现此接口的 bean

@Bean
public ListenerContainerWithDlqAndRetryCustomizer dlqCustomizer() {
    return (container, destinationName, group, dlqDestinationResolver, backOff, properties) -> {
        // Access the container here with access to the extended consumer binding properties.
    };
}

ListenerContainerWithDlqAndRetryCustomizer 接口定义了以下方法

void configure(
    AbstractMessageListenerContainer<?, ?> container,
    String destinationName,
    String group,
    BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
    BackOff backOff,
    ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties
);
  • container: 要定制的 Kafka 监听器容器。

  • destinationName: 目标名称 (主题)。

  • group: 消费者组 ID。

  • dlqDestinationResolver: 一个用于解析失败记录的 DLQ 目标的函数。

  • backOff: 重试的退避策略。

  • extendedConsumerProperties: 扩展的消费者属性,包括 Kafka 特定的属性。

总结

  • 如果启用了 DLQ,则使用 ListenerContainerWithDlqAndRetryCustomizer

  • KafkaListenerContainerCustomizer 用于不涉及 DLQ 的 Kafka 特定定制。

  • 基础的 ListenerContainerCustomizer 用于通用定制。

这种分层方法允许在 Spring Cloud Stream 应用中对 Kafka 监听器容器进行灵活且特定的定制。