监听器并发

SimpleMessageListenerContainer

默认情况下,监听器容器启动一个消费者来接收队列中的消息。

在查看前一节的表格时,您可以看到许多控制并发的属性。最简单的是 concurrentConsumers,它会创建(固定)数量的消费者来并发处理消息。

在 1.3.0 版本之前,这是唯一可用的设置,必须先停止容器再重新启动才能更改设置。

从 1.3.0 版本开始,您现在可以动态调整 concurrentConsumers 属性。如果在容器运行时更改此属性,将根据需要添加或移除消费者以适应新设置。

此外,新增了一个名为 maxConcurrentConsumers 的属性,容器会根据工作负载动态调整并发。这与另外四个属性配合使用:consecutiveActiveTriggerstartConsumerMinIntervalconsecutiveIdleTriggerstopConsumerMinInterval。在默认设置下,增加消费者的算法如下:

如果尚未达到 maxConcurrentConsumers,并且一个现有消费者连续十个周期处于活动状态,并且自上次启动消费者以来至少过去了 10 秒,则会启动一个新的消费者。如果在 batchSize * receiveTimeout 毫秒内收到至少一条消息,则认为消费者处于活动状态。

在默认设置下,减少消费者的算法如下:

如果运行的消费者数量多于 concurrentConsumers,并且一个消费者检测到连续十次超时(空闲),并且自上次停止消费者以来至少过去了 60 秒,则会停止一个消费者。超时取决于 receiveTimeoutbatchSize 属性。如果在 batchSize * receiveTimeout 毫秒内未收到任何消息,则认为消费者处于空闲状态。因此,在默认超时时间(一秒)和 batchSize 为四的情况下,40 秒空闲时间后(四次超时对应一次空闲检测)会考虑停止一个消费者。

实际上,只有当整个容器空闲一段时间后,消费者才能被停止。这是因为 Broker 会将其工作分配给所有活动的消费者。

每个消费者使用一个独立的通道,无论配置了多少个队列。

从 2.0 版本开始,concurrentConsumersmaxConcurrentConsumers 属性可以通过 concurrency 属性设置,例如 2-4

使用 DirectMessageListenerContainer

对于此容器,并发是基于配置的队列和 consumersPerQueue 的。每个队列的每个消费者使用一个单独的通道,并且并发由 Rabbit 客户端库控制。在撰写本文时,默认情况下,它使用一个线程池,其大小为 DEFAULT_NUM_THREADS = Runtime.getRuntime().availableProcessors() * 2

您可以配置一个 taskExecutor 来提供所需的最大并发。