JMS 支持

Spring Integration 提供了用于接收和发送 JMS 消息的通道适配器。

项目需要此依赖项

  • Maven

  • Gradle

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-jms</artifactId>
    <version>7.0.0</version>
</dependency>
compile "org.springframework.integration:spring-integration-jms:7.0.0"

必须通过一些 JMS 厂商特定的实现(例如 Apache ActiveMQ)显式添加 jakarta.jms:jakarta.jms-api

实际上,有两种基于 JMS 的入站通道适配器。第一种使用 Spring 的 JmsTemplate 基于轮询周期进行接收。第二种是“消息驱动”的,依赖于 Spring MessageListener 容器。出站通道适配器使用 JmsTemplate 按需转换和发送 JMS 消息。

通过使用 JmsTemplateMessageListener 容器,Spring Integration 依赖于 Spring 的 JMS 支持。理解这一点很重要,因为这些适配器上公开的大多数属性都配置了底层的 JmsTemplateMessageListener 容器。有关 JmsTemplateMessageListener 容器的更多详细信息,请参阅 Spring JMS 文档

虽然 JMS 通道适配器旨在用于单向消息传递(仅发送或仅接收),但 Spring Integration 还提供了入站和出站 JMS 网关,用于请求和回复操作。入站网关依赖于 Spring 的 MessageListener 容器实现之一进行消息驱动接收。它还能够将返回值发送到接收消息提供的 reply-to 目标。出站网关将 JMS 消息发送到 request-destination(或 request-destination-namerequest-destination-expression),然后接收回复消息。您可以显式配置 reply-destination 引用(或 reply-destination-namereply-destination-expression)。否则,出站网关使用 JMS TemporaryQueue

在 Spring Integration 2.2 之前,如果需要,会为每个请求或回复创建(并删除)一个 TemporaryQueue。从 Spring Integration 2.2 开始,您可以配置出站网关使用 MessageListener 容器来接收回复,而不是直接使用新的(或缓存的)Consumer 来接收每个请求的回复。当如此配置且未提供显式回复目标时,每个网关使用单个 TemporaryQueue,而不是每个请求使用一个。

从 6.0 版本开始,如果将 replyPubSubDomain 选项设置为 true,则出站网关会创建 TemporaryTopic 而不是 TemporaryQueue。一些 JMS 供应商对这些目标的处理方式不同。

入站通道适配器

入站通道适配器需要引用单个 JmsTemplate 实例或 ConnectionFactoryDestination(您可以使用“destinationName”代替“destination”引用)。以下示例定义了一个具有 Destination 引用的入站通道适配器

  • Java DSL

  • Kotlin DSL

  • Java

  • XML

@Bean
public IntegrationFlow jmsInbound(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(
                    Jms.inboundAdapter(connectionFactory)
                       .destination("inQueue"),
                    e -> e.poller(poller -> poller.fixedRate(30000)))
            .handle(m -> System.out.println(m.getPayload()))
            .get();
}
@Bean
fun jmsInbound(connectionFactory: ConnectionFactory) =
    integrationFlow(
            Jms.inboundAdapter(connectionFactory).destination("inQueue"),
            { poller { Pollers.fixedRate(30000) } })
       {
            handle { m -> println(m.payload) }
       }
@Bean
@InboundChannelAdapter(value = "exampleChannel", poller = @Poller(fixedRate = "30000"))
public MessageSource<Object> jmsIn(ConnectionFactory connectionFactory) {
    JmsDestinationPollingSource source = new JmsDestinationPollingSource(new JmsTemplate(connectionFactory));
    source.setDestinationName("inQueue");
    return source;
}
<int-jms:inbound-channel-adapter id="jmsIn" destination="inQueue" channel="exampleChannel">
    <int:poller fixed-rate="30000"/>
</int-jms:inbound-channel-adapter>
从上述配置中可以看出,inbound-channel-adapter 是一个轮询消费者。这意味着当它被触发时会调用 receive()。您应该只在轮询不频繁且及时性不重要的情况下使用此功能。对于所有其他情况(绝大多数基于 JMS 的用例),message-driven-channel-adapter稍后描述)是更好的选择。
默认情况下,所有需要引用 ConnectionFactory 的 JMS 适配器都会自动查找名为 jmsConnectionFactory 的 bean。这就是为什么您在许多示例中看不到 connection-factory 属性的原因。但是,如果您的 JMS ConnectionFactory 具有不同的 bean 名称,则需要提供该属性。

如果 extract-payload 设置为 true(默认值),则收到的 JMS 消息将通过 MessageConverter。当依赖默认的 SimpleMessageConverter 时,这意味着生成的 Spring Integration Message 的有效负载是 JMS 消息的正文。JMS TextMessage 生成基于字符串的有效负载,JMS BytesMessage 生成字节数组有效负载,JMS ObjectMessage 的可序列化实例成为 Spring Integration 消息的有效负载。如果您希望将原始 JMS 消息作为 Spring Integration 消息的有效负载,请将 extractPayload 选项设置为 false

从版本 5.0.8 开始,对于 org.springframework.jms.connection.CachingConnectionFactorycacheConsumersreceive-timeout 的默认值为 -1(不等待),否则为 1 秒。JMS 入站通道适配器根据提供的 ConnectionFactory 和选项创建 DynamicJmsTemplate。如果需要外部 JmsTemplate(例如在 Spring Boot 环境中),或者 ConnectionFactory 未缓存,或者没有 cacheConsumers,建议如果期望非阻塞消费,则将 jmsTemplate.receiveTimeout(-1) 设置为 -1

Jms.inboundAdapter(connectionFactory)
        .destination(queueName)
        .configureJmsTemplate(template -> template.receiveTimeout(-1))

事务

从 4.0 版本开始,入站通道适配器支持 session-transacted 属性。在早期版本中,您必须注入一个 JmsTemplate,其中 sessionTransacted 设置为 true。(适配器确实允许您将 acknowledge 属性设置为 transacted,但这不正确且不起作用)。

