请求/回复消息传递

AmqpTemplate 还提供了多种 sendAndReceive 方法,这些方法接受与前面描述的单向发送操作相同的参数选项(exchangeroutingKeyMessage)。 这些方法对于请求/回复场景非常有用,因为它们在发送前处理必要的 reply-to 属性的配置,并且可以监听为该目的在内部创建的独占队列上的回复消息。

类似的请求/回复方法也可用,其中 MessageConverter 同时应用于请求和回复。 这些方法名为 convertSendAndReceive。 详情请参阅 AmqpTemplate 的 Javadoc

从版本 1.5.0 开始,每个 sendAndReceive 方法变体都有一个重载版本,接受 CorrelationData。 结合正确配置的连接工厂,这使得可以接收操作发送端的发布者确认。 更多信息请参阅 相关发布者确认和返回 以及 RabbitOperations 的 Javadoc

从版本 2.0 开始,这些方法有变体(convertSendAndReceiveAsType),它们接受一个额外的 ParameterizedTypeReference 参数,用于转换复杂的返回类型。 必须为模板配置一个 SmartMessageConverter。 更多信息请参阅 使用 RabbitTemplateMessage 转换

从版本 2.1 开始,您可以配置 RabbitTemplatenoLocalReplyConsumer 选项,以控制回复消费者的 noLocal 标志。 默认情况下,此标志为 false

回复超时

默认情况下,发送和接收方法会在五秒后超时并返回 null。 您可以通过设置 replyTimeout 属性来修改此行为。 从版本 1.5 开始,如果您将 mandatory 属性设置为 true(或 mandatory-expression 对特定消息评估为 true),如果消息无法发送到队列,将抛出 AmqpMessageReturnedException 异常。 此异常包含 returnedMessagereplyCodereplyText 属性,以及用于发送的 exchangeroutingKey

此功能使用发布者返回。 您可以通过在 CachingConnectionFactory 上将 publisherReturns 设置为 true 来启用它(请参阅 发布者确认和返回)。 另外,您不得在 RabbitTemplate 中注册自己的 ReturnCallback

从版本 2.1.2 开始,添加了一个 replyTimedOut 方法,允许子类获知超时,以便它们可以清理任何保留的状态。

从版本 2.0.11 和 2.1.3 开始,当您使用默认的 DirectReplyToMessageListenerContainer 时,您可以通过设置模板的 replyErrorHandler 属性来添加错误处理器。 此错误处理器会在任何失败的发送(例如延迟回复以及没有关联头接收到的消息)时被调用。 传入的异常是 ListenerExecutionFailedException,它包含一个 failedMessage 属性。

RabbitMQ Direct reply-to

从版本 3.4.0 开始,RabbitMQ 服务器支持 direct reply-to。 这消除了固定回复队列的主要原因(避免为每个请求创建临时队列的需要)。 从 Spring AMQP 版本 1.4.1 开始,默认使用 direct reply-to(如果服务器支持),而不是创建临时回复队列。 当未提供 replyQueue(或将其名称设置为 amq.rabbitmq.reply-to)时,RabbitTemplate 会自动检测是否支持 direct reply-to,然后使用它或回退到使用临时回复队列。 使用 direct reply-to 时,不需要配置 reply-listener,也不应进行配置。

回复监听器仍然支持命名队列(除了 amq.rabbitmq.reply-to),允许控制回复并发等。

从版本 1.6 开始,如果您希望为每个回复使用一个临时的、独占的、自动删除的队列,请将 useTemporaryReplyQueues 属性设置为 true。 如果您设置了 replyAddress,此属性将被忽略。

您可以通过继承 RabbitTemplate 并重写 useDirectReplyTo() 方法来改变使用 direct reply-to 的判定标准。 此方法仅在第一次发送请求时被调用一次。

在版本 2.0 之前,RabbitTemplate 为每个请求创建一个新的消费者,并在收到回复(或超时)时取消该消费者。 现在模板改为使用 DirectReplyToMessageListenerContainer,允许消费者被重用。 模板仍然负责关联回复,因此不必担心延迟回复发送给不同的发送者。 如果您想恢复到之前的行为,请将 useDirectReplyToContainer(使用 XML 配置时为 direct-reply-to-container)属性设置为 false。

AsyncRabbitTemplate 没有此选项。 在使用 direct reply-to 时,它始终使用 DirectReplyToContainer 来处理回复。

从版本 2.3.7 开始,模板添加了一个新的属性 useChannelForCorrelation。 当此属性为 true 时,服务器无需从请求消息头复制关联 ID 到回复消息。 相反,用于发送请求的通道被用来将回复与请求关联起来。

使用回复队列进行消息关联

