空载荷和“墓碑”记录的日志压缩

使用日志压缩时,您可以发送和接收带有 null 载荷的消息来标识键的删除。您也可能由于其他原因接收到 null 值,例如反序列化器在无法反序列化值时可能返回 null

生产空载荷

您可以通过将 null 消息参数值传递给 ReactivePulsarTemplate 的其中一个 send 方法来发送 null 值,例如

reactiveTemplate
        .send(null, Schema.STRING)
        .subscribe();
发送 null 值时,必须指定 Schema 类型,因为系统无法从 null 载荷确定消息的类型。

消费空载荷

对于 `@ReactivePulsarListener`,`null` 载荷根据其消息参数的类型传递到监听器方法中,具体如下

参数类型 传入的值

基本类型

null

用户定义类型

null

org.apache.pulsar.client.api.Message<T>

一个非空的 Pulsar 消息,其 getValue() 返回 null

org.springframework.messaging.Message<T>

一个非空的 Spring 消息,其 getPayload() 返回 PulsarNull

Flux<org.apache.pulsar.client.api.Message<T>>

一个非空的 Flux,其条目是非空的 Pulsar 消息,这些消息的 getValue() 返回 null

Flux<org.springframework.messaging.Message<T>>

一个非空的 Flux,其条目是非空的 Spring 消息,这些消息的 getPayload() 返回 PulsarNull

当传入的值为 null 时(即使用基本类型或用户定义类型的单记录监听器),您必须使用 `@Payload` 参数注解并设置 required = false
当使用 Spring 的 org.springframework.messaging.Message 作为监听器的载荷类型时,其泛型类型信息必须足够宽泛,以接受 Message<PulsarNull>(例如 MessageMessage<?>Message<Object>)。这是因为 Spring Message 不允许其载荷为 null 值,而是使用 PulsarNull 占位符。

如果它是用于压缩日志的墓碑消息,您通常还需要获取 key,以便您的应用程序可以确定哪个 key 被“`删除`”了。以下示例展示了这种配置

@ReactivePulsarListener(
        topics = "my-topic",
        subscriptionName = "my-topic-sub",
        schemaType = SchemaType.STRING)
Mono<Void> myListener(
        @Payload(required = false) String msg,
        @Header(PulsarHeaders.KEY) String key) {
    ...
}
使用流式消息监听器(Flux)时,头部支持有限,因此在日志压缩场景中不太有用。