连接与资源管理

虽然我们在上一节中描述的 AMQP 模型是通用的,适用于所有实现,但当我们谈到资源管理时,细节就取决于 Broker 的具体实现。因此,在本节中,我们专注于只存在于我们的“spring-rabbit”模块中的代码,因为目前只支持 RabbitMQ 实现。

用于管理与 RabbitMQ Broker 连接的核心组件是 ConnectionFactory 接口。ConnectionFactory 实现的职责是提供 org.springframework.amqp.rabbit.connection.Connection 的实例,它是 com.rabbitmq.client.Connection 的包装类。

选择连接工厂

有三种连接工厂可供选择

  • PooledChannelConnectionFactory

  • ThreadChannelConnectionFactory

  • CachingConnectionFactory

前两种在版本 2.3 中添加。

对于大多数用例,应使用 CachingConnectionFactory。如果您想确保严格的消息顺序而无需使用 Scoped Operations,则可以使用 ThreadChannelConnectionFactoryPooledChannelConnectionFactoryCachingConnectionFactory 类似,因为它使用单个连接和通道池。它的实现更简单,但不支持关联的发布者确认。

所有这三种工厂都支持简单的发布者确认。

配置 RabbitTemplate 使用 单独连接 时,从版本 2.3.2 开始,您可以配置发布连接工厂为不同的类型。默认情况下,发布工厂与主工厂类型相同,并且设置在主工厂上的任何属性也会传播到发布工厂。

从版本 3.1 开始,AbstractConnectionFactory 包含 connectionCreatingBackOff 属性,该属性在连接模块中支持指数退避策略。目前,createChannel() 方法的行为支持处理达到 channelMax 限制时发生的异常,实现了基于尝试次数和间隔的退避策略。

PooledChannelConnectionFactory

该工厂管理单个连接和两个通道池,基于 Apache Pool2。一个池用于事务性通道,另一个用于非事务性通道。这些池是配置默认为 GenericObjectPool 的池;提供回调以配置这些池;有关更多信息,请参阅 Apache 文档。

要使用此工厂,必须将 Apache commons-pool2 jar 放在类路径上。

@Bean
PooledChannelConnectionFactory pcf() throws Exception {
    ConnectionFactory rabbitConnectionFactory = new ConnectionFactory();
    rabbitConnectionFactory.setHost("localhost");
    PooledChannelConnectionFactory pcf = new PooledChannelConnectionFactory(rabbitConnectionFactory);
    pcf.setPoolConfigurer((pool, tx) -> {
        if (tx) {
            // configure the transactional pool
        }
        else {
            // configure the non-transactional pool
        }
    });
    return pcf;
}

ThreadChannelConnectionFactory

该工厂管理单个连接和两个 ThreadLocal,一个用于事务性通道,另一个用于非事务性通道。此工厂确保同一线程上的所有操作都使用同一通道(只要它保持打开状态)。这有助于实现严格的消息顺序,而无需 Scoped Operations。为避免内存泄漏,如果您的应用程序使用许多生命周期短暂的线程,则必须调用工厂的 closeThreadChannel() 方法释放通道资源。从版本 2.3.7 开始,线程可以将其通道转移到另一个线程。有关更多信息,请参阅 多线程环境下的严格消息顺序

CachingConnectionFactory

提供的第三种实现是 CachingConnectionFactory,它默认建立一个可供应用程序共享的单个连接代理。由于 AMQP 消息传递的“工作单元”实际上是一个“通道”(在某些方面,这类似于 JMS 中连接和会话之间的关系),因此连接共享成为可能。连接实例提供了一个 createChannel 方法。CachingConnectionFactory 实现支持缓存这些通道,并根据它们是否具有事务性来维护独立的通道缓存。创建 CachingConnectionFactory 实例时,您可以通过构造函数提供“hostname”。您还应该提供“username”和“password”属性。要配置通道缓存的大小(默认为 25),您可以调用 setChannelCacheSize() 方法。

从版本 1.3 开始,您可以配置 CachingConnectionFactory 同时缓存连接和通道。在这种情况下,每次调用 createConnection() 都会创建一个新连接(或从缓存中检索一个空闲连接)。关闭连接会将其返回到缓存(如果尚未达到缓存大小)。在此类连接上创建的通道也会被缓存。在某些环境中,例如从 HA 集群消费时,结合负载均衡器连接到不同的集群成员,以及其他情况下,使用单独的连接可能很有用。要缓存连接,请将 cacheMode 设置为 CacheMode.CONNECTION

