时间戳提取器

Kafka Streams 允许你基于不同的时间戳概念来控制消费者记录的处理。默认情况下,Kafka Streams 会提取嵌入在消费者记录中的时间戳元数据。你可以通过为每个输入绑定提供不同的 TimestampExtractor 实现来改变此默认行为。以下是一些关于如何实现这一点的详细信息。

@Bean
public Function<KStream<Long, Order>,
        Function<KTable<Long, Customer>,
                Function<GlobalKTable<Long, Product>, KStream<Long, Order>>>> process() {
    return orderStream ->
            customers ->
                products -> orderStream;
}

@Bean
public TimestampExtractor timestampExtractor() {
    return new WallclockTimestampExtractor();
}

然后,你可以为每个消费者绑定设置上述 TimestampExtractor 的 bean 名称。

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.timestampExtractorBeanName=timestampExtractor
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.timestampExtractorBeanName=timestampExtractor
spring.cloud.stream.kafka.streams.bindings.process-in-2.consumer.timestampExtractorBeanName=timestampExtractor"

如果你跳过为输入消费者绑定设置自定义时间戳提取器,那么该消费者将使用默认设置。