配置 Broker

AMQP 规范描述了如何在 broker 上配置队列、交换机和绑定。这些操作(可从 0.8 规范及更高版本移植)存在于 org.springframework.amqp.core 包中的 AmqpAdmin 接口中。该类的 RabbitMQ 实现是位于 org.springframework.amqp.rabbit.core 包中的 RabbitAdmin

AmqpAdmin 接口基于使用 Spring AMQP 领域抽象,如下所示:

public interface AmqpAdmin {

    // Exchange Operations

    void declareExchange(Exchange exchange);

    void deleteExchange(String exchangeName);

    // Queue Operations

    Queue declareQueue();

    String declareQueue(Queue queue);

    void deleteQueue(String queueName);

    void deleteQueue(String queueName, boolean unused, boolean empty);

    void purgeQueue(String queueName, boolean noWait);

    // Binding Operations

    void declareBinding(Binding binding);

    void removeBinding(Binding binding);

    Properties getQueueProperties(String queueName);

}

另请参阅 Scoped Operations

getQueueProperties() 方法返回有关队列的一些有限信息(消息计数和消费者计数)。返回属性的键在 RabbitAdmin 中作为常量提供(QUEUE_NAMEQUEUE_MESSAGE_COUNTQUEUE_CONSUMER_COUNT)。RabbitMQ REST APIQueueInfo 对象中提供了更多信息。

无参数的 declareQueue() 方法在 broker 上定义一个自动生成名称的队列。此自动生成队列的附加属性是 exclusive=trueautoDelete=truedurable=false

declareQueue(Queue queue) 方法接受一个 Queue 对象并返回已声明队列的名称。如果提供的 Queuename 属性为空 String,则 broker 会使用生成的名称声明队列。该名称将返回给调用者。该名称也会添加到 QueueactualName 属性中。只有直接调用 RabbitAdmin 才能以编程方式使用此功能。当在 application context 中以声明方式定义队列时,使用 admin 进行自动声明时,可以将 name 属性设置为 ""(空字符串)。然后 broker 会创建名称。从版本 2.1 开始,监听器容器可以使用此类型的队列。有关更多信息,请参阅 Containers and Broker-Named queues

这与 AnonymousQueue 不同,AnonymousQueue 中框架生成一个唯一的(UUID)名称,并将 durable 设置为 falseexclusiveautoDelete 设置为 true。带有空(或缺失)name 属性的 <rabbit:queue/> 总是创建一个 AnonymousQueue

请参阅 AnonymousQueue,了解为何 AnonymousQueue 优先于 broker 生成的队列名称,以及如何控制名称的格式。从版本 2.1 开始,匿名队列默认声明时将参数 Queue.X_QUEUE_LEADER_LOCATOR 设置为 client-local。这确保队列声明在应用程序连接到的节点上。声明式队列必须具有固定名称,因为它们可能在上下文中的其他地方被引用,例如以下示例中所示的监听器:

<rabbit:listener-container>
    <rabbit:listener ref="listener" queue-names="#{someQueue.name}" />
</rabbit:listener-container>

此接口的 RabbitMQ 实现是 RabbitAdmin,当使用 Spring XML 配置时,示例如下:

<rabbit:connection-factory id="connectionFactory"/>

<rabbit:admin id="amqpAdmin" connection-factory="connectionFactory"/>

CachingConnectionFactory 的缓存模式是 CHANNEL(默认)时,RabbitAdmin 实现会自动延迟声明在同一个 ApplicationContext 中声明的队列、交换机和绑定。一旦打开与 broker 的连接,这些组件就会被声明。有一些命名空间特性使得这非常方便——例如,在 Stocks 示例应用程序中,我们有以下内容:

<rabbit:queue id="tradeQueue"/>

<rabbit:queue id="marketDataQueue"/>

<fanout-exchange name="broadcast.responses"
                 xmlns="http://www.springframework.org/schema/rabbit">
    <bindings>
        <binding queue="tradeQueue"/>
    </bindings>