这不会限制连接的数量。相反,它指定允许缓存多少个空闲的打开连接。

从版本 1.5.5 开始,提供了一个名为 connectionLimit 的新属性。设置此属性后,它会限制允许的总连接数。设置后,如果达到限制,将使用 channelCheckoutTimeLimit 等待连接变为空闲。如果超出时间,将抛出 AmqpTimeoutException

当缓存模式为 CONNECTION 时,不支持队列等的自动声明(请参阅 自动声明交换、队列和绑定)。

此外,在撰写本文时,amqp-client 库默认会为每个连接创建一个固定线程池(默认大小:Runtime.getRuntime().availableProcessors() * 2 个线程)。使用大量连接时,应考虑在 CachingConnectionFactory 上设置自定义 executor。这样,所有连接都可以使用同一个执行器,并且其线程可以共享。执行器的线程池应该无界或根据预期用途进行适当设置(通常,每个连接至少一个线程)。如果在每个连接上创建多个通道,池大小会影响并发性,因此可变(或简单的缓存)线程池执行器将是最合适的。

重要的是要理解,缓存大小(默认情况下)不是限制,而只是可以缓存的通道数量。例如,缓存大小为 10,实际上可以使用任意数量的通道。如果使用了超过 10 个通道并且它们都返回到缓存,则 10 个进入缓存。其余的将被物理关闭。

从版本 1.6 开始,默认通道缓存大小从 1 增加到 25。在高流量、多线程环境中,小缓存意味着通道创建和关闭的速度很快。增加默认缓存大小可以避免这种开销。您应该通过 RabbitMQ 管理 UI 监控正在使用的通道,如果您看到大量通道被创建和关闭,请考虑进一步增加缓存大小。缓存仅在按需增长(以适应应用程序的并发要求),因此此更改不会影响现有的低流量应用程序。

从版本 1.4.2 开始,CachingConnectionFactory 有一个名为 channelCheckoutTimeout 的属性。当此属性大于零时,channelCacheSize 成为可以在连接上创建的通道数量的限制。如果达到限制,调用线程将阻塞,直到通道可用或达到此超时时间,此时将抛出 AmqpTimeoutException

框架内使用的通道(例如,RabbitTemplate)可靠地返回到缓存。如果您在框架外部创建通道(例如,通过直接访问连接并调用 createChannel()),则必须可靠地(也许在 finally 块中)关闭它们,以避免通道耗尽。

以下示例展示了如何创建新连接

CachingConnectionFactory connectionFactory = new CachingConnectionFactory("somehost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");

Connection connection = connectionFactory.createConnection();

使用 XML 时,配置可能如下例所示

<bean id="connectionFactory"
      class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
    <constructor-arg value="somehost"/>
    <property name="username" value="guest"/>
    <property name="password" value="guest"/>
</bean>
框架的单元测试代码中还有一个 SingleConnectionFactory 实现。它比 CachingConnectionFactory 更简单,因为它不缓存通道,但由于其性能和弹性不足,不适合在简单测试之外的实际应用中使用。如果您因某些原因需要实现自己的 ConnectionFactoryAbstractConnectionFactory 基类可能是一个不错的起点。

可以使用 rabbit 命名空间快速方便地创建 ConnectionFactory,如下所示

<rabbit:connection-factory id="connectionFactory"/>

在大多数情况下,这种方法是首选,因为框架可以为您选择最佳默认值。创建的实例是 CachingConnectionFactory。请记住,通道的默认缓存大小是 25。如果您希望缓存更多通道,请通过设置 'channelCacheSize' 属性设置一个更大的值。在 XML 中,它将如下所示

<bean id="connectionFactory"
      class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
    <constructor-arg value="somehost"/>
    <property name="username" value="guest"/>
    <property name="password" value="guest"/>
    <property name="channelCacheSize" value="50"/>
</bean>

此外,使用命名空间,您可以添加 'channel-cache-size' 属性,如下所示

<rabbit:connection-factory
    id="connectionFactory" channel-cache-size="50"/>

默认缓存模式是 CHANNEL,但您可以将其配置为缓存连接。在以下示例中,我们使用 connection-cache-size

<rabbit:connection-factory
    id="connectionFactory" cache-mode="CONNECTION" connection-cache-size="25"/>

您可以使用命名空间提供 host 和 port 属性,如下所示

<rabbit:connection-factory
    id="connectionFactory" host="somehost" port="5672"/>