但是,请注意,将 session-transacted 设置为 true 几乎没有价值,因为事务在 receive() 操作之后立即提交,并且在消息发送到 channel 之前。

如果您希望整个流程都是事务性的(例如,如果存在下游出站通道适配器),则必须使用带有 JmsTransactionManagertransactional 轮询器。或者,考虑使用 jms-message-driven-channel-adapter,并将 acknowledge 设置为 transacted(默认值)。

消息驱动通道适配器

message-driven-channel-adapter 需要引用 Spring MessageListener 容器的实例(AbstractMessageListenerContainer 的任何子类)或 ConnectionFactoryDestination(可以使用“destinationName”代替“destination”引用)。以下示例定义了一个具有 Destination 引用的消息驱动通道适配器

  • Java DSL

  • Kotlin DSL

  • Java

  • XML

@Bean
public IntegrationFlow jmsMessageDrivenRedeliveryFlow() {
    return IntegrationFlow
            .from(Jms.messageDrivenChannelAdapter(jmsConnectionFactory())
                     .destination("inQueue"))
            .channel("exampleChannel")
            .get();
}
@Bean
fun jmsMessageDrivenFlowWithContainer() =
        integrationFlow(
                Jms.messageDrivenChannelAdapter(jmsConnectionFactory())
                             .destination("inQueue")) {
            channel("exampleChannel")
        }
@Bean
public JmsMessageDrivenEndpoint jmsIn() {
    JmsMessageDrivenEndpoint endpoint = new JmsMessageDrivenEndpoint(container(), listener());
    return endpoint;
}
@Bean
public AbstractMessageListenerContainer container() {
    DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
    container.setConnectionFactory(cf());
    container.setDestinationName("inQueue");
    return container;
}

@Bean
public ChannelPublishingJmsMessageListener listener() {
    ChannelPublishingJmsMessageListener listener = new ChannelPublishingJmsMessageListener();
    listener.setRequestChannelName("exampleChannel");
    return listener;
}
<int-jms:message-driven-channel-adapter id="jmsIn" destination="inQueue" channel="exampleChannel"/>

消息驱动适配器还接受与 MessageListener 容器相关的几个属性。只有在您未提供 container 引用时才考虑这些值。在这种情况下,将根据这些属性创建并配置 DefaultMessageListenerContainer 实例。例如,您可以指定 transaction-manager 引用、concurrent-consumers 值以及其他几个属性引用和值。有关更多详细信息,请参阅 Javadoc 和 Spring Integration 的 JMS 模式 (spring-integration-jms.xsd)。

如果您有一个自定义监听器容器实现(通常是 DefaultMessageListenerContainer 的子类),您可以通过使用 container 属性提供其实例的引用,或者通过使用 container-class 属性提供其完全限定的类名。在这种情况下,适配器上的属性将传递给您的自定义容器实例。

您不能使用 Spring JMS 命名空间元素 <jms:listener-container/><int-jms:message-driven-channel-adapter> 配置容器引用,因为该元素实际上并未引用容器。每个 <jms:listener/> 子元素都获取自己的 DefaultMessageListenerContainer(共享父 <jms:listener-container/> 元素上定义的属性)。您可以为每个监听器子元素指定一个 id,并使用它注入到通道适配器中,但是,<jms:/> 命名空间需要一个真正的监听器。

建议为 DefaultMessageListenerContainer 配置一个常规的 <bean>,并将其用作通道适配器中的引用。

从 4.2 版本开始,默认的 acknowledge 模式是 transacted,除非您提供外部容器。在这种情况下,您应该根据需要配置容器。我们建议将 transactedDefaultMessageListenerContainer 结合使用,以避免消息丢失。

'extract-payload' 属性具有相同的效果,其默认值为 'true'。poller 元素不适用于消息驱动通道适配器,因为它被主动调用。对于大多数情况,消息驱动方法更好,因为消息一旦从底层 JMS 消费者接收到,就会立即传递到 MessageChannel

最后,<message-driven-channel-adapter> 元素还接受“error-channel”属性。这提供了与 进入 GatewayProxyFactoryBean 中描述的相同基本功能。以下示例显示了如何在消息驱动通道适配器上设置错误通道

<int-jms:message-driven-channel-adapter id="jmsIn" destination="inQueue"
    channel="exampleChannel"
    error-channel="exampleErrorChannel"/>

将上述示例与通用网关配置或我们稍后讨论的 JMS“入站网关”进行比较时,主要区别在于我们处于单向流中,因为这是一个“通道适配器”,而不是网关。因此,“error-channel”下游的流也应该是单向的。例如,它可以发送到日志处理程序,或者它可以连接到另一个 JMS <outbound-channel-adapter> 元素。

从主题消费时,将 pub-sub-domain 属性设置为 true。将 subscription-durable 设置为 true 以实现持久订阅,或将 subscription-shared 设置为共享订阅(需要 JMS 2.0 代理,并从 4.2 版本开始可用)。使用 subscription-name 来命名订阅。

从版本 5.1 开始,当应用程序仍在运行时停止端点时,底层监听器容器将关闭,从而关闭其共享连接和消费者。以前,连接和消费者保持打开状态。要恢复到以前的行为,请将 JmsMessageDrivenEndpoint 上的 shutdownContainerOnStop 设置为 false

从版本 6.3 开始,ChannelPublishingJmsMessageListener 现在可以提供 RetryTemplateRecoveryCallback<Message<?>>,用于下游发送和发送-接收操作的重试。这些选项也暴露在 Java DSL 的 JmsMessageDrivenChannelAdapterSpec 中。

入站转换错误

