严格的消息排序

本节介绍入站和出站消息的消息排序。

入站

如果您需要严格的入站消息排序,则必须将入站监听器容器的prefetchCount属性配置为1。这是因为,如果消息失败并重新传递,它将在已预取的消息之后到达。从 Spring AMQP 2.0 版本开始,prefetchCount 默认值为250,以提高性能。严格的排序要求会降低性能。

出站

考虑以下集成流程

@Bean
public IntegrationFlow flow(RabbitTemplate template) {
    return IntegrationFlow.from(Gateway.class)
            .splitWith(s -> s.delimiters(","))
            .<String, String>transform(String::toUpperCase)
            .handle(Amqp.outboundAdapter(template).routingKey("rk"))
            .get();
}

假设我们将消息ABC发送到网关。虽然消息ABC很可能按顺序发送,但没有保证。这是因为模板为每次发送操作从缓存中“借用”一个通道,并且不能保证对每个消息使用相同的通道。一种解决方案是在拆分器之前启动事务,但 RabbitMQ 中的事务很昂贵,并且会将性能降低数百倍。

为了更有效地解决这个问题,从 5.1 版本开始,Spring Integration 提供了BoundRabbitChannelAdvice,它是一个HandleMessageAdvice。请参阅处理消息建议。当在拆分器之前应用时,它确保所有下游操作都在同一个通道上执行,并且可以选择等待所有已发送消息的发布者确认(如果连接工厂配置了确认)。以下示例展示了如何使用BoundRabbitChannelAdvice

@Bean
public IntegrationFlow flow(RabbitTemplate template) {
    return IntegrationFlow.from(Gateway.class)
            .splitWith(s -> s.delimiters(",")
                    .advice(new BoundRabbitChannelAdvice(template, Duration.ofSeconds(10))))
            .<String, String>transform(String::toUpperCase)
            .handle(Amqp.outboundAdapter(template).routingKey("rk"))
            .get();
}

请注意,相同的RabbitTemplate(它实现了RabbitOperations)在建议和出站适配器中使用。建议在模板的invoke方法中运行下游流程,以便所有操作都在同一个通道上运行。如果提供了可选的超时时间,当流程完成时,建议会调用waitForConfirmsOrDie方法,如果在指定时间内未收到确认,则会抛出异常。

下游流程中不能有线程交接(QueueChannelExecutorChannel 等)。