监控
监控监听器性能
从 2.3 版本开始,如果在 classpath 中检测到 Micrometer 且应用程序上下文中存在单个 MeterRegistry,则监听器容器将自动为监听器创建和更新 Micrometer Timer
。可以通过将 ContainerProperty
的 micrometerEnabled
设置为 false
来禁用计时器。
维护两个计时器 - 一个用于监听器成功调用,一个用于失败调用。
计时器命名为 spring.kafka.listener
并具有以下标签
-
name
: (容器 bean 名称) -
result
:success
或failure
-
exception
:none
或ListenerExecutionFailedException
可以使用 ContainerProperties
的 micrometerTags
属性添加额外的标签。
从 2.9.8、3.0.6 版本开始,您可以在 ContainerProperties
的 micrometerTagsProvider
中提供一个函数;该函数接收 ConsumerRecord<?, ?>
并返回可以基于该记录生成的标签,这些标签将与 micrometerTags
中的任何静态标签合并。
对于并发容器,会为每个线程创建计时器,并且 name 标签会附加 -n 后缀,其中 n 是 0 到 concurrency-1 。 |
监控 KafkaTemplate 性能
从 2.5 版本开始,如果在 classpath 中检测到 Micrometer 且应用程序上下文中存在单个 MeterRegistry,则模板将自动为发送操作创建和更新 Micrometer Timer
。可以通过将模板的 micrometerEnabled
属性设置为 false
来禁用计时器。
维护两个计时器 - 一个用于监听器成功调用,一个用于失败调用。
计时器命名为 spring.kafka.template
并具有以下标签
-
name
: (模板 bean 名称) -
result
:success
或failure
-
exception
:none
或失败时的异常类名
可以使用模板的 micrometerTags
属性添加额外的标签。
从 2.9.8、3.0.6 版本开始,您可以提供一个 KafkaTemplate.setMicrometerTagsProvider(Function<ProducerRecord<?, ?>, Map<String, String>>)
属性;该函数接收 ProducerRecord<?, ?>
并返回可以基于该记录生成的标签,这些标签将与 micrometerTags
中的任何静态标签合并。
Micrometer 原生指标
从 2.5 版本开始,该框架提供 Factory Listeners,用于在生产者和消费者创建和关闭时管理 Micrometer KafkaClientMetrics
实例。
要启用此功能,只需将监听器添加到您的生产者和消费者工厂中
@Bean
public ConsumerFactory<String, String> myConsumerFactory() {
Map<String, Object> configs = consumerConfigs();
...
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(configs);
...
cf.addListener(new MicrometerConsumerListener<String, String>(meterRegistry(),
Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
...
return cf;
}
@Bean
public ProducerFactory<String, String> myProducerFactory() {
Map<String, Object> configs = producerConfigs();
configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
...
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
...
pf.addListener(new MicrometerProducerListener<String, String>(meterRegistry(),
Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
...
return pf;
}
传递给监听器的 consumer/producer id
会以标签名 spring.id
添加到指标的标签中。
double count = this.meterRegistry.get("kafka.producer.node.incoming.byte.total")
.tag("customTag", "customTagValue")
.tag("spring.id", "myProducerFactory.myClientId-1")
.functionCounter()
.count();
为 StreamsBuilderFactoryBean
提供了类似的监听器 - 请参阅 KafkaStreams Micrometer 支持。
从 3.3 版本开始,引入了一个 KafkaMetricsSupport
抽象类,用于管理 io.micrometer.core.instrument.binder.kafka.KafkaMetrics
与提供的 Kafka 客户端的 MeterRegistry
的绑定。这个类是上面提到的 MicrometerConsumerListener
、MicrometerProducerListener
和 KafkaStreamsMicrometerListener
的超类。然而,它可用于任何 Kafka 客户端用例。该类需要扩展,并且必须调用其 bindClient()
和 unbindClient()
API 来将 Kafka 客户端指标连接到 Micrometer 收集器。
Micrometer Observation
从 3.0 版本开始,现在支持将 Micrometer 用于 observation,适用于 KafkaTemplate
和监听器容器。
在 KafkaTemplate
和 ContainerProperties
上将 observationEnabled
设置为 true
以启用 observation;这将禁用 Micrometer Timers,因为现在计时器将通过每次 observation 进行管理。
Micrometer Observation 不支持批量监听器;这将启用 Micrometer Timers |
请参阅 Micrometer Tracing 以获取更多信息。
要向计时器/跟踪添加标签,请分别为模板或监听器容器配置一个自定义的 KafkaTemplateObservationConvention
或 KafkaListenerObservationConvention
。
默认实现为模板 observations 添加 bean.name
标签,为容器添加 listener.id
标签。
您可以继承 DefaultKafkaTemplateObservationConvention
或 DefaultKafkaListenerObservationConvention
,也可以提供全新的实现。
请参阅 Micrometer Observation 文档,了解记录的默认 observations 详情。
从 3.0.6 版本开始,您可以根据 consumer 或 producer 记录中的信息向计时器和跟踪添加动态标签。为此,请分别为监听器容器属性或 KafkaTemplate
添加一个自定义的 KafkaListenerObservationConvention
和/或 KafkaTemplateObservationConvention
。两个 observation 上下文中的 record
属性分别包含 ConsumerRecord
或 ProducerRecord
。
发送方和接收方上下文的 remoteServiceName
属性设置为 Kafka clusterId
属性;这由 KafkaAdmin
获取。如果由于某种原因(可能缺乏管理权限)无法获取 cluster id,从 3.1 版本开始,您可以在 KafkaAdmin
上设置一个手动 clusterId
,并将其注入到 KafkaTemplate
和监听器容器中。当其为 null
(默认值)时,admin 将调用 describeCluster
管理操作从 broker 获取它。