Kafka 绑定器监听器容器定制器
Spring Cloud Stream 通过使用定制器为消息监听器容器提供了强大的定制选项。本节介绍适用于 Kafka 的定制器接口:ListenerContainerCustomizer、其 Kafka 专用扩展 KafkaListenerContainerCustomizer,以及专门的 ListenerContainerWithDlqAndRetryCustomizer。
ListenerContainerCustomizer
ListenerContainerCustomizer 是 Spring Cloud Stream 中的一个通用接口,允许对消息监听器容器进行定制。
用法
要使用 ListenerContainerCustomizer,请在您的配置中创建一个实现此接口的 bean
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> genericCustomizer() {
return (container, destinationName, group) -> {
// Customize the container here
};
}
ListenerContainerCustomizer 接口定义了以下方法
void configure(C container, String destinationName, String group);
-
container:要定制的消息监听器容器。 -
destinationName:目标(主题)的名称。 -
group:消费者组 ID。
KafkaListenerContainerCustomizer
KafkaListenerContainerCustomizer 接口扩展了 ListenerContainerCustomizer,用于修改监听器容器的行为,并提供对绑定特定扩展 Kafka 消费者属性的访问。
用法
要使用 KafkaListenerContainerCustomizer,请在您的配置中创建一个实现此接口的 bean
@Bean
public KafkaListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> kafkaCustomizer() {
return (container, destinationName, group, properties) -> {
// Customize the Kafka container here
};
}
KafkaListenerContainerCustomizer 接口添加了以下方法
default void configureKafkaListenerContainer(
C container,
String destinationName,
String group,
ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties) {
configure(container, destinationName, group);
}
此方法使用一个额外的参数扩展了基类 configure 方法
-
extendedConsumerProperties:扩展的消费者属性,包括 Kafka 特定的属性。
ListenerContainerWithDlqAndRetryCustomizer
ListenerContainerWithDlqAndRetryCustomizer 接口为涉及死信队列 (DLQ) 和重试机制的场景提供了额外的定制选项。
用法
要使用 ListenerContainerWithDlqAndRetryCustomizer,请在您的配置中创建一个实现此接口的 bean
@Bean
public ListenerContainerWithDlqAndRetryCustomizer dlqCustomizer() {
return (container, destinationName, group, dlqDestinationResolver, backOff, properties) -> {
// Access the container here with access to the extended consumer binding properties.
};
}
ListenerContainerWithDlqAndRetryCustomizer 接口定义了以下方法
void configure(
AbstractMessageListenerContainer<?, ?> container,
String destinationName,
String group,
BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
BackOff backOff,
ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties
);
-
container:要定制的 Kafka 监听器容器。 -
destinationName:目标(主题)的名称。 -
group:消费者组 ID。 -
dlqDestinationResolver:用于解析失败记录的 DLQ 目标的函数。 -
backOff:重试的退避策略。 -
extendedConsumerProperties:扩展的消费者属性,包括 Kafka 特定的属性。