AmqpTemplate

与 Spring Framework 和相关项目提供的许多其他高级抽象一样,Spring AMQP 提供了一个起核心作用的“模板”。定义主要操作的接口称为 AmqpTemplate。这些操作涵盖了发送和接收消息的通用行为。换句话说,它们不是任何特定实现的独有功能——因此名称中带有“AMQP”。另一方面,该接口的实现与 AMQP 协议的实现相关联。与 JMS 不同,JMS 本身是接口级别的 API,AMQP 是线级别协议。该协议的实现提供了自己的客户端库,因此模板接口的每个实现都依赖于特定的客户端库。目前,只有一个实现:RabbitTemplate。在随后的示例中,我们经常使用 AmqpTemplate。然而,当您查看配置示例或任何实例化模板或调用 setter 的代码片段时,您会看到实现的类型(例如,RabbitTemplate)。

如前所述,AmqpTemplate 接口定义了所有用于发送和接收消息的基本操作。我们将在发送消息接收消息中分别探讨消息发送和接收。

另请参见Async Rabbit Template

添加重试功能

从 1.3 版本开始,您现在可以配置 RabbitTemplate 使用 RetryTemplate 来帮助处理与 Broker 连接相关的问题。有关完整信息,请参阅 spring-retry 项目。以下仅是一个示例,它使用指数回退策略和默认的 SimpleRetryPolicy,在向调用者抛出异常之前会尝试三次。

以下示例使用 XML 命名空间

<rabbit:template id="template" connection-factory="connectionFactory" retry-template="retryTemplate"/>

<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
    <property name="backOffPolicy">
        <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
            <property name="initialInterval" value="500" />
            <property name="multiplier" value="10.0" />
            <property name="maxInterval" value="10000" />
        </bean>
    </property>
</bean>

以下示例在 Java 中使用 @Configuration 注解

@Bean
public RabbitTemplate rabbitTemplate() {
    RabbitTemplate template = new RabbitTemplate(connectionFactory());
    RetryTemplate retryTemplate = new RetryTemplate();
    ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
    backOffPolicy.setInitialInterval(500);
    backOffPolicy.setMultiplier(10.0);
    backOffPolicy.setMaxInterval(10000);
    retryTemplate.setBackOffPolicy(backOffPolicy);
    template.setRetryTemplate(retryTemplate);
    return template;
}

从 1.4 版本开始,除了 retryTemplate 属性外,RabbitTemplate 还支持 recoveryCallback 选项。它被用作 RetryTemplate.execute(RetryCallback<T, E> retryCallback, RecoveryCallback<T> recoveryCallback) 的第二个参数。

RecoveryCallback 有一定的限制,因为它只包含重试上下文中的 lastThrowable 字段。对于更复杂的用例,您应该使用外部的 RetryTemplate,以便通过上下文属性向 RecoveryCallback 传递额外信息。以下示例展示了如何实现:
retryTemplate.execute(
    new RetryCallback<Object, Exception>() {

        @Override
        public Object doWithRetry(RetryContext context) throws Exception {
            context.setAttribute("message", message);
            return rabbitTemplate.convertAndSend(exchange, routingKey, message);
        }

    }, new RecoveryCallback<Object>() {

        @Override
        public Object recover(RetryContext context) throws Exception {
            Object message = context.getAttribute("message");
            Throwable t = context.getLastThrowable();
            // Do something with message
            return null;
        }
    });
}

在这种情况下,您会将 RetryTemplate 注入到 RabbitTemplate 中。

发布是异步的 — 如何检测成功和失败

发布消息是一种异步机制,默认情况下,无法路由的消息会被 RabbitMQ 丢弃。对于成功的发布,您可以收到异步确认,如关联的发布者确认和返回中所述。考虑两种失败场景:

  • 发布到一个 Exchange,但没有匹配的目的地队列。

  • 发布到一个不存在的 Exchange。

第一种情况由发布者返回涵盖,如关联的发布者确认和返回中所述。

对于第二种情况,消息会被丢弃并且不生成返回。底层 Channel 会因异常而关闭。默认情况下,此异常会被记录日志,但您可以通过向 CachingConnectionFactory 注册一个 ChannelListener 来获取此类事件的通知。以下示例展示了如何添加 ConnectionListener

this.connectionFactory.addConnectionListener(new ConnectionListener() {

    @Override
    public void onCreate(Connection connection) {
    }

    @Override
    public void onShutDown(ShutdownSignalException signal) {
        ...
    }

});

您可以检查 signal 的 reason 属性以确定发生的问题。

要在发送线程上检测异常,您可以在 RabbitTemplate 上设置 setChannelTransacted(true),异常将在 txCommit() 时检测到。然而,事务会显著降低性能,因此在仅为此一个用例启用事务之前请仔细考虑。

关联的发布者确认和返回

AmqpTemplateRabbitTemplate 实现支持发布者确认和返回。

对于返回的消息,模板的 mandatory 属性必须设置为 true,或者针对特定消息的 mandatory-expression 必须评估为 true。此功能需要一个将其 publisherReturns 属性设置为 trueCachingConnectionFactory(参见发布者确认和返回)。通过调用 setReturnsCallback(ReturnsCallback callback) 并注册 RabbitTemplate.ReturnsCallback,返回会发送给客户端。该回调必须实现以下方法:

void returnedMessage(ReturnedMessage returned);

ReturnedMessage 包含以下属性:

  • message - 返回的消息本身

  • replyCode - 指示返回原因的代码

  • replyText - 返回的文本原因 - 例如 NO_ROUTE

  • exchange - 消息发送到的 Exchange

  • routingKey - 使用的 Routing Key

每个 RabbitTemplate 只支持一个 ReturnsCallback。另请参见回复超时

对于发布者确认(也称为发布者 ACK),模板需要一个将其 publisherConfirm 属性设置为 ConfirmType.CORRELATEDCachingConnectionFactory。通过调用 setConfirmCallback(ConfirmCallback callback) 并注册 RabbitTemplate.ConfirmCallback,确认会发送给客户端。该回调必须实现此方法:

void confirm(CorrelationData correlationData, boolean ack, String cause);

CorrelationData 是客户端发送原始消息时提供的一个对象。ack 对于 ack 为 true,对于 nack 为 false。对于 nack 实例,如果生成 nack 时原因可用,则 cause 可能包含 nack 的原因。一个例子是发送消息到不存在的 Exchange。在这种情况下,Broker 会关闭 Channel。关闭的原因包含在 cause 中。cause 是在 1.4 版本中添加的。

每个 RabbitTemplate 只支持一个 ConfirmCallback

当 Rabbit Template 的发送操作完成时,Channel 会被关闭。当连接工厂缓存已满时(缓存中有空间时,Channel 不会被物理关闭,返回和确认正常进行),这将阻止接收确认或返回。当缓存已满时,框架会延迟关闭最多五秒钟,以便有时间接收确认和返回。使用确认时,当收到最后一个确认后 Channel 会被关闭。仅使用返回时,Channel 会保持打开状态长达五秒钟。我们通常建议将连接工厂的 channelCacheSize 设置为一个足够大的值,以便发布消息的 Channel 返回到缓存中,而不是被关闭。您可以使用 RabbitMQ 管理插件监控 Channel 使用情况。如果您看到 Channel 快速打开和关闭,应考虑增加缓存大小以减少服务器开销。
在 2.1 版本之前,启用发布者确认的 Channel 会在收到确认之前返回到缓存。其他一些进程可能会取出该 Channel 并执行导致 Channel 关闭的操作,例如向不存在的 Exchange 发布消息。这可能导致确认丢失。2.1 及更高版本不再在确认未完成时将 Channel 返回到缓存。RabbitTemplate 在每次操作后对 Channel 执行逻辑 close()。一般来说,这意味着一个 Channel 上一次只有一个未完成的确认。
从 2.2 版本开始,回调会在连接工厂的 executor 线程之一上调用。这是为了避免在回调内执行 Rabbit 操作时可能发生的死锁。在以前的版本中,回调直接在 amqp-client 连接 I/O 线程上调用;如果您执行某些 RPC 操作(例如打开一个新 Channel),这将导致死锁,因为 I/O 线程会阻塞等待结果,但结果需要由 I/O 线程本身处理。在那些版本中,需要在回调内部将工作(例如发送消息)交给另一个线程。现在不再需要这样做,因为框架现在将回调调用交给执行器处理。
只要返回回调在 60 秒或更短时间内执行完成,即可保证在收到 ACK 之前收到返回的消息。确认计划在返回回调退出后或 60 秒后交付,以先到者为准。

CorrelationData 对象有一个 CompletableFuture,您可以使用它来获取结果,而不是在模板上使用 ConfirmCallback。以下示例展示了如何配置 CorrelationData 实例:

CorrelationData cd1 = new CorrelationData();
this.templateWithConfirmsEnabled.convertAndSend("exchange", queue.getName(), "foo", cd1);
assertTrue(cd1.getFuture().get(10, TimeUnit.SECONDS).isAck());
ReturnedMessage = cd1.getReturn();
...

由于它是一个 CompletableFuture<Confirm>,您可以在就绪时 get() 获取结果,或使用 whenComplete() 进行异步回调。Confirm 对象是一个简单的 bean,具有两个属性:ackreason(针对 nack 实例)。Broker 生成的 nack 实例不会填充 reason。框架生成的 nack 实例(例如,在 ACK 未完成时关闭连接)会填充 reason。

此外,当同时启用确认和返回时,如果消息无法路由到任何队列,CorrelationDatareturn 属性将填充返回的消息。保证在 future 设置 ack 之前,返回消息属性已被设置。CorrelationData.getReturn() 返回一个包含以下属性的 ReturnMessage

  • message (返回的消息)

  • replyCode

  • replyText

  • exchange

  • routingKey

另请参见范围操作,了解一种更简单的方式等待发布者确认。

范围操作

通常,使用模板时,会从缓存中取出(或创建)一个 Channel,用于执行操作,然后返回到缓存中以便重用。在多线程环境中,无法保证下一个操作使用同一个 Channel。然而,有时您可能希望对 Channel 的使用有更多控制,并确保多个操作都在同一个 Channel 上执行。

从 2.0 版本开始,提供了一个名为 invoke 的新方法,带有一个 OperationsCallback。在回调范围内并使用提供的 RabbitOperations 参数执行的任何操作都将使用相同的专用 Channel,该 Channel 最后会被关闭(而不是返回到缓存)。如果 Channel 是 PublisherCallbackChannel,它会在收到所有确认后返回到缓存(参见关联的发布者确认和返回)。

@FunctionalInterface
public interface OperationsCallback<T> {

    T doInRabbit(RabbitOperations operations);

}

您可能需要此功能的一个例子是,如果您希望使用底层 Channel 上的 waitForConfirms() 方法。如前所述,Spring API 以前没有公开此方法,因为 Channel 通常是缓存和共享的。RabbitTemplate 现在提供了 waitForConfirms(long timeout)waitForConfirmsOrDie(long timeout),它们委托给在 OperationsCallback 范围内使用的专用 Channel。出于显而易见的原因,这些方法不能在该范围之外使用。

请注意,一个更高级的抽象,允许您将确认与请求关联,已在其他地方提供(参见关联的发布者确认和返回)。如果您只想等待 Broker 确认交付,可以使用以下示例所示的技术:

Collection<?> messages = getMessagesToSend();
Boolean result = this.template.invoke(t -> {
    messages.forEach(m -> t.convertAndSend(ROUTE, m));
    t.waitForConfirmsOrDie(10_000);
    return true;
});

如果您希望 RabbitAdmin 操作在 OperationsCallback 范围内的同一个 Channel 上调用,则必须使用用于 invoke 操作的同一个 RabbitTemplate 构建 Admin。

如果模板操作已在现有事务的范围内执行(例如,在事务性监听器容器线程上运行并在事务性模板上执行操作),则前面的讨论就没有意义了。在这种情况下,操作会在该 Channel 上执行,并在线程返回到容器时提交。在这种场景下,没有必要使用 invoke

以这种方式使用确认时,为将确认与请求关联而设置的大部分基础设施实际上是不需要的(除非也启用了返回)。从 2.2 版本开始,连接工厂支持一个名为 publisherConfirmType 的新属性。当此属性设置为 ConfirmType.SIMPLE 时,可以避免使用该基础设施,从而使确认处理更加高效。

此外,RabbitTemplate 会在发送消息的 MessageProperties 中设置 publisherSequenceNumber 属性。如果您希望检查(或记录或以其他方式使用)特定的确认,可以使用重载的 invoke 方法,如下例所示:

public <T> T invoke(OperationsCallback<T> action, com.rabbitmq.client.ConfirmCallback acks,
        com.rabbitmq.client.ConfirmCallback nacks);
这些 ConfirmCallback 对象(针对 acknack 实例)是 Rabbit 客户端的回调,而不是模板的回调。

以下示例记录 acknack 实例:

Collection<?> messages = getMessagesToSend();
Boolean result = this.template.invoke(t -> {
    messages.forEach(m -> t.convertAndSend(ROUTE, m));
    t.waitForConfirmsOrDie(10_000);
    return true;
}, (tag, multiple) -> {
        log.info("Ack: " + tag + ":" + multiple);
}, (tag, multiple) -> {
        log.info("Nack: " + tag + ":" + multiple);
}));
范围操作绑定到线程。有关多线程环境中严格顺序的讨论,请参见多线程环境中的严格消息顺序

多线程环境中的严格消息顺序

范围操作中的讨论仅适用于在同一线程上执行操作的情况。

考虑以下情况:

  • thread-1 将消息发送到队列,并将工作移交给 thread-2

  • thread-2 将消息发送到同一个队列

由于 RabbitMQ 的异步特性和缓存 Channel 的使用,无法确定是否会使用同一个 Channel,因此消息到达队列的顺序无法保证。(大多数情况下它们会按顺序到达,但乱序交付的概率不为零)。为解决此用例,您可以使用大小为 1 的有界 Channel 缓存(以及 channelCheckoutTimeout)以确保消息始终在同一个 Channel 上发布,从而保证顺序。为此,如果您有连接工厂的其他用途(例如消费者),您应该要么为模板使用专用的连接工厂,要么配置模板使用主连接工厂中嵌入的发布者连接工厂(参见使用独立的连接)。

用一个简单的 Spring Boot 应用可以最好地说明这一点:

@SpringBootApplication
public class Application {

	private static final Logger log = LoggerFactory.getLogger(Application.class);

	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}

	@Bean
	TaskExecutor exec() {
		ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
		exec.setCorePoolSize(10);
		return exec;
	}

	@Bean
	CachingConnectionFactory ccf() {
		CachingConnectionFactory ccf = new CachingConnectionFactory("localhost");
		CachingConnectionFactory publisherCF = (CachingConnectionFactory) ccf.getPublisherConnectionFactory();
		publisherCF.setChannelCacheSize(1);
		publisherCF.setChannelCheckoutTimeout(1000L);
		return ccf;
	}

	@RabbitListener(queues = "queue")
	void listen(String in) {
		log.info(in);
	}

	@Bean
	Queue queue() {
		return new Queue("queue");
	}


	@Bean
	public ApplicationRunner runner(Service service, TaskExecutor exec) {
		return args -> {
			exec.execute(() -> service.mainService("test"));
		};
	}

}

@Component
class Service {

	private static final Logger LOG = LoggerFactory.getLogger(Service.class);

	private final RabbitTemplate template;

	private final TaskExecutor exec;

	Service(RabbitTemplate template, TaskExecutor exec) {
		template.setUsePublisherConnection(true);
		this.template = template;
		this.exec = exec;
	}

	void mainService(String toSend) {
		LOG.info("Publishing from main service");
		this.template.convertAndSend("queue", toSend);
		this.exec.execute(() -> secondaryService(toSend.toUpperCase()));
	}

	void secondaryService(String toSend) {
		LOG.info("Publishing from secondary service");
		this.template.convertAndSend("queue", toSend);
	}

}

即使发布操作在两个不同的线程上执行,它们都将使用同一个 Channel,因为缓存被限制为单个 Channel。

从 2.3.7 版本开始,ThreadChannelConnectionFactory 支持使用 prepareContextSwitchswitchContext 方法将线程的 Channel 转移到另一个线程。第一个方法返回一个上下文,该上下文传递给调用第二个方法的第二个线程。一个线程可以绑定一个非事务性 Channel 或一个事务性 Channel(或各一个);您无法单独转移它们,除非您使用两个连接工厂。示例如下:

@SpringBootApplication
public class Application {

	private static final Logger log = LoggerFactory.getLogger(Application.class);

	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}

	@Bean
	TaskExecutor exec() {
		ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
		exec.setCorePoolSize(10);
		return exec;
	}

	@Bean
	ThreadChannelConnectionFactory tccf() {
		ConnectionFactory rabbitConnectionFactory = new ConnectionFactory();
		rabbitConnectionFactory.setHost("localhost");
		return new ThreadChannelConnectionFactory(rabbitConnectionFactory);
	}

	@RabbitListener(queues = "queue")
	void listen(String in) {
		log.info(in);
	}

	@Bean
	Queue queue() {
		return new Queue("queue");
	}


	@Bean
	public ApplicationRunner runner(Service service, TaskExecutor exec) {
		return args -> {
			exec.execute(() -> service.mainService("test"));
		};
	}

}

@Component
class Service {

	private static final Logger LOG = LoggerFactory.getLogger(Service.class);

	private final RabbitTemplate template;

	private final TaskExecutor exec;

	private final ThreadChannelConnectionFactory connFactory;

	Service(RabbitTemplate template, TaskExecutor exec,
			ThreadChannelConnectionFactory tccf) {

		this.template = template;
		this.exec = exec;
		this.connFactory = tccf;
	}

	void mainService(String toSend) {
		LOG.info("Publishing from main service");
		this.template.convertAndSend("queue", toSend);
		Object context = this.connFactory.prepareSwitchContext();
		this.exec.execute(() -> secondaryService(toSend.toUpperCase(), context));
	}

	void secondaryService(String toSend, Object threadContext) {
		LOG.info("Publishing from secondary service");
		this.connFactory.switchContext(threadContext);
		this.template.convertAndSend("queue", toSend);
		this.connFactory.closeThreadChannel();
	}

}
调用 prepareSwitchContext 后,如果当前线程再执行任何操作,它们将在新的 Channel 上执行。不再需要时关闭线程绑定的 Channel 非常重要。

消息集成

从 1.4 版本开始,RabbitMessagingTemplate(构建在 RabbitTemplate 之上)提供了与 Spring Framework 消息抽象的集成——即 org.springframework.messaging.Message。这允许您使用 spring-messagingMessage<?> 抽象来发送和接收消息。此抽象被其他 Spring 项目使用,例如 Spring Integration 和 Spring 的 STOMP 支持。涉及两种消息转换器:一种用于在 spring-messaging 的 Message<?> 抽象和 Spring AMQP 的 Message 抽象之间进行转换,另一种用于在 Spring AMQP 的 Message 抽象和底层 RabbitMQ 客户端库所需的格式之间进行转换。默认情况下,消息负载由提供的 RabbitTemplate 实例的消息转换器转换。或者,您可以注入一个使用其他负载转换器的自定义 MessagingMessageConverter,如下例所示:

MessagingMessageConverter amqpMessageConverter = new MessagingMessageConverter();
amqpMessageConverter.setPayloadConverter(myPayloadConverter);
rabbitMessagingTemplate.setAmqpMessageConverter(amqpMessageConverter);

验证用户 ID

从 1.6 版本开始,模板现在支持 user-id-expression(使用 Java 配置时为 userIdExpression)。如果发送消息,则在评估此表达式后设置(如果尚未设置)用户 ID 属性。评估的根对象是要发送的消息。

以下示例展示了如何使用 user-id-expression 属性:

<rabbit:template ... user-id-expression="'guest'" />

<rabbit:template ... user-id-expression="@myConnectionFactory.username" />

第一个示例是字面表达式。第二个示例从应用上下文中的连接工厂 bean 获取 username 属性。

使用独立的连接

从 2.0.2 版本开始,您可以将 usePublisherConnection 属性设置为 true,以便在可能的情况下使用与监听器容器所用连接不同的连接。这是为了避免当生产者因任何原因阻塞时,消费者也被阻塞。连接工厂为此目的维护第二个内部连接工厂;默认情况下,它与主工厂类型相同,但如果希望为发布使用不同的工厂类型,可以显式设置。如果 Rabbit Template 在由监听器容器启动的事务中运行,无论此设置如何,都将使用容器的 Channel。

通常,您不应将 RabbitAdmin 与此属性设置为 true 的模板一起使用。使用接受连接工厂的 RabbitAdmin 构造函数。如果您使用接受模板的另一个构造函数,请确保模板的此属性为 false。这是因为,Admin 通常用于为监听器容器声明队列。使用将此属性设置为 true 的模板将意味着独占队列(例如 AnonymousQueue)将在与监听器容器使用的连接不同的连接上声明。在这种情况下,容器无法使用这些队列。