连接和资源管理

尽管我们在上一节中描述的 AMQP 模型是通用的,适用于所有实现,但当涉及到资源管理时,具体细节取决于 Broker 的实现。因此,在本节中,我们专注于“spring-rabbit”模块中存在的代码,因为目前 RabbitMQ 是唯一受支持的实现。

用于管理与 RabbitMQ Broker 连接的核心组件是 ConnectionFactory 接口。ConnectionFactory 实现的职责是提供 org.springframework.amqp.rabbit.connection.Connection 的实例,它是 com.rabbitmq.client.Connection 的包装器。

选择连接工厂

有三种连接工厂可供选择

  • PooledChannelConnectionFactory

  • ThreadChannelConnectionFactory

  • CachingConnectionFactory

前两个是在版本 2.3 中添加的。

对于大多数用例,应使用 CachingConnectionFactory。如果您想在不需要使用 作用域操作 的情况下确保严格的消息排序,则可以使用 ThreadChannelConnectionFactoryPooledChannelConnectionFactoryCachingConnectionFactory 类似,因为它使用单个连接和通道池。它的实现更简单,但不支持关联的发布者确认。

所有三个工厂都支持简单的发布者确认。

从版本 2.3.2 开始,当配置 RabbitTemplate 以使用 单独连接 时,您现在可以将发布连接工厂配置为不同的类型。默认情况下,发布工厂的类型与主工厂相同,并且在主工厂上设置的任何属性也会传播到发布工厂。

从版本 3.1 开始,AbstractConnectionFactory 包含 connectionCreatingBackOff 属性,该属性支持连接模块中的退避策略。目前,createChannel() 的行为中支持处理达到 channelMax 限制时发生的异常,实现基于尝试次数和间隔的退避策略。

PooledChannelConnectionFactory

该工厂管理一个连接和两个基于 Apache Pool2 的通道池。一个池用于事务性通道,另一个用于非事务性通道。这些池是具有默认配置的 GenericObjectPool;提供了一个回调来配置池;有关更多信息,请参阅 Apache 文档。

要使用此工厂,classpath 中必须包含 Apache commons-pool2 jar。

@Bean
PooledChannelConnectionFactory pcf() throws Exception {
    ConnectionFactory rabbitConnectionFactory = new ConnectionFactory();
    rabbitConnectionFactory.setHost("localhost");
    PooledChannelConnectionFactory pcf = new PooledChannelConnectionFactory(rabbitConnectionFactory);
    pcf.setPoolConfigurer((pool, tx) -> {
        if (tx) {
            // configure the transactional pool
        }
        else {
            // configure the non-transactional pool
        }
    });
    return pcf;
}

ThreadChannelConnectionFactory

该工厂管理一个连接和两个 ThreadLocal,一个用于事务性通道,另一个用于非事务性通道。该工厂确保同一线程上的所有操作都使用相同的通道(只要它保持打开状态)。这有助于实现严格的消息排序,而无需 作用域操作。为避免内存泄漏,如果您的应用程序使用许多短生命周期的线程,则必须调用工厂的 closeThreadChannel() 来释放通道资源。从版本 2.3.7 开始,一个线程可以将其通道传输到另一个线程。有关更多信息,请参见 多线程环境中的严格消息排序

CachingConnectionFactory

提供的第三个实现是 CachingConnectionFactory,它默认建立一个可以由应用程序共享的单个连接代理。连接共享是可能的,因为 AMQP 消息传递的“工作单元”实际上是“通道”(在某些方面,这类似于 JMS 中连接和会话之间的关系)。连接实例提供 createChannel 方法。CachingConnectionFactory 实现支持这些通道的缓存,并且它根据通道是否是事务性来维护单独的通道缓存。创建 CachingConnectionFactory 实例时,可以通过构造函数提供“主机名”。您还应该提供“用户名”和“密码”属性。要配置通道缓存的大小(默认为 25),可以调用 setChannelCacheSize() 方法。

从版本 1.3 开始,您可以配置 CachingConnectionFactory 以缓存连接以及仅缓存通道。在这种情况下,每次调用 createConnection() 都会创建一个新连接(或从缓存中检索一个空闲连接)。关闭连接会将其返回到缓存中(如果未达到缓存大小)。在此类连接上创建的通道也会被缓存。在某些环境中,使用单独的连接可能很有用,例如从 HA 集群消费,结合负载均衡器连接到不同的集群成员等等。要缓存连接,请将 cacheMode 设置为 CacheMode.CONNECTION