从 4.2 版本开始,转换错误也使用“error-channel”。以前,如果 JMS <message-driven-channel-adapter/><inbound-gateway/> 由于转换错误而无法传递消息,则会将异常抛回容器。如果容器配置为使用事务,则消息将回滚并重复重新传递。转换过程在消息构造之前和期间发生,因此此类错误不会发送到“error-channel”。现在,此类转换异常会导致 ErrorMessage 发送到“error-channel”,异常作为 payload。如果您希望事务回滚,并且您已定义“error-channel”,则“error-channel”上的集成流必须重新抛出异常(或其他异常)。如果错误流不抛出异常,则事务将提交并删除消息。如果未定义“error-channel”,则异常将像以前一样抛回容器。

出站通道适配器

JmsSendingMessageHandler 实现了 MessageHandler 接口,能够将 Spring Integration Messages 转换为 JMS 消息,然后发送到 JMS 目标。它需要 jmsTemplate 引用或 jmsConnectionFactorydestination 引用(destinationName 可以代替 destination 提供)。与入站通道适配器一样,配置此适配器最简单的方法是使用命名空间支持。以下配置生成一个适配器,该适配器从 exampleChannel 接收 Spring Integration 消息,将其转换为 JMS 消息,并将其发送到 bean 名称为 outQueue 的 JMS 目标引用

  • Java DSL

  • Kotlin DSL

  • Groovy DSL

  • Java

  • XML

@Bean
public IntegrationFlow jmsOutboundFlow() {
    return IntegrationFlow.from("exampleChannel")
                .handle(Jms.outboundAdapter(cachingConnectionFactory())
                    .destinationExpression("headers." + SimpMessageHeaderAccessor.DESTINATION_HEADER)
                    .configureJmsTemplate(t -> t.id("jmsOutboundFlowTemplate")))
                .get();
}
@Bean
fun jmsOutboundFlow() =
        integrationFlow("exampleChannel") {
            handle(Jms.outboundAdapter(jmsConnectionFactory())
                    .apply {
                        destinationExpression("headers." + SimpMessageHeaderAccessor.DESTINATION_HEADER)
                        deliveryModeFunction<Any> { DeliveryMode.NON_PERSISTENT }
                        timeToLiveExpression("10000")
                        configureJmsTemplate { it.explicitQosEnabled(true) }
                    }
            )
        }
@Bean
jmsOutboundFlow() {
    integrationFlow('exampleChannel') {
        handle(Jms.outboundAdapter(new ActiveMQConnectionFactory())
                .with {
                    destinationExpression 'headers.' + SimpMessageHeaderAccessor.DESTINATION_HEADER
                    deliveryModeFunction { DeliveryMode.NON_PERSISTENT }
                    timeToLiveExpression '10000'
                    configureJmsTemplate {
                        it.explicitQosEnabled true
                    }
                }
        )
    }
}
@Bean
@ServiceActivator(inputChannel = "exampleChannel")
public MessageHandler jmsOut() {
    JmsSendingMessageHandler handler = new JmsSendingMessageHandler(new JmsTemplate(connectionFactory));
    handler.setDestinationName("outQueue");
    return handler;
}
<int-jms:outbound-channel-adapter id="jmsOut" destination="outQueue" channel="exampleChannel"/>

与入站通道适配器一样,有一个 'extract-payload' 属性。但是,对于出站适配器,其含义是相反的。布尔属性不是应用于 JMS 消息,而是应用于 Spring Integration 消息有效负载。换句话说,决定是将 Spring Integration 消息本身作为 JMS 消息正文传递,还是将 Spring Integration 消息有效负载作为 JMS 消息正文传递。默认值为 'true'。因此,如果您传递一个有效负载为 String 的 Spring Integration 消息,则会创建一个 JMS TextMessage。另一方面,如果您想通过 JMS 将实际的 Spring Integration 消息发送到另一个系统,请将其设置为 'false'。

无论有效负载提取的布尔值如何,只要您依赖默认转换器或提供另一个 MessageConverter 实例的引用,Spring Integration MessageHeaders 都会映射到 JMS 属性。(入站适配器也是如此,只不过在那些情况下,JMS 属性映射到 Spring Integration MessageHeaders)。

从版本 5.1 开始,<int-jms:outbound-channel-adapter> (JmsSendingMessageHandler) 可以配置 deliveryModeExpressiontimeToLiveExpression 属性,以根据请求 Spring Message 在运行时评估 JMS 消息的适当 QoS 值。DefaultJmsHeaderMapper 的新 setMapInboundDeliveryMode(true)setMapInboundExpiration(true) 选项可能有助于作为消息头中动态 deliveryModetimeToLive 的信息源

<int-jms:outbound-channel-adapter delivery-mode-expression="headers.jms_deliveryMode"
                        time-to-live-expression="headers.jms_expiration - T(System).currentTimeMillis()"/>

事务

从 4.0 版本开始,出站通道适配器支持 session-transacted 属性。在早期版本中,您必须注入一个 JmsTemplate,其中 sessionTransacted 设置为 true。该属性现在在内置的默认 JmsTemplate 上设置该属性。如果存在事务(可能来自上游的 message-driven-channel-adapter),则发送操作将在同一事务中执行。否则,将启动新事务。

入站网关

Spring Integration 的消息驱动 JMS 入站网关委托给 MessageListener 容器,支持动态调整并发消费者,并且还可以处理回复。入站网关需要引用 ConnectionFactory 和请求 Destination(或“requestDestinationName”)。以下示例定义了一个 JMS inbound-gateway,它从 bean ID 为 inQueue 引用的 JMS 队列接收,并发送到名为 exampleChannel 的 Spring Integration 通道

<int-jms:inbound-gateway id="jmsInGateway"
    request-destination="inQueue"
    request-channel="exampleChannel"/>

由于网关提供请求-回复行为而不是单向发送或接收行为,因此它们还具有两个不同的“有效负载提取”属性(如 前面讨论 的通道适配器的“extract-payload”设置)。对于入站网关,“extract-request-payload”属性确定是否提取收到的 JMS 消息正文。如果为“false”,则 JMS 消息本身成为 Spring Integration 消息有效负载。默认值为“true”。

