轮询消费者
AmqpTemplate
本身可以用于轮询式地接收 Message
。默认情况下,如果没有可用的消息,会立即返回 null
,不会阻塞。从版本 1.5 开始,你可以设置一个以毫秒为单位的 receiveTimeout
,接收方法会阻塞最多这么长时间,等待消息。小于零的值表示无限期阻塞(或者至少直到与 Broker 的连接丢失)。版本 1.6 引入了 receive
方法的变体,允许在每次调用时传入超时时间。
由于每个消息的接收操作都会创建一个新的 QueueingConsumer ,这种技术不太适用于高吞吐量的环境。对于这些用例,请考虑使用异步消费者或将 receiveTimeout 设置为零。 |
从版本 2.4.8 开始,在使用非零超时时,可以指定传递给用于将消费者与通道关联的 basicConsume
方法的参数。例如:template.addConsumerArg("x-priority", 10)
。
有四个简单的 receive
方法可用。与发送端的 Exchange
一样,有一个方法要求直接在模板本身上设置了默认队列属性,还有一个方法接受在运行时传入队列参数。版本 1.6 引入了接受 timeoutMillis
参数的变体,以便在每次请求时覆盖 receiveTimeout
。以下列表显示了这四个方法的定义
Message receive() throws AmqpException;
Message receive(String queueName) throws AmqpException;
Message receive(long timeoutMillis) throws AmqpException;
Message receive(String queueName, long timeoutMillis) throws AmqpException;
与发送消息类似,AmqpTemplate
提供了一些方便的方法用于接收 POJO 而不是 Message
实例,并且实现提供了一种自定义用于创建返回的 Object
的 MessageConverter
的方式:以下列表显示了这些方法
Object receiveAndConvert() throws AmqpException;
Object receiveAndConvert(String queueName) throws AmqpException;
Object receiveAndConvert(long timeoutMillis) throws AmqpException;
Object receiveAndConvert(String queueName, long timeoutMillis) throws AmqpException;
从版本 2.0 开始,这些方法有一些变体,它们接受一个额外的 ParameterizedTypeReference
参数来转换复杂类型。模板必须配置一个 SmartMessageConverter
。有关更多信息,请参阅 使用 RabbitTemplate
从 Message
进行转换。
与 sendAndReceive
方法类似,从版本 1.3 开始,AmqpTemplate
提供了一些方便的 receiveAndReply
方法,用于同步接收、处理和回复消息。以下列表显示了这些方法的定义
<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback)
throws AmqpException;
<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback)
throws AmqpException;
<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback,
String replyExchange, String replyRoutingKey) throws AmqpException;
<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,
String replyExchange, String replyRoutingKey) throws AmqpException;
<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback,
ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;
<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,
ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;
AmqpTemplate
实现负责 receive
和 reply
阶段。在大多数情况下,您只需提供 ReceiveAndReplyCallback
的实现来处理接收到的消息并根据需要构建回复对象或消息。请注意,ReceiveAndReplyCallback
可以返回 null
。在这种情况下,不会发送回复,并且 receiveAndReply
的工作方式与 receive
方法相同。这允许同一个队列用于混合的消息,其中一些可能不需要回复。
只有当提供的回调不是 ReceiveAndReplyMessageCallback
的实例时,才会应用自动消息(请求和回复)转换,后者提供了原始消息交换契约。
ReplyToAddressCallback
对于需要自定义逻辑来根据接收到的消息和 ReceiveAndReplyCallback
的回复在运行时确定 replyTo
地址的情况非常有用。默认情况下,请求消息中的 replyTo
信息用于路由回复。
以下列表显示了一个基于 POJO 的接收和回复示例
boolean received =
this.template.receiveAndReply(ROUTE, new ReceiveAndReplyCallback<Order, Invoice>() {
public Invoice handle(Order order) {
return processOrder(order);
}
});
if (received) {
log.info("We received an order!");
}