或者,如果在集群环境中运行,可以使用 addresses 属性,如下所示

<rabbit:connection-factory
    id="connectionFactory" addresses="host1:5672,host2:5672" address-shuffle-mode="RANDOM"/>

有关 address-shuffle-mode 的信息,请参阅 连接到集群

以下示例使用一个自定义线程工厂,将线程名称加上 rabbitmq- 前缀

<rabbit:connection-factory id="multiHost" virtual-host="/bar" addresses="host1:1234,host2,host3:4567"
    thread-factory="tf"
    channel-cache-size="10" username="user" password="password" />

<bean id="tf" class="org.springframework.scheduling.concurrent.CustomizableThreadFactory">
    <constructor-arg value="rabbitmq-" />
</bean>

AddressResolver

从版本 2.1.15 开始,您现在可以使用 AddressResolver 来解析连接地址。这将覆盖 addresseshost/port 属性的任何设置。

连接命名

从版本 1.7 开始,提供了 ConnectionNameStrategy,用于注入到 AbstractionConnectionFactory 中。生成的名称用于目标 RabbitMQ 连接的应用程序特定标识。如果 RabbitMQ 服务器支持,连接名称将显示在管理 UI 中。此值不必是唯一的,并且不能用作连接标识符(例如,在 HTTP API 请求中)。此值应该是人类可读的,并且是 ClientProperties 下的 connection_name 键的一部分。您可以使用简单的 Lambda,如下所示

connectionFactory.setConnectionNameStrategy(connectionFactory -> "MY_CONNECTION");

ConnectionFactory 参数可用于通过某些逻辑区分目标连接名称。默认情况下,使用 AbstractConnectionFactorybeanName、表示对象的十六进制字符串以及内部计数器来生成 connection_name<rabbit:connection-factory> 命名空间组件也提供了 connection-name-strategy 属性。

SimplePropertyValueConnectionNameStrategy 的实现将连接名称设置为应用程序属性。您可以将其声明为 @Bean 并将其注入到连接工厂中,如下例所示

@Bean
public SimplePropertyValueConnectionNameStrategy cns() {
    return new SimplePropertyValueConnectionNameStrategy("spring.application.name");
}

@Bean
public ConnectionFactory rabbitConnectionFactory(ConnectionNameStrategy cns) {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    ...
    connectionFactory.setConnectionNameStrategy(cns);
    return connectionFactory;
}

该属性必须存在于应用程序上下文的 Environment 中。

使用 Spring Boot 及其自动配置的连接工厂时,您只需声明 ConnectionNameStrategy@Bean。Boot 会自动检测该 Bean 并将其注入到工厂中。

阻塞的连接和资源约束

连接可能因 Broker 的 内存警报 而被阻塞。从版本 2.0 开始,org.springframework.amqp.rabbit.connection.Connection 可以提供 com.rabbitmq.client.BlockedListener 实例,以接收连接阻塞和取消阻塞事件的通知。此外,AbstractConnectionFactory 通过其内部的 BlockedListener 实现,分别发出 ConnectionBlockedEventConnectionUnblockedEvent。这些事件使您能够提供应用程序逻辑,以适当响应 Broker 上的问题并(例如)采取一些纠正措施。

当应用程序配置了单个 CachingConnectionFactory 时(Spring Boot 自动配置默认如此),当连接被 Broker 阻塞时,应用程序将停止工作。而当连接被 Broker 阻塞时,它的任何客户端都会停止工作。如果我们在同一个应用程序中有生产者和消费者,当生产者阻塞连接(因为 Broker 上没有资源了)并且消费者无法释放它们(因为连接被阻塞)时,我们可能会遇到死锁。为了缓解这个问题,我们建议使用另一个具有相同选项的独立 CachingConnectionFactory 实例——一个用于生产者,一个用于消费者。对于在消费者线程上执行的事务性生产者来说,使用单独的 CachingConnectionFactory 是不可能的,因为它们应该重用与消费者事务关联的 Channel

从版本 2.0.2 开始,RabbitTemplate 有一个配置选项,可以在不使用事务的情况下自动使用第二个连接工厂。有关更多信息,请参阅 使用单独的连接。发布者连接的 ConnectionNameStrategy 与主策略相同,并在调用方法的结果后附加 .publisher

从版本 1.7.7 开始,提供了 AmqpResourceNotAvailableException,当 SimpleConnection.createChannel() 无法创建 Channel 时会抛出此异常(例如,因为达到 channelMax 限制且缓存中没有可用通道)。您可以在 RetryPolicy 中使用此异常,在退避后恢复操作。