</fanout-exchange>

<topic-exchange name="app.stock.marketdata"
                xmlns="http://www.springframework.org/schema/rabbit">
    <bindings>
        <binding queue="marketDataQueue" pattern="${stocks.quote.pattern}"/>
    </bindings>
</topic-exchange>

在前面的示例中,我们使用匿名队列(实际上,在内部,只是由框架而非 broker 生成名称的队列),并按 ID 引用它们。我们还可以声明具有显式名称的队列,这些名称也用作它们在上下文中的 bean 定义的标识符。以下示例配置了一个具有显式名称的队列:

<rabbit:queue name="stocks.trade.queue"/>
您可以同时提供 idname 属性。这使您可以通过独立于队列名称的 ID 来引用队列(例如,在绑定中)。它还允许使用标准的 Spring 功能(例如,用于队列名称的属性占位符和 SpEL 表达式)。当您使用 name 作为 bean 标识符时,这些功能不可用。

队列可以使用附加参数进行配置——例如 x-message-ttl。当您使用命名空间支持时,它们以参数名/参数值对的 Map 形式提供,这通过 <rabbit:queue-arguments> 元素定义。以下示例展示了如何进行配置:

<rabbit:queue name="withArguments">
    <rabbit:queue-arguments>
        <entry key="x-dead-letter-exchange" value="myDLX"/>
        <entry key="x-dead-letter-routing-key" value="dlqRK"/>
    </rabbit:queue-arguments>
</rabbit:queue>

默认情况下,参数被假定为字符串类型。对于其他类型的参数,您必须提供类型。以下示例展示了如何指定类型:

<rabbit:queue name="withArguments">
    <rabbit:queue-arguments value-type="java.lang.Long">
        <entry key="x-message-ttl" value="100"/>
    </rabbit:queue-arguments>
</rabbit:queue>

提供混合类型参数时,必须为每个 entry 元素提供类型。以下示例展示了如何进行配置:

<rabbit:queue name="withArguments">
    <rabbit:queue-arguments>
        <entry key="x-message-ttl">
            <value type="java.lang.Long">100</value>
        </entry>
        <entry key="x-dead-letter-exchange" value="myDLX"/>
        <entry key="x-dead-letter-routing-key" value="dlqRK"/>
    </rabbit:queue-arguments>
</rabbit:queue>

使用 Spring Framework 3.2 及更高版本,可以更简洁地声明,如下所示:

<rabbit:queue name="withArguments">
    <rabbit:queue-arguments>
        <entry key="x-message-ttl" value="100" value-type="java.lang.Long"/>
        <entry key="x-ha-policy" value="all"/>
    </rabbit:queue-arguments>
</rabbit:queue>

当您使用 Java 配置时,Queue.X_QUEUE_LEADER_LOCATOR 参数通过 Queue 类上的 setLeaderLocator() 方法作为一流属性得到支持。从版本 2.1 开始,匿名队列默认声明时将此属性设置为 client-local。这确保队列声明在应用程序连接到的节点上。

RabbitMQ broker 不允许声明具有不匹配参数的队列。例如,如果一个 queue 已经存在且没有 time to live 参数,而您尝试使用(例如)key="x-message-ttl" value="100" 来声明它,将会抛出异常。

默认情况下,当发生任何异常时,RabbitAdmin 会立即停止处理所有声明。这可能导致下游问题,例如监听器容器因另一个队列(在出错的队列之后定义)未声明而初始化失败。

可以通过在 RabbitAdmin 实例上将 ignore-declaration-exceptions 属性设置为 true 来修改此行为。此选项指示 RabbitAdmin 记录异常并继续声明其他元素。使用 Java 配置 RabbitAdmin 时,此属性称为 ignoreDeclarationExceptions。这是一个全局设置,适用于所有元素。队列、交换机和绑定具有仅适用于这些元素的类似属性。

在版本 1.6 之前,此属性仅在通道上发生 IOException 时才生效,例如当前属性与期望属性不匹配时。现在,此属性对任何异常都生效,包括 TimeoutException 及其他异常。