同样,对于入站网关,`extract-reply-payload` 属性应用于要转换为回复 JMS 消息的 Spring Integration 消息。如果您想传递整个 Spring Integration 消息(作为 JMS ObjectMessage 的正文),请将此值设置为“false”。默认情况下,Spring Integration 消息负载也会转换为 JMS 消息(例如,`String` 负载会转换为 JMS TextMessage)。

与任何其他情况一样,网关调用可能会导致错误。默认情况下,生产者不会收到消费者端可能发生的错误的通知,并且在等待回复时会超时。但是,有时您可能希望将错误条件传回消费者(换句话说,您可能希望通过将其映射到消息来将异常视为有效回复)。为了实现这一点,JMS 入站网关支持一个消息通道,错误可以发送到该通道进行处理,从而可能导致回复消息有效负载符合定义调用者可能期望的“错误”回复的某些约定。您可以使用 error-channel 属性来配置此类通道,如以下示例所示

<int-jms:inbound-gateway request-destination="requestQueue"
          request-channel="jmsInputChannel"
          error-channel="errorTransformationChannel"/>

<int:transformer input-channel="exceptionTransformationChannel"
        ref="exceptionTransformer" method="createErrorResponse"/>

您可能会注意到,此示例与 进入 GatewayProxyFactoryBean 中包含的示例非常相似。这里适用相同的想法:exceptionTransformer 可以是一个创建错误响应对象的 POJO,您可以引用 nullChannel 来抑制错误,或者您可以省略 'error-channel' 以让异常传播。

请参阅 入站转换错误

从主题消费时,将 pub-sub-domain 属性设置为 true。将 subscription-durable 设置为 true 以实现持久订阅,或将 subscription-shared 设置为共享订阅(需要 JMS 2.0 代理,并从 4.2 版本开始可用)。使用 subscription-name 来命名订阅。

从 4.2 版本开始,默认的 acknowledge 模式是 transacted,除非提供了外部容器。在这种情况下,您应该根据需要配置容器。我们建议您将 transactedDefaultMessageListenerContainer 结合使用,以避免消息丢失。

从版本 5.1 开始,当应用程序仍在运行时停止端点时,底层监听器容器将关闭,从而关闭其共享连接和消费者。以前,连接和消费者保持打开状态。要恢复到以前的行为,请将 JmsInboundGateway 上的 shutdownContainerOnStop 设置为 false

默认情况下,JmsInboundGateway 会在收到的消息中查找 jakarta.jms.Message.getJMSReplyTo() 属性,以确定回复的发送位置。否则,它可以通过静态的 defaultReplyDestinationdefaultReplyQueueNamedefaultReplyTopicName 进行配置。此外,从 6.1 版本开始,可以在提供的 ChannelPublishingJmsMessageListener 上配置 replyToExpression,以便在请求中标准 JMSReplyTo 属性为 null 时动态确定回复目标。收到的 jakarta.jms.Message 用作根评估上下文对象。以下示例演示了如何使用 Java DSL API 配置一个 JMS 入站网关,该网关的自定义回复目标从请求消息中解析

@Bean
public IntegrationFlow jmsInboundGatewayFlow(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(
                    Jms.inboundGateway(connectionFactory)
                            .requestDestination("requestDestination")
                            .replyToFunction(message -> message.getStringProperty("myReplyTo")))
            .<String, String>transform(String::toUpperCase)
            .get();
}

从版本 6.3 开始,Jms.inboundGateway() API 暴露了 retryTemplate()recoveryCallback() 选项,用于重试内部发送和接收操作。

出站网关

出站网关从 Spring Integration 消息创建 JMS 消息,并将其发送到 request-destination。然后,它通过使用选择器从您配置的 reply-destination 接收,或者在未提供 reply-destination 的情况下,通过创建 JMS TemporaryQueue(如果 replyPubSubDomain=true,则创建 TemporaryTopic)实例来处理 JMS 回复消息。

reply-destination(或 reply-destination-name)与将 cacheConsumers 设置为 trueCachingConnectionFactory 结合使用可能会导致内存不足。这是因为每个请求都会获得一个带有新选择器的新消费者(在 correlation-key 值上选择,或者在没有 correlation-key 的情况下,在发送的 JMSMessageID 上选择)。鉴于这些选择器是唯一的,它们在当前请求完成后仍然保留在缓存中(未使用)。

如果您指定了回复目标,建议不要使用缓存的消费者。或者,考虑使用 如下所述<reply-listener/>

以下示例展示了如何配置出站网关

<int-jms:outbound-gateway id="jmsOutGateway"
    request-destination="outQueue"
    request-channel="outboundJmsRequests"
    reply-channel="jmsReplies"/>

“出站网关”的有效负载提取属性与“入站网关”的属性(参见之前的讨论)相反。这意味着“extract-request-payload”属性值适用于转换为 JMS 消息以作为请求发送的 Spring Integration 消息。而“extract-reply-payload”属性值适用于作为回复接收的 JMS 消息,然后转换为 Spring Integration 消息,以便随后发送到“reply-channel”,如上述配置示例所示。

使用 <reply-listener/>

Spring Integration 2.2 引入了一种处理回复的替代技术。如果您向网关添加 <reply-listener/> 子元素,而不是为每个回复创建消费者,则会使用 MessageListener 容器来接收回复并将其交给请求线程。这提供了许多性能优势,并缓解了 前面提到的警告 中描述的缓存消费者内存利用率问题。

当使用带有出站网关的 <reply-listener/> 且没有 reply-destination 时,将使用单个 TemporaryQueue,而不是为每个请求创建一个 TemporaryQueue。(如果与代理的连接丢失并恢复,网关会根据需要创建额外的 TemporaryQueue)。如果 replyPubSubDomain 设置为 true,从 6.0 版本开始,将创建 TemporaryTopic

当使用 correlation-key 时,多个网关可以共享相同的回复目标,因为监听器容器使用每个网关唯一的选择器。