配置底层客户端连接工厂

CachingConnectionFactory 使用 Rabbit 客户端的 ConnectionFactory 实例。通过在 CachingConnectionFactory 上设置等效属性时,许多配置属性(例如 hostportuserNamepasswordrequestedHeartBeatconnectionTimeout)会透传下去。要设置其他属性(例如 clientProperties),您可以定义一个 Rabbit 工厂实例,并通过使用 CachingConnectionFactory 的适当构造函数提供对其的引用。使用命名空间(如前所述)时,需要在 connection-factory 属性中提供对已配置工厂的引用。为了方便起见,提供了一个工厂 Bean,以帮助在 Spring 应用程序上下文中配置连接工厂,如 下一节 中所述。

<rabbit:connection-factory
      id="connectionFactory" connection-factory="rabbitConnectionFactory"/>
4.0.x 客户端默认启用自动恢复。尽管与此功能兼容,Spring AMQP 有其自己的恢复机制,并且客户端恢复功能通常不需要。我们建议禁用 amqp-client 自动恢复,以避免在 Broker 可用但连接尚未恢复时出现 AutoRecoverConnectionNotCurrentlyOpenException 实例。您可能会注意到此异常,例如,当在 RabbitTemplate 中配置了 RetryTemplate 时,即使故障转移到集群中的另一个 Broker 时也是如此。由于自动恢复连接会按计时器恢复,使用 Spring AMQP 的恢复机制可能会更快地恢复连接。从版本 1.7.1 开始,除非您明确创建自己的 RabbitMQ 连接工厂并将其提供给 CachingConnectionFactory,否则 Spring AMQP 将禁用 amqp-client 自动恢复。由 RabbitConnectionFactoryBean 创建的 RabbitMQ ConnectionFactory 实例也默认禁用此选项。

RabbitConnectionFactoryBean 和配置 SSL

从版本 1.4 开始,提供了一个方便的 RabbitConnectionFactoryBean,通过依赖注入方便地配置底层客户端连接工厂的 SSL 属性。其他 setter 方法委托给底层工厂。以前,您必须以编程方式配置 SSL 选项。以下示例展示了如何配置 RabbitConnectionFactoryBean

Java
@Bean
RabbitConnectionFactoryBean rabbitConnectionFactory() {
    RabbitConnectionFactoryBean factoryBean = new RabbitConnectionFactoryBean();
    factoryBean.setUseSSL(true);
    factoryBean.setSslPropertiesLocation(new ClassPathResource("secrets/rabbitSSL.properties"));
    return factoryBean;
}

@Bean
CachingConnectionFactory connectionFactory(ConnectionFactory rabbitConnectionFactory) {
    CachingConnectionFactory ccf = new CachingConnectionFactory(rabbitConnectionFactory);
    ccf.setHost("...");
    // ...
    return ccf;
}
Boot application.properties
spring.rabbitmq.ssl.enabled:true
spring.rabbitmq.ssl.keyStore=...
spring.rabbitmq.ssl.keyStoreType=jks
spring.rabbitmq.ssl.keyStorePassword=...
spring.rabbitmq.ssl.trustStore=...
spring.rabbitmq.ssl.trustStoreType=jks
spring.rabbitmq.ssl.trustStorePassword=...
spring.rabbitmq.host=...
...
XML
<rabbit:connection-factory id="rabbitConnectionFactory"
    connection-factory="clientConnectionFactory"
    host="${host}"
    port="${port}"
    virtual-host="${vhost}"
    username="${username}" password="${password}" />

<bean id="clientConnectionFactory"
        class="org.springframework.amqp.rabbit.connection.RabbitConnectionFactoryBean">
    <property name="useSSL" value="true" />
    <property name="sslPropertiesLocation" value="classpath:secrets/rabbitSSL.properties"/>
</bean>

有关配置 SSL 的信息,请参阅 RabbitMQ 文档。省略 keyStoretrustStore 配置以通过 SSL 连接而无需证书验证。下一个示例展示了如何提供密钥库和信任库配置。

sslPropertiesLocation 属性是一个指向包含以下键的属性文件的 Spring Resource

keyStore=file:/secret/keycert.p12
trustStore=file:/secret/trustStore
keyStore.passPhrase=secret
trustStore.passPhrase=secret

keyStoretruststore 是指向存储文件的 Spring Resource。通常,此属性文件由操作系统保护,应用程序具有读取权限。

