发送消息

发送消息时,您可以使用以下任何方法

void send(Message message) throws AmqpException;

void send(String routingKey, Message message) throws AmqpException;

void send(String exchange, String routingKey, Message message) throws AmqpException;

我们可以从前面列表中最后一种方法开始讨论,因为它实际上是最明确的。它允许在运行时提供 AMQP 交换名称(以及路由键)。最后一个参数是负责实际创建消息实例的回调。使用此方法发送消息的示例如下:以下示例展示了如何使用 send 方法发送消息

amqpTemplate.send("marketData.topic", "quotes.nasdaq.THING1",
    new Message("12.34".getBytes(), someProperties));

如果您打算一直或大部分时间使用该模板实例发送到同一个交换机,则可以在模板本身上设置 exchange 属性。在这种情况下,您可以使用前面列表中的第二种方法。以下示例与前面的示例功能等效

amqpTemplate.setExchange("marketData.topic");
amqpTemplate.send("quotes.nasdaq.FOO", new Message("12.34".getBytes(), someProperties));

如果模板上同时设置了 exchangeroutingKey 属性,则可以使用只接受 Message 的方法。以下示例展示了如何实现

amqpTemplate.setExchange("marketData.topic");
amqpTemplate.setRoutingKey("quotes.nasdaq.FOO");
amqpTemplate.send(new Message("12.34".getBytes(), someProperties));

思考交换机和路由键属性的一个更好的方法是,显式方法参数总是覆盖模板的默认值。实际上,即使您没有在模板上显式设置这些属性,也总会有默认值。在这两种情况下,默认值都是空 String,但这实际上是一个合理的默认值。就路由键而言,它最初并非总是必需的(例如,对于 Fanout 交换机)。此外,队列可能与空 String 绑定到交换机。这些都是依赖于模板路由键属性的默认空 String 值的合理场景。就交换机名称而言,空 String 经常使用,因为 AMQP 规范将“默认交换机”定义为没有名称。由于所有队列都自动绑定到该默认交换机(这是一个直接交换机),并将其名称用作绑定值,因此前面列表中的第二种方法可用于通过默认交换机向任何队列进行简单的点对点消息传递。您可以将队列名称作为 routingKey 提供,方法是在运行时提供方法参数。以下示例展示了如何实现

RabbitTemplate template = new RabbitTemplate(); // using default no-name Exchange
template.send("queue.helloWorld", new Message("Hello World".getBytes(), someProperties));

另外,您可以创建一个模板,主要或专门用于发布到单个队列。以下示例展示了如何实现

RabbitTemplate template = new RabbitTemplate(); // using default no-name Exchange
template.setRoutingKey("queue.helloWorld"); // but we'll always send to this Queue
template.send(new Message("Hello World".getBytes(), someProperties));

消息构建器 API

从版本 1.3 开始,MessageBuilderMessagePropertiesBuilder 提供了消息构建器 API。这些方法提供了一种方便的“流畅”方式来创建消息或消息属性。以下示例展示了流畅 API 的实际应用

Message message = MessageBuilder.withBody("foo".getBytes())
    .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
    .setMessageId("123")
    .setHeader("bar", "baz")
    .build();
MessageProperties props = MessagePropertiesBuilder.newInstance()
    .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
    .setMessageId("123")
    .setHeader("bar", "baz")
    .build();
Message message = MessageBuilder.withBody("foo".getBytes())
    .andProperties(props)
    .build();

MessageProperties 上定义的每个属性都可以设置。其他方法包括 setHeader(String key, String value)removeHeader(String key)removeHeaders()copyProperties(MessageProperties properties)。每个属性设置方法都有一个 set*IfAbsent() 变体。在存在默认初始值的情况下,方法名为 set*IfAbsentOrDefault()

提供了五个静态方法来创建初始消息构建器

public static MessageBuilder withBody(byte[] body) (1)

public static MessageBuilder withClonedBody(byte[] body) (2)

public static MessageBuilder withBody(byte[] body, int from, int to) (3)

public static MessageBuilder fromMessage(Message message) (4)