如果您指定了回复监听器并指定了回复目标(或回复目标名称),但未提供关联键,则网关会记录警告并回退到 2.2 版本之前的行为。这是因为在这种情况下无法配置选择器。因此,无法避免回复转到可能配置了相同回复目标的不同网关。

请注意,在这种情况下,每个请求都使用一个新的消费者,并且消费者可能会在内存中积累,如上述注意事项所述;因此,在这种情况下不应使用缓存消费者。

以下示例显示了具有默认属性的回复监听器

<int-jms:outbound-gateway id="jmsOutGateway"
        request-destination="outQueue"
        request-channel="outboundJmsRequests"
        reply-channel="jmsReplies">
    <int-jms:reply-listener />
</int-jms-outbound-gateway>

监听器非常轻量级,我们预计在大多数情况下您只需要一个消费者。但是,您可以添加 concurrent-consumersmax-concurrent-consumers 等属性。有关支持属性的完整列表及其含义,请参阅模式和 Spring JMS 文档

空闲回复监听器

从 4.2 版本开始,您可以根据需要启动回复监听器(并在空闲时间后停止它),而不是在网关的整个生命周期内运行。如果您在应用程序上下文中有很多网关(其中大部分处于空闲状态),这可能很有用。其中一种情况是,上下文中有许多(不活跃的)分区 Spring Batch 作业使用 Spring Integration 和 JMS 进行分区分发。如果所有回复监听器都处于活动状态,则 JMS 代理会为每个网关都有一个活动的消费者。通过启用空闲超时,每个消费者仅在相应的批处理作业运行期间(以及作业完成后的一小段时间内)存在。

请参阅 属性参考 中的 idle-reply-listener-timeout

网关回复关联

本节描述了用于回复关联的机制(确保源网关仅接收对其请求的回复),具体取决于网关的配置方式。有关此处讨论的属性的完整描述,请参阅 属性参考

