可观测性
Spring 通过 Micrometer 提供可观测性支持,它定义了一个 观测概念,该概念可以在应用程序中同时实现指标和跟踪。
Spring Cloud Stream 在 Spring Cloud Function 层面集成了此类支持,通过提供ObservationFunctionAroundWrapper等抽象,该包装器包装函数以开箱即用地处理观测。
所需依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core-micrometer</artifactId>
</dependency>
以及其中一个可用的跟踪器桥接器。例如 Zipkin Brave
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-tracing-bridge-brave</artifactId>
</dependency>
命令式函数
命令式函数被观测包装器 ObservationFunctionAroundWrapper 包装,该包装器提供了与观测注册表交互所需的基础设施。此类交互发生在函数的每次调用中,这实际上意味着观测附加到函数的每次调用(即,每条消息一个观测)。换句话说,对于命令式函数,如果存在前面提到的所需依赖,可观测性将直接生效。
响应式函数
响应式函数与命令式函数本质上不同,因此不使用 ObservationFunctionAroundWrapper 包装。
命令式函数是一个消息处理函数,每当有消息时,框架都会调用它,类似于您典型的事件处理程序,对于 N 条消息,将有 N 次此类函数的调用。这允许我们包装此类函数,用附加功能(如错误处理、重试以及当然还有可观测性)来装饰它。
响应式函数是初始化函数。它的作用是将用户提供的流处理代码 (Flux) 与绑定器提供的源流和目标流连接起来。它在应用程序启动期间只被调用一次。一旦流代码与源/目标流连接,我们就无法看到或控制实际的流处理。它由响应式 API 控制。响应式函数还带来了一个附加变量。鉴于该函数为您提供了整个流链的可见性(不仅仅是单个事件),那么观测的默认单位应该是什么?流链中的单个项目?一个范围的项?如果一段时间后没有消息怎么办?等等……我们想要强调的是,对于响应式函数,我们不能假设任何事情。(有关响应式函数和命令式函数之间差异的更多信息,请参阅 响应式函数)。
因此,就像重试和错误处理一样,您需要手动处理观测。
幸运的是,您可以通过使用响应式 API 的 tap 操作并提供 ObservationRegistry 实例来轻松地对流的一部分进行观测。此类部分定义了一个观测单位,它可以是 flux 中的单个项、一个范围或您可能希望在流中观测的任何其他内容。
@SpringBootApplication
public class DemoStreamApplication {
Logger logger = LoggerFactory.getLogger(DemoStreamApplication.class);
public static void main(String[] args) {
Hooks.enableAutomaticContextPropagation();
SpringApplication.run(DemoStreamApplication.class, args);
}
@Bean
public Function<Flux<String>, Flux<String>> uppercase(ObservationRegistry registry) {
return flux -> flux.flatMap(item -> {
return Mono.just(item)
.map(value -> value.toUpperCase())
.doOnNext(v -> logger.info(v))
.tap(Micrometer.observation(registry));
});
}
}
上面的示例模拟了将 观测 附加到单个消息处理(即命令式函数),因为在这种情况下,观测单位以 Mono.just(..) 开始,最后一个操作将 ObservationRegistry 附加到订阅者。
如果订阅者已附加观测,则将使用该观测为 tap 上游的链/段创建一个子观测,但是如我们所述,默认情况下,框架不会将任何观测附加到您返回的流链。