线程安全

当使用并发消息监听器容器时,单个监听器实例将在所有消费者线程上被调用。因此,监听器需要是线程安全的,最好使用无状态监听器。如果无法使您的监听器线程安全,或者添加同步会大大降低添加并发的益处,您可以使用以下几种技术之一

  • 使用 n 个容器,每个容器的 concurrency=1,并使用原型作用域的 MessageListener Bean,以便每个容器获得自己的实例(在使用 @KafkaListener 时无法实现此功能)。

  • 将状态保存在 ThreadLocal<?> 实例中。

  • 让单例监听器委托给在 SimpleThreadScope(或类似作用域)中声明的 Bean。

为了便于清理线程状态(对于前面列表中的第二项和第三项),从 2.2 版开始,监听器容器在每个线程退出时发布一个 ConsumerStoppedEvent。您可以使用 ApplicationListener@EventListener 方法来使用这些事件删除 ThreadLocal<?> 实例或从作用域中 remove() 线程作用域 Bean。请注意,SimpleThreadScope 不会销毁具有销毁接口(如 DisposableBean)的 Bean,因此您应自行 destroy() 该实例。

默认情况下,应用程序上下文的事件多播器在调用线程上调用事件监听器。如果您更改多播器以使用异步执行器,则线程清理将无效。

关于虚拟线程和并发消息监听器容器的特别说明

由于某些底层库类仍然使用 synchronized 块进行线程协调,因此应用程序在将虚拟线程与并发消息监听器容器一起使用时需要谨慎。当启用虚拟线程时,如果并发数超过可用平台线程数,则虚拟线程很可能被固定在平台线程上,并可能出现竞争条件。因此,随着 Spring for Apache Kafka 使用的第三方库发展到完全支持虚拟线程,建议将消息监听器容器上的并发数保持等于或小于平台线程数。这样,应用程序可以避免线程之间的任何竞争条件以及虚拟线程被固定在平台线程上。