可观察性

Spring 通过 Micrometer 提供可观察性支持,该库定义了一个 Observation (观测) 概念,它同时支持 Metrics (指标) 和 Traces (追踪)

Spring Cloud Stream 在 Spring Cloud Function 层面集成了此支持,它通过提供几个抽象中的一个 ObservationFunctionAroundWrapper,该包装器封装了函数,从而开箱即用地处理观测。

所需依赖

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
	<groupId>io.projectreactor</groupId>
	<artifactId>reactor-core-micrometer</artifactId>
</dependency>

以及一个可用的追踪器桥接器。例如 Zipkin Brave

<dependency>
	<groupId>io.micrometer</groupId>
	<artifactId>micrometer-tracing-bridge-brave</artifactId>
</dependency>

命令式函数

命令式函数被观测包装器 ObservationFunctionAroundWrapper 封装,该包装器提供了处理与 Observation 注册表交互所需的基础设施。此类交互在函数的每次调用时发生,这意味着观测被附加到函数的每次调用 (即每条消息一次观测)。换句话说,对于命令式函数,如果前面提到的所需依赖存在,可观察性将直接生效。

响应式函数

响应式函数与命令式函数本质上不同,因此不会被 ObservationFunctionAroundWrapper 封装。

命令式函数 是一个消息处理函数,框架在每条消息到来时都会调用它,类似于典型的事件处理器,N 条消息会触发 N 次此类函数的调用。这允许我们封装此类函数,为其添加额外功能,例如 错误处理重试,当然还有 可观察性

响应式函数 是初始化函数。它的作用是将用户提供的流处理代码 (Flux) 与 Binder 提供的源和目标流连接起来。它只在应用启动期间调用一次。一旦流代码与源/目标流连接,我们就无法看到或控制实际的流处理过程。这完全取决于响应式 API。响应式函数还带来了一个额外的变量。考虑到函数提供的是对整个流链 (而非单个事件) 的可见性,默认的观测单位应该是什么?是流链中的单个项吗?是一个项范围吗?如果在一段时间后没有消息怎么办?等等... 我们想要强调的是,对于响应式函数,我们不能假定任何事情。(有关响应式函数和命令式函数之间差异的更多信息,请参阅 响应式函数)。

因此,就像 重试错误处理 一样,你需要手动处理观测。

幸运的是,你可以通过使用响应式 API 的 tap 操作来轻松做到这一点,同时提供一个 ObservationRegistry 实例。这样的片段定义了一个观测单位,它可以是 Flux 中的单个项、一个范围,或者你希望在流中观测的任何其他内容。

@SpringBootApplication
public class DemoStreamApplication {

	Logger logger = LoggerFactory.getLogger(DemoStreamApplication.class);

	public static void main(String[] args) {
		Hooks.enableAutomaticContextPropagation();
		SpringApplication.run(DemoStreamApplication.class, args);
	}

	@Bean
	public Function<Flux<String>, Flux<String>> uppercase(ObservationRegistry registry) {
		return flux -> flux.flatMap(item -> {
			return Mono.just(item)
                             .map(value -> value.toUpperCase())
                             .doOnNext(v -> logger.info(v))
                             .tap(Micrometer.observation(registry));
		});
	}
}

上面的例子模拟了将一个 Observation (观测) 附加到单个消息处理 (即命令式函数) 的过程,因为在这种情况下,观测单位始于 Mono.just(..),并且最后一个操作将 ObservationRegistry 附加到订阅者。

如果订阅者已经附加了一个观测,它将用于为 tap 上游的链/片段创建一个子 Observation,但是,正如我们之前所述,默认情况下,框架不会为您返回的流链附加任何 Observation。