这不限制连接的数量。相反,它指定允许有多少空闲的开放连接。

从版本 1.5.5 开始,提供了一个名为 connectionLimit 的新属性。设置此属性后,它会限制允许的总连接数。设置后,如果达到限制,则使用 channelCheckoutTimeLimit 等待连接变为空闲。如果超过时间,则抛出 AmqpTimeoutException

当缓存模式为 CONNECTION 时,不支持队列等的自动声明(请参阅 交换机、队列和绑定的自动声明)。

此外,在撰写本文时,amqp-client 库默认会为每个连接创建一个固定线程池(默认大小:Runtime.getRuntime().availableProcessors() * 2 个线程)。当使用大量连接时,您应该考虑在 CachingConnectionFactory 上设置自定义 executor。然后,所有连接都可以使用相同的执行器,并且可以共享其线程。执行器的线程池应该是无界的,或者根据预期用途适当设置(通常,每个连接至少一个线程)。如果每个连接上创建多个通道,则池大小会影响并发性,因此可变(或简单的缓存)线程池执行器是最合适的。

重要的是要理解缓存大小(默认情况下)不是限制,而仅仅是可以缓存的通道数量。例如,如果缓存大小为 10,则实际上可以使用任意数量的通道。如果使用了超过 10 个通道,并且它们都返回到缓存中,则有 10 个通道进入缓存。其余的通道会被物理关闭。

从版本 1.6 开始,默认通道缓存大小已从 1 增加到 25。在高并发、多线程环境中,小缓存意味着通道以高速率创建和关闭。增加默认缓存大小可以避免这种开销。您应该通过 RabbitMQ 管理 UI 监控正在使用的通道,如果看到许多通道正在创建和关闭,请考虑进一步增加缓存大小。缓存仅在按需时增长(以适应应用程序的并发要求),因此此更改不会影响现有的低吞吐量应用程序。

从版本 1.4.2 开始,CachingConnectionFactory 有一个名为 channelCheckoutTimeout 的属性。当此属性大于零时,channelCacheSize 成为可以在连接上创建的通道数量的限制。如果达到限制,调用线程会阻塞,直到通道可用或达到此超时,在这种情况下会抛出 AmqpTimeoutException

框架内使用的通道(例如 RabbitTemplate)会可靠地返回到缓存中。如果您在框架外部创建通道(例如,通过直接访问连接并调用 createChannel()),则必须可靠地将其返回(通过关闭),可能在 finally 块中,以避免通道耗尽。

以下示例显示如何创建新 connection

CachingConnectionFactory connectionFactory = new CachingConnectionFactory("somehost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");

Connection connection = connectionFactory.createConnection();

使用 XML 时,配置可能如下例所示

<bean id="connectionFactory"
      class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
    <constructor-arg value="somehost"/>
    <property name="username" value="guest"/>
    <property name="password" value="guest"/>
</bean>
还有一个 SingleConnectionFactory 实现,仅在框架的单元测试代码中可用。它比 CachingConnectionFactory 更简单,因为它不缓存通道,但由于其性能和弹性不足,不适合在简单测试之外的实际使用。如果出于某种原因需要实现自己的 ConnectionFactoryAbstractConnectionFactory 基类可以提供一个很好的起点。

可以使用 Rabbit 命名空间快速方便地创建 ConnectionFactory,如下所示

<rabbit:connection-factory id="connectionFactory"/>

在大多数情况下,此方法更可取,因为框架可以为您选择最佳默认值。创建的实例是 CachingConnectionFactory。请记住,通道的默认缓存大小为 25。如果您希望缓存更多通道,请通过设置“channelCacheSize”属性来设置更大的值。在 XML 中,它看起来像这样

<bean id="connectionFactory"
      class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
    <constructor-arg value="somehost"/>
    <property name="username" value="guest"/>
    <property name="password" value="guest"/>
    <property name="channelCacheSize" value="50"/>
</bean>

此外,使用命名空间,您可以添加“channel-cache-size”属性,如下所示

<rabbit:connection-factory
    id="connectionFactory" channel-cache-size="50"/>

默认缓存模式是 CHANNEL,但您可以将其配置为缓存连接。在以下示例中,我们使用 connection-cache-size

<rabbit:connection-factory
    id="connectionFactory" cache-mode="CONNECTION" connection-cache-size="25"/>

您可以使用命名空间提供主机和端口属性,如下所示

<rabbit:connection-factory
    id="connectionFactory" host="somehost" port="5672"/>

