严格的消息排序

本节描述了入站和出站消息的消息排序。

入站

如果需要严格排序入站消息,则必须将入站侦听器容器的 prefetchCount 属性配置为 1。这是因为,如果消息失败并被重新传递,它会在现有的预取消息之后到达。自 Spring AMQP 2.0 版以来,为了提高性能,prefetchCount 默认为 250。严格的排序要求会以降低性能为代价。

出站

考虑以下集成流

@Bean
public IntegrationFlow flow(RabbitTemplate template) {
    return IntegrationFlow.from(Gateway.class)
            .splitWith(s -> s.delimiters(","))
            .<String, String>transform(String::toUpperCase)
            .handle(Amqp.outboundAdapter(template).routingKey("rk"))
            .get();
}

假设我们向网关发送消息 ABC。虽然消息 ABC 很可能会按顺序发送,但无法保证。这是因为模板为每个发送操作从缓存中“借用”一个通道,并且不能保证每个消息都使用相同的通道。一个解决方案是在拆分器之前启动一个事务,但在 RabbitMQ 中事务开销很大,并且可能将性能降低数百倍。

为了更有效地解决此问题,从 5.1 版本开始,Spring Integration 提供了 BoundRabbitChannelAdvice,它是一个 HandleMessageAdvice。参见 处理消息通知。当在拆分器之前应用时,它确保所有下游操作都在同一通道上执行,并且可以选择等待所有发送消息的发布者确认被接收(如果连接工厂配置了确认)。以下示例展示了如何使用 BoundRabbitChannelAdvice

@Bean
public IntegrationFlow flow(RabbitTemplate template) {
    return IntegrationFlow.from(Gateway.class)
            .splitWith(s -> s.delimiters(",")
                    .advice(new BoundRabbitChannelAdvice(template, Duration.ofSeconds(10))))
            .<String, String>transform(String::toUpperCase)
            .handle(Amqp.outboundAdapter(template).routingKey("rk"))
            .get();
}

请注意,在通知和出站适配器中使用了相同的 RabbitTemplate(它实现了 RabbitOperations)。通知在模板的 invoke 方法中运行下游流,以便所有操作都在同一通道上运行。如果提供了可选的超时,当流完成时,通知会调用 waitForConfirmsOrDie 方法,如果在指定时间内未收到确认,则会抛出异常。

下游流中不得有线程交接(QueueChannelExecutorChannel 等)。
© . This site is unofficial and not affiliated with VMware.