应用事件

监听器容器及其 consumer 会发布以下 Spring 应用事件

  • ConsumerStartingEvent: 当 consumer 线程首次启动时发布,在开始 polling 之前。

  • ConsumerStartedEvent: 当 consumer 即将开始 polling 时发布。

  • ConsumerFailedToStartEvent: 如果在 consumerStartTimeout 容器属性指定的时间内没有发布 ConsumerStartingEvent,则发布此事件。此事件可能表明配置的任务执行器没有足够的线程来支持其使用的容器及其并发性。出现此情况时也会记录错误消息。

  • ListenerContainerIdleEvent: 如果在 idleEventInterval(如果配置)时间内没有收到消息时发布。

  • ListenerContainerNoLongerIdleEvent: 在之前发布 ListenerContainerIdleEvent 后,收到一条记录时发布。

  • ListenerContainerPartitionIdleEvent: 如果在 idlePartitionEventInterval(如果配置)时间内未从该分区收到消息时发布。

  • ListenerContainerPartitionNoLongerIdleEvent: 从之前发布过 ListenerContainerPartitionIdleEvent 的分区收到一条记录时发布。

  • NonResponsiveConsumerEvent: 当 consumer 似乎阻塞在 poll 方法中时发布。

  • ConsumerPartitionPausedEvent: 当分区暂停时,由每个 consumer 发布。

  • ConsumerPartitionResumedEvent: 当分区恢复时,由每个 consumer 发布。

  • ConsumerPausedEvent: 当容器暂停时,由每个 consumer 发布。

  • ConsumerResumedEvent: 当容器恢复时,由每个 consumer 发布。

  • ConsumerStoppingEvent: 在停止之前,由每个 consumer 发布。

  • ConsumerStoppedEvent: consumer 关闭后发布。详见 线程安全

  • ConsumerRetryAuthEvent: 当 consumer 认证或授权失败并正在重试时发布。

  • ConsumerRetryAuthSuccessfulEvent: 当认证或授权重试成功时发布。只有在之前发生过 ConsumerRetryAuthEvent 时才会发生。

  • ContainerStoppedEvent: 当所有 consumer 都已停止时发布。

  • ConcurrentContainerStoppedEvent: 当 ConcurrentMessageListenerContainer 已停止时发布。

默认情况下,应用上下文的事件多播器在调用线程上调用事件监听器。如果将多播器更改为使用异步执行器,则当事件包含对 consumer 的引用时,不得调用任何 Consumer 方法。

ListenerContainerIdleEvent 具有以下属性

  • source: 发布事件的监听器容器实例。

  • container: 监听器容器,如果源容器是子容器,则为父监听器容器。

  • id: 监听器 ID(或容器 Bean 名称)。

  • idleTime: 发布事件时容器已空闲的时间。

  • topicPartitions: 生成事件时分配给容器的 topic 和分区。

  • consumer: Kafka Consumer 对象的引用。例如,如果之前调用了 consumer 的 pause() 方法,则在收到事件时可以 resume()

  • paused: 容器当前是否已暂停。详见 暂停和恢复监听器容器

ListenerContainerNoLongerIdleEvent 具有相同的属性,除了 idleTimepaused

ListenerContainerPartitionIdleEvent 具有以下属性

  • source: 发布事件的监听器容器实例。

  • container: 监听器容器,如果源容器是子容器,则为父监听器容器。

  • id: 监听器 ID(或容器 Bean 名称)。

  • idleTime: 发布事件时分区消费已空闲的时间。

  • topicPartition: 触发事件的 topic 和分区。

  • consumer: Kafka Consumer 对象的引用。例如,如果之前调用了 consumer 的 pause() 方法,则在收到事件时可以 resume()

  • paused: 该 consumer 的该分区消费当前是否已暂停。详见 暂停和恢复监听器容器

ListenerContainerPartitionNoLongerIdleEvent 具有相同的属性,除了 idleTimepaused

NonResponsiveConsumerEvent 具有以下属性

  • source: 发布事件的监听器容器实例。

  • container: 监听器容器,如果源容器是子容器,则为父监听器容器。

  • id: 监听器 ID(或容器 Bean 名称)。

  • timeSinceLastPoll: 容器上次调用 poll() 之前的时间。

  • topicPartitions: 生成事件时分配给容器的 topic 和分区。

  • consumer: Kafka Consumer 对象的引用。例如,如果之前调用了 consumer 的 pause() 方法,则在收到事件时可以 resume()

  • paused: 容器当前是否已暂停。详见 暂停和恢复监听器容器

ConsumerPausedEventConsumerResumedEventConsumerStopping 事件具有以下属性

  • source: 发布事件的监听器容器实例。

  • container: 监听器容器,如果源容器是子容器,则为父监听器容器。

  • partitions: 涉及的 TopicPartition 实例。

ConsumerPartitionPausedEventConsumerPartitionResumedEvent 事件具有以下属性

  • source: 发布事件的监听器容器实例。

  • container: 监听器容器,如果源容器是子容器,则为父监听器容器。

  • partition: 涉及的 TopicPartition 实例。