此外,任何声明异常都会导致发布 DeclarationExceptionEvent,它是一个 ApplicationEvent,可由上下文中的任何 ApplicationListener 消费。该事件包含对 admin、正在声明的元素和 Throwable 的引用。

Header 交换机

从版本 1.3 开始,您可以配置 HeadersExchange 以匹配多个 header。您还可以指定是否必须匹配任意或所有 header。以下示例展示了如何进行配置:

<rabbit:headers-exchange name="headers-test">
    <rabbit:bindings>
        <rabbit:binding queue="bucket">
            <rabbit:binding-arguments>
                <entry key="foo" value="bar"/>
                <entry key="baz" value="qux"/>
                <entry key="x-match" value="all"/>
            </rabbit:binding-arguments>
        </rabbit:binding>
    </rabbit:bindings>
</rabbit:headers-exchange>

从版本 1.6 开始,您可以配置带有 internal 标志(默认为 false)的 Exchanges,并且此类 Exchange 通过 RabbitAdmin(如果存在于 application context 中)在 Broker 上正确配置。如果交换机的 internal 标志为 true,RabbitMQ 不允许客户端使用该交换机。这对于死信交换机或交换机到交换机的绑定非常有用,在这种情况下,您不希望发布者直接使用该交换机。

要了解如何使用 Java 配置 AMQP 基础设施,请查看 Stock 示例应用程序,其中有一个 @ConfigurationAbstractStockRabbitConfiguration,它又包含 RabbitClientConfigurationRabbitServerConfiguration 子类。以下列表显示了 AbstractStockRabbitConfiguration 的代码:

@Configuration
public abstract class AbstractStockAppRabbitConfiguration {

    @Bean
    public CachingConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory =
            new CachingConnectionFactory("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        template.setMessageConverter(jsonMessageConverter());
        configureRabbitTemplate(template);
        return template;
    }

    @Bean
    public Jackson2JsonMessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public TopicExchange marketDataExchange() {
        return new TopicExchange("app.stock.marketdata");
    }

    // additional code omitted for brevity

}

在 Stock 应用程序中,服务器使用以下 @Configuration 类进行配置:

@Configuration
public class RabbitServerConfiguration extends AbstractStockAppRabbitConfiguration  {

    @Bean
    public Queue stockRequestQueue() {
        return new Queue("app.stock.request");
    }
}

这是 @Configuration 类整个继承链的末端。最终结果是 TopicExchangeQueue 在应用程序启动时被声明到 broker。server 配置中没有将 TopicExchange 绑定到队列,因为这在 client 应用程序中完成。然而,stock request queue 会自动绑定到 AMQP 默认交换机。此行为由规范定义。

client @Configuration 类更有趣一些。其声明如下:

@Configuration
public class RabbitClientConfiguration extends AbstractStockAppRabbitConfiguration {

    @Value("${stocks.quote.pattern}")
    private String marketDataRoutingKey;

    @Bean
    public Queue marketDataQueue() {
        return amqpAdmin().declareQueue();
    }

    /**
     * Binds to the market data exchange.
     * Interested in any stock quotes
     * that match its routing key.
     */
    @Bean
    public Binding marketDataBinding() {
        return BindingBuilder.bind(
                marketDataQueue()).to(marketDataExchange()).with(marketDataRoutingKey);
    }

    // additional code omitted for brevity

}

client 通过 AmqpAdmin 上的 declareQueue() 方法声明另一个队列。它将该队列绑定到 market data exchange,并使用在 properties 文件中外部化的 routing pattern。

队列和交换机的 Builder API

版本 1.6 引入了一种方便的 fluent API,用于在使用 Java 配置时配置 QueueExchange 对象。以下示例展示了如何使用它:

@Bean
public Queue queue() {
    return QueueBuilder.nonDurable("foo")
        .autoDelete()
        .exclusive()
        .withArgument("foo", "bar")
        .build();
}