以下列表描述了各种场景(数字仅用于标识——顺序无关紧要)

  1. reply-destination* 属性且无 <reply-listener>

    为每个请求创建一个 TemporaryQueue,并在请求完成时(成功或不成功)删除。correlation-key 无关紧要。

  2. 提供了 reply-destination* 属性,并且没有提供 <reply-listener/> 也未提供 correlation-key

    与出站消息相同的 JMSCorrelationID 用作消费者的消息选择器

    messageSelector = "JMSCorrelationID = '" + messageId + "'"

    响应系统应在回复 JMSCorrelationID 中返回入站 JMSMessageID。这是一种常见模式,由 Spring Integration 入站网关以及 Spring 的 MessageListenerAdapter 用于消息驱动 POJO 实现。

    当您使用此配置时,不应将主题用于回复。回复可能会丢失。
  3. 提供了 reply-destination* 属性,未提供 <reply-listener/>,且 correlation-key="JMSCorrelationID"

    网关生成一个唯一的关联 ID 并将其插入 JMSCorrelationID 头中。消息选择器是

    messageSelector = "JMSCorrelationID = '" + uniqueId + "'"

    响应系统应在回复 JMSCorrelationID 中返回入站 JMSCorrelationID。这是一种常见模式,由 Spring Integration 入站网关以及 Spring 的 MessageListenerAdapter 用于消息驱动 POJO 实现。

  4. 提供了 reply-destination* 属性,未提供 <reply-listener/>,且 correlation-key="myCorrelationHeader"

    网关生成一个唯一的关联 ID 并将其插入 myCorrelationHeader 消息属性中。correlation-key 可以是任何用户定义的值。消息选择器是

    messageSelector = "myCorrelationHeader = '" + uniqueId + "'"

    响应系统应在回复 myCorrelationHeader 中返回入站 myCorrelationHeader

  5. 提供了 reply-destination* 属性,未提供 <reply-listener/>,且 correlation-key="JMSCorrelationID*"(注意关联键中的 *。)

    网关使用请求消息中 jms_correlationId 头中的值(如果存在)并将其插入 JMSCorrelationID 头中。消息选择器是

    messageSelector = "JMSCorrelationID = '" + headers['jms_correlationId'] + "'"

    用户必须确保此值是唯一的。

    如果头不存在,则网关的行为与 3 相同。

    响应系统应在回复 JMSCorrelationID 中返回入站 JMSCorrelationID。这是一种常见模式,由 Spring Integration 入站网关以及 Spring 的 MessageListenerAdapter 用于消息驱动 POJO 实现。

  6. 未提供 reply-destination* 属性,但提供了 <reply-listener>

    创建了一个临时队列,用于此网关实例的所有回复。消息中不需要关联数据,但出站 JMSMessageID 在网关内部用于将回复定向到正确的请求线程。

  7. 提供了 reply-destination* 属性,提供了 <reply-listener>,但未提供 correlation-key

    不允许。

    <reply-listener/> 配置将被忽略,网关的行为与 2 相同。将写入警告日志消息以指示此情况。

  8. 提供了 reply-destination* 属性,提供了 <reply-listener>,且 correlation-key="JMSCorrelationID"

    网关具有唯一的关联 ID,并将其与递增值一起插入 JMSCorrelationID 头中(gatewayId + "_" + ++seq)。消息选择器是

    messageSelector = "JMSCorrelationID LIKE '" + gatewayId%'"

    响应系统应在回复 JMSCorrelationID 中返回入站 JMSCorrelationID。这是一种常见模式,由 Spring Integration 入站网关以及 Spring 的 MessageListenerAdapter 用于消息驱动 POJO 实现。由于每个网关都有唯一的 ID,因此每个实例只获取自己的回复。完整的关联数据用于将回复路由到正确的请求线程。

  9. 提供了 reply-destination* 属性,提供了 <reply-listener/>,且 correlation-key="myCorrelationHeader"

    网关具有唯一的关联 ID,并将其与递增值一起插入 myCorrelationHeader 属性中(gatewayId + "_" + ++seq)。correlation-key 可以是任何用户定义的值。消息选择器是

    messageSelector = "myCorrelationHeader LIKE '" + gatewayId%'"

    响应系统应在回复 myCorrelationHeader 中返回入站 myCorrelationHeader。由于每个网关都有唯一的 ID,因此每个实例只获取自己的回复。完整的关联数据用于将回复路由到正确的请求线程。

  10. 提供了 reply-destination* 属性,提供了 <reply-listener/>,且 correlation-key="JMSCorrelationID*"

    (注意关联键中的 *

    不允许。

    不允许用户提供的关联 ID 与回复监听器一起使用。网关不会使用此配置进行初始化。

异步网关

从 4.3 版本开始,您现在可以在配置出站网关时指定 async="true"(或在 Java 中指定 setAsync(true))。

默认情况下,当请求发送到网关时,请求线程会暂停,直到收到回复。然后流在该线程上继续。如果 asynctrue,则请求线程在 send() 完成后立即释放,并且回复在监听器容器线程上返回(并且流继续)。当网关在轮询器线程上调用时,这可能很有用。线程被释放并可用于框架内的其他任务。

async 需要 <reply-listener/>(或在 Java 配置中设置 setUseReplyContainer(true))。它还需要指定 correlationKey(通常是 JMSCorrelationID)。如果这些条件中的任何一个不满足,async 将被忽略。

属性参考

以下列表显示了 outbound-gateway 的所有可用属性

<int-jms:outbound-gateway
    connection-factory="connectionFactory" (1)
    correlation-key="" (2)
    delivery-persistent="" (3)
    destination-resolver="" (4)
    explicit-qos-enabled="" (5)
    extract-reply-payload="true" (6)
    extract-request-payload="true" (7)
    header-mapper="" (8)
    message-converter="" (9)
    priority="" (10)
    receive-timeout="" (11)
    reply-channel="" (12)
    reply-destination="" (13)
    reply-destination-expression="" (14)
    reply-destination-name="" (15)
    reply-pub-sub-domain="" (16)
    reply-timeout="" (17)
    request-channel="" (18)
    request-destination="" (19)
    request-destination-expression="" (20)
    request-destination-name="" (21)
    request-pub-sub-domain="" (22)
    time-to-live="" (23)
    requires-reply="" (24)
    idle-reply-listener-timeout="" (25)
    async=""> (26)
  <int-jms:reply-listener /> (27)
</int-jms:outbound-gateway>
1 引用 jakarta.jms.ConnectionFactory。默认值为 jmsConnectionFactory
2 包含关联数据以将响应与回复关联起来的属性名称。如果省略,网关期望响应系统在 JMSCorrelationID 头中返回出站 JMSMessageID 头的值。如果指定,网关将生成一个关联 ID 并用它填充指定的属性。响应系统必须在同一属性中回显该值。它可以设置为 JMSCorrelationID,在这种情况下,将使用标准头而不是 String 属性来保存关联数据。当您使用 <reply-container/> 时,如果您提供了显式的 reply-destination,则必须指定 correlation-key。从 4.0.1 版本开始,此属性还支持值 JMSCorrelationID*,这意味着如果出站消息已经具有 JMSCorrelationID(从 jms_correlationId 映射)头,则使用它而不是生成一个新的。请注意,当您使用 <reply-container/> 时不允许使用 JMSCorrelationID* 键,因为容器需要在初始化期间设置消息选择器。
您应该理解,网关无法确保唯一性,如果提供的关联 ID 不唯一,可能会发生意想不到的副作用。
3 一个布尔值,指示传递模式应为 DeliveryMode.PERSISTENTtrue)还是 DeliveryMode.NON_PERSISTENTfalse)。此设置仅在 explicit-qos-enabledtrue 时才生效。
4 一个 DestinationResolver。默认是一个 SimpleDestinationResolver,它将目标名称映射到该名称的队列或主题并缓存一个目标。
5 设置为 true 时,启用质量服务属性:prioritydelivery-modetime-to-live
6 设置为 true(默认值)时,Spring Integration 回复消息的有效负载将从 JMS 回复消息的正文创建(使用 MessageConverter)。设置为 false 时,整个 JMS 消息将成为 Spring Integration 消息的有效负载。
7 当设置为 true(默认值)时,Spring Integration 消息的有效负载将转换为 JMSMessage(通过使用 MessageConverter)。当设置为 false 时,整个 Spring Integration 消息将转换为 JMSMessage。在这两种情况下,Spring Integration 消息头都将通过 HeaderMapper 映射到 JMS 头和属性。
8 一个 HeaderMapper,用于将 Spring Integration 消息头映射到 JMS 消息头和属性,以及反向映射。
9 MessageConverter 的引用,用于在 JMS 消息和 Spring Integration 消息负载(或消息,如果 extract-request-payloadfalse)之间进行转换。默认是 SimpleMessageConverter
10 请求消息的默认优先级。如果存在消息优先级头,则会被覆盖。其范围为 09。此设置仅在 explicit-qos-enabledtrue 时才生效。
11 等待回复的时间(毫秒)。默认值为 5000(五秒)。
12 回复消息发送到的通道。
13 Destination 的引用,该引用被设置为 JMSReplyTo 头。最多只允许一个 reply-destinationreply-destination-expressionreply-destination-name。如果未提供,则使用 TemporaryQueue 作为此网关的回复。
14 评估为 Destination 的 SpEL 表达式,它将设置为 JMSReplyTo 头。表达式可以生成 Destination 对象或 String。它由 DestinationResolver 用于解析实际的 Destination。最多只允许一个 reply-destinationreply-destination-expressionreply-destination-name。如果未提供,则使用 TemporaryQueue 作为此网关的回复。
15 设置为 JMSReplyTo 头的目标名称。它由 DestinationResolver 用于解析实际的 Destination。最多只允许一个 reply-destinationreply-destination-expressionreply-destination-name。如果未提供,则使用 TemporaryQueue 作为此网关的回复。
16 当设置为 true 时,表示由 DestinationResolver 解析的任何回复 Destination 都应该是 Topic 而不是 Queue
17 网关将回复消息发送到 reply-channel 时等待的时间。这仅在 reply-channel 可以阻塞时才生效,例如容量限制已满的 QueueChannel。默认值为无穷大。
18 此网关接收请求消息的通道。
19 对发送请求消息的 Destination 的引用。reply-destinationreply-destination-expressionreply-destination-name 中的一个为必需。您只能使用这三个属性中的一个。
20 评估为 Destination 的 SpEL 表达式,用于发送请求消息。表达式可以生成 Destination 对象或 String。它由 DestinationResolver 用于解析实际的 Destinationreply-destinationreply-destination-expressionreply-destination-name 中的一个为必需。您只能使用这三个属性中的一个。
21 发送请求消息到的目标名称。它由 DestinationResolver 用于解析实际的 Destinationreply-destinationreply-destination-expressionreply-destination-name 中的一个为必需。您只能使用这三个属性中的一个。
22 当设置为 true 时,表示由 DestinationResolver 解析的任何请求 Destination 都应该是 Topic 而不是 Queue
23 指定消息的存活时间。此设置仅在 explicit-qos-enabledtrue 时才生效。
24 指定此出站网关是否必须返回非空值。默认情况下,此值为 true,当底层服务在 receive-timeout 后未返回值时,将抛出 MessageTimeoutException。请注意,如果服务从不期望返回回复,最好使用 <int-jms:outbound-channel-adapter/> 而不是 <int-jms:outbound-gateway/> 并设置 requires-reply="false"。后者会阻塞发送线程,等待回复,时间为 receive-timeout 周期。
25 当您使用 <reply-listener /> 时,它的生命周期(启动和停止)默认与网关的生命周期匹配。当此值大于 0 时,容器按需启动(当发送请求时)。容器将继续运行,直到至少此时间过去,没有收到请求(并且没有待处理的回复)。容器将在下一个请求时再次启动。停止时间是最小值,实际上可能高达此值的 1.5 倍。
26 请参阅 异步网关
27 当包含此元素时,回复由异步 MessageListenerContainer 接收,而不是为每个回复创建消费者。在许多情况下,这可能更高效。

