应用程序事件

监听器容器及其消费者发布以下 Spring 应用程序事件

  • ConsumerStartingEvent:在消费者线程首次启动时发布,在它开始轮询之前。

  • ConsumerStartedEvent:当消费者即将开始轮询时发布。

  • ConsumerFailedToStartEvent:如果在 consumerStartTimeout 容器属性内未发布 ConsumerStartingEvent,则发布此事件。此事件可能表明配置的任务执行程序线程不足以支持其使用的容器及其并发性。当发生此情况时,还会记录错误消息。

  • ListenerContainerIdleEvent:当在 idleEventInterval(如果已配置)内未收到任何消息时发布。

  • ListenerContainerNoLongerIdleEvent:在先前发布 ListenerContainerIdleEvent 后使用记录时发布。

  • ListenerContainerPartitionIdleEvent:当在 idlePartitionEventInterval(如果已配置)内未从该分区接收任何消息时发布。

  • ListenerContainerPartitionNoLongerIdleEvent:当从先前发布 ListenerContainerPartitionIdleEvent 的分区使用记录时发布。

  • NonResponsiveConsumerEvent:当消费者似乎在 poll 方法中被阻塞时发布。

  • ConsumerPartitionPausedEvent:每个消费者在分区暂停时发布。

  • ConsumerPartitionResumedEvent:每个消费者在分区恢复时发布。

  • ConsumerPausedEvent:每个消费者在容器暂停时发布。

  • ConsumerResumedEvent:每个消费者在容器恢复时发布。

  • ConsumerStoppingEvent:每个消费者在停止之前发布。

  • ConsumerStoppedEvent:消费者关闭后发布。请参阅线程安全

  • ConsumerRetryAuthEvent:当消费者的身份验证或授权失败并正在重试时发布。

  • ConsumerRetryAuthSuccessfulEvent:当身份验证或授权已成功重试时发布。仅当之前存在 ConsumerRetryAuthEvent 时才会发生。

  • ContainerStoppedEvent:所有消费者停止时发布。

默认情况下,应用程序上下文的事件多播器在调用线程上调用事件监听器。如果您将多播器更改为使用异步执行程序,则在事件包含对消费者的引用时,不得调用任何 Consumer 方法。

ListenerContainerIdleEvent 具有以下属性

  • source:发布事件的监听器容器实例。

  • container:监听器容器或父监听器容器(如果源容器是子容器)。

  • id:监听器 ID(或容器 bean 名称)。

  • idleTime:发布事件时容器空闲的时间。

  • topicPartitions:生成事件时容器分配的主题和分区。

  • consumer:对 Kafka Consumer 对象的引用。例如,如果之前调用了消费者的 pause() 方法,则在接收到事件时可以 resume()

  • paused:容器当前是否已暂停。有关更多信息,请参阅暂停和恢复监听器容器

ListenerContainerNoLongerIdleEvent 具有相同的属性,除了 idleTimepaused

ListenerContainerPartitionIdleEvent 具有以下属性

  • source:发布事件的监听器容器实例。

  • container:监听器容器或父监听器容器(如果源容器是子容器)。

  • id:监听器 ID(或容器 bean 名称)。

  • idleTime:发布事件时分区使用空闲的时间。

  • topicPartition:触发事件的主题和分区。

  • consumer:对 Kafka Consumer 对象的引用。例如,如果之前调用了消费者的 pause() 方法,则在接收到事件时可以 resume()

  • paused:该消费者的分区使用是否当前已暂停。有关更多信息,请参阅暂停和恢复监听器容器

ListenerContainerPartitionNoLongerIdleEvent 具有相同的属性,除了 idleTimepaused

NonResponsiveConsumerEvent 具有以下属性

  • source:发布事件的监听器容器实例。

  • container:监听器容器或父监听器容器(如果源容器是子容器)。

  • id:监听器 ID(或容器 bean 名称)。

  • timeSinceLastPoll:容器上次调用poll()之前的时长。

  • topicPartitions:生成事件时容器分配的主题和分区。

  • consumer:对 Kafka Consumer 对象的引用。例如,如果之前调用了消费者的 pause() 方法,则在接收到事件时可以 resume()

  • paused:容器当前是否已暂停。有关更多信息,请参阅暂停和恢复监听器容器

ConsumerPausedEventConsumerResumedEventConsumerStopping事件具有以下属性

  • source:发布事件的监听器容器实例。

  • container:监听器容器或父监听器容器(如果源容器是子容器)。

  • partitions:涉及的TopicPartition实例。

ConsumerPartitionPausedEventConsumerPartitionResumedEvent事件具有以下属性

  • source:发布事件的监听器容器实例。

  • container:监听器容器或父监听器容器(如果源容器是子容器)。

  • partition:涉及的TopicPartition实例。

ConsumerRetryAuthEvent事件具有以下属性

  • source:发布事件的监听器容器实例。

  • container:监听器容器或父监听器容器(如果源容器是子容器)。

  • reason:

    • AUTHENTICATION - 由于身份验证异常而发布此事件。

    • AUTHORIZATION - 由于授权异常而发布此事件。