当使用固定回复队列(除了 amq.rabbitmq.reply-to)时,必须提供关联数据以便将回复与请求关联起来。 请参阅 RabbitMQ 远程过程调用 (RPC)。 默认情况下,标准 correlationId 属性用于存放关联数据。 但是,如果您希望使用自定义属性存放关联数据,可以在 <rabbit-template/> 上设置 correlation-key 属性。 显式地将属性设置为 correlationId 与省略该属性效果相同。 客户端和服务器必须使用相同的头来存放关联数据。

Spring AMQP 版本 1.1 使用一个名为 spring_reply_correlation 的自定义属性来存放此数据。 如果您希望在当前版本中恢复此行为(可能为了保持与使用 1.1 的其他应用程序的兼容性),必须将属性设置为 spring_reply_correlation

默认情况下,模板生成自己的关联 ID(忽略任何用户提供的值)。 如果您希望使用自己的关联 ID,请将 RabbitTemplate 实例的 userCorrelationId 属性设置为 true

关联 ID 必须是唯一的,以避免为请求返回错误回复的可能性。

回复监听器容器

当使用早于版本 3.4.0 的 RabbitMQ 时,每个回复会使用一个新的临时队列。 然而,可以在模板上配置一个单一的回复队列,这更有效率,并且允许您在该队列上设置参数。 但在这种情况下,您还必须提供一个 <reply-listener/> 子元素。 此元素为回复队列提供了一个监听器容器,模板作为监听器。 消息监听器容器配置 中允许用于 <listener-container/> 的所有属性都允许用于此元素,除了 connection-factorymessage-converter,它们继承自模板的配置。

如果您运行应用程序的多个实例或使用多个 RabbitTemplate 实例,则**必须**为每个实例使用一个唯一的回复队列。 RabbitMQ 没有从队列中选择消息的能力,因此,如果它们都使用同一个队列,每个实例将竞争回复,并且不一定能收到自己的回复。

以下示例定义了一个带有连接工厂的 rabbit template

<rabbit:template id="amqpTemplate"
        connection-factory="connectionFactory"
        reply-queue="replies"
        reply-address="replyEx/routeReply">
    <rabbit:reply-listener/>
</rabbit:template>

虽然容器和模板共享一个连接工厂,但它们不共享通道。 因此,请求和回复不会在同一事务内执行(如果启用了事务)。

在版本 1.5.0 之前,reply-address 属性不可用。 回复总是使用默认交换器和 reply-queue 名称作为路由键进行路由。 这仍然是默认行为,但您现在可以指定新的 reply-address 属性。 reply-address 可以包含格式为 <exchange>/<routingKey> 的地址,回复将被路由到指定的交换器,并路由到使用该路由键绑定的队列。 reply-address 优先于 reply-queue。 当只使用 reply-address 时,<reply-listener> 必须配置为一个独立的 <listener-container> 组件。 reply-addressreply-queue(或 <listener-container> 上的 queues 属性)在逻辑上必须指向同一个队列。

使用此配置,SimpleListenerContainer 用于接收回复,RabbitTemplate 作为 MessageListener。 使用 <rabbit:template/> 命名空间元素定义模板时,如前例所示,解析器会定义容器并将模板注入为监听器。

当模板不使用固定的 replyQueue(或使用 direct reply-to,请参阅 RabbitMQ Direct reply-to)时,不需要监听器容器。 Direct reply-to 是使用 RabbitMQ 3.4.0 或更高版本时的首选机制。

如果您将 RabbitTemplate 定义为一个 <bean/>,或者使用 `@Configuration` 类将其定义为 `@Bean`,或者通过编程方式创建模板,则需要自己定义并连接回复监听器容器。 如果未能这样做,模板将永远收不到回复,最终超时并对 sendAndReceive 方法调用返回 null。

从版本 1.5 开始,RabbitTemplate 会检测是否已将其配置为 MessageListener 来接收回复。 如果没有,尝试使用回复地址发送和接收消息将失败,并抛出 IllegalStateException(因为回复永远不会被接收)。

此外,如果使用简单的 replyAddress(队列名称),回复监听器容器会验证它是否正在监听同名的队列。 如果回复地址是交换器和路由键,则无法执行此检查,并且会写入一条调试日志消息。

当您自己连接回复监听器和模板时,务必确保模板的 replyAddress 和容器的 queues(或 queueNames)属性引用同一个队列。 模板会将回复地址插入到 outbound 消息的 replyTo 属性中。

以下列表显示了如何手动连接 beans 的示例

<bean id="amqpTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
    <constructor-arg ref="connectionFactory" />
    <property name="exchange" value="foo.exchange" />
    <property name="routingKey" value="foo" />
    <property name="replyQueue" ref="replyQ" />
    <property name="replyTimeout" value="600000" />
    <property name="useDirectReplyToContainer" value="false" />
</bean>

