基于AMQP的消息通道
提供了两种消息通道实现。一种是点对点 (point-to-point),另一种是发布订阅 (publish-subscribe)。这两种通道都为底层的AmqpTemplate
和SimpleMessageListenerContainer
提供了广泛的配置属性(如本章前面关于通道适配器和网关部分所示)。但是,我们在此展示的示例配置最小化。请浏览XML模式以查看可用属性。
点对点通道可能如下所示:
<int-amqp:channel id="p2pChannel"/>
在底层,上面的示例会导致声明一个名为si.p2pChannel
的队列,并且此通道向该队列发送消息(从技术上讲,它是通过向无名直接交换机发送消息,并且路由密钥与该队列的名称匹配来实现的)。此通道还在该队列上注册了一个消费者。如果您希望通道是“轮询”(可轮询)的而不是消息驱动的,请将message-driven
标志的值提供为false
,如下例所示:
<int-amqp:channel id="p2pPollableChannel" message-driven="false"/>
发布订阅通道可能如下所示:
<int-amqp:publish-subscribe-channel id="pubSubChannel"/>
在底层,上面的示例会导致声明一个名为si.fanout.pubSubChannel
的扇出交换机,并且此通道向该扇出交换机发送消息。此通道还会声明一个服务器命名的独占的、自动删除的、非持久的队列,并将该队列绑定到扇出交换机,同时在这个队列上注册一个消费者来接收消息。发布订阅通道没有“轮询”选项。它必须是消息驱动的。
从4.1版本开始,支持使用template-channel-transacted
(与channel-transacted
结合使用)的基于AMQP的消息通道,以便为AbstractMessageListenerContainer
和RabbitTemplate
分别进行transactional
配置。请注意,以前,channel-transacted
默认情况下为true
。现在,对于AbstractMessageListenerContainer
,它默认情况下为false
。
在4.3版本之前,基于AMQP的通道仅支持具有Serializable
有效负载和标头的消息。整个消息会被转换(序列化)并发送到RabbitMQ。现在,您可以将extract-payload
属性(或使用Java配置时使用setExtractPayload()
)设置为true
。当此标志为true
时,消息有效负载将被转换,并且标头将被映射,方式类似于使用通道适配器时。此安排允许基于AMQP的通道与非可序列化有效负载一起使用(可能使用另一个消息转换器,例如Jackson2JsonMessageConverter
)。有关默认映射标头的更多信息,请参见AMQP消息标头。您可以通过提供使用outbound-header-mapper
和inbound-header-mapper
属性的自定义映射器来修改映射。您现在还可以指定一个default-delivery-mode
,它用于在没有amqp_deliveryMode
标头时设置传递模式。默认情况下,Spring AMQP MessageProperties
使用PERSISTENT
传递模式。
与其他持久性支持的通道一样,基于AMQP的通道旨在提供消息持久性以避免消息丢失。它们并非旨在将工作分发到其他对等应用程序。为此,请改用通道适配器。 |
从5.0版本开始,可轮询通道现在会阻塞轮询线程,持续时间为指定的receiveTimeout (默认为1秒)。以前,与其他PollableChannel 实现不同,如果不可用消息,则线程会立即返回到调度程序,而不管接收超时时间如何。阻塞比使用basicGet() 检索消息(无超时)开销略大一些,因为必须创建一个消费者来接收每条消息。要恢复以前的行为,请将轮询程序的receiveTimeout 设置为0。 |
使用Java配置进行配置
以下示例演示如何使用Java配置配置通道:
@Bean
public AmqpChannelFactoryBean pollable(ConnectionFactory connectionFactory) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean();
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setQueueName("foo");
factoryBean.setPubSub(false);
return factoryBean;
}
@Bean
public AmqpChannelFactoryBean messageDriven(ConnectionFactory connectionFactory) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setQueueName("bar");
factoryBean.setPubSub(false);
return factoryBean;
}
@Bean
public AmqpChannelFactoryBean pubSub(ConnectionFactory connectionFactory) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setQueueName("baz");
factoryBean.setPubSub(false);
return factoryBean;
}
使用Java DSL进行配置
以下示例演示如何使用Java DSL配置通道:
@Bean
public IntegrationFlow pollableInFlow(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(...)
...
.channel(Amqp.pollableChannel(connectionFactory)
.queueName("foo"))
...
.get();
}
@Bean
public IntegrationFlow messageDrivenInFow(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(...)
...
.channel(Amqp.channel(connectionFactory)
.queueName("bar"))
...
.get();
}
@Bean
public IntegrationFlow pubSubInFlow(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(...)
...
.channel(Amqp.publishSubscribeChannel(connectionFactory)
.queueName("baz"))
...
.get();
}