线程处理与异步消费者
异步消费者涉及多种不同的线程。
配置在 SimpleMessageListenerContainer
中的 TaskExecutor
的线程用于在 RabbitMQ Client
传递新消息时调用 MessageListener
。如果未配置,则使用 SimpleAsyncTaskExecutor
。如果您使用线程池执行器,需要确保线程池大小足以处理配置的并发数。对于 DirectMessageListenerContainer
,MessageListener
会直接在 RabbitMQ Client
线程上被调用。在这种情况下,taskExecutor
用于监视消费者的任务。
使用默认的 SimpleAsyncTaskExecutor 时,监听器被调用的线程的 threadNamePrefix 会使用监听器容器的 beanName 。这对于日志分析很有用。我们通常建议在日志 appender 配置中始终包含线程名称。当通过容器上的 taskExecutor 属性专门提供了 TaskExecutor 时,它将按原样使用,不做修改。建议您使用类似的技术来命名由自定义 TaskExecutor bean 定义创建的线程,以便于在日志消息中识别线程。 |
在创建连接时,配置在 CachingConnectionFactory
中的 Executor
会传递给 RabbitMQ Client
,其线程用于将新消息传递给监听器容器。如果未配置,客户端会使用内部线程池执行器,每个连接的线程池大小(在撰写本文时)为 Runtime.getRuntime().availableProcessors() * 2
。
如果您有大量工厂或正在使用 CacheMode.CONNECTION
,您可能需要考虑使用一个共享的 ThreadPoolTaskExecutor
,它应具备足够的线程来满足您的工作负载。
对于 DirectMessageListenerContainer ,您需要确保连接工厂配置的 task executor 具有足够的线程来支持所有使用该工厂的监听器容器所需的并发数。默认线程池大小(在撰写本文时)为 Runtime.getRuntime().availableProcessors() * 2 。 |
RabbitMQ client
使用 ThreadFactory
创建用于低级 I/O(套接字)操作的线程。要修改此工厂,您需要配置底层的 RabbitMQ ConnectionFactory
,如配置底层客户端连接工厂中所述。