对 RabbitMQ Stream 插件的初始消费者支持

现在提供了对 RabbitMQ Stream Plugin 的基本支持。要启用此功能,您必须将 spring-rabbit-stream jar 添加到 classpath 中——它的版本必须与 spring-amqpspring-rabbit 的版本相同。

当您将 containerType 属性设置为 stream 时,上面描述的消费者属性不受支持;对于 super streams,仅支持 concurrency。每个绑定只能消费一个 stream 队列。

要配置 binder 使用 containerType=stream,Spring Boot 将自动从应用属性中配置一个 Environment @Bean。您可以选择添加一个定制器来定制监听器容器。

@Bean
ListenerContainerCustomizer<MessageListenerContainer> customizer() {
    return (cont, dest, group) -> {
        StreamListenerContainer container = (StreamListenerContainer) cont;
        container.setConsumerCustomizer((name, builder) -> {
            builder.offset(OffsetSpecification.first());
        });
        // ...
    };
}

传递给定制器的 name 参数格式为 destination + '.' + group + '.container'

stream 的 name()(用于偏移量跟踪)被设置为绑定 destination + '.' + group。可以使用上面显示的 ConsumerCustomizer 进行更改。如果您决定使用手动偏移量跟踪,则 Context 作为消息头可用。

int count;

@Bean
public Consumer<Message<?>> input() {
    return msg -> {
        System.out.println(msg);
        if (++count % 1000 == 0) {
            Context context = msg.getHeaders().get("rabbitmq_streamContext", Context.class);
            context.consumer().store(context.offset());
        }
    };
}

有关配置环境和消费者构建器的信息,请参考 RabbitMQ Stream Java Client 文档

RabbitMQ Super Streams 的消费者支持

有关 super streams 的信息,请参阅 Super Streams

使用 super streams 允许自动伸缩,每个 super stream 的分区上只有一个活跃消费者。

配置示例

@Bean
public Consumer<Thing> input() {
    ...
}
spring.cloud.stream.bindings.input-in-0.destination=super
spring.cloud.stream.bindings.input-in-0.group=test
spring.cloud.stream.bindings.input-in-0.consumer.instance-count=3
spring.cloud.stream.bindings.input-in-0.consumer.concurrency=3
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.container-type=STREAM
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.super-stream=true

框架将创建一个名为 super 的 super stream,包含 9 个分区。可以部署多达 3 个此应用的实例。