线程安全
当使用并发消息监听器容器时,所有消费者线程都会调用同一个监听器实例。因此,监听器需要是线程安全的,并且最好使用无状态监听器。如果无法使监听器线程安全,或者添加同步会显著降低增加并发性的好处,可以使用以下几种技术之一:
-
使用 `n` 个 `concurrency=1` 的容器,并结合原型范围(prototype scoped)的 `MessageListener` bean,这样每个容器都会有自己的实例(在使用 `@KafkaListener` 时无法做到这一点)。
-
将状态保存在 `ThreadLocal<?>` 实例中。
-
让单例监听器委托给在 `SimpleThreadScope`(或类似范围)中声明的 bean。
为了方便清理线程状态(针对前述列表中的第二项和第三项),从 2.2 版本开始,监听器容器在每个线程退出时会发布一个 `ConsumerStoppedEvent`。您可以使用 `ApplicationListener` 或 `@EventListener` 方法来消费这些事件,以移除 `ThreadLocal<?>` 实例或从范围中 `remove()` 线程范围的 bean。请注意,`SimpleThreadScope` 不会销毁具有销毁接口(如 `DisposableBean`)的 bean,因此您应该自己调用 `destroy()` 实例。
默认情况下,应用上下文的事件多播器(event multicaster)在调用线程上调用事件监听器。如果您更改多播器使用异步执行器,线程清理将无效。 |
关于虚拟线程和并发消息监听器容器的特别说明
由于底层库类在线程协调方面仍然使用 `synchronized` 块存在某些限制,因此在使用并发消息监听器容器与虚拟线程时,应用需要谨慎。启用虚拟线程后,如果并发数超过可用的平台线程数,虚拟线程很可能被固定在平台线程上,并且可能出现竞态条件。因此,随着 Spring for Apache Kafka 使用的第三方库逐渐演进以完全支持虚拟线程,建议将消息监听器容器的并发数保持等于或少于平台线程数。通过这种方式,应用可以避免线程之间的竞态条件以及虚拟线程被固定在平台线程上的情况。