从 Spring AMQP 1.5 版本开始,您可以直接在工厂 Bean 上设置这些属性。如果同时提供了离散属性和 sslPropertiesLocation,则后者的属性会覆盖离散值。

从版本 2.0 开始,服务器证书默认经过验证,因为它更安全。如果您出于某种原因希望跳过此验证,请将工厂 Bean 的 skipServerCertificateValidation 属性设置为 true。从版本 2.1 开始,RabbitConnectionFactoryBean 现在默认调用 enableHostnameVerification()。要恢复到以前的行为,请将 enableHostnameVerification 属性设置为 false
从版本 2.2.5 开始,工厂 Bean 默认将始终使用 TLS v1.2;之前,在某些情况下使用 v1.1,在其他情况下使用 v1.2(取决于其他属性)。如果您出于某种原因需要使用 v1.1,请设置 sslAlgorithm 属性:setSslAlgorithm("TLSv1.1")

连接到集群

要连接到集群,请在 CachingConnectionFactory 上配置 addresses 属性

@Bean
public CachingConnectionFactory ccf() {
    CachingConnectionFactory ccf = new CachingConnectionFactory();
    ccf.setAddresses("host1:5672,host2:5672,host3:5672");
    return ccf;
}

从版本 3.0 开始,底层连接工厂将在建立新连接时尝试连接到随机选择的地址。要恢复到从第一个到最后一个的先前行为,请将 addressShuffleMode 属性设置为 AddressShuffleMode.NONE

从版本 2.3 开始,添加了 INORDER 混洗模式,这意味着连接创建后第一个地址会移到末尾。如果您希望在所有节点上消费所有分片,并且使用 CacheMode.CONNECTION 和适当的并发性,则可以考虑将此模式与 RabbitMQ Sharding Plugin 一起使用。

@Bean
public CachingConnectionFactory ccf() {
    CachingConnectionFactory ccf = new CachingConnectionFactory();
    ccf.setAddresses("host1:5672,host2:5672,host3:5672");
    ccf.setAddressShuffleMode(AddressShuffleMode.INORDER);
    return ccf;
}

路由连接工厂

从版本 1.3 开始,引入了 AbstractRoutingConnectionFactory。此工厂提供了一种机制,可以配置多个 ConnectionFactories 的映射,并在运行时通过某个 lookupKey 确定目标 ConnectionFactory。通常,实现会检查线程绑定的上下文。为了方便起见,Spring AMQP 提供了 SimpleRoutingConnectionFactory,它从 SimpleResourceHolder 获取当前线程绑定的 lookupKey。以下示例展示了如何在 XML 和 Java 中配置 SimpleRoutingConnectionFactory

<bean id="connectionFactory"
      class="org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory">
    <property name="targetConnectionFactories">
        <map>
            <entry key="#{connectionFactory1.virtualHost}" ref="connectionFactory1"/>
            <entry key="#{connectionFactory2.virtualHost}" ref="connectionFactory2"/>
        </map>
    </property>
</bean>

<rabbit:template id="template" connection-factory="connectionFactory" />
public class MyService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void service(String vHost, String payload) {
        SimpleResourceHolder.bind(rabbitTemplate.getConnectionFactory(), vHost);
        rabbitTemplate.convertAndSend(payload);
        SimpleResourceHolder.unbind(rabbitTemplate.getConnectionFactory());
    }

}

使用后解绑资源非常重要。有关更多信息,请参阅 AbstractRoutingConnectionFactoryJavaDoc

