轮询消费者
AmqpTemplate 本身可用于轮询 Message 接收。默认情况下,如果没有消息可用,会立即返回 null。不会有阻塞。从 1.5 版开始,你可以设置一个以毫秒为单位的 receiveTimeout,接收方法会阻塞长达该时间,等待消息。小于零的值表示无限期阻塞(或至少直到与代理的连接丢失)。1.6 版引入了 receive 方法的变体,允许在每次调用时传入超时时间。
由于接收操作为每个消息创建一个新的 QueueingConsumer,因此该技术不适用于高吞吐量环境。对于这些用例,请考虑使用异步消费者或将 receiveTimeout 设置为零。 |
从 2.4.8 版开始,当使用非零超时时,你可以指定传递给 basicConsume 方法的参数,用于将消费者与通道关联。例如:template.addConsumerArg("x-priority", 10)。
有四种简单的 receive 方法可用。与发送侧的 Exchange 一样,有一个方法要求直接在模板本身设置了默认队列属性,还有一个方法接受运行时队列参数。1.6 版引入了接受 timeoutMillis 的变体,以在每次请求时覆盖 receiveTimeout。以下列表显示了这四种方法的定义。
Message receive() throws AmqpException;
Message receive(String queueName) throws AmqpException;
Message receive(long timeoutMillis) throws AmqpException;
Message receive(String queueName, long timeoutMillis) throws AmqpException;
与发送消息的情况一样,AmqpTemplate 具有一些便利方法,用于接收 POJO 而不是 Message 实例,并且实现提供了一种自定义用于创建返回的 Object 的 MessageConverter 的方法:以下列表显示了这些方法。
Object receiveAndConvert() throws AmqpException;
Object receiveAndConvert(String queueName) throws AmqpException;
Object receiveAndConvert(long timeoutMillis) throws AmqpException;
Object receiveAndConvert(String queueName, long timeoutMillis) throws AmqpException;
从 2.0 版开始,这些方法有额外的变体,接受一个 ParameterizedTypeReference 参数来转换复杂类型。模板必须配置 SmartMessageConverter。有关更多信息,请参阅 使用 RabbitTemplate 从 Message 转换。
与 sendAndReceive 方法类似,从 1.3 版开始,AmqpTemplate 具有几个便利的 receiveAndReply 方法,用于同步接收、处理和回复消息。以下列表显示了这些方法的定义。
<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback)
throws AmqpException;
<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback)
throws AmqpException;
<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback,
String replyExchange, String replyRoutingKey) throws AmqpException;
<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,
String replyExchange, String replyRoutingKey) throws AmqpException;
<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback,
ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;
<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,
ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;
AmqpTemplate 实现负责 receive 和 reply 阶段。在大多数情况下,你只需提供 ReceiveAndReplyCallback 的实现来执行接收消息的业务逻辑,并在需要时构建回复对象或消息。请注意,ReceiveAndReplyCallback 可以返回 null。在这种情况下,不发送回复,receiveAndReply 的工作方式类似于 receive 方法。这允许同一个队列用于混合消息,其中一些可能不需要回复。
只有当提供的回调不是 ReceiveAndReplyMessageCallback 的实例时,才会应用自动消息(请求和回复)转换,ReceiveAndReplyMessageCallback 提供了原始消息交换契约。
ReplyToAddressCallback 对于需要自定义逻辑以根据收到的消息和来自 ReceiveAndReplyCallback 的回复在运行时确定 replyTo 地址的情况很有用。默认情况下,请求消息中的 replyTo 信息用于路由回复。
以下列表显示了一个基于 POJO 的接收和回复示例。
boolean received =
this.template.receiveAndReply(ROUTE, new ReceiveAndReplyCallback<Order, Invoice>() {
public Invoice handle(Order order) {
return processOrder(order);
}
});
if (received) {
log.info("We received an order!");
}