监听器并发

SimpleMessageListenerContainer

默认情况下,侦听器容器启动一个消费者,从队列中接收消息。

查看上一节中的表格,您可以看到许多控制并发的属性和特性。最简单的是 concurrentConsumers,它创建(固定)数量的消费者来并发处理消息。

在 1.3.0 版本之前,这是唯一可用的设置,并且必须停止并重新启动容器才能更改此设置。

从 1.3.0 版本开始,您现在可以动态调整 concurrentConsumers 属性。如果在容器运行时更改了此属性,将根据需要添加或移除消费者以适应新设置。

此外,还添加了一个名为 maxConcurrentConsumers 的新属性,容器会根据工作负载动态调整并发性。这与另外四个属性结合使用:consecutiveActiveTriggerstartConsumerMinIntervalconsecutiveIdleTriggerstopConsumerMinInterval。在默认设置下,增加消费者的算法如下:

如果 maxConcurrentConsumers 尚未达到,并且现有消费者连续十个周期都处于活动状态,并且距离上次启动消费者至少已过去 10 秒,则会启动一个新的消费者。如果消费者在 batchSize * receiveTimeout 毫秒内至少收到一条消息,则认为该消费者处于活动状态。

在默认设置下,减少消费者的算法如下:

如果运行中的消费者数量多于 concurrentConsumers,并且某个消费者检测到连续十次超时(空闲),并且距离上次停止消费者至少已过去 60 秒,则会停止一个消费者。超时时间取决于 receiveTimeoutbatchSize 属性。如果消费者在 batchSize * receiveTimeout 毫秒内没有收到任何消息,则认为该消费者处于空闲状态。因此,在默认超时(一秒)和 batchSize 为四的情况下,在空闲 40 秒后(四次超时对应一次空闲检测)才会考虑停止消费者。

实际上,只有当整个容器空闲一段时间后,消费者才能被停止。这是因为代理会将工作分配给所有活动的消费者。

每个消费者使用一个通道,无论配置的队列数量是多少。

从 2.0 版本开始,concurrentConsumersmaxConcurrentConsumers 属性可以通过 concurrency 属性进行设置——例如,2-4

使用 DirectMessageListenerContainer

使用此容器时,并发性基于配置的队列和 consumersPerQueue。每个队列的每个消费者都使用一个单独的通道,并发性由 Rabbit 客户端库控制。默认情况下,在撰写本文时,它使用一个线程池,其大小为 DEFAULT_NUM_THREADS = Runtime.getRuntime().availableProcessors() * 2

您可以配置一个 taskExecutor 来提供所需的并发最大值。

© . This site is unofficial and not affiliated with VMware.