AMQP

高级消息队列协议(AMQP)是一种与平台无关、面向连接的协议,用于消息中间件。Spring AMQP 项目将 Spring 核心概念应用于开发基于 AMQP 的消息解决方案。Spring Boot 通过 RabbitMQ 为使用 AMQP 提供了多项便利,包括 `spring-boot-starter-amqp` Starter。

RabbitMQ 支持

RabbitMQ 是一个基于 AMQP 协议的轻量级、可靠、可伸缩且可移植的消息代理。Spring 使用 RabbitMQ 通过 AMQP 协议进行通信。

RabbitMQ 配置由 `spring.rabbitmq.*` 中的外部配置属性控制。例如,你可以在 `application.properties` 中声明以下部分:

  • 属性

  • YAML

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=secret
spring:
  rabbitmq:
    host: "localhost"
    port: 5672
    username: "admin"
    password: "secret"

或者,你可以使用 `addresses` 属性配置相同的连接:

  • 属性

  • YAML

spring.rabbitmq.addresses=amqp://admin:secret@localhost
spring:
  rabbitmq:
    addresses: "amqp://admin:secret@localhost"
当以这种方式指定地址时,`host` 和 `port` 属性将被忽略。如果地址使用 `amqps` 协议,则会自动启用 SSL 支持。

有关更多支持的基于属性的配置选项,请参见 RabbitProperties。要配置 Spring AMQP 使用的 RabbitMQ ConnectionFactory 的更底层细节,请定义一个 ConnectionFactoryCustomizer Bean。

如果上下文中存在 ConnectionNameStrategy Bean,它将自动用于命名由自动配置的 CachingConnectionFactory 创建的连接。

要对 RabbitTemplate 进行应用范围内的累加式定制,请使用 RabbitTemplateCustomizer Bean。

有关更多详细信息,请参阅 理解 AMQP:RabbitMQ 使用的协议

发送消息

Spring 的 AmqpTemplateAmqpAdmin 会自动配置,你可以将它们直接自动注入到你自己的 Bean 中,如以下示例所示:

  • Java

  • Kotlin

import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	private final AmqpAdmin amqpAdmin;

	private final AmqpTemplate amqpTemplate;

	public MyBean(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) {
		this.amqpAdmin = amqpAdmin;
		this.amqpTemplate = amqpTemplate;
	}

	// ...

	public void someMethod() {
		this.amqpAdmin.getQueueInfo("someQueue");
	}

	public void someOtherMethod() {
		this.amqpTemplate.convertAndSend("hello");
	}

}
import org.springframework.amqp.core.AmqpAdmin
import org.springframework.amqp.core.AmqpTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val amqpAdmin: AmqpAdmin, private val amqpTemplate: AmqpTemplate) {

	// ...

	fun someMethod() {
		amqpAdmin.getQueueInfo("someQueue")
	}

	fun someOtherMethod() {
		amqpTemplate.convertAndSend("hello")
	}

}
RabbitMessagingTemplate 可以以类似的方式注入。如果定义了 MessageConverter Bean,它会自动关联到自动配置的 AmqpTemplate

如果需要,任何定义为 Bean 的 Queue 会自动用于在 RabbitMQ 实例上声明相应的队列。

要重试操作,你可以在 AmqpTemplate 上启用重试(例如,在 Broker 连接丢失的情况下):

  • 属性

  • YAML

spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=2s
spring:
  rabbitmq:
    template:
      retry:
        enabled: true
        initial-interval: "2s"

默认情况下,重试是禁用的。你也可以通过声明 RabbitRetryTemplateCustomizer Bean 来编程方式定制 RetryTemplate

如果你需要创建更多 RabbitTemplate 实例或者想覆盖默认配置,Spring Boot 提供了一个 RabbitTemplateConfigurer Bean,你可以使用它来初始化一个 RabbitTemplate,其设置与自动配置使用的工厂相同。

发送消息到 Stream

要发送消息到特定的 Stream,请指定 Stream 的名称,如以下示例所示:

  • 属性

  • YAML

spring.rabbitmq.stream.name=my-stream
spring:
  rabbitmq:
    stream:
      name: "my-stream"

如果定义了 MessageConverterStreamMessageConverterProducerCustomizer Bean,它会自动关联到自动配置的 RabbitStreamTemplate

如果你需要创建更多 RabbitStreamTemplate 实例或者想覆盖默认配置,Spring Boot 提供了一个 RabbitStreamTemplateConfigurer Bean,你可以使用它来初始化一个 RabbitStreamTemplate,其设置与自动配置使用的工厂相同。

接收消息

当存在 Rabbit 基础设施时,任何 Bean 都可以用 @RabbitListener 注解进行标记,以创建监听器端点。如果没有定义 RabbitListenerContainerFactory,则会自动配置一个默认的 SimpleRabbitListenerContainerFactory,你可以使用 `spring.rabbitmq.listener.type` 属性切换到 Direct 容器。如果定义了 MessageConverterMessageRecoverer Bean,它会自动关联到默认工厂。

以下示例组件在 `someQueue` 队列上创建了一个监听器端点:

  • Java

  • Kotlin

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@RabbitListener(queues = "someQueue")
	public void processMessage(String content) {
		// ...
	}

}
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component

@Component
class MyBean {

	@RabbitListener(queues = ["someQueue"])
	fun processMessage(content: String?) {
		// ...
	}

}
有关更多详细信息,请参阅 @EnableRabbit

如果你需要创建更多 RabbitListenerContainerFactory 实例或者想覆盖默认配置,Spring Boot 提供了一个 SimpleRabbitListenerContainerFactoryConfigurer 和一个 DirectRabbitListenerContainerFactoryConfigurer,你可以使用它们来初始化一个 SimpleRabbitListenerContainerFactory 和一个 DirectRabbitListenerContainerFactory,其设置与自动配置使用的工厂相同。

你选择哪种容器类型并不重要。这两个 Bean 都由自动配置暴露出来。

例如,以下配置类暴露了另一个使用特定 MessageConverter 的工厂:

  • Java

  • Kotlin

import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration(proxyBeanMethods = false)
public class MyRabbitConfiguration {

	@Bean
	public SimpleRabbitListenerContainerFactory myFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer) {
		SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
		ConnectionFactory connectionFactory = getCustomConnectionFactory();
		configurer.configure(factory, connectionFactory);
		factory.setMessageConverter(new MyMessageConverter());
		return factory;
	}

	private ConnectionFactory getCustomConnectionFactory() {
		return ...
	}

}
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory
import org.springframework.amqp.rabbit.connection.ConnectionFactory
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

@Configuration(proxyBeanMethods = false)
class MyRabbitConfiguration {

	@Bean
	fun myFactory(configurer: SimpleRabbitListenerContainerFactoryConfigurer): SimpleRabbitListenerContainerFactory {
		val factory = SimpleRabbitListenerContainerFactory()
		val connectionFactory = getCustomConnectionFactory()
		configurer.configure(factory, connectionFactory)
		factory.setMessageConverter(MyMessageConverter())
		return factory
	}

	fun getCustomConnectionFactory() : ConnectionFactory? {
		return ...
	}

}

然后你可以在任何带有 @RabbitListener 注解的方法中使用该工厂,如下所示:

  • Java

  • Kotlin

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@RabbitListener(queues = "someQueue", containerFactory = "myFactory")
	public void processMessage(String content) {
		// ...
	}

}
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component

@Component
class MyBean {

	@RabbitListener(queues = ["someQueue"], containerFactory = "myFactory")
	fun processMessage(content: String?) {
		// ...
	}

}

你可以启用重试来处理监听器抛出异常的情况。默认情况下,使用 RejectAndDontRequeueRecoverer,但你可以定义自己的 MessageRecoverer。当重试次数用尽后,消息将被拒绝,并且如果 Broker 配置了死信交换机,消息会被丢弃或路由到死信交换机。默认情况下,重试是禁用的。你也可以通过声明 RabbitRetryTemplateCustomizer Bean 来编程方式定制 RetryTemplate

默认情况下,如果重试被禁用且监听器抛出异常,消息将无限期地重试投递。你可以通过两种方式修改此行为:将 `defaultRequeueRejected` 属性设置为 `false`,这样将不再尝试重新投递;或者抛出 AmqpRejectAndDontRequeueException 来表明消息应该被拒绝。后者是启用重试且达到最大投递尝试次数时使用的机制。