从版本 1.4 开始,RabbitTemplate 支持 SpEL 表达式属性 sendConnectionFactorySelectorExpressionreceiveConnectionFactorySelectorExpression,它们在每次 AMQP 协议交互操作(sendsendAndReceivereceivereceiveAndReply)时进行评估,解析为提供的 AbstractRoutingConnectionFactorylookupKey 值。您可以在表达式中使用 Bean 引用,例如 @vHostResolver.getVHost(#root)。对于 send 操作,要发送的消息是根评估对象。对于 receive 操作,queueName 是根评估对象。

路由算法如下:如果选择器表达式为 null 或评估结果为 null,或者提供的 ConnectionFactory 不是 AbstractRoutingConnectionFactory 的实例,则一切如常,依赖于提供的 ConnectionFactory 实现。如果评估结果不为 null,但对于该 lookupKey 没有目标 ConnectionFactory 并且 AbstractRoutingConnectionFactory 配置为 lenientFallback = true,也会发生同样的情况。对于 AbstractRoutingConnectionFactory,它确实会根据 determineCurrentLookupKey() 回退到其路由实现。但是,如果 lenientFallback = false,则会抛出 IllegalStateException

命名空间支持还在 <rabbit:template> 组件上提供了 send-connection-factory-selector-expressionreceive-connection-factory-selector-expression 属性。

此外,从版本 1.4 开始,您可以在监听器容器中配置路由连接工厂。在这种情况下,队列名称列表将用作查找键。例如,如果您将容器配置为监听 setQueueNames("thing1", "thing2"),则查找键将是 [thing1,thing](注意键中没有空格)。

从版本 1.6.9 开始,您可以使用监听器容器上的 setLookupKeyQualifier 方法向查找键添加限定符。这样做可以实现,例如,监听名称相同但位于不同虚拟主机(您将为每个虚拟主机设置一个连接工厂)中的队列。

例如,使用查找键限定符 thing1 和监听队列 thing2 的容器,您可以注册目标连接工厂的查找键可以是 thing1[thing2]

目标连接工厂(如果提供默认连接工厂,也包括默认连接工厂)对于发布者确认和返回必须具有相同的设置。请参阅 发布者确认和返回

从版本 2.4.4 开始,可以禁用此验证。如果您遇到确认和返回之间的值需要不相等的情况,可以使用 AbstractRoutingConnectionFactory#setConsistentConfirmsReturns 关闭验证。请注意,添加到 AbstractRoutingConnectionFactory 的第一个连接工厂将决定 confirmsreturns 的通用值。

如果您遇到某些消息需要检查确认/返回而其他消息不需要的情况,这可能很有用。例如

@Bean
public RabbitTemplate rabbitTemplate() {
    final com.rabbitmq.client.ConnectionFactory cf = new com.rabbitmq.client.ConnectionFactory();
    cf.setHost("localhost");
    cf.setPort(5672);

    CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(cf);
    cachingConnectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);

    PooledChannelConnectionFactory pooledChannelConnectionFactory = new PooledChannelConnectionFactory(cf);

    final Map<Object, ConnectionFactory> connectionFactoryMap = new HashMap<>(2);
    connectionFactoryMap.put("true", cachingConnectionFactory);
    connectionFactoryMap.put("false", pooledChannelConnectionFactory);

    final AbstractRoutingConnectionFactory routingConnectionFactory = new SimpleRoutingConnectionFactory();
    routingConnectionFactory.setConsistentConfirmsReturns(false);
    routingConnectionFactory.setDefaultTargetConnectionFactory(pooledChannelConnectionFactory);
    routingConnectionFactory.setTargetConnectionFactories(connectionFactoryMap);

    final RabbitTemplate rabbitTemplate = new RabbitTemplate(routingConnectionFactory);

    final Expression sendExpression = new SpelExpressionParser().parseExpression(
            "messageProperties.headers['x-use-publisher-confirms'] ?: false");
    rabbitTemplate.setSendConnectionFactorySelectorExpression(sendExpression);
}

这样,带有头 x-use-publisher-confirms: true 的消息将通过缓存连接发送,您可以确保消息的投递。有关确保消息投递的更多信息,请参阅 发布者确认和返回

队列亲和性与 LocalizedQueueConnectionFactory

在集群中使用 HA 队列时,为了获得最佳性能,您可能希望连接到主队列所在的物理 Broker。CachingConnectionFactory 可以配置多个 Broker 地址。这是为了实现故障转移,客户端会根据配置的 AddressShuffleMode 顺序尝试连接。LocalizedQueueConnectionFactory 使用管理插件提供的 REST API 来确定哪个节点是该队列的主节点。然后,它会创建一个连接到该节点的 CachingConnectionFactory(或从缓存中检索一个)。如果连接失败,会确定新的主节点,并且消费者会连接到它。LocalizedQueueConnectionFactory 配置了一个默认连接工厂,以防无法确定队列的物理位置,在这种情况下,它会正常连接到集群。

LocalizedQueueConnectionFactory 是一个 RoutingConnectionFactorySimpleMessageListenerContainer 使用队列名称作为查找键,如上文 路由连接工厂 中所述。

因此(由于使用队列名称进行查找),只有当容器配置为监听单个队列时,才能使用 LocalizedQueueConnectionFactory
必须在每个节点上启用 RabbitMQ 管理插件。
此连接工厂旨在用于长连接,例如 SimpleMessageListenerContainer 使用的连接。由于在建立连接之前调用 REST API 的开销,它不适用于短连接使用,例如与 RabbitTemplate 一起使用。此外,对于发布操作,队列是未知的,并且消息无论如何都会发布到所有集群成员,因此查找节点的逻辑价值不大。