<bean class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
    <constructor-arg ref="connectionFactory" />
    <property name="queues" ref="replyQ" />
    <property name="messageListener" ref="amqpTemplate" />
</bean>

<rabbit:queue id="replyQ" name="my.reply.queue" />
    @Bean
    public RabbitTemplate amqpTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        rabbitTemplate.setMessageConverter(msgConv());
        rabbitTemplate.setReplyAddress(replyQueue().getName());
        rabbitTemplate.setReplyTimeout(60000);
        rabbitTemplate.setUseDirectReplyToContainer(false);
        return rabbitTemplate;
    }

    @Bean
    public SimpleMessageListenerContainer replyListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory());
        container.setQueues(replyQueue());
        container.setMessageListener(amqpTemplate());
        return container;
    }

    @Bean
    public Queue replyQueue() {
        return new Queue("my.reply.queue");
    }

一个完整的示例,显示了与固定回复队列连接的 RabbitTemplate,以及一个处理请求并返回回复的“远程”监听器容器,见 此测试用例

当回复超时(replyTimeout)时,sendAndReceive() 方法返回 null。

在版本 1.3.6 之前,对于超时的消息,延迟回复只会记录日志。 现在,如果收到延迟回复,它将被拒绝(模板会抛出 AmqpRejectAndDontRequeueException 异常)。 如果回复队列配置为将拒绝的消息发送到死信交换器,则可以检索该回复以供后续分析。 为此,请将一个队列绑定到配置的死信交换器,并使用与回复队列名称相同的路由键。

更多关于配置死信的信息,请参阅 RabbitMQ 死信文档。 您也可以查看 FixedReplyQueueDeadLetterTests 测试用例以获取示例。

异步 Rabbit 模板

版本 1.6 引入了 AsyncRabbitTemplate。 它具有类似于 AmqpTemplatesendAndReceive(和 convertSendAndReceive)方法。 但是,它们不阻塞,而是返回一个 CompletableFuture

sendAndReceive 方法返回一个 RabbitMessageFutureconvertSendAndReceive 方法返回一个 RabbitConverterFuture

您可以稍后通过在 future 上调用 get() 同步检索结果,也可以注册一个回调,该回调会异步地接收结果。 以下列表显示了这两种方法

@Autowired
private AsyncRabbitTemplate template;

...

public void doSomeWorkAndGetResultLater() {

    ...

    CompletableFuture<String> future = this.template.convertSendAndReceive("foo");

    // do some more work

    String reply = null;
    try {
        reply = future.get(10, TimeUnit.SECONDS);
    }
    catch (ExecutionException e) {
        ...
    }

    ...

}

public void doSomeWorkAndGetResultAsync() {

    ...

    RabbitConverterFuture<String> future = this.template.convertSendAndReceive("foo");
    future.whenComplete((result, ex) -> {
        if (ex == null) {
            // success
        }
        else {
            // failure
        }
    });

    ...

}

如果设置了 mandatory 并且消息无法发送,future 将抛出 ExecutionException,其原因为 AmqpMessageReturnedException,该异常封装了返回的消息以及有关返回的信息。

如果设置了 enableConfirms,future 有一个名为 confirm 的属性,它本身是一个 CompletableFuture<Boolean>,其中 true 表示发布成功。 如果 confirm future 为 false,则 RabbitFuture 还有一个名为 nackCause 的属性,其中包含失败的原因(如果可用)。

如果在收到回复后才收到发布者确认,则会丢弃该确认,因为回复意味着发布成功。

您可以设置模板的 receiveTimeout 属性来使回复超时(默认为 30000 毫秒 - 30 秒)。 发生超时时,future 将以 AmqpReplyTimeoutException 完成。

模板实现了 SmartLifecycle 接口。 在有待处理的回复时停止模板会导致待处理的 Future 实例被取消。

从版本 2.0 开始,异步模板现在支持 direct reply-to,而不是配置的回复队列。 要启用此功能,请使用以下构造函数之一

public AsyncRabbitTemplate(ConnectionFactory connectionFactory, String exchange, String routingKey)

public AsyncRabbitTemplate(RabbitTemplate template)

有关如何在同步 RabbitTemplate 中使用 direct reply-to,请参阅 RabbitMQ Direct reply-to

版本 2.0 引入了这些方法的变体(convertSendAndReceiveAsType),它们接受一个额外的 ParameterizedTypeReference 参数,用于转换复杂的返回类型。 您必须使用 SmartMessageConverter 配置底层的 RabbitTemplate。 更多信息请参阅 使用 RabbitTemplateMessage 转换

使用 AMQP 的 Spring Remoting

Spring remoting 不再受支持,因为该功能已从 Spring Framework 中移除。

请改用 RabbitTemplate(客户端)的 sendAndReceive 操作和 `@RabbitListener`。