检测空闲的异步消费者

虽然异步消费者效率很高,但检测它们何时空闲是一个问题——用户可能希望在一段时间没有消息到达时采取一些行动。

从 1.6 版本开始,现在可以配置监听器容器,使其在一段时间没有消息投递时发布 `ListenerContainerIdleEvent` 事件。当容器空闲时,每隔 `idleEventInterval` 毫秒就会发布一次事件。

要配置此功能,请在容器上设置 `idleEventInterval`。以下示例展示了如何在 XML 和 Java 中(对于 `SimpleMessageListenerContainer` 和 `SimpleRabbitListenerContainerFactory`)进行配置。

<rabbit:listener-container connection-factory="connectionFactory"
        ...
        idle-event-interval="60000"
        ...
        >
    <rabbit:listener id="container1" queue-names="foo" ref="myListener" method="handle" />
</rabbit:listener-container>
@Bean
public SimpleMessageListenerContainer(ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
    ...
    container.setIdleEventInterval(60000L);
    ...
    return container;
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(rabbitConnectionFactory());
    factory.setIdleEventInterval(60000L);
    ...
    return factory;
}

在每种情况下,当容器空闲时,每分钟发布一次事件。

事件消费

您可以通过实现 `ApplicationListener` 来捕获空闲事件——可以是通用监听器,也可以是仅接收此特定事件的监听器。您还可以使用 Spring Framework 4.2 中引入的 `@EventListener`。

以下示例将 `@RabbitListener` 和 `@EventListener` 合并到一个类中。您需要理解应用程序监听器会收到所有容器的事件,因此如果您想根据哪个容器空闲来采取特定行动,可能需要检查监听器 ID。您也可以为此目的使用 `@EventListener` 的 `condition` 属性。

事件有四个属性

  • source:监听器容器实例

  • id:监听器 ID(或容器 Bean 名称)

  • idleTime:事件发布时容器已空闲的时间

  • queueNames:容器监听的队列名称

以下示例展示了如何使用 `@RabbitListener` 和 `@EventListener` 注解创建监听器

public class Listener {

    @RabbitListener(id="someId", queues="#{queue.name}")
    public String listen(String foo) {
        return foo.toUpperCase();
    }

    @EventListener(condition = "event.listenerId == 'someId'")
    public void onApplicationEvent(ListenerContainerIdleEvent event) {
        ...
    }

}
事件监听器会看到所有容器的事件。因此,在前面的示例中,我们根据监听器 ID 来过滤接收到的事件。
如果您希望使用空闲事件来停止监听器容器,则不应在调用监听器的线程上调用 `container.stop()`。这样做总是会导致延迟和不必要的日志消息。相反,您应该将事件传递给另一个线程,然后由该线程来停止容器。