接收消息
本节介绍如何在 Spring 中使用 JMS 接收消息。
同步接收
虽然 JMS 通常与异步处理相关联,但你也可以同步消费消息。重载的 receive(..)
方法提供了此功能。在同步接收期间,调用线程会阻塞,直到消息可用。这可能是一个危险的操作,因为调用线程可能会无限期地阻塞。receiveTimeout
属性指定了接收器在放弃等待消息之前应该等待多长时间。
异步接收:消息驱动的 POJO (MDP)
Spring 还通过使用 `@JmsListener` 注解支持注解监听器端点,并提供开放的基础设施来通过编程方式注册端点。这是迄今为止设置异步接收器最方便的方法。更多详情请参阅 启用监听器端点注解。 |
与 EJB 世界中的消息驱动 Bean (MDB) 类似,消息驱动 POJO (MDP) 充当 JMS 消息的接收器。对 MDP 的一个限制(但请参阅 使用 MessageListenerAdapter
)是它必须实现 jakarta.jms.MessageListener
接口。请注意,如果你的 POJO 在多个线程上接收消息,务必确保你的实现是线程安全的。
以下示例展示了一个简单的 MDP 实现
-
Java
-
Kotlin
public class ExampleListener implements MessageListener {
public void onMessage(Message message) {
if (message instanceof TextMessage textMessage) {
try {
System.out.println(textMessage.getText());
}
catch (JMSException ex) {
throw new RuntimeException(ex);
}
}
else {
throw new IllegalArgumentException("Message must be of type TextMessage");
}
}
}
class ExampleListener : MessageListener {
override fun onMessage(message: Message) {
if (message is TextMessage) {
try {
println(message.text)
} catch (ex: JMSException) {
throw RuntimeException(ex)
}
} else {
throw IllegalArgumentException("Message must be of type TextMessage")
}
}
}
一旦你实现了 MessageListener
,就可以创建一个消息监听器容器了。
以下示例展示了如何定义和配置 Spring 自带的一种消息监听器容器(在本例中为 DefaultMessageListenerContainer
)
-
Java
-
Kotlin
-
Xml
@Bean
ExampleListener messageListener() {
return new ExampleListener();
}
@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
ExampleListener messageListener) {
DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
jmsContainer.setConnectionFactory(connectionFactory);
jmsContainer.setDestination(destination);
jmsContainer.setMessageListener(messageListener);
return jmsContainer;
}
@Bean
fun messageListener() = ExampleListener()
@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener) =
DefaultMessageListenerContainer().apply {
setConnectionFactory(connectionFactory)
setDestination(destination)
setMessageListener(messageListener)
}
<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="jmsexample.ExampleListener"/>
<!-- and this is the message listener container -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
</bean>
请参阅各种消息监听器容器(它们都实现了 MessageListenerContainer)的 Spring javadoc,以获取每种实现支持功能的完整描述。
使用 SessionAwareMessageListener
接口
SessionAwareMessageListener
接口是一个 Spring 特有的接口,它提供了一个与 JMS MessageListener
接口类似的契约,但同时也允许消息处理方法访问接收到 Message
的 JMS Session
。以下列表显示了 SessionAwareMessageListener
接口的定义
package org.springframework.jms.listener;
public interface SessionAwareMessageListener {
void onMessage(Message message, Session session) throws JMSException;
}
如果你希望你的 MDP 能够响应任何接收到的消息(通过使用 onMessage(Message, Session)
方法中提供的 Session
),你可以选择让你的 MDP 实现此接口(优先于标准的 JMS MessageListener
接口)。Spring 自带的所有消息监听器容器实现都支持实现 MessageListener
或 SessionAwareMessageListener
接口的 MDP。实现了 SessionAwareMessageListener
的类有一个注意事项:它们通过接口与 Spring 耦合。是否使用它完全取决于你作为应用开发者或架构师的决定。
请注意,SessionAwareMessageListener
接口的 onMessage(..)
方法会抛出 JMSException
。与标准的 JMS MessageListener
接口不同,当使用 SessionAwareMessageListener
接口时,客户端代码有责任处理所有抛出的异常。
使用 MessageListenerAdapter
MessageListenerAdapter
类是 Spring 异步消息支持中的最后一个组件。简而言之,它允许你将几乎任何类公开为 MDP(尽管存在一些限制)。
考虑以下接口定义
-
Java
-
Kotlin
public interface MessageDelegate {
void handleMessage(String message);
void handleMessage(Map message);
void handleMessage(byte[] message);
void handleMessage(Serializable message);
}
interface MessageDelegate {
fun handleMessage(message: String)
fun handleMessage(message: Map<*, *>)
fun handleMessage(message: ByteArray)
fun handleMessage(message: Serializable)
}
注意,尽管该接口既没有扩展 MessageListener
接口也没有扩展 SessionAwareMessageListener
接口,但你仍然可以使用 MessageListenerAdapter
类将其用作 MDP。另请注意,各种消息处理方法是如何根据它们可以接收和处理的各种 Message
类型的内容进行强类型化的。
现在考虑 MessageDelegate
接口的以下实现
-
Java
-
Kotlin
public class DefaultMessageDelegate implements MessageDelegate {
@Override
public void handleMessage(String message) {
// ...
}
@Override
public void handleMessage(Map message) {
// ...
}
@Override
public void handleMessage(byte[] message) {
// ...
}
@Override
public void handleMessage(Serializable message) {
// ...
}
}
class DefaultMessageDelegate : MessageDelegate {
override fun handleMessage(message: String) {
// ...
}
override fun handleMessage(message: Map<*, *>) {
// ...
}
override fun handleMessage(message: ByteArray) {
// ...
}
override fun handleMessage(message: Serializable) {
// ...
}
}
特别请注意,上述 MessageDelegate
接口的实现(即 DefaultMessageDelegate
类)完全没有 JMS 依赖。它确实是一个 POJO,我们可以通过以下配置将其变成 MDP。
-
Java
-
Kotlin
-
Xml
@Bean
MessageListenerAdapter messageListener(DefaultMessageDelegate messageDelegate) {
return new MessageListenerAdapter(messageDelegate);
}
@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
ExampleListener messageListener) {
DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
jmsContainer.setConnectionFactory(connectionFactory);
jmsContainer.setDestination(destination);
jmsContainer.setMessageListener(messageListener);
return jmsContainer;
}
@Bean
fun messageListener(messageDelegate: DefaultMessageDelegate): MessageListenerAdapter {
return MessageListenerAdapter(messageDelegate)
}
@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener) =
DefaultMessageListenerContainer().apply {
setConnectionFactory(connectionFactory)
setDestination(destination)
setMessageListener(messageListener)
}
<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean class="jmsexample.DefaultMessageDelegate"/>
</constructor-arg>
</bean>
<!-- and this is the message listener container... -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
</bean>
下一个示例展示了另一个只能处理接收 JMS TextMessage
消息的 MDP。注意消息处理方法实际上是如何被称为 receive
的(在 MessageListenerAdapter
中,消息处理方法名称默认为 handleMessage
),但它是可配置的(如本节后面所示)。另请注意,receive(..)
方法是强类型化的,只接收并响应 JMS TextMessage
消息。以下列表显示了 TextMessageDelegate
接口的定义
-
Java
-
Kotlin
public interface TextMessageDelegate {
void receive(TextMessage message);
}
interface TextMessageDelegate {
fun receive(message: TextMessage)
}
以下列表显示了一个实现 TextMessageDelegate
接口的类
-
Java
-
Kotlin
public class DefaultTextMessageDelegate implements TextMessageDelegate {
@Override
public void receive(TextMessage message) {
// ...
}
}
class DefaultTextMessageDelegate : TextMessageDelegate {
override fun receive(message: TextMessage) {
// ...
}
}
相应的 MessageListenerAdapter
配置如下
-
Java
-
Kotlin
-
Xml
@Bean
MessageListenerAdapter messageListener(DefaultTextMessageDelegate messageDelegate) {
MessageListenerAdapter messageListener = new MessageListenerAdapter(messageDelegate);
messageListener.setDefaultListenerMethod("receive");
// We don't want automatic message context extraction
messageListener.setMessageConverter(null);
return messageListener;
}
@Bean
fun messageListener(messageDelegate: DefaultTextMessageDelegate) = MessageListenerAdapter(messageDelegate).apply {
setDefaultListenerMethod("receive")
// We don't want automatic message context extraction
setMessageConverter(null)
}
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean class="jmsexample.DefaultTextMessageDelegate"/>
</constructor-arg>
<property name="defaultListenerMethod" value="receive"/>
<!-- we don't want automatic message context extraction -->
<property name="messageConverter">
<null/>
</property>
</bean>
请注意,如果 messageListener
接收到的 JMS Message
类型不是 TextMessage
,将抛出 IllegalStateException
(并随后被吞没)。MessageListenerAdapter
类的另一个功能是,如果处理方法返回非 void 值,则能够自动发送响应 Message
。考虑以下接口和类
-
Java
-
Kotlin
public interface ResponsiveTextMessageDelegate {
// Notice the return type...
String receive(TextMessage message);
}
interface ResponsiveTextMessageDelegate {
// Notice the return type...
fun receive(message: TextMessage): String
}
-
Java
-
Kotlin
public class DefaultResponsiveTextMessageDelegate implements ResponsiveTextMessageDelegate {
@Override
public String receive(TextMessage message) {
return "message";
}
}
class DefaultResponsiveTextMessageDelegate : ResponsiveTextMessageDelegate {
override fun receive(message: TextMessage): String {
return "message"
}
}
如果你将 DefaultResponsiveTextMessageDelegate
与 MessageListenerAdapter
结合使用,从 `'receive(..)'` 方法执行返回的任何非 null 值(在默认配置下)将被转换为 TextMessage
。然后,生成的 TextMessage
将发送到原始 Message
的 JMS Reply-To
属性中定义的 Destination
(如果存在)或 MessageListenerAdapter
上设置的默认 Destination
(如果已配置)。如果未找到 Destination
,将抛出 InvalidDestinationException
(注意,此异常不会被吞没,而是会沿着调用栈向上层传播)。
在事务中处理消息
在事务中调用消息监听器只需要重新配置监听器容器。
你可以通过监听器容器定义上的 sessionTransacted
标志激活本地资源事务。这样,每次消息监听器调用都会在活跃的 JMS 事务中运行,如果监听器执行失败,消息接收将被回滚。发送响应消息(通过 SessionAwareMessageListener
)是同一本地事务的一部分,但任何其他资源操作(如数据库访问)则独立运行。这通常需要在监听器实现中进行重复消息检测,以应对数据库处理已提交但消息处理未能提交的情况。
考虑以下 Bean 定义
-
Java
-
Kotlin
-
Xml
@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
ExampleListener messageListener) {
DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
jmsContainer.setConnectionFactory(connectionFactory);
jmsContainer.setDestination(destination);
jmsContainer.setMessageListener(messageListener);
jmsContainer.setSessionTransacted(true);
return jmsContainer;
}
@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener) =
DefaultMessageListenerContainer().apply {
setConnectionFactory(connectionFactory)
setDestination(destination)
setMessageListener(messageListener)
isSessionTransacted = true
}
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
<property name="sessionTransacted" value="true"/>
</bean>
要参与外部管理的事务,你需要配置一个事务管理器,并使用支持外部管理事务的监听器容器(通常是 DefaultMessageListenerContainer
)。
要为 XA 事务参与配置消息监听器容器,你需要配置一个 JtaTransactionManager
(它默认会委托给 Jakarta EE 服务器的事务子系统)。请注意,底层的 JMS ConnectionFactory
需要具备 XA 能力,并已在你的 JTA 事务协调器中正确注册。(请检查你的 Jakarta EE 服务器的 JNDI 资源配置。)这样,消息接收以及(例如)数据库访问就可以作为同一事务的一部分(具有统一的提交语义,但会增加 XA 事务日志开销)。
以下 Bean 定义创建了一个事务管理器
-
Java
-
Kotlin
-
Xml
@Bean
JtaTransactionManager transactionManager() {
return new JtaTransactionManager();
}
@Bean
fun transactionManager() = JtaTransactionManager()
<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>
然后我们需要将其添加到之前的容器配置中。容器会处理其余的事情。以下示例展示了如何操作
-
Java
-
Kotlin
-
Xml
@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
ExampleListener messageListener) {
DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
jmsContainer.setConnectionFactory(connectionFactory);
jmsContainer.setDestination(destination);
jmsContainer.setMessageListener(messageListener);
jmsContainer.setSessionTransacted(true);
return jmsContainer;
}
@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener,
transactionManager: JtaTransactionManager) =
DefaultMessageListenerContainer().apply {
setConnectionFactory(connectionFactory)
setDestination(destination)
setMessageListener(messageListener)
setTransactionManager(transactionManager)
}
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
<property name="transactionManager" ref="transactionManager"/>
</bean>