接收消息

本文介绍了如何在 Spring 中接收 JMS 消息。

同步接收

虽然 JMS 通常与异步处理相关联,但您可以同步使用消息。重载的receive(..) 方法提供了此功能。在同步接收期间,调用线程会阻塞,直到消息可用。这可能是一个危险的操作,因为调用线程可能会无限期地阻塞。receiveTimeout 属性指定接收器在放弃等待消息之前应等待多长时间。

异步接收:消息驱动的 POJO

Spring 还通过使用@JmsListener 注解支持带注解的监听器端点,并提供了一个开放的基础架构以编程方式注册端点。这迄今为止是设置异步接收器的最便捷方式。有关更多详细信息,请参阅启用监听器端点注解

以类似于 EJB 世界中的消息驱动的 Bean (MDB) 的方式,消息驱动的 POJO (MDP) 充当 JMS 消息的接收器。MDP 的唯一限制(但请参阅使用MessageListenerAdapter)是它必须实现jakarta.jms.MessageListener 接口。请注意,如果您的 POJO 在多个线程上接收消息,则务必确保您的实现是线程安全的。

以下示例显示了 MDP 的简单实现

  • Java

  • Kotlin

public class ExampleListener implements MessageListener {

	public void onMessage(Message message) {
		if (message instanceof TextMessage textMessage) {
			try {
				System.out.println(textMessage.getText());
			}
			catch (JMSException ex) {
				throw new RuntimeException(ex);
			}
		}
		else {
			throw new IllegalArgumentException("Message must be of type TextMessage");
		}
	}
}
class ExampleListener : MessageListener {

	override fun onMessage(message: Message) {
		if (message is TextMessage) {
			try {
				println(message.text)
			} catch (ex: JMSException) {
				throw RuntimeException(ex)
			}
		} else {
			throw IllegalArgumentException("Message must be of type TextMessage")
		}
	}
}

实现完MessageListener 后,就可以创建消息监听器容器了。

以下示例演示了如何定义和配置 Spring 附带的消息监听器容器之一(在本例中为DefaultMessageListenerContainer

  • Java

  • Kotlin

  • Xml

@Bean
ExampleListener messageListener() {
	return new ExampleListener();
}

@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
		ExampleListener messageListener) {

	DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
	jmsContainer.setConnectionFactory(connectionFactory);
	jmsContainer.setDestination(destination);
	jmsContainer.setMessageListener(messageListener);
	return jmsContainer;
}
@Bean
fun messageListener() = ExampleListener()

@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener) =
	DefaultMessageListenerContainer().apply {
		setConnectionFactory(connectionFactory)
		setDestination(destination)
		setMessageListener(messageListener)
	}
<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="jmsexample.ExampleListener"/>

<!-- and this is the message listener container -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
	<property name="connectionFactory" ref="connectionFactory"/>
	<property name="destination" ref="destination"/>
	<property name="messageListener" ref="messageListener"/>
</bean>

有关每个实现支持的功能的完整描述,请参阅 Spring 的各种消息监听器容器(所有这些容器都实现了MessageListenerContainer)的 Java 文档。

使用SessionAwareMessageListener 接口

SessionAwareMessageListener 接口是 Spring 特定的接口,它提供与 JMS MessageListener 接口类似的契约,但还允许消息处理方法访问接收Message 的 JMS Session。以下列表显示了SessionAwareMessageListener 接口的定义

package org.springframework.jms.listener;

public interface SessionAwareMessageListener {

	void onMessage(Message message, Session session) throws JMSException;
}

如果您希望 MDP 能够响应任何接收到的消息(通过在onMessage(Message, Session) 方法中使用提供的Session),可以选择让您的 MDP 实现此接口(而不是标准 JMS MessageListener 接口)。Spring 附带的所有消息监听器容器实现都支持实现MessageListenerSessionAwareMessageListener 接口的 MDP。实现SessionAwareMessageListener 的类存在一个警告,即它们随后通过接口绑定到 Spring。是否使用它完全由您作为应用程序开发人员或架构师决定。

请注意,SessionAwareMessageListener 接口的onMessage(..) 方法会抛出JMSException。与标准 JMS MessageListener 接口相反,当使用SessionAwareMessageListener 接口时,客户端代码负责处理任何抛出的异常。

使用MessageListenerAdapter

MessageListenerAdapter 类是 Spring 异步消息支持中的最后一个组件。简而言之,它允许您将几乎任何类公开为 MDP(尽管有一些约束)。

考虑以下接口定义

  • Java

  • Kotlin

public interface MessageDelegate {

	void handleMessage(String message);

	void handleMessage(Map message);

	void handleMessage(byte[] message);

	void handleMessage(Serializable message);
}
interface MessageDelegate {
	fun handleMessage(message: String)
	fun handleMessage(message: Map<*, *>)
	fun handleMessage(message: ByteArray)
	fun handleMessage(message: Serializable)
}

请注意,尽管该接口既没有扩展MessageListener 也没有扩展SessionAwareMessageListener 接口,但您仍然可以通过使用MessageListenerAdapter 类将其用作 MDP。还要注意各种消息处理方法是如何根据它们可以接收和处理的各种Message 类型的內容进行强类型化的。

现在考虑以下MessageDelegate 接口的实现

  • Java

  • Kotlin

public class DefaultMessageDelegate implements MessageDelegate {

	@Override
	public void handleMessage(String message) {
		// ...
	}

	@Override
	public void handleMessage(Map message) {
		// ...
	}

	@Override
	public void handleMessage(byte[] message) {
		// ...
	}

	@Override
	public void handleMessage(Serializable message) {
		// ...
	}
}
class DefaultMessageDelegate : MessageDelegate {

	override fun handleMessage(message: String) {
		// ...
	}

	override fun handleMessage(message: Map<*, *>) {
		// ...
	}

	override fun handleMessage(message: ByteArray) {
		// ...
	}

	override fun handleMessage(message: Serializable) {
		// ...
	}
}

特别是,请注意MessageDelegate 接口(DefaultMessageDelegate 类)的前述实现根本没有任何 JMS 依赖项。它确实是一个 POJO,我们可以通过以下配置将其变成 MDP

  • Java

  • Kotlin

  • Xml

@Bean
MessageListenerAdapter messageListener(DefaultMessageDelegate messageDelegate) {
	return new MessageListenerAdapter(messageDelegate);
}

@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
		ExampleListener messageListener) {

	DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
	jmsContainer.setConnectionFactory(connectionFactory);
	jmsContainer.setDestination(destination);
	jmsContainer.setMessageListener(messageListener);
	return jmsContainer;
}
@Bean
fun messageListener(messageDelegate: DefaultMessageDelegate): MessageListenerAdapter {
	return MessageListenerAdapter(messageDelegate)
}

@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener) =
	DefaultMessageListenerContainer().apply {
		setConnectionFactory(connectionFactory)
		setDestination(destination)
		setMessageListener(messageListener)
	}
<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
	<constructor-arg>
		<bean class="jmsexample.DefaultMessageDelegate"/>
	</constructor-arg>
</bean>

<!-- and this is the message listener container... -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
	<property name="connectionFactory" ref="connectionFactory"/>
	<property name="destination" ref="destination"/>
	<property name="messageListener" ref="messageListener"/>
</bean>

下一个示例显示了另一个 MDP,它只能处理接收 JMS TextMessage 消息。请注意,消息处理方法实际上称为receiveMessageListenerAdapter 中的消息处理方法的名称默认为handleMessage),但它是可配置的(如您稍后在本节中所见)。还要注意receive(..) 方法是如何被强类型化为仅接收和响应 JMS TextMessage 消息的。以下列表显示了TextMessageDelegate 接口的定义

  • Java

  • Kotlin

public interface TextMessageDelegate {

	void receive(TextMessage message);
}
interface TextMessageDelegate {
	fun receive(message: TextMessage)
}

以下列表显示了一个实现TextMessageDelegate 接口的类

  • Java

  • Kotlin

public class DefaultTextMessageDelegate implements TextMessageDelegate {

	@Override
	public void receive(TextMessage message) {
		// ...
	}
}
class DefaultTextMessageDelegate : TextMessageDelegate {

	override fun receive(message: TextMessage) {
		// ...
	}
}

然后,相关的MessageListenerAdapter 的配置如下所示

  • Java

  • Kotlin

  • Xml

@Bean
MessageListenerAdapter messageListener(DefaultTextMessageDelegate messageDelegate) {
	MessageListenerAdapter messageListener = new MessageListenerAdapter(messageDelegate);
	messageListener.setDefaultListenerMethod("receive");
	// We don't want automatic message context extraction
	messageListener.setMessageConverter(null);
	return messageListener;
}
@Bean
fun messageListener(messageDelegate: DefaultTextMessageDelegate) = MessageListenerAdapter(messageDelegate).apply {
	setDefaultListenerMethod("receive")
	// We don't want automatic message context extraction
	setMessageConverter(null)
}
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
	<constructor-arg>
		<bean class="jmsexample.DefaultTextMessageDelegate"/>
	</constructor-arg>
	<property name="defaultListenerMethod" value="receive"/>
	<!-- we don't want automatic message context extraction -->
	<property name="messageConverter">
		<null/>
	</property>
</bean>

请注意,如果messageListener 接收到的 JMS Message 的类型不是TextMessage,则会抛出IllegalStateException(随后被吞并)。MessageListenerAdapter 类的另一个功能是,如果处理程序方法返回非空值,则能够自动发送回响应Message。考虑以下接口和类

  • Java

  • Kotlin

public interface ResponsiveTextMessageDelegate {

	// Notice the return type...
	String receive(TextMessage message);
}
interface ResponsiveTextMessageDelegate {

	// Notice the return type...
	fun receive(message: TextMessage): String
}
  • Java

  • Kotlin

public class DefaultResponsiveTextMessageDelegate implements ResponsiveTextMessageDelegate {

	@Override
	public String receive(TextMessage message) {
		return "message";
	}
}
class DefaultResponsiveTextMessageDelegate : ResponsiveTextMessageDelegate {

	override fun receive(message: TextMessage): String {
		return "message"
	}
}

如果您将DefaultResponsiveTextMessageDelegateMessageListenerAdapter 结合使用,则从'receive(..)' 方法执行返回的任何非空值都会(在默认配置中)转换为TextMessage。然后,生成的TextMessage 会发送到原始Message 的 JMS Reply-To 属性中定义的Destination(如果存在),或者发送到MessageListenerAdapter 上设置的默认Destination(如果已配置)。如果找不到任何Destination,则会抛出InvalidDestinationException(请注意,此异常不会被吞并,而是会向上传播到调用堆栈)。

在事务中处理消息

在事务中调用消息监听器只需要重新配置监听器容器。

您可以通过监听器容器定义上的sessionTransacted 标志激活本地资源事务。然后,每个消息监听器调用都在活动 JMS 事务中操作,如果监听器执行失败,则消息接收将回滚。发送响应消息(通过SessionAwareMessageListener)是同一本地事务的一部分,但任何其他资源操作(例如数据库访问)都是独立操作的。这通常需要在监听器实现中进行重复消息检测,以涵盖数据库处理已提交但消息处理失败提交的情况。

考虑以下 Bean 定义

  • Java

  • Kotlin

  • Xml

@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
		ExampleListener messageListener) {

	DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
	jmsContainer.setConnectionFactory(connectionFactory);
	jmsContainer.setDestination(destination);
	jmsContainer.setMessageListener(messageListener);
	jmsContainer.setSessionTransacted(true);
	return jmsContainer;
}
@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener) =
	DefaultMessageListenerContainer().apply {
		setConnectionFactory(connectionFactory)
		setDestination(destination)
		setMessageListener(messageListener)
		isSessionTransacted = true
	}
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
	<property name="connectionFactory" ref="connectionFactory"/>
	<property name="destination" ref="destination"/>
	<property name="messageListener" ref="messageListener"/>
	<property name="sessionTransacted" value="true"/>
</bean>

要参与外部管理的事务,您需要配置一个事务管理器并使用支持外部管理事务的监听器容器(通常是DefaultMessageListenerContainer)。

要配置消息监听器容器以参与 XA 事务,您需要配置一个JtaTransactionManager(默认情况下,它委托给 Jakarta EE 服务器的事务子系统)。请注意,底层的 JMS ConnectionFactory 需要支持 XA 并正确注册到您的 JTA 事务协调器。(检查您的 Jakarta EE 服务器的 JNDI 资源配置。)这使得消息接收以及(例如)数据库访问成为同一事务的一部分(具有统一的提交语义,但以 XA 事务日志开销为代价)。

以下 Bean 定义创建了一个事务管理器

  • Java

  • Kotlin

  • Xml

@Bean
JtaTransactionManager transactionManager()  {
	return new JtaTransactionManager();
}
@Bean
fun transactionManager() = JtaTransactionManager()
<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>

然后我们需要将其添加到我们之前的容器配置中。容器负责其余工作。以下示例展示了如何执行此操作

  • Java

  • Kotlin

  • Xml

@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
		ExampleListener messageListener) {

	DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
	jmsContainer.setConnectionFactory(connectionFactory);
	jmsContainer.setDestination(destination);
	jmsContainer.setMessageListener(messageListener);
	jmsContainer.setSessionTransacted(true);
	return jmsContainer;
}
@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener,
				 transactionManager: JtaTransactionManager) =
	DefaultMessageListenerContainer().apply {
		setConnectionFactory(connectionFactory)
		setDestination(destination)
		setMessageListener(messageListener)
		setTransactionManager(transactionManager)
	}
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
	<property name="connectionFactory" ref="connectionFactory"/>
	<property name="destination" ref="destination"/>
	<property name="messageListener" ref="messageListener"/>
	<property name="transactionManager" ref="transactionManager"/>
</bean>