回复管理

MessageListenerAdapter 中现有的支持已允许您的方法具有非 void 返回类型。在这种情况下,调用的结果将封装在一个消息中,发送到原始消息的 ReplyToAddress 标头中指定的地址,或发送到侦听器上配置的默认地址。您可以使用消息抽象的 @SendTo 注解来设置该默认地址。

假设我们的 processOrder 方法现在应该返回一个 OrderStatus,我们可以将其编写如下以自动发送回复

@RabbitListener(destination = "myQueue")
@SendTo("status")
public OrderStatus processOrder(Order order) {
    // order processing
    return status;
}

如果您需要以传输无关的方式设置额外的标头,您可以返回一个 Message,如下所示:

@RabbitListener(destination = "myQueue")
@SendTo("status")
public Message<OrderStatus> processOrder(Order order) {
    // order processing
    return MessageBuilder
        .withPayload(status)
        .setHeader("code", 1234)
        .build();
}

或者,您可以在 beforeSendReplyMessagePostProcessors 容器工厂属性中使用 MessagePostProcessor 来添加更多标头。从版本 2.2.3 开始,被调用的 bean/方法会在回复消息中提供,这可以在消息后处理器中使用,将信息传回给调用方。

factory.setBeforeSendReplyPostProcessors(msg -> {
    msg.getMessageProperties().setHeader("calledBean",
            msg.getMessageProperties().getTargetBean().getClass().getSimpleName());
    msg.getMessageProperties().setHeader("calledMethod",
            msg.getMessageProperties().getTargetMethod().getName());
    return m;
});

从版本 2.2.5 开始,您可以配置一个 ReplyPostProcessor 来修改回复消息,然后发送;它在设置 correlationId 标头以匹配请求之后调用。

@RabbitListener(queues = "test.header", group = "testGroup", replyPostProcessor = "echoCustomHeader")
public String capitalizeWithHeader(String in) {
    return in.toUpperCase();
}

@Bean
public ReplyPostProcessor echoCustomHeader() {
    return (req, resp) -> {
        resp.getMessageProperties().setHeader("myHeader", req.getMessageProperties().getHeader("myHeader"));
        return resp;
    };
}

从版本 3.0 开始,您可以在容器工厂而不是注解上配置后处理器。

factory.setReplyPostProcessorProvider(id -> (req, resp) -> {
    resp.getMessageProperties().setHeader("myHeader", req.getMessageProperties().getHeader("myHeader"));
    return resp;
});

id 参数是侦听器 id。

注解上的设置将取代工厂设置。

@SendTo 值被假定为一个回复 exchangeroutingKey 对,遵循 exchange/routingKey 模式,其中一部分可以省略。有效值如下:

  • thing1/thing2replyTo 交换和 routingKeything1/replyTo 交换和默认(空)routingKeything2/thing2replyTo routingKey 和默认(空)交换。/ 或空:replyTo 默认交换和默认 routingKey

此外,您可以不带 value 属性使用 @SendTo。这种情况等同于空 sendTo 模式。@SendTo 仅在入站消息没有 replyToAddress 属性时使用。

从版本 1.5 开始,@SendTo 值可以是 bean 初始化 SpEL 表达式,如以下示例所示

@RabbitListener(queues = "test.sendTo.spel")
@SendTo("#{spelReplyTo}")
public String capitalizeWithSendToSpel(String foo) {
    return foo.toUpperCase();
}
...
@Bean
public String spelReplyTo() {
    return "test.sendTo.reply.spel";
}

该表达式必须解析为 String,它可以是简单的队列名称(发送到默认交换),也可以是 exchange/routingKey 形式,如前一个示例之前所讨论的。

#{…​} 表达式在初始化期间评估一次。

对于动态回复路由,消息发送方应包含 reply_to 消息属性,或使用备用运行时 SpEL 表达式(在下一个示例之后描述)。

从版本 1.6 开始,@SendTo 可以是一个 SpEL 表达式,它在运行时针对请求和回复进行评估,如以下示例所示

@RabbitListener(queues = "test.sendTo.spel")
@SendTo("!{'some.reply.queue.with.' + result.queueName}")
public Bar capitalizeWithSendToSpel(Foo foo) {
    return processTheFooAndReturnABar(foo);
}

SpEL 表达式的运行时性质由 !{…​} 分隔符表示。表达式的评估上下文 #root 对象具有三个属性

  • requesto.s.amqp.core.Message 请求对象。

  • source:转换后的 o.s.messaging.Message<?>

  • result:方法结果。

该上下文具有一个映射属性访问器、一个标准类型转换器和一个 bean 解析器,允许引用上下文中的其他 bean(例如,@someBeanName.determineReplyQ(request, result))。

总而言之,#{…​} 在初始化期间评估一次,其中 #root 对象是应用程序上下文。bean 通过其名称引用。!{…​} 在运行时为每条消息评估,其中根对象具有前面列出的属性。bean 通过其名称(前缀为 @)引用。

从版本 2.1 开始,还支持简单的属性占位符(例如,${some.reply.to})。对于早期版本,可以使用以下解决方法,如以下示例所示

@RabbitListener(queues = "foo")
@SendTo("#{environment['my.send.to']}")
public String listen(Message in) {
    ...
    return ...
}
© . This site is unofficial and not affiliated with VMware.