RabbitMQ AMQP 1.0 支持
版本 4.0 引入了 spring-rabbitmq-client 模块,用于 RabbitMQ 上的 AMQP 1.0 协议支持。
此 artifact 基于 com.rabbitmq.client:amqp-client 库,因此只能与 RabbitMQ 及其 AMQP 1.0 协议支持一起使用。它不能用于任何任意 AMQP 1.0 代理。为此,目前建议使用 JMS 桥接器 和相应的 Spring JMS 集成。
必须将此依赖项添加到项目中才能与 RabbitMQ AMQP 1.0 支持进行交互。
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbitmq-client</artifactId>
<version>4.0.0</version>
</dependency>
compile 'org.springframework.amqp:spring-rabbitmq-client:4.0.0'
spring-rabbit(用于 AMQP 0.9.1 协议)作为传递依赖项引入,以重用此新客户端中的一些通用 API,例如异常、@RabbitListener 支持。在目标项目中无需同时使用这两个功能,但 RabbitMQ 允许 AMQP 0.9.1 和 1.0 共存。
有关 RabbitMQ AMQP 1.0 Java 客户端的更多信息,请参阅其文档。
RabbitMQ AMQP 1.0 环境
com.rabbitmq.client.amqp.Environment 是项目中必须添加的第一项,用于连接管理和其他通用设置。它是节点或节点集群的入口点。环境允许创建连接。它可以包含连接之间共享的基础设施相关配置设置,例如线程池、指标和/或观察。
@Bean
Environment environment() {
return new AmqpEnvironmentBuilder()
.connectionSettings()
.port(5672)
.environmentBuilder()
.build();
}
相同的 Environment 实例可用于连接不同的 RabbitMQ 代理,然后必须在特定连接上提供连接设置。请参阅下文。
AMQP 连接工厂
引入了 org.springframework.amqp.rabbitmq.client.AmqpConnectionFactory 抽象来管理 com.rabbitmq.client.amqp.Connection。不要将其与仅用于 AMQP 0.9.1 协议的 org.springframework.amqp.rabbit.connection.ConnectionFactory 混淆。SingleAmqpConnectionFactory 实现用于管理一个连接及其设置。相同的 Connection 可以由许多生产者、消费者和管理共享。AMQP 1.0 协议实现在 AMQP 客户端库内部通过链接抽象处理多路复用。Connection 具有恢复能力,并且还处理拓扑。
在大多数情况下,只需将此 bean 添加到项目中即可
@Bean
AmqpConnectionFactory connectionFactory(Environment environment) {
return new SingleAmqpConnectionFactory(environment);
}
请参阅 SingleAmqpConnectionFactory 的 setter 方法,了解所有连接特定的设置。
RabbitMQ 拓扑管理
从应用程序角度来看的拓扑管理(交换机、队列和绑定),提供了 RabbitAmqpAdmin,它是现有 AmqpAdmin 接口的实现。
@Bean
RabbitAmqpAdmin admin(AmqpConnectionFactory connectionFactory) {
return new RabbitAmqpAdmin(connectionFactory);
}
与配置代理中描述的 Exchange、Queue、Binding 和 Declarables 实例的相同 bean 定义必须用于管理拓扑。spring-rabbit 中的 RabbitAdmin 也可以做到这一点,但它是在 AMQP 0.9.1 连接上进行的,而 RabbitAmqpAdmin 是基于 AMQP 1.0 连接的,因此可以从中平稳地处理拓扑恢复,以及发布者和消费者恢复。
RabbitAmqpAdmin 在其 start() 生命周期回调中执行相应的 bean 扫描。initialize() 以及所有其他 RabbitMQ 实体管理方法都可以在运行时手动调用。在内部,RabbitAmqpAdmin 使用 com.rabbitmq.client.amqp.Connection.management() API 执行相应的拓扑操作。
RabbitAmqpTemplate
RabbitAmqpTemplate 是 AsyncAmqpTemplate 的实现,并使用 AMQP 1.0 协议执行各种发送/接收操作。它需要一个 AmqpConnectionFactory,并且可以配置一些默认值。即使 com.rabbitmq.client:amqp-client 库带有 com.rabbitmq.client.amqp.Message,RabbitAmqpTemplate 仍然公开基于众所周知的 org.springframework.amqp.core.Message 的 API,以及所有支持类,如 MessageProperties 和 MessageConverter 抽象。向/从 com.rabbitmq.client.amqp.Message 的转换在 RabbitAmqpTemplate 内部完成。所有方法都返回一个 CompletableFuture 以最终获取操作结果。普通对象的操作需要消息体转换,默认使用 SimpleMessageConverter。有关转换的更多信息,请参阅消息转换器。
通常,只需一个这样的 bean 即可执行所有可能的模板模式操作
@Bean
RabbitAmqpTemplate rabbitTemplate(AmqpConnectionFactory connectionFactory) {
return new RabbitAmqpTemplate(connectionFactory);
}
它可以配置一些默认的交换机和路由键或只配置队列。RabbitAmqpTemplate 有一个用于接收操作的默认队列,以及另一个用于请求-回复操作的默认队列,其中如果客户端不存在,则会为请求创建临时队列。
以下是 RabbitAmqpTemplate 操作的一些示例
@Bean
DirectExchange e1() {
return new DirectExchange("e1");
}
@Bean
Queue q1() {
return QueueBuilder.durable("q1").deadLetterExchange("dlx1").build();
}
@Bean
Binding b1() {
return BindingBuilder.bind(q1()).to(e1()).with("k1");
}
...
@Test
void defaultExchangeAndRoutingKey() {
this.rabbitAmqpTemplate.setExchange("e1");
this.rabbitAmqpTemplate.setRoutingKey("k1");
this.rabbitAmqpTemplate.setReceiveQueue("q1");
assertThat(this.rabbitAmqpTemplate.convertAndSend("test1"))
.succeedsWithin(Duration.ofSeconds(10));
assertThat(this.rabbitAmqpTemplate.receiveAndConvert())
.succeedsWithin(Duration.ofSeconds(10))
.isEqualTo("test1");
}
这里我们声明了一个 e1 交换机、q1 队列,并将其与 k1 路由键绑定到该交换机。然后我们使用 RabbitAmqpTemplate 的默认设置将消息发布到上述交换机,并使用相应的路由键,并将 q1 作为接收操作的默认队列。这些方法有重载变体,可以发送到特定的交换机或队列(用于发送和接收)。使用 ParameterizedTypeReference<T> 的 receiveAndConvert() 操作需要将 SmartMessageConverter 注入到 RabbitAmqpTemplate 中。
下一个示例演示了使用 RabbitAmqpTemplate 的 RPC 实现(假设与上一个示例相同的 RabbitMQ 对象)
@Test
void verifyRpc() {
String testRequest = "rpc-request";
String testReply = "rpc-reply";
CompletableFuture<Object> rpcClientResult = this.template.convertSendAndReceive("e1", "k1", testRequest);
AtomicReference<String> receivedRequest = new AtomicReference<>();
CompletableFuture<Boolean> rpcServerResult =
this.rabbitAmqpTemplate.<String, String>receiveAndReply("q1",
payload -> {
receivedRequest.set(payload);
return testReply;
});
assertThat(rpcServerResult).succeedsWithin(Duration.ofSeconds(10)).isEqualTo(true);
assertThat(rpcClientResult).succeedsWithin(Duration.ofSeconds(10)).isEqualTo(testReply);
assertThat(receivedRequest.get()).isEqualTo(testRequest);
}
关联和 replyTo 队列在内部管理。服务器端可以通过下面描述的 @RabbitListener POJO 方法实现。
RabbitMQ AMQP 1.0 消费者
与许多其他用于消费者端的消息实现一样,spring-rabbitmq-client 模块附带 RabbitAmqpListenerContainer,它本质上是众所周知的 MessageListenerContainer 的实现。它的功能与 DirectMessageListenerContainer 完全相同,但适用于 RabbitMQ AMQP 1.0 支持。它需要一个 AmqpConnectionFactory 和至少一个要消费的队列。此外,必须提供 MessageListener(或 AMQP 1.0 特定的 RabbitAmqpMessageListener)。它可以配置 autoSettle = false,其含义是 AcknowledgeMode.MANUAL。在这种情况下,提供给 MessageListener 的 Message 在其 MessageProperties 中包含一个 AmqpAcknowledgment 回调,以供目标逻辑考虑。
RabbitAmqpMessageListener 具有 com.rabbitmq.client:amqp-client 抽象的契约
/**
* Process an AMQP message.
* @param message the message to process.
* @param context the consumer context to settle message.
* Null if container is configured for {@code autoSettle}.
*/
void onAmqpMessage(Message message, Consumer.Context context);
其中第一个参数是原生接收到的 com.rabbitmq.client.amqp.Message,context 是消息结算的原生回调,类似于上面提到的 AmqpAcknowledgment 抽象。
当提供了 batchSize 选项时,RabbitAmqpMessageListener 可以批量处理和结算消息。为此,必须实现 MessageListener.onMessageBatch() 契约。batchReceiveDuration 选项用于安排强制释放未满批次,以避免内存和消费者信用额度耗尽。
通常,RabbitAmqpMessageListener 类不直接在目标项目中使用,而是选择通过 @RabbitListener 进行 POJO 方法注释配置,用于声明式消费者配置。RabbitAmqpListenerContainerFactory 必须在 RabbitListenerAnnotationBeanPostProcessor.DEFAULT_RABBIT_LISTENER_CONTAINER_FACTORY_BEAN_NAME 下注册,并且 @RabbitListener 注释过程将在 RabbitListenerEndpointRegistry 中注册 RabbitAmqpMessageListener 实例。目标 POJO 方法调用由特定的 RabbitAmqpMessageListenerAdapter 实现处理,该实现扩展了 MessagingMessageListenerAdapter 并重用了其许多功能,包括请求-回复场景(异步或非异步)。因此,注解驱动的侦听器端点中描述的所有概念也适用于此 RabbitAmqpMessageListener。
除了传统的 messaging payload 和 headers,@RabbitListener POJO 方法契约还可以包含这些参数
-
com.rabbitmq.client.amqp.Message- 未经任何转换的原生 AMQP 1.0 消息; -
org.springframework.amqp.core.Message- Spring AMQP 消息抽象,作为原生 AMQP 1.0 消息的转换结果; -
org.springframework.messaging.Message- Spring Messaging 抽象,作为 Spring AMQP 消息的转换结果; -
Consumer.Context- RabbitMQ AMQP 客户端消费者结算 API; -
org.springframework.amqp.core.AmqpAcknowledgment- Spring AMQP 确认抽象:委托给Consumer.Context。
以下示例演示了一个简单的 @RabbitListener,用于 RabbitMQ AMQP 1.0 交互,并进行手动结算
@Bean(RabbitListenerAnnotationBeanPostProcessor.DEFAULT_RABBIT_LISTENER_CONTAINER_FACTORY_BEAN_NAME)
RabbitAmqpListenerContainerFactory rabbitAmqpListenerContainerFactory(AmqpConnectionFactory connectionFactory) {
return new RabbitAmqpListenerContainerFactory(connectionFactory);
}
final List<String> received = Collections.synchronizedList(new ArrayList<>());
CountDownLatch consumeIsDone = new CountDownLatch(11);
@RabbitListener(queues = {"q1", "q2"},
ackMode = "#{T(org.springframework.amqp.core.AcknowledgeMode).MANUAL}",
concurrency = "2",
id = "testAmqpListener")
void processQ1AndQ2Data(String data, AmqpAcknowledgment acknowledgment, Consumer.Context context) {
try {
if ("discard".equals(data)) {
if (!this.received.contains(data)) {
context.discard();
}
else {
throw new MessageConversionException("Test message is rejected");
}
}
else if ("requeue".equals(data) && !this.received.contains(data)) {
acknowledgment.acknowledge(AmqpAcknowledgment.Status.REQUEUE);
}
else {
acknowledgment.acknowledge();
}
this.received.add(data);
}
finally {
this.consumeIsDone.countDown();
}
}