应用程序事件
以下 Spring 应用事件由监听器容器及其消费者发布
-
ConsumerStartingEvent:当消费者线程首次启动,在它开始轮询之前发布。 -
ConsumerStartedEvent:当消费者即将开始轮询时发布。 -
ConsumerFailedToStartEvent:如果在consumerStartTimeout容器属性所定义的时间内未发布ConsumerStartingEvent,则发布此事件。此事件可能表明配置的任务执行器没有足够的线程来支持它所使用的容器及其并发性。当出现此情况时,也会记录一条错误消息。 -
ListenerContainerIdleEvent:在idleEventInterval(如果已配置)内未收到任何消息时发布。 -
ListenerContainerNoLongerIdleEvent:在之前发布ListenerContainerIdleEvent后,当收到一条记录时发布。 -
ListenerContainerPartitionIdleEvent:在idlePartitionEventInterval(如果已配置)内未从该分区收到任何消息时发布。 -
ListenerContainerPartitionNoLongerIdleEvent:在之前发布ListenerContainerPartitionIdleEvent后,当从该分区收到一条记录时发布。 -
NonResponsiveConsumerEvent:当消费者似乎在poll方法中被阻塞时发布。 -
ConsumerPartitionPausedEvent:当分区暂停时,由每个消费者发布。 -
ConsumerPartitionResumedEvent:当分区恢复时,由每个消费者发布。 -
ConsumerPausedEvent:当容器暂停时,由每个消费者发布。 -
ConsumerResumedEvent:当容器恢复时,由每个消费者发布。 -
ConsumerStoppingEvent:在停止之前,由每个消费者发布。 -
ConsumerStoppedEvent:在消费者关闭后发布。参见线程安全。 -
ConsumerRetryAuthEvent:当消费者的身份验证或授权失败并正在重试时发布。 -
ConsumerRetryAuthSuccessfulEvent:当身份验证或授权重试成功时发布。只有在之前发生过ConsumerRetryAuthEvent时才会出现。 -
ContainerStoppedEvent:当所有消费者都停止时发布。 -
ConcurrentContainerStoppedEvent:当ConcurrentMessageListenerContainer停止时发布。
默认情况下,应用程序上下文的事件多播器在调用线程上调用事件监听器。如果将多播器更改为使用异步执行器,则当事件包含对消费者的引用时,不得调用任何 Consumer 方法。 |
ListenerContainerIdleEvent 具有以下属性
-
source:发布事件的监听器容器实例。 -
container:监听器容器或父监听器容器,如果源容器是子容器。 -
id:监听器 ID(或容器 Bean 名称)。 -
idleTime:事件发布时容器已空闲的时间。 -
topicPartitions:事件生成时容器分配到的主题和分区。 -
consumer:对 KafkaConsumer对象的引用。例如,如果之前调用了消费者的pause()方法,则在收到事件时可以resume()。 -
paused:容器当前是否已暂停。有关更多信息,请参阅暂停和恢复监听器容器。
ListenerContainerNoLongerIdleEvent 具有相同的属性,除了 idleTime 和 paused。
ListenerContainerPartitionIdleEvent 具有以下属性
-
source:发布事件的监听器容器实例。 -
container:监听器容器或父监听器容器,如果源容器是子容器。 -
id:监听器 ID(或容器 Bean 名称)。 -
idleTime:事件发布时分区消费已空闲的时间。 -
topicPartition:触发事件的主题和分区。 -
consumer:对 KafkaConsumer对象的引用。例如,如果之前调用了消费者的pause()方法,则在收到事件时可以resume()。 -
paused:该分区消费对于该消费者是否当前已暂停。有关更多信息,请参阅暂停和恢复监听器容器。
ListenerContainerPartitionNoLongerIdleEvent 具有相同的属性,除了 idleTime 和 paused。
NonResponsiveConsumerEvent 具有以下属性
-
source:发布事件的监听器容器实例。 -
container:监听器容器或父监听器容器,如果源容器是子容器。 -
id:监听器 ID(或容器 Bean 名称)。 -
timeSinceLastPoll:容器最后一次调用poll()之前的时间。 -
topicPartitions:事件生成时容器分配到的主题和分区。 -
consumer:对 KafkaConsumer对象的引用。例如,如果之前调用了消费者的pause()方法,则在收到事件时可以resume()。 -
paused:容器当前是否已暂停。有关更多信息,请参阅暂停和恢复监听器容器。
ConsumerPausedEvent、ConsumerResumedEvent 和 ConsumerStopping 事件具有以下属性
-
source:发布事件的监听器容器实例。 -
container:监听器容器或父监听器容器,如果源容器是子容器。 -
partitions:涉及的TopicPartition实例。
ConsumerPartitionPausedEvent、ConsumerPartitionResumedEvent 事件具有以下属性
-
source:发布事件的监听器容器实例。 -
container:监听器容器或父监听器容器,如果源容器是子容器。 -
partition:涉及的TopicPartition实例。
ConsumerRetryAuthEvent 事件具有以下属性
-
source:发布事件的监听器容器实例。 -
container:监听器容器或父监听器容器,如果源容器是子容器。 -
reason:-
AUTHENTICATION- 事件因身份验证异常而发布。 -
AUTHORIZATION- 事件因授权异常而发布。
-
ConsumerStartingEvent、ConsumerStartedEvent、ConsumerFailedToStartEvent、ConsumerStoppedEvent、ConsumerRetryAuthSuccessfulEvent 和 ContainerStoppedEvent 事件具有以下属性
-
source:发布事件的监听器容器实例。 -
container:监听器容器或父监听器容器,如果源容器是子容器。
所有容器(无论是子容器还是父容器)都发布 ContainerStoppedEvent。对于父容器,源和容器属性相同。
此外,ConsumerStoppedEvent 具有以下附加属性
-
reason:-
NORMAL- 消费者正常停止(容器已停止)。 -
ABNORMAL- 消费者异常停止(容器异常停止)。 -
ERROR- 抛出了java.lang.Error。 -
FENCED- 事务性生产者被隔离,并且stopContainerWhenFenced容器属性为true。 -
AUTH- 抛出了AuthenticationException或AuthorizationException,并且未配置authExceptionRetryInterval。 -
NO_OFFSET- 分区没有偏移量,并且auto.offset.reset策略为none。
-
您可以使用此事件在此类情况后重新启动容器
if (event.getReason().equals(Reason.FENCED)) {
event.getSource(MessageListenerContainer.class).start();
}
检测空闲和无响应的消费者
异步消费者虽然高效,但一个问题是检测它们何时空闲。您可能希望在一段时间内没有消息到达时采取一些措施。
您可以配置监听器容器,在一段时间没有消息传递时发布 ListenerContainerIdleEvent。当容器空闲时,每 idleEventInterval 毫秒发布一次事件。
要配置此功能,请在容器上设置 idleEventInterval。以下示例展示了如何实现
@Bean
public KafkaMessageListenerContainer(ConsumerFactory<String, String> consumerFactory) {
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
...
containerProps.setIdleEventInterval(60000L);
...
KafkaMessageListenerContainer<String, String> container = new KafKaMessageListenerContainer<>(consumerFactory, containerProps);
return container;
}
以下示例展示了如何为 @KafkaListener 设置 idleEventInterval
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
...
factory.getContainerProperties().setIdleEventInterval(60000L);
...
return factory;
}
在这些情况下,当容器空闲时,每分钟发布一次事件。
如果由于某种原因,消费者 poll() 方法没有退出,则不会收到任何消息,也无法生成空闲事件(这在早期版本的 kafka-clients 中是一个问题,当 broker 无法访问时)。在这种情况下,如果 poll 在 3x pollTimeout 属性内没有返回,则容器会发布 NonResponsiveConsumerEvent。默认情况下,此检查在每个容器中每 30 秒执行一次。您可以通过在配置监听器容器时设置 ContainerProperties 中的 monitorInterval(默认 30 秒)和 noPollThreshold(默认 3.0)属性来修改此行为。noPollThreshold 应该大于 1.0,以避免由于竞态条件而收到虚假事件。接收此类事件可以让您停止容器,从而唤醒消费者以便它停止。
从 2.6.2 版本开始,如果容器已发布 ListenerContainerIdleEvent,则在随后收到记录时,它将发布 ListenerContainerNoLongerIdleEvent。
事件消费
您可以通过实现 ApplicationListener 来捕获这些事件——可以是通用监听器,也可以是仅接收此特定事件的窄范围监听器。您还可以使用 Spring Framework 4.2 中引入的 @EventListener。
下一个示例将 @KafkaListener 和 @EventListener 组合到一个类中。您应该明白,应用程序监听器会收到所有容器的事件,因此如果您想根据哪个容器空闲来采取特定操作,您可能需要检查监听器 ID。您也可以为此目的使用 @EventListener 的 condition。
有关事件属性的信息,请参阅应用事件。
事件通常在消费者线程上发布,因此可以安全地与 Consumer 对象交互。
以下示例同时使用 @KafkaListener 和 @EventListener
public class Listener {
@KafkaListener(id = "qux", topics = "annotated")
public void listen4(@Payload String foo, Acknowledgment ack) {
...
}
@EventListener(condition = "event.listenerId.startsWith('qux-')")
public void eventHandler(ListenerContainerIdleEvent event) {
...
}
}
事件监听器会看到所有容器的事件。因此,在前面的示例中,我们根据监听器 ID 缩小了收到的事件范围。由于为 @KafkaListener 创建的容器支持并发性,因此实际容器被命名为 id-n,其中 n 是每个实例的唯一值以支持并发性。这就是我们在条件中使用 startsWith 的原因。 |
如果您希望使用空闲事件来停止监听器容器,则不应在调用监听器的线程上调用 container.stop()。这样做会导致延迟和不必要的日志消息。相反,您应该将事件交给另一个线程,然后该线程可以停止容器。此外,如果它是子容器,则不应 stop() 容器实例。您应该停止并发容器。 |
空闲时的当前位置
请注意,您可以通过在监听器中实现 ConsumerSeekAware 来获取检测到空闲时的当前位置。请参阅查找中的 onIdleContainer()。