以下示例配置展示了如何配置这些工厂

@Autowired
private ConfigurationProperties props;

@Bean
public CachingConnectionFactory defaultConnectionFactory() {
    CachingConnectionFactory cf = new CachingConnectionFactory();
    cf.setAddresses(this.props.getAddresses());
    cf.setUsername(this.props.getUsername());
    cf.setPassword(this.props.getPassword());
    cf.setVirtualHost(this.props.getVirtualHost());
    return cf;
}

@Bean
public LocalizedQueueConnectionFactory queueAffinityCF(
        @Qualifier("defaultConnectionFactory") ConnectionFactory defaultCF) {
    return new LocalizedQueueConnectionFactory(defaultCF,
            StringUtils.commaDelimitedListToStringArray(this.props.getAddresses()),
            StringUtils.commaDelimitedListToStringArray(this.props.getAdminUris()),
            StringUtils.commaDelimitedListToStringArray(this.props.getNodes()),
            this.props.getVirtualHost(), this.props.getUsername(), this.props.getPassword(),
            false, null);
}

请注意,前三个参数是 addressesadminUrisnodes 的数组。它们是位置参数,即当容器尝试连接到队列时,它使用管理 API 确定哪个节点是该队列的主节点,并连接到与该节点在同一数组位置的地址。

从版本 3.0 开始,不再使用 RabbitMQ 的 http-client 来访问 Rest API。默认情况下,如果类路径中存在 spring-webflux,则使用 Spring Webflux 的 WebClient;否则,使用 RestTemplate

WebFlux 添加到类路径

Maven
<dependency>
  <groupId>org.springframework.amqp</groupId>
  <artifactId>spring-rabbit</artifactId>
</dependency>
Gradle
compile 'org.springframework.amqp:spring-rabbit'

您还可以通过实现 LocalizedQueueConnectionFactory.NodeLocator 并重写其 createClientrestCall 方法,以及可选的 close 方法来使用其他 REST 技术。

lqcf.setNodeLocator(new NodeLocator<MyClient>() {

    @Override
    public MyClient createClient(String userName, String password) {
        ...
    }

    @Override
    public HashMap<String, Object> restCall(MyClient client, URI uri) {
        ...
    });

});

框架提供了 WebFluxNodeLocatorRestTemplateNodeLocator,默认设置如上所述。

Publisher Confirms 和 Returns

通过将 CachingConnectionFactory 属性 publisherConfirmType 设置为 ConfirmType.CORRELATED 并将 publisherReturns 属性设置为 'true',可以支持 confirmed (带关联) 和 returned 消息。

设置这些选项后,工厂创建的 Channel 实例将包装在 PublisherCallbackChannel 中,用于促进回调。获取此类 channel 后,客户端可以向 Channel 注册一个 PublisherCallbackChannel.ListenerPublisherCallbackChannel 实现包含将 confirm 或 return 路由到相应监听器的逻辑。这些功能将在以下章节中进一步解释。

另请参阅 关联的 Publisher Confirms 和 Returns,以及 作用域操作 中的 simplePublisherConfirms

更多背景信息,请参阅 RabbitMQ 团队题为 Introducing Publisher Confirms 的博客文章。

Connection 和 Channel Listeners

连接工厂支持注册 ConnectionListenerChannelListener 实现。这允许您接收连接和 channel 相关事件的通知。(RabbitAdmin 在建立连接时使用 ConnectionListener 来执行声明 - 更多信息请参见 Exchange、Queue 和 Binding 的自动声明)。以下列表显示了 ConnectionListener 接口定义

@FunctionalInterface
public interface ConnectionListener {

    void onCreate(Connection connection);

    default void onClose(Connection connection) {
    }

    default void onShutDown(ShutdownSignalException signal) {
    }

}

从版本 2.0 开始,org.springframework.amqp.rabbit.connection.Connection 对象可以提供 com.rabbitmq.client.BlockedListener 实例,以便在连接被阻塞和解除阻塞时收到通知。以下示例显示了 ChannelListener 接口定义

@FunctionalInterface
public interface ChannelListener {

    void onCreate(Channel channel, boolean transactional);

    default void onShutDown(ShutdownSignalException signal) {
    }

}