或者,如果在集群环境中运行,您可以使用地址属性,如下所示

<rabbit:connection-factory
    id="connectionFactory" addresses="host1:5672,host2:5672" address-shuffle-mode="RANDOM"/>

有关 address-shuffle-mode 的信息,请参见 连接到集群

以下示例使用自定义线程工厂,该工厂将线程名称前缀为 rabbitmq-

<rabbit:connection-factory id="multiHost" virtual-host="/bar" addresses="host1:1234,host2,host3:4567"
    thread-factory="tf"
    channel-cache-size="10" username="user" password="password" />

<bean id="tf" class="org.springframework.scheduling.concurrent.CustomizableThreadFactory">
    <constructor-arg value="rabbitmq-" />
</bean>

地址解析器

从版本 2.1.15 开始,您现在可以使用 AddressResolver 来解析连接地址。这将覆盖 addresseshost/port 属性的任何设置。

命名连接

从版本 1.7 开始,提供了 ConnectionNameStrategy 用于注入 AbstractionConnectionFactory。生成的名称用于目标 RabbitMQ 连接的应用程序特定标识。如果 RabbitMQ 服务器支持,连接名称会显示在管理 UI 中。此值不必是唯一的,不能用作连接标识符——例如,在 HTTP API 请求中。此值应该是人类可读的,并且是 ClientPropertiesconnection_name 键的一部分。您可以使用简单的 Lambda,如下所示

connectionFactory.setConnectionNameStrategy(connectionFactory -> "MY_CONNECTION");

ConnectionFactory 参数可用于通过某些逻辑区分目标连接名称。默认情况下,AbstractConnectionFactorybeanName、表示对象的十六进制字符串和内部计数器用于生成 connection_name<rabbit:connection-factory> 命名空间组件也提供了 connection-name-strategy 属性。

SimplePropertyValueConnectionNameStrategy 的实现将连接名称设置为应用程序属性。您可以将其声明为 @Bean 并将其注入连接工厂,如下例所示

@Bean
public SimplePropertyValueConnectionNameStrategy cns() {
    return new SimplePropertyValueConnectionNameStrategy("spring.application.name");
}

@Bean
public ConnectionFactory rabbitConnectionFactory(ConnectionNameStrategy cns) {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    ...
    connectionFactory.setConnectionNameStrategy(cns);
    return connectionFactory;
}

该属性必须存在于应用程序上下文的 Environment 中。

使用 Spring Boot 及其自动配置的连接工厂时,您只需声明 ConnectionNameStrategy @Bean。Spring Boot 会自动检测 bean 并将其连接到工厂中。

阻塞的连接和资源限制

连接可能会因 Broker 的交互而阻塞,这对应于 内存警报。从版本 2.0 开始,org.springframework.amqp.rabbit.connection.Connection 可以提供 com.rabbitmq.client.BlockedListener 实例,以便在连接阻塞和解除阻塞事件时收到通知。此外,AbstractConnectionFactory 通过其内部 BlockedListener 实现分别发出 ConnectionBlockedEventConnectionUnblockedEvent。这些允许您提供应用程序逻辑,以适当响应 Broker 上的问题并(例如)采取一些纠正措施。

当应用程序配置为单个 CachingConnectionFactory 时(Spring Boot 自动配置默认如此),当连接被 Broker 阻塞时,应用程序将停止工作。当它被 Broker 阻塞时,其任何客户端都会停止工作。如果我们在同一个应用程序中有生产者和消费者,当生产者阻塞连接(因为 Broker 上不再有资源)并且消费者无法释放它们(因为连接被阻塞)时,我们可能会陷入死锁。为了缓解这个问题,我们建议再创建一个单独的 CachingConnectionFactory 实例,具有相同的选项——一个用于生产者,一个用于消费者。对于在消费者线程上执行的事务性生产者,单独的 CachingConnectionFactory 是不可能的,因为它们应该重用与消费者事务关联的 Channel

从版本 2.0.2 开始,RabbitTemplate 有一个配置选项,可以在不使用事务的情况下自动使用第二个连接工厂。有关更多信息,请参见 使用单独连接。发布者连接的 ConnectionNameStrategy 与主策略相同,并在调用方法的结果后附加 .publisher

从版本 1.7.7 开始,提供了一个 AmqpResourceNotAvailableException,当 SimpleConnection.createChannel() 无法创建 Channel 时(例如,因为达到 channelMax 限制并且缓存中没有可用通道),将抛出此异常。您可以在 RetryPolicy 中使用此异常,以在一些退避后恢复操作。

