JMS 支持
Spring Integration 提供了用于接收和发送 JMS 消息的通道适配器。
您需要将此依赖项包含到您的项目中
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-jms</artifactId>
<version>6.3.5</version>
</dependency>
compile "org.springframework.integration:spring-integration-jms:6.3.5"
必须通过某些 JMS 供应商特定的实现(例如 Apache ActiveMQ)显式添加jakarta.jms:jakarta.jms-api
。
实际上有两个基于 JMS 的入站通道适配器。第一个使用 Spring 的JmsTemplate
基于轮询周期接收消息。第二个是“消息驱动的”,并依赖于 Spring 的MessageListener
容器。出站通道适配器使用JmsTemplate
按需转换和发送 JMS 消息。
通过使用JmsTemplate
和MessageListener
容器,Spring Integration 依赖于 Spring 的 JMS 支持。这很重要,因为这些适配器上公开的大多数属性都配置了底层的JmsTemplate
和MessageListener
容器。有关JmsTemplate
和MessageListener
容器的更多详细信息,请参见Spring JMS 文档。
虽然 JMS 通道适配器用于单向消息传递(仅发送或仅接收),但 Spring Integration 还为请求和回复操作提供了入站和出站 JMS 网关。入站网关依赖于 Spring 的MessageListener
容器实现之一来进行消息驱动的接收。它还能够将返回值发送到接收到的消息提供的reply-to
目标。出站网关将 JMS 消息发送到request-destination
(或request-destination-name
或request-destination-expression
),然后接收回复消息。您可以显式配置reply-destination
引用(或reply-destination-name
或reply-destination-expression
)。否则,出站网关将使用 JMS TemporaryQueue。
在 Spring Integration 2.2 之前,如有必要,将为每个请求或回复创建一个(并删除)TemporaryQueue
。从 Spring Integration 2.2 开始,您可以配置出站网关以使用MessageListener
容器接收回复,而不是直接使用新的(或缓存的)Consumer
来接收每个请求的回复。在这种情况下,如果未提供显式回复目标,则每个网关将使用单个TemporaryQueue
,而不是为每个请求使用一个。
从 6.0 版本开始,如果replyPubSubDomain
选项设置为true
,则出站网关将创建TemporaryTopic
而不是TemporaryQueue
。一些 JMS 供应商对这些目标的处理方式不同。
入站通道适配器
入站通道适配器需要引用单个JmsTemplate
实例或ConnectionFactory
和Destination
(您可以提供'destinationName'来代替'destination'引用)。以下示例定义了一个带有Destination
引用的入站通道适配器
-
Java DSL
-
Kotlin DSL
-
Java
-
XML
@Bean
public IntegrationFlow jmsInbound(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(
Jms.inboundAdapter(connectionFactory)
.destination("inQueue"),
e -> e.poller(poller -> poller.fixedRate(30000)))
.handle(m -> System.out.println(m.getPayload()))
.get();
}
@Bean
fun jmsInbound(connectionFactory: ConnectionFactory) =
integrationFlow(
Jms.inboundAdapter(connectionFactory).destination("inQueue"),
{ poller { Pollers.fixedRate(30000) } })
{
handle { m -> println(m.payload) }
}
@Bean
@InboundChannelAdapter(value = "exampleChannel", poller = @Poller(fixedRate = "30000"))
public MessageSource<Object> jmsIn(ConnectionFactory connectionFactory) {
JmsDestinationPollingSource source = new JmsDestinationPollingSource(new JmsTemplate(connectionFactory));
source.setDestinationName("inQueue");
return source;
}
<int-jms:inbound-channel-adapter id="jmsIn" destination="inQueue" channel="exampleChannel">
<int:poller fixed-rate="30000"/>
</int-jms:inbound-channel-adapter>
从前面的配置可以看出,inbound-channel-adapter 是一个轮询使用者。这意味着它在被触发时调用receive() 。您应该只在轮询相对不频繁且时间性不重要的情况下使用它。对于所有其他情况(绝大多数基于 JMS 的用例),message-driven-channel-adapter (稍后描述)是更好的选择。 |
默认情况下,所有需要ConnectionFactory 引用的 JMS 适配器都会自动查找名为jmsConnectionFactory 的 bean。这就是为什么在许多示例中您没有看到connection-factory 属性的原因。但是,如果您的 JMS ConnectionFactory 具有不同的 bean 名称,则需要提供该属性。 |
如果extract-payload
设置为true
(默认值),则接收到的 JMS 消息将通过MessageConverter
传递。当依赖于默认的SimpleMessageConverter
时,这意味着生成的 Spring Integration 消息具有 JMS 消息的主体作为其有效负载。JMS TextMessage
产生基于字符串的有效负载,JMS BytesMessage
产生字节数组有效负载,JMS ObjectMessage
的可序列化实例成为 Spring Integration 消息的有效负载。如果您希望将原始 JMS 消息作为 Spring Integration 消息的有效负载,请将extractPayload
选项设置为false
。
从 5.0.8 版本开始,对于org.springframework.jms.connection.CachingConnectionFactory
和cacheConsumers
,receive-timeout
的默认值为-1
(无等待),否则为 1 秒。JMS 入站通道适配器基于提供的ConnectionFactory
和选项创建一个DynamicJmsTemplate
。如果需要外部JmsTemplate
(例如在 Spring Boot 环境中),或者ConnectionFactory
不缓存,或者没有cacheConsumers
,则建议设置jmsTemplate.receiveTimeout(-1)
,如果预期非阻塞消费。
Jms.inboundAdapter(connectionFactory)
.destination(queueName)
.configureJmsTemplate(template -> template.receiveTimeout(-1))
事务
从 4.0 版本开始,入站通道适配器支持session-transacted
属性。在早期版本中,您必须注入一个sessionTransacted
设置为true
的JmsTemplate
。(适配器确实允许您将acknowledge
属性设置为transacted
,但这不正确且不起作用)。
但是,请注意,将session-transacted
设置为true
几乎没有价值,因为事务在receive()
操作之后以及消息发送到channel
之前立即提交。
如果您希望整个流程具有事务性(例如,如果存在下游出站通道适配器),则必须使用带有JmsTransactionManager
的transactional
轮询器。或者,考虑使用jms-message-driven-channel-adapter
并将acknowledge
设置为transacted
(默认值)。
消息驱动的通道适配器
message-driven-channel-adapter
需要引用 Spring MessageListener
容器的实例(AbstractMessageListenerContainer
的任何子类)或ConnectionFactory
和Destination
(可以使用'destinationName'代替'destination'引用)。以下示例定义了一个带有Destination
引用的消息驱动的通道适配器
-
Java DSL
-
Kotlin DSL
-
Java
-
XML
@Bean
public IntegrationFlow jmsMessageDrivenRedeliveryFlow() {
return IntegrationFlow
.from(Jms.messageDrivenChannelAdapter(jmsConnectionFactory())
.destination("inQueue"))
.channel("exampleChannel")
.get();
}
@Bean
fun jmsMessageDrivenFlowWithContainer() =
integrationFlow(
Jms.messageDrivenChannelAdapter(jmsConnectionFactory())
.destination("inQueue")) {
channel("exampleChannel")
}
@Bean
public JmsMessageDrivenEndpoint jmsIn() {
JmsMessageDrivenEndpoint endpoint = new JmsMessageDrivenEndpoint(container(), listener());
return endpoint;
}
@Bean
public AbstractMessageListenerContainer container() {
DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
container.setConnectionFactory(cf());
container.setDestinationName("inQueue");
return container;
}
@Bean
public ChannelPublishingJmsMessageListener listener() {
ChannelPublishingJmsMessageListener listener = new ChannelPublishingJmsMessageListener();
listener.setRequestChannelName("exampleChannel");
return listener;
}
<int-jms:message-driven-channel-adapter id="jmsIn" destination="inQueue" channel="exampleChannel"/>
消息驱动的适配器还接受几个与 如果您有自定义侦听器容器实现(通常是 |
您不能使用 Spring JMS 命名空间元素 建议为 |
从 4.2 版本开始,默认的acknowledge 模式为transacted ,除非您提供外部容器。在这种情况下,您应该根据需要配置容器。我们建议将transacted 与DefaultMessageListenerContainer 一起使用以避免消息丢失。 |
“extract-payload”属性具有相同的效果,其默认值为“true”。poller
元素不适用于消息驱动的通道适配器,因为它会被主动调用。对于大多数场景,消息驱动的方法更好,因为消息一旦从底层 JMS 消费者接收,就会立即传递到MessageChannel
。
最后,<message-driven-channel-adapter>
元素还接受'error-channel'属性。这提供了与进入GatewayProxyFactoryBean
中描述的基本相同的功能。以下示例显示了如何在消息驱动的通道适配器上设置错误通道
<int-jms:message-driven-channel-adapter id="jmsIn" destination="inQueue"
channel="exampleChannel"
error-channel="exampleErrorChannel"/>
当将前面的示例与我们稍后讨论的通用网关配置或 JMS 'inbound-gateway'进行比较时,关键区别在于我们处于单向流中,因为这是一个'channel-adapter',而不是网关。因此,'error-channel'下游的流也应该是单向的。例如,它可以发送到日志处理程序,或者可以连接到不同的 JMS <outbound-channel-adapter>
元素。
当从主题消费时,将pub-sub-domain
属性设置为 true。对于持久订阅,将subscription-durable
设置为true
;对于共享订阅,将subscription-shared
设置为true
(这需要 JMS 2.0 代理,并且从 4.2 版本开始可用)。使用subscription-name
来命名订阅。
从 5.1 版本开始,当应用程序仍在运行时停止端点时,底层侦听器容器将关闭,关闭其共享连接和使用者。以前,连接和使用者保持打开状态。要恢复到之前的行为,请将JmsMessageDrivenEndpoint
上的shutdownContainerOnStop
设置为false
。
从 6.3 版本开始,ChannelPublishingJmsMessageListener
现在可以提供 RetryTemplate
和 RecoveryCallback<Message<?>>
用于下游发送和发送-接收操作的重试。这些选项也已暴露到 Java DSL 的 JmsMessageDrivenChannelAdapterSpec
中。
入站转换错误
从 4.2 版本开始,'error-channel' 也用于转换错误。以前,如果 JMS <message-driven-channel-adapter/>
或 <inbound-gateway/>
由于转换错误而无法传递消息,则会将异常抛回到容器。如果容器配置为使用事务,则消息将回滚并反复重新传递。转换过程发生在消息构建之前和期间,因此此类错误不会发送到“error-channel”。现在,此类转换异常会导致将ErrorMessage
发送到“error-channel”,异常作为其payload
。如果您希望事务回滚,并且已定义“error-channel”,则“error-channel”上的集成流程必须重新抛出异常(或其他异常)。如果错误流程不抛出异常,则提交事务并删除消息。如果没有定义“error-channel”,则与以前一样,将异常抛回到容器。
出站通道适配器
JmsSendingMessageHandler
实现 MessageHandler
接口,能够将 Spring Integration Messages
转换为 JMS 消息,然后发送到 JMS 目的地。它需要 jmsTemplate
引用,或者同时需要 jmsConnectionFactory
和 destination
引用(可以使用 destinationName
代替 destination
)。与入站通道适配器一样,配置此适配器的最简单方法是使用命名空间支持。以下配置生成一个适配器,该适配器从 exampleChannel
接收 Spring Integration 消息,将其转换为 JMS 消息,并将其发送到 bean 名称是 outQueue
的 JMS 目的地引用。
-
Java DSL
-
Kotlin DSL
-
Groovy DSL
-
Java
-
XML
@Bean
public IntegrationFlow jmsOutboundFlow() {
return IntegrationFlow.from("exampleChannel")
.handle(Jms.outboundAdapter(cachingConnectionFactory())
.destinationExpression("headers." + SimpMessageHeaderAccessor.DESTINATION_HEADER)
.configureJmsTemplate(t -> t.id("jmsOutboundFlowTemplate")));
}
@Bean
fun jmsOutboundFlow() =
integrationFlow("exampleChannel") {
handle(Jms.outboundAdapter(jmsConnectionFactory())
.apply {
destinationExpression("headers." + SimpMessageHeaderAccessor.DESTINATION_HEADER)
deliveryModeFunction<Any> { DeliveryMode.NON_PERSISTENT }
timeToLiveExpression("10000")
configureJmsTemplate { it.explicitQosEnabled(true) }
}
)
}
@Bean
jmsOutboundFlow() {
integrationFlow('exampleChannel') {
handle(Jms.outboundAdapter(new ActiveMQConnectionFactory())
.with {
destinationExpression 'headers.' + SimpMessageHeaderAccessor.DESTINATION_HEADER
deliveryModeFunction { DeliveryMode.NON_PERSISTENT }
timeToLiveExpression '10000'
configureJmsTemplate {
it.explicitQosEnabled true
}
}
)
}
}
@Bean
@ServiceActivator(inputChannel = "exampleChannel")
public MessageHandler jmsOut() {
JmsSendingMessageHandler handler = new JmsSendingMessageHandler(new JmsTemplate(connectionFactory));
handler.setDestinationName("outQueue");
return handler;
}
<int-jms:outbound-channel-adapter id="jmsOut" destination="outQueue" channel="exampleChannel"/>
与入站通道适配器一样,还有一个“extract-payload”属性。但是,对于出站适配器,其含义相反。它不是应用于 JMS 消息,而是应用于 Spring Integration 消息负载。换句话说,它决定是将 Spring Integration 消息本身作为 JMS 消息体传递,还是将 Spring Integration 消息负载作为 JMS 消息体传递。默认值为“true”。因此,如果您传递一个负载为String
的 Spring Integration 消息,则会创建一个 JMS TextMessage
。另一方面,如果您想通过 JMS 将实际的 Spring Integration 消息发送到另一个系统,请将其设置为“false”。
无论有效负载提取的布尔值如何,只要您依赖于默认转换器或提供对另一个 MessageConverter 实例的引用,Spring Integration MessageHeaders 就会映射到 JMS 属性。(对于“入站”适配器也是如此,只是在这种情况下,JMS 属性映射到 Spring Integration MessageHeaders )。 |
从 5.1 版本开始,<int-jms:outbound-channel-adapter>
(JmsSendingMessageHandler
)可以使用 deliveryModeExpression
和 timeToLiveExpression
属性来评估 JMS 消息在运行时针对请求 Spring Message
发送的适当 QoS 值。DefaultJmsHeaderMapper
的新 setMapInboundDeliveryMode(true)
和 setMapInboundExpiration(true)
选项可以作为从消息头中获取动态 deliveryMode
和 timeToLive
信息的来源。
<int-jms:outbound-channel-adapter delivery-mode-expression="headers.jms_deliveryMode"
time-to-live-expression="headers.jms_expiration - T(System).currentTimeMillis()"/>
入站网关
Spring Integration 的消息驱动的 JMS 入站网关委托给一个 MessageListener
容器,支持动态调整并发消费者,并且还可以处理回复。入站网关需要对 ConnectionFactory
和请求 Destination
(或“requestDestinationName”)的引用。以下示例定义了一个 JMS inbound-gateway
,它从 bean id 为 inQueue
的 JMS 队列接收消息,并发送到名为 exampleChannel
的 Spring Integration 通道。
<int-jms:inbound-gateway id="jmsInGateway"
request-destination="inQueue"
request-channel="exampleChannel"/>
由于网关提供请求-回复行为而不是单向发送或接收行为,因此它们还具有两个不同的“有效负载提取”属性(如前面讨论的通道适配器的“extract-payload”设置)。对于入站网关,“extract-request-payload”属性决定是否提取接收到的 JMS Message 主体。如果为“false”,则 JMS 消息本身成为 Spring Integration 消息负载。默认为“true”。
类似地,对于入站网关,“extract-reply-payload”属性应用于要转换为回复 JMS 消息的 Spring Integration 消息。如果您想传递整个 Spring Integration 消息(作为 JMS ObjectMessage 的主体),请将此值设置为“false”。默认情况下,它也是“true”,即 Spring Integration 消息负载将转换为 JMS 消息(例如,String
负载将成为 JMS TextMessage)。
与任何其他事情一样,网关调用可能会导致错误。默认情况下,生产者不会收到可能在消费者端发生的错误的通知,并且会超时等待回复。但是,有时您可能希望将错误条件传回给消费者(换句话说,您可能希望通过将其映射到消息来将异常视为有效的回复)。为此,JMS 入站网关支持一个消息通道,可以将错误发送到该通道进行处理,这可能会导致符合某些合同的回复消息负载,该合同定义调用者可能期望的“错误”回复。您可以使用 error-channel 属性来配置此类通道,如下例所示。
<int-jms:inbound-gateway request-destination="requestQueue"
request-channel="jmsInputChannel"
error-channel="errorTransformationChannel"/>
<int:transformer input-channel="exceptionTransformationChannel"
ref="exceptionTransformer" method="createErrorResponse"/>
您可能会注意到,此示例与进入GatewayProxyFactoryBean
中包含的示例非常相似。这里也适用同样的想法:exceptionTransformer
可以是一个创建错误响应对象的 POJO,您可以引用 nullChannel
来抑制错误,或者您可以省略“error-channel”以让异常传播。
参见入站转换错误。
从主题消费时,请将pub-sub-domain
属性设置为 true。对于持久订阅,请将subscription-durable
设置为true
;对于共享订阅,请设置为subscription-shared
(需要 JMS 2.0 代理,从 4.2 版本开始可用)。使用subscription-name
来命名订阅。
从 4.2 版本开始,默认的 acknowledge 模式为 transacted ,除非提供了外部容器。在这种情况下,您应该根据需要配置容器。我们建议您将 transacted 与 DefaultMessageListenerContainer 一起使用,以避免消息丢失。 |
从 5.1 版本开始,当应用程序仍在运行时停止端点时,底层侦听器容器将关闭,关闭其共享连接和消费者。以前,连接和消费者保持打开状态。要恢复到之前的行为,请将 JmsInboundGateway
上的 shutdownContainerOnStop
设置为 false
。
默认情况下,JmsInboundGateway
在接收到的消息中查找 jakarta.jms.Message.getJMSReplyTo()
属性以确定将回复发送到哪里。否则,可以使用静态 defaultReplyDestination
或 defaultReplyQueueName
或 defaultReplyTopicName
进行配置。此外,从 6.1 版本开始,可以在提供的 ChannelPublishingJmsMessageListener
上配置 replyToExpression
以动态确定回复目标,如果请求上的标准 JMSReplyTo
属性为 null
。接收到的 jakarta.jms.Message
用作根评估上下文对象。以下示例演示如何使用 Java DSL API 配置具有从请求消息解析的自定义回复目标的入站 JMS 网关。
@Bean
public IntegrationFlow jmsInboundGatewayFlow(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(
Jms.inboundGateway(connectionFactory)
.requestDestination("requestDestination")
.replyToFunction(message -> message.getStringProperty("myReplyTo")))
.<String, String>transform(String::toUpperCase)
.get();
}
从 6.3 版本开始,Jms.inboundGateway()
API 公开了 retryTemplate()
和 recoveryCallback()
选项,用于重试内部发送和接收操作。
出站网关
出站网关从 Spring Integration 消息创建 JMS 消息,并将它们发送到 request-destination
。然后,它通过使用选择器从您配置的 reply-destination
接收 JMS 回复消息,或者如果没有提供 reply-destination
,则通过创建 JMS TemporaryQueue
(如果 replyPubSubDomain= true
,则为 TemporaryTopic
)实例来处理它。
将 如果您指定了回复目标,建议您不要使用缓存的消费者。或者,可以考虑使用 |
以下示例显示如何配置出站网关。
<int-jms:outbound-gateway id="jmsOutGateway"
request-destination="outQueue"
request-channel="outboundJmsRequests"
reply-channel="jmsReplies"/>
“outbound-gateway”有效负载提取属性与“inbound-gateway”的属性相反(参见前面的讨论)。这意味着“extract-request-payload”属性值应用于要转换为作为请求发送的 JMS 消息的 Spring Integration 消息。“extract-reply-payload”属性值应用于作为回复接收到的 JMS 消息,然后将其转换为 Spring Integration 消息,随后发送到“reply-channel”,如前面的配置示例所示。
使用<reply-listener/>
Spring Integration 2.2 引入了一种处理回复的替代技术。如果您向网关添加 <reply-listener/>
子元素而不是为每个回复创建消费者,则将使用 MessageListener
容器来接收回复并将它们交给请求线程。这提供了许多性能优势,并减轻了前面注意事项中描述的缓存消费者内存利用率问题。
当将 <reply-listener/>
与没有 reply-destination
的出站网关一起使用时,将使用单个 TemporaryQueue
,而不是为每个请求创建一个 TemporaryQueue
。(如果与代理的连接丢失并恢复,网关会根据需要创建额外的 TemporaryQueue
)。如果 replyPubSubDomain
设置为 true
,则从 6.0 版本开始,将创建一个 TemporaryTopic
。
使用 correlation-key
时,多个网关可以共享相同的回复目标,因为侦听器容器使用对每个网关唯一的选择器。
如果您指定了回复侦听器并指定了回复目标(或回复目标名称),但没有提供相关键,则网关会记录警告并回退到 2.2 版本之前的行为。这是因为在这种情况下无法配置选择器。因此,无法避免回复发送到可能配置了相同回复目标的不同网关。 请注意,在这种情况下,每个请求都会使用一个新的消费者,并且消费者可能会像上面注意事项中描述的那样在内存中累积;因此,在这种情况下不应使用缓存的消费者。 |
以下示例显示具有默认属性的回复侦听器。
<int-jms:outbound-gateway id="jmsOutGateway"
request-destination="outQueue"
request-channel="outboundJmsRequests"
reply-channel="jmsReplies">
<int-jms:reply-listener />
</int-jms-outbound-gateway>
监听器非常轻量级,我们预计在大多数情况下,您只需要一个消费者。但是,您可以添加诸如concurrent-consumers
、max-concurrent-consumers
等属性。有关支持属性的完整列表以及它们的含义,请参阅架构和Spring JMS文档。
空闲回复监听器
从 4.2 版本开始,您可以根据需要启动回复监听器(并在空闲一段时间后停止它),而不是在网关的生命周期内运行。如果您在应用程序上下文中有很多网关,而它们大多处于空闲状态,这将非常有用。一种情况是上下文包含许多(非活动)分区Spring Batch作业,这些作业使用 Spring Integration 和 JMS 进行分区分配。如果所有回复监听器都处于活动状态,则 JMS 代理对每个网关都有一个活动消费者。通过启用空闲超时,每个消费者仅在相应的批处理作业运行期间(以及完成后的短时间内)存在。
请参见属性参考中的idle-reply-listener-timeout
。
网关回复关联
本节描述用于回复关联(确保起始网关仅接收其请求的回复)的机制,具体取决于网关的配置方式。有关此处讨论的属性的完整说明,请参见属性参考。
以下列表描述了各种场景(数字用于标识——顺序无关紧要)
-
没有
reply-destination*
属性且没有<reply-listener>
为每个请求创建一个
TemporaryQueue
,并在请求完成(成功或失败)时将其删除。correlation-key
无关紧要。 -
提供
reply-destination*
属性,但未提供<reply-listener/>
或correlation-key
等于传出消息的
JMSCorrelationID
用作消费者的消息选择器messageSelector = "JMSCorrelationID = '" + messageId + "'"
响应系统应在回复
JMSCorrelationID
中返回传入的JMSMessageID
。这是一个常见的模式,由 Spring Integration 入站网关以及 Spring 的用于消息驱动 POJO 的MessageListenerAdapter
实现。使用此配置时,不应使用主题进行回复。回复可能会丢失。 -
提供
reply-destination*
属性,未提供<reply-listener/>
,并且correlation-key="JMSCorrelationID"
网关生成一个唯一的关联 IS 并将其插入
JMSCorrelationID
标头。消息选择器是messageSelector = "JMSCorrelationID = '" + uniqueId + "'"
响应系统应在回复
JMSCorrelationID
中返回传入的JMSCorrelationID
。这是一个常见的模式,由 Spring Integration 入站网关以及 Spring 的用于消息驱动 POJO 的MessageListenerAdapter
实现。 -
提供
reply-destination*
属性,未提供<reply-listener/>
,并且correlation-key="myCorrelationHeader"
网关生成一个唯一的关联 ID 并将其插入
myCorrelationHeader
消息属性中。correlation-key
可以是任何用户定义的值。消息选择器是messageSelector = "myCorrelationHeader = '" + uniqueId + "'"
响应系统应在回复
myCorrelationHeader
中返回传入的myCorrelationHeader
。 -
提供
reply-destination*
属性,未提供<reply-listener/>
,并且correlation-key="JMSCorrelationID*"
(注意关联键中的*
)。网关使用请求消息中
jms_correlationId
标头(如果存在)中的值,并将其插入JMSCorrelationID
标头。消息选择器是messageSelector = "JMSCorrelationID = '" + headers['jms_correlationId'] + "'"
用户必须确保此值唯一。
如果标头不存在,则网关的行为与
3
相同。响应系统应在回复
JMSCorrelationID
中返回传入的JMSCorrelationID
。这是一个常见的模式,由 Spring Integration 入站网关以及 Spring 的用于消息驱动 POJO 的MessageListenerAdapter
实现。 -
未提供
reply-destination*
属性,并提供了<reply-listener>
创建一个临时队列并将其用于来自此网关实例的所有回复。消息中不需要关联数据,但传出的
JMSMessageID
在网关内部用于将回复定向到正确的请求线程。 -
提供
reply-destination*
属性,提供<reply-listener>
,并且未提供correlation-key
不允许。
<reply-listener/>
配置将被忽略,网关的行为与2
相同。将写入警告日志消息以指示这种情况。 -
提供
reply-destination*
属性,提供<reply-listener>
,并且correlation-key="JMSCorrelationID"
网关具有唯一的关联 ID 并将其与
JMSCorrelationID
标头中的递增值一起插入(gatewayId + "_" + ++seq
)。消息选择器是messageSelector = "JMSCorrelationID LIKE '" + gatewayId%'"
响应系统应在回复
JMSCorrelationID
中返回传入的JMSCorrelationID
。这是一个常见的模式,由 Spring Integration 入站网关以及 Spring 的用于消息驱动 POJO 的MessageListenerAdapter
实现。由于每个网关都有一个唯一的 ID,因此每个实例只获取自己的回复。完整的关联数据用于将回复路由到正确的请求线程。 -
提供
reply-destination*
属性,提供<reply-listener/>
,并且correlation-key="myCorrelationHeader"
网关具有唯一的关联 ID 并将其与
myCorrelationHeader
属性中的递增值一起插入(gatewayId + "_" + ++seq
)。correlation-key
可以是任何用户定义的值。消息选择器是messageSelector = "myCorrelationHeader LIKE '" + gatewayId%'"
响应系统应在回复
myCorrelationHeader
中返回传入的myCorrelationHeader
。由于每个网关都有一个唯一的 ID,因此每个实例只获取自己的回复。完整的关联数据用于将回复路由到正确的请求线程。 -
提供
reply-destination*
属性,提供<reply-listener/>
,并且correlation-key="JMSCorrelationID*"
(注意关联键中的
*
)不允许。
回复监听器不允许使用用户提供的关联 ID。网关不会使用此配置进行初始化。
异步网关
从 4.3 版本开始,您现在可以在配置出站网关时指定async="true"
(或在 Java 中使用setAsync(true)
)。
默认情况下,当请求发送到网关时,请求线程将暂停,直到收到回复。然后,流程将在此线程上继续。如果async
为true
,则在send()
完成之后立即释放请求线程,并在监听器容器线程上返回回复(并继续流程)。当在轮询线程上调用网关时,这将非常有用。线程被释放,并且可用于框架中的其他任务。
async
需要一个<reply-listener/>
(或在使用 Java 配置时使用setUseReplyContainer(true)
)。它还需要指定一个correlationKey
(通常为JMSCorrelationID
)。如果这两个条件中的任何一个不满足,则忽略async
。
属性参考
以下列表显示了outbound-gateway
的所有可用属性
<int-jms:outbound-gateway
connection-factory="connectionFactory" (1)
correlation-key="" (2)
delivery-persistent="" (3)
destination-resolver="" (4)
explicit-qos-enabled="" (5)
extract-reply-payload="true" (6)
extract-request-payload="true" (7)
header-mapper="" (8)
message-converter="" (9)
priority="" (10)
receive-timeout="" (11)
reply-channel="" (12)
reply-destination="" (13)
reply-destination-expression="" (14)
reply-destination-name="" (15)
reply-pub-sub-domain="" (16)
reply-timeout="" (17)
request-channel="" (18)
request-destination="" (19)
request-destination-expression="" (20)
request-destination-name="" (21)
request-pub-sub-domain="" (22)
time-to-live="" (23)
requires-reply="" (24)
idle-reply-listener-timeout="" (25)
async=""> (26)
<int-jms:reply-listener /> (27)
</int-jms:outbound-gateway>
1 | 对jakarta.jms.ConnectionFactory 的引用。默认的jmsConnectionFactory 。 |
||
2 | 包含关联数据以将响应与回复关联的属性的名称。如果省略,则网关期望响应系统在JMSCorrelationID 标头中返回出站JMSMessageID 标头的值。如果指定,则网关将生成一个关联 ID 并使用它填充指定的属性。响应系统必须在同一属性中回显该值。它可以设置为JMSCorrelationID ,在这种情况下,将使用标准标头而不是String 属性来保存关联数据。当使用<reply-container/> 时,如果提供显式reply-destination ,则必须指定correlation-key 。从 4.0.1 版本开始,此属性还支持值JMSCorrelationID* ,这意味着如果出站消息已经具有JMSCorrelationID (从jms_correlationId 映射)标头,则使用它而不是生成一个新的。请注意,当使用<reply-container/> 时,不允许使用JMSCorrelationID* 键,因为容器需要在初始化期间设置消息选择器。
|
||
3 | 一个布尔值,指示传递模式应该是DeliveryMode.PERSISTENT (true )还是DeliveryMode.NON_PERSISTENT (false )。只有当explicit-qos-enabled 为true 时,此设置才会生效。 |
||
4 | 一个DestinationResolver 。默认值为DynamicDestinationResolver ,它将目标名称映射到具有该名称的队列或主题。 |
||
5 | 设置为true 时,它将启用服务质量属性的使用:priority 、delivery-mode 和time-to-live 。 |
||
6 | 设置为true (默认值)时,Spring Integration 回复消息的有效负载将从 JMS 回复消息的主体创建(使用MessageConverter )。设置为false 时,整个 JMS 消息将成为 Spring Integration 消息的有效负载。 |
||
7 | 设置为true (默认值)时,Spring Integration 消息的有效负载将转换为JMSMessage (使用MessageConverter )。设置为false 时,整个 Spring Integration 消息将转换为JMSMessage 。在这两种情况下,Spring Integration 消息标头都将使用HeaderMapper 映射到 JMS 标头和属性。 |
||
8 | 用于将 Spring Integration 消息标头映射到 JMS 消息标头和属性以及从其映射的HeaderMapper 。 |
||
9 | 对MessageConverter 的引用,用于在 JMS 消息和 Spring Integration 消息有效负载(或消息,如果extract-request-payload 为false )之间进行转换。默认值为SimpleMessageConverter 。 |
||
10 | 请求消息的默认优先级。如果存在,则会被消息优先级标头覆盖。其范围为0 到9 。只有当explicit-qos-enabled 为true 时,此设置才会生效。 |
||
11 | 等待回复的时间(以毫秒为单位)。默认为5000 (五秒)。 |
||
12 | 发送回复消息的通道。 | ||
13 | 对Destination 的引用,它设置为JMSReplyTo 标头。最多只允许reply-destination 、reply-destination-expression 或reply-destination-name 中的一个。如果没有提供,则使用TemporaryQueue 来回复此网关。 |
||
14 | 计算结果为Destination 的 SpEL 表达式,它将设置为JMSReplyTo 标头。表达式可以是Destination 对象或String 。它由DestinationResolver 用于解析实际的Destination 。最多只允许reply-destination 、reply-destination-expression 或reply-destination-name 中的一个。如果没有提供,则使用TemporaryQueue 来回复此网关。 |
||
15 | 设置为 JMSReplyTo 标头的目标的名称。它由DestinationResolver 用于解析实际的Destination 。最多只允许reply-destination 、reply-destination-expression 或reply-destination-name 中的一个。如果没有提供,则使用TemporaryQueue 来回复此网关。 |
||
16 | 设置为true 时,表示DestinationResolver 解析的任何回复Destination 都应该是Topic 而不是Queue 。 |
||
17 | 网关在将回复消息发送到reply-channel 时等待的时间。这只有在reply-channel 可以阻塞时才有效——例如,具有当前已满的容量限制的QueueChannel 。默认值为无限大。 |
||
18 | 此网关接收请求消息的通道。 | ||
19 | 对发送请求消息的Destination 的引用。reply-destination 、reply-destination-expression 或 reply-destination-name 中的一个是必需的。只能使用这三个属性中的一个。 |
||
20 | 一个 SpEL 表达式,计算结果为发送请求消息的Destination 。该表达式可以返回一个Destination 对象或一个String 。它由DestinationResolver 用来解析实际的Destination 。reply-destination 、reply-destination-expression 或 reply-destination-name 中的一个是必需的。只能使用这三个属性中的一个。 |
||
21 | 发送请求消息的目标的名称。它由DestinationResolver 用来解析实际的Destination 。reply-destination 、reply-destination-expression 或 reply-destination-name 中的一个是必需的。只能使用这三个属性中的一个。 |
||
22 | 设置为true 时,表示DestinationResolver 解析的任何请求Destination 都应为Topic ,而不是Queue 。 |
||
23 | 指定消息生存时间。此设置仅当explicit-qos-enabled 为true 时才有效。 |
||
24 | 指定此出站网关是否必须返回非空值。默认情况下,此值为true ,当底层服务在receive-timeout 后未返回值时,将抛出MessageTimeoutException 异常。请注意,如果服务预期永远不会返回回复,最好使用<int-jms:outbound-channel-adapter/> 而不是带有requires-reply="false" 的<int-jms:outbound-gateway/> 。使用后者,发送线程将被阻塞,等待回复receive-timeout 时间段。 |
||
25 | 当使用<reply-listener /> 时,默认情况下,其生命周期(启动和停止)与网关的生命周期匹配。当此值大于0 时,容器按需启动(发送请求时)。容器将继续运行,直到至少经过此时间段未收到任何请求(并且没有未完成的回复)。在下一个请求时,容器将再次启动。停止时间是最小值,实际上可能长达此值的 1.5 倍。 |
||
26 | 参见 异步网关。 | ||
27 | 包含此元素时,回复将由异步MessageListenerContainer 接收,而不是为每个回复创建消费者。在许多情况下,这效率更高。 |
消息头与 JMS 消息之间的映射
JMS 消息可以包含元信息,例如 JMS API 头和简单属性。可以使用JmsHeaderMapper
将这些元信息映射到 Spring Integration 消息头,并从 Spring Integration 消息头映射到这些元信息。JMS API 头传递给相应的 setter 方法(例如setJMSReplyTo
),而其他头则复制到 JMS 消息的常规属性中。JMS 出站网关使用JmsHeaderMapper
的默认实现进行引导,该实现将映射标准 JMS API 头以及原始或String
消息头。您还可以使用入站和出站网关的header-mapper
属性提供自定义头映射器。
许多 JMS 供应商特定的客户端不允许直接在已创建的 JMS 消息上设置deliveryMode 、priority 和timeToLive 属性。它们被认为是 QoS 属性,因此必须传播到目标MessageProducer.send(message, deliveryMode, priority, timeToLive) API。因此,DefaultJmsHeaderMapper 不会将相应的 Spring Integration 头(或表达式的结果)映射到提到的 JMS 消息属性。相反,JmsSendingMessageHandler 使用DynamicJmsTemplate 将请求消息中的头值传播到MessageProducer.send() API。要启用此功能,必须使用其explicitQosEnabled 属性设置为 true 的DynamicJmsTemplate 配置出站端点。Spring Integration Java DSL 默认配置DynamicJmsTemplate ,但仍然必须设置explicitQosEnabled 属性。 |
从 4.0 版开始,JMSPriority 头映射到入站消息的标准priority 头。以前,priority 头仅用于出站消息。要恢复到之前的行为(即,不映射入站优先级),请将DefaultJmsHeaderMapper 的mapInboundPriority 属性设置为false 。 |
从 4.3 版开始,DefaultJmsHeaderMapper 通过调用其toString() 方法将标准correlationId 头映射为消息属性(correlationId 通常是UUID ,JMS 不支持)。在入站方面,它被映射为String 。这与jms_correlationId 头无关,后者映射到JMSCorrelationID 头并从中映射。JMSCorrelationID 通常用于关联请求和回复,而correlationId 通常用于将相关消息组合到一个组中(例如使用聚合器或重新排序器)。 |
从 5.1 版开始,可以配置DefaultJmsHeaderMapper
来映射入站JMSDeliveryMode
和JMSExpiration
属性。
@Bean
public DefaultJmsHeaderMapper jmsHeaderMapper() {
DefaultJmsHeaderMapper mapper = new DefaultJmsHeaderMapper();
mapper.setMapInboundDeliveryMode(true)
mapper.setMapInboundExpiration(true)
return mapper;
}
这些 JMS 属性分别映射到JmsHeaders.DELIVERY_MODE
和JmsHeaders.EXPIRATION
Spring 消息头。
消息转换、编组和解组
如果需要转换消息,所有 JMS 适配器和网关都允许通过设置message-converter
属性来提供MessageConverter
。为此,请提供在同一个 ApplicationContext 中可用的MessageConverter
实例的 bean 名称。此外,为了与编组器和解组器接口保持一致,Spring 提供了MarshallingMessageConverter
,您可以使用自己的自定义编组器和解组器对其进行配置。以下示例显示了如何操作
<int-jms:inbound-gateway request-destination="requestQueue"
request-channel="inbound-gateway-channel"
message-converter="marshallingMessageConverter"/>
<bean id="marshallingMessageConverter"
class="org.springframework.jms.support.converter.MarshallingMessageConverter">
<constructor-arg>
<bean class="org.bar.SampleMarshaller"/>
</constructor-arg>
<constructor-arg>
<bean class="org.bar.SampleUnmarshaller"/>
</constructor-arg>
</bean>
当提供自己的MessageConverter 实例时,它仍然包装在HeaderMappingMessageConverter 中。这意味着“extract-request-payload”和“extract-reply-payload”属性会影响传递给转换器的实际对象。HeaderMappingMessageConverter 本身委托给目标MessageConverter ,同时还将 Spring Integration MessageHeaders 映射到 JMS 消息属性,反之亦然。 |
基于 JMS 的消息通道
前面介绍的通道适配器和网关都适用于与其他外部系统集成的应用程序。入站选项假设其他一些系统正在将 JMS 消息发送到 JMS 目标,而出站选项假设其他一些系统正在从目标接收消息。另一个系统可能是也可能不是 Spring Integration 应用程序。当然,当将 Spring Integration 消息实例作为 JMS 消息本身的主体发送时('extract-payload' 值设置为false
),假设另一个系统基于 Spring Integration。但是,这绝不是必需的。这种灵活性是使用基于消息的集成选项以及“通道”(在 JMS 的情况下为目标)抽象的优点之一。
有时,给定 JMS 目标的生产者和消费者都打算成为同一个应用程序的一部分,在同一个进程中运行。您可以通过使用一对入站和出站通道适配器来实现这一点。这种方法的问题在于,即使从概念上讲,目标是拥有单个消息通道,您也需要两个适配器。从 Spring Integration 2.0 版开始,支持更好的选项。现在,在使用 JMS 命名空间时,可以定义单个“通道”,如下例所示
<int-jms:channel id="jmsChannel" queue="exampleQueue"/>
前面的示例中的通道的行为与来自主 Spring Integration 命名空间的普通<channel/>
元素非常相似。任何端点的input-channel
和output-channel
属性都可以引用它。不同之处在于,此通道由名为exampleQueue
的 JMS 队列实例支持。这意味着生产者和消费者端点之间可以进行异步消息传递。但是,与通过在非 JMS <channel/>
元素中添加<queue/>
元素创建的更简单的异步消息通道不同,消息不会存储在内存队列中。相反,这些消息在 JMS 消息体中传递,然后底层 JMS 提供程序的全部功能可用于该通道。使用此替代方案最常见的原因可能是利用 JMS 消息的存储转发方法提供的持久性。
如果配置正确,基于 JMS 的消息通道也支持事务。换句话说,如果生产者的发送操作是回滚事务的一部分,则生产者实际上不会写入事务性基于 JMS 的通道。同样,如果接收该消息是回滚事务的一部分,则消费者不会物理地从通道中删除 JMS 消息。请注意,在这种情况下,生产者和消费者事务是分开的。这与跨没有<queue/>
子元素的简单同步<channel/>
元素传播事务上下文的情况大不相同。
由于上面的示例引用了 JMS 队列实例,因此它充当点对点通道。另一方面,如果您需要发布-订阅行为,则可以使用单独的元素并引用 JMS 主题。以下示例显示了如何操作
<int-jms:publish-subscribe-channel id="jmsChannel" topic="exampleTopic"/>
对于任何类型的基于 JMS 的通道,都可以提供目标的名称而不是引用,如下例所示
<int-jms:channel id="jmsQueueChannel" queue-name="exampleQueueName"/>
<jms:publish-subscribe-channel id="jmsTopicChannel" topic-name="exampleTopicName"/>
在前面的示例中,目标名称由 Spring 的默认DynamicDestinationResolver
实现解析,但您可以提供DestinationResolver
接口的任何实现。此外,JMS ConnectionFactory
是通道的必需属性,但默认情况下,预期的 bean 名称是jmsConnectionFactory
。以下示例同时提供了用于解析 JMS 目标名称的自定义实例和ConnectionFactory
的不同名称
<int-jms:channel id="jmsChannel" queue-name="exampleQueueName"
destination-resolver="customDestinationResolver"
connection-factory="customConnectionFactory"/>
对于<publish-subscribe-channel />
,将durable
属性设置为true
以实现持久订阅,或设置为subscription-shared
以实现共享订阅(需要 JMS 2.0 代理,自 4.2 版本起可用)。使用subscription
来命名订阅。
使用 JMS 消息选择器
使用 JMS 消息选择器,您可以根据 JMS 头部和 JMS 属性过滤JMS 消息。例如,如果您想监听自定义 JMS 头部属性myHeaderProperty
等于something
的消息,您可以指定以下表达式
myHeaderProperty = 'something'
消息选择器表达式是SQL-92条件表达式语法的一个子集,并被定义为Java 消息服务规范的一部分。您可以使用 XML 命名空间配置为以下 Spring Integration JMS 组件指定 JMS 消息selector
属性
-
JMS 通道
-
JMS 发布订阅通道
-
JMS 入站通道适配器
-
JMS 入站网关
-
JMS 消息驱动的通道适配器
您不能使用 JMS 消息选择器引用消息体值。 |
JMS 示例
要试验这些 JMS 适配器,请查看 Spring Integration Samples Git 仓库中提供的 JMS 示例:https://github.com/spring-projects/spring-integration-samples/tree/master/basic/jms。
该仓库包含两个示例。一个提供入站和出站通道适配器,另一个提供入站和出站网关。它们配置为与嵌入式ActiveMQ进程一起运行,但是您可以修改每个示例的common.xml Spring 应用上下文文件以支持不同的 JMS 提供程序或独立的 ActiveMQ 进程。
换句话说,您可以拆分配置,以便入站和出站适配器在单独的 JVM 中运行。如果您已安装 ActiveMQ,请修改common.xml
文件中的brokerURL
属性以使用tcp://127.0.0.1:61616
(而不是vm://127.0.0.1
)。这两个示例都接受来自 stdin 的输入并回显到 stdout。查看配置以了解这些消息如何通过 JMS 路由。