有关何时可能需要注册 ChannelListener 的一个场景,请参见 发布是异步的 — 如何检测成功和失败

记录 Channel 关闭事件

版本 1.5 引入了一种机制,允许用户控制日志记录级别。

AbstractConnectionFactory 使用默认策略记录 channel 关闭,如下所示

  • 正常的 channel 关闭 (200 OK) 不会记录日志。

  • 如果 channel 因被动队列声明失败而关闭,则以 DEBUG 级别记录日志。

  • 如果 channel 因独占消费者条件导致 basic.consume 被拒绝而关闭,则以 DEBUG 级别记录日志 (自 3.1 版起,之前是 INFO)。

  • 所有其他情况都以 ERROR 级别记录日志。

要修改此行为,您可以在 CachingConnectionFactorycloseExceptionLogger 属性中注入自定义的 ConditionalExceptionLogger

此外,AbstractConnectionFactory.DefaultChannelCloseLogger 现在是公共的,允许对其进行子类化。

另请参阅 消费者事件

运行时缓存属性

从版本 1.6 开始,CachingConnectionFactory 现在通过 getCacheProperties() 方法提供缓存统计信息。这些统计信息可用于调整缓存以优化生产环境。例如,高水位标记可用于确定是否应增加缓存大小。如果它等于缓存大小,您可能需要考虑进一步增加。下表描述了 CacheMode.CHANNEL 的属性

表 1. CacheMode.CHANNEL 的缓存属性
属性 含义
connectionName

ConnectionNameStrategy 生成的连接名称。

channelCacheSize

当前配置的最大允许空闲 channel 数。

localPort

连接的本地端口(如果可用)。这可用于与 RabbitMQ 管理 UI 上的连接和 channel 相关联。

idleChannelsTx

当前空闲(缓存)的事务性 channel 数。

idleChannelsNotTx

当前空闲(缓存)的非事务性 channel 数。

idleChannelsTxHighWater

曾经并发空闲(缓存)的最大事务性 channel 数。

idleChannelsNotTxHighWater

曾经并发空闲(缓存)的最大非事务性 channel 数。

下表描述了 CacheMode.CONNECTION 的属性

表 2. CacheMode.CONNECTION 的缓存属性
属性 含义
connectionName:<localPort>

ConnectionNameStrategy 生成的连接名称。

openConnections

代表与 broker 连接的连接对象数。

channelCacheSize

当前配置的最大允许空闲 channel 数。

connectionCacheSize

当前配置的最大允许空闲连接数。

idleConnections

当前空闲的连接数。

idleConnectionsHighWater

曾经并发空闲的最大连接数。

idleChannelsTx:<localPort>

此连接当前空闲(缓存)的事务性 channel 数。您可以使用属性名称中的 localPort 部分与 RabbitMQ 管理 UI 上的连接和 channel 相关联。

idleChannelsNotTx:<localPort>

此连接当前空闲(缓存)的非事务性 channel 数。属性名称中的 localPort 部分可用于与 RabbitMQ 管理 UI 上的连接和 channel 相关联。

idleChannelsTxHighWater:<localPort>

曾经并发空闲(缓存)的最大事务性 channel 数。属性名称中的 localPort 部分可用于与 RabbitMQ 管理 UI 上的连接和 channel 相关联。

idleChannelsNotTxHighWater:<localPort>

曾经并发空闲(缓存)的最大非事务性 channel 数。您可以使用属性名称中的 localPort 部分与 RabbitMQ 管理 UI 上的连接和 channel 相关联。

还包括 cacheMode 属性 (CHANNELCONNECTION)。

cacheStats
图 1. JVisualVM 示例

RabbitMQ 自动连接/拓扑恢复

自 Spring AMQP 的第一个版本以来,该框架在发生 broker 故障时提供了自己的连接和 channel 恢复机制。此外,如 配置 Broker 中所述,RabbitAdmin 在重新建立连接时会重新声明任何基础设施 bean (队列等)。因此,它不依赖于现在由 amqp-client 库提供的自动恢复。默认情况下,amqp-client 启用了自动恢复。这两种恢复机制之间存在一些不兼容性,因此默认情况下,Spring 将底层 RabbitMQ connectionFactory 上的 automaticRecoveryEnabled 属性设置为 false。即使该属性为 true,Spring 也通过立即关闭任何已恢复的连接来有效禁用它。

默认情况下,连接失败后仅会重新声明定义为 bean 的元素(队列、交换器、绑定)。有关如何更改此行为,请参阅 恢复自动删除声明