请求/回复消息
AmqpTemplate 还提供了多种 sendAndReceive 方法,它们接受与前面描述的单向发送操作(exchange、routingKey 和 Message)相同的参数选项。这些方法对于请求-回复场景非常有用,因为它们在发送前处理必要的 reply-to 属性配置,并可以侦听为此目的内部创建的专用队列上的回复消息。
类似的请求-回复方法也适用于对请求和回复都应用 MessageConverter 的情况。这些方法被命名为 convertSendAndReceive。有关更多详细信息,请参阅 AmqpTemplate 的 Javadoc。
从 1.5.0 版本开始,每个 sendAndReceive 方法变体都有一个接受 CorrelationData 的重载版本。结合正确配置的连接工厂,这使得操作的发送端能够接收发布者确认。有关更多信息,请参阅关联的发布者确认和返回以及 RabbitOperations 的 Javadoc。
从 2.0 版本开始,这些方法(convertSendAndReceiveAsType)有一些变体,它们接受一个额外的 ParameterizedTypeReference 参数来转换复杂的返回类型。模板必须配置 SmartMessageConverter。有关更多信息,请参阅使用 RabbitTemplate 从 Message 转换。
从 2.1 版本开始,您可以使用 noLocalReplyConsumer 选项配置 RabbitTemplate,以控制回复消费者是否设置 noLocal 标志。默认情况下,此选项为 false。
回复超时
默认情况下,发送和接收方法在五秒后超时并返回 null。您可以通过设置 replyTimeout 属性来修改此行为。从 1.5 版本开始,如果您将 mandatory 属性设置为 true(或 mandatory-expression 对特定消息评估为 true),如果消息无法传递到队列,则会抛出 AmqpMessageReturnedException。此异常具有 returnedMessage、replyCode 和 replyText 属性,以及用于发送的 exchange 和 routingKey。
此功能使用发布者返回。您可以通过在 CachingConnectionFactory 上将 publisherReturns 设置为 true 来启用它(请参阅发布者确认和返回)。此外,您不能在 RabbitTemplate 中注册自己的 ReturnCallback。 |
从 2.1.2 版本开始,已添加 replyTimedOut 方法,允许子类收到超时通知,以便它们可以清理任何保留的状态。
从 2.0.11 和 2.1.3 版本开始,当您使用默认的 DirectReplyToMessageListenerContainer 时,可以通过设置模板的 replyErrorHandler 属性来添加错误处理程序。此错误处理程序在任何失败的交付(例如延迟回复和没有关联头的消息)时被调用。传入的异常是 ListenerExecutionFailedException,它具有 failedMessage 属性。
RabbitMQ 直接回复
从 3.4.0 版本开始,RabbitMQ 服务器支持直接回复。这消除了固定回复队列的主要原因(避免为每个请求创建临时队列的需要)。从 Spring AMQP 1.4.1 版本开始,默认情况下(如果服务器支持)使用直接回复,而不是创建临时回复队列。当未提供 replyQueue(或将其名称设置为 amq.rabbitmq.reply-to)时,RabbitTemplate 会自动检测是否支持直接回复,并使用它或回退到使用临时回复队列。使用直接回复时,不需要 reply-listener,也不应配置它。 |
回复侦听器仍然支持命名队列(除了 amq.rabbitmq.reply-to),允许控制回复并发等。
从 1.6 版本开始,如果您希望为每个回复使用临时、排他、自动删除的队列,请将 useTemporaryReplyQueues 属性设置为 true。如果您设置了 replyAddress,此属性将被忽略。
您可以通过继承 RabbitTemplate 并覆盖 useDirectReplyTo() 来检查不同的条件,从而更改决定是否使用直接回复的标准。该方法仅在发送第一个请求时调用一次。
在 2.0 版本之前,RabbitTemplate 为每个请求创建一个新的消费者,并在收到回复(或超时)时取消消费者。现在,模板改为使用 DirectReplyToMessageListenerContainer,允许消费者重用。模板仍然负责关联回复,因此不会出现延迟回复发送给不同发送者的风险。如果您想恢复到以前的行为,请将 useDirectReplyToContainer(使用 XML 配置时为 direct-reply-to-container)属性设置为 false。
AsyncRabbitTemplate 没有这样的选项。当使用直接回复时,它总是为回复使用 DirectReplyToContainer。
从 2.3.7 版本开始,模板有一个新的属性 useChannelForCorrelation。当此属性为 true 时,服务器不必将关联 ID 从请求消息头复制到回复消息。相反,用于发送请求的通道用于将回复与请求关联起来。
使用回复队列进行消息关联
当使用固定回复队列(除了 amq.rabbitmq.reply-to)时,您必须提供关联数据,以便将回复与请求关联起来。请参阅 RabbitMQ 远程过程调用 (RPC)。默认情况下,标准的 correlationId 属性用于保存关联数据。但是,如果您希望使用自定义属性来保存关联数据,您可以在 <rabbit-template/> 上设置 correlation-key 属性。明确将属性设置为 correlationId 与省略该属性相同。客户端和服务器必须使用相同的头部进行关联数据。
Spring AMQP 1.1 版本为此数据使用了名为 spring_reply_correlation 的自定义属性。如果您希望在当前版本中恢复此行为(可能为了保持与使用 1.1 的另一个应用程序的兼容性),您必须将属性设置为 spring_reply_correlation。 |
默认情况下,模板生成自己的关联 ID(忽略任何用户提供的值)。如果您希望使用自己的关联 ID,请将 RabbitTemplate 实例的 userCorrelationId 属性设置为 true。
| 关联 ID 必须是唯一的,以避免请求返回错误回复的可能性。 |
回复监听器容器
当使用 3.4.0 版本之前的 RabbitMQ 版本时,每个回复都会使用一个新的临时队列。但是,可以在模板上配置单个回复队列,这更高效,并且还允许您在该队列上设置参数。在这种情况下,您还必须提供一个 <reply-listener/> 子元素。此元素为回复队列提供一个监听器容器,模板作为监听器。允许在 <listener-container/> 上使用的所有消息监听器容器配置属性都允许在此元素上使用,除了 connection-factory 和 message-converter,它们继承自模板的配置。
如果您运行应用程序的多个实例或使用多个 RabbitTemplate 实例,则 必须 为每个实例使用唯一的回复队列。RabbitMQ 无法从队列中选择消息,因此,如果它们都使用相同的队列,每个实例将竞争回复,并且不一定会收到自己的回复。 |
以下示例定义了一个带有连接工厂的 Rabbit 模板
<rabbit:template id="amqpTemplate"
connection-factory="connectionFactory"
reply-queue="replies"
reply-address="replyEx/routeReply">
<rabbit:reply-listener/>
</rabbit:template>
虽然容器和模板共享一个连接工厂,但它们不共享一个通道。因此,请求和回复不会在同一事务中执行(如果事务性)。
在 1.5.0 版本之前,reply-address 属性不可用。回复总是通过使用默认交换和 reply-queue 名称作为路由键进行路由。这仍然是默认值,但您现在可以指定新的 reply-address 属性。reply-address 可以包含形式为 <exchange>/<routingKey> 的地址,回复将路由到指定的交换并路由到绑定了路由键的队列。reply-address 优先于 reply-queue。当只使用 reply-address 时,<reply-listener> 必须配置为单独的 <listener-container> 组件。reply-address 和 reply-queue(或 <listener-container> 上的 queues 属性)在逻辑上必须引用同一个队列。 |
通过此配置,SimpleListenerContainer 用于接收回复,RabbitTemplate 作为 MessageListener。当使用 <rabbit:template/> 命名空间元素定义模板时,如前一个示例所示,解析器定义容器并将模板作为监听器连接。
当模板不使用固定 replyQueue(或使用直接回复——参见RabbitMQ 直接回复)时,不需要监听器容器。在使用 RabbitMQ 3.4.0 或更高版本时,直接 reply-to 是首选机制。 |
如果您将 RabbitTemplate 定义为 <bean/>,或使用 @Configuration 类将其定义为 @Bean,或以编程方式创建模板,您需要自己定义和连接回复监听器容器。如果您未能这样做,模板将永远不会收到回复,最终会超时并返回 null 作为对 sendAndReceive 方法调用的回复。
从 1.5 版本开始,RabbitTemplate 会检测是否已将其配置为 MessageListener 以接收回复。如果未配置,尝试使用回复地址发送和接收消息将因 IllegalStateException 而失败(因为永远不会收到回复)。
此外,如果使用简单的 replyAddress(队列名称),回复监听器容器会验证它是否正在监听具有相同名称的队列。如果回复地址是交换和路由键,则无法执行此检查,并且会写入调试日志消息。
当您自己连接回复监听器和模板时,重要的是确保模板的 replyAddress 和容器的 queues(或 queueNames)属性引用相同的队列。模板将回复地址插入到出站消息的 replyTo 属性中。 |
以下列表显示了如何手动连接 bean 的示例
<bean id="amqpTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
<constructor-arg ref="connectionFactory" />
<property name="exchange" value="foo.exchange" />
<property name="routingKey" value="foo" />
<property name="replyQueue" ref="replyQ" />
<property name="replyTimeout" value="600000" />
<property name="useDirectReplyToContainer" value="false" />
</bean>
<bean class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
<constructor-arg ref="connectionFactory" />
<property name="queues" ref="replyQ" />
<property name="messageListener" ref="amqpTemplate" />
</bean>
<rabbit:queue id="replyQ" name="my.reply.queue" />
@Bean
public RabbitTemplate amqpTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setMessageConverter(msgConv());
rabbitTemplate.setReplyAddress(replyQueue().getName());
rabbitTemplate.setReplyTimeout(60000);
rabbitTemplate.setUseDirectReplyToContainer(false);
return rabbitTemplate;
}
@Bean
public SimpleMessageListenerContainer replyListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setQueues(replyQueue());
container.setMessageListener(amqpTemplate());
return container;
}
@Bean
public Queue replyQueue() {
return new Queue("my.reply.queue");
}
一个完整的 RabbitTemplate 与固定回复队列连接,以及处理请求并返回回复的“远程”监听器容器的示例显示在此测试用例中。
当回复超时(replyTimeout)时,sendAndReceive() 方法返回 null。 |
在 1.3.6 版本之前,超时消息的延迟回复只会被记录。现在,如果收到延迟回复,它将被拒绝(模板抛出 AmqpRejectAndDontRequeueException)。如果回复队列配置为将拒绝的消息发送到死信交换,则可以检索回复以供以后分析。为此,将一个队列绑定到配置的死信交换,其路由键等于回复队列的名称。
有关配置死信的更多信息,请参阅 RabbitMQ 死信文档。您还可以查看 FixedReplyQueueDeadLetterTests 测试用例以获取示例。
异步 Rabbit 模板
1.6 版本引入了 AsyncRabbitTemplate。它具有与 AmqpTemplate 上类似的 sendAndReceive(和 convertSendAndReceive)方法。但是,它们不会阻塞,而是返回一个 CompletableFuture。
sendAndReceive 方法返回一个 RabbitMessageFuture。convertSendAndReceive 方法返回一个 RabbitConverterFuture。
您可以通过在 future 上调用 get() 同步地检索结果,或者您可以注册一个回调,该回调会异步地与结果一起调用。以下列表显示了两种方法
@Autowired
private AsyncRabbitTemplate template;
...
public void doSomeWorkAndGetResultLater() {
...
CompletableFuture<String> future = this.template.convertSendAndReceive("foo");
// do some more work
String reply = null;
try {
reply = future.get(10, TimeUnit.SECONDS);
}
catch (ExecutionException e) {
...
}
...
}
public void doSomeWorkAndGetResultAsync() {
...
RabbitConverterFuture<String> future = this.template.convertSendAndReceive("foo");
future.whenComplete((result, ex) -> {
if (ex == null) {
// success
}
else {
// failure
}
});
...
}
如果设置了 mandatory 并且消息无法传递,future 会抛出 ExecutionException,其原因是 AmqpMessageReturnedException,它封装了返回的消息和有关返回的信息。
如果设置了 enableConfirms,future 具有一个名为 confirm 的属性,它本身是一个 CompletableFuture<Boolean>,其中 true 表示成功发布。如果确认 future 为 false,RabbitFuture 还有一个名为 nackCause 的属性,其中包含失败的原因(如果可用)。
| 如果发布者确认在回复之后收到,则会被丢弃,因为回复意味着成功发布。 |
您可以在模板上设置 receiveTimeout 属性以使回复超时(默认为 30000 - 30 秒)。如果发生超时,future 将使用 AmqpReplyTimeoutException 完成。
该模板实现了 SmartLifecycle。在有待处理回复时停止模板会导致待处理的 Future 实例被取消。
从 2.0 版本开始,异步模板现在支持直接回复,而不是配置的回复队列。要启用此功能,请使用以下构造函数之一
public AsyncRabbitTemplate(ConnectionFactory connectionFactory, String exchange, String routingKey)
public AsyncRabbitTemplate(RabbitTemplate template)
请参阅RabbitMQ 直接回复,以将直接回复与同步 RabbitTemplate 一起使用。
2.0 版本引入了这些方法的变体(convertSendAndReceiveAsType),它们接受一个额外的 ParameterizedTypeReference 参数来转换复杂的返回类型。您必须使用 SmartMessageConverter 配置底层 RabbitTemplate。有关更多信息,请参阅使用 RabbitTemplate 从 Message 转换。