RabbitMQ Stream 插件的初步生产者支持

现已提供对 RabbitMQ Stream 插件 的基本支持。要启用此功能,您必须将 spring-rabbit-stream jar 添加到类路径中 - 它必须与 spring-amqpspring-rabbit 版本相同。

当您将 producerType 属性设置为 STREAM_SYNCSTREAM_ASYNC 时,上述生产者属性将不受支持。

要配置绑定器使用流 ProducerType,Spring Boot 将从应用属性中配置一个 Environment @Bean。您可以选择添加一个定制器来定制消息处理器。

@Bean
ProducerMessageHandlerCustomizer<MessageHandler> handlerCustomizer() {
    return (hand, dest) -> {
        RabbitStreamMessageHandler handler = (RabbitStreamMessageHandler) hand;
        handler.setConfirmTimeout(5000);
        ((RabbitStreamTemplate) handler.getStreamOperations()).setProducerCustomizer(
                (name, builder) -> {
                    ...
                });
    };
}

有关配置环境和生产者构建器的信息,请参阅 RabbitMQ Stream Java 客户端文档

RabbitMQ Super Streams 的生产者支持

有关超级流(Super Streams)的信息,请参阅 Super Streams

使用超级流可以实现自动伸缩,每个超级流分区上只有一个活跃消费者。使用 Spring Cloud Stream,您可以通过 AMQP 或使用流客户端发布到超级流。

超级流必须已经存在;生产者绑定不支持创建超级流。

通过 AMQP 发布到超级流

spring.cloud.stream.bindings.output.destination=super
spring.cloud.stream.bindings.output.producer.partition-count=3
spring.cloud.stream.bindings.output.producer.partition-key-expression=headers['cust-no']
spring.cloud.stream.rabbit.bindings.output.producer.declare-exchange=false

使用流客户端发布到超级流

spring.cloud.stream.bindings.output.destination=super
spring.cloud.stream.bindings.output.producer.partition-count=3
spring.cloud.stream.bindings.output.producer.partition-key-expression=headers['cust-no']
spring.cloud.stream.rabbit.bindings.output.producer.producer-type=stream-async
spring.cloud.stream.rabbit.bindings.output.producer.super-stream=true
spring.cloud.stream.rabbit.bindings.output.producer.declare-exchange=false

使用流客户端时,如果您设置了 confirmAckChannel,成功发送消息的副本将被发送到该通道。