Reactive Kafka Binder 中的可观测性
本节描述了如何在响应式 Kafka Binder 中启用基于 Micrometer 的可观测性。
生产者绑定
生产者绑定内置了可观测性支持。要启用它,请设置以下属性
spring.cloud.stream.kafka.binder.enable-observation
将此属性设置为 true
时,您可以观察记录的发布。使用 StreamBridge
发布记录以及常规的 Supplier<?>
bean 都可以被观察到。
消费者绑定
在消费者端启用可观测性比生产者端更复杂。消费者绑定有两个起点
-
通过生产者绑定发布数据的 Topic
-
在 Spring Cloud Stream 外部生产数据的 Topic
第一种情况下,应用理想情况下希望将可观测性头部信息传递到消费者入站。第二种情况下,如果上游没有开始观察,则会启动一个新的观察。
示例:具有可观测性的 Function
@Bean
Function<Flux<ReceiverRecord<byte[], byte[]>>, Flux<Message<String>>> receive(ObservationRegistry observationRegistry) {
return s -> s.flatMap(record -> {
Observation receiverObservation = KafkaReceiverObservation.RECEIVER_OBSERVATION.start(
null,
KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention.INSTANCE,
() -> new KafkaRecordReceiverContext(record, "user.receiver", "localhost:9092"),
observationRegistry
);
return Mono.deferContextual(contextView -> Mono.just(record)
.map(rec -> new String(rec.value()).toLowerCase())
.map(rec -> MessageBuilder.withPayload(rec)
.setHeader(IntegrationMessageHeaderAccessor.REACTOR_CONTEXT, contextView)
.build()))
.doOnTerminate(receiverObservation::stop)
.doOnError(receiverObservation::error)
.contextWrite(context -> context.put(ObservationThreadLocalAccessor.KEY, receiverObservation));
});
}
在此示例中
-
接收到记录时,将创建一个观察。
-
如果存在上游观察,它将成为
KafkaRecordReceiverContext
的一部分。 -
创建一个带有延迟上下文的
Mono
。 -
调用
map
操作时,上下文可以访问正确的观察。 -
flatMap
操作的结果作为Flux<Message<?>>
发送回绑定。 -
出站记录将包含与入站绑定相同的可观测性头部信息。
示例:具有可观测性的 Consumer
@Bean
Consumer<Flux<ReceiverRecord<?, String>>> receive(ObservationRegistry observationRegistry, @Value("${spring.kafka.bootstrap-servers}") String bootstrap) {
return f -> f.doOnNext(record -> KafkaReceiverObservation.RECEIVER_OBSERVATION.observation(
null,
KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention.INSTANCE,
() -> new KafkaRecordReceiverContext(record, "user.receiver", bootstrap),
observationRegistry).observe(() -> System.out.println(record)))
.subscribe();
}
在此情况下
-
由于没有出站绑定,所以在
Flux
上使用了doOnNext
而不是flatMap
。 -
直接调用
observe
会启动观察,并在完成后正确关闭它。