AMQP 抽象

Spring AMQP 由两个模块组成(每个模块在分发中都由一个 JAR 表示):spring-amqpspring-rabbit。'spring-amqp' 模块包含 org.springframework.amqp.core 包。在该包中,您可以找到代表核心 AMQP “模型”的类。我们的目的是提供不依赖于任何特定 AMQP broker 实现或客户端库的通用抽象。最终用户代码可以跨供应商实现更具可移植性,因为它只需针对抽象层进行开发。然后,这些抽象由特定于 broker 的模块(如 'spring-rabbit')实现。目前仅有 RabbitMQ 实现。然而,除了 RabbitMQ,这些抽象已在 .NET 中使用 Apache Qpid 进行了验证。由于 AMQP 在协议级别运行,原则上,您可以使用 RabbitMQ 客户端连接任何支持相同协议版本的 broker,但我们目前不对任何其他 broker 进行测试。

本概述假设您已经熟悉 AMQP 规范的基础知识。如果不熟悉,请查看 其他资源 中列出的资源。

Message

0-9-1 AMQP 规范没有定义 Message 类或接口。相反,在执行 basicPublish() 等操作时,内容作为字节数组参数传递,附加属性作为单独的参数传递。Spring AMQP 定义了一个 Message 类,作为更通用的 AMQP 领域模型表示的一部分。Message 类的目的是将消息体和属性封装在单个实例中,从而使 API 更简洁。以下示例展示了 Message 类的定义

public class Message {

    private final MessageProperties messageProperties;

    private final byte[] body;

    public Message(byte[] body, MessageProperties messageProperties) {
        this.body = body;
        this.messageProperties = messageProperties;
    }

    public byte[] getBody() {
        return this.body;
    }

    public MessageProperties getMessageProperties() {
        return this.messageProperties;
    }
}

MessageProperties 接口定义了一些常见属性,例如 'messageId'、'timestamp'、'contentType' 等。您还可以通过调用 setHeader(String key, Object value) 方法使用用户定义的 'headers' 扩展这些属性。

从版本 1.5.71.6.111.7.42.0.0 开始,如果消息体是序列化的 Serializable Java 对象,则在执行 toString() 操作(例如日志消息)时不再(默认情况下)进行反序列化。这是为了防止不安全的反序列化。默认情况下,仅反序列化 java.utiljava.lang 类。要恢复到先前的行为,您可以通过调用 Message.addAllowedListPatterns(…​) 添加允许的类/包模式。支持简单的 * 通配符,例如 com.something.*, *.MyClass。无法反序列化的消息体在日志消息中以 byte[<size>] 表示。

交换器 (Exchange)

Exchange 接口代表一个 AMQP Exchange,Message Producer 将消息发送到它。Broker 的虚拟主机中的每个 Exchange 都具有唯一的名称以及其他一些属性。以下示例展示了 Exchange 接口

public interface Exchange {

    String getName();

    String getExchangeType();

    boolean isDurable();

    boolean isAutoDelete();

    Map<String, Object> getArguments();

}

正如您所见,Exchange 还有一个由 ExchangeTypes 中定义的常量表示的“类型”。基本类型是:directtopicfanoutheaders。在 core 包中,您可以找到这些类型中每一种的 Exchange 接口实现。这些 Exchange 类型的行为在如何处理与队列的绑定方面有所不同。例如,Direct 交换器允许队列通过固定的路由键(通常是队列的名称)进行绑定。Topic 交换器支持使用路由模式进行绑定,这些模式可能包含 '*' 和 '#' 通配符,分别表示“恰好一个”和“零个或多个”。Fanout 交换器会将消息发布到所有绑定到它的队列,而不考虑任何路由键。有关这些交换器类型和其他交换器类型的更多信息,请参阅 AMQP Exchanges

从 3.2 版本开始,引入了 ConsistentHashExchange 类型,方便应用程序配置阶段的使用。它为交换器类型提供了 x-consistent-hash 等选项。允许配置 hash-headerhash-property 交换器定义参数。相应的 RabbitMQ rabbitmq_consistent_hash_exchange 插件必须在 broker 上启用。有关 Consistent Hash Exchange 的目的、逻辑和行为的更多信息,请参阅 RabbitMQ 官方文档

AMQP 规范还要求任何 broker 提供一个“默认”的 direct exchange,它没有名称。所有声明的队列都通过它们的名称作为路由键绑定到这个默认 Exchange。您可以在 AmqpTemplate 中了解更多关于 Spring AMQP 中默认 Exchange 的用法。

队列 (Queue)

Queue 类代表消息消费者接收消息的组件。与各种 Exchange 类一样,我们的实现旨在抽象表示这个核心 AMQP 类型。以下列表展示了 Queue

public class Queue  {

    private final String name;

    private volatile boolean durable;

    private volatile boolean exclusive;

    private volatile boolean autoDelete;

    private volatile Map<String, Object> arguments;

    /**
     * The queue is durable, non-exclusive and non auto-delete.
     *
     * @param name the name of the queue.
     */
    public Queue(String name) {
        this(name, true, false, false);
    }

    // Getters and Setters omitted for brevity

}

请注意,构造函数接受队列名称。根据实现,admin 模板可能提供用于生成唯一命名队列的方法。这些队列可以用作“reply-to”地址或用于其他临时情况。因此,自动生成的队列的 'exclusive' 和 'autoDelete' 属性都将被设置为 'true'。

有关使用命名空间支持声明队列(包括队列参数)的信息,请参阅 配置 Broker 中关于队列的部分。

绑定 (Binding)

鉴于生产者发送到交换器,消费者从队列接收消息,连接队列与交换器的绑定对于通过消息传递连接生产者和消费者至关重要。在 Spring AMQP 中,我们定义了一个 Binding 类来表示这些连接。本节回顾了将队列绑定到交换器的基本选项。

您可以使用固定的路由键将队列绑定到 DirectExchange,如下例所示

new Binding(someQueue, someDirectExchange, "foo.bar");

您可以使用路由模式将队列绑定到 TopicExchange,如下例所示

new Binding(someQueue, someTopicExchange, "foo.*");

您可以不使用路由键将队列绑定到 FanoutExchange,如下例所示

new Binding(someQueue, someFanoutExchange);

我们还提供了一个 BindingBuilder 来促进“流畅 API”风格,如下例所示

Binding b = BindingBuilder.bind(someQueue).to(someTopicExchange).with("foo.*");
为清晰起见,前面的示例展示了 BindingBuilder 类,但当使用 'bind()' 方法的静态导入时,这种风格也效果很好。

Binding 类实例本身仅包含连接的数据。换句话说,它不是一个“活动”组件。然而,正如您稍后在 配置 Broker 中将看到的,AmqpAdmin 类可以使用 Binding 实例实际触发 broker 上的绑定操作。此外,正如您在该同一节中看到的,您可以使用 Spring 的 @Bean 注解在 @Configuration 类中定义 Binding 实例。还有一个方便的基类,它进一步简化了生成 AMQP 相关 bean 定义的方法,并识别队列、交换器和绑定,以便它们都在应用程序启动时声明在 AMQP broker 上。

AmqpTemplate 也定义在核心包中。作为实际 AMQP 消息传递的主要组件之一,它在自己的章节中有详细讨论(参见 AmqpTemplate)。