配置底层客户端连接工厂

CachingConnectionFactory 使用 Rabbit 客户端 ConnectionFactory 的实例。当在 CachingConnectionFactory 上设置等效属性时,许多配置属性会传递(例如 hostportuserNamepasswordrequestedHeartBeatconnectionTimeout)。要设置其他属性(例如 clientProperties),您可以定义 Rabbit 工厂的实例,并通过使用 CachingConnectionFactory 的相应构造函数提供对其的引用。使用命名空间时(如前所述),您需要提供对 connection-factory 属性中配置的工厂的引用。为方便起见,提供了一个工厂 bean,以帮助在 Spring 应用程序上下文中配置连接工厂,如 下一节 所述。

<rabbit:connection-factory
      id="connectionFactory" connection-factory="rabbitConnectionFactory"/>
4.0.x 客户端默认启用自动恢复。虽然与此功能兼容,但 Spring AMQP 有自己的恢复机制,通常不需要客户端恢复功能。我们建议禁用 amqp-client 自动恢复,以避免在 Broker 可用但连接尚未恢复时出现 AutoRecoverConnectionNotCurrentlyOpenException 实例。例如,当 RabbitTemplate 中配置了 RetryTemplate 时,即使在集群中故障转移到另一个 Broker,您也可能会注意到此异常。由于自动恢复连接在计时器上恢复,因此使用 Spring AMQP 的恢复机制可以更快地恢复连接。从版本 1.7.1 开始,Spring AMQP 禁用 amqp-client 自动恢复,除非您明确创建自己的 RabbitMQ 连接工厂并将其提供给 CachingConnectionFactoryRabbitConnectionFactoryBean 创建的 RabbitMQ ConnectionFactory 实例也默认禁用此选项。

RabbitConnectionFactoryBean 和配置 SSL

从版本 1.4 开始,提供了一个方便的 RabbitConnectionFactoryBean,可以通过依赖注入方便地配置底层客户端连接工厂上的 SSL 属性。其他设置器委托给底层工厂。以前,您必须以编程方式配置 SSL 选项。以下示例显示如何配置 RabbitConnectionFactoryBean

  • Java

  • XML

@Bean
RabbitConnectionFactoryBean rabbitConnectionFactory() {
    RabbitConnectionFactoryBean factoryBean = new RabbitConnectionFactoryBean();
    factoryBean.setUseSSL(true);
    factoryBean.setSslPropertiesLocation(new ClassPathResource("secrets/rabbitSSL.properties"));
    return factoryBean;
}

@Bean
CachingConnectionFactory connectionFactory(ConnectionFactory rabbitConnectionFactory) {
    CachingConnectionFactory ccf = new CachingConnectionFactory(rabbitConnectionFactory);
    ccf.setHost("...");
    // ...
    return ccf;
}
<bean id="rabbitConnectionFactory"
        class="org.springframework.amqp.rabbit.connection.RabbitConnectionFactoryBean">
    <property name="useSSL" value="true" />
    <property name="sslPropertiesLocation" value="classpath:secrets/rabbitSSL.properties"/>
</bean>

<rabbit:connection-factory id="connectionFactory"
    connection-factory="rabbitConnectionFactory"
    host="${host}"
    port="${port}"
    virtual-host="${vhost}"
    username="${username}" password="${password}" />

