AMQP 抽象
Spring AMQP 由两个模块组成(每个模块在发行版中都由一个 JAR 表示):spring-amqp 和 spring-rabbit。'spring-amqp' 模块包含 org.springframework.amqp.core 包。在该包中,您可以找到代表核心 AMQP “模型” 的类。我们的目的是提供不依赖于任何特定 AMQP 代理实现或客户端库的通用抽象。由于最终用户代码可以仅针对抽象层进行开发,因此在不同供应商实现之间可以更具可移植性。然后,这些抽象由特定于代理的模块实现,例如 'spring-rabbit'。目前只有 RabbitMQ 的实现。但是,除了 RabbitMQ 之外,这些抽象也已在 .NET 中使用 Apache Qpid 进行了验证。由于 AMQP 在协议级别运行,原则上,您可以将 RabbitMQ 客户端与任何支持相同协议版本的代理一起使用,但我们目前不测试任何其他代理。
本概述假设您已熟悉 AMQP 规范的基础知识。如果不是,请查看其他资源中列出的资源
Message
0-9-1 AMQP 规范没有定义 Message 类或接口。相反,当执行诸如 basicPublish() 等操作时,内容作为字节数组参数传递,附加属性作为单独的参数传递。Spring AMQP 定义了一个 Message 类,作为更通用的 AMQP 领域模型表示的一部分。Message 类的目的是将正文和属性封装在一个实例中,从而使 API 更加简单。以下示例显示了 Message 类的定义
public class Message {
private final MessageProperties messageProperties;
private final byte[] body;
public Message(byte[] body, MessageProperties messageProperties) {
this.body = body;
this.messageProperties = messageProperties;
}
public byte[] getBody() {
return this.body;
}
public MessageProperties getMessageProperties() {
return this.messageProperties;
}
}
MessageProperties 接口定义了几个常用属性,例如“messageId”、“timestamp”、“contentType”以及更多属性。您还可以通过调用 setHeader(String key, Object value) 方法,使用用户定义的“headers”来扩展这些属性。
从版本 1.5.7、1.6.11、1.7.4 和 2.0.0 开始,如果消息正文是序列化的 Serializable Java 对象,则在执行 toString() 操作(例如在日志消息中)时,默认情况下不再对其进行反序列化。这是为了防止不安全的 deserialization。默认情况下,只反序列化 java.util 和 java.lang 类。要恢复以前的行为,您可以通过调用 Message.addAllowedListPatterns(…) 添加允许的类/包模式。支持简单的 * 通配符,例如 com.something.*, *.MyClass。无法反序列化的正文在日志消息中表示为 byte[<size>]。 |
Exchange
Exchange 接口表示一个 AMQP 交换机,消息生产者将消息发送到该交换机。代理的虚拟主机中的每个交换机都有一个唯一的名称以及一些其他属性。以下示例显示了 Exchange 接口
public interface Exchange extends Declarable {
String getName();
String getType();
boolean isDurable();
boolean isAutoDelete();
Map<String, Object> getArguments();
}
如您所见,Exchange 还有一个由 ExchangeTypes 中定义的常量表示的“类型”。基本类型有:direct、topic、fanout 和 headers。在核心包中,您可以找到这些类型中每个 Exchange 接口的实现。这些 Exchange 类型的行为在处理队列绑定方面有所不同。例如,Direct 交换机允许队列通过固定的路由键(通常是队列的名称)进行绑定。Topic 交换机支持包含 '*' 和 '#' 通配符的路由模式绑定,分别表示“恰好一个”和“零个或多个”。Fanout 交换机将消息发布到所有绑定到它的队列,而不考虑任何路由键。有关这些以及其他交换机类型的更多信息,请参阅 AMQP 交换机。
从 3.2 版本开始,引入了 ConsistentHashExchange 类型,以方便应用程序配置阶段。它提供了诸如 x-consistent-hash 等选项作为交换机类型。允许配置 hash-header 或 hash-property 交换机定义参数。必须在代理上启用相应的 RabbitMQ rabbitmq_consistent_hash_exchange 插件。有关一致哈希交换机的目的、逻辑和行为的更多信息,请参见 RabbitMQ 官方文档。
AMQP 规范还要求任何代理都提供一个没有名称的“默认”直连交换机。所有声明的队列都以其名称作为路由键绑定到该默认 Exchange。您可以在AmqpTemplate中了解更多关于默认 Exchange 在 Spring AMQP 中的用法。 |
Queue
Queue 类表示消息消费者接收消息的组件。与各种 Exchange 类一样,我们的实现旨在成为此核心 AMQP 类型的抽象表示。以下列表显示了 Queue 类
public class Queue {
private final String name;
private final boolean durable;
private final boolean exclusive;
private final boolean autoDelete;
/**
* The queue is durable, non-exclusive and non auto-delete.
*
* @param name the name of the queue.
*/
public Queue(String name) {
this(name, true, false, false);
}
// Getters and Setters omitted for brevity
}
请注意,构造函数接受队列名称。根据实现的不同,管理模板可能会提供用于生成唯一命名队列的方法。此类队列可用作“reply-to”地址或在其他临时情况下使用。因此,自动生成的队列的“exclusive”和“autoDelete”属性都将设置为“true”。
| 有关使用命名空间支持(包括队列参数)声明队列的信息,请参阅配置代理中关于队列的部分。 |
Binding
鉴于生产者发送到交换机,消费者从队列接收,连接队列与交换机的绑定对于通过消息传递连接这些生产者和消费者至关重要。在 Spring AMQP 中,我们定义了一个 Binding 类来表示这些连接。本节回顾了将队列绑定到交换机的基本选项。
我们提供了一个 BindingBuilder 以方便“流畅 API”样式,如下例所示
Queue queue = ...;
// bind a queue to a DirectExchange with a fixed routing key
Binding directBinding = BindingBuilder.bind(queue).to(new DirectExchange("someDirectExchange")).with("foo.bar");
// bind a queue to a TopicExchange with a routing pattern
Binding topicBinding = BindingBuilder.bind(queue).to(new TopicExchange("someTopicExchange")).with("foo.*");
// bind a queue to a FanoutExchange with no routing key
Binding fanoutBinding = BindingBuilder.bind(queue).to(new FanoutExchange("someFanoutExchange"));
为清晰起见,上述示例显示了 BindingBuilder 类,但当对 'bind()' 方法使用静态导入时,此样式效果很好。 |
Binding 类的实例本身只保存有关连接的数据。换句话说,它不是一个“活动”组件。然而,正如您将在配置代理中稍后看到的那样,AmqpAdmin 类可以使用 Binding 实例来实际触发代理上的绑定操作。此外,正如您在该同一部分中看到的,您可以使用 Spring 的 @Bean 注解在 @Configuration 类中定义 Binding 实例。还有一个方便的基类,它进一步简化了生成 AMQP 相关 bean 定义的方法,并识别队列、交换机和绑定,以便在应用程序启动时在 AMQP 代理上声明它们。
AmqpTemplate 也定义在核心包中。作为涉及实际 AMQP 消息传递的主要组件之一,它在自己的部分中进行了详细讨论(参见AmqpTemplate)。