@RabbitListener 批量处理
当接收 一批消息时,通常由容器执行反批量处理,并逐条调用监听器。从 2.2 版本开始,您可以配置监听器容器工厂和监听器,以在一次调用中接收整个批次。只需设置工厂的 batchListener
属性为 true,并将方法参数(payload)设置为 List
或 Collection
即可。
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setBatchListener(true);
return factory;
}
@RabbitListener(queues = "batch.1")
public void listen1(List<Thing> in) {
...
}
// or
@RabbitListener(queues = "batch.2")
public void listen2(List<Message<Thing>> in) {
...
}
将 batchListener
属性设置为 true 会自动关闭工厂创建的容器中的 deBatchingEnabled
容器属性(除非 consumerBatchEnabled
为 true - 参见下文)。实际上,反批量处理从容器转移到了监听器适配器,适配器创建了传递给监听器的列表。
启用了批量处理的工厂不能与 多方法监听器一起使用。
同样从 2.2 版本开始,当逐条接收批量消息时,最后一条消息包含一个设置为 true
的布尔头部。通过向您的监听器方法添加 @Header(AmqpHeaders.LAST_IN_BATCH) boolean last
参数可以获取此头部。该头部映射自 MessageProperties.isLastInBatch()
。此外,AmqpHeaders.BATCH_SIZE
在每个消息片段中都会填充批次的大小。
此外,SimpleMessageListenerContainer
中添加了一个新属性 consumerBatchEnabled
。当此属性为 true 时,容器将创建一个最多包含 batchSize
条消息的批次;如果在 receiveTimeout
超时时间内没有新消息到达,则会交付部分批次。如果接收到生产者创建的批次,它将被反批量处理并添加到消费者端的批次中;因此实际交付的消息数量可能会超过 batchSize
,batchSize
表示从 Broker 接收到的消息数量。当 consumerBatchEnabled
为 true 时,deBatchingEnabled
也必须为 true;容器工厂将强制执行此要求。
@Bean
public SimpleRabbitListenerContainerFactory consumerBatchContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(rabbitConnectionFactory());
factory.setConsumerTagStrategy(consumerTagStrategy());
factory.setBatchListener(true); // configures a BatchMessageListenerAdapter
factory.setBatchSize(2);
factory.setConsumerBatchEnabled(true);
return factory;
}
当使用 consumerBatchEnabled
与 @RabbitListener
时
@RabbitListener(queues = "batch.1", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch1(List<Message> amqpMessages) {
...
}
@RabbitListener(queues = "batch.2", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch2(List<org.springframework.messaging.Message<Invoice>> messages) {
...
}
@RabbitListener(queues = "batch.3", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch3(List<Invoice> strings) {
...
}
-
第一个例子使用接收到的原始、未转换的
org.springframework.amqp.core.Message
s 进行调用。 -
第二个例子使用经过转换 payload 并映射了 headers/properties 的
org.springframework.messaging.Message<?>
s 进行调用。 -
第三个例子使用转换后的 payload 进行调用,无法访问 headers/properties。
您还可以添加一个 Channel
参数,这在使用 MANUAL
确认模式时经常用到。这对于第三个例子不太有用,因为您无法访问 delivery_tag
属性。
Spring Boot 为 consumerBatchEnabled
和 batchSize
提供了配置属性,但没有为 batchListener
提供。从 3.0 版本开始,在容器工厂上将 consumerBatchEnabled
设置为 true
也会将 batchListener
设置为 true
。当 consumerBatchEnabled
为 true
时,监听器必须是批量监听器。
从 3.0 版本开始,监听器方法可以消费 Collection<?>
或 List<?>
。
批量模式下的监听器不支持回复,因为批次中的消息与产生的单个回复之间可能没有关联。异步返回类型仍然支持批量监听器。 |