连接和资源管理
尽管我们在上一节中描述的 AMQP 模型是通用的,适用于所有实现,但当涉及到资源管理时,具体细节取决于 Broker 的实现。因此,在本节中,我们专注于“spring-rabbit”模块中存在的代码,因为目前 RabbitMQ 是唯一受支持的实现。
用于管理与 RabbitMQ Broker 连接的核心组件是 ConnectionFactory 接口。ConnectionFactory 实现的职责是提供 org.springframework.amqp.rabbit.connection.Connection 的实例,它是 com.rabbitmq.client.Connection 的包装器。
选择连接工厂
有三种连接工厂可供选择
-
PooledChannelConnectionFactory -
ThreadChannelConnectionFactory -
CachingConnectionFactory
前两个是在版本 2.3 中添加的。
对于大多数用例,应使用 CachingConnectionFactory。如果您想在不需要使用 作用域操作 的情况下确保严格的消息排序,则可以使用 ThreadChannelConnectionFactory。PooledChannelConnectionFactory 与 CachingConnectionFactory 类似,因为它使用单个连接和通道池。它的实现更简单,但不支持关联的发布者确认。
所有三个工厂都支持简单的发布者确认。
从版本 2.3.2 开始,当配置 RabbitTemplate 以使用 单独连接 时,您现在可以将发布连接工厂配置为不同的类型。默认情况下,发布工厂的类型与主工厂相同,并且在主工厂上设置的任何属性也会传播到发布工厂。
从版本 3.1 开始,AbstractConnectionFactory 包含 connectionCreatingBackOff 属性,该属性支持连接模块中的退避策略。目前,createChannel() 的行为中支持处理达到 channelMax 限制时发生的异常,实现基于尝试次数和间隔的退避策略。
PooledChannelConnectionFactory
该工厂管理一个连接和两个基于 Apache Pool2 的通道池。一个池用于事务性通道,另一个用于非事务性通道。这些池是具有默认配置的 GenericObjectPool;提供了一个回调来配置池;有关更多信息,请参阅 Apache 文档。
要使用此工厂,classpath 中必须包含 Apache commons-pool2 jar。
@Bean
PooledChannelConnectionFactory pcf() throws Exception {
ConnectionFactory rabbitConnectionFactory = new ConnectionFactory();
rabbitConnectionFactory.setHost("localhost");
PooledChannelConnectionFactory pcf = new PooledChannelConnectionFactory(rabbitConnectionFactory);
pcf.setPoolConfigurer((pool, tx) -> {
if (tx) {
// configure the transactional pool
}
else {
// configure the non-transactional pool
}
});
return pcf;
}
ThreadChannelConnectionFactory
该工厂管理一个连接和两个 ThreadLocal,一个用于事务性通道,另一个用于非事务性通道。该工厂确保同一线程上的所有操作都使用相同的通道(只要它保持打开状态)。这有助于实现严格的消息排序,而无需 作用域操作。为避免内存泄漏,如果您的应用程序使用许多短生命周期的线程,则必须调用工厂的 closeThreadChannel() 来释放通道资源。从版本 2.3.7 开始,一个线程可以将其通道传输到另一个线程。有关更多信息,请参见 多线程环境中的严格消息排序。
CachingConnectionFactory
提供的第三个实现是 CachingConnectionFactory,它默认建立一个可以由应用程序共享的单个连接代理。连接共享是可能的,因为 AMQP 消息传递的“工作单元”实际上是“通道”(在某些方面,这类似于 JMS 中连接和会话之间的关系)。连接实例提供 createChannel 方法。CachingConnectionFactory 实现支持这些通道的缓存,并且它根据通道是否是事务性来维护单独的通道缓存。创建 CachingConnectionFactory 实例时,可以通过构造函数提供“主机名”。您还应该提供“用户名”和“密码”属性。要配置通道缓存的大小(默认为 25),可以调用 setChannelCacheSize() 方法。
从版本 1.3 开始,您可以配置 CachingConnectionFactory 以缓存连接以及仅缓存通道。在这种情况下,每次调用 createConnection() 都会创建一个新连接(或从缓存中检索一个空闲连接)。关闭连接会将其返回到缓存中(如果未达到缓存大小)。在此类连接上创建的通道也会被缓存。在某些环境中,使用单独的连接可能很有用,例如从 HA 集群消费,结合负载均衡器连接到不同的集群成员等等。要缓存连接,请将 cacheMode 设置为 CacheMode.CONNECTION。
| 这不限制连接的数量。相反,它指定允许有多少空闲的开放连接。 |
从版本 1.5.5 开始,提供了一个名为 connectionLimit 的新属性。设置此属性后,它会限制允许的总连接数。设置后,如果达到限制,则使用 channelCheckoutTimeLimit 等待连接变为空闲。如果超过时间,则抛出 AmqpTimeoutException。
|
当缓存模式为 此外,在撰写本文时, |
重要的是要理解缓存大小(默认情况下)不是限制,而仅仅是可以缓存的通道数量。例如,如果缓存大小为 10,则实际上可以使用任意数量的通道。如果使用了超过 10 个通道,并且它们都返回到缓存中,则有 10 个通道进入缓存。其余的通道会被物理关闭。
从版本 1.6 开始,默认通道缓存大小已从 1 增加到 25。在高并发、多线程环境中,小缓存意味着通道以高速率创建和关闭。增加默认缓存大小可以避免这种开销。您应该通过 RabbitMQ 管理 UI 监控正在使用的通道,如果看到许多通道正在创建和关闭,请考虑进一步增加缓存大小。缓存仅在按需时增长(以适应应用程序的并发要求),因此此更改不会影响现有的低吞吐量应用程序。
从版本 1.4.2 开始,CachingConnectionFactory 有一个名为 channelCheckoutTimeout 的属性。当此属性大于零时,channelCacheSize 成为可以在连接上创建的通道数量的限制。如果达到限制,调用线程会阻塞,直到通道可用或达到此超时,在这种情况下会抛出 AmqpTimeoutException。
框架内使用的通道(例如 RabbitTemplate)会可靠地返回到缓存中。如果您在框架外部创建通道(例如,通过直接访问连接并调用 createChannel()),则必须可靠地将其返回(通过关闭),可能在 finally 块中,以避免通道耗尽。 |
以下示例显示如何创建新 connection
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("somehost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection connection = connectionFactory.createConnection();
使用 XML 时,配置可能如下例所示
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="somehost"/>
<property name="username" value="guest"/>
<property name="password" value="guest"/>
</bean>
还有一个 SingleConnectionFactory 实现,仅在框架的单元测试代码中可用。它比 CachingConnectionFactory 更简单,因为它不缓存通道,但由于其性能和弹性不足,不适合在简单测试之外的实际使用。如果出于某种原因需要实现自己的 ConnectionFactory,AbstractConnectionFactory 基类可以提供一个很好的起点。 |
可以使用 Rabbit 命名空间快速方便地创建 ConnectionFactory,如下所示
<rabbit:connection-factory id="connectionFactory"/>
在大多数情况下,此方法更可取,因为框架可以为您选择最佳默认值。创建的实例是 CachingConnectionFactory。请记住,通道的默认缓存大小为 25。如果您希望缓存更多通道,请通过设置“channelCacheSize”属性来设置更大的值。在 XML 中,它看起来像这样
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="somehost"/>
<property name="username" value="guest"/>
<property name="password" value="guest"/>
<property name="channelCacheSize" value="50"/>
</bean>
此外,使用命名空间,您可以添加“channel-cache-size”属性,如下所示
<rabbit:connection-factory
id="connectionFactory" channel-cache-size="50"/>
默认缓存模式是 CHANNEL,但您可以将其配置为缓存连接。在以下示例中,我们使用 connection-cache-size
<rabbit:connection-factory
id="connectionFactory" cache-mode="CONNECTION" connection-cache-size="25"/>
您可以使用命名空间提供主机和端口属性,如下所示
<rabbit:connection-factory
id="connectionFactory" host="somehost" port="5672"/>
或者,如果在集群环境中运行,您可以使用地址属性,如下所示
<rabbit:connection-factory
id="connectionFactory" addresses="host1:5672,host2:5672" address-shuffle-mode="RANDOM"/>
有关 address-shuffle-mode 的信息,请参见 连接到集群。
以下示例使用自定义线程工厂,该工厂将线程名称前缀为 rabbitmq-
<rabbit:connection-factory id="multiHost" virtual-host="/bar" addresses="host1:1234,host2,host3:4567"
thread-factory="tf"
channel-cache-size="10" username="user" password="password" />
<bean id="tf" class="org.springframework.scheduling.concurrent.CustomizableThreadFactory">
<constructor-arg value="rabbitmq-" />
</bean>
命名连接
从版本 1.7 开始,提供了 ConnectionNameStrategy 用于注入 AbstractionConnectionFactory。生成的名称用于目标 RabbitMQ 连接的应用程序特定标识。如果 RabbitMQ 服务器支持,连接名称会显示在管理 UI 中。此值不必是唯一的,不能用作连接标识符——例如,在 HTTP API 请求中。此值应该是人类可读的,并且是 ClientProperties 中 connection_name 键的一部分。您可以使用简单的 Lambda,如下所示
connectionFactory.setConnectionNameStrategy(connectionFactory -> "MY_CONNECTION");
ConnectionFactory 参数可用于通过某些逻辑区分目标连接名称。默认情况下,AbstractConnectionFactory 的 beanName、表示对象的十六进制字符串和内部计数器用于生成 connection_name。<rabbit:connection-factory> 命名空间组件也提供了 connection-name-strategy 属性。
SimplePropertyValueConnectionNameStrategy 的实现将连接名称设置为应用程序属性。您可以将其声明为 @Bean 并将其注入连接工厂,如下例所示
@Bean
public SimplePropertyValueConnectionNameStrategy cns() {
return new SimplePropertyValueConnectionNameStrategy("spring.application.name");
}
@Bean
public ConnectionFactory rabbitConnectionFactory(ConnectionNameStrategy cns) {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
...
connectionFactory.setConnectionNameStrategy(cns);
return connectionFactory;
}
该属性必须存在于应用程序上下文的 Environment 中。
使用 Spring Boot 及其自动配置的连接工厂时,您只需声明 ConnectionNameStrategy @Bean。Spring Boot 会自动检测 bean 并将其连接到工厂中。 |
阻塞的连接和资源限制
连接可能会因 Broker 的交互而阻塞,这对应于 内存警报。从版本 2.0 开始,org.springframework.amqp.rabbit.connection.Connection 可以提供 com.rabbitmq.client.BlockedListener 实例,以便在连接阻塞和解除阻塞事件时收到通知。此外,AbstractConnectionFactory 通过其内部 BlockedListener 实现分别发出 ConnectionBlockedEvent 和 ConnectionUnblockedEvent。这些允许您提供应用程序逻辑,以适当响应 Broker 上的问题并(例如)采取一些纠正措施。
当应用程序配置为单个 CachingConnectionFactory 时(Spring Boot 自动配置默认如此),当连接被 Broker 阻塞时,应用程序将停止工作。当它被 Broker 阻塞时,其任何客户端都会停止工作。如果我们在同一个应用程序中有生产者和消费者,当生产者阻塞连接(因为 Broker 上不再有资源)并且消费者无法释放它们(因为连接被阻塞)时,我们可能会陷入死锁。为了缓解这个问题,我们建议再创建一个单独的 CachingConnectionFactory 实例,具有相同的选项——一个用于生产者,一个用于消费者。对于在消费者线程上执行的事务性生产者,单独的 CachingConnectionFactory 是不可能的,因为它们应该重用与消费者事务关联的 Channel。 |
从版本 2.0.2 开始,RabbitTemplate 有一个配置选项,可以在不使用事务的情况下自动使用第二个连接工厂。有关更多信息,请参见 使用单独连接。发布者连接的 ConnectionNameStrategy 与主策略相同,并在调用方法的结果后附加 .publisher。
从版本 1.7.7 开始,提供了一个 AmqpResourceNotAvailableException,当 SimpleConnection.createChannel() 无法创建 Channel 时(例如,因为达到 channelMax 限制并且缓存中没有可用通道),将抛出此异常。您可以在 RetryPolicy 中使用此异常,以在一些退避后恢复操作。
配置底层客户端连接工厂
CachingConnectionFactory 使用 Rabbit 客户端 ConnectionFactory 的实例。当在 CachingConnectionFactory 上设置等效属性时,许多配置属性会传递(例如 host、port、userName、password、requestedHeartBeat 和 connectionTimeout)。要设置其他属性(例如 clientProperties),您可以定义 Rabbit 工厂的实例,并通过使用 CachingConnectionFactory 的相应构造函数提供对其的引用。使用命名空间时(如前所述),您需要提供对 connection-factory 属性中配置的工厂的引用。为方便起见,提供了一个工厂 bean,以帮助在 Spring 应用程序上下文中配置连接工厂,如 下一节 所述。
<rabbit:connection-factory
id="connectionFactory" connection-factory="rabbitConnectionFactory"/>
4.0.x 客户端默认启用自动恢复。虽然与此功能兼容,但 Spring AMQP 有自己的恢复机制,通常不需要客户端恢复功能。我们建议禁用 amqp-client 自动恢复,以避免在 Broker 可用但连接尚未恢复时出现 AutoRecoverConnectionNotCurrentlyOpenException 实例。例如,当 RabbitTemplate 中配置了 RetryTemplate 时,即使在集群中故障转移到另一个 Broker,您也可能会注意到此异常。由于自动恢复连接在计时器上恢复,因此使用 Spring AMQP 的恢复机制可以更快地恢复连接。从版本 1.7.1 开始,Spring AMQP 禁用 amqp-client 自动恢复,除非您明确创建自己的 RabbitMQ 连接工厂并将其提供给 CachingConnectionFactory。RabbitConnectionFactoryBean 创建的 RabbitMQ ConnectionFactory 实例也默认禁用此选项。 |
RabbitConnectionFactoryBean 和配置 SSL
从版本 1.4 开始,提供了一个方便的 RabbitConnectionFactoryBean,可以通过依赖注入方便地配置底层客户端连接工厂上的 SSL 属性。其他设置器委托给底层工厂。以前,您必须以编程方式配置 SSL 选项。以下示例显示如何配置 RabbitConnectionFactoryBean
-
Java
-
XML
@Bean
RabbitConnectionFactoryBean rabbitConnectionFactory() {
RabbitConnectionFactoryBean factoryBean = new RabbitConnectionFactoryBean();
factoryBean.setUseSSL(true);
factoryBean.setSslPropertiesLocation(new ClassPathResource("secrets/rabbitSSL.properties"));
return factoryBean;
}
@Bean
CachingConnectionFactory connectionFactory(ConnectionFactory rabbitConnectionFactory) {
CachingConnectionFactory ccf = new CachingConnectionFactory(rabbitConnectionFactory);
ccf.setHost("...");
// ...
return ccf;
}
<bean id="rabbitConnectionFactory"
class="org.springframework.amqp.rabbit.connection.RabbitConnectionFactoryBean">
<property name="useSSL" value="true" />
<property name="sslPropertiesLocation" value="classpath:secrets/rabbitSSL.properties"/>
</bean>
<rabbit:connection-factory id="connectionFactory"
connection-factory="rabbitConnectionFactory"
host="${host}"
port="${port}"
virtual-host="${vhost}"
username="${username}" password="${password}" />
Spring Boot 应用程序文件(.yaml 或 .properties)
-
属性
-
YAML
spring.rabbitmq.host=...
spring.rabbitmq.ssl.keyStoreType=jks
spring.rabbitmq.ssl.trustStoreType=jks
spring.rabbitmq.ssl.keyStore=...
spring.rabbitmq.ssl.trustStore=...
spring.rabbitmq.ssl.trustStorePassword=...
spring.rabbitmq.ssl.keyStorePassword=...
spring.rabbitmq.ssl.enabled=true
spring:
rabbitmq:
host: ...
ssl:
keyStoreType: jks
trustStoreType: jks
keyStore: ...
trustStore: ...
trustStorePassword: ...
keyStorePassword: ...
enabled: true
有关配置 SSL 的信息,请参见 RabbitMQ 文档。省略 keyStore 和 trustStore 配置可在不验证证书的情况下通过 SSL 连接。下一个示例显示如何提供密钥和信任库配置。
sslPropertiesLocation 属性是一个 Spring Resource,指向一个包含以下键的属性文件
keyStore=file:/secret/keycert.p12
trustStore=file:/secret/trustStore
keyStore.passPhrase=secret
trustStore.passPhrase=secret
keyStore 和 truststore 是指向存储的 Spring Resources。通常,此属性文件由操作系统保护,应用程序具有读取访问权限。
从 Spring AMQP 1.5 版开始,您可以直接在工厂 bean 上设置这些属性。如果同时提供了离散属性和 sslPropertiesLocation,则后者中的属性会覆盖离散值。
从版本 2.0 开始,默认情况下会验证服务器证书,因为它更安全。如果出于某种原因要跳过此验证,请将工厂 bean 的 skipServerCertificateValidation 属性设置为 true。从版本 2.1 开始,RabbitConnectionFactoryBean 现在默认调用 enableHostnameVerification()。要恢复到以前的行为,请将 enableHostnameVerification 属性设置为 false。 |
从版本 2.2.5 开始,工厂 bean 默认将始终使用 TLS v1.2;以前,在某些情况下它使用 v1.1,在其他情况下使用 v1.2(取决于其他属性)。如果出于某种原因需要使用 v1.1,请设置 sslAlgorithm 属性:setSslAlgorithm("TLSv1.1")。 |
连接到集群
要连接到集群,请在 CachingConnectionFactory 上配置 addresses 属性
@Bean
public CachingConnectionFactory ccf() {
CachingConnectionFactory ccf = new CachingConnectionFactory();
ccf.setAddresses("host1:5672,host2:5672,host3:5672");
return ccf;
}
从版本 3.0 开始,每当建立新连接时,底层连接工厂都会通过选择随机地址来尝试连接到主机。要恢复到以前的从第一个到最后一个尝试连接的行为,请将 addressShuffleMode 属性设置为 AddressShuffleMode.NONE。
从版本 2.3 开始,添加了 INORDER 洗牌模式,这意味着在创建连接后,第一个地址会移到末尾。如果您希望使用 RabbitMQ 分片插件 与 CacheMode.CONNECTION 和适当的并发性从所有节点的所有分片进行消费,则可能希望使用此模式。
@Bean
public CachingConnectionFactory ccf() {
CachingConnectionFactory ccf = new CachingConnectionFactory();
ccf.setAddresses("host1:5672,host2:5672,host3:5672");
ccf.setAddressShuffleMode(AddressShuffleMode.INORDER);
return ccf;
}
路由连接工厂
从版本 1.3 开始,引入了 AbstractRoutingConnectionFactory。该工厂提供了一种机制,用于配置多个 ConnectionFactories 的映射,并根据运行时的一些 lookupKey 确定目标 ConnectionFactory。通常,实现会检查线程绑定的上下文。为了方便起见,Spring AMQP 提供了 SimpleRoutingConnectionFactory,它从 SimpleResourceHolder 获取当前线程绑定的 lookupKey。以下示例展示了如何在 XML 和 Java 中配置 SimpleRoutingConnectionFactory
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory">
<property name="targetConnectionFactories">
<map>
<entry key="#{connectionFactory1.virtualHost}" ref="connectionFactory1"/>
<entry key="#{connectionFactory2.virtualHost}" ref="connectionFactory2"/>
</map>
</property>
</bean>
<rabbit:template id="template" connection-factory="connectionFactory" />
public class MyService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void service(String vHost, String payload) {
SimpleResourceHolder.bind(rabbitTemplate.getConnectionFactory(), vHost);
rabbitTemplate.convertAndSend(payload);
SimpleResourceHolder.unbind(rabbitTemplate.getConnectionFactory());
}
}
使用后解除绑定资源很重要。有关更多信息,请参阅 AbstractRoutingConnectionFactory 的 JavaDoc。
从版本 1.4 开始,RabbitTemplate 支持 SpEL sendConnectionFactorySelectorExpression 和 receiveConnectionFactorySelectorExpression 属性,这些属性在每个 AMQP 协议交互操作(send、sendAndReceive、receive 或 receiveAndReply)上进行评估,解析为提供的 AbstractRoutingConnectionFactory 的 lookupKey 值。您可以在表达式中使用 bean 引用,例如 @vHostResolver.getVHost(#root)。对于 send 操作,要发送的消息是根评估对象。对于 receive 操作,queueName 是根评估对象。
路由算法如下:如果选择器表达式为 null 或被评估为 null,或者提供的 ConnectionFactory 不是 AbstractRoutingConnectionFactory 的实例,则一切照旧,依赖于提供的 ConnectionFactory 实现。如果评估结果不为 null,但该 lookupKey 没有目标 ConnectionFactory,并且 AbstractRoutingConnectionFactory 配置为 lenientFallback = true,也会发生同样的情况。在 AbstractRoutingConnectionFactory 的情况下,它会根据 determineCurrentLookupKey() 回退到其 routing 实现。但是,如果 lenientFallback = false,则会抛出 IllegalStateException。
命名空间支持还为 <rabbit:template> 组件提供了 send-connection-factory-selector-expression 和 receive-connection-factory-selector-expression 属性。
此外,从版本 1.4 开始,您可以在监听器容器中配置路由连接工厂。在这种情况下,队列名称列表用作查找键。例如,如果使用 setQueueNames("thing1", "thing2") 配置容器,则查找键为 [thing1,thing]"(请注意键中没有空格)。
从版本 1.6.9 开始,您可以使用监听器容器上的 setLookupKeyQualifier 为查找键添加限定符。这样做可以实现,例如,侦听同名但在不同虚拟主机中的队列(在这种情况下,您将为每个队列拥有一个连接工厂)。
例如,如果查找键限定符为 thing1,并且容器正在侦听队列 thing2,则您可以注册目标连接工厂的查找键可以是 thing1[thing2]。
| 目标(以及提供的默认)连接工厂必须具有相同的发布者确认和返回设置。请参阅 发布者确认和返回。 |
从版本 2.4.4 开始,可以禁用此验证。如果您在某些情况下需要确认和返回的值不相等,可以使用 AbstractRoutingConnectionFactory#setConsistentConfirmsReturns 关闭验证。请注意,添加到 AbstractRoutingConnectionFactory 的第一个连接工厂将确定 confirms 和 returns 的一般值。
如果您遇到某些消息需要检查确认/返回而其他消息不需要的情况,这可能会很有用。例如
@Bean
public RabbitTemplate rabbitTemplate() {
final com.rabbitmq.client.ConnectionFactory cf = new com.rabbitmq.client.ConnectionFactory();
cf.setHost("localhost");
cf.setPort(5672);
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(cf);
cachingConnectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
PooledChannelConnectionFactory pooledChannelConnectionFactory = new PooledChannelConnectionFactory(cf);
final Map<Object, ConnectionFactory> connectionFactoryMap = new HashMap<>(2);
connectionFactoryMap.put("true", cachingConnectionFactory);
connectionFactoryMap.put("false", pooledChannelConnectionFactory);
final AbstractRoutingConnectionFactory routingConnectionFactory = new SimpleRoutingConnectionFactory();
routingConnectionFactory.setConsistentConfirmsReturns(false);
routingConnectionFactory.setDefaultTargetConnectionFactory(pooledChannelConnectionFactory);
routingConnectionFactory.setTargetConnectionFactories(connectionFactoryMap);
final RabbitTemplate rabbitTemplate = new RabbitTemplate(routingConnectionFactory);
final Expression sendExpression = new SpelExpressionParser().parseExpression(
"messageProperties.headers['x-use-publisher-confirms'] ?: false");
rabbitTemplate.setSendConnectionFactorySelectorExpression(sendExpression);
}
这样,带有 x-use-publisher-confirms: true 头部的消息将通过缓存连接发送,您可以确保消息传递。有关确保消息传递的更多信息,请参阅 发布者确认和返回。
队列亲和性和 LocalizedQueueConnectionFactory
在集群中使用 HA 队列时,为了获得最佳性能,您可能希望连接到主队列所在的物理 Broker。CachingConnectionFactory 可以配置多个 Broker 地址。这是为了故障转移,客户端根据配置的 AddressShuffleMode 顺序尝试连接。LocalizedQueueConnectionFactory 使用管理插件提供的 REST API 来确定哪个节点是队列的主节点。然后,它创建一个(或从缓存中检索)CachingConnectionFactory,该工厂仅连接到该节点。如果连接失败,则确定新的主节点,并且消费者连接到该节点。LocalizedQueueConnectionFactory 配置了一个默认连接工厂,以防无法确定队列的物理位置,在这种情况下,它会像正常一样连接到集群。
LocalizedQueueConnectionFactory 是一个 RoutingConnectionFactory,SimpleMessageListenerContainer 使用队列名称作为查找键,如上文 路由连接工厂 中所述。
因此(使用队列名称进行查找),LocalizedQueueConnectionFactory 只能在容器配置为侦听单个队列时使用。 |
| 每个节点都必须启用 RabbitMQ 管理插件。 |
此连接工厂用于长期连接,例如 SimpleMessageListenerContainer 使用的连接。它不适用于短期连接,例如与 RabbitTemplate 一起使用,因为在建立连接之前调用 REST API 会产生开销。此外,对于发布操作,队列是未知的,并且消息无论如何都会发布到所有集群成员,因此查找节点的逻辑价值不大。 |
以下配置示例显示了如何配置工厂
@Autowired
private ConfigurationProperties props;
@Bean
public CachingConnectionFactory defaultConnectionFactory() {
CachingConnectionFactory cf = new CachingConnectionFactory();
cf.setAddresses(this.props.getAddresses());
cf.setUsername(this.props.getUsername());
cf.setPassword(this.props.getPassword());
cf.setVirtualHost(this.props.getVirtualHost());
return cf;
}
@Bean
public LocalizedQueueConnectionFactory queueAffinityCF(
@Qualifier("defaultConnectionFactory") ConnectionFactory defaultCF) {
return new LocalizedQueueConnectionFactory(defaultCF,
StringUtils.commaDelimitedListToStringArray(this.props.getAddresses()),
StringUtils.commaDelimitedListToStringArray(this.props.getAdminUris()),
StringUtils.commaDelimitedListToStringArray(this.props.getNodes()),
this.props.getVirtualHost(), this.props.getUsername(), this.props.getPassword(),
false, null);
}
请注意,前三个参数是 addresses、adminUris 和 nodes 的数组。这些是位置性的,即当容器尝试连接到队列时,它使用管理 API 来确定哪个节点是队列的主节点,并连接到与该节点在同一数组位置的地址。
从版本 3.0 开始,RabbitMQ http-client 不再用于访问 Rest API。相反,默认情况下,如果 spring-webflux 在类路径上,则使用 Spring Webflux 中的 WebClient;否则使用 RestTemplate。 |
要将 WebFlux 添加到类路径
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
</dependency>
compile 'org.springframework.amqp:spring-rabbit'
您还可以通过实现 LocalizedQueueConnectionFactory#setNodeLocator 并重写其 createClient、restCall 和(可选)close 方法来使用其他 REST 技术。
lqcf.setNodeLocator(new NodeLocator<MyClient>() {
@Override
public MyClient createClient(String userName, String password) {
...
}
@Override
public Map<String, Object> restCall(MyClient client, String baseUri, String vhost, String queue) throws URISyntaxException {
...
}
});
该框架提供了 WebFluxNodeLocator 和 RestTemplateNodeLocator,默认值如上所述。
发布者确认和返回
通过将 CachingConnectionFactory 属性 publisherConfirmType 设置为 ConfirmType.CORRELATED 并将 publisherReturns 属性设置为“true”,支持已确认(带关联)和已返回的消息。
当设置了这些选项后,工厂创建的 Channel 实例将包装在 PublisherCallbackChannel 中,用于促进回调。当获得这样的通道时,客户端可以向 Channel 注册一个 PublisherCallbackChannel.Listener。PublisherCallbackChannel 实现包含将确认或返回路由到相应监听器的逻辑。这些功能将在以下部分中进一步解释。
另请参阅 关联的发布者确认和返回 和 作用域操作 中的 simplePublisherConfirms。
| 有关更多背景信息,请参阅 RabbitMQ 团队的博客文章 Introducing Publisher Confirms。 |
连接和通道监听器
连接工厂支持注册 ConnectionListener 和 ChannelListener 实现。这允许您接收连接和通道相关事件的通知。(RabbitAdmin 使用 ConnectionListener 在建立连接时执行声明——有关更多信息,请参见 交换机、队列和绑定的自动声明)。以下列表显示了 ConnectionListener 接口定义
@FunctionalInterface
public interface ConnectionListener {
void onCreate(Connection connection);
default void onClose(Connection connection) {
}
default void onShutDown(ShutdownSignalException signal) {
}
}
从版本 2.0 开始,org.springframework.amqp.rabbit.connection.Connection 对象可以提供 com.rabbitmq.client.BlockedListener 实例,以便在连接阻塞和解除阻塞事件时收到通知。以下示例显示了 ChannelListener 接口定义
@FunctionalInterface
public interface ChannelListener {
void onCreate(Channel channel, boolean transactional);
default void onShutDown(ShutdownSignalException signal) {
}
}
有关您可能希望注册 ChannelListener 的一种场景,请参见 发布是异步的——如何检测成功和失败。
日志通道关闭事件
版本 1.5 引入了一种机制,允许用户控制日志级别。
AbstractConnectionFactory 使用默认策略记录通道关闭,如下所示
-
正常的通道关闭(200 OK)不会被记录。
-
如果通道因被动队列声明失败而关闭,则以 DEBUG 级别记录。
-
如果通道因排他消费者条件导致
basic.consume被拒绝而关闭,则以 DEBUG 级别记录(从 3.1 开始,以前是 INFO)。 -
所有其他都以 ERROR 级别记录。
要修改此行为,您可以将自定义 ConditionalExceptionLogger 注入 CachingConnectionFactory 的 closeExceptionLogger 属性中。
此外,AbstractConnectionFactory.DefaultChannelCloseLogger 现在是公共的,允许其子类化。
另请参阅 消费者事件。
运行时缓存属性
从版本 1.6 开始,CachingConnectionFactory 现在通过 getCacheProperties() 方法提供缓存统计信息。这些统计信息可用于调整缓存在生产中的优化。例如,高水位线可用于确定是否应增加缓存大小。如果它等于缓存大小,您可能需要考虑进一步增加。下表描述了 CacheMode.CHANNEL 属性
| 财产 | 含义 |
|---|---|
connectionName |
由 |
channelCacheSize |
当前配置的允许空闲的最大通道数。 |
localPort |
连接的本地端口(如果可用)。这可用于与 RabbitMQ 管理 UI 上的连接和通道关联。 |
idleChannelsTx |
当前空闲(缓存)的事务性通道数。 |
idleChannelsNotTx |
当前空闲(缓存)的非事务性通道数。 |
idleChannelsTxHighWater |
同时空闲(缓存)的事务性通道的最大数量。 |
idleChannelsNotTxHighWater |
同时空闲(缓存)的非事务性通道的最大数量。 |
下表描述了 CacheMode.CONNECTION 属性
| 财产 | 含义 |
|---|---|
connectionName:<localPort> |
由 |
openConnections |
表示与 Broker 连接的连接对象数。 |
channelCacheSize |
当前配置的允许空闲的最大通道数。 |
connectionCacheSize |
当前配置的允许空闲的最大连接数。 |
idleConnections |
当前空闲的连接数。 |
idleConnectionsHighWater |
同时空闲的最大连接数。 |
idleChannelsTx:<localPort> |
此连接当前空闲(缓存)的事务性通道数。您可以使用属性名称的 |
idleChannelsNotTx:<localPort> |
此连接当前空闲(缓存)的非事务性通道数。属性名称的 |
idleChannelsTxHighWater:<localPort> |
同时空闲(缓存)的事务性通道的最大数量。属性名称的 localPort 部分可用于与 RabbitMQ 管理 UI 上的连接和通道相关联。 |
idleChannelsNotTxHighWater:<localPort> |
同时空闲(缓存)的非事务性通道的最大数量。您可以使用属性名称的 |
还包括 cacheMode 属性(CHANNEL 或 CONNECTION)。
RabbitMQ 自动连接/拓扑恢复
自 Spring AMQP 的第一个版本以来,该框架在 Broker 发生故障时提供了自己的连接和通道恢复。此外,如 配置 Broker 中所述,当连接重新建立时,RabbitAdmin 会重新声明任何基础设施 bean(队列等)。因此,它不依赖于 amqp-client 库现在提供的 自动恢复。amqp-client 默认启用自动恢复。两种恢复机制之间存在一些不兼容性,因此,Spring 默认将底层 RabbitMQ connectionFactory 上的 automaticRecoveryEnabled 属性设置为 false。即使该属性为 true,Spring 也会通过立即关闭任何已恢复的连接来有效禁用它。
| 默认情况下,只有被定义为 bean 的元素(队列、交换器、绑定)会在连接失败后重新声明。有关如何更改该行为的信息,请参阅 恢复自动删除声明。 |