请求/回复消息传递
AmqpTemplate
还提供了多种 sendAndReceive
方法,这些方法接受与前面描述的单向发送操作相同的参数选项(exchange
、routingKey
和 Message
)。 这些方法对于请求/回复场景非常有用,因为它们在发送前处理必要的 reply-to
属性的配置,并且可以监听为该目的在内部创建的独占队列上的回复消息。
类似的请求/回复方法也可用,其中 MessageConverter
同时应用于请求和回复。 这些方法名为 convertSendAndReceive
。 详情请参阅 AmqpTemplate
的 Javadoc。
从版本 1.5.0 开始,每个 sendAndReceive
方法变体都有一个重载版本,接受 CorrelationData
。 结合正确配置的连接工厂,这使得可以接收操作发送端的发布者确认。 更多信息请参阅 相关发布者确认和返回 以及 RabbitOperations
的 Javadoc。
从版本 2.0 开始,这些方法有变体(convertSendAndReceiveAsType
),它们接受一个额外的 ParameterizedTypeReference
参数,用于转换复杂的返回类型。 必须为模板配置一个 SmartMessageConverter
。 更多信息请参阅 使用 RabbitTemplate
从 Message
转换。
从版本 2.1 开始,您可以配置 RabbitTemplate
的 noLocalReplyConsumer
选项,以控制回复消费者的 noLocal
标志。 默认情况下,此标志为 false
。
回复超时
默认情况下,发送和接收方法会在五秒后超时并返回 null。 您可以通过设置 replyTimeout
属性来修改此行为。 从版本 1.5 开始,如果您将 mandatory
属性设置为 true
(或 mandatory-expression
对特定消息评估为 true
),如果消息无法发送到队列,将抛出 AmqpMessageReturnedException
异常。 此异常包含 returnedMessage
、replyCode
和 replyText
属性,以及用于发送的 exchange
和 routingKey
。
此功能使用发布者返回。 您可以通过在 CachingConnectionFactory 上将 publisherReturns 设置为 true 来启用它(请参阅 发布者确认和返回)。 另外,您不得在 RabbitTemplate 中注册自己的 ReturnCallback 。 |
从版本 2.1.2 开始,添加了一个 replyTimedOut
方法,允许子类获知超时,以便它们可以清理任何保留的状态。
从版本 2.0.11 和 2.1.3 开始,当您使用默认的 DirectReplyToMessageListenerContainer
时,您可以通过设置模板的 replyErrorHandler
属性来添加错误处理器。 此错误处理器会在任何失败的发送(例如延迟回复以及没有关联头接收到的消息)时被调用。 传入的异常是 ListenerExecutionFailedException
,它包含一个 failedMessage
属性。
RabbitMQ Direct reply-to
从版本 3.4.0 开始,RabbitMQ 服务器支持 direct reply-to。 这消除了固定回复队列的主要原因(避免为每个请求创建临时队列的需要)。 从 Spring AMQP 版本 1.4.1 开始,默认使用 direct reply-to(如果服务器支持),而不是创建临时回复队列。 当未提供 replyQueue (或将其名称设置为 amq.rabbitmq.reply-to )时,RabbitTemplate 会自动检测是否支持 direct reply-to,然后使用它或回退到使用临时回复队列。 使用 direct reply-to 时,不需要配置 reply-listener ,也不应进行配置。 |
回复监听器仍然支持命名队列(除了 amq.rabbitmq.reply-to
),允许控制回复并发等。
从版本 1.6 开始,如果您希望为每个回复使用一个临时的、独占的、自动删除的队列,请将 useTemporaryReplyQueues
属性设置为 true
。 如果您设置了 replyAddress
,此属性将被忽略。
您可以通过继承 RabbitTemplate
并重写 useDirectReplyTo()
方法来改变使用 direct reply-to 的判定标准。 此方法仅在第一次发送请求时被调用一次。
在版本 2.0 之前,RabbitTemplate
为每个请求创建一个新的消费者,并在收到回复(或超时)时取消该消费者。 现在模板改为使用 DirectReplyToMessageListenerContainer
,允许消费者被重用。 模板仍然负责关联回复,因此不必担心延迟回复发送给不同的发送者。 如果您想恢复到之前的行为,请将 useDirectReplyToContainer
(使用 XML 配置时为 direct-reply-to-container
)属性设置为 false。
AsyncRabbitTemplate
没有此选项。 在使用 direct reply-to 时,它始终使用 DirectReplyToContainer
来处理回复。
从版本 2.3.7 开始,模板添加了一个新的属性 useChannelForCorrelation
。 当此属性为 true
时,服务器无需从请求消息头复制关联 ID 到回复消息。 相反,用于发送请求的通道被用来将回复与请求关联起来。
使用回复队列进行消息关联
当使用固定回复队列(除了 amq.rabbitmq.reply-to
)时,必须提供关联数据以便将回复与请求关联起来。 请参阅 RabbitMQ 远程过程调用 (RPC)。 默认情况下,标准 correlationId
属性用于存放关联数据。 但是,如果您希望使用自定义属性存放关联数据,可以在 <rabbit-template/> 上设置 correlation-key
属性。 显式地将属性设置为 correlationId
与省略该属性效果相同。 客户端和服务器必须使用相同的头来存放关联数据。
Spring AMQP 版本 1.1 使用一个名为 spring_reply_correlation 的自定义属性来存放此数据。 如果您希望在当前版本中恢复此行为(可能为了保持与使用 1.1 的其他应用程序的兼容性),必须将属性设置为 spring_reply_correlation 。 |
默认情况下,模板生成自己的关联 ID(忽略任何用户提供的值)。 如果您希望使用自己的关联 ID,请将 RabbitTemplate
实例的 userCorrelationId
属性设置为 true
。
关联 ID 必须是唯一的,以避免为请求返回错误回复的可能性。 |
回复监听器容器
当使用早于版本 3.4.0 的 RabbitMQ 时,每个回复会使用一个新的临时队列。 然而,可以在模板上配置一个单一的回复队列,这更有效率,并且允许您在该队列上设置参数。 但在这种情况下,您还必须提供一个 <reply-listener/> 子元素。 此元素为回复队列提供了一个监听器容器,模板作为监听器。 消息监听器容器配置 中允许用于 <listener-container/> 的所有属性都允许用于此元素,除了 connection-factory
和 message-converter
,它们继承自模板的配置。
如果您运行应用程序的多个实例或使用多个 RabbitTemplate 实例,则**必须**为每个实例使用一个唯一的回复队列。 RabbitMQ 没有从队列中选择消息的能力,因此,如果它们都使用同一个队列,每个实例将竞争回复,并且不一定能收到自己的回复。 |
以下示例定义了一个带有连接工厂的 rabbit template
<rabbit:template id="amqpTemplate"
connection-factory="connectionFactory"
reply-queue="replies"
reply-address="replyEx/routeReply">
<rabbit:reply-listener/>
</rabbit:template>
虽然容器和模板共享一个连接工厂,但它们不共享通道。 因此,请求和回复不会在同一事务内执行(如果启用了事务)。
在版本 1.5.0 之前,reply-address 属性不可用。 回复总是使用默认交换器和 reply-queue 名称作为路由键进行路由。 这仍然是默认行为,但您现在可以指定新的 reply-address 属性。 reply-address 可以包含格式为 <exchange>/<routingKey> 的地址,回复将被路由到指定的交换器,并路由到使用该路由键绑定的队列。 reply-address 优先于 reply-queue 。 当只使用 reply-address 时,<reply-listener> 必须配置为一个独立的 <listener-container> 组件。 reply-address 和 reply-queue (或 <listener-container> 上的 queues 属性)在逻辑上必须指向同一个队列。 |
使用此配置,SimpleListenerContainer
用于接收回复,RabbitTemplate
作为 MessageListener
。 使用 <rabbit:template/>
命名空间元素定义模板时,如前例所示,解析器会定义容器并将模板注入为监听器。
当模板不使用固定的 replyQueue (或使用 direct reply-to,请参阅 RabbitMQ Direct reply-to)时,不需要监听器容器。 Direct reply-to 是使用 RabbitMQ 3.4.0 或更高版本时的首选机制。 |
如果您将 RabbitTemplate
定义为一个 <bean/>
,或者使用 `@Configuration` 类将其定义为 `@Bean`,或者通过编程方式创建模板,则需要自己定义并连接回复监听器容器。 如果未能这样做,模板将永远收不到回复,最终超时并对 sendAndReceive
方法调用返回 null。
从版本 1.5 开始,RabbitTemplate
会检测是否已将其配置为 MessageListener
来接收回复。 如果没有,尝试使用回复地址发送和接收消息将失败,并抛出 IllegalStateException
(因为回复永远不会被接收)。
此外,如果使用简单的 replyAddress
(队列名称),回复监听器容器会验证它是否正在监听同名的队列。 如果回复地址是交换器和路由键,则无法执行此检查,并且会写入一条调试日志消息。
当您自己连接回复监听器和模板时,务必确保模板的 replyAddress 和容器的 queues (或 queueNames )属性引用同一个队列。 模板会将回复地址插入到 outbound 消息的 replyTo 属性中。 |
以下列表显示了如何手动连接 beans 的示例
<bean id="amqpTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
<constructor-arg ref="connectionFactory" />
<property name="exchange" value="foo.exchange" />
<property name="routingKey" value="foo" />
<property name="replyQueue" ref="replyQ" />
<property name="replyTimeout" value="600000" />
<property name="useDirectReplyToContainer" value="false" />
</bean>
<bean class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
<constructor-arg ref="connectionFactory" />
<property name="queues" ref="replyQ" />
<property name="messageListener" ref="amqpTemplate" />
</bean>
<rabbit:queue id="replyQ" name="my.reply.queue" />
@Bean
public RabbitTemplate amqpTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setMessageConverter(msgConv());
rabbitTemplate.setReplyAddress(replyQueue().getName());
rabbitTemplate.setReplyTimeout(60000);
rabbitTemplate.setUseDirectReplyToContainer(false);
return rabbitTemplate;
}
@Bean
public SimpleMessageListenerContainer replyListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setQueues(replyQueue());
container.setMessageListener(amqpTemplate());
return container;
}
@Bean
public Queue replyQueue() {
return new Queue("my.reply.queue");
}
一个完整的示例,显示了与固定回复队列连接的 RabbitTemplate
,以及一个处理请求并返回回复的“远程”监听器容器,见 此测试用例。
当回复超时(replyTimeout )时,sendAndReceive() 方法返回 null。 |
在版本 1.3.6 之前,对于超时的消息,延迟回复只会记录日志。 现在,如果收到延迟回复,它将被拒绝(模板会抛出 AmqpRejectAndDontRequeueException
异常)。 如果回复队列配置为将拒绝的消息发送到死信交换器,则可以检索该回复以供后续分析。 为此,请将一个队列绑定到配置的死信交换器,并使用与回复队列名称相同的路由键。
更多关于配置死信的信息,请参阅 RabbitMQ 死信文档。 您也可以查看 FixedReplyQueueDeadLetterTests
测试用例以获取示例。
异步 Rabbit 模板
版本 1.6 引入了 AsyncRabbitTemplate
。 它具有类似于 AmqpTemplate
的 sendAndReceive
(和 convertSendAndReceive
)方法。 但是,它们不阻塞,而是返回一个 CompletableFuture
。
sendAndReceive
方法返回一个 RabbitMessageFuture
。 convertSendAndReceive
方法返回一个 RabbitConverterFuture
。
您可以稍后通过在 future 上调用 get()
同步检索结果,也可以注册一个回调,该回调会异步地接收结果。 以下列表显示了这两种方法
@Autowired
private AsyncRabbitTemplate template;
...
public void doSomeWorkAndGetResultLater() {
...
CompletableFuture<String> future = this.template.convertSendAndReceive("foo");
// do some more work
String reply = null;
try {
reply = future.get(10, TimeUnit.SECONDS);
}
catch (ExecutionException e) {
...
}
...
}
public void doSomeWorkAndGetResultAsync() {
...
RabbitConverterFuture<String> future = this.template.convertSendAndReceive("foo");
future.whenComplete((result, ex) -> {
if (ex == null) {
// success
}
else {
// failure
}
});
...
}
如果设置了 mandatory
并且消息无法发送,future 将抛出 ExecutionException
,其原因为 AmqpMessageReturnedException
,该异常封装了返回的消息以及有关返回的信息。
如果设置了 enableConfirms
,future 有一个名为 confirm
的属性,它本身是一个 CompletableFuture<Boolean>
,其中 true
表示发布成功。 如果 confirm future 为 false
,则 RabbitFuture
还有一个名为 nackCause
的属性,其中包含失败的原因(如果可用)。
如果在收到回复后才收到发布者确认,则会丢弃该确认,因为回复意味着发布成功。 |
您可以设置模板的 receiveTimeout
属性来使回复超时(默认为 30000
毫秒 - 30 秒)。 发生超时时,future 将以 AmqpReplyTimeoutException
完成。
模板实现了 SmartLifecycle
接口。 在有待处理的回复时停止模板会导致待处理的 Future
实例被取消。
从版本 2.0 开始,异步模板现在支持 direct reply-to,而不是配置的回复队列。 要启用此功能,请使用以下构造函数之一
public AsyncRabbitTemplate(ConnectionFactory connectionFactory, String exchange, String routingKey)
public AsyncRabbitTemplate(RabbitTemplate template)
有关如何在同步 RabbitTemplate
中使用 direct reply-to,请参阅 RabbitMQ Direct reply-to。
版本 2.0 引入了这些方法的变体(convertSendAndReceiveAsType
),它们接受一个额外的 ParameterizedTypeReference
参数,用于转换复杂的返回类型。 您必须使用 SmartMessageConverter
配置底层的 RabbitTemplate
。 更多信息请参阅 使用 RabbitTemplate
从 Message
转换。