将消息头映射到 JMS 消息和从 JMS 消息映射消息头

JMS 消息可以包含元信息,例如 JMS API 消息头和简单属性。您可以使用 JmsHeaderMapper 将这些消息头映射到 Spring Integration 消息头以及从 Spring Integration 消息头映射。JMS API 消息头传递给适当的 setter 方法(例如 setJMSReplyTo),而其他消息头则复制到 JMS 消息的通用属性中。JMS 出站网关使用 JmsHeaderMapper 的默认实现进行引导,该实现将映射标准 JMS API 消息头以及基本类型或 String 消息头。您还可以通过使用入站和出站网关的 header-mapper 属性来提供自定义消息头映射器。

许多 JMS 厂商特定的客户端不允许直接在已创建的 JMS 消息上设置 deliveryModeprioritytimeToLive 属性。它们被认为是 QoS 属性,因此必须传播到目标 MessageProducer.send(message, deliveryMode, priority, timeToLive) API。因此,DefaultJmsHeaderMapper 不会将适当的 Spring Integration 消息头(或表达式结果)映射到上述 JMS 消息属性中。相反,JmsSendingMessageHandler 使用 DynamicJmsTemplate 将请求消息中的消息头值传播到 MessageProducer.send() API 中。要启用此功能,您必须使用 explicitQosEnabled 属性设置为 true 的 DynamicJmsTemplate 配置出站端点。Spring Integration Java DSL 默认配置 DynamicJmsTemplate,但您仍然必须设置 explicitQosEnabled 属性。
从 4.0 版本开始,JMSPriority 头会映射到入站消息的标准 priority 头。以前,priority 头仅用于出站消息。要恢复到以前的行为(即不映射入站优先级),请将 DefaultJmsHeaderMappermapInboundPriority 属性设置为 false
从 4.3 版本开始,DefaultJmsHeaderMapper 通过调用其 toString() 方法将标准 correlationId 标头映射为消息属性(correlationId 通常是 UUID,JMS 不支持)。在入站端,它映射为 String。这独立于 jms_correlationId 标头,后者映射到 JMSCorrelationID 标头和从 JMSCorrelationID 标头映射。JMSCorrelationID 通常用于关联请求和回复,而 correlationId 通常用于将相关消息组合成一个组(例如与聚合器或重排序器一起使用)。

从 5.1 版本开始,DefaultJmsHeaderMapper 可以配置为映射入站 JMSDeliveryModeJMSExpiration 属性

@Bean
public DefaultJmsHeaderMapper jmsHeaderMapper() {
    DefaultJmsHeaderMapper mapper = new DefaultJmsHeaderMapper();
    mapper.setMapInboundDeliveryMode(true)
    mapper.setMapInboundExpiration(true)
    return mapper;
}

这些 JMS 属性分别映射到 JmsHeaders.DELIVERY_MODEJmsHeaders.EXPIRATION Spring 消息头。

消息转换、编组和解组

如果您需要转换消息,所有 JMS 适配器和网关都允许您通过设置 message-converter 属性来提供 MessageConverter。为此,请提供同一 ApplicationContext 中可用的 MessageConverter 实例的 bean 名称。此外,为了与编组器和解组器接口保持一致,Spring 提供了 MarshallingMessageConverter,您可以将其配置为您自己的自定义编组器和解组器。以下示例演示了如何执行此操作

<int-jms:inbound-gateway request-destination="requestQueue"
    request-channel="inbound-gateway-channel"
    message-converter="marshallingMessageConverter"/>

<bean id="marshallingMessageConverter"
    class="org.springframework.jms.support.converter.MarshallingMessageConverter">
    <constructor-arg>
        <bean class="org.bar.SampleMarshaller"/>
    </constructor-arg>
    <constructor-arg>
        <bean class="org.bar.SampleUnmarshaller"/>
    </constructor-arg>