@Bean
public Exchange exchange() {
  return ExchangeBuilder.directExchange("foo")
      .autoDelete()
      .internal()
      .withArgument("foo", "bar")
      .build();
}

从版本 2.0 开始,ExchangeBuilder 现在默认创建 durable exchanges,这与各个 AbstractExchange 类上的简单构造函数保持一致。要使用 builder 创建 non-durable exchange,请在调用 .build() 之前使用 .durable(false)。不再提供无参数的 durable() 方法。

版本 2.2 引入了 fluent API,用于添加“已知”的 exchange 和 queue 参数...

@Bean
public Queue allArgs1() {
    return QueueBuilder.nonDurable("all.args.1")
            .ttl(1000)
            .expires(200_000)
            .maxLength(42)
            .maxLengthBytes(10_000)
            .overflow(Overflow.rejectPublish)
            .deadLetterExchange("dlx")
            .deadLetterRoutingKey("dlrk")
            .maxPriority(4)
            .lazy()
            .leaderLocator(LeaderLocator.minLeaders)
            .singleActiveConsumer()
            .build();
}

@Bean
public DirectExchange ex() {
    return ExchangeBuilder.directExchange("ex.with.alternate")
            .durable(true)
            .alternate("alternate")
            .build();
}

声明 Exchanges, Queues 和 Bindings 的集合

您可以将 Declarable 对象(QueueExchangeBinding)的集合包装在 Declarables 对象中。RabbitAdmin 检测 application context 中的此类 bean(以及离散的 Declarable bean),并在建立连接(初始连接和连接失败后)时将包含的对象声明到 broker。以下示例展示了如何进行配置:

@Configuration
public static class Config {

    @Bean
    public CachingConnectionFactory cf() {
        return new CachingConnectionFactory("localhost");
    }

    @Bean
    public RabbitAdmin admin(ConnectionFactory cf) {
        return new RabbitAdmin(cf);
    }

    @Bean
    public DirectExchange e1() {
        return new DirectExchange("e1", false, true);
    }

    @Bean
    public Queue q1() {
        return new Queue("q1", false, false, true);
    }

    @Bean
    public Binding b1() {
        return BindingBuilder.bind(q1()).to(e1()).with("k1");
    }

    @Bean
    public Declarables es() {
        return new Declarables(
                new DirectExchange("e2", false, true),
                new DirectExchange("e3", false, true));
    }

    @Bean
    public Declarables qs() {
        return new Declarables(
                new Queue("q2", false, false, true),
                new Queue("q3", false, false, true));
    }

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public Declarables prototypes() {
        return new Declarables(new Queue(this.prototypeQueueName, false, false, true));
    }

    @Bean
    public Declarables bs() {
        return new Declarables(
                new Binding("q2", DestinationType.QUEUE, "e2", "k2", null),
                new Binding("q3", DestinationType.QUEUE, "e3", "k3", null));
    }

    @Bean
    public Declarables ds() {
        return new Declarables(
                new DirectExchange("e4", false, true),
                new Queue("q4", false, false, true),
                new Binding("q4", DestinationType.QUEUE, "e4", "k4", null));
    }

}
在版本 2.1 之前,您可以通过定义 Collection<Declarable> 类型的 bean 来声明多个 Declarable 实例。这在某些情况下可能导致不良副作用,因为 admin 必须迭代所有 Collection<?> bean。

版本 2.2 向 Declarables 添加了 getDeclarablesByType 方法;例如,在声明监听器容器 bean 时,这可以作为一种便利。

public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
        Declarables mixedDeclarables, MessageListener listener) {

    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
    container.setQueues(mixedDeclarables.getDeclarablesByType(Queue.class).toArray(new Queue[0]));
    container.setMessageListener(listener);
    return container;
}

条件声明

默认情况下,application context 中的所有 RabbitAdmin 实例(假设它们具有 auto-startup="true")都会声明所有队列、交换机和绑定。

