应用事件
监听器容器及其 consumer 会发布以下 Spring 应用事件
-
ConsumerStartingEvent
: 当 consumer 线程首次启动时发布,在开始 polling 之前。 -
ConsumerStartedEvent
: 当 consumer 即将开始 polling 时发布。 -
ConsumerFailedToStartEvent
: 如果在consumerStartTimeout
容器属性指定的时间内没有发布ConsumerStartingEvent
,则发布此事件。此事件可能表明配置的任务执行器没有足够的线程来支持其使用的容器及其并发性。出现此情况时也会记录错误消息。 -
ListenerContainerIdleEvent
: 如果在idleEventInterval
(如果配置)时间内没有收到消息时发布。 -
ListenerContainerNoLongerIdleEvent
: 在之前发布ListenerContainerIdleEvent
后,收到一条记录时发布。 -
ListenerContainerPartitionIdleEvent
: 如果在idlePartitionEventInterval
(如果配置)时间内未从该分区收到消息时发布。 -
ListenerContainerPartitionNoLongerIdleEvent
: 从之前发布过ListenerContainerPartitionIdleEvent
的分区收到一条记录时发布。 -
NonResponsiveConsumerEvent
: 当 consumer 似乎阻塞在poll
方法中时发布。 -
ConsumerPartitionPausedEvent
: 当分区暂停时,由每个 consumer 发布。 -
ConsumerPartitionResumedEvent
: 当分区恢复时,由每个 consumer 发布。 -
ConsumerPausedEvent
: 当容器暂停时,由每个 consumer 发布。 -
ConsumerResumedEvent
: 当容器恢复时,由每个 consumer 发布。 -
ConsumerStoppingEvent
: 在停止之前,由每个 consumer 发布。 -
ConsumerStoppedEvent
: consumer 关闭后发布。详见 线程安全。 -
ConsumerRetryAuthEvent
: 当 consumer 认证或授权失败并正在重试时发布。 -
ConsumerRetryAuthSuccessfulEvent
: 当认证或授权重试成功时发布。只有在之前发生过ConsumerRetryAuthEvent
时才会发生。 -
ContainerStoppedEvent
: 当所有 consumer 都已停止时发布。 -
ConcurrentContainerStoppedEvent
: 当ConcurrentMessageListenerContainer
已停止时发布。
默认情况下,应用上下文的事件多播器在调用线程上调用事件监听器。如果将多播器更改为使用异步执行器,则当事件包含对 consumer 的引用时,不得调用任何 Consumer 方法。 |
ListenerContainerIdleEvent
具有以下属性
-
source
: 发布事件的监听器容器实例。 -
container
: 监听器容器,如果源容器是子容器,则为父监听器容器。 -
id
: 监听器 ID(或容器 Bean 名称)。 -
idleTime
: 发布事件时容器已空闲的时间。 -
topicPartitions
: 生成事件时分配给容器的 topic 和分区。 -
consumer
: KafkaConsumer
对象的引用。例如,如果之前调用了 consumer 的pause()
方法,则在收到事件时可以resume()
。 -
paused
: 容器当前是否已暂停。详见 暂停和恢复监听器容器。
ListenerContainerNoLongerIdleEvent
具有相同的属性,除了 idleTime
和 paused
。
ListenerContainerPartitionIdleEvent
具有以下属性
-
source
: 发布事件的监听器容器实例。 -
container
: 监听器容器,如果源容器是子容器,则为父监听器容器。 -
id
: 监听器 ID(或容器 Bean 名称)。 -
idleTime
: 发布事件时分区消费已空闲的时间。 -
topicPartition
: 触发事件的 topic 和分区。 -
consumer
: KafkaConsumer
对象的引用。例如,如果之前调用了 consumer 的pause()
方法,则在收到事件时可以resume()
。 -
paused
: 该 consumer 的该分区消费当前是否已暂停。详见 暂停和恢复监听器容器。
ListenerContainerPartitionNoLongerIdleEvent
具有相同的属性,除了 idleTime
和 paused
。
NonResponsiveConsumerEvent
具有以下属性
-
source
: 发布事件的监听器容器实例。 -
container
: 监听器容器,如果源容器是子容器,则为父监听器容器。 -
id
: 监听器 ID(或容器 Bean 名称)。 -
timeSinceLastPoll
: 容器上次调用poll()
之前的时间。 -
topicPartitions
: 生成事件时分配给容器的 topic 和分区。 -
consumer
: KafkaConsumer
对象的引用。例如,如果之前调用了 consumer 的pause()
方法,则在收到事件时可以resume()
。 -
paused
: 容器当前是否已暂停。详见 暂停和恢复监听器容器。
ConsumerPausedEvent
、ConsumerResumedEvent
和 ConsumerStopping
事件具有以下属性
-
source
: 发布事件的监听器容器实例。 -
container
: 监听器容器,如果源容器是子容器,则为父监听器容器。 -
partitions
: 涉及的TopicPartition
实例。
ConsumerPartitionPausedEvent
、ConsumerPartitionResumedEvent
事件具有以下属性
-
source
: 发布事件的监听器容器实例。 -
container
: 监听器容器,如果源容器是子容器,则为父监听器容器。 -
partition
: 涉及的TopicPartition
实例。
ConsumerRetryAuthEvent
事件具有以下属性
-
source
: 发布事件的监听器容器实例。 -
container
: 监听器容器,如果源容器是子容器,则为父监听器容器。 -
原因
:-
AUTHENTICATION
- 事件因认证异常而发布。 -
AUTHORIZATION
- 事件因授权异常而发布。
-
ConsumerStartingEvent
、ConsumerStartedEvent
、ConsumerFailedToStartEvent
、ConsumerStoppedEvent
、ConsumerRetryAuthSuccessfulEvent
和 ContainerStoppedEvent
事件具有以下属性
-
source
: 发布事件的监听器容器实例。 -
container
: 监听器容器,如果源容器是子容器,则为父监听器容器。
所有容器(无论是子容器还是父容器)都发布 ContainerStoppedEvent
。对于父容器,source
和 container
属性是相同的。
此外,ConsumerStoppedEvent
具有以下附加属性
-
原因
:-
NORMAL
- consumer 正常停止(容器已停止)。 -
ERROR
- 抛出了java.lang.Error
。 -
FENCED
- 事务性 producer 被隔离 (fenced),并且stopContainerWhenFenced
容器属性为true
。 -
AUTH
- 抛出了AuthenticationException
或AuthorizationException
,并且未配置authExceptionRetryInterval
。 -
NO_OFFSET
- 分区没有 offset,并且auto.offset.reset
策略是none
。
-
您可以在发生此类情况后使用此事件重新启动容器
if (event.getReason().equals(Reason.FENCED)) {
event.getSource(MessageListenerContainer.class).start();
}
检测空闲和无响应的 Consumer
异步 consumer 虽然高效,但一个问题是检测它们何时空闲。如果一段时间没有消息到达,您可能需要采取一些措施。
您可以配置监听器容器,使其在一段时间没有消息投递时发布 ListenerContainerIdleEvent
。容器空闲时,每隔 idleEventInterval
毫秒就会发布一个事件。
要配置此功能,请在容器上设置 idleEventInterval
。以下示例展示了如何操作
@Bean
public KafkaMessageListenerContainer(ConsumerFactory<String, String> consumerFactory) {
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
...
containerProps.setIdleEventInterval(60000L);
...
KafkaMessageListenerContainer<String, String> container = new KafKaMessageListenerContainer<>(consumerFactory, containerProps);
return container;
}
以下示例展示了如何为 @KafkaListener
设置 idleEventInterval
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
...
factory.getContainerProperties().setIdleEventInterval(60000L);
...
return factory;
}
在这些情况下,容器空闲时每分钟发布一次事件。
如果由于某种原因 consumer 的 poll()
方法没有退出,则不会接收到消息,也无法生成空闲事件(在早期版本的 kafka-clients
中,当 broker 不可达时存在此问题)。在这种情况下,如果 poll 在 pollTimeout
属性的 3
倍时间内没有返回,则容器会发布 NonResponsiveConsumerEvent
。默认情况下,此检查在每个容器中每 30 秒执行一次。您可以通过在配置监听器容器时,在 ContainerProperties
中设置 monitorInterval
(默认 30 秒)和 noPollThreshold
(默认 3.0)属性来修改此行为。noPollThreshold
应大于 1.0
,以避免由于竞态条件导致出现虚假事件。接收到此类事件后,您可以停止容器,从而唤醒 consumer,使其能够停止。
从版本 2.6.2 开始,如果容器发布了 ListenerContainerIdleEvent
,则在随后收到记录时,它将发布 ListenerContainerNoLongerIdleEvent
。
事件消费
您可以通过实现 ApplicationListener
来捕获这些事件——可以是通用的监听器,也可以是仅接收此特定事件的监听器。您还可以使用 Spring Framework 4.2 中引入的 @EventListener
。
下一个示例将 @KafkaListener
和 @EventListener
组合到一个类中。您应该了解,应用监听器会接收所有容器的事件,因此如果您想根据哪个容器空闲而采取特定行动,您可能需要检查监听器 ID。您也可以为此目的使用 @EventListener
的 condition
。
详见 应用事件 以获取有关事件属性的信息。
事件通常发布在 consumer 线程上,因此与 Consumer
对象交互是安全的。
以下示例同时使用了 @KafkaListener
和 @EventListener
public class Listener {
@KafkaListener(id = "qux", topics = "annotated")
public void listen4(@Payload String foo, Acknowledgment ack) {
...
}
@EventListener(condition = "event.listenerId.startsWith('qux-')")
public void eventHandler(ListenerContainerIdleEvent event) {
...
}
}
事件监听器会看到所有容器的事件。因此,在前面的示例中,我们根据监听器 ID 来缩小接收到的事件范围。由于为 @KafkaListener 创建的容器支持并发,实际的容器被命名为 id-n ,其中 n 是支持并发的每个实例的唯一值。这就是我们在条件中使用 startsWith 的原因。 |
如果您希望使用空闲事件来停止监听器容器,则不应在调用监听器的线程上调用 container.stop() 。这样做会导致延迟和不必要的日志消息。相反,您应该将事件交给另一个线程,然后由该线程停止容器。此外,如果容器实例是子容器,则不应 stop() 它。您应该停止并发容器。 |
空闲时的当前位置
请注意,您可以通过在监听器中实现 ConsumerSeekAware
来获取检测到空闲时的当前位置。详见 seek 中的 onIdleContainer()
。