使用 RabbitMQ Stream 插件

2.4 版本初步引入了对 RabbitMQ Stream Plugin Java Client 的支持,用于 RabbitMQ Stream Plugin

  • RabbitStreamTemplate

  • StreamListenerContainer

添加 spring-rabbit-stream 依赖到您的项目

maven
<dependency>
  <groupId>org.springframework.amqp</groupId>
  <artifactId>spring-rabbit-stream</artifactId>
  <version>3.2.5</version>
</dependency>
gradle
compile 'org.springframework.amqp:spring-rabbit-stream:3.2.5'

您可以使用一个 RabbitAdmin bean 来正常配置队列,使用 QueueBuilder.stream() 方法来指定队列类型。例如

@Bean
Queue stream() {
    return QueueBuilder.durable("stream.queue1")
            .stream()
            .build();
}

然而,这仅在您同时使用非流组件(例如 SimpleMessageListenerContainerDirectMessageListenerContainer)时才有效,因为 admin 在 AMQP 连接打开时会触发声明定义的 bean。如果您的应用程序仅使用流组件,或者您希望使用高级流配置功能,则应配置一个 StreamAdmin 代替

@Bean
StreamAdmin streamAdmin(Environment env) {
    return new StreamAdmin(env, sc -> {
        sc.stream("stream.queue1").maxAge(Duration.ofHours(2)).create();
        sc.stream("stream.queue2").create();
    });
}

有关 StreamCreator 的更多信息,请参阅 RabbitMQ 文档。

发送消息

RabbitStreamTemplate 提供了 RabbitTemplate (AMQP) 功能的子集。

RabbitStreamOperations
public interface RabbitStreamOperations extends AutoCloseable {

	CompletableFuture<Boolean> send(Message message);

	CompletableFuture<Boolean> convertAndSend(Object message);

	CompletableFuture<Boolean> convertAndSend(Object message, @Nullable MessagePostProcessor mpp);

	CompletableFuture<Boolean> send(com.rabbitmq.stream.Message message);

	MessageBuilder messageBuilder();

	MessageConverter messageConverter();

	StreamMessageConverter streamMessageConverter();

	@Override
	void close() throws AmqpException;

}

RabbitStreamTemplate 实现具有以下构造函数和属性

RabbitStreamTemplate
public RabbitStreamTemplate(Environment environment, String streamName) {
}

public void setMessageConverter(MessageConverter messageConverter) {
}

public void setStreamConverter(StreamMessageConverter streamConverter) {
}

public synchronized void setProducerCustomizer(ProducerCustomizer producerCustomizer) {
}

MessageConverterconvertAndSend 方法中用于将对象转换为 Spring AMQP Message

StreamMessageConverter 用于将 Spring AMQP Message 转换为原生流 Message

您也可以直接发送原生流 Message ;使用 messageBuilder() 方法可以访问 Producer 的消息构建器。

ProducerCustomizer 提供了一种在构建生产者之前对其进行自定义的机制。

请参考 Java 客户端文档 关于自定义 EnvironmentProducer 的内容。

接收消息

异步消息接收由 StreamListenerContainer (以及使用 @RabbitListener 时使用的 StreamRabbitListenerContainerFactory)提供。

监听器容器需要一个 Environment 以及一个单独的流名称。

您可以使用经典的 MessageListener 来接收 Spring AMQP Message ,或者使用新的接口来接收原生流 Message

public interface StreamMessageListener extends MessageListener {

	void onStreamMessage(Message message, Context context);

}

请参阅 消息监听器容器配置 以获取有关支持属性的信息。

类似于模板,容器有一个 ConsumerCustomizer 属性。

请参考 Java 客户端文档 关于自定义 EnvironmentConsumer 的内容。

使用 @RabbitListener 时,配置一个 StreamRabbitListenerContainerFactory ;目前,大多数 @RabbitListener 属性(concurrency 等)会被忽略。仅支持 idqueuesautoStartupcontainerFactory。此外,queues 只能包含一个流名称。

示例

@Bean
RabbitStreamTemplate streamTemplate(Environment env) {
    RabbitStreamTemplate template = new RabbitStreamTemplate(env, "test.stream.queue1");
    template.setProducerCustomizer((name, builder) -> builder.name("test"));
    return template;
}

@Bean
RabbitListenerContainerFactory<StreamListenerContainer> rabbitListenerContainerFactory(Environment env) {
    return new StreamRabbitListenerContainerFactory(env);
}

@RabbitListener(queues = "test.stream.queue1")
void listen(String in) {
    ...
}

@Bean
RabbitListenerContainerFactory<StreamListenerContainer> nativeFactory(Environment env) {
    StreamRabbitListenerContainerFactory factory = new StreamRabbitListenerContainerFactory(env);
    factory.setNativeListener(true);
    factory.setConsumerCustomizer((id, builder) -> {
        builder.name("myConsumer")
                .offset(OffsetSpecification.first())
                .manualTrackingStrategy();
    });
    return factory;
}

@RabbitListener(id = "test", queues = "test.stream.queue2", containerFactory = "nativeFactory")
void nativeMsg(Message in, Context context) {
    ...
    context.storeOffset();
}

@Bean
Queue stream() {
    return QueueBuilder.durable("test.stream.queue1")
            .stream()
            .build();
}

@Bean
Queue stream() {
    return QueueBuilder.durable("test.stream.queue2")
            .stream()
            .build();
}

2.4.5 版本向 StreamListenerContainer (及其工厂)添加了 adviceChain 属性。还提供了一个新的工厂 bean,用于创建一个无状态重试拦截器,该拦截器带有一个可选的 StreamMessageRecoverer ,用于消费原始流消息时使用。

@Bean
public StreamRetryOperationsInterceptorFactoryBean sfb(RetryTemplate retryTemplate) {
    StreamRetryOperationsInterceptorFactoryBean rfb =
            new StreamRetryOperationsInterceptorFactoryBean();
    rfb.setRetryOperations(retryTemplate);
    rfb.setStreamMessageRecoverer((msg, context, throwable) -> {
        ...
    });
    return rfb;
}
此容器不支持有状态重试。

超级流

超级流是一个分区流的抽象概念,通过将多个流队列绑定到一个具有参数 x-super-stream: true 的交换机来实现。

配置

为方便起见,可以通过定义一个类型为 SuperStream 的 bean 来配置一个超级流。

@Bean
SuperStream superStream() {
    return new SuperStream("my.super.stream", 3);
}

RabbitAdmin 会检测到此 bean 并声明交换机(my.super.stream)和 3 个队列(分区)—— my.super-stream-n ,其中 n012,绑定路由键等于 n

如果您也希望通过 AMQP 发布到此交换机,可以提供自定义路由键

@Bean
SuperStream superStream() {
    return new SuperStream("my.super.stream", 3, (q, i) -> IntStream.range(0, i)
					.mapToObj(j -> "rk-" + j)
					.collect(Collectors.toList()));
}

路由键的数量必须与分区数量相等。

生产到超级流

您必须向 RabbitStreamTemplate 添加一个 superStreamRoutingFunction

@Bean
RabbitStreamTemplate streamTemplate(Environment env) {
    RabbitStreamTemplate template = new RabbitStreamTemplate(env, "stream.queue1");
    template.setSuperStreamRouting(message -> {
        // some logic to return a String for the client's hashing algorithm
    });
    return template;
}

您也可以使用 RabbitTemplate 通过 AMQP 发布。

使用单活跃消费者消费超级流

在监听器容器上调用 superStream 方法,以在超级流上启用单活跃消费者。

@Bean
StreamListenerContainer container(Environment env, String name) {
    StreamListenerContainer container = new StreamListenerContainer(env);
    container.superStream("ss.sac", "myConsumer", 3); // concurrency = 3
    container.setupMessageListener(msg -> {
        ...
    });
    container.setConsumerCustomizer((id, builder) -> builder.offset(OffsetSpecification.last()));
    return container;
}
目前,当并发数大于 1 时,实际的并发数会进一步受 Environment 控制;要实现完全并发,请将 environment 的 maxConsumersByConnection 设置为 1。请参阅 配置 Environment

Micrometer 观测

自 3.0.5 版本起,现在支持使用 Micrometer 进行观测,包括针对 RabbitStreamTemplate 和流监听器容器。该容器现在也支持 Micrometer 定时器(当未启用观测时)。

observationEnabled 设置在每个组件上以启用观测;这将禁用 Micrometer 定时器 ,因为定时器现在会随每次观测进行管理。使用注解监听器时,将 observationEnabled 设置在容器工厂上。

请参考 Micrometer Tracing 以获取更多信息。

要向定时器/追踪添加标签,请向模板或监听器容器分别配置一个自定义的 RabbitStreamTemplateObservationConventionRabbitStreamListenerObservationConvention

默认实现会为模板观测添加 name 标签,并为容器添加 listener.id 标签。

您可以继承 DefaultRabbitStreamTemplateObservationConventionDefaultStreamRabbitListenerObservationConvention ,或者提供全新的实现。

请参阅 Micrometer 观测文档 获取更多详细信息。