从版本 2.1.9 开始,RabbitAdmin 有一个新属性 explicitDeclarationsOnly(默认值为 false);当将其设置为 true 时,admin 将仅声明明确配置由该 admin 声明的 bean。

从 1.2 版本开始,您可以有条件地声明这些元素。当应用程序连接到多个 broker 并需要指定特定元素应在哪些 broker 上声明时,这特别有用。

表示这些元素的类实现了 Declarable 接口,该接口有两个方法:shouldDeclare()getDeclaringAdmins()RabbitAdmin 使用这些方法来确定特定实例是否应实际处理其 Connection 上的声明。

属性在命名空间中作为属性提供,示例如下:

<rabbit:admin id="admin1" connection-factory="CF1" />

<rabbit:admin id="admin2" connection-factory="CF2" />

<rabbit:admin id="admin3" connection-factory="CF3" explicit-declarations-only="true" />

<rabbit:queue id="declaredByAdmin1AndAdmin2Implicitly" />

<rabbit:queue id="declaredByAdmin1AndAdmin2" declared-by="admin1, admin2" />

<rabbit:queue id="declaredByAdmin1Only" declared-by="admin1" />

<rabbit:queue id="notDeclaredByAllExceptAdmin3" auto-declare="false" />

<rabbit:direct-exchange name="direct" declared-by="admin1, admin2">
    <rabbit:bindings>
        <rabbit:binding key="foo" queue="bar"/>
    </rabbit:bindings>
</rabbit:direct-exchange>
默认情况下,auto-declare 属性为 true,如果 declared-by 未提供(或为空),则所有 RabbitAdmin 实例都会声明对象(只要 admin 的 auto-startup 属性为 true(默认值),并且 admin 的 explicit-declarations-only 属性为 false)。

同样,您可以使用基于 Java 的 @Configuration 来实现相同的效果。在以下示例中,组件由 admin1 声明,但不由 admin2 声明:

@Bean
public RabbitAdmin admin1() {
    return new RabbitAdmin(cf1());
}

@Bean
public RabbitAdmin admin2() {
    return new RabbitAdmin(cf2());
}

@Bean
public Queue queue() {
    Queue queue = new Queue("foo");
    queue.setAdminsThatShouldDeclare(admin1());
    return queue;
}

@Bean
public Exchange exchange() {
    DirectExchange exchange = new DirectExchange("bar");
    exchange.setAdminsThatShouldDeclare(admin1());
    return exchange;
}

@Bean
public Binding binding() {
    Binding binding = new Binding("foo", DestinationType.QUEUE, exchange().getName(), "foo", null);
    binding.setAdminsThatShouldDeclare(admin1());
    return binding;
}

关于 idname 属性的注意事项

<rabbit:queue/><rabbit:exchange/> 元素上的 name 属性反映了实体在 broker 中的名称。对于队列,如果 name 被省略,则创建一个匿名队列(请参阅 AnonymousQueue)。

在版本 2.0 之前,name 也被注册为 bean 名称别名(类似于 <bean/> 元素上的 name)。