</bean>
当您提供自己的 MessageConverter 实例时,它仍然被封装在 HeaderMappingMessageConverter 中。这意味着“extract-request-payload”和“extract-reply-payload”属性可能会影响传递给您的转换器的实际对象。HeaderMappingMessageConverter 本身委托给目标 MessageConverter,同时还将 Spring Integration MessageHeaders 映射到 JMS 消息属性并再次映射回来。

JMS 支持的消息通道

前面介绍的通道适配器和网关都用于与外部系统集成的应用程序。入站选项假定其他系统正在将 JMS 消息发送到 JMS 目标,而出站选项假定其他系统正在从目标接收消息。其他系统可能是也可能不是 Spring Integration 应用程序。当然,当将 Spring Integration 消息实例作为 JMS 消息本身的正文发送时(将“extract-payload”值设置为 false),假定其他系统是基于 Spring Integration 的。但是,这绝不是一个要求。这种灵活性是使用基于消息的集成选项和“通道”(或 JMS 情况下的目标)抽象的好处之一。

有时,给定 JMS 目标的生产者和消费者都旨在成为同一应用程序的一部分,在同一进程中运行。您可以通过使用一对入站和出站通道适配器来实现这一点。这种方法的缺点是您需要两个适配器,即使从概念上讲,目标是拥有单个消息通道。Spring Integration 2.0 版本中支持更好的选项。现在可以使用 JMS 命名空间定义单个“通道”,如以下示例所示

<int-jms:channel id="jmsChannel" queue="exampleQueue"/>

上述示例中的通道行为很像主 Spring Integration 命名空间中的普通 <channel/> 元素。它可以由任何端点的 input-channeloutput-channel 属性引用。不同之处在于,此通道由名为 exampleQueue 的 JMS Queue 实例支持。这意味着在生产者和消费者端点之间可以进行异步消息传递。但是,与通过在非 JMS <channel/> 元素中添加 <queue/> 元素创建的更简单的异步消息通道不同,消息不会存储在内存队列中。相反,这些消息在 JMS 消息正文中传递,并且底层 JMS 提供程序的全部功能可用于该通道。使用此替代方案最常见的原因可能是利用 JMS 消息的存储转发方法提供的持久性。

如果配置得当,JMS 支持的消息通道也支持事务。换句话说,如果生产者的发送操作是回滚事务的一部分,则生产者实际上不会写入事务性 JMS 支持的通道。同样,如果接收消息是回滚事务的一部分,则消费者不会物理地从通道中删除 JMS 消息。请注意,在这种情况下,生产者和消费者事务是独立的。这与事务上下文通过没有 <queue/> 子元素的简单同步 <channel/> 元素传播的情况显著不同。

由于上述示例引用了 JMS 队列实例,因此它充当点对点通道。另一方面,如果您需要发布-订阅行为,您可以使用单独的元素并引用 JMS 主题。以下示例显示了如何执行此操作

<int-jms:publish-subscribe-channel id="jmsChannel" topic="exampleTopic"/>

对于任何类型的 JMS 支持通道,可以提供目标名称而不是引用,如以下示例所示

<int-jms:channel id="jmsQueueChannel" queue-name="exampleQueueName"/>

<jms:publish-subscribe-channel id="jmsTopicChannel" topic-name="exampleTopicName"/>

在前面的示例中,目标名称由 Spring 的默认 SimpleDestinationResolver 实现解析,但您可以提供 DestinationResolver 接口的任何实现。此外,JMS ConnectionFactory 是通道的必需属性,但默认情况下,预期的 bean 名称将是 jmsConnectionFactory。以下示例为 JMS 目标名称的解析和 ConnectionFactory 的不同名称提供了一个自定义实例

<int-jms:channel id="jmsChannel" queue-name="exampleQueueName"
    destination-resolver="customDestinationResolver"
    connection-factory="customConnectionFactory"/>

对于 <publish-subscribe-channel />,将 durable 属性设置为 true 以实现持久订阅,或将 subscription-shared 设置为共享订阅(需要 JMS 2.0 代理,并从 4.2 版本开始可用)。使用 subscription 来命名订阅。

使用 JMS 消息选择器

借助 JMS 消息选择器,您可以根据 JMS 消息头和 JMS 属性过滤 JMS 消息。例如,如果您想监听自定义 JMS 消息头属性 myHeaderProperty 等于 something 的消息,您可以指定以下表达式

myHeaderProperty = 'something'

消息选择器表达式是 SQL-92 条件表达式语法的一个子集,并作为 Java 消息服务 规范的一部分进行定义。您可以使用 XML 命名空间配置为以下 Spring Integration JMS 组件指定 JMS 消息 selector 属性

  • JMS 通道

  • JMS 发布订阅通道

  • JMS 入站通道适配器

  • JMS 入站网关

  • JMS 消息驱动通道适配器

您不能使用 JMS 消息选择器引用消息正文值。

JMS 示例

要体验这些 JMS 适配器,请查看 Spring Integration Samples Git 存储库中提供的 JMS 示例:https://github.com/spring-projects/spring-integration-samples/tree/master/basic/jms

该存储库包含两个示例。一个提供入站和出站通道适配器,另一个提供入站和出站网关。它们配置为使用嵌入式 ActiveMQ 进程运行,但您可以修改每个示例的 common.xml Spring 应用程序上下文文件,以支持不同的 JMS 提供程序或独立的 ActiveMQ 进程。

换句话说,您可以拆分配置,以便入站和出站适配器在单独的 JVM 中运行。如果您已安装 ActiveMQ,请修改 common.xml 文件中的 brokerURL 属性,以使用 tcp://:61616(而不是 vm://)。这两个示例都接受来自 stdin 的输入并回显到 stdout。查看配置以了解这些消息如何通过 JMS 路由。

© . This site is unofficial and not affiliated with VMware.