配置 Broker
AMQP 规范描述了如何在 broker 上配置队列、交换机和绑定。这些操作(可从 0.8 规范及更高版本移植)存在于 org.springframework.amqp.core
包中的 AmqpAdmin
接口中。该类的 RabbitMQ 实现是位于 org.springframework.amqp.rabbit.core
包中的 RabbitAdmin
。
AmqpAdmin
接口基于使用 Spring AMQP 领域抽象,如下所示:
public interface AmqpAdmin {
// Exchange Operations
void declareExchange(Exchange exchange);
void deleteExchange(String exchangeName);
// Queue Operations
Queue declareQueue();
String declareQueue(Queue queue);
void deleteQueue(String queueName);
void deleteQueue(String queueName, boolean unused, boolean empty);
void purgeQueue(String queueName, boolean noWait);
// Binding Operations
void declareBinding(Binding binding);
void removeBinding(Binding binding);
Properties getQueueProperties(String queueName);
}
另请参阅 Scoped Operations。
getQueueProperties()
方法返回有关队列的一些有限信息(消息计数和消费者计数)。返回属性的键在 RabbitAdmin
中作为常量提供(QUEUE_NAME
、QUEUE_MESSAGE_COUNT
和 QUEUE_CONSUMER_COUNT
)。RabbitMQ REST API 在 QueueInfo
对象中提供了更多信息。
无参数的 declareQueue()
方法在 broker 上定义一个自动生成名称的队列。此自动生成队列的附加属性是 exclusive=true
、autoDelete=true
和 durable=false
。
declareQueue(Queue queue)
方法接受一个 Queue
对象并返回已声明队列的名称。如果提供的 Queue
的 name
属性为空 String
,则 broker 会使用生成的名称声明队列。该名称将返回给调用者。该名称也会添加到 Queue
的 actualName
属性中。只有直接调用 RabbitAdmin
才能以编程方式使用此功能。当在 application context 中以声明方式定义队列时,使用 admin 进行自动声明时,可以将 name 属性设置为 ""
(空字符串)。然后 broker 会创建名称。从版本 2.1 开始,监听器容器可以使用此类型的队列。有关更多信息,请参阅 Containers and Broker-Named queues。
这与 AnonymousQueue
不同,AnonymousQueue
中框架生成一个唯一的(UUID
)名称,并将 durable
设置为 false
,exclusive
、autoDelete
设置为 true
。带有空(或缺失)name
属性的 <rabbit:queue/>
总是创建一个 AnonymousQueue
。
请参阅 AnonymousQueue
,了解为何 AnonymousQueue
优先于 broker 生成的队列名称,以及如何控制名称的格式。从版本 2.1 开始,匿名队列默认声明时将参数 Queue.X_QUEUE_LEADER_LOCATOR
设置为 client-local
。这确保队列声明在应用程序连接到的节点上。声明式队列必须具有固定名称,因为它们可能在上下文中的其他地方被引用,例如以下示例中所示的监听器:
<rabbit:listener-container>
<rabbit:listener ref="listener" queue-names="#{someQueue.name}" />
</rabbit:listener-container>
此接口的 RabbitMQ 实现是 RabbitAdmin
,当使用 Spring XML 配置时,示例如下:
<rabbit:connection-factory id="connectionFactory"/>
<rabbit:admin id="amqpAdmin" connection-factory="connectionFactory"/>
当 CachingConnectionFactory
的缓存模式是 CHANNEL
(默认)时,RabbitAdmin
实现会自动延迟声明在同一个 ApplicationContext
中声明的队列、交换机和绑定。一旦打开与 broker 的连接,这些组件就会被声明。有一些命名空间特性使得这非常方便——例如,在 Stocks 示例应用程序中,我们有以下内容:
<rabbit:queue id="tradeQueue"/>
<rabbit:queue id="marketDataQueue"/>
<fanout-exchange name="broadcast.responses"
xmlns="http://www.springframework.org/schema/rabbit">
<bindings>
<binding queue="tradeQueue"/>
</bindings>
</fanout-exchange>
<topic-exchange name="app.stock.marketdata"
xmlns="http://www.springframework.org/schema/rabbit">
<bindings>
<binding queue="marketDataQueue" pattern="${stocks.quote.pattern}"/>
</bindings>
</topic-exchange>
在前面的示例中,我们使用匿名队列(实际上,在内部,只是由框架而非 broker 生成名称的队列),并按 ID 引用它们。我们还可以声明具有显式名称的队列,这些名称也用作它们在上下文中的 bean 定义的标识符。以下示例配置了一个具有显式名称的队列:
<rabbit:queue name="stocks.trade.queue"/>
您可以同时提供 id 和 name 属性。这使您可以通过独立于队列名称的 ID 来引用队列(例如,在绑定中)。它还允许使用标准的 Spring 功能(例如,用于队列名称的属性占位符和 SpEL 表达式)。当您使用 name 作为 bean 标识符时,这些功能不可用。 |
队列可以使用附加参数进行配置——例如 x-message-ttl
。当您使用命名空间支持时,它们以参数名/参数值对的 Map
形式提供,这通过 <rabbit:queue-arguments>
元素定义。以下示例展示了如何进行配置:
<rabbit:queue name="withArguments">
<rabbit:queue-arguments>
<entry key="x-dead-letter-exchange" value="myDLX"/>
<entry key="x-dead-letter-routing-key" value="dlqRK"/>
</rabbit:queue-arguments>
</rabbit:queue>
默认情况下,参数被假定为字符串类型。对于其他类型的参数,您必须提供类型。以下示例展示了如何指定类型:
<rabbit:queue name="withArguments">
<rabbit:queue-arguments value-type="java.lang.Long">
<entry key="x-message-ttl" value="100"/>
</rabbit:queue-arguments>
</rabbit:queue>
提供混合类型参数时,必须为每个 entry 元素提供类型。以下示例展示了如何进行配置:
<rabbit:queue name="withArguments">
<rabbit:queue-arguments>
<entry key="x-message-ttl">
<value type="java.lang.Long">100</value>
</entry>
<entry key="x-dead-letter-exchange" value="myDLX"/>
<entry key="x-dead-letter-routing-key" value="dlqRK"/>
</rabbit:queue-arguments>
</rabbit:queue>
使用 Spring Framework 3.2 及更高版本,可以更简洁地声明,如下所示:
<rabbit:queue name="withArguments">
<rabbit:queue-arguments>
<entry key="x-message-ttl" value="100" value-type="java.lang.Long"/>
<entry key="x-ha-policy" value="all"/>
</rabbit:queue-arguments>
</rabbit:queue>
当您使用 Java 配置时,Queue.X_QUEUE_LEADER_LOCATOR
参数通过 Queue
类上的 setLeaderLocator()
方法作为一流属性得到支持。从版本 2.1 开始,匿名队列默认声明时将此属性设置为 client-local
。这确保队列声明在应用程序连接到的节点上。
RabbitMQ broker 不允许声明具有不匹配参数的队列。例如,如果一个 queue 已经存在且没有 time to live 参数,而您尝试使用(例如)key="x-message-ttl" value="100" 来声明它,将会抛出异常。 |
默认情况下,当发生任何异常时,RabbitAdmin
会立即停止处理所有声明。这可能导致下游问题,例如监听器容器因另一个队列(在出错的队列之后定义)未声明而初始化失败。
可以通过在 RabbitAdmin
实例上将 ignore-declaration-exceptions
属性设置为 true
来修改此行为。此选项指示 RabbitAdmin
记录异常并继续声明其他元素。使用 Java 配置 RabbitAdmin
时,此属性称为 ignoreDeclarationExceptions
。这是一个全局设置,适用于所有元素。队列、交换机和绑定具有仅适用于这些元素的类似属性。
在版本 1.6 之前,此属性仅在通道上发生 IOException
时才生效,例如当前属性与期望属性不匹配时。现在,此属性对任何异常都生效,包括 TimeoutException
及其他异常。
此外,任何声明异常都会导致发布 DeclarationExceptionEvent
,它是一个 ApplicationEvent
,可由上下文中的任何 ApplicationListener
消费。该事件包含对 admin、正在声明的元素和 Throwable
的引用。
Header 交换机
从版本 1.3 开始,您可以配置 HeadersExchange
以匹配多个 header。您还可以指定是否必须匹配任意或所有 header。以下示例展示了如何进行配置:
<rabbit:headers-exchange name="headers-test">
<rabbit:bindings>
<rabbit:binding queue="bucket">
<rabbit:binding-arguments>
<entry key="foo" value="bar"/>
<entry key="baz" value="qux"/>
<entry key="x-match" value="all"/>
</rabbit:binding-arguments>
</rabbit:binding>
</rabbit:bindings>
</rabbit:headers-exchange>
从版本 1.6 开始,您可以配置带有 internal
标志(默认为 false
)的 Exchanges
,并且此类 Exchange
通过 RabbitAdmin
(如果存在于 application context 中)在 Broker 上正确配置。如果交换机的 internal
标志为 true
,RabbitMQ 不允许客户端使用该交换机。这对于死信交换机或交换机到交换机的绑定非常有用,在这种情况下,您不希望发布者直接使用该交换机。
要了解如何使用 Java 配置 AMQP 基础设施,请查看 Stock 示例应用程序,其中有一个 @Configuration
类 AbstractStockRabbitConfiguration
,它又包含 RabbitClientConfiguration
和 RabbitServerConfiguration
子类。以下列表显示了 AbstractStockRabbitConfiguration
的代码:
@Configuration
public abstract class AbstractStockAppRabbitConfiguration {
@Bean
public CachingConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setMessageConverter(jsonMessageConverter());
configureRabbitTemplate(template);
return template;
}
@Bean
public Jackson2JsonMessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public TopicExchange marketDataExchange() {
return new TopicExchange("app.stock.marketdata");
}
// additional code omitted for brevity
}
在 Stock 应用程序中,服务器使用以下 @Configuration
类进行配置:
@Configuration
public class RabbitServerConfiguration extends AbstractStockAppRabbitConfiguration {
@Bean
public Queue stockRequestQueue() {
return new Queue("app.stock.request");
}
}
这是 @Configuration
类整个继承链的末端。最终结果是 TopicExchange
和 Queue
在应用程序启动时被声明到 broker。server 配置中没有将 TopicExchange
绑定到队列,因为这在 client 应用程序中完成。然而,stock request queue 会自动绑定到 AMQP 默认交换机。此行为由规范定义。
client @Configuration
类更有趣一些。其声明如下:
@Configuration
public class RabbitClientConfiguration extends AbstractStockAppRabbitConfiguration {
@Value("${stocks.quote.pattern}")
private String marketDataRoutingKey;
@Bean
public Queue marketDataQueue() {
return amqpAdmin().declareQueue();
}
/**
* Binds to the market data exchange.
* Interested in any stock quotes
* that match its routing key.
*/
@Bean
public Binding marketDataBinding() {
return BindingBuilder.bind(
marketDataQueue()).to(marketDataExchange()).with(marketDataRoutingKey);
}
// additional code omitted for brevity
}
client 通过 AmqpAdmin
上的 declareQueue()
方法声明另一个队列。它将该队列绑定到 market data exchange,并使用在 properties 文件中外部化的 routing pattern。
队列和交换机的 Builder API
版本 1.6 引入了一种方便的 fluent API,用于在使用 Java 配置时配置 Queue
和 Exchange
对象。以下示例展示了如何使用它:
@Bean
public Queue queue() {
return QueueBuilder.nonDurable("foo")
.autoDelete()
.exclusive()
.withArgument("foo", "bar")
.build();
}
@Bean
public Exchange exchange() {
return ExchangeBuilder.directExchange("foo")
.autoDelete()
.internal()
.withArgument("foo", "bar")
.build();
}
有关更多信息,请参阅 org.springframework.amqp.core.QueueBuilder
和 org.springframework.amqp.core.ExchangeBuilder
的 Javadoc。
从版本 2.0 开始,ExchangeBuilder
现在默认创建 durable exchanges,这与各个 AbstractExchange
类上的简单构造函数保持一致。要使用 builder 创建 non-durable exchange,请在调用 .build()
之前使用 .durable(false)
。不再提供无参数的 durable()
方法。
版本 2.2 引入了 fluent API,用于添加“已知”的 exchange 和 queue 参数...
@Bean
public Queue allArgs1() {
return QueueBuilder.nonDurable("all.args.1")
.ttl(1000)
.expires(200_000)
.maxLength(42)
.maxLengthBytes(10_000)
.overflow(Overflow.rejectPublish)
.deadLetterExchange("dlx")
.deadLetterRoutingKey("dlrk")
.maxPriority(4)
.lazy()
.leaderLocator(LeaderLocator.minLeaders)
.singleActiveConsumer()
.build();
}
@Bean
public DirectExchange ex() {
return ExchangeBuilder.directExchange("ex.with.alternate")
.durable(true)
.alternate("alternate")
.build();
}
声明 Exchanges, Queues 和 Bindings 的集合
您可以将 Declarable
对象(Queue
、Exchange
和 Binding
)的集合包装在 Declarables
对象中。RabbitAdmin
检测 application context 中的此类 bean(以及离散的 Declarable
bean),并在建立连接(初始连接和连接失败后)时将包含的对象声明到 broker。以下示例展示了如何进行配置:
@Configuration
public static class Config {
@Bean
public CachingConnectionFactory cf() {
return new CachingConnectionFactory("localhost");
}
@Bean
public RabbitAdmin admin(ConnectionFactory cf) {
return new RabbitAdmin(cf);
}
@Bean
public DirectExchange e1() {
return new DirectExchange("e1", false, true);
}
@Bean
public Queue q1() {
return new Queue("q1", false, false, true);
}
@Bean
public Binding b1() {
return BindingBuilder.bind(q1()).to(e1()).with("k1");
}
@Bean
public Declarables es() {
return new Declarables(
new DirectExchange("e2", false, true),
new DirectExchange("e3", false, true));
}
@Bean
public Declarables qs() {
return new Declarables(
new Queue("q2", false, false, true),
new Queue("q3", false, false, true));
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public Declarables prototypes() {
return new Declarables(new Queue(this.prototypeQueueName, false, false, true));
}
@Bean
public Declarables bs() {
return new Declarables(
new Binding("q2", DestinationType.QUEUE, "e2", "k2", null),
new Binding("q3", DestinationType.QUEUE, "e3", "k3", null));
}
@Bean
public Declarables ds() {
return new Declarables(
new DirectExchange("e4", false, true),
new Queue("q4", false, false, true),
new Binding("q4", DestinationType.QUEUE, "e4", "k4", null));
}
}
在版本 2.1 之前,您可以通过定义 Collection<Declarable> 类型的 bean 来声明多个 Declarable 实例。这在某些情况下可能导致不良副作用,因为 admin 必须迭代所有 Collection<?> bean。 |
版本 2.2 向 Declarables
添加了 getDeclarablesByType
方法;例如,在声明监听器容器 bean 时,这可以作为一种便利。
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
Declarables mixedDeclarables, MessageListener listener) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(mixedDeclarables.getDeclarablesByType(Queue.class).toArray(new Queue[0]));
container.setMessageListener(listener);
return container;
}
条件声明
默认情况下,application context 中的所有 RabbitAdmin
实例(假设它们具有 auto-startup="true"
)都会声明所有队列、交换机和绑定。
从版本 2.1.9 开始,RabbitAdmin
有一个新属性 explicitDeclarationsOnly
(默认值为 false
);当将其设置为 true
时,admin 将仅声明明确配置由该 admin 声明的 bean。
从 1.2 版本开始,您可以有条件地声明这些元素。当应用程序连接到多个 broker 并需要指定特定元素应在哪些 broker 上声明时,这特别有用。 |
表示这些元素的类实现了 Declarable
接口,该接口有两个方法:shouldDeclare()
和 getDeclaringAdmins()
。RabbitAdmin
使用这些方法来确定特定实例是否应实际处理其 Connection
上的声明。
属性在命名空间中作为属性提供,示例如下:
<rabbit:admin id="admin1" connection-factory="CF1" />
<rabbit:admin id="admin2" connection-factory="CF2" />
<rabbit:admin id="admin3" connection-factory="CF3" explicit-declarations-only="true" />
<rabbit:queue id="declaredByAdmin1AndAdmin2Implicitly" />
<rabbit:queue id="declaredByAdmin1AndAdmin2" declared-by="admin1, admin2" />
<rabbit:queue id="declaredByAdmin1Only" declared-by="admin1" />
<rabbit:queue id="notDeclaredByAllExceptAdmin3" auto-declare="false" />
<rabbit:direct-exchange name="direct" declared-by="admin1, admin2">
<rabbit:bindings>
<rabbit:binding key="foo" queue="bar"/>
</rabbit:bindings>
</rabbit:direct-exchange>
默认情况下,auto-declare 属性为 true ,如果 declared-by 未提供(或为空),则所有 RabbitAdmin 实例都会声明对象(只要 admin 的 auto-startup 属性为 true (默认值),并且 admin 的 explicit-declarations-only 属性为 false)。 |
同样,您可以使用基于 Java 的 @Configuration
来实现相同的效果。在以下示例中,组件由 admin1
声明,但不由 admin2
声明:
@Bean
public RabbitAdmin admin1() {
return new RabbitAdmin(cf1());
}
@Bean
public RabbitAdmin admin2() {
return new RabbitAdmin(cf2());
}
@Bean
public Queue queue() {
Queue queue = new Queue("foo");
queue.setAdminsThatShouldDeclare(admin1());
return queue;
}
@Bean
public Exchange exchange() {
DirectExchange exchange = new DirectExchange("bar");
exchange.setAdminsThatShouldDeclare(admin1());
return exchange;
}
@Bean
public Binding binding() {
Binding binding = new Binding("foo", DestinationType.QUEUE, exchange().getName(), "foo", null);
binding.setAdminsThatShouldDeclare(admin1());
return binding;
}
关于 id
和 name
属性的注意事项
<rabbit:queue/>
和 <rabbit:exchange/>
元素上的 name
属性反映了实体在 broker 中的名称。对于队列,如果 name 被省略,则创建一个匿名队列(请参阅 AnonymousQueue
)。
在版本 2.0 之前,name
也被注册为 bean 名称别名(类似于 <bean/>
元素上的 name
)。
这导致了两个问题:
-
它阻止了声明具有相同名称的队列和交换机。
-
如果别名包含 SpEL 表达式(
#{…}
),则无法解析。
从版本 2.0 开始,如果您声明这些元素之一时同时带有 id
和 name
属性,则 name 不再声明为 bean 名称别名。如果您希望声明具有相同 name 的队列和交换机,则必须提供 id
。
如果元素仅具有 name
属性,则没有变化。bean 仍然可以按 name 引用——例如,在绑定声明中。但是,如果 name 包含 SpEL,您仍然无法引用它——您必须提供一个 id
用于引用。
AnonymousQueue
通常,当您需要一个唯一命名、独占、自动删除的队列时,我们建议您使用 AnonymousQueue
而不是 broker 定义的队列名称(将 ""
用作 Queue
名称会导致 broker 生成队列名称)。
这是因为:
-
队列实际上是在与 broker 建立连接时声明的。这发生在 bean 创建和装配在一起之后很长时间。使用队列的 bean 需要知道其名称。事实上,当应用程序启动时,broker 可能甚至没有运行。
-
如果由于某种原因与 broker 的连接丢失,admin 会以相同的名称重新声明
AnonymousQueue
。如果使用 broker 声明的队列,则队列名称会更改。
您可以控制 AnonymousQueue
实例使用的队列名称的格式。
默认情况下,队列名称以 spring.gen-
为前缀,后跟 UUID
的 base64 表示——例如:spring.gen-MRBv9sqISkuCiPfOYfpo4g
。
您可以在构造函数参数中提供 AnonymousQueue.NamingStrategy
实现。以下示例展示了如何进行配置:
@Bean
public Queue anon1() {
return new AnonymousQueue();
}
@Bean
public Queue anon2() {
return new AnonymousQueue(new AnonymousQueue.Base64UrlNamingStrategy("something-"));
}
@Bean
public Queue anon3() {
return new AnonymousQueue(AnonymousQueue.UUIDNamingStrategy.DEFAULT);
}
第一个 bean 生成以 spring.gen-
为前缀,后跟 UUID
的 base64 表示的队列名称——例如:spring.gen-MRBv9sqISkuCiPfOYfpo4g
。第二个 bean 生成以 something-
为前缀,后跟 UUID
的 base64 表示的队列名称。第三个 bean 仅使用 UUID
(无 base64 转换)生成名称——例如:f20c818a-006b-4416-bf91-643590fedb0e
。
base64 编码使用 RFC 4648 中的“URL 和文件名安全字母表”。末尾的填充字符(=
)被移除。
您可以提供自己的命名策略,从而可以在队列名称中包含其他信息(例如应用程序名称或客户端主机)。
使用 XML 配置时,可以指定命名策略。<rabbit:queue>
元素上存在 naming-strategy
属性,用于实现 AnonymousQueue.NamingStrategy
的 bean 引用。以下示例展示了如何以各种方式指定命名策略:
<rabbit:queue id="uuidAnon" />
<rabbit:queue id="springAnon" naming-strategy="uuidNamer" />
<rabbit:queue id="customAnon" naming-strategy="customNamer" />
<bean id="uuidNamer" class="org.springframework.amqp.core.AnonymousQueue.UUIDNamingStrategy" />
<bean id="customNamer" class="org.springframework.amqp.core.AnonymousQueue.Base64UrlNamingStrategy">
<constructor-arg value="custom.gen-" />
</bean>
第一个示例创建诸如 spring.gen-MRBv9sqISkuCiPfOYfpo4g
的名称。第二个示例创建使用 UUID
的 String 表示的名称。第三个示例创建诸如 custom.gen-MRBv9sqISkuCiPfOYfpo4g
的名称。
您还可以提供自己的命名策略 bean。
从版本 2.1 开始,匿名队列默认声明时将参数 Queue.X_QUEUE_LEADER_LOCATOR
设置为 client-local
。这确保队列声明在应用程序连接到的节点上。在构建实例后,可以通过调用 queue.setLeaderLocator(null)
恢复到先前的行为。
恢复自动删除声明
通常,RabbitAdmin
仅恢复在 application context 中声明为 bean 的队列/交换机/绑定;如果任何此类声明是自动删除的,则在连接丢失时会被 broker 移除。重新建立连接后,admin 将重新声明这些实体。通常,通过调用 admin.declareQueue(…)
、admin.declareExchange(…)
和 admin.declareBinding(…)
创建的实体不会被恢复。
从版本 2.4 开始,admin 有一个新属性 redeclareManualDeclarations
;当设置为 true
时,admin 将在恢复 application context 中的 bean 的同时恢复这些实体。
如果调用了 deleteQueue(…)
、deleteExchange(…)
或 removeBinding(…)
,则不会执行单个声明的恢复。删除队列和交换机时,关联的绑定会从可恢复实体中移除。
最后,调用 resetAllManualDeclarations()
将阻止恢复任何先前声明的实体。