定位到特定偏移量

为了进行 seek 操作,您的监听器必须实现 ConsumerSeekAware 接口,它包含以下方法:

void registerSeekCallback(ConsumerSeekCallback callback);

void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);

void onPartitionsRevoked(Collection<TopicPartition> partitions);

void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);

registerSeekCallback 方法在容器启动时以及每次分配分区时被调用。初始化后,当需要在任意时间点进行 seek 操作时,应使用此回调。您应该保存对此回调的引用。如果您在多个容器(或 ConcurrentMessageListenerContainer)中使用同一个监听器,您应该将回调存储在 ThreadLocal 中,或者按监听器 Thread 为键的其他结构中。

使用组管理时,onPartitionsAssigned 方法在分配分区时被调用。您可以使用此方法,例如通过调用回调来设置分区的初始偏移量。您还可以使用此方法将当前线程的回调与分配的分区关联起来(参见下面的示例)。您必须使用回调参数,而不是传递给 registerSeekCallback 的参数。从版本 2.5.5 开始,即使在使用手动分区分配时,也会调用此方法。

onPartitionsRevoked 方法在容器停止或 Kafka 撤销分配时被调用。您应该丢弃当前线程的回调,并移除与被撤销分区的所有关联。

回调具有以下方法:

void seek(String topic, int partition, long offset);

void seek(String topic, int partition, Function<Long, Long> offsetComputeFunction);

void seekToBeginning(String topic, int partition);

void seekToBeginning(Collection<TopicPartitions> partitions);

void seekToEnd(String topic, int partition);

void seekToEnd(Collection<TopicPartitions> partitions);

void seekRelative(String topic, int partition, long offset, boolean toCurrent);

void seekToTimestamp(String topic, int partition, long timestamp);

void seekToTimestamp(Collection<TopicPartition> topicPartitions, long timestamp);

String getGroupId();

seek 方法的两种不同变体提供了定位到任意偏移量的方法。接受 Function 作为参数来计算偏移量的方法是在框架的 3.2 版本中添加的。此函数提供对当前偏移量(消费者返回的当前位置,即下一个要获取的偏移量)的访问。用户可以在函数定义中根据消费者中的当前偏移量来决定要定位到哪个偏移量。

seekRelative 方法在版本 2.3 中添加,用于执行相对定位。

  • offset 为负且 toCurrentfalse - 相对于分区末尾定位。

  • offset 为正且 toCurrentfalse - 相对于分区开头定位。

  • offset 为负且 toCurrenttrue - 相对于当前位置定位(后退)。

  • offset 为正且 toCurrenttrue - 相对于当前位置定位(快进)。

seekToTimestamp 方法也在版本 2.3 中添加。

当在 onIdleContaineronPartitionsAssigned 方法中为多个分区定位到同一时间戳时,首选第二种方法,因为它通过一次调用消费者的 offsetsForTimes 方法来查找时间戳对应的偏移量效率更高。当从其他位置调用时,容器将收集所有时间戳定位请求,并一次调用 offsetsForTimes

检测到空闲容器时,您也可以从 onIdleContainer() 执行 seek 操作。请参阅检测空闲和无响应的 Consumer 以了解如何启用空闲容器检测。

接受集合参数的 seekToBeginning 方法很有用,例如,当处理压缩主题并且希望在应用每次启动时都定位到开头时。
public class MyListener implements ConsumerSeekAware {

    ...

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        callback.seekToBeginning(assignments.keySet());
    }

}

要在运行时任意定位,请使用相应线程从 registerSeekCallback 获取的回调引用。

这里有一个简单的 Spring Boot 应用,演示了如何使用回调;它向主题发送 10 条记录;在控制台按下 <Enter> 会使所有分区定位到开头。

@SpringBootApplication
public class SeekExampleApplication {

    public static void main(String[] args) {
        SpringApplication.run(SeekExampleApplication.class, args);
    }

    @Bean
    public ApplicationRunner runner(Listener listener, KafkaTemplate<String, String> template) {
        return args -> {
            IntStream.range(0, 10).forEach(i -> template.send(
                new ProducerRecord<>("seekExample", i % 3, "foo", "bar")));
            while (true) {
                System.in.read();
                listener.seekToStart();
            }
        };
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("seekExample", 3, (short) 1);
    }

}

@Component
class Listener implements ConsumerSeekAware {

    private static final Logger logger = LoggerFactory.getLogger(Listener.class);

    private final ThreadLocal<ConsumerSeekCallback> callbackForThread = new ThreadLocal<>();

    private final Map<TopicPartition, ConsumerSeekCallback> callbacks = new ConcurrentHashMap<>();

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {
        this.callbackForThread.set(callback);
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        assignments.keySet().forEach(tp -> this.callbacks.put(tp, this.callbackForThread.get()));
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        partitions.forEach(tp -> this.callbacks.remove(tp));
        this.callbackForThread.remove();
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
    }

    @KafkaListener(id = "seekExample", topics = "seekExample", concurrency = "3")
    public void listen(ConsumerRecord<String, String> in) {
        logger.info(in.toString());
    }

    public void seekToStart() {
        this.callbacks.forEach((tp, callback) -> callback.seekToBeginning(tp.topic(), tp.partition()));
    }

}

为了简化操作,版本 2.3 添加了 AbstractConsumerSeekAware 类,该类跟踪主题/分区应使用哪个回调。以下示例展示了如何在容器每次空闲时,定位到每个分区处理的最后一条记录。它还提供了允许任意外部调用将分区回退一条记录的方法。

public class SeekToLastOnIdleListener extends AbstractConsumerSeekAware {

    @KafkaListener(id = "seekOnIdle", topics = "seekOnIdle")
    public void listen(String in) {
        ...
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments,
            ConsumerSeekCallback callback) {

            assignments.keySet().forEach(tp -> callback.seekRelative(tp.topic(), tp.partition(), -1, true));
    }

    /**
    * Rewind all partitions one record.
    */
    public void rewindAllOneRecord() {
        getTopicsAndCallbacks()
            .forEach((tp, callbacks) ->
                    callbacks.forEach(callback -> callback.seekRelative(tp.topic(), tp.partition(), -1, true))
            );
    }

    /**
    * Rewind one partition one record.
    */
    public void rewindOnePartitionOneRecord(String topic, int partition) {
        getSeekCallbacksFor(new TopicPartition(topic, partition))
            .forEach(callback -> callback.seekRelative(topic, partition, -1, true));
    }

}

版本 2.6 为抽象类添加了便利方法:

  • seekToBeginning() - 将所有已分配分区定位到开头。

  • seekToEnd() - 将所有已分配分区定位到末尾。

  • seekToTimestamp(long timestamp) - 将所有已分配分区定位到该时间戳表示的偏移量。

示例

public class MyListener extends AbstractConsumerSeekAware {

    @KafkaListener(...)
    void listen(...) {
        ...
    }
}

public class SomeOtherBean {

    MyListener listener;

    ...

    void someMethod() {
        this.listener.seekToTimestamp(System.currentTimeMillis() - 60_000);
    }

}

从版本 3.3 开始,在 ConsumerSeekAware.ConsumerSeekCallback 接口中引入了一个新方法 getGroupId()。当您需要识别与特定 seek 回调关联的消费者组时,此方法特别有用。

当使用继承 AbstractConsumerSeekAware 的类时,在一个监听器中执行的 seek 操作可能会影响同一类中的所有监听器。这并非总是期望的行为。为了解决这个问题,您可以使用回调提供的 getGroupId() 方法。这允许您有选择地执行 seek 操作,仅针对感兴趣的消费者组。