JMS 支持
Spring Integration 提供用于接收和发送 JMS 消息的通道适配器。
您需要在项目中包含此依赖项
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-jms</artifactId>
<version>6.4.4</version>
</dependency>
compile "org.springframework.integration:spring-integration-jms:6.4.4"
必须通过某些 JMS 供应商特定的实现(例如 Apache ActiveMQ)显式添加 jakarta.jms:jakarta.jms-api
。
实际上有两种基于 JMS 的入站通道适配器。第一种使用 Spring 的 JmsTemplate
基于轮询周期进行接收。第二种是“消息驱动”的,它依赖于 Spring 的 MessageListener
容器。出站通道适配器使用 JmsTemplate
按需转换并发送 JMS 消息。
通过使用 JmsTemplate
和 MessageListener
容器,Spring Integration 依赖于 Spring 的 JMS 支持。理解这一点很重要,因为这些适配器上的大多数属性都配置底层的 JmsTemplate
和 MessageListener
容器。有关 JmsTemplate
和 MessageListener
容器的更多详细信息,请参阅Spring JMS 文档。
JMS 通道适配器用于单向消息传递(仅发送或仅接收),而 Spring Integration 还提供入站和出站 JMS 网关,用于请求和回复操作。入站网关依赖于 Spring 的一种 MessageListener
容器实现来进行消息驱动的接收。它还能够将返回值发送到接收消息提供的 reply-to
目标。出站网关将 JMS 消息发送到 request-destination
(或 request-destination-name
或 request-destination-expression
),然后接收回复消息。您可以显式配置 reply-destination
引用(或 reply-destination-name
或 reply-destination-expression
)。否则,出站网关使用 JMS TemporaryQueue。
在 Spring Integration 2.2 之前,如果需要,会为每个请求或回复创建一个(并删除)TemporaryQueue
。从 Spring Integration 2.2 开始,您可以配置出站网关使用 MessageListener
容器接收回复,而不是直接使用新的(或缓存的)Consumer
为每个请求接收回复。这样配置后,并且没有提供显式的回复目标,每个网关将使用单个 TemporaryQueue
,而不是每个请求使用一个。
从版本 6.0 开始,如果 replyPubSubDomain
选项设置为 true
,出站网关会创建 TemporaryTopic
而不是 TemporaryQueue
。一些 JMS 供应商对这些目标的处理方式不同。
入站通道适配器
入站通道适配器需要引用单个 JmsTemplate
实例,或者同时引用 ConnectionFactory
和 Destination
(您可以使用 'destinationName' 代替 'destination' 引用)。以下示例定义了一个引用 Destination
的入站通道适配器
-
Java DSL
-
Kotlin DSL
-
Java
-
XML
@Bean
public IntegrationFlow jmsInbound(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(
Jms.inboundAdapter(connectionFactory)
.destination("inQueue"),
e -> e.poller(poller -> poller.fixedRate(30000)))
.handle(m -> System.out.println(m.getPayload()))
.get();
}
@Bean
fun jmsInbound(connectionFactory: ConnectionFactory) =
integrationFlow(
Jms.inboundAdapter(connectionFactory).destination("inQueue"),
{ poller { Pollers.fixedRate(30000) } })
{
handle { m -> println(m.payload) }
}
@Bean
@InboundChannelAdapter(value = "exampleChannel", poller = @Poller(fixedRate = "30000"))
public MessageSource<Object> jmsIn(ConnectionFactory connectionFactory) {
JmsDestinationPollingSource source = new JmsDestinationPollingSource(new JmsTemplate(connectionFactory));
source.setDestinationName("inQueue");
return source;
}
<int-jms:inbound-channel-adapter id="jmsIn" destination="inQueue" channel="exampleChannel">
<int:poller fixed-rate="30000"/>
</int-jms:inbound-channel-adapter>
从前面的配置中可以看出,inbound-channel-adapter 是一个轮询消费者。这意味着它在触发时调用 receive() 。您只应在轮询频率相对较低且及时性不重要的场景中使用此选项。对于所有其他场景(绝大多数基于 JMS 的用例),消息驱动的通道适配器 message-driven-channel-adapter (稍后描述)是更好的选择。 |
默认情况下,所有需要引用 ConnectionFactory 的 JMS 适配器都会自动查找名为 jmsConnectionFactory 的 bean。这就是为什么您在许多示例中没有看到 connection-factory 属性。但是,如果您的 JMS ConnectionFactory 有不同的 bean 名称,则需要提供该属性。 |
如果 extract-payload
设置为 true
(默认值),则接收到的 JMS Message 会通过 MessageConverter
。当依赖默认的 SimpleMessageConverter
时,这意味着生成的 Spring Integration Message 的载荷是 JMS 消息的主体。JMS TextMessage
会产生基于字符串的载荷,JMS BytesMessage
会产生字节数组载荷,而 JMS ObjectMessage
的可序列化实例会成为 Spring Integration 消息的载荷。如果您希望将原始 JMS 消息作为 Spring Integration 消息的载荷,请将 extractPayload
选项设置为 false
。
从版本 5.0.8 开始,对于 org.springframework.jms.connection.CachingConnectionFactory
和 cacheConsumers
,receive-timeout
的默认值为 -1
(不等待),否则为 1 秒。JMS 入站通道适配器根据提供的 ConnectionFactory
和选项创建一个 DynamicJmsTemplate
。如果需要外部 JmsTemplate
(例如在 Spring Boot 环境中),或者 ConnectionFactory
不是缓存的,或者没有 cacheConsumers
,建议在预期非阻塞消费时设置 jmsTemplate.receiveTimeout(-1)
Jms.inboundAdapter(connectionFactory)
.destination(queueName)
.configureJmsTemplate(template -> template.receiveTimeout(-1))
事务
从版本 4.0 开始,入站通道适配器支持 session-transacted
属性。在早期版本中,您必须注入一个设置了 sessionTransacted
为 true
的 JmsTemplate
。(适配器确实允许您将 acknowledge
属性设置为 transacted
,但这不正确且不起作用)。
但是请注意,将 session-transacted
设置为 true
价值不大,因为事务在 receive()
操作之后、消息发送到 channel
之前立即提交。
如果您希望整个流程是事务性的(例如,如果下游有一个出站通道适配器),则必须使用带有 JmsTransactionManager
的 transactional
轮询器。或者,考虑使用设置了 acknowledge
为 transacted
(默认值)的 jms-message-driven-channel-adapter
。
消息驱动通道适配器
message-driven-channel-adapter
需要引用 Spring 的 MessageListener
容器实例(AbstractMessageListenerContainer
的任何子类),或者同时引用 ConnectionFactory
和 Destination
(可以使用 'destinationName' 代替 'destination' 引用)。以下示例定义了一个引用 Destination
的消息驱动通道适配器
-
Java DSL
-
Kotlin DSL
-
Java
-
XML
@Bean
public IntegrationFlow jmsMessageDrivenRedeliveryFlow() {
return IntegrationFlow
.from(Jms.messageDrivenChannelAdapter(jmsConnectionFactory())
.destination("inQueue"))
.channel("exampleChannel")
.get();
}
@Bean
fun jmsMessageDrivenFlowWithContainer() =
integrationFlow(
Jms.messageDrivenChannelAdapter(jmsConnectionFactory())
.destination("inQueue")) {
channel("exampleChannel")
}
@Bean
public JmsMessageDrivenEndpoint jmsIn() {
JmsMessageDrivenEndpoint endpoint = new JmsMessageDrivenEndpoint(container(), listener());
return endpoint;
}
@Bean
public AbstractMessageListenerContainer container() {
DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
container.setConnectionFactory(cf());
container.setDestinationName("inQueue");
return container;
}
@Bean
public ChannelPublishingJmsMessageListener listener() {
ChannelPublishingJmsMessageListener listener = new ChannelPublishingJmsMessageListener();
listener.setRequestChannelName("exampleChannel");
return listener;
}
<int-jms:message-driven-channel-adapter id="jmsIn" destination="inQueue" channel="exampleChannel"/>
消息驱动适配器还接受与 如果您有自定义的监听器容器实现(通常是 |
您不能使用 Spring JMS 命名空间元素 建议为 |
从版本 4.2 开始,除非您提供了外部容器,否则默认的 acknowledge 模式是 transacted 。在这种情况下,您应该根据需要配置容器。我们建议将 transacted 与 DefaultMessageListenerContainer 一起使用,以避免消息丢失。 |
'extract-payload' 属性具有相同的效果,其默认值为 'true'。poller
元素不适用于消息驱动通道适配器,因为它会被主动调用。对于大多数场景,消息驱动方法更好,因为消息从底层 JMS 消费者接收后会立即传递给 MessageChannel
。
最后,<message-driven-channel-adapter>
元素还接受 'error-channel' 属性。这提供了与 进入 GatewayProxyFactoryBean
中描述的相同基本功能。以下示例展示了如何在消息驱动通道适配器上设置错误通道
<int-jms:message-driven-channel-adapter id="jmsIn" destination="inQueue"
channel="exampleChannel"
error-channel="exampleErrorChannel"/>
将前面的示例与我们稍后讨论的通用网关配置或 JMS 'inbound-gateway' 进行比较,主要区别在于我们处于单向流中,因为这是一个 'channel-adapter',而不是网关。因此,从 'error-channel' 向下游的流也应该是单向的。例如,它可以发送到日志处理器,或者可以连接到不同的 JMS <outbound-channel-adapter>
元素。
从主题消费时,将 pub-sub-domain
属性设置为 true。对于持久订阅,将 subscription-durable
设置为 true
;对于共享订阅(需要 JMS 2.0 代理,自版本 4.2 起可用),将 subscription-shared
设置为 true
。使用 subscription-name
为订阅命名。
从版本 5.1 开始,当应用仍在运行时停止端点时,底层的监听器容器会被关闭,其共享连接和消费者也会关闭。以前,连接和消费者保持开放。要恢复到以前的行为,请将 JmsMessageDrivenEndpoint
上的 shutdownContainerOnStop
设置为 false
。
从版本 6.3 开始,现在可以为 ChannelPublishingJmsMessageListener
提供 RetryTemplate
和 RecoveryCallback<Message<?>>
,用于下游发送和发送-接收操作的重试。这些选项也暴露到 Java DSL 的 JmsMessageDrivenChannelAdapterSpec
中。
入站转换错误
从版本 4.2 开始,'error-channel' 也用于处理转换错误。以前,如果 JMS <message-driven-channel-adapter/>
或 <inbound-gateway/>
由于转换错误而无法发送消息,异常会抛回给容器。如果容器配置为使用事务,则消息会被回滚并重复重传。转换过程在消息构造之前和期间发生,因此此类错误不会发送到 'error-channel'。现在,此类转换异常会导致将 ErrorMessage
发送到 'error-channel',并将异常作为 payload
。如果您希望事务回滚,并且定义了 'error-channel',则 'error-channel' 上的集成流必须重新抛出异常(或另一个异常)。如果错误流不抛出异常,则事务会提交并删除消息。如果未定义 'error-channel',则异常会像以前一样抛回给容器。
出站通道适配器
JmsSendingMessageHandler
实现了 MessageHandler
接口,能够将 Spring Integration Messages
转换为 JMS 消息,然后发送到 JMS 目标。它需要 jmsTemplate
引用,或者同时需要 jmsConnectionFactory
和 destination
引用(可以使用 destinationName
代替 destination
)。与入站通道适配器一样,配置此适配器最简单的方法是使用命名空间支持。以下配置会生成一个适配器,它从 exampleChannel
接收 Spring Integration 消息,将这些消息转换为 JMS 消息,然后将其发送到 bean 名称为 outQueue
的 JMS 目标引用
-
Java DSL
-
Kotlin DSL
-
Groovy DSL
-
Java
-
XML
@Bean
public IntegrationFlow jmsOutboundFlow() {
return IntegrationFlow.from("exampleChannel")
.handle(Jms.outboundAdapter(cachingConnectionFactory())
.destinationExpression("headers." + SimpMessageHeaderAccessor.DESTINATION_HEADER)
.configureJmsTemplate(t -> t.id("jmsOutboundFlowTemplate")));
}
@Bean
fun jmsOutboundFlow() =
integrationFlow("exampleChannel") {
handle(Jms.outboundAdapter(jmsConnectionFactory())
.apply {
destinationExpression("headers." + SimpMessageHeaderAccessor.DESTINATION_HEADER)
deliveryModeFunction<Any> { DeliveryMode.NON_PERSISTENT }
timeToLiveExpression("10000")
configureJmsTemplate { it.explicitQosEnabled(true) }
}
)
}
@Bean
jmsOutboundFlow() {
integrationFlow('exampleChannel') {
handle(Jms.outboundAdapter(new ActiveMQConnectionFactory())
.with {
destinationExpression 'headers.' + SimpMessageHeaderAccessor.DESTINATION_HEADER
deliveryModeFunction { DeliveryMode.NON_PERSISTENT }
timeToLiveExpression '10000'
configureJmsTemplate {
it.explicitQosEnabled true
}
}
)
}
}
@Bean
@ServiceActivator(inputChannel = "exampleChannel")
public MessageHandler jmsOut() {
JmsSendingMessageHandler handler = new JmsSendingMessageHandler(new JmsTemplate(connectionFactory));
handler.setDestinationName("outQueue");
return handler;
}
<int-jms:outbound-channel-adapter id="jmsOut" destination="outQueue" channel="exampleChannel"/>
与入站通道适配器一样,这里也有一个 'extract-payload' 属性。但是,对于出站适配器来说,其含义是相反的。这个布尔属性并非应用于 JMS 消息,而是应用于 Spring Integration 消息的载荷。换句话说,决定是将 Spring Integration 消息本身作为 JMS 消息体传递,还是将 Spring Integration 消息载荷作为 JMS 消息体传递。默认值为 'true'。因此,如果您传递一个载荷是 String
的 Spring Integration 消息,则会创建一个 JMS TextMessage
。另一方面,如果您想通过 JMS 将实际的 Spring Integration 消息发送到另一个系统,请将其设置为 'false'。
无论载荷提取的布尔值如何,只要您依赖默认转换器或提供了另一个 MessageConverter 实例的引用,Spring Integration 的 MessageHeaders 都会映射到 JMS 属性。(对于“入站”适配器也是如此,只不过在这种情况下,JMS 属性会映射到 Spring Integration 的 MessageHeaders )。 |
从版本 5.1 开始,可以通过配置 <int-jms:outbound-channel-adapter>
(JmsSendingMessageHandler
) 的 deliveryModeExpression
和 timeToLiveExpression
属性来在运行时针对请求 Spring Message
评估发送 JMS 消息的适当 QoS 值。DefaultJmsHeaderMapper
的新选项 setMapInboundDeliveryMode(true)
和 setMapInboundExpiration(true)
可以作为从消息头获取动态 deliveryMode
和 timeToLive
信息的来源
<int-jms:outbound-channel-adapter delivery-mode-expression="headers.jms_deliveryMode"
time-to-live-expression="headers.jms_expiration - T(System).currentTimeMillis()"/>
入站网关
Spring Integration 的消息驱动 JMS 入站网关委托给 MessageListener
容器,支持动态调整并发消费者,并且可以处理回复。入站网关需要引用 ConnectionFactory
和请求 Destination
(或 'requestDestinationName')。以下示例定义了一个 JMS inbound-gateway
,它从 bean id 为 inQueue
引用的 JMS 队列接收消息,并将其发送到名为 exampleChannel
的 Spring Integration 通道
<int-jms:inbound-gateway id="jmsInGateway"
request-destination="inQueue"
request-channel="exampleChannel"/>
由于网关提供请求-回复行为而非单向发送或接收行为,它们也有两个不同的“载荷提取”属性(如 之前讨论的 通道适配器的 'extract-payload' 设置)。对于入站网关,'extract-request-payload' 属性决定是否提取接收到的 JMS Message 主体。如果为 'false',则 JMS 消息本身成为 Spring Integration 消息载荷。默认值为 'true'。
类似地,对于入站网关,'extract-reply-payload' 属性应用于要转换为回复 JMS Message 的 Spring Integration 消息。如果您想传递整个 Spring Integration 消息(作为 JMS ObjectMessage 的主体),请将其值设置为 'false'。默认情况下,Spring Integration 消息载荷也会被转换为 JMS Message(例如,String
载荷会变成 JMS TextMessage)。
与其他任何情况一样,网关调用可能会导致错误。默认情况下,生产者不会收到消费者端可能发生的错误的通知,并且会在等待回复时超时。但是,有时您可能希望将错误情况传回给消费者(换句话说,您可能希望通过将异常映射到消息来将其视为有效的回复)。为此,JMS 入站网关提供了消息通道支持,可以将错误发送到该通道进行处理,这可能会产生符合某个合约的回复消息载荷,该合约定义了调用者作为“错误”回复所期望的内容。您可以使用 error-channel 属性来配置此类通道,如下例所示
<int-jms:inbound-gateway request-destination="requestQueue"
request-channel="jmsInputChannel"
error-channel="errorTransformationChannel"/>
<int:transformer input-channel="exceptionTransformationChannel"
ref="exceptionTransformer" method="createErrorResponse"/>
您可能会注意到,此示例与 进入 GatewayProxyFactoryBean
中包含的示例非常相似。这里的想法是相同的:exceptionTransformer
可以是一个创建错误响应对象的 POJO,您可以引用 nullChannel
来抑制错误,或者您可以省略 'error-channel' 以让异常传播。
参见入站转换错误。
从主题消费时,将 pub-sub-domain
属性设置为 true。对于持久订阅,将 subscription-durable
设置为 true
;对于共享订阅(需要 JMS 2.0 代理,自版本 4.2 起可用),将 subscription-shared
设置为 true
。使用 subscription-name
为订阅命名。
从版本 4.2 开始,除非提供外部容器,否则默认的 acknowledge 模式是 transacted 。在这种情况下,您应根据需要配置容器。我们建议您将 transacted 与 DefaultMessageListenerContainer 一起使用,以避免消息丢失。 |
从版本 5.1 开始,当应用仍在运行时停止端点时,底层监听器容器将关闭,从而关闭其共享连接和消费者。以前,连接和消费者保持打开状态。要恢复到之前的行为,请将 JmsInboundGateway
上的 shutdownContainerOnStop
设置为 false
。
默认情况下,JmsInboundGateway
会在接收到的消息中查找 jakarta.jms.Message.getJMSReplyTo()
属性来确定发送回复的位置。否则,可以使用静态的 defaultReplyDestination
、defaultReplyQueueName
或 defaultReplyTopicName
进行配置。此外,从版本 6.1 开始,可以在提供的 ChannelPublishingJmsMessageListener
上配置 replyToExpression
,以便在请求消息的标准 JMSReplyTo
属性为 null
时动态确定回复目的地。收到的 jakarta.jms.Message
用作根评估上下文对象。以下示例演示了如何使用 Java DSL API 配置入站 JMS 网关,其自定义回复目的地从请求消息中解析得出:
@Bean
public IntegrationFlow jmsInboundGatewayFlow(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(
Jms.inboundGateway(connectionFactory)
.requestDestination("requestDestination")
.replyToFunction(message -> message.getStringProperty("myReplyTo")))
.<String, String>transform(String::toUpperCase)
.get();
}
从版本 6.3 开始,Jms.inboundGateway()
API 暴露了 retryTemplate()
和 recoveryCallback()
选项,用于重试内部发送和接收操作。
出站网关
出站网关从 Spring Integration 消息创建 JMS 消息,并将它们发送到 request-destination
。然后,它通过使用选择器从您配置的 reply-destination
接收或(如果没有提供 reply-destination
)通过创建 JMS TemporaryQueue
(如果 replyPubSubDomain= true
则创建 TemporaryTopic
)实例来处理 JMS 回复消息。
将 如果指定了回复目的地,建议不要使用缓存消费者。或者,可以考虑使用 |
以下示例展示了如何配置出站网关:
<int-jms:outbound-gateway id="jmsOutGateway"
request-destination="outQueue"
request-channel="outboundJmsRequests"
reply-channel="jmsReplies"/>
“outbound-gateway”的有效载荷提取属性与“inbound-gateway”的属性相反(参见先前的讨论)。这意味着“extract-request-payload”属性值适用于转换为要发送的 JMS 消息的 Spring Integration 消息。而“extract-reply-payload”属性值适用于作为回复接收并随后转换为 Spring Integration 消息以发送到“reply-channel”的 JMS 消息,如前面的配置示例所示。
使用 <reply-listener/>
Spring Integration 2.2 引入了一种处理回复的替代技术。如果向网关添加 <reply-listener/>
子元素而不是为每个回复创建消费者,则使用 MessageListener
容器来接收回复并将它们交给请求线程。这提供了许多性能优势,同时也缓解了先前警告中描述的缓存消费者内存利用问题。
当使用没有 reply-destination
的出站网关时,使用 <reply-listener/>
,不是为每个请求创建一个 TemporaryQueue
,而是使用一个单独的 TemporaryQueue
。(如果与 broker 的连接丢失并恢复,网关会根据需要创建一个额外的 TemporaryQueue
)。如果 replyPubSubDomain
设置为 true
,则从版本 6.0 开始,会创建一个 TemporaryTopic
。
使用 correlation-key
时,多个网关可以共享同一个回复目的地,因为监听器容器使用对每个网关唯一的选择器。
如果指定了回复监听器并指定了回复目的地(或回复目的地名称)但没有提供关联键,则网关会记录警告并回退到 2.2 版本之前的行为。这是因为在这种情况下无法配置选择器。因此,无法避免回复发送到可能配置了相同回复目的地的不同网关。 请注意,在这种情况下,每个请求都会使用一个新的消费者,并且如上述警告所述,消费者会在内存中累积;因此,在这种情况下不应使用缓存消费者。 |
以下示例展示了一个带有默认属性的回复监听器:
<int-jms:outbound-gateway id="jmsOutGateway"
request-destination="outQueue"
request-channel="outboundJmsRequests"
reply-channel="jmsReplies">
<int-jms:reply-listener />
</int-jms-outbound-gateway>
监听器非常轻量级,我们预计在大多数情况下,只需要一个消费者。但是,您可以添加诸如 concurrent-consumers
、max-concurrent-consumers
等属性。有关支持的属性的完整列表,请参阅模式文件,以及Spring JMS 文档了解其含义。
空闲回复监听器
从版本 4.2 开始,您可以根据需要启动回复监听器(并在空闲一段时间后停止它),而不是在网关的整个生命周期内运行。如果您在应用程序上下文中有很多网关(例如许多不活动的、分区的 Spring Batch 作业使用 Spring Integration 和 JMS 进行分区分发)并且它们大多数时候处于空闲状态,这会很有用。如果所有回复监听器都处于活动状态,则 JMS broker 会为每个网关配备一个活动消费者。通过启用空闲超时,每个消费者仅在相应的批处理作业运行时存在(并在作业完成后短暂存在一段时间)。
参见属性参考中的 idle-reply-listener-timeout
。
网关回复关联
本节描述用于回复关联的机制(确保源网关仅接收对其请求的回复),具体取决于网关的配置方式。有关此处讨论的属性的完整描述,请参见属性参考。
以下列表描述了各种场景(数字仅用于标识 — 顺序无关紧要):
-
没有
reply-destination*
属性且没有<reply-listener>
为每个请求创建一个
TemporaryQueue
,并在请求完成(成功或失败)后删除。correlation-key
不相关。 -
提供了
reply-destination*
属性,且未提供<reply-listener/>
或correlation-key
等于出站消息的
JMSCorrelationID
用作消费者的消息选择器messageSelector = "JMSCorrelationID = '" + messageId + "'"
期望响应系统在回复的
JMSCorrelationID
中返回入站JMSMessageID
。这是一种常见模式,由 Spring Integration 入站网关以及 Spring 的MessageListenerAdapter
(用于消息驱动 POJO)实现。使用此配置时,不应将 topic 用于回复。回复可能会丢失。 -
提供了
reply-destination*
属性,未提供<reply-listener/>
,且correlation-key="JMSCorrelationID"
网关生成一个唯一的关联 ID 并将其插入到
JMSCorrelationID
头部。消息选择器为:messageSelector = "JMSCorrelationID = '" + uniqueId + "'"
期望响应系统在回复的
JMSCorrelationID
中返回入站JMSCorrelationID
。这是一种常见模式,由 Spring Integration 入站网关以及 Spring 的MessageListenerAdapter
(用于消息驱动 POJO)实现。 -
提供了
reply-destination*
属性,未提供<reply-listener/>
,且correlation-key="myCorrelationHeader"
网关生成一个唯一的关联 ID 并将其插入到
myCorrelationHeader
消息属性中。correlation-key
可以是任何用户定义的值。消息选择器为:messageSelector = "myCorrelationHeader = '" + uniqueId + "'"
期望响应系统在回复的
myCorrelationHeader
中返回入站myCorrelationHeader
。 -
提供了
reply-destination*
属性,未提供<reply-listener/>
,且correlation-key="JMSCorrelationID*"
(注意关联键中的*
。)网关使用请求消息中
jms_correlationId
头部中的值(如果存在),并将其插入到JMSCorrelationID
头部中。消息选择器为:messageSelector = "JMSCorrelationID = '" + headers['jms_correlationId'] + "'"
用户必须确保此值是唯一的。
如果头部不存在,网关行为与场景
3
相同。期望响应系统在回复的
JMSCorrelationID
中返回入站JMSCorrelationID
。这是一种常见模式,由 Spring Integration 入站网关以及 Spring 的MessageListenerAdapter
(用于消息驱动 POJO)实现。 -
未提供
reply-destination*
属性,且提供了<reply-listener>
创建一个临时队列,用于来自此网关实例的所有回复。消息中不需要关联数据,但出站
JMSMessageID
在网关内部用于将回复定向到正确的请求线程。 -
提供了
reply-destination*
属性,提供了<reply-listener>
,且未提供correlation-key
不允许。
<reply-listener/>
配置被忽略,网关行为与场景2
相同。会写入警告日志消息以指示此情况。 -
提供了
reply-destination*
属性,提供了<reply-listener>
,且correlation-key="JMSCorrelationID"
网关具有唯一的关联 ID,并将其与递增值一起插入到
JMSCorrelationID
头部(gatewayId + "_" + ++seq
)。消息选择器为:messageSelector = "JMSCorrelationID LIKE '" + gatewayId%'"
期望响应系统在回复的
JMSCorrelationID
中返回入站JMSCorrelationID
。这是一种常见模式,由 Spring Integration 入站网关以及 Spring 的MessageListenerAdapter
(用于消息驱动 POJO)实现。由于每个网关都有唯一的 ID,因此每个实例只接收自己的回复。完整的关联数据用于将回复路由到正确的请求线程。 -
提供了
reply-destination*
属性,提供了<reply-listener/>
,且correlation-key="myCorrelationHeader"
网关具有唯一的关联 ID,并将其与递增值一起插入到
myCorrelationHeader
属性中(gatewayId + "_" + ++seq
)。correlation-key
可以是任何用户定义的值。消息选择器为:messageSelector = "myCorrelationHeader LIKE '" + gatewayId%'"
期望响应系统在回复的
myCorrelationHeader
中返回入站myCorrelationHeader
。由于每个网关都有唯一的 ID,因此每个实例只接收自己的回复。完整的关联数据用于将回复路由到正确的请求线程。 -
提供了
reply-destination*
属性,提供了<reply-listener/>
,且correlation-key="JMSCorrelationID*"
(注意关联键中的
*
)不允许。
不允许用户提供的关联 ID 与回复监听器一起使用。网关不会使用此配置进行初始化。
异步网关
从版本 4.3 开始,现在配置出站网关时可以指定 async="true"
(或在 Java 中使用 setAsync(true)
)。
默认情况下,当请求发送到网关时,请求线程会被挂起,直到收到回复。然后流程在该线程上继续。如果 async
为 true
,则在 send()
完成后立即释放请求线程,回复在监听器容器线程上返回(并且流程继续)。当网关在轮询线程上调用时,这会很有用。线程被释放并可用于框架内的其他任务。
async
需要 <reply-listener/>
(或在使用 Java 配置时使用 setUseReplyContainer(true)
)。它还需要指定 correlationKey
(通常是 JMSCorrelationID
)。如果这两个条件中的任何一个未满足,async
将被忽略。
属性参考
以下列表显示了 outbound-gateway
的所有可用属性:
<int-jms:outbound-gateway
connection-factory="connectionFactory" (1)
correlation-key="" (2)
delivery-persistent="" (3)
destination-resolver="" (4)
explicit-qos-enabled="" (5)
extract-reply-payload="true" (6)
extract-request-payload="true" (7)
header-mapper="" (8)
message-converter="" (9)
priority="" (10)
receive-timeout="" (11)
reply-channel="" (12)
reply-destination="" (13)
reply-destination-expression="" (14)
reply-destination-name="" (15)
reply-pub-sub-domain="" (16)
reply-timeout="" (17)
request-channel="" (18)
request-destination="" (19)
request-destination-expression="" (20)
request-destination-name="" (21)
request-pub-sub-domain="" (22)
time-to-live="" (23)
requires-reply="" (24)
idle-reply-listener-timeout="" (25)
async=""> (26)
<int-jms:reply-listener /> (27)
</int-jms:outbound-gateway>
1 | 对 jakarta.jms.ConnectionFactory 的引用。默认为 jmsConnectionFactory 。 |
||
2 | 包含关联数据以关联响应与回复的属性名称。如果省略,网关期望响应系统在 JMSCorrelationID 头部中返回出站 JMSMessageID 头部的值。如果指定,网关会生成一个关联 ID 并使用它填充指定的属性。响应系统必须在同一属性中回显该值。可以将其设置为 JMSCorrelationID ,在这种情况下,使用标准头部而不是 String 属性来保存关联数据。当使用 <reply-container/> 时,如果提供了明确的 reply-destination ,则必须指定 correlation-key 。从版本 4.0.1 开始,此属性还支持值 JMSCorrelationID* ,这意味着如果出站消息已具有 JMSCorrelationID (从 jms_correlationId 映射而来)头部,则使用该值而不是生成一个新的关联 ID。请注意,当使用 <reply-container/> 时,不允许使用 JMSCorrelationID* 键,因为容器需要在初始化期间设置消息选择器。
|
||
3 | 一个布尔值,指示投递模式应为 DeliveryMode.PERSISTENT (true )还是 DeliveryMode.NON_PERSISTENT (false )。此设置仅在 explicit-qos-enabled 为 true 时生效。 |
||
4 | 一个 DestinationResolver 。默认为 DynamicDestinationResolver ,它将目的地名称映射到同名队列或主题。 |
||
5 | 当设置为 true 时,它启用质量服务(QoS)属性的使用:priority 、delivery-mode 和 time-to-live 。 |
||
6 | 当设置为 true (默认值)时,Spring Integration 回复消息的有效载荷从 JMS 回复消息的主体创建(使用 MessageConverter )。当设置为 false 时,整个 JMS 消息成为 Spring Integration 消息的有效载荷。 |
||
7 | 当设置为 true (默认值)时,Spring Integration 消息的有效载荷被转换为 JMSMessage (使用 MessageConverter )。当设置为 false 时,整个 Spring Integration Message 被转换为 JMSMessage 。在这两种情况下,Spring Integration 消息头部都通过 HeaderMapper 映射到 JMS 头部和属性。 |
||
8 | 一个 HeaderMapper ,用于将 Spring Integration 消息头部映射到 JMS 消息头部和属性之间以及从 JMS 消息头部和属性映射回 Spring Integration 消息头部。 |
||
9 | 对 MessageConverter 的引用,用于在 JMS 消息和 Spring Integration 消息有效载荷(或消息,如果 extract-request-payload 为 false )之间进行转换。默认为 SimpleMessageConverter 。 |
||
10 | 请求消息的默认优先级。如果存在消息优先级头部,则会被覆盖。其范围是 0 到 9 。此设置仅在 explicit-qos-enabled 为 true 时生效。 |
||
11 | 等待回复的时间(以毫秒为单位)。默认为 5000 (五秒)。 |
||
12 | 回复消息发送到的通道。 | ||
13 | 对 Destination 的引用,将设置为 JMSReplyTo 头部。最多只能允许一个 reply-destination 、reply-destination-expression 或 reply-destination-name 。如果未提供任何一个,则为对此网关的回复使用 TemporaryQueue 。 |
||
14 | 一个 SpEL 表达式,评估结果为 Destination ,将设置为 JMSReplyTo 头部。表达式结果可以是 Destination 对象或 String 。它由 DestinationResolver 用于解析实际的 Destination 。最多只能允许一个 reply-destination 、reply-destination-expression 或 reply-destination-name 。如果未提供任何一个,则为对此网关的回复使用 TemporaryQueue 。 |
||
15 | 将设置为 JMSReplyTo 头部的目的地名称。它由 DestinationResolver 用于解析实际的 Destination 。最多只能允许一个 reply-destination 、reply-destination-expression 或 reply-destination-name 。如果未提供任何一个,则为对此网关的回复使用 TemporaryQueue 。 |
||
16 | 当设置为 true 时,表示由 DestinationResolver 解析的任何回复 Destination 应为 Topic 而不是 Queue 。 |
||
17 | 网关在向 reply-channel 发送回复消息时等待的时间。这仅在 reply-channel 可能阻塞时有效 — 例如具有容量限制且当前已满的 QueueChannel 。默认为无限。 |
||
18 | 此网关接收请求消息的通道。 | ||
19 | 对发送请求消息的 Destination 的引用。request-destination 、request-destination-expression 或 request-destination-name 之一是必需的。您只能使用这三个属性中的一个。 |
||
20 | 一个 SpEL 表达式,评估结果为发送请求消息的 Destination 。表达式结果可以是 Destination 对象或 String 。它由 DestinationResolver 用于解析实际的 Destination 。request-destination 、request-destination-expression 或 request-destination-name 之一是必需的。您只能使用这三个属性中的一个。 |
||
21 | 发送请求消息的目的地名称。它由 DestinationResolver 用于解析实际的 Destination 。request-destination 、request-destination-expression 或 request-destination-name 之一是必需的。您只能使用这三个属性中的一个。 |
||
22 | 当设置为 true 时,表示由 DestinationResolver 解析的任何请求 Destination 应为 Topic 而不是 Queue 。 |
||
23 | 指定消息的存活时间。此设置仅在 explicit-qos-enabled 为 true 时生效。 |
||
24 | 指定此出站网关是否必须返回非空值。默认情况下,此值为 true ,当底层服务在 receive-timeout 后未返回值时,将抛出 MessageTimeoutException 。请注意,如果服务永远不期望返回回复,最好使用 <int-jms:outbound-channel-adapter/> 而不是带有 requires-reply="false" 的 <int-jms:outbound-gateway/> 。使用后者时,发送线程会被阻塞,等待回复直至 receive-timeout 时间结束。 |
||
25 | 当使用 <reply-listener /> 时,其生命周期(启动和停止)默认与网关的生命周期匹配。当此值大于 0 时,容器会按需启动(当发送请求时)。容器会继续运行直到至少经过此时间且没有收到请求(并且直到没有未完成的回复)。容器会在下一个请求时再次启动。停止时间是最小值,实际可能高达此值的 1.5 倍。 |
||
26 | 参见异步网关。 | ||
27 | 当包含此元素时,回复由异步 MessageListenerContainer 接收,而不是为每个回复创建一个消费者。在许多情况下,这会更有效。 |
将消息头部映射到 JMS 消息以及从 JMS 消息映射
JMS 消息可以包含元信息,如 JMS API 头部和简单属性。您可以使用 JmsHeaderMapper
将这些信息映射到 Spring Integration 消息头部以及从 Spring Integration 消息头部映射回来。JMS API 头部会传递给相应的 setter 方法(例如 setJMSReplyTo
),而其他头部则复制到 JMS Message 的一般属性中。JMS 出站网关通过 JmsHeaderMapper
的默认实现进行引导,该实现将映射标准 JMS API 头部以及基本类型或 String
消息头部。您还可以通过入站和出站网关的 header-mapper
属性提供自定义头部映射器。
许多 JMS 供应商特定的客户端不允许直接在已创建的 JMS 消息上设置 deliveryMode 、priority 和 timeToLive 属性。它们被认为是 QoS 属性,因此必须传播到目标 MessageProducer.send(message, deliveryMode, priority, timeToLive) API。因此,DefaultJmsHeaderMapper 不会将适当的 Spring Integration 头部(或表达式结果)映射到上述 JMS 消息属性中。相反,JmsSendingMessageHandler 使用 DynamicJmsTemplate 将请求消息中的头部值传播到 MessageProducer.send() API。要启用此功能,必须使用 DynamicJmsTemplate 配置出站端点,并将其 explicitQosEnabled 属性设置为 true。Spring Integration Java DSL 默认配置 DynamicJmsTemplate ,但您仍然必须设置 explicitQosEnabled 属性。 |
从版本 4.0 开始,JMSPriority 头部被映射到入站消息的标准 priority 头部。以前,priority 头部仅用于出站消息。要恢复到之前的行为(即不映射入站优先级),请将 DefaultJmsHeaderMapper 的 mapInboundPriority 属性设置为 false 。 |
从版本 4.3 开始,DefaultJmsHeaderMapper 将标准 correlationId 头部映射为消息属性,通过调用其 toString() 方法(correlationId 通常是 UUID ,JMS 不支持)。在入站端,它被映射为 String 。这独立于 jms_correlationId 头部,后者被映射到 JMSCorrelationID 头部以及从 JMSCorrelationID 头部映射回来。JMSCorrelationID 通常用于关联请求和回复,而 correlationId 通常用于将相关消息组合成一组(例如与聚合器或重排序器一起使用)。 |
从版本 5.1 开始,DefaultJmsHeaderMapper
可以配置为映射入站 JMSDeliveryMode
和 JMSExpiration
属性:
@Bean
public DefaultJmsHeaderMapper jmsHeaderMapper() {
DefaultJmsHeaderMapper mapper = new DefaultJmsHeaderMapper();
mapper.setMapInboundDeliveryMode(true)
mapper.setMapInboundExpiration(true)
return mapper;
}
这些 JMS 属性分别映射到 JmsHeaders.DELIVERY_MODE
和 JmsHeaders.EXPIRATION
Spring Message 头部。
消息转换、编组和解组
如果需要转换消息,所有 JMS 适配器和网关都允许您通过设置 message-converter
属性来提供 MessageConverter
。为此,请提供在同一 ApplicationContext 中可用的 MessageConverter
实例的 bean 名称。此外,为了与编组器和解组器接口保持一致,Spring 提供了 MarshallingMessageConverter
,您可以配置自己的自定义编组器和解组器。以下示例展示了如何操作:
<int-jms:inbound-gateway request-destination="requestQueue"
request-channel="inbound-gateway-channel"
message-converter="marshallingMessageConverter"/>
<bean id="marshallingMessageConverter"
class="org.springframework.jms.support.converter.MarshallingMessageConverter">
<constructor-arg>
<bean class="org.bar.SampleMarshaller"/>
</constructor-arg>
<constructor-arg>
<bean class="org.bar.SampleUnmarshaller"/>
</constructor-arg>
</bean>
当您提供自己的 MessageConverter 实例时,它仍会包装在 HeaderMappingMessageConverter 中。这意味着 'extract-request-payload' 和 'extract-reply-payload' 属性可以影响传递给您的转换器的实际对象。HeaderMappingMessageConverter 本身委托给目标 MessageConverter ,同时还将 Spring Integration MessageHeaders 映射到 JMS 消息属性并再次映射回来。 |
JMS 支持的消息通道
前面介绍的通道适配器和网关都旨在用于与外部系统集成的应用程序。入站选项假定其他系统正在向 JMS 目的地发送 JMS 消息,而出站选项假定其他系统正在从目的地接收消息。这个其他系统可能是也可能不是 Spring Integration 应用程序。当然,当将 Spring Integration 消息实例作为 JMS 消息本身的主体发送时(将 'extract-payload' 值设置为 false
),假定其他系统基于 Spring Integration。然而,这绝不是必需的要求。这种灵活性是使用基于消息的集成选项(抽象为“通道”(或在 JMS 中为目的地))的优势之一。
有时,给定 JMS Destination 的生产者和消费者都打算成为同一应用程序的一部分,并在同一进程中运行。这可以通过使用一对入站和出站通道适配器来实现。这种方法的问题是需要两个适配器,即使从概念上讲,目标是拥有一个单一的消息通道。自 Spring Integration 2.0 版本以来,支持更好的选项。现在可以在使用 JMS 命名空间时定义一个单一的“通道”,如下例所示:
<int-jms:channel id="jmsChannel" queue="exampleQueue"/>
前一个示例中的通道的行为与主 Spring Integration 命名空间中的普通 <channel/>
元素非常相似。它可以被任何端点的 input-channel
和 output-channel
属性引用。不同之处在于,此通道由名为 exampleQueue
的 JMS Queue 实例支持。这意味着生产者和消费者端点之间可以进行异步消息传递。然而,与通过在非 JMS <channel/>
元素中添加 <queue/>
元素创建的更简单的异步消息通道不同,消息不存储在内存队列中。相反,这些消息在 JMS 消息主体中传递,底层 JMS 提供程序的全部功能即可用于该通道。使用这种替代方案最常见的原因可能是利用 JMS 消息的存储转发方法提供的持久性。
如果配置得当,JMS 支持的消息通道也支持事务。换句话说,如果其发送操作是回滚的事务的一部分,生产者实际上不会写入事务性的 JMS 支持通道。同样,如果接收消息是回滚的事务的一部分,消费者不会物理地从通道中删除 JMS 消息。请注意,在这种场景中,生产者和消费者的事务是独立的。这与跨没有 <queue/>
子元素的简单同步 <channel/>
元素传播事务上下文有显著差异。
由于前一个示例引用了 JMS Queue 实例,因此它充当点对点通道。另一方面,如果您需要发布/订阅行为,可以使用单独的元素并引用 JMS Topic。以下示例展示了如何操作:
<int-jms:publish-subscribe-channel id="jmsChannel" topic="exampleTopic"/>
对于任何类型的 JMS 支持通道,可以提供目标名称而不是引用,如下例所示:
<int-jms:channel id="jmsQueueChannel" queue-name="exampleQueueName"/>
<jms:publish-subscribe-channel id="jmsTopicChannel" topic-name="exampleTopicName"/>
在前述示例中,目标名称由 Spring 的默认 DynamicDestinationResolver
实现解析,但您可以提供任何 DestinationResolver
接口的实现。此外,JMS ConnectionFactory
是通道的必需属性,但默认情况下,预期的 bean 名称将是 jmsConnectionFactory
。以下示例既提供了一个用于解析 JMS 目标名称的自定义实例,又为 ConnectionFactory
提供了不同的名称:
<int-jms:channel id="jmsChannel" queue-name="exampleQueueName"
destination-resolver="customDestinationResolver"
connection-factory="customConnectionFactory"/>
对于 <publish-subscribe-channel />
,将 durable
属性设置为 true
以实现持久订阅,或将 subscription-shared
设置为共享订阅(需要 JMS 2.0 broker,自版本 4.2 起可用)。使用 subscription
为订阅命名。
使用 JMS 消息选择器
借助 JMS 消息选择器,您可以根据 JMS 头部和 JMS 属性过滤 JMS 消息。例如,如果您想监听自定义 JMS 头部属性 myHeaderProperty
等于 something
的消息,您可以指定以下表达式:
myHeaderProperty = 'something'
消息选择器表达式是 SQL-92 条件表达式语法的一个子集,并被定义为 Java Message Service 规范的一部分。您可以通过以下 Spring Integration JMS 组件的 XML 命名空间配置来指定 JMS 消息 selector
属性:
-
JMS Channel
-
JMS Publish Subscribe Channel
-
JMS Inbound Channel Adapter
-
JMS Inbound Gateway
-
JMS Message-driven Channel Adapter
您不能使用 JMS 消息选择器引用消息主体值。 |
JMS 示例
要尝试这些 JMS 适配器,请查看 Spring Integration Samples Git 仓库中提供的 JMS 示例:https://github.com/spring-projects/spring-integration-samples/tree/master/basic/jms。
该仓库包含两个示例。一个提供了入站和出站通道适配器,另一个提供了入站和出站网关。它们配置为使用嵌入式 ActiveMQ 进程运行,但您可以修改每个示例的 common.xml Spring 应用程序上下文文件以支持不同的 JMS 提供程序或独立的 ActiveMQ 进程。
换句话说,您可以拆分配置,以便入站和出站适配器在独立的 JVM 中运行。如果您安装了 ActiveMQ,请修改 common.xml
文件中的 brokerURL
属性,以使用 tcp://localhost:61616
(而不是 vm://localhost
)。这两个示例都接受来自 stdin 的输入并回显到 stdout。查看配置以了解这些消息如何通过 JMS 进行路由。