ConsumerStartingEventConsumerStartedEventConsumerFailedToStartEventConsumerStoppedEventConsumerRetryAuthSuccessfulEventContainerStoppedEvent事件具有以下属性

  • source:发布事件的监听器容器实例。

  • container:监听器容器或父监听器容器(如果源容器是子容器)。

所有容器(无论是子容器还是父容器)都会发布ContainerStoppedEvent。对于父容器,源和容器属性相同。

此外,ConsumerStoppedEvent具有以下附加属性

  • reason:

    • NORMAL - 消费者正常停止(容器已停止)。

    • ERROR - 抛出了java.lang.Error

    • FENCED - 事务性生产者被隔离,并且stopContainerWhenFenced容器属性为true

    • AUTH - 抛出了AuthenticationExceptionAuthorizationException,并且未配置authExceptionRetryInterval

    • NO_OFFSET - 分区没有偏移量,并且auto.offset.reset策略为none

您可以使用此事件在发生此类情况后重新启动容器

if (event.getReason.equals(Reason.FENCED)) {
    event.getSource(MessageListenerContainer.class).start();
}

检测空闲和无响应的消费者

异步消费者虽然高效,但一个问题是检测它们何时处于空闲状态。如果在一段时间内没有消息到达,您可能希望采取一些措施。

您可以配置侦听器容器,以便在一段时间内没有消息传递时发布ListenerContainerIdleEvent。当容器处于空闲状态时,每隔idleEventInterval毫秒发布一次事件。

要配置此功能,请在容器上设置idleEventInterval。以下示例演示了如何操作

@Bean
public KafkaMessageListenerContainer(ConsumerFactory<String, String> consumerFactory) {
    ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
    ...
    containerProps.setIdleEventInterval(60000L);
    ...
    KafkaMessageListenerContainer<String, String> container = new KafKaMessageListenerContainer<>(consumerFactory, containerProps);
    return container;
}

以下示例演示了如何为@KafkaListener设置idleEventInterval

@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
    ...
    factory.getContainerProperties().setIdleEventInterval(60000L);
    ...
    return factory;
}

在每种情况下,当容器处于空闲状态时,每分钟发布一次事件。

如果由于某种原因,消费者的poll()方法没有退出,没有收到消息并且无法生成空闲事件(这是kafka-clients早期版本的一个问题,当时代理不可访问)。在这种情况下,如果pollpollTimeout属性的3x倍时间内未返回,则容器会发布NonResponsiveConsumerEvent。默认情况下,此检查在每个容器中每30秒执行一次。您可以在配置侦听器容器时,通过在ContainerProperties中设置monitorInterval(默认30秒)和noPollThreshold(默认3.0)属性来修改此行为。noPollThreshold应大于1.0,以避免由于竞争条件而导致出现虚假事件。收到此类事件后,您可以停止容器,从而唤醒消费者以便它可以停止。

从版本2.6.2开始,如果容器已发布ListenerContainerIdleEvent,则在随后接收到记录时,它将发布ListenerContainerNoLongerIdleEvent

事件消费

您可以通过实现ApplicationListener来捕获这些事件——可以是通用侦听器,也可以是仅接收此特定事件的侦听器。您还可以使用Spring Framework 4.2中引入的@EventListener

下一个示例将@KafkaListener@EventListener组合到一个类中。您应该了解,应用程序侦听器会获取所有容器的事件,因此如果要根据哪个容器处于空闲状态采取特定操作,则可能需要检查侦听器ID。您也可以为此目的使用@EventListenercondition

有关事件属性的信息,请参阅应用程序事件

事件通常在消费者线程上发布,因此可以安全地与Consumer对象交互。

以下示例同时使用@KafkaListener@EventListener

public class Listener {

    @KafkaListener(id = "qux", topics = "annotated")
    public void listen4(@Payload String foo, Acknowledgment ack) {
        ...
    }

    @EventListener(condition = "event.listenerId.startsWith('qux-')")
    public void eventHandler(ListenerContainerIdleEvent event) {
        ...
    }

}
事件侦听器会查看所有容器的事件。因此,在前面的示例中,我们根据侦听器ID缩小接收的事件范围。由于为@KafkaListener创建的容器支持并发,因此实际的容器被命名为id-n,其中n是每个实例的唯一值,以支持并发。这就是我们在条件中使用startsWith的原因。
如果您希望使用空闲事件来停止侦听器容器,则不应在调用侦听器的线程上调用container.stop()。这样做会导致延迟和不必要的日志消息。相反,您应该将事件传递给另一个线程,然后该线程可以停止容器。此外,如果容器是子容器,则不应stop()容器实例。您应该停止并发容器。

空闲时的当前位置

请注意,您可以通过在侦听器中实现ConsumerSeekAware来获取检测到空闲时的当前位置。请参阅查找中的onIdleContainer()