连接到 Kafka
从版本 2.5 开始,这些都扩展了 KafkaResourceFactory
。这允许通过在配置中添加 Supplier<String>
来在运行时更改 bootstrap servers:setBootstrapServersSupplier(() -> …)
。对于所有新连接,都将调用此方法以获取服务器列表。消费者和生产者通常是长期存活的。要关闭现有生产者,请在 DefaultKafkaProducerFactory
上调用 reset()
。要关闭现有消费者,请在 KafkaListenerEndpointRegistry
上调用 stop()
(然后 start()
),或者在任何其他监听器容器 bean 上调用 stop()
和 start()
。
为方便起见,框架还提供了 ABSwitchCluster
,它支持两组 bootstrap servers,其中一组在任何时候都是活跃的。通过调用 setBootstrapServersSupplier()
配置 ABSwitchCluster
并将其添加到生产者和消费者工厂以及 KafkaAdmin
中。当需要切换时,调用 primary()
或 secondary()
并在生产者工厂上调用 reset()
以建立新连接;对于消费者,停止 (stop()
) 并启动 (start()
) 所有监听器容器。使用 @KafkaListener
时,停止 (stop()
) 并启动 (start()
) KafkaListenerEndpointRegistry
bean。
更多信息请参见 Javadocs。
工厂监听器
从版本 2.5 开始,DefaultKafkaProducerFactory
和 DefaultKafkaConsumerFactory
可以配置一个 Listener
,以便在创建或关闭生产者或消费者时接收通知。
interface Listener<K, V> {
default void producerAdded(String id, Producer<K, V> producer) {
}
default void producerRemoved(String id, Producer<K, V> producer) {
}
}
interface Listener<K, V> {
default void consumerAdded(String id, Consumer<K, V> consumer) {
}
default void consumerRemoved(String id, Consumer<K, V> consumer) {
}
}
在每种情况下,id
都是通过将 client-id
属性(在创建后通过 metrics()
获取)附加到工厂 beanName
属性后面创建的,中间用 .
分隔。
这些监听器可以用于,例如,在创建新客户端时创建并绑定一个 Micrometer KafkaClientMetrics
实例(并在客户端关闭时关闭它)。
框架提供了完全执行此操作的监听器;参见 Micrometer 原生指标。
默认客户端 ID 前缀
从版本 3.2 开始,对于使用 spring.application.name
属性定义了应用名称的 Spring Boot 应用,该名称现在将用作这些客户端类型自动生成客户端 ID 的默认前缀
-
不使用消费者组的消费者客户端
-
生产者客户端
-
管理客户端
这使得在服务器端更容易识别这些客户端,以便进行故障排除或应用配额。
客户端类型 | 无应用名称 | 有应用名称 |
---|---|---|
无消费者组的消费者 |
consumer-null-1 |
myapp-consumer-1 |
使用消费者组 "mygroup" 的消费者 |
consumer-mygroup-1 |
consumer-mygroup-1 |
生产者 |
producer-1 |
myapp-producer-1 |
管理端 |
adminclient-1 |
myapp-admin-1 |