AMQP 支持的消息通道
有两种消息通道实现可用。一个是点对点,另一个是发布订阅。这两种通道都为底层的 AmqpTemplate
和 SimpleMessageListenerContainer
提供了广泛的配置属性(如本章前面关于通道适配器和网关部分所示)。但是,我们在此展示的示例配置很少。请探索 XML 模式以查看可用的属性。
点对点通道可能看起来像以下示例
<int-amqp:channel id="p2pChannel"/>
在幕后,上面的示例会导致声明一个名为 si.p2pChannel
的 Queue
,并且该通道会发送到该 Queue
(从技术上讲,通过发送到无名称的直接交换,其路由键与该 Queue
的名称匹配)。该通道还会在该 Queue
上注册一个消费者。如果您希望通道是“可轮询”而不是消息驱动的,请提供 message-driven
标志,其值为 false
,如下面的示例所示
<int-amqp:channel id="p2pPollableChannel" message-driven="false"/>
发布订阅通道可能看起来像以下示例
<int-amqp:publish-subscribe-channel id="pubSubChannel"/>
在幕后,上面的示例会导致声明一个名为 si.fanout.pubSubChannel
的扇出交换,并且该通道会发送到该扇出交换。该通道还会声明一个服务器命名的独占的、自动删除的、非持久的 Queue
,并将该 Queue
绑定到扇出交换,同时在该 Queue
上注册一个消费者以接收消息。发布订阅通道没有“可轮询”选项。它必须是消息驱动的。
从 4.1 版本开始,AMQP 支持的消息通道(与 channel-transacted
结合使用)支持 template-channel-transacted
来分别为 AbstractMessageListenerContainer
和 RabbitTemplate
配置 transactional
。请注意,以前,channel-transacted
默认情况下为 true
。现在,默认情况下,它对于 AbstractMessageListenerContainer
为 false
。
在 4.3 版本之前,AMQP 支持的通道仅支持带有 `Serializable` 负载和头的消息。整个消息会被转换(序列化)并发送到 RabbitMQ。现在,您可以将 `extract-payload` 属性(或使用 Java 配置时使用 `setExtractPayload()`)设置为 `true`。当此标志为 `true` 时,消息负载会被转换,而头会被映射,类似于您使用通道适配器时的操作方式。这种安排允许 AMQP 支持的通道与非可序列化负载一起使用(可能使用其他消息转换器,例如 `Jackson2JsonMessageConverter`)。有关默认映射头的更多信息,请参阅 AMQP 消息头。您可以通过提供使用 `outbound-header-mapper` 和 `inbound-header-mapper` 属性的自定义映射器来修改映射。您现在还可以指定一个 `default-delivery-mode`,它用于在没有 `amqp_deliveryMode` 头时设置传递模式。默认情况下,Spring AMQP `MessageProperties` 使用 `PERSISTENT` 传递模式。
与其他持久化支持的通道一样,AMQP 支持的通道旨在为消息提供持久性,以避免消息丢失。它们并非旨在将工作分发到其他对等应用程序。为此,请使用通道适配器。 |
从 5.0 版本开始,可轮询通道现在会阻塞轮询线程,持续时间为指定的 `receiveTimeout`(默认值为 1 秒)。以前,与其他 `PollableChannel` 实现不同,如果在指定时间内没有消息可用,线程会立即返回到调度程序,而不管接收超时时间如何。阻塞比使用 `basicGet()` 来检索消息(没有超时)要昂贵一些,因为必须创建一个消费者来接收每条消息。要恢复以前的行为,请将轮询器的 `receiveTimeout` 设置为 0。 |
使用 Java 配置进行配置
以下示例展示了如何使用 Java 配置来配置通道
@Bean
public AmqpChannelFactoryBean pollable(ConnectionFactory connectionFactory) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean();
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setQueueName("foo");
factoryBean.setPubSub(false);
return factoryBean;
}
@Bean
public AmqpChannelFactoryBean messageDriven(ConnectionFactory connectionFactory) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setQueueName("bar");
factoryBean.setPubSub(false);
return factoryBean;
}
@Bean
public AmqpChannelFactoryBean pubSub(ConnectionFactory connectionFactory) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setQueueName("baz");
factoryBean.setPubSub(false);
return factoryBean;
}
使用 Java DSL 进行配置
以下示例展示了如何使用 Java DSL 来配置通道
@Bean
public IntegrationFlow pollableInFlow(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(...)
...
.channel(Amqp.pollableChannel(connectionFactory)
.queueName("foo"))
...
.get();
}
@Bean
public IntegrationFlow messageDrivenInFow(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(...)
...
.channel(Amqp.channel(connectionFactory)
.queueName("bar"))
...
.get();
}
@Bean
public IntegrationFlow pubSubInFlow(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(...)
...
.channel(Amqp.publishSubscribeChannel(connectionFactory)
.queueName("baz"))
...
.get();
}