ConsumerRetryAuthEvent 事件具有以下属性

  • source: 发布事件的监听器容器实例。

  • container: 监听器容器,如果源容器是子容器,则为父监听器容器。

  • 原因:

    • AUTHENTICATION - 事件因认证异常而发布。

    • AUTHORIZATION - 事件因授权异常而发布。

ConsumerStartingEventConsumerStartedEventConsumerFailedToStartEventConsumerStoppedEventConsumerRetryAuthSuccessfulEventContainerStoppedEvent 事件具有以下属性

  • source: 发布事件的监听器容器实例。

  • container: 监听器容器,如果源容器是子容器,则为父监听器容器。

所有容器(无论是子容器还是父容器)都发布 ContainerStoppedEvent。对于父容器,sourcecontainer 属性是相同的。

此外,ConsumerStoppedEvent 具有以下附加属性

  • 原因:

    • NORMAL - consumer 正常停止(容器已停止)。

    • ERROR - 抛出了 java.lang.Error

    • FENCED - 事务性 producer 被隔离 (fenced),并且 stopContainerWhenFenced 容器属性为 true

    • AUTH - 抛出了 AuthenticationExceptionAuthorizationException,并且未配置 authExceptionRetryInterval

    • NO_OFFSET - 分区没有 offset,并且 auto.offset.reset 策略是 none

您可以在发生此类情况后使用此事件重新启动容器

if (event.getReason().equals(Reason.FENCED)) {
    event.getSource(MessageListenerContainer.class).start();
}

检测空闲和无响应的 Consumer

异步 consumer 虽然高效,但一个问题是检测它们何时空闲。如果一段时间没有消息到达,您可能需要采取一些措施。

您可以配置监听器容器,使其在一段时间没有消息投递时发布 ListenerContainerIdleEvent。容器空闲时,每隔 idleEventInterval 毫秒就会发布一个事件。

要配置此功能,请在容器上设置 idleEventInterval。以下示例展示了如何操作

@Bean
public KafkaMessageListenerContainer(ConsumerFactory<String, String> consumerFactory) {
    ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
    ...
    containerProps.setIdleEventInterval(60000L);
    ...
    KafkaMessageListenerContainer<String, String> container = new KafKaMessageListenerContainer<>(consumerFactory, containerProps);
    return container;
}

以下示例展示了如何为 @KafkaListener 设置 idleEventInterval

@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
    ...
    factory.getContainerProperties().setIdleEventInterval(60000L);
    ...
    return factory;
}

在这些情况下,容器空闲时每分钟发布一次事件。

如果由于某种原因 consumer 的 poll() 方法没有退出,则不会接收到消息,也无法生成空闲事件(在早期版本的 kafka-clients 中,当 broker 不可达时存在此问题)。在这种情况下,如果 poll 在 pollTimeout 属性的 3 倍时间内没有返回,则容器会发布 NonResponsiveConsumerEvent。默认情况下,此检查在每个容器中每 30 秒执行一次。您可以通过在配置监听器容器时,在 ContainerProperties 中设置 monitorInterval(默认 30 秒)和 noPollThreshold(默认 3.0)属性来修改此行为。noPollThreshold 应大于 1.0,以避免由于竞态条件导致出现虚假事件。接收到此类事件后,您可以停止容器,从而唤醒 consumer,使其能够停止。

从版本 2.6.2 开始,如果容器发布了 ListenerContainerIdleEvent,则在随后收到记录时,它将发布 ListenerContainerNoLongerIdleEvent

事件消费

您可以通过实现 ApplicationListener 来捕获这些事件——可以是通用的监听器,也可以是仅接收此特定事件的监听器。您还可以使用 Spring Framework 4.2 中引入的 @EventListener

下一个示例将 @KafkaListener@EventListener 组合到一个类中。您应该了解,应用监听器会接收所有容器的事件,因此如果您想根据哪个容器空闲而采取特定行动,您可能需要检查监听器 ID。您也可以为此目的使用 @EventListenercondition

详见 应用事件 以获取有关事件属性的信息。

事件通常发布在 consumer 线程上,因此与 Consumer 对象交互是安全的。

以下示例同时使用了 @KafkaListener@EventListener

public class Listener {

    @KafkaListener(id = "qux", topics = "annotated")
    public void listen4(@Payload String foo, Acknowledgment ack) {
        ...
    }

    @EventListener(condition = "event.listenerId.startsWith('qux-')")
    public void eventHandler(ListenerContainerIdleEvent event) {
        ...
    }

}
事件监听器会看到所有容器的事件。因此,在前面的示例中,我们根据监听器 ID 来缩小接收到的事件范围。由于为 @KafkaListener 创建的容器支持并发,实际的容器被命名为 id-n,其中 n 是支持并发的每个实例的唯一值。这就是我们在条件中使用 startsWith 的原因。
如果您希望使用空闲事件来停止监听器容器,则不应在调用监听器的线程上调用 container.stop()。这样做会导致延迟和不必要的日志消息。相反,您应该将事件交给另一个线程,然后由该线程停止容器。此外,如果容器实例是子容器,则不应 stop() 它。您应该停止并发容器。

空闲时的当前位置

请注意,您可以通过在监听器中实现 ConsumerSeekAware 来获取检测到空闲时的当前位置。详见 seek 中的 onIdleContainer()