public static MessageBuilder fromClonedMessage(Message message) (5)
1 由构建器创建的消息的 body 是参数的直接引用。
2 由构建器创建的消息的 body 是一个新数组,其中包含参数中字节的副本。
3 由构建器创建的消息的 body 是一个新数组,其中包含参数中的字节范围。有关更多详细信息,请参见 Arrays.copyOfRange()
4 由构建器创建的消息的 body 是对参数 body 的直接引用。参数的属性被复制到一个新的 MessageProperties 对象中。
5 由构建器创建的消息的 body 是一个新数组,其中包含参数 body 的副本。参数的属性被复制到一个新的 MessageProperties 对象中。

提供了三个静态方法来创建 MessagePropertiesBuilder 实例

public static MessagePropertiesBuilder newInstance() (1)

public static MessagePropertiesBuilder fromProperties(MessageProperties properties) (2)

public static MessagePropertiesBuilder fromClonedProperties(MessageProperties properties) (3)
1 一个新的消息属性对象使用默认值进行初始化。
2 构建器使用提供的属性对象进行初始化,并且 build() 将返回该对象。
3 参数的属性被复制到一个新的 MessageProperties 对象中。

使用 AmqpTemplateRabbitTemplate 实现,每个 send() 方法都有一个重载版本,它接受一个额外的 CorrelationData 对象。当发布者确认功能启用时,此对象会在 AmqpTemplate 中描述的回调中返回。这允许发送方将确认(acknack)与发送的消息关联起来。

从版本 1.6.7 开始,引入了 CorrelationAwareMessagePostProcessor 接口,允许在消息转换后修改关联数据。以下示例展示了如何使用它

Message postProcessMessage(Message message, Correlation correlation);

在版本 2.0 中,此接口已弃用。该方法已移至 MessagePostProcessor,并带有一个委托给 postProcessMessage(Message message) 的默认实现。

同样从版本 1.6.7 开始,提供了一个名为 CorrelationDataPostProcessor 的新回调接口。它在所有 MessagePostProcessor 实例(在 send() 方法中提供以及在 setBeforePublishPostProcessors() 中提供)之后调用。实现可以更新或替换 send() 方法中提供的关联数据(如果有)。Message 和原始 CorrelationData(如果有)作为参数提供。以下示例展示了如何使用 postProcess 方法

CorrelationData postProcess(Message message, CorrelationData correlationData);

发布者返回

当模板的 mandatory 属性为 true 时,返回的消息由 AmqpTemplate 中描述的回调提供。

从版本 1.4 开始,RabbitTemplate 支持 SpEL mandatoryExpression 属性,该属性针对每个请求消息作为根评估对象进行评估,解析为 boolean 值。表达式中可以使用 bean 引用,例如 @myBean.isMandatory(#root)

发布者返回也可以在 RabbitTemplate 内部用于发送和接收操作。有关更多信息,请参见 回复超时

批处理

版本 1.4.2 引入了 BatchingRabbitTemplate。它是 RabbitTemplate 的子类,具有重写的 send 方法,该方法根据 BatchingStrategy 批量处理消息。只有当批次完成时,消息才会发送到 RabbitMQ。以下列表显示了 BatchingStrategy 接口定义

public interface BatchingStrategy {

    MessageBatch addToBatch(String exchange, String routingKey, Message message);

    Date nextRelease();

    Collection<MessageBatch> releaseBatches();

}
批量数据保存在内存中。系统故障时,未发送的消息可能会丢失。

提供了 SimpleBatchingStrategy。它支持将消息发送到单个交换机或路由键。它具有以下属性

  • batchSize:在发送之前批处理中的消息数量。

  • bufferLimit:批量消息的最大大小。如果超过此限制,则会优先于 batchSize,并导致发送部分批次。

  • timeout:当没有新的活动将消息添加到批次时,部分批次发送后的时间。

SimpleBatchingStrategy 通过在每个嵌入消息前面加上一个四字节的二进制长度来格式化批次。通过将 springBatchFormat 消息属性设置为 lengthHeader4,将其传达给接收系统。

默认情况下,侦听器容器会自动解批处理消息(通过使用 springBatchFormat 消息头)。拒绝批次中的任何消息都会导致整个批次被拒绝。

但是,有关更多信息,请参见 带有批处理的 @RabbitListener

© . This site is unofficial and not affiliated with VMware.