Spring Boot 应用程序文件(.yaml.properties

  • 属性

  • YAML

spring.rabbitmq.host=...
spring.rabbitmq.ssl.keyStoreType=jks
spring.rabbitmq.ssl.trustStoreType=jks
spring.rabbitmq.ssl.keyStore=...
spring.rabbitmq.ssl.trustStore=...
spring.rabbitmq.ssl.trustStorePassword=...
spring.rabbitmq.ssl.keyStorePassword=...
spring.rabbitmq.ssl.enabled=true
spring:
  rabbitmq:
    host: ...
    ssl:
      keyStoreType: jks
      trustStoreType: jks
      keyStore: ...
      trustStore: ...
      trustStorePassword: ...
      keyStorePassword: ...
      enabled: true

有关配置 SSL 的信息,请参见 RabbitMQ 文档。省略 keyStoretrustStore 配置可在不验证证书的情况下通过 SSL 连接。下一个示例显示如何提供密钥和信任库配置。

sslPropertiesLocation 属性是一个 Spring Resource,指向一个包含以下键的属性文件

keyStore=file:/secret/keycert.p12
trustStore=file:/secret/trustStore
keyStore.passPhrase=secret
trustStore.passPhrase=secret

keyStoretruststore 是指向存储的 Spring Resources。通常,此属性文件由操作系统保护,应用程序具有读取访问权限。

从 Spring AMQP 1.5 版开始,您可以直接在工厂 bean 上设置这些属性。如果同时提供了离散属性和 sslPropertiesLocation,则后者中的属性会覆盖离散值。

从版本 2.0 开始,默认情况下会验证服务器证书,因为它更安全。如果出于某种原因要跳过此验证,请将工厂 bean 的 skipServerCertificateValidation 属性设置为 true。从版本 2.1 开始,RabbitConnectionFactoryBean 现在默认调用 enableHostnameVerification()。要恢复到以前的行为,请将 enableHostnameVerification 属性设置为 false
从版本 2.2.5 开始,工厂 bean 默认将始终使用 TLS v1.2;以前,在某些情况下它使用 v1.1,在其他情况下使用 v1.2(取决于其他属性)。如果出于某种原因需要使用 v1.1,请设置 sslAlgorithm 属性:setSslAlgorithm("TLSv1.1")

连接到集群

要连接到集群,请在 CachingConnectionFactory 上配置 addresses 属性

@Bean
public CachingConnectionFactory ccf() {
    CachingConnectionFactory ccf = new CachingConnectionFactory();
    ccf.setAddresses("host1:5672,host2:5672,host3:5672");
    return ccf;
}

从版本 3.0 开始,每当建立新连接时,底层连接工厂都会通过选择随机地址来尝试连接到主机。要恢复到以前的从第一个到最后一个尝试连接的行为,请将 addressShuffleMode 属性设置为 AddressShuffleMode.NONE

从版本 2.3 开始,添加了 INORDER 洗牌模式,这意味着在创建连接后,第一个地址会移到末尾。如果您希望使用 RabbitMQ 分片插件CacheMode.CONNECTION 和适当的并发性从所有节点的所有分片进行消费,则可能希望使用此模式。

@Bean
public CachingConnectionFactory ccf() {
    CachingConnectionFactory ccf = new CachingConnectionFactory();
    ccf.setAddresses("host1:5672,host2:5672,host3:5672");
    ccf.setAddressShuffleMode(AddressShuffleMode.INORDER);
    return ccf;
}

路由连接工厂

从版本 1.3 开始,引入了 AbstractRoutingConnectionFactory。该工厂提供了一种机制,用于配置多个 ConnectionFactories 的映射,并根据运行时的一些 lookupKey 确定目标 ConnectionFactory。通常,实现会检查线程绑定的上下文。为了方便起见,Spring AMQP 提供了 SimpleRoutingConnectionFactory,它从 SimpleResourceHolder 获取当前线程绑定的 lookupKey。以下示例展示了如何在 XML 和 Java 中配置 SimpleRoutingConnectionFactory

<bean id="connectionFactory"
      class="org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory">
    <property name="targetConnectionFactories">
        <map>
            <entry key="#{connectionFactory1.virtualHost}" ref="connectionFactory1"/>
            <entry key="#{connectionFactory2.virtualHost}" ref="connectionFactory2"/>
        </map>
    </property>
</bean>

<rabbit:template id="template" connection-factory="connectionFactory" />
public class MyService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void service(String vHost, String payload) {
        SimpleResourceHolder.bind(rabbitTemplate.getConnectionFactory(), vHost);
        rabbitTemplate.convertAndSend(payload);
        SimpleResourceHolder.unbind(rabbitTemplate.getConnectionFactory());
    }

}

使用后解除绑定资源很重要。有关更多信息,请参阅 AbstractRoutingConnectionFactoryJavaDoc

从版本 1.4 开始,RabbitTemplate 支持 SpEL sendConnectionFactorySelectorExpressionreceiveConnectionFactorySelectorExpression 属性,这些属性在每个 AMQP 协议交互操作(sendsendAndReceivereceivereceiveAndReply)上进行评估,解析为提供的 AbstractRoutingConnectionFactorylookupKey 值。您可以在表达式中使用 bean 引用,例如 @vHostResolver.getVHost(#root)。对于 send 操作,要发送的消息是根评估对象。对于 receive 操作,queueName 是根评估对象。

路由算法如下:如果选择器表达式为 null 或被评估为 null,或者提供的 ConnectionFactory 不是 AbstractRoutingConnectionFactory 的实例,则一切照旧,依赖于提供的 ConnectionFactory 实现。如果评估结果不为 null,但该 lookupKey 没有目标 ConnectionFactory,并且 AbstractRoutingConnectionFactory 配置为 lenientFallback = true,也会发生同样的情况。在 AbstractRoutingConnectionFactory 的情况下,它会根据 determineCurrentLookupKey() 回退到其 routing 实现。但是,如果 lenientFallback = false,则会抛出 IllegalStateException

命名空间支持还为 <rabbit:template> 组件提供了 send-connection-factory-selector-expressionreceive-connection-factory-selector-expression 属性。

此外,从版本 1.4 开始,您可以在监听器容器中配置路由连接工厂。在这种情况下,队列名称列表用作查找键。例如,如果使用 setQueueNames("thing1", "thing2") 配置容器,则查找键为 [thing1,thing]"(请注意键中没有空格)。

从版本 1.6.9 开始,您可以使用监听器容器上的 setLookupKeyQualifier 为查找键添加限定符。这样做可以实现,例如,侦听同名但在不同虚拟主机中的队列(在这种情况下,您将为每个队列拥有一个连接工厂)。

例如,如果查找键限定符为 thing1,并且容器正在侦听队列 thing2,则您可以注册目标连接工厂的查找键可以是 thing1[thing2]

目标(以及提供的默认)连接工厂必须具有相同的发布者确认和返回设置。请参阅 发布者确认和返回

从版本 2.4.4 开始,可以禁用此验证。如果您在某些情况下需要确认和返回的值不相等,可以使用 AbstractRoutingConnectionFactory#setConsistentConfirmsReturns 关闭验证。请注意,添加到 AbstractRoutingConnectionFactory 的第一个连接工厂将确定 confirmsreturns 的一般值。

如果您遇到某些消息需要检查确认/返回而其他消息不需要的情况,这可能会很有用。例如

@Bean
public RabbitTemplate rabbitTemplate() {
    final com.rabbitmq.client.ConnectionFactory cf = new com.rabbitmq.client.ConnectionFactory();
    cf.setHost("localhost");
    cf.setPort(5672);

    CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(cf);
    cachingConnectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);

    PooledChannelConnectionFactory pooledChannelConnectionFactory = new PooledChannelConnectionFactory(cf);

    final Map<Object, ConnectionFactory> connectionFactoryMap = new HashMap<>(2);
    connectionFactoryMap.put("true", cachingConnectionFactory);
    connectionFactoryMap.put("false", pooledChannelConnectionFactory);

    final AbstractRoutingConnectionFactory routingConnectionFactory = new SimpleRoutingConnectionFactory();
    routingConnectionFactory.setConsistentConfirmsReturns(false);
    routingConnectionFactory.setDefaultTargetConnectionFactory(pooledChannelConnectionFactory);
    routingConnectionFactory.setTargetConnectionFactories(connectionFactoryMap);

    final RabbitTemplate rabbitTemplate = new RabbitTemplate(routingConnectionFactory);

    final Expression sendExpression = new SpelExpressionParser().parseExpression(
            "messageProperties.headers['x-use-publisher-confirms'] ?: false");
    rabbitTemplate.setSendConnectionFactorySelectorExpression(sendExpression);
}

这样,带有 x-use-publisher-confirms: true 头部的消息将通过缓存连接发送,您可以确保消息传递。有关确保消息传递的更多信息,请参阅 发布者确认和返回

队列亲和性和 LocalizedQueueConnectionFactory

在集群中使用 HA 队列时,为了获得最佳性能,您可能希望连接到主队列所在的物理 Broker。CachingConnectionFactory 可以配置多个 Broker 地址。这是为了故障转移,客户端根据配置的 AddressShuffleMode 顺序尝试连接。LocalizedQueueConnectionFactory 使用管理插件提供的 REST API 来确定哪个节点是队列的主节点。然后,它创建一个(或从缓存中检索)CachingConnectionFactory,该工厂仅连接到该节点。如果连接失败,则确定新的主节点,并且消费者连接到该节点。LocalizedQueueConnectionFactory 配置了一个默认连接工厂,以防无法确定队列的物理位置,在这种情况下,它会像正常一样连接到集群。

LocalizedQueueConnectionFactory 是一个 RoutingConnectionFactorySimpleMessageListenerContainer 使用队列名称作为查找键,如上文 路由连接工厂 中所述。

因此(使用队列名称进行查找),LocalizedQueueConnectionFactory 只能在容器配置为侦听单个队列时使用。
每个节点都必须启用 RabbitMQ 管理插件。
此连接工厂用于长期连接,例如 SimpleMessageListenerContainer 使用的连接。它不适用于短期连接,例如与 RabbitTemplate 一起使用,因为在建立连接之前调用 REST API 会产生开销。此外,对于发布操作,队列是未知的,并且消息无论如何都会发布到所有集群成员,因此查找节点的逻辑价值不大。

以下配置示例显示了如何配置工厂

@Autowired
private ConfigurationProperties props;

@Bean
public CachingConnectionFactory defaultConnectionFactory() {
    CachingConnectionFactory cf = new CachingConnectionFactory();
    cf.setAddresses(this.props.getAddresses());
    cf.setUsername(this.props.getUsername());
    cf.setPassword(this.props.getPassword());
    cf.setVirtualHost(this.props.getVirtualHost());
    return cf;
}

@Bean
public LocalizedQueueConnectionFactory queueAffinityCF(
        @Qualifier("defaultConnectionFactory") ConnectionFactory defaultCF) {
    return new LocalizedQueueConnectionFactory(defaultCF,
            StringUtils.commaDelimitedListToStringArray(this.props.getAddresses()),
            StringUtils.commaDelimitedListToStringArray(this.props.getAdminUris()),
            StringUtils.commaDelimitedListToStringArray(this.props.getNodes()),
            this.props.getVirtualHost(), this.props.getUsername(), this.props.getPassword(),
            false, null);
}

请注意,前三个参数是 addressesadminUrisnodes 的数组。这些是位置性的,即当容器尝试连接到队列时,它使用管理 API 来确定哪个节点是队列的主节点,并连接到与该节点在同一数组位置的地址。

从版本 3.0 开始,RabbitMQ http-client 不再用于访问 Rest API。相反,默认情况下,如果 spring-webflux 在类路径上,则使用 Spring Webflux 中的 WebClient;否则使用 RestTemplate

要将 WebFlux 添加到类路径

Maven
<dependency>
  <groupId>org.springframework.amqp</groupId>
  <artifactId>spring-rabbit</artifactId>
</dependency>
Gradle
compile 'org.springframework.amqp:spring-rabbit'

您还可以通过实现 LocalizedQueueConnectionFactory#setNodeLocator 并重写其 createClientrestCall 和(可选)close 方法来使用其他 REST 技术。

lqcf.setNodeLocator(new NodeLocator<MyClient>() {

    @Override
    public MyClient createClient(String userName, String password) {
        ...
    }

    @Override
    public Map<String, Object> restCall(MyClient client, String baseUri, String vhost, String queue) throws URISyntaxException {
        ...
    }

});

该框架提供了 WebFluxNodeLocatorRestTemplateNodeLocator,默认值如上所述。

发布者确认和返回

通过将 CachingConnectionFactory 属性 publisherConfirmType 设置为 ConfirmType.CORRELATED 并将 publisherReturns 属性设置为“true”,支持已确认(带关联)和已返回的消息。

当设置了这些选项后,工厂创建的 Channel 实例将包装在 PublisherCallbackChannel 中,用于促进回调。当获得这样的通道时,客户端可以向 Channel 注册一个 PublisherCallbackChannel.ListenerPublisherCallbackChannel 实现包含将确认或返回路由到相应监听器的逻辑。这些功能将在以下部分中进一步解释。

另请参阅 关联的发布者确认和返回作用域操作 中的 simplePublisherConfirms

有关更多背景信息,请参阅 RabbitMQ 团队的博客文章 Introducing Publisher Confirms

连接和通道监听器

连接工厂支持注册 ConnectionListenerChannelListener 实现。这允许您接收连接和通道相关事件的通知。(RabbitAdmin 使用 ConnectionListener 在建立连接时执行声明——有关更多信息,请参见 交换机、队列和绑定的自动声明)。以下列表显示了 ConnectionListener 接口定义

@FunctionalInterface
public interface ConnectionListener {

    void onCreate(Connection connection);

    default void onClose(Connection connection) {
    }

    default void onShutDown(ShutdownSignalException signal) {
    }

}

从版本 2.0 开始,org.springframework.amqp.rabbit.connection.Connection 对象可以提供 com.rabbitmq.client.BlockedListener 实例,以便在连接阻塞和解除阻塞事件时收到通知。以下示例显示了 ChannelListener 接口定义

@FunctionalInterface
public interface ChannelListener {

    void onCreate(Channel channel, boolean transactional);

    default void onShutDown(ShutdownSignalException signal) {
    }

}

有关您可能希望注册 ChannelListener 的一种场景,请参见 发布是异步的——如何检测成功和失败

日志通道关闭事件

版本 1.5 引入了一种机制,允许用户控制日志级别。

AbstractConnectionFactory 使用默认策略记录通道关闭,如下所示

  • 正常的通道关闭(200 OK)不会被记录。

  • 如果通道因被动队列声明失败而关闭,则以 DEBUG 级别记录。

  • 如果通道因排他消费者条件导致 basic.consume 被拒绝而关闭,则以 DEBUG 级别记录(从 3.1 开始,以前是 INFO)。

  • 所有其他都以 ERROR 级别记录。

要修改此行为,您可以将自定义 ConditionalExceptionLogger 注入 CachingConnectionFactorycloseExceptionLogger 属性中。

此外,AbstractConnectionFactory.DefaultChannelCloseLogger 现在是公共的,允许其子类化。

另请参阅 消费者事件

运行时缓存属性

从版本 1.6 开始,CachingConnectionFactory 现在通过 getCacheProperties() 方法提供缓存统计信息。这些统计信息可用于调整缓存在生产中的优化。例如,高水位线可用于确定是否应增加缓存大小。如果它等于缓存大小,您可能需要考虑进一步增加。下表描述了 CacheMode.CHANNEL 属性

表 1. CacheMode.CHANNEL 的缓存属性
财产 含义
connectionName

ConnectionNameStrategy 生成的连接名称。

channelCacheSize

当前配置的允许空闲的最大通道数。

localPort

连接的本地端口(如果可用)。这可用于与 RabbitMQ 管理 UI 上的连接和通道关联。

idleChannelsTx

当前空闲(缓存)的事务性通道数。

idleChannelsNotTx

当前空闲(缓存)的非事务性通道数。

idleChannelsTxHighWater

同时空闲(缓存)的事务性通道的最大数量。

idleChannelsNotTxHighWater

同时空闲(缓存)的非事务性通道的最大数量。

下表描述了 CacheMode.CONNECTION 属性

表 2. CacheMode.CONNECTION 的缓存属性
财产 含义
connectionName:<localPort>

ConnectionNameStrategy 生成的连接名称。

openConnections

表示与 Broker 连接的连接对象数。

channelCacheSize

当前配置的允许空闲的最大通道数。

connectionCacheSize

当前配置的允许空闲的最大连接数。

idleConnections

当前空闲的连接数。

idleConnectionsHighWater

同时空闲的最大连接数。

idleChannelsTx:<localPort>

此连接当前空闲(缓存)的事务性通道数。您可以使用属性名称的 localPort 部分来与 RabbitMQ 管理 UI 上的连接和通道相关联。

idleChannelsNotTx:<localPort>

此连接当前空闲(缓存)的非事务性通道数。属性名称的 localPort 部分可用于与 RabbitMQ 管理 UI 上的连接和通道相关联。

idleChannelsTxHighWater:<localPort>

同时空闲(缓存)的事务性通道的最大数量。属性名称的 localPort 部分可用于与 RabbitMQ 管理 UI 上的连接和通道相关联。

idleChannelsNotTxHighWater:<localPort>

同时空闲(缓存)的非事务性通道的最大数量。您可以使用属性名称的 localPort 部分来与 RabbitMQ 管理 UI 上的连接和通道相关联。

还包括 cacheMode 属性(CHANNELCONNECTION)。

cacheStats
图 1. JVisualVM 示例

RabbitMQ 自动连接/拓扑恢复

自 Spring AMQP 的第一个版本以来,该框架在 Broker 发生故障时提供了自己的连接和通道恢复。此外,如 配置 Broker 中所述,当连接重新建立时,RabbitAdmin 会重新声明任何基础设施 bean(队列等)。因此,它不依赖于 amqp-client 库现在提供的 自动恢复amqp-client 默认启用自动恢复。两种恢复机制之间存在一些不兼容性,因此,Spring 默认将底层 RabbitMQ connectionFactory 上的 automaticRecoveryEnabled 属性设置为 false。即使该属性为 true,Spring 也会通过立即关闭任何已恢复的连接来有效禁用它。

默认情况下,只有被定义为 bean 的元素(队列、交换器、绑定)会在连接失败后重新声明。有关如何更改该行为的信息,请参阅 恢复自动删除声明
© . This site is unofficial and not affiliated with VMware.