Reactive Kafka Binder 中的可观测性

本节描述了如何在响应式 Kafka Binder 中启用基于 Micrometer 的可观测性。

生产者绑定

生产者绑定内置了可观测性支持。要启用它,请设置以下属性

spring.cloud.stream.kafka.binder.enable-observation

将此属性设置为 true 时,您可以观察记录的发布。使用 StreamBridge 发布记录以及常规的 Supplier<?> bean 都可以被观察到。

消费者绑定

在消费者端启用可观测性比生产者端更复杂。消费者绑定有两个起点

  1. 通过生产者绑定发布数据的 Topic

  2. 在 Spring Cloud Stream 外部生产数据的 Topic

第一种情况下,应用理想情况下希望将可观测性头部信息传递到消费者入站。第二种情况下,如果上游没有开始观察,则会启动一个新的观察。

示例:具有可观测性的 Function

@Bean
Function<Flux<ReceiverRecord<byte[], byte[]>>, Flux<Message<String>>> receive(ObservationRegistry observationRegistry) {

	return s -> s.flatMap(record -> {
		Observation receiverObservation = KafkaReceiverObservation.RECEIVER_OBSERVATION.start(
		null,
		KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention.INSTANCE,
		() -> new KafkaRecordReceiverContext(record, "user.receiver", "localhost:9092"),
		observationRegistry
		);

		return Mono.deferContextual(contextView -> Mono.just(record)
			.map(rec -> new String(rec.value()).toLowerCase())
			.map(rec -> MessageBuilder.withPayload(rec)
				.setHeader(IntegrationMessageHeaderAccessor.REACTOR_CONTEXT, contextView)
				.build()))
			.doOnTerminate(receiverObservation::stop)
			.doOnError(receiverObservation::error)
			.contextWrite(context -> context.put(ObservationThreadLocalAccessor.KEY, receiverObservation));
    });
}

在此示例中

  1. 接收到记录时,将创建一个观察。

  2. 如果存在上游观察,它将成为 KafkaRecordReceiverContext 的一部分。

  3. 创建一个带有延迟上下文的 Mono

  4. 调用 map 操作时,上下文可以访问正确的观察。

  5. flatMap 操作的结果作为 Flux<Message<?>> 发送回绑定。

  6. 出站记录将包含与入站绑定相同的可观测性头部信息。

示例:具有可观测性的 Consumer

@Bean
Consumer<Flux<ReceiverRecord<?, String>>> receive(ObservationRegistry observationRegistry, @Value("${spring.kafka.bootstrap-servers}") String bootstrap) {
	return f -> f.doOnNext(record -> KafkaReceiverObservation.RECEIVER_OBSERVATION.observation(
			null,
			KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention.INSTANCE,
			() -> new KafkaRecordReceiverContext(record, "user.receiver", bootstrap),
			observationRegistry).observe(() -> System.out.println(record)))
		.subscribe();
}

在此情况下

  1. 由于没有出站绑定,所以在 Flux 上使用了 doOnNext 而不是 flatMap

  2. 直接调用 observe 会启动观察,并在完成后正确关闭它。