AMQP

高级消息队列协议(AMQP)是一种平台中立的、线级协议,用于面向消息的中间件。Spring AMQP 项目将 Spring 核心概念应用于基于 AMQP 的消息解决方案的开发。Spring Boot 通过 RabbitMQ 提供了多种使用 AMQP 的便利功能,包括 spring-boot-starter-amqp 启动器。

RabbitMQ 支持

RabbitMQ 是一个基于 AMQP 协议的轻量级、可靠、可扩展和可移植的消息代理。Spring 使用 RabbitMQ 通过 AMQP 协议进行通信。

RabbitMQ 的配置由 spring.rabbitmq.* 中的外部配置属性控制。例如,你可以在 application.properties 中声明以下部分:

  • 属性

  • YAML

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=secret
spring:
  rabbitmq:
    host: "localhost"
    port: 5672
    username: "admin"
    password: "secret"

或者,你可以使用 addresses 属性配置相同的连接:

  • 属性

  • YAML

spring.rabbitmq.addresses=amqp://admin:secret@localhost
spring:
  rabbitmq:
    addresses: "amqp://admin:secret@localhost"
以这种方式指定地址时,hostport 属性将被忽略。如果地址使用 amqps 协议,则会自动启用 SSL 支持。

有关更多支持的基于属性的配置选项,请参阅 RabbitProperties。要配置 Spring AMQP 使用的 RabbitMQ ConnectionFactory 的更底层细节,请定义一个 ConnectionFactoryCustomizer bean。

如果上下文中存在 ConnectionNameStrategy bean,它将自动用于命名由自动配置的 CachingConnectionFactory 创建的连接。

要对 RabbitTemplate 进行应用程序范围的累加定制,请使用 RabbitTemplateCustomizer bean。

有关更多详细信息,请参阅 理解 AMQP,RabbitMQ 使用的协议

发送消息

Spring 的 AmqpTemplateAmqpAdmin 已自动配置,你可以将它们直接自动注入到你的 bean 中,如以下示例所示:

  • Java

  • Kotlin

import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	private final AmqpAdmin amqpAdmin;

	private final AmqpTemplate amqpTemplate;

	public MyBean(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) {
		this.amqpAdmin = amqpAdmin;
		this.amqpTemplate = amqpTemplate;
	}

	// ...

	public void someMethod() {
		this.amqpAdmin.getQueueInfo("someQueue");
	}

	public void someOtherMethod() {
		this.amqpTemplate.convertAndSend("hello");
	}

}
import org.springframework.amqp.core.AmqpAdmin
import org.springframework.amqp.core.AmqpTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val amqpAdmin: AmqpAdmin, private val amqpTemplate: AmqpTemplate) {

	// ...

	fun someMethod() {
		amqpAdmin.getQueueInfo("someQueue")
	}

	fun someOtherMethod() {
		amqpTemplate.convertAndSend("hello")
	}

}
RabbitMessagingTemplate 可以以类似的方式注入。如果定义了 MessageConverter bean,它将自动与自动配置的 AmqpTemplate 关联。

如有必要,任何定义为 bean 的 Queue 都将自动用于在 RabbitMQ 实例上声明相应的队列。

要重试操作,你可以为 AmqpTemplate 启用重试(例如,在代理连接丢失的情况下):

  • 属性

  • YAML

spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=2s
spring:
  rabbitmq:
    template:
      retry:
        enabled: true
        initial-interval: "2s"

重试默认是禁用的。你还可以通过声明一个 RabbitTemplateRetrySettingsCustomizer bean,以编程方式自定义 RetryTemplate

如果你需要创建更多 RabbitTemplate 实例,或者想要覆盖默认设置,Spring Boot 提供了一个 RabbitTemplateConfigurer bean,你可以使用它来初始化一个 RabbitTemplate,使其具有与自动配置使用的工厂相同的设置。

向流发送消息

要向特定流发送消息,请指定流的名称,如以下示例所示:

  • 属性

  • YAML

spring.rabbitmq.stream.name=my-stream
spring:
  rabbitmq:
    stream:
      name: "my-stream"

如果定义了 MessageConverterStreamMessageConverterProducerCustomizer bean,它将自动与自动配置的 RabbitStreamTemplate 关联。

如果你需要创建更多 RabbitStreamTemplate 实例,或者想要覆盖默认设置,Spring Boot 提供了一个 RabbitStreamTemplateConfigurer bean,你可以使用它来初始化一个 RabbitStreamTemplate,使其具有与自动配置使用的工厂相同的设置。

接收消息

当 Rabbit 基础设施存在时,任何 bean 都可以用 @RabbitListener 注解来创建监听器端点。如果没有定义 RabbitListenerContainerFactory,则会自动配置一个默认的 SimpleRabbitListenerContainerFactory,你可以使用 spring.rabbitmq.listener.type 属性切换到直接容器。如果定义了 MessageConverterMessageRecoverer bean,它将自动与默认工厂关联。

以下示例组件在 someQueue 队列上创建了一个监听器端点:

  • Java

  • Kotlin

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@RabbitListener(queues = "someQueue")
	public void processMessage(String content) {
		// ...
	}

}
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component

@Component
class MyBean {

	@RabbitListener(queues = ["someQueue"])
	fun processMessage(content: String?) {
		// ...
	}

}
有关更多详细信息,请参阅 @EnableRabbit

如果你需要创建更多 RabbitListenerContainerFactory 实例,或者想要覆盖默认设置,Spring Boot 提供了 SimpleRabbitListenerContainerFactoryConfigurerDirectRabbitListenerContainerFactoryConfigurer,你可以使用它们来初始化一个 SimpleRabbitListenerContainerFactory 和一个 DirectRabbitListenerContainerFactory,使其具有与自动配置使用的工厂相同的设置。

你选择哪种容器类型并不重要。这两个 bean 都由自动配置暴露。

例如,以下配置类暴露了另一个使用特定 MessageConverter 的工厂:

  • Java

  • Kotlin

import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.boot.amqp.autoconfigure.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration(proxyBeanMethods = false)
public class MyRabbitConfiguration {

	@Bean
	public SimpleRabbitListenerContainerFactory myFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer) {
		SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
		ConnectionFactory connectionFactory = getCustomConnectionFactory();
		configurer.configure(factory, connectionFactory);
		factory.setMessageConverter(new MyMessageConverter());
		return factory;
	}

	private ConnectionFactory getCustomConnectionFactory() {
		return ...
	}

}
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory
import org.springframework.amqp.rabbit.connection.ConnectionFactory
import org.springframework.boot.amqp.autoconfigure.SimpleRabbitListenerContainerFactoryConfigurer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

@Configuration(proxyBeanMethods = false)
class MyRabbitConfiguration {

	@Bean
	fun myFactory(configurer: SimpleRabbitListenerContainerFactoryConfigurer): SimpleRabbitListenerContainerFactory {
		val factory = SimpleRabbitListenerContainerFactory()
		val connectionFactory = getCustomConnectionFactory()
		configurer.configure(factory, connectionFactory)
		factory.setMessageConverter(MyMessageConverter())
		return factory
	}

	fun getCustomConnectionFactory() : ConnectionFactory {
		return ...
	}

}

然后,你可以在任何带有 @RabbitListener 注解的方法中使用该工厂,如下所示:

  • Java

  • Kotlin

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@RabbitListener(queues = "someQueue", containerFactory = "myFactory")
	public void processMessage(String content) {
		// ...
	}

}
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component

@Component
class MyBean {

	@RabbitListener(queues = ["someQueue"], containerFactory = "myFactory")
	fun processMessage(content: String?) {
		// ...
	}

}

你可以启用重试来处理监听器抛出异常的情况。默认情况下,使用 RejectAndDontRequeueRecoverer,但你可以定义自己的 MessageRecoverer。当重试耗尽时,消息将被拒绝,如果代理配置为这样做,则会被丢弃或路由到死信交换。默认情况下,重试是禁用的。你还可以通过声明一个 RabbitListenerRetrySettingsCustomizer bean,以编程方式自定义 RetryPolicy

默认情况下,如果重试被禁用且监听器抛出异常,则会无限期地重试投递。你可以通过两种方式修改此行为:将 defaultRequeueRejected 属性设置为 false,这样就不会尝试重新投递;或者抛出 AmqpRejectAndDontRequeueException 以表示消息应该被拒绝。后者是在启用重试且达到最大投递尝试次数时使用的机制。
© . This site is unofficial and not affiliated with VMware.