RabbitMQ AMQP 1.0 支持

版本 4.0 引入了 spring-rabbitmq-client 模块,用于 RabbitMQ 上的 AMQP 1.0 协议支持。

此 artifact 基于 com.rabbitmq.client:amqp-client 库,因此只能与 RabbitMQ 及其 AMQP 1.0 协议支持一起使用。它不能用于任何任意 AMQP 1.0 代理。为此,目前建议使用 JMS 桥接器 和相应的 Spring JMS 集成。

必须将此依赖项添加到项目中才能与 RabbitMQ AMQP 1.0 支持进行交互。

  • Maven

  • Gradle

<dependency>
  <groupId>org.springframework.amqp</groupId>
  <artifactId>spring-rabbitmq-client</artifactId>
  <version>4.0.0</version>
</dependency>
compile 'org.springframework.amqp:spring-rabbitmq-client:4.0.0'

spring-rabbit(用于 AMQP 0.9.1 协议)作为传递依赖项引入,以重用此新客户端中的一些通用 API,例如异常、@RabbitListener 支持。在目标项目中无需同时使用这两个功能,但 RabbitMQ 允许 AMQP 0.9.1 和 1.0 共存。

有关 RabbitMQ AMQP 1.0 Java 客户端的更多信息,请参阅其文档

RabbitMQ AMQP 1.0 环境

com.rabbitmq.client.amqp.Environment 是项目中必须添加的第一项,用于连接管理和其他通用设置。它是节点或节点集群的入口点。环境允许创建连接。它可以包含连接之间共享的基础设施相关配置设置,例如线程池、指标和/或观察。

@Bean
Environment environment() {
    return new AmqpEnvironmentBuilder()
            .connectionSettings()
            .port(5672)
            .environmentBuilder()
            .build();
}

相同的 Environment 实例可用于连接不同的 RabbitMQ 代理,然后必须在特定连接上提供连接设置。请参阅下文。

AMQP 连接工厂

引入了 org.springframework.amqp.rabbitmq.client.AmqpConnectionFactory 抽象来管理 com.rabbitmq.client.amqp.Connection。不要将其与仅用于 AMQP 0.9.1 协议的 org.springframework.amqp.rabbit.connection.ConnectionFactory 混淆。SingleAmqpConnectionFactory 实现用于管理一个连接及其设置。相同的 Connection 可以由许多生产者、消费者和管理共享。AMQP 1.0 协议实现在 AMQP 客户端库内部通过链接抽象处理多路复用。Connection 具有恢复能力,并且还处理拓扑。

在大多数情况下,只需将此 bean 添加到项目中即可

@Bean
AmqpConnectionFactory connectionFactory(Environment environment) {
    return new SingleAmqpConnectionFactory(environment);
}

请参阅 SingleAmqpConnectionFactory 的 setter 方法,了解所有连接特定的设置。

RabbitMQ 拓扑管理

从应用程序角度来看的拓扑管理(交换机、队列和绑定),提供了 RabbitAmqpAdmin,它是现有 AmqpAdmin 接口的实现。

@Bean
RabbitAmqpAdmin admin(AmqpConnectionFactory connectionFactory) {
    return new RabbitAmqpAdmin(connectionFactory);
}

配置代理中描述的 ExchangeQueueBindingDeclarables 实例的相同 bean 定义必须用于管理拓扑。spring-rabbit 中的 RabbitAdmin 也可以做到这一点,但它是在 AMQP 0.9.1 连接上进行的,而 RabbitAmqpAdmin 是基于 AMQP 1.0 连接的,因此可以从中平稳地处理拓扑恢复,以及发布者和消费者恢复。

RabbitAmqpAdmin 在其 start() 生命周期回调中执行相应的 bean 扫描。initialize() 以及所有其他 RabbitMQ 实体管理方法都可以在运行时手动调用。在内部,RabbitAmqpAdmin 使用 com.rabbitmq.client.amqp.Connection.management() API 执行相应的拓扑操作。

RabbitAmqpTemplate

RabbitAmqpTemplateAsyncAmqpTemplate 的实现,并使用 AMQP 1.0 协议执行各种发送/接收操作。它需要一个 AmqpConnectionFactory,并且可以配置一些默认值。即使 com.rabbitmq.client:amqp-client 库带有 com.rabbitmq.client.amqp.MessageRabbitAmqpTemplate 仍然公开基于众所周知的 org.springframework.amqp.core.Message 的 API,以及所有支持类,如 MessagePropertiesMessageConverter 抽象。向/从 com.rabbitmq.client.amqp.Message 的转换在 RabbitAmqpTemplate 内部完成。所有方法都返回一个 CompletableFuture 以最终获取操作结果。普通对象的操作需要消息体转换,默认使用 SimpleMessageConverter。有关转换的更多信息,请参阅消息转换器

通常,只需一个这样的 bean 即可执行所有可能的模板模式操作

@Bean
RabbitAmqpTemplate rabbitTemplate(AmqpConnectionFactory connectionFactory) {
    return new RabbitAmqpTemplate(connectionFactory);
}

它可以配置一些默认的交换机和路由键或只配置队列。RabbitAmqpTemplate 有一个用于接收操作的默认队列,以及另一个用于请求-回复操作的默认队列,其中如果客户端不存在,则会为请求创建临时队列。

以下是 RabbitAmqpTemplate 操作的一些示例

@Bean
DirectExchange e1() {
    return new DirectExchange("e1");
}

@Bean
Queue q1() {
    return QueueBuilder.durable("q1").deadLetterExchange("dlx1").build();
}

@Bean
Binding b1() {
    return BindingBuilder.bind(q1()).to(e1()).with("k1");
}

...

@Test
void defaultExchangeAndRoutingKey() {
    this.rabbitAmqpTemplate.setExchange("e1");
    this.rabbitAmqpTemplate.setRoutingKey("k1");
	this.rabbitAmqpTemplate.setReceiveQueue("q1");

    assertThat(this.rabbitAmqpTemplate.convertAndSend("test1"))
            .succeedsWithin(Duration.ofSeconds(10));

    assertThat(this.rabbitAmqpTemplate.receiveAndConvert())
            .succeedsWithin(Duration.ofSeconds(10))
            .isEqualTo("test1");
}

这里我们声明了一个 e1 交换机、q1 队列,并将其与 k1 路由键绑定到该交换机。然后我们使用 RabbitAmqpTemplate 的默认设置将消息发布到上述交换机,并使用相应的路由键,并将 q1 作为接收操作的默认队列。这些方法有重载变体,可以发送到特定的交换机或队列(用于发送和接收)。使用 ParameterizedTypeReference<T>receiveAndConvert() 操作需要将 SmartMessageConverter 注入到 RabbitAmqpTemplate 中。

下一个示例演示了使用 RabbitAmqpTemplate 的 RPC 实现(假设与上一个示例相同的 RabbitMQ 对象)

@Test
void verifyRpc() {
    String testRequest = "rpc-request";
    String testReply = "rpc-reply";

    CompletableFuture<Object> rpcClientResult = this.template.convertSendAndReceive("e1", "k1", testRequest);

    AtomicReference<String> receivedRequest = new AtomicReference<>();
    CompletableFuture<Boolean> rpcServerResult =
            this.rabbitAmqpTemplate.<String, String>receiveAndReply("q1",
                     payload -> {
                         receivedRequest.set(payload);
                         return testReply;
                     });

    assertThat(rpcServerResult).succeedsWithin(Duration.ofSeconds(10)).isEqualTo(true);
    assertThat(rpcClientResult).succeedsWithin(Duration.ofSeconds(10)).isEqualTo(testReply);
    assertThat(receivedRequest.get()).isEqualTo(testRequest);
}

关联和 replyTo 队列在内部管理。服务器端可以通过下面描述的 @RabbitListener POJO 方法实现。

RabbitMQ AMQP 1.0 消费者

与许多其他用于消费者端的消息实现一样,spring-rabbitmq-client 模块附带 RabbitAmqpListenerContainer,它本质上是众所周知的 MessageListenerContainer 的实现。它的功能与 DirectMessageListenerContainer 完全相同,但适用于 RabbitMQ AMQP 1.0 支持。它需要一个 AmqpConnectionFactory 和至少一个要消费的队列。此外,必须提供 MessageListener(或 AMQP 1.0 特定的 RabbitAmqpMessageListener)。它可以配置 autoSettle = false,其含义是 AcknowledgeMode.MANUAL。在这种情况下,提供给 MessageListenerMessage 在其 MessageProperties 中包含一个 AmqpAcknowledgment 回调,以供目标逻辑考虑。

RabbitAmqpMessageListener 具有 com.rabbitmq.client:amqp-client 抽象的契约

/**
 * Process an AMQP message.
 * @param message the message to process.
 * @param context the consumer context to settle message.
 *                Null if container is configured for {@code autoSettle}.
 */
void onAmqpMessage(Message message, Consumer.Context context);

其中第一个参数是原生接收到的 com.rabbitmq.client.amqp.Messagecontext 是消息结算的原生回调,类似于上面提到的 AmqpAcknowledgment 抽象。

当提供了 batchSize 选项时,RabbitAmqpMessageListener 可以批量处理和结算消息。为此,必须实现 MessageListener.onMessageBatch() 契约。batchReceiveDuration 选项用于安排强制释放未满批次,以避免内存和消费者信用额度耗尽。

通常,RabbitAmqpMessageListener 类不直接在目标项目中使用,而是选择通过 @RabbitListener 进行 POJO 方法注释配置,用于声明式消费者配置。RabbitAmqpListenerContainerFactory 必须在 RabbitListenerAnnotationBeanPostProcessor.DEFAULT_RABBIT_LISTENER_CONTAINER_FACTORY_BEAN_NAME 下注册,并且 @RabbitListener 注释过程将在 RabbitListenerEndpointRegistry 中注册 RabbitAmqpMessageListener 实例。目标 POJO 方法调用由特定的 RabbitAmqpMessageListenerAdapter 实现处理,该实现扩展了 MessagingMessageListenerAdapter 并重用了其许多功能,包括请求-回复场景(异步或非异步)。因此,注解驱动的侦听器端点中描述的所有概念也适用于此 RabbitAmqpMessageListener

除了传统的 messaging payloadheaders@RabbitListener POJO 方法契约还可以包含这些参数

  • com.rabbitmq.client.amqp.Message - 未经任何转换的原生 AMQP 1.0 消息;

  • org.springframework.amqp.core.Message - Spring AMQP 消息抽象,作为原生 AMQP 1.0 消息的转换结果;

  • org.springframework.messaging.Message - Spring Messaging 抽象,作为 Spring AMQP 消息的转换结果;

  • Consumer.Context - RabbitMQ AMQP 客户端消费者结算 API;

  • org.springframework.amqp.core.AmqpAcknowledgment - Spring AMQP 确认抽象:委托给 Consumer.Context

以下示例演示了一个简单的 @RabbitListener,用于 RabbitMQ AMQP 1.0 交互,并进行手动结算

@Bean(RabbitListenerAnnotationBeanPostProcessor.DEFAULT_RABBIT_LISTENER_CONTAINER_FACTORY_BEAN_NAME)
RabbitAmqpListenerContainerFactory rabbitAmqpListenerContainerFactory(AmqpConnectionFactory connectionFactory) {
    return new RabbitAmqpListenerContainerFactory(connectionFactory);
}

final List<String> received = Collections.synchronizedList(new ArrayList<>());

CountDownLatch consumeIsDone = new CountDownLatch(11);

@RabbitListener(queues = {"q1", "q2"},
        ackMode = "#{T(org.springframework.amqp.core.AcknowledgeMode).MANUAL}",
        concurrency = "2",
        id = "testAmqpListener")
void processQ1AndQ2Data(String data, AmqpAcknowledgment acknowledgment, Consumer.Context context) {
    try {
        if ("discard".equals(data)) {
            if (!this.received.contains(data)) {
                context.discard();
            }
            else {
                throw new MessageConversionException("Test message is rejected");
            }
        }
        else if ("requeue".equals(data) && !this.received.contains(data)) {
            acknowledgment.acknowledge(AmqpAcknowledgment.Status.REQUEUE);
        }
        else {
            acknowledgment.acknowledge();
        }
        this.received.add(data);
    }
    finally {
        this.consumeIsDone.countDown();
    }
}
© . This site is unofficial and not affiliated with VMware.