AMQP 支持的消息通道

有两种消息通道实现可用。一个是点对点,另一个是发布订阅。这两种通道都为底层的 AmqpTemplateSimpleMessageListenerContainer 提供了广泛的配置属性(如本章前面关于通道适配器和网关部分所示)。但是,我们在此展示的示例配置很少。请探索 XML 模式以查看可用的属性。

点对点通道可能看起来像以下示例

<int-amqp:channel id="p2pChannel"/>

在幕后,上面的示例会导致声明一个名为 si.p2pChannelQueue,并且该通道会发送到该 Queue(从技术上讲,通过发送到无名称的直接交换,其路由键与该 Queue 的名称匹配)。该通道还会在该 Queue 上注册一个消费者。如果您希望通道是“可轮询”而不是消息驱动的,请提供 message-driven 标志,其值为 false,如下面的示例所示

<int-amqp:channel id="p2pPollableChannel"  message-driven="false"/>

发布订阅通道可能看起来像以下示例

<int-amqp:publish-subscribe-channel id="pubSubChannel"/>

在幕后,上面的示例会导致声明一个名为 si.fanout.pubSubChannel 的扇出交换,并且该通道会发送到该扇出交换。该通道还会声明一个服务器命名的独占的、自动删除的、非持久的 Queue,并将该 Queue 绑定到扇出交换,同时在该 Queue 上注册一个消费者以接收消息。发布订阅通道没有“可轮询”选项。它必须是消息驱动的。

从 4.1 版本开始,AMQP 支持的消息通道(与 channel-transacted 结合使用)支持 template-channel-transacted 来分别为 AbstractMessageListenerContainerRabbitTemplate 配置 transactional。请注意,以前,channel-transacted 默认情况下为 true。现在,默认情况下,它对于 AbstractMessageListenerContainerfalse

在 4.3 版本之前,AMQP 支持的通道仅支持带有 `Serializable` 负载和头的消息。整个消息会被转换(序列化)并发送到 RabbitMQ。现在,您可以将 `extract-payload` 属性(或使用 Java 配置时使用 `setExtractPayload()`)设置为 `true`。当此标志为 `true` 时,消息负载会被转换,而头会被映射,类似于您使用通道适配器时的操作方式。这种安排允许 AMQP 支持的通道与非可序列化负载一起使用(可能使用其他消息转换器,例如 `Jackson2JsonMessageConverter`)。有关默认映射头的更多信息,请参阅 AMQP 消息头。您可以通过提供使用 `outbound-header-mapper` 和 `inbound-header-mapper` 属性的自定义映射器来修改映射。您现在还可以指定一个 `default-delivery-mode`,它用于在没有 `amqp_deliveryMode` 头时设置传递模式。默认情况下,Spring AMQP `MessageProperties` 使用 `PERSISTENT` 传递模式。

与其他持久化支持的通道一样,AMQP 支持的通道旨在为消息提供持久性,以避免消息丢失。它们并非旨在将工作分发到其他对等应用程序。为此,请使用通道适配器。
从 5.0 版本开始,可轮询通道现在会阻塞轮询线程,持续时间为指定的 `receiveTimeout`(默认值为 1 秒)。以前,与其他 `PollableChannel` 实现不同,如果在指定时间内没有消息可用,线程会立即返回到调度程序,而不管接收超时时间如何。阻塞比使用 `basicGet()` 来检索消息(没有超时)要昂贵一些,因为必须创建一个消费者来接收每条消息。要恢复以前的行为,请将轮询器的 `receiveTimeout` 设置为 0。

使用 Java 配置进行配置

以下示例展示了如何使用 Java 配置来配置通道

@Bean
public AmqpChannelFactoryBean pollable(ConnectionFactory connectionFactory) {
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean();
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("foo");
    factoryBean.setPubSub(false);
    return factoryBean;
}

@Bean
public AmqpChannelFactoryBean messageDriven(ConnectionFactory connectionFactory) {
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("bar");
    factoryBean.setPubSub(false);
    return factoryBean;
}

@Bean
public AmqpChannelFactoryBean pubSub(ConnectionFactory connectionFactory) {
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("baz");
    factoryBean.setPubSub(false);
    return factoryBean;
}

使用 Java DSL 进行配置

以下示例展示了如何使用 Java DSL 来配置通道

@Bean
public IntegrationFlow pollableInFlow(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(...)
            ...
            .channel(Amqp.pollableChannel(connectionFactory)
                    .queueName("foo"))
            ...
            .get();
}

@Bean
public IntegrationFlow messageDrivenInFow(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(...)
            ...
            .channel(Amqp.channel(connectionFactory)
                    .queueName("bar"))
            ...
            .get();
}

@Bean
public IntegrationFlow pubSubInFlow(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(...)
            ...
            .channel(Amqp.publishSubscribeChannel(connectionFactory)
                    .queueName("baz"))
            ...
            .get();
}