发送消息

发送消息时,您可以使用以下任何方法

void send(Message message) throws AmqpException;

void send(String routingKey, Message message) throws AmqpException;

void send(String exchange, String routingKey, Message message) throws AmqpException;

我们可以从前面列表中最后一个方法开始讨论,因为它实际上是最明确的。它允许在运行时提供 AMQP 交换机名称(以及路由键)。最后一个参数是负责实际创建消息实例的回调。使用此方法发送消息的示例如下所示:以下示例展示了如何使用 send 方法发送消息

amqpTemplate.send("marketData.topic", "quotes.nasdaq.THING1",
    new Message("12.34".getBytes(), someProperties));

如果您打算大部分或全部时间使用该模板实例向同一个交换机发送消息,则可以在模板本身上设置 exchange 属性。在这种情况下,您可以使用前面列表中的第二个方法。以下示例在功能上等同于上一个示例

amqpTemplate.setExchange("marketData.topic");
amqpTemplate.send("quotes.nasdaq.FOO", new Message("12.34".getBytes(), someProperties));

如果模板上同时设置了 exchangeroutingKey 属性,则可以使用只接受 Message 的方法。以下示例展示了如何执行此操作

amqpTemplate.setExchange("marketData.topic");
amqpTemplate.setRoutingKey("quotes.nasdaq.FOO");
amqpTemplate.send(new Message("12.34".getBytes(), someProperties));

更好的理解交换机和路由键属性的方式是,明确的方法参数总是覆盖模板的默认值。事实上,即使您没有在模板上明确设置这些属性,默认值也总是存在。在两种情况下,默认值都是空字符串 String,但这实际上是一个合理的默认值。就路由键而言,它并非总是必需的(例如,对于 Fanout 交换机)。此外,队列可以绑定到一个空字符串 String 的交换机上。这些都是依赖于模板路由键属性的默认空字符串 String 值的合法场景。就交换机名称而言,空字符串 String 通常被使用,因为 AMQP 规范将“默认交换机”定义为没有名称。由于所有队列都自动绑定到该默认交换机(这是一个 direct 交换机),并使用它们的名称作为绑定值,因此可以使用前面列表中的第二个方法通过默认交换机向任何队列进行简单的点对点消息传递。您可以在运行时通过提供方法参数来将队列名称作为 routingKey。以下示例展示了如何执行此操作

RabbitTemplate template = new RabbitTemplate(); // using default no-name Exchange
template.send("queue.helloWorld", new Message("Hello World".getBytes(), someProperties));

或者,您可以创建一个主要或专门用于向单个队列发布消息的模板。以下示例展示了如何执行此操作

RabbitTemplate template = new RabbitTemplate(); // using default no-name Exchange
template.setRoutingKey("queue.helloWorld"); // but we'll always send to this Queue
template.send(new Message("Hello World".getBytes(), someProperties));

消息构建器 API

从 1.3 版本开始,MessageBuilderMessagePropertiesBuilder 提供了消息构建器 API。这些方法提供了创建消息或消息属性的方便的“流畅”方式。以下示例展示了流畅 API 的实际应用

Message message = MessageBuilder.withBody("foo".getBytes())
    .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
    .setMessageId("123")
    .setHeader("bar", "baz")
    .build();
MessageProperties props = MessagePropertiesBuilder.newInstance()
    .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
    .setMessageId("123")
    .setHeader("bar", "baz")
    .build();
Message message = MessageBuilder.withBody("foo".getBytes())
    .andProperties(props)
    .build();

MessageProperties 上定义的每个属性都可以设置。其他方法包括 setHeader(String key, String value)removeHeader(String key)removeHeaders()copyProperties(MessageProperties properties)。每个属性设置方法都有一个 set*IfAbsent() 变体。在存在默认初始值的情况下,方法名为 set*IfAbsentOrDefault()

提供了五个静态方法来创建初始消息构建器

public static MessageBuilder withBody(byte[] body) (1)

public static MessageBuilder withClonedBody(byte[] body) (2)

public static MessageBuilder withBody(byte[] body, int from, int to) (3)

public static MessageBuilder fromMessage(Message message) (4)

public static MessageBuilder fromClonedMessage(Message message) (5)
1 构建器创建的消息体是对参数的直接引用。
2 构建器创建的消息体是一个新数组,包含参数字节的副本。
3 构建器创建的消息体是一个新数组,包含参数中指定范围的字节。更多详情请参阅 Arrays.copyOfRange()
4 构建器创建的消息体是对参数消息体的直接引用。参数的属性被复制到一个新的 MessageProperties 对象中。
5 构建器创建的消息体是一个新数组,包含参数消息体的副本。参数的属性被复制到一个新的 MessageProperties 对象中。

提供了三个静态方法来创建 MessagePropertiesBuilder 实例

public static MessagePropertiesBuilder newInstance() (1)

public static MessagePropertiesBuilder fromProperties(MessageProperties properties) (2)

public static MessagePropertiesBuilder fromClonedProperties(MessageProperties properties) (3)
1 使用默认值初始化新的消息属性对象。
2 构建器使用提供的属性对象进行初始化,并且 build() 将返回该对象。
3 参数的属性被复制到一个新的 MessageProperties 对象中。

对于 AmqpTemplateRabbitTemplate 实现,每个 send() 方法都有一个重载版本,它接受一个额外的 CorrelationData 对象。启用发布者确认时,此对象在 AmqpTemplate 中描述的回调中返回。这使得发送方可以将确认(acknack)与发送的消息关联起来。

从 1.6.7 版本开始,引入了 CorrelationAwareMessagePostProcessor 接口,允许在消息转换后修改关联数据。以下示例展示了如何使用它

Message postProcessMessage(Message message, Correlation correlation);

在 2.0 版本中,此接口已弃用。该方法已移至 MessagePostProcessor,其默认实现委托给 postProcessMessage(Message message)

同样从 1.6.7 版本开始,提供了一个名为 CorrelationDataPostProcessor 的新回调接口。在所有 MessagePostProcessor 实例(在 send() 方法中提供的以及在 setBeforePublishPostProcessors() 中提供的)之后调用此接口。实现可以更新或替换在 send() 方法中提供的关联数据(如果有)。Message 和原始 CorrelationData(如果有)作为参数提供。以下示例展示了如何使用 postProcess 方法

CorrelationData postProcess(Message message, CorrelationData correlationData);

发布者返回

当模板的 mandatory 属性为 true 时,AmqpTemplate 中描述的回调将提供返回的消息。

从 1.4 版本开始,RabbitTemplate 支持 SpEL mandatoryExpression 属性,该属性针对每个请求消息作为根评估对象进行评估,解析为布尔值 boolean。可以在表达式中使用 Bean 引用,例如 @myBean.isMandatory(#root)

RabbitTemplate 在发送和接收操作中也可以内部使用发布者返回。更多信息请参阅 回复超时

批量处理

版本 1.4.2 引入了 BatchingRabbitTemplate。它是 RabbitTemplate 的一个子类,其 send 方法被重写,根据 BatchingStrategy 批量处理消息。只有当批次完成后,消息才会被发送到 RabbitMQ。以下清单显示了 BatchingStrategy 接口定义

public interface BatchingStrategy {

    MessageBatch addToBatch(String exchange, String routingKey, Message message);

    Date nextRelease();

    Collection<MessageBatch> releaseBatches();

}
批量数据保存在内存中。系统故障时,未发送的消息可能会丢失。

提供了一个 SimpleBatchingStrategy。它支持将消息发送到单个交换机或路由键。它具有以下属性

  • batchSize: 在发送之前,批次中的消息数量。

  • bufferLimit: 批量消息的最大大小。如果超过此限制,即使未达到 batchSize,也会导致发送部分批次。

  • timeout: 在没有新活动向批次添加消息时,经过此时间后会发送部分批次。

SimpleBatchingStrategy 通过在每个嵌入消息前加上一个四字节二进制长度来格式化批次。通过将 springBatchFormat 消息属性设置为 lengthHeader4,将此信息传达给接收系统。

批量消息默认由监听器容器自动解批量处理(通过使用 springBatchFormat 消息头)。拒绝批次中的任何消息都会导致整个批次被拒绝。

但是,有关更多信息,请参阅 @RabbitListener 批量处理