严格的消息顺序
本节描述入站和出站消息的消息顺序。
入站
如果您需要严格排序入站消息,则必须将入站侦听器容器的prefetchCount
属性配置为1
。这是因为,如果消息失败并重新传递,它将在现有的预取消息之后到达。从 Spring AMQP 2.0 版开始,prefetchCount
默认为250
,以提高性能。严格排序要求会以降低性能为代价。
出站
考虑以下集成流
@Bean
public IntegrationFlow flow(RabbitTemplate template) {
return IntegrationFlow.from(Gateway.class)
.splitWith(s -> s.delimiters(","))
.<String, String>transform(String::toUpperCase)
.handle(Amqp.outboundAdapter(template).routingKey("rk"))
.get();
}
假设我们将消息A
、B
和C
发送到网关。虽然消息A
、B
、C
很可能按顺序发送,但没有保证。这是因为模板为每次发送操作从缓存中“借用”一个通道,并且不能保证对每条消息都使用相同的通道。一种解决方案是在拆分器之前启动一个事务,但 RabbitMQ 中的事务代价很高,并且可以将性能降低数百倍。
为了以更有效的方式解决此问题,从 5.1 版开始,Spring Integration 提供了BoundRabbitChannelAdvice
,它是一个HandleMessageAdvice
。请参阅处理消息建议。当应用于拆分器之前时,它确保所有下游操作都在同一通道上执行,并且可以选择等待收到所有已发送消息的发布者确认(如果连接工厂配置了确认)。以下示例显示了如何使用BoundRabbitChannelAdvice
@Bean
public IntegrationFlow flow(RabbitTemplate template) {
return IntegrationFlow.from(Gateway.class)
.splitWith(s -> s.delimiters(",")
.advice(new BoundRabbitChannelAdvice(template, Duration.ofSeconds(10))))
.<String, String>transform(String::toUpperCase)
.handle(Amqp.outboundAdapter(template).routingKey("rk"))
.get();
}
请注意,相同的RabbitTemplate
(它实现了RabbitOperations
)在建议和出站适配器中使用。建议在模板的invoke
方法内运行下游流,以便所有操作都在同一通道上运行。如果提供了可选的超时,则当流完成时,建议调用waitForConfirmsOrDie
方法,如果在指定时间内未收到确认,则该方法会抛出异常。
在下游流中(QueueChannel 、ExecutorChannel 等)不能有线程切换。 |