这导致了两个问题:

  • 它阻止了声明具有相同名称的队列和交换机。

  • 如果别名包含 SpEL 表达式(#{…​}),则无法解析。

从版本 2.0 开始,如果您声明这些元素之一时同时带有 idname 属性,则 name 不再声明为 bean 名称别名。如果您希望声明具有相同 name 的队列和交换机,则必须提供 id

如果元素仅具有 name 属性,则没有变化。bean 仍然可以按 name 引用——例如,在绑定声明中。但是,如果 name 包含 SpEL,您仍然无法引用它——您必须提供一个 id 用于引用。

AnonymousQueue

通常,当您需要一个唯一命名、独占、自动删除的队列时,我们建议您使用 AnonymousQueue 而不是 broker 定义的队列名称(将 "" 用作 Queue 名称会导致 broker 生成队列名称)。

这是因为:

  1. 队列实际上是在与 broker 建立连接时声明的。这发生在 bean 创建和装配在一起之后很长时间。使用队列的 bean 需要知道其名称。事实上,当应用程序启动时,broker 可能甚至没有运行。

  2. 如果由于某种原因与 broker 的连接丢失,admin 会以相同的名称重新声明 AnonymousQueue。如果使用 broker 声明的队列,则队列名称会更改。

您可以控制 AnonymousQueue 实例使用的队列名称的格式。

默认情况下,队列名称以 spring.gen- 为前缀,后跟 UUID 的 base64 表示——例如:spring.gen-MRBv9sqISkuCiPfOYfpo4g

您可以在构造函数参数中提供 AnonymousQueue.NamingStrategy 实现。以下示例展示了如何进行配置:

@Bean
public Queue anon1() {
    return new AnonymousQueue();
}

@Bean
public Queue anon2() {
    return new AnonymousQueue(new AnonymousQueue.Base64UrlNamingStrategy("something-"));
}

@Bean
public Queue anon3() {
    return new AnonymousQueue(AnonymousQueue.UUIDNamingStrategy.DEFAULT);
}

第一个 bean 生成以 spring.gen- 为前缀,后跟 UUID 的 base64 表示的队列名称——例如:spring.gen-MRBv9sqISkuCiPfOYfpo4g。第二个 bean 生成以 something- 为前缀,后跟 UUID 的 base64 表示的队列名称。第三个 bean 仅使用 UUID(无 base64 转换)生成名称——例如:f20c818a-006b-4416-bf91-643590fedb0e

base64 编码使用 RFC 4648 中的“URL 和文件名安全字母表”。末尾的填充字符(=)被移除。

您可以提供自己的命名策略,从而可以在队列名称中包含其他信息(例如应用程序名称或客户端主机)。

使用 XML 配置时,可以指定命名策略。<rabbit:queue> 元素上存在 naming-strategy 属性,用于实现 AnonymousQueue.NamingStrategy 的 bean 引用。以下示例展示了如何以各种方式指定命名策略:

<rabbit:queue id="uuidAnon" />

<rabbit:queue id="springAnon" naming-strategy="uuidNamer" />

<rabbit:queue id="customAnon" naming-strategy="customNamer" />

<bean id="uuidNamer" class="org.springframework.amqp.core.AnonymousQueue.UUIDNamingStrategy" />

<bean id="customNamer" class="org.springframework.amqp.core.AnonymousQueue.Base64UrlNamingStrategy">
    <constructor-arg value="custom.gen-" />
</bean>

第一个示例创建诸如 spring.gen-MRBv9sqISkuCiPfOYfpo4g 的名称。第二个示例创建使用 UUID 的 String 表示的名称。第三个示例创建诸如 custom.gen-MRBv9sqISkuCiPfOYfpo4g 的名称。

您还可以提供自己的命名策略 bean。

从版本 2.1 开始,匿名队列默认声明时将参数 Queue.X_QUEUE_LEADER_LOCATOR 设置为 client-local。这确保队列声明在应用程序连接到的节点上。在构建实例后,可以通过调用 queue.setLeaderLocator(null) 恢复到先前的行为。

恢复自动删除声明

通常,RabbitAdmin 仅恢复在 application context 中声明为 bean 的队列/交换机/绑定;如果任何此类声明是自动删除的,则在连接丢失时会被 broker 移除。重新建立连接后,admin 将重新声明这些实体。通常,通过调用 admin.declareQueue(…​)admin.declareExchange(…​)admin.declareBinding(…​) 创建的实体不会被恢复。

从版本 2.4 开始,admin 有一个新属性 redeclareManualDeclarations;当设置为 true 时,admin 将在恢复 application context 中的 bean 的同时恢复这些实体。

如果调用了 deleteQueue(…​)deleteExchange(…​)removeBinding(…​),则不会执行单个声明的恢复。删除队列和交换机时,关联的绑定会从可恢复实体中移除。

最后,调用 resetAllManualDeclarations() 将阻止恢复任何先前声明的实体。