AmqpTemplate
与Spring Framework及相关项目提供的许多其他高级抽象一样,Spring AMQP也提供了一个扮演核心角色的“模板”。定义主要操作的接口称为AmqpTemplate。这些操作涵盖了发送和接收消息的一般行为。换句话说,它们并非任何实现所独有——因此名称中带有“AMQP”。另一方面,该接口的实现与AMQP协议的实现紧密相关。与JMS本身是一个接口级API不同,AMQP是一个线级协议。该协议的实现提供了自己的客户端库,因此模板接口的每个实现都依赖于特定的客户端库。目前,只有一个实现:RabbitTemplate。在接下来的示例中,我们经常使用AmqpTemplate。但是,当您查看配置示例或任何实例化模板或调用setter的代码片段时,您可以看到实现类型(例如,RabbitTemplate)。
另请参阅异步Rabbit模板。
添加重试功能
从1.3版本开始,您可以配置RabbitTemplate使用RetryTemplate来帮助处理代理连接问题。有关更多信息,请参阅Spring Framework中的核心重试支持。以下只是一个示例,它使用了指数退避策略和默认的SimpleRetryPolicy,该策略在将异常抛给调用者之前进行三次尝试。
以下示例在Java中使用了@Configuration注解。
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
RetryPolicy retryPolicy = RetryPolicy.builder()
.delay(Duration.ofMillis(500))
.multiplier(2.0)
.maxDelay(Duration.ofSeconds(10))
.build();
template.setRetryTemplate(new RetryTemplate(retryPolicy));
return template;
}
从1.4版本开始,除了retryTemplate属性之外,RabbitTemplate还支持recoveryCallback选项。它作为RetryTemplate.execute(RetryCallback<T, E> retryCallback, RecoveryCallback<T> recoveryCallback)的第二个参数使用。
RecoveryCallback有些受限,因为重试上下文只包含lastThrowable字段。对于更复杂的用例,您应该使用外部RetryTemplate,以便通过上下文的属性将额外信息传递给RecoveryCallback。以下示例展示了如何实现: |
retryTemplate.execute(
new RetryCallback<Object, Exception>() {
@Override
public Object doWithRetry(RetryContext context) throws Exception {
context.setAttribute("message", message);
return rabbitTemplate.convertAndSend(exchange, routingKey, message);
}
}, new RecoveryCallback<Object>() {
@Override
public Object recover(RetryContext context) throws Exception {
Object message = context.getAttribute("message");
Throwable t = context.getLastThrowable();
// Do something with message
return null;
}
});
}
在这种情况下,您将不会向RabbitTemplate注入RetryTemplate。
发布是异步的——如何检测成功和失败
发布消息是一种异步机制,默认情况下,无法路由的消息会被RabbitMQ丢弃。对于成功的发布,您可以收到异步确认,如关联发布者确认和返回中所述。考虑两种失败场景:
-
发布到交换机但没有匹配的目标队列。
-
发布到不存在的交换机。
第一种情况由发布者返回覆盖,如关联发布者确认和返回中所述。
对于第二种情况,消息被丢弃,不会生成返回。底层通道会因异常而关闭。默认情况下,此异常会被记录,但您可以通过向CachingConnectionFactory注册ChannelListener来获取此类事件的通知。以下示例展示了如何添加ConnectionListener:
this.connectionFactory.addConnectionListener(new ConnectionListener() {
@Override
public void onCreate(Connection connection) {
}
@Override
public void onShutDown(ShutdownSignalException signal) {
...
}
});
您可以检查信号的reason属性以确定发生的问题。
要在发送线程上检测异常,您可以在RabbitTemplate上设置setChannelTransacted(true),异常将在txCommit()上被检测到。但是,事务会显著降低性能,因此在仅为此用例启用事务之前,请仔细考虑这一点。
关联发布者确认和返回
AmqpTemplate的RabbitTemplate实现支持发布者确认和返回。
对于返回的消息,模板的mandatory属性必须设置为true,或者特定消息的mandatory-expression必须评估为true。此功能需要一个CachingConnectionFactory,其publisherReturns属性设置为true(参见发布者确认和返回)。客户端通过调用setReturnsCallback(ReturnsCallback callback)注册RabbitTemplate.ReturnsCallback来接收返回。回调必须实现以下方法:
void returnedMessage(ReturnedMessage returned);
ReturnedMessage具有以下属性:
-
message- 返回的消息本身 -
replyCode- 指示返回原因的代码 -
replyText- 返回的文本原因 - 例如NO_ROUTE -
exchange- 消息发送到的交换机 -
routingKey- 使用的路由键
每个RabbitTemplate只支持一个ReturnsCallback。另请参阅回复超时。
对于发布者确认(也称为发布者回执),模板需要一个CachingConnectionFactory,其publisherConfirm属性设置为ConfirmType.CORRELATED。客户端通过调用setConfirmCallback(ConfirmCallback callback)注册RabbitTemplate.ConfirmCallback来接收确认。回调必须实现此方法:
void confirm(CorrelationData correlationData, boolean ack, String cause);
CorrelationData是客户端在发送原始消息时提供的对象。ack为ack时为true,为nack时为false。对于nack实例,如果生成nack时可用,cause可能包含nack的原因。一个例子是向不存在的交换机发送消息。在这种情况下,代理会关闭通道。关闭的原因包含在cause中。cause是在1.4版本中添加的。
一个RabbitTemplate只支持一个ConfirmCallback。
当 Rabbit 模板发送操作完成后,通道会关闭。这会阻止在连接工厂缓存已满时接收确认或返回(当缓存中有空间时,通道不会物理关闭,返回和确认正常进行)。当缓存已满时,框架会延迟关闭长达五秒,以便有时间接收确认和返回。使用确认时,当收到最后一个确认时通道会关闭。只使用返回时,通道会保持打开状态整整五秒。我们通常建议将连接工厂的channelCacheSize设置为足够大的值,以便发布消息的通道返回到缓存而不是关闭。您可以使用 RabbitMQ 管理插件监控通道使用情况。如果您看到通道快速打开和关闭,您应该考虑增加缓存大小以减少服务器开销。 |
在2.1版本之前,为发布者确认启用的通道在收到确认之前会返回到缓存中。其他一些进程可能会检出该通道并执行一些导致通道关闭的操作——例如将消息发布到不存在的交换机。这可能导致确认丢失。2.1及更高版本在确认未完成时不再将通道返回到缓存。RabbitTemplate在每次操作后对通道执行逻辑close()。通常,这意味着通道上一次只有一个未完成的确认。 |
从2.2版本开始,回调在连接工厂的一个executor线程上调用。这是为了避免如果您在回调中执行Rabbit操作时可能发生的死锁。在以前的版本中,回调直接在amqp-client连接I/O线程上调用;如果您执行某些RPC操作(例如打开新通道),这会导致死锁,因为I/O线程会阻塞等待结果,但结果需要由I/O线程本身处理。在那些版本中,有必要将工作(例如发送消息)移交给回调中的另一个线程。现在不再需要这样做,因为框架现在将回调调用移交给执行器。 |
| 只要返回回调在60秒或更短时间内执行,接收返回消息早于ack的保证仍然得到维护。确认的交付安排在返回回调退出后或60秒后(以先发生者为准)。 |
CorrelationData对象有一个CompletableFuture,您可以使用它来获取结果,而不是在模板上使用ConfirmCallback。以下示例显示了如何配置CorrelationData实例:
CorrelationData cd1 = new CorrelationData();
this.templateWithConfirmsEnabled.convertAndSend("exchange", queue.getName(), "foo", cd1);
assertTrue(cd1.getFuture().get(10, TimeUnit.SECONDS).isAck());
ReturnedMessage = cd1.getReturn();
...
由于它是一个CompletableFuture<Confirm>,您可以在结果准备好时调用get()来获取结果,或者使用whenComplete()进行异步回调。Confirm对象是一个简单的bean,包含两个属性:ack和reason(用于nack实例)。对于代理生成的nack实例,reason不填充。对于框架生成的nack实例(例如,在ack实例未决时关闭连接),reason会被填充。
此外,当同时启用确认和返回时,如果消息无法路由到任何队列,则CorrelationData的return属性将填充返回的消息。保证在未来设置ack之前,返回消息属性已设置。CorrelationData.getReturn()返回一个带有属性的ReturnMessage:
-
message(返回的消息)
-
replyCode
-
replyText
-
exchange
-
routingKey
另请参阅范围操作,以获取等待发布者确认的更简单机制。
作用域操作
通常,当使用模板时,一个Channel会从缓存中检出(或创建),用于操作,然后返回到缓存中以供重用。在多线程环境中,不能保证下一个操作会使用相同的通道。但是,有时您可能希望对通道的使用有更多的控制,并确保许多操作都在同一个通道上执行。
从2.0版本开始,提供了一个名为invoke的新方法,带有OperationsCallback。在回调范围内以及在提供的RabbitOperations参数上执行的任何操作都使用相同的专用Channel,该通道将在结束时关闭(不会返回到缓存中)。如果通道是PublisherCallbackChannel,则在收到所有确认后返回到缓存中(参见关联发布者确认和返回)。
@FunctionalInterface
public interface OperationsCallback<T> {
T doInRabbit(RabbitOperations operations);
}
您可能需要这样做的原因之一是,如果您希望在底层Channel上使用waitForConfirms()方法。由于通道通常是缓存和共享的,如前所述,此方法以前未通过Spring API公开。RabbitTemplate现在提供了waitForConfirms(long timeout)和waitForConfirmsOrDie(long timeout),它们委托给在OperationsCallback范围内使用的专用通道。出于显而易见的原因,这些方法不能在该范围之外使用。
请注意,其他地方提供了更高级别的抽象,允许您将确认与请求关联起来(参见关联发布者确认和返回)。如果您只想等到代理确认交付,您可以使用以下示例中显示的技术:
Collection<?> messages = getMessagesToSend();
Boolean result = this.template.invoke(t -> {
messages.forEach(m -> t.convertAndSend(ROUTE, m));
t.waitForConfirmsOrDie(10_000);
return true;
});
如果您希望RabbitAdmin操作在OperationsCallback范围内在同一通道上调用,则admin必须已使用与invoke操作相同的RabbitTemplate构建。
如果模板操作已在现有事务的范围内执行(例如,当在事务监听器容器线程上运行并对事务模板执行操作时),则前面的讨论就无关紧要了。在这种情况下,操作在该通道上执行,并在线程返回到容器时提交。在这种情况下,无需使用invoke。 |
以这种方式使用确认时,为将确认与请求关联而设置的大部分基础设施实际上是不需要的(除非也启用了返回)。从2.2版本开始,连接工厂支持一个名为publisherConfirmType的新属性。当将其设置为ConfirmType.SIMPLE时,将避免基础设施,并且确认处理可以更高效。
此外,RabbitTemplate在发送消息的MessageProperties中设置publisherSequenceNumber属性。如果您希望检查(或记录或以其他方式使用)特定的确认,您可以使用重载的invoke方法,如以下示例所示:
public <T> T invoke(OperationsCallback<T> action, com.rabbitmq.client.ConfirmCallback acks,
com.rabbitmq.client.ConfirmCallback nacks);
这些ConfirmCallback对象(用于ack和nack实例)是Rabbit客户端回调,而不是模板回调。 |
以下示例记录了ack和nack实例:
Collection<?> messages = getMessagesToSend();
Boolean result = this.template.invoke(t -> {
messages.forEach(m -> t.convertAndSend(ROUTE, m));
t.waitForConfirmsOrDie(10_000);
return true;
}, (tag, multiple) -> {
log.info("Ack: " + tag + ":" + multiple);
}, (tag, multiple) -> {
log.info("Nack: " + tag + ":" + multiple);
}));
| 作用域操作绑定到线程。有关多线程环境中严格排序的讨论,请参阅多线程环境中的严格消息排序。 |
多线程环境中的严格消息排序
作用域操作中的讨论仅适用于在同一线程上执行操作的情况。
考虑以下情况:
-
thread-1向队列发送消息并将工作移交给thread-2 -
thread-2向同一个队列发送消息
由于RabbitMQ的异步特性和缓存通道的使用;不确定是否会使用相同的通道,因此不能保证消息到达队列的顺序。(在大多数情况下,它们会按顺序到达,但乱序交付的概率不为零)。为了解决此用例,您可以使用大小为1的有界通道缓存(以及channelCheckoutTimeout)来确保消息始终在同一通道上发布,并且顺序将得到保证。为此,如果您有连接工厂的其他用途,例如消费者,您应该为模板使用专用连接工厂,或者配置模板以使用嵌入在主连接工厂中的发布者连接工厂(参见使用单独的连接)。
这最好通过一个简单的Spring Boot应用程序来说明:
@SpringBootApplication
public class Application {
private static final Logger log = LoggerFactory.getLogger(Application.class);
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
TaskExecutor exec() {
ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
exec.setCorePoolSize(10);
return exec;
}
@Bean
CachingConnectionFactory ccf() {
CachingConnectionFactory ccf = new CachingConnectionFactory("localhost");
CachingConnectionFactory publisherCF = (CachingConnectionFactory) ccf.getPublisherConnectionFactory();
publisherCF.setChannelCacheSize(1);
publisherCF.setChannelCheckoutTimeout(1000L);
return ccf;
}
@RabbitListener(queues = "queue")
void listen(String in) {
log.info(in);
}
@Bean
Queue queue() {
return new Queue("queue");
}
@Bean
public ApplicationRunner runner(Service service, TaskExecutor exec) {
return args -> {
exec.execute(() -> service.mainService("test"));
};
}
}
@Component
class Service {
private static final Logger LOG = LoggerFactory.getLogger(Service.class);
private final RabbitTemplate template;
private final TaskExecutor exec;
Service(RabbitTemplate template, TaskExecutor exec) {
template.setUsePublisherConnection(true);
this.template = template;
this.exec = exec;
}
void mainService(String toSend) {
LOG.info("Publishing from main service");
this.template.convertAndSend("queue", toSend);
this.exec.execute(() -> secondaryService(toSend.toUpperCase()));
}
void secondaryService(String toSend) {
LOG.info("Publishing from secondary service");
this.template.convertAndSend("queue", toSend);
}
}
即使发布是在两个不同的线程上执行的,它们都将使用相同的通道,因为缓存被限制为单个通道。
从2.3.7版本开始,ThreadChannelConnectionFactory支持使用prepareContextSwitch和switchContext方法将线程的通道传输到另一个线程。第一个方法返回一个上下文,该上下文传递给调用第二个方法的第二个线程。一个线程可以绑定一个非事务性通道或一个事务性通道(或两者都有);您不能单独传输它们,除非您使用两个连接工厂。示例如下:
@SpringBootApplication
public class Application {
private static final Logger log = LoggerFactory.getLogger(Application.class);
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
TaskExecutor exec() {
ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
exec.setCorePoolSize(10);
return exec;
}
@Bean
ThreadChannelConnectionFactory tccf() {
ConnectionFactory rabbitConnectionFactory = new ConnectionFactory();
rabbitConnectionFactory.setHost("localhost");
return new ThreadChannelConnectionFactory(rabbitConnectionFactory);
}
@RabbitListener(queues = "queue")
void listen(String in) {
log.info(in);
}
@Bean
Queue queue() {
return new Queue("queue");
}
@Bean
public ApplicationRunner runner(Service service, TaskExecutor exec) {
return args -> {
exec.execute(() -> service.mainService("test"));
};
}
}
@Component
class Service {
private static final Logger LOG = LoggerFactory.getLogger(Service.class);
private final RabbitTemplate template;
private final TaskExecutor exec;
private final ThreadChannelConnectionFactory connFactory;
Service(RabbitTemplate template, TaskExecutor exec,
ThreadChannelConnectionFactory tccf) {
this.template = template;
this.exec = exec;
this.connFactory = tccf;
}
void mainService(String toSend) {
LOG.info("Publishing from main service");
this.template.convertAndSend("queue", toSend);
Object context = this.connFactory.prepareSwitchContext();
this.exec.execute(() -> secondaryService(toSend.toUpperCase(), context));
}
void secondaryService(String toSend, Object threadContext) {
LOG.info("Publishing from secondary service");
this.connFactory.switchContext(threadContext);
this.template.convertAndSend("queue", toSend);
this.connFactory.closeThreadChannel();
}
}
一旦调用了prepareSwitchContext,如果当前线程执行任何其他操作,它们将在新通道上执行。当不再需要线程绑定的通道时,关闭它是很重要的。 |
消息集成
从1.4版本开始,RabbitMessagingTemplate(基于RabbitTemplate构建)提供了与Spring Framework消息抽象(即org.springframework.messaging.Message)的集成。这允许您使用spring-messaging的Message<?>抽象来发送和接收消息。此抽象被其他Spring项目使用,例如Spring Integration和Spring的STOMP支持。涉及两个消息转换器:一个用于在spring-messaging的Message<?>和Spring AMQP的Message抽象之间进行转换,另一个用于在Spring AMQP的Message抽象和底层RabbitMQ客户端库所需的格式之间进行转换。默认情况下,消息负载由提供的RabbitTemplate实例的消息转换器进行转换。或者,您可以注入一个带有其他负载转换器的自定义MessagingMessageConverter,示例如下:
MessagingMessageConverter amqpMessageConverter = new MessagingMessageConverter();
amqpMessageConverter.setPayloadConverter(myPayloadConverter);
rabbitMessagingTemplate.setAmqpMessageConverter(amqpMessageConverter);
验证用户ID
从1.6版本开始,模板现在支持user-id-expression(使用Java配置时为userIdExpression)。如果发送消息,则在评估此表达式后设置用户ID属性(如果尚未设置)。评估的根对象是要发送的消息。
以下示例展示了如何使用user-id-expression属性:
<rabbit:template ... user-id-expression="'guest'" />
<rabbit:template ... user-id-expression="@myConnectionFactory.username" />
第一个示例是文字表达式。第二个从应用程序上下文中的连接工厂bean获取username属性。
使用单独的连接
从2.0.2版本开始,您可以将usePublisherConnection属性设置为true,以便在可能的情况下使用与监听器容器不同的连接。这是为了避免当生产者因任何原因被阻塞时消费者也被阻塞。为此,连接工厂维护第二个内部连接工厂;默认情况下,它与主工厂类型相同,但如果您希望为发布使用不同的工厂类型,则可以明确设置。如果rabbit模板在由监听器容器启动的事务中运行,则无论此设置如何,都将使用容器的通道。
通常,您不应将RabbitAdmin与将此属性设置为true的模板一起使用。请使用接受连接工厂的RabbitAdmin构造函数。如果您使用接受模板的其他构造函数,请确保模板的属性为false。这是因为,通常,admin用于为监听器容器声明队列。使用将此属性设置为true的模板将意味着独占队列(例如AnonymousQueue)将在与监听器容器使用的连接不同的连接上声明。在这种情况下,容器无法使用这些队列。 |