注解驱动的监听器端点

以异步方式接收消息最简单的方法是使用注解驱动的监听器端点基础设施。简而言之,它允许您将托管 Bean 的方法公开为 JMS 监听器端点。以下示例展示了如何使用它

@Component
public class MyService {

	@JmsListener(destination = "myDestination")
	public void processOrder(String data) { ... }
}

前面示例的思想是,每当 jakarta.jms.Destination myDestination 上的消息可用时,processOrder 就会相应地调用 方法(在此示例中,使用 JMS 消息的内容,类似于 MessageListenerAdapter 提供的功能)。

注解驱动的端点基础设施会为每个注解方法在幕后创建一个消息监听器容器,通过使用 JmsListenerContainerFactory。此类容器未在 application context 注册,但可以通过使用 JmsListenerEndpointRegistry bean 轻松定位以进行管理。

@JmsListener 是 Java 8 中可重复使用的注解,因此您可以通过向同一方法添加额外的 @JmsListener 声明来将其与多个 JMS 目的地关联。

启用监听器端点注解

要启用对 @JmsListener 注解的支持,您可以将 @EnableJms 添加到您的 @Configuration 类之一中,如下例所示:

  • Java

  • Kotlin

  • Xml

@Configuration
@EnableJms
public class JmsConfiguration {

	@Bean
	public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory,
			DestinationResolver destinationResolver) {

		DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
		factory.setConnectionFactory(connectionFactory);
		factory.setDestinationResolver(destinationResolver);
		factory.setSessionTransacted(true);
		factory.setConcurrency("3-10");
		return factory;
	}
}
@Configuration
@EnableJms
class JmsConfiguration {

	@Bean
	fun jmsListenerContainerFactory(connectionFactory: ConnectionFactory, destinationResolver: DestinationResolver) =
		DefaultJmsListenerContainerFactory().apply {
			setConnectionFactory(connectionFactory)
			setDestinationResolver(destinationResolver)
			setSessionTransacted(true)
			setConcurrency("3-10")
		}
}
<jms:annotation-driven/>

<bean id="jmsListenerContainerFactory" class="org.springframework.jms.config.DefaultJmsListenerContainerFactory">
	<property name="connectionFactory" ref="connectionFactory"/>
	<property name="destinationResolver" ref="destinationResolver"/>
	<property name="sessionTransacted" value="true"/>
	<property name="concurrency" value="3-10"/>
</bean>

默认情况下,基础设施会查找名为 jmsListenerContainerFactory 的 bean 作为创建消息监听器容器的工厂来源。在这种情况下(并忽略 JMS 基础设施设置),您可以使用三个核心线程和十个最大线程的线程池来调用 processOrder 方法。

您可以为每个注解定制要使用的监听器容器工厂,或者通过实现 JmsListenerConfigurer 接口来配置显式默认设置。仅当至少有一个端点未注册特定容器工厂时才需要默认设置。有关详细信息和示例,请参阅实现 JmsListenerConfigurer 的类的 javadoc。

编程式端点注册

JmsListenerEndpoint 提供了 JMS 端点的模型,并负责为该模型配置容器。除了通过 JmsListener 注解检测到的端点外,该基础设施还允许您编程式地配置端点。以下示例展示了如何进行:

@Configuration
@EnableJms
public class AppConfig implements JmsListenerConfigurer {

	@Override
	public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
		SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
		endpoint.setId("myJmsEndpoint");
		endpoint.setDestination("anotherQueue");
		endpoint.setMessageListener(message -> {
			// processing
		});
		registrar.registerEndpoint(endpoint);
	}
}

在前面的示例中,我们使用了 SimpleJmsListenerEndpoint,它提供了要调用的实际 MessageListener。但是,您也可以构建自己的端点变体来描述自定义调用机制。

请注意,您可以完全跳过使用 @JmsListener,而仅通过 JmsListenerConfigurer 编程式地注册您的端点。

注解端点方法签名

到目前为止,我们一直在端点中注入一个简单的 String,但它实际上可以具有非常灵活的方法签名。在以下示例中,我们将其重写为注入带有自定义 header 的 Order

@Component
public class MyService {

	@JmsListener(destination = "myDestination")
	public void processOrder(Order order, @Header("order_type") String orderType) {
		...
	}
}

您可以在 JMS 监听器端点中注入的主要元素如下:

  • 原始 jakarta.jms.Message 或其任何子类(前提是它与传入的消息类型匹配)。

  • 用于可选访问原生 JMS API 的 jakarta.jms.Session (例如,用于发送自定义回复)。

  • 代表传入 JMS 消息的 org.springframework.messaging.Message。请注意,此消息包含自定义 header 和标准 header(由 JmsHeaders 定义)。

  • 使用 @Header 注解的方法参数,用于提取特定的 header 值,包括标准 JMS header。

  • 使用 @Headers 注解的参数,该参数还必须可赋值给 java.util.Map,以便访问所有 header。

  • 未使用注解的元素,如果不是受支持的类型(MessageSession),则被视为 payload。您可以通过使用 @Payload 注解参数来明确指定这一点。您还可以通过添加额外的 @Valid 来开启验证。

注入 Spring 的 Message 抽象的能力特别有用,可以利用存储在传输特定消息中的所有信息,而无需依赖传输特定的 API。以下示例展示了如何进行:

@JmsListener(destination = "myDestination")
public void processOrder(Message<Order> order) { ... }

方法参数的处理由 DefaultMessageHandlerMethodFactory 提供,您可以进一步自定义它以支持其他方法参数。您也可以在此处自定义转换和验证支持。

例如,如果我们想确保我们的 Order 在处理之前是有效的,我们可以使用 @Valid 注解 payload 并配置必要的验证器,如下例所示:

@Configuration
@EnableJms
public class AppConfig implements JmsListenerConfigurer {

	@Override
	public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
		registrar.setMessageHandlerMethodFactory(myJmsHandlerMethodFactory());
	}

	@Bean
	public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
		DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
		factory.setValidator(myValidator());
		return factory;
	}
}

响应管理

MessageListenerAdapter 中已有的支持允许您方法具有非 void 返回类型。在这种情况下,方法调用的结果会被封装在 jakarta.jms.Message 中,发送到原始消息的 JMSReplyTo header 中指定的目的地,或发送到监听器上配置的默认目的地。现在,您可以使用消息抽象的 @SendTo 注解来设置该默认目的地。

假设我们的 processOrder 方法现在应该返回一个 OrderStatus,我们可以编写它以自动发送响应,如下例所示:

@JmsListener(destination = "myDestination")
@SendTo("status")
public OrderStatus processOrder(Order order) {
	// order processing
	return status;
}
如果您有多个使用 @JmsListener 注解的方法,您也可以将 @SendTo 注解放在类级别上以共享一个默认回复目的地。

如果您需要以传输无关的方式设置额外的 header,您可以返回一个 Message 而不是,方法类似于以下示例:

@JmsListener(destination = "myDestination")
@SendTo("status")
public Message<OrderStatus> processOrder(Order order) {
	// order processing
	return MessageBuilder
			.withPayload(status)
			.setHeader("code", 1234)
			.build();
}

如果您需要在运行时计算响应目的地,您可以将响应封装在 JmsResponse 实例中,该实例也提供运行时使用的目的地。我们可以将前面的示例重写如下:

@JmsListener(destination = "myDestination")
public JmsResponse<Message<OrderStatus>> processOrder(Order order) {
	// order processing
	Message<OrderStatus> response = MessageBuilder
			.withPayload(status)
			.setHeader("code", 1234)
			.build();
	return JmsResponse.forQueue(response, "status");
}

最后,如果您需要为响应指定一些 QoS values,例如 priority 或 time to live,您可以相应地配置 JmsListenerContainerFactory,如下例所示:

@Configuration
@EnableJms
public class AppConfig {

	@Bean
	public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
		DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
		factory.setConnectionFactory(connectionFactory());
		QosSettings replyQosSettings = new QosSettings();
		replyQosSettings.setPriority(2);
		replyQosSettings.setTimeToLive(10000);
		factory.setReplyQosSettings(replyQosSettings);
		return factory;
	}
}