回复管理
MessageListenerAdapter 中已有的支持允许你的方法具有非 void 返回类型。在这种情况下,方法调用的结果会被封装在一个消息中,发送到原始消息的 ReplyToAddress 头部指定的地址,或者发送到监听器上配置的默认地址。你可以通过使用消息抽象的 @SendTo 注解来设置该默认地址。
假设我们的 processOrder 方法现在应该返回 OrderStatus,我们可以如下编写代码以自动发送回复
@RabbitListener(destination = "myQueue")
@SendTo("status")
public OrderStatus processOrder(Order order) {
// order processing
return status;
}
如果你需要以传输独立的方式设置额外的头部,你可以返回一个 Message 代替,示例如下
@RabbitListener(destination = "myQueue")
@SendTo("status")
public Message<OrderStatus> processOrder(Order order) {
// order processing
return MessageBuilder
.withPayload(status)
.setHeader("code", 1234)
.build();
}
另外,你可以在 beforeSendReplyMessagePostProcessors 容器工厂属性中使用 MessagePostProcessor 添加更多头部。从 2.2.3 版本开始,被调用的 bean/方法在回复消息中可用,这可以在消息后处理器中使用,以便将信息反馈给调用者
factory.setBeforeSendReplyPostProcessors(msg -> {
msg.getMessageProperties().setHeader("calledBean",
msg.getMessageProperties().getTargetBean().getClass().getSimpleName());
msg.getMessageProperties().setHeader("calledMethod",
msg.getMessageProperties().getTargetMethod().getName());
return m;
});
从 2.2.5 版本开始,你可以配置 ReplyPostProcessor 来修改回复消息发送前的内容;它在 correlationId 头部设置为匹配请求之后被调用。
@RabbitListener(queues = "test.header", group = "testGroup", replyPostProcessor = "echoCustomHeader")
public String capitalizeWithHeader(String in) {
return in.toUpperCase();
}
@Bean
public ReplyPostProcessor echoCustomHeader() {
return (req, resp) -> {
resp.getMessageProperties().setHeader("myHeader", req.getMessageProperties().getHeader("myHeader"));
return resp;
};
}
从 3.0 版本开始,你可以在容器工厂上配置后处理器,而不是在注解上。
factory.setReplyPostProcessorProvider(id -> (req, resp) -> {
resp.getMessageProperties().setHeader("myHeader", req.getMessageProperties().getHeader("myHeader"));
return resp;
});
id 参数是监听器的 id。
注解上的设置将覆盖工厂设置。
@SendTo 的值被假定为一个回复交换机和路由键对,遵循 exchange/routingKey 模式,其中一个部分可以省略。有效值如下
-
thing1/thing2
: 回复交换机和路由键。thing1/
: 回复交换机和默认(空)路由键。thing2
或/thing2
: 回复路由键和默认(空)交换机。/
或空: 回复默认交换机和默认路由键。
另外,你可以使用不带 value 属性的 @SendTo。这种情况等同于一个空的 sendTo 模式。只有在入站消息没有 replyToAddress 属性时,才使用 @SendTo。
从 1.5 版本开始,@SendTo 的值可以是一个 bean 初始化 SpEL 表达式,如下例所示
@RabbitListener(queues = "test.sendTo.spel")
@SendTo("#{spelReplyTo}")
public String capitalizeWithSendToSpel(String foo) {
return foo.toUpperCase();
}
...
@Bean
public String spelReplyTo() {
return "test.sendTo.reply.spel";
}
表达式必须解析为一个 String,它可以是一个简单的队列名(发送到默认交换机),或者如前面示例之前讨论的 exchange/routingKey 形式。
#{…} 表达式在初始化期间只评估一次。 |
对于动态回复路由,消息发送者应包含一个 reply_to 消息属性或使用备用的运行时 SpEL 表达式(在下一个示例后描述)。
从 1.6 版本开始,@SendTo 可以是一个 SpEL 表达式,它在运行时针对请求和回复进行评估,如下例所示
@RabbitListener(queues = "test.sendTo.spel")
@SendTo("!{'some.reply.queue.with.' + result.queueName}")
public Bar capitalizeWithSendToSpel(Foo foo) {
return processTheFooAndReturnABar(foo);
}
SpEL 表达式的运行时特性通过 !{…}
分隔符表示。表达式的评估上下文 #root
对象有三个属性
-
request
: o.s.amqp.core.Message 请求对象。 -
source
: 转换后的 o.s.messaging.Message>。 -
result
: 方法结果。
上下文包含一个 map 属性访问器、一个标准类型转换器和一个 bean 解析器,允许引用上下文中的其他 bean(例如,@someBeanName.determineReplyQ(request, result)
)。
总之,#{…}
在初始化期间评估一次,#root
对象是应用上下文。Bean 按其名称引用。!{…}
在运行时为每条消息评估,根对象具有前面列出的属性。Bean 按其名称引用,前缀为 @
。
从 2.1 版本开始,也支持简单的属性占位符(例如,${some.reply.to}
)。在早期版本中,可以使用以下方法作为变通方案,如下例所示
@RabbitListener(queues = "foo")
@SendTo("#{environment['my.send.to']}")
public String listen(Message in) {
...
return ...
}