严格消息排序
本节介绍入站和出站消息的消息排序。
入站
如果需要严格的入站消息排序,必须将入站监听器容器的 prefetchCount
属性配置为 1
。这是因为,如果消息失败并被重新投递,它会在现有预取消息之后到达。自 Spring AMQP 2.0 版本以来,prefetchCount
默认设置为 250
以提高性能。严格排序要求会以降低性能为代价。
出站
考虑以下集成流
@Bean
public IntegrationFlow flow(RabbitTemplate template) {
return IntegrationFlow.from(Gateway.class)
.splitWith(s -> s.delimiters(","))
.<String, String>transform(String::toUpperCase)
.handle(Amqp.outboundAdapter(template).routingKey("rk"))
.get();
}
假设我们向网关发送消息 A、B 和 C。尽管消息 A、B、C 很可能按顺序发送,但不能保证。这是因为模板为每个发送操作从缓存中“借用”一个通道,并且不能保证每个消息都使用相同的通道。一种解决方案是在分发器之前启动事务,但在 RabbitMQ 中事务开销很大,可能导致性能降低数百倍。
为了更高效地解决此问题,从 5.1 版本开始,Spring Integration 提供了 BoundRabbitChannelAdvice
,它是一个 HandleMessageAdvice
。参见 处理消息 Advice。当应用于分发器之前时,它确保所有下游操作在同一通道上执行,并且可以选择等待所有已发送消息的发布者确认(如果连接工厂配置为进行确认)。以下示例展示了如何使用 BoundRabbitChannelAdvice
@Bean
public IntegrationFlow flow(RabbitTemplate template) {
return IntegrationFlow.from(Gateway.class)
.splitWith(s -> s.delimiters(",")
.advice(new BoundRabbitChannelAdvice(template, Duration.ofSeconds(10))))
.<String, String>transform(String::toUpperCase)
.handle(Amqp.outboundAdapter(template).routingKey("rk"))
.get();
}
请注意,advice 和出站适配器中使用了相同的 RabbitTemplate
(它实现了 RabbitOperations
)。advice 在模板的 invoke
方法内运行下游流,以便所有操作都在同一通道上运行。如果提供了可选的超时时间,当流完成后,advice 会调用 waitForConfirmsOrDie
方法,如果在指定时间内未收到确认,则会抛出异常。
下游流中不得存在线程移交(例如 QueueChannel 、ExecutorChannel 等)。 |