使用 Spring JMS

本节介绍如何使用 Spring 的 JMS 组件。

使用 JmsTemplate

JmsTemplate 类是 JMS 核心包中的中心类。它简化了 JMS 的使用,因为它在发送或同步接收消息时处理资源的创建和释放。

使用 JmsTemplate 的代码只需要实现回调接口,这些接口提供了清晰定义的高级契约。MessageCreator 回调接口在给定由 JmsTemplate 中的调用代码提供的 Session 时创建消息。为了允许更复杂的 JMS API 使用,SessionCallback 提供 JMS 会话,而 ProducerCallback 暴露一个 SessionMessageProducer 对。

JMS API 暴露了两种发送方法,一种接收投递模式、优先级和存活时间作为服务质量 (QOS) 参数,另一种不接收 QOS 参数并使用默认值。由于 JmsTemplate 有许多发送方法,为了避免发送方法数量上的重复,QOS 参数的设置被暴露为 bean 属性。类似地,同步接收调用的超时值可以通过使用 setReceiveTimeout 属性来设置。

一些 JMS 提供者允许通过配置 ConnectionFactory 来管理性地设置默认 QOS 值。这样做会导致对 MessageProducer 实例的 send 方法 (send(Destination destination, Message message)) 的调用使用的默认 QOS 值与 JMS 规范中指定的值不同。为了提供一致的 QOS 值管理,JmsTemplate 因此必须通过将布尔属性 isExplicitQosEnabled 设置为 true 来专门启用使用自己的 QOS 值。

为了方便,JmsTemplate 还暴露了一个基本的请求-回复操作,允许发送消息并在操作过程中创建的临时队列上等待回复。

一旦配置完成,JmsTemplate 类的实例是线程安全的。这很重要,因为它意味着你可以配置一个 JmsTemplate 的单个实例,然后安全地将这个共享引用注入到多个协作对象中。需要明确的是,JmsTemplate 是有状态的,因为它维护了对 ConnectionFactory 的引用,但这种状态不是会话状态。

自 Spring Framework 4.1 起,JmsMessagingTemplate 构建在 JmsTemplate 之上,并提供了与消息抽象(即 org.springframework.messaging.Message)的集成。这允许你以通用方式创建要发送的消息。

连接

JmsTemplate 需要引用一个 ConnectionFactoryConnectionFactory 是 JMS 规范的一部分,作为使用 JMS 的入口点。它被客户端应用用作与 JMS 提供者创建连接的工厂,并封装了各种配置参数,其中许多是供应商特定的,例如 SSL 配置选项。

在 EJB 中使用 JMS 时,供应商提供了 JMS 接口的实现,以便它们可以参与声明式事务管理并执行连接和会话的池化。为了使用这种实现,Jakarta EE 容器通常要求你在 EJB 或 servlet 部署描述符内部将 JMS 连接工厂声明为 resource-ref。为了确保在 EJB 内部使用 JmsTemplate 时能够利用这些特性,客户端应用应确保它引用的是 ConnectionFactory 的托管实现。

缓存消息资源

标准 API 涉及创建许多中间对象。要发送消息,需要执行以下 'API' 流程:

ConnectionFactory->Connection->Session->MessageProducer->send

ConnectionFactorySend 操作之间,会创建和销毁三个中间对象。为了优化资源使用并提高性能,Spring 提供了两种 ConnectionFactory 实现。

使用 SingleConnectionFactory

Spring 提供了一个 ConnectionFactory 接口的实现,SingleConnectionFactory,它在所有 createConnection() 调用中返回相同的 Connection 并忽略对 close() 的调用。这对于测试和独立环境非常有用,这样同一连接可以用于跨越任意数量事务的多个 JmsTemplate 调用。SingleConnectionFactory 接收一个标准 ConnectionFactory 的引用,该引用通常来自 JNDI。

使用 CachingConnectionFactory

CachingConnectionFactory 扩展了 SingleConnectionFactory 的功能,并增加了 SessionMessageProducerMessageConsumer 实例的缓存。初始缓存大小设置为 1。你可以使用 sessionCacheSize 属性来增加缓存会话的数量。请注意,实际缓存会话的数量会多于该数字,因为会话根据它们的确认模式进行缓存,所以当 sessionCacheSize 设置为一时,最多可以有四个缓存的会话实例(每种确认模式一个)。MessageProducerMessageConsumer 实例在其所属会话内缓存,并在缓存时也考虑生产者和消费者的独特属性。MessageProducer 根据其目的地进行缓存。MessageConsumer 根据由目的地、选择器、noLocal 投递标志和持久订阅名称(如果创建持久消费者)组成的键进行缓存。

临时队列和主题(TemporaryQueue/TemporaryTopic)的 MessageProducer 和 MessageConsumer 将永远不会被缓存。遗憾的是,WebLogic JMS 恰好在其常规目的地实现上实现了临时队列/主题接口,错误地指示其所有目的地都无法缓存。请在 WebLogic 上使用不同的连接池/缓存,或为 WebLogic 定制 CachingConnectionFactory

目的地管理

目的地,作为 ConnectionFactory 实例,是 JMS 管理对象,你可以将其存储和检索在 JNDI 中。配置 Spring 应用上下文时,你可以使用 JNDI 的 JndiObjectFactoryBean 工厂类或 <jee:jndi-lookup> 对你的对象引用 JMS 目的地进行依赖注入。然而,如果应用中有大量目的地,或者存在 JMS 提供者特有的高级目的地管理功能,这种策略通常会很繁琐。这类高级目的地管理的示例包括动态目的地的创建或对层级目的地命名空间的支持。JmsTemplate 将目的地名称的解析委托给实现 DestinationResolver 接口的 JMS 目的地对象。DynamicDestinationResolverJmsTemplate 使用的默认实现,用于解析动态目的地。还提供了 JndiDestinationResolver 作为 JNDI 中包含的目的地的服务定位器,并可选地回退到 DynamicDestinationResolver 中的行为。

通常,JMS 应用中使用的目的地仅在运行时可知,因此在部署应用时无法通过管理方式创建。这通常是因为交互系统组件之间存在共享应用逻辑,这些逻辑根据众所周知的命名约定在运行时创建目的地。尽管动态目的地的创建不是 JMS 规范的一部分,但大多数供应商都提供了此功能。动态目的地使用用户定义的名称创建,这使其与临时目的地不同,并且通常不注册在 JNDI 中。创建动态目的地所使用的 API 因提供者而异,因为与目的地关联的属性是供应商特定的。然而,供应商有时会做出一个简单的实现选择,即忽略 JMS 规范中的警告,并使用 TopicSessioncreateTopic(String topicName) 方法或 QueueSessioncreateQueue(String queueName) 方法创建具有默认目的地属性的新目的地。根据供应商实现的不同,DynamicDestinationResolver 随后也可以创建一个物理目的地,而不仅仅是解析一个现有的。

布尔属性 pubSubDomain 用于配置 JmsTemplate,使其了解正在使用哪种 JMS 域。默认情况下,此属性的值为 false,表示使用点对点域,即 Queues。此属性(由 JmsTemplate 使用)通过 DestinationResolver 接口的实现来决定动态目的地解析的行为。

你还可以通过 defaultDestination 属性为 JmsTemplate 配置一个默认目的地。对于不指定特定目的地的发送和接收操作,将使用默认目的地。

消息监听器容器

在 EJB 世界中,JMS 消息最常见的用途之一是驱动消息驱动 Bean (MDB)。Spring 提供了一种创建消息驱动 POJO (MDP) 的解决方案,这种方式不将用户绑定到 EJB 容器。(有关 Spring MDP 支持的详细介绍,请参阅异步接收:消息驱动 POJO。)端点方法可以用 @JmsListener 注解——有关详细信息,请参阅注解驱动的监听器端点

消息监听器容器用于从 JMS 消息队列接收消息并驱动注入其中的 MessageListener。监听器容器负责所有消息接收的线程化,并将消息分派给监听器进行处理。消息监听器容器是 MDP 和消息提供者之间的中介,负责注册接收消息、参与事务、资源获取和释放、异常转换等等。这使你能够编写与接收消息(并可能对其进行响应)相关的(可能很复杂的)业务逻辑,并将样板化的 JMS 基础设施关注点委托给框架。

Spring 提供了两个标准的 JMS 消息监听器容器,每个容器都有一套专门的功能集。

使用 SimpleMessageListenerContainer

这个消息监听器容器是两个标准类型中较简单的一个。它在启动时创建固定数量的 JMS 会话和消费者,使用标准的 JMS MessageConsumer.setMessageListener() 方法注册监听器,并由 JMS 提供者负责执行监听器回调。这个变体不允许动态适应运行时需求,也不允许参与外部管理的事务。在兼容性方面,它非常接近独立 JMS 规范的精神,但通常与 Jakarta EE 的 JMS 限制不兼容。

虽然 SimpleMessageListenerContainer 不允许参与外部管理的事务,但它确实支持原生的 JMS 事务。要启用此功能,你可以将 sessionTransacted 标志切换为 true,或者在 XML 命名空间中将 acknowledge 属性设置为 transacted。监听器抛出的异常将导致回滚,并且消息会被重新投递。或者,可以考虑使用 CLIENT_ACKNOWLEDGE 模式,该模式在发生异常时也提供重新投递,但不使用事务性 Session 实例,因此不在事务协议中包含任何其他 Session 操作(例如发送响应消息)。
默认的 AUTO_ACKNOWLEDGE 模式无法提供足够的可靠性保证。当监听器执行失败时(因为提供者在调用监听器后会自动确认每条消息,不会将异常传播给提供者)或者当监听器容器关闭时(你可以通过设置 acceptMessagesWhileStopping 标志来配置),消息可能会丢失。如果需要可靠性保证(例如,用于可靠的队列处理和持久主题订阅),请确保使用事务性会话。

使用 DefaultMessageListenerContainer

这个消息监听器容器在大多数情况下被使用。与 SimpleMessageListenerContainer 不同,这个容器变体允许动态适应运行时需求,并能够参与外部管理的事务。当配置了 JtaTransactionManager 时,每条收到的消息都会注册到一个 XA 事务中。因此,处理过程可以利用 XA 事务语义。这个监听器容器在对 JMS 提供者的低要求、高级功能(例如参与外部管理的事务)以及与 Jakarta EE 环境的兼容性之间取得了很好的平衡。

你可以自定义容器的缓存级别。请注意,当没有启用缓存时,每次接收消息都会创建一个新的连接和一个新的会话。在高负载下结合非持久订阅使用,这可能导致消息丢失。在这种情况下,请务必使用适当的缓存级别。

当代理服务器宕机时,这个容器也具备恢复能力。默认情况下,一个简单的 BackOff 实现会每五秒重试一次。你可以指定一个自定义的 BackOff 实现,以获得更细粒度的恢复选项。请参阅 ExponentialBackOff 作为示例。

与其同胞容器 (SimpleMessageListenerContainer) 类似,DefaultMessageListenerContainer 支持原生 JMS 事务并允许自定义确认模式。如果对你的场景可行,强烈建议优先使用此方式而非外部管理的事务——也就是说,如果你能够接受在 JVM 死亡情况下偶尔出现的消息重复。你的业务逻辑中的自定义重复消息检测步骤可以处理这种情况——例如,通过检查业务实体是否存在或检查协议表。任何此类安排都比替代方案效率显著更高:通过 XA 事务包装你的整个处理过程(通过配置你的 DefaultMessageListenerContainer 使用 JtaTransactionManager),以覆盖 JMS 消息的接收以及消息监听器中业务逻辑的执行(包括数据库操作等)。
默认的 AUTO_ACKNOWLEDGE 模式无法提供足够的可靠性保证。当监听器执行失败时(因为提供者在调用监听器后会自动确认每条消息,不会将异常传播给提供者)或者当监听器容器关闭时(你可以通过设置 acceptMessagesWhileStopping 标志来配置),消息可能会丢失。如果需要可靠性保证(例如,用于可靠的队列处理和持久主题订阅),请确保使用事务性会话。

事务管理

Spring 提供了一个 JmsTransactionManager,它管理单个 JMS ConnectionFactory 的事务。这使得 JMS 应用能够利用 Spring 的托管事务特性,如数据访问章节的事务管理部分所述。JmsTransactionManager 执行本地资源事务,将指定的 ConnectionFactory 的 JMS Connection/Session 对绑定到线程。JmsTemplate 会自动检测到此类事务性资源并相应地进行操作。

在 Jakarta EE 环境中,ConnectionFactory 会连接和会话实例进行池化,因此这些资源可以在事务中高效地重用。在独立环境中,使用 Spring 的 SingleConnectionFactory 会导致共享的 JMS Connection,每个事务都有其自己的独立 Session。或者,可以考虑使用特定于提供者的池化适配器,例如 ActiveMQ 的 PooledConnectionFactory 类。

你还可以将 JmsTemplateJtaTransactionManager 和支持 XA 的 JMS ConnectionFactory 一起使用,以执行分布式事务。请注意,这需要使用 JTA 事务管理器以及经过正确 XA 配置的 ConnectionFactory。(请查阅你的 Jakarta EE 服务器或 JMS 提供者的文档。)

在托管和非托管事务环境中重用代码时,使用 JMS API 从 Connection 创建 Session 可能会令人困惑。这是因为 JMS API 只有一个创建 Session 的工厂方法,并且它需要事务和确认模式的值。在托管环境中,设置这些值是环境事务基础设施的责任,因此这些值会被厂商提供的 JMS Connection 包装器忽略。当你在非托管环境中使用 JmsTemplate 时,你可以通过使用属性 sessionTransactedsessionAcknowledgeMode 来指定这些值。当你将 PlatformTransactionManagerJmsTemplate 一起使用时,模板总是会获得一个事务性的 JMS Session