AMQP 支持的消息通道

有两种消息通道实现可用。一种是点对点,另一种是发布-订阅。这两种通道都为底层的 AmqpTemplateSimpleMessageListenerContainer 提供了广泛的配置属性(如本章前面通道适配器和网关所示)。然而,我们在这里展示的示例配置最少。请查阅 XML Schema 以查看可用属性。

点对点通道可能如下所示

<int-amqp:channel id="p2pChannel"/>

在内部,前面的示例会声明一个名为 si.p2pChannelQueue,并且此通道会向该 Queue 发送消息(技术上,通过将消息发送到与此 Queue 名称匹配的路由键的无名直接交换机)。此通道还会在此 Queue 上注册一个消费者。如果您希望通道是“可轮询的”而不是消息驱动的,请提供 message-driven 标志并将其值设为 false,如下例所示

<int-amqp:channel id="p2pPollableChannel"  message-driven="false"/>

发布-订阅通道可能如下所示

<int-amqp:publish-subscribe-channel id="pubSubChannel"/>

在内部,前面的示例会声明一个名为 si.fanout.pubSubChannel 的扇出交换机,并且此通道会向该扇出交换机发送消息。此通道还会声明一个服务器命名的独占、自动删除、非持久化 Queue,并将其绑定到扇出交换机,同时在该 Queue 上注册一个消费者来接收消息。发布-订阅通道没有“可轮询”选项。它必须是消息驱动的。

从 4.1 版本开始,AMQP 支持的消息通道(结合 channel-transacted)支持 template-channel-transacted,用于分离 AbstractMessageListenerContainerRabbitTemplatetransactional 配置。请注意,之前 channel-transacted 默认为 true。现在,对于 AbstractMessageListenerContainer,它默认为 false

在 4.3 版本之前,AMQP 支持的通道仅支持有效载荷和头部为 Serializable 的消息。整个消息会被转换(序列化)并发送到 RabbitMQ。现在,您可以将 extract-payload 属性(或使用 Java 配置时的 setExtractPayload() 方法)设置为 true。当此标志为 true 时,消息有效载荷会被转换,头部会被映射,方式类似于使用通道适配器。这种配置允许 AMQP 支持的通道与非序列化有效载荷一起使用(可能与另一个消息转换器,例如 Jackson2JsonMessageConverter 一起使用)。有关默认映射头部的更多信息,请参阅 AMQP 消息头。您可以通过提供使用 outbound-header-mapperinbound-header-mapper 属性的自定义映射器来修改映射。您现在还可以指定一个 default-delivery-mode,当没有 amqp_deliveryMode 头部时,它用于设置投递模式。默认情况下,Spring AMQP MessageProperties 使用 PERSISTENT 投递模式。

与其他持久化支持的通道一样,AMQP 支持的通道旨在提供消息持久化以避免消息丢失。它们不用于将工作分发到其他对等应用程序。为此目的,请使用通道适配器。
从 5.0 版本开始,可轮询通道现在会阻塞轮询器线程达指定的 receiveTimeout 时间(默认值为 1 秒)。之前,与其他 PollableChannel 实现不同,如果队列中没有可用消息,无论接收超时时间是多少,线程都会立即返回给调度器。阻塞比使用 basicGet()(无超时)检索消息稍微昂贵一些,因为必须为接收每条消息创建一个消费者。要恢复之前的行为,请将轮询器的 receiveTimeout 设置为 0。

使用 Java 配置

以下示例展示了如何使用 Java 配置来配置通道

@Bean
public AmqpChannelFactoryBean pollable(ConnectionFactory connectionFactory) {
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean();
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("foo");
    factoryBean.setPubSub(false);
    return factoryBean;
}

@Bean
public AmqpChannelFactoryBean messageDriven(ConnectionFactory connectionFactory) {
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("bar");
    factoryBean.setPubSub(false);
    return factoryBean;
}

@Bean
public AmqpChannelFactoryBean pubSub(ConnectionFactory connectionFactory) {
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("baz");
    factoryBean.setPubSub(false);
    return factoryBean;
}

使用 Java DSL 配置

以下示例展示了如何使用 Java DSL 来配置通道

@Bean
public IntegrationFlow pollableInFlow(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(...)
            ...
            .channel(Amqp.pollableChannel(connectionFactory)
                    .queueName("foo"))
            ...
            .get();
}

@Bean
public IntegrationFlow messageDrivenInFow(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(...)
            ...
            .channel(Amqp.channel(connectionFactory)
                    .queueName("bar"))
            ...
            .get();
}

@Bean
public IntegrationFlow pubSubInFlow(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(...)
            ...
            .channel(Amqp.publishSubscribeChannel(connectionFactory)
                    .queueName("baz"))
            ...
            .get();
}