AMQP 1.0 支持

从版本 7.0 开始,Spring Integration 为 RabbitMQ AMQP 1.0 支持提供了通道适配器。这些通道适配器基于 org.springframework.amqp:spring-rabbitmq-client 库。

Spring AMQP 文档提供了关于 RabbitMQ AMQP 1.0 支持的更多详细信息。

AMQP 1.0 出站通道适配器

AmqpClientMessageHandlerAbstractReplyProducingMessageHandler 的实现,可以根据 setRequiresReply() 配置作为单向通道适配器或出站网关。此通道适配器实例需要一个用于 AMQP 1.0 协议的 AsyncAmqpTemplate 实现,例如来自上述 spring-rabbitmq-client 库的 RabbitAmqpTemplate。此消息处理程序默认是异步的;因此,发布错误应通过请求消息中的 errorChannel 头部或应用程序上下文中的全局默认 errorChannel 来处理。

发布消息的 exchange(以及可选的 routingKey)与要发布的 queue 互斥。如果两者都没有提供,那么 AsyncAmqpTemplate 实现必须确保这些目标部分的一些默认值;否则消息将被拒绝,因为它未送达。

默认情况下,MessageConverter 是一个 org.springframework.amqp.support.converter.SimpleMessageConverter,它处理 String、可序列化实例和字节数组。此外,默认的 AmqpHeaderMapper 是一个 DefaultAmqpHeaderMapper.outboundMapper()。此头部映射器还用于将 AMQP 消息属性映射回回复中的头部。

在网关模式下,可以提供 replyPayloadType 以转换回复消息体。但是,MessageConverter 必须是 SmartMessageConverter 的实现,例如 JacksonJsonMessageConverter。此外,与 replyPayloadType 互斥的是,可以将 returnMessage 标志设置为 true 以返回整个 org.springframework.amqp.core.Message 实例作为回复消息负载。

以下示例演示如何将 AmqpClientMessageHandler 配置为简单的 @ServiceActivator

  • Java DSL

  • Kotlin DSL

  • Groovy DSL

  • Java

@Bean
IntegrationFlow sendFlow(RabbitAmqpTemplate rabbitTemplate) {
    return f -> f
            .handle(AmqpClient.outboundAdapter(rabbitTemplate)
                    .exchange("e1")
                    .routingKeyExpression("'k1'"));
}
@Bean
fun sendFlow(rabbitTemplate: RabbitAmqpTemplate) =
    integrationFlow {
                handle(AmqpClient.outboundAdapter(rabbitTemplate)
    		            .apply {
    		                exchange("e1")
                            routingKeyExpression("'k1'")
    		            }
    		    )
    }
@Bean
sendFlow() {
    integrationFlow {
        handle(AmqpClient.outboundAdapter(rabbitTemplate)
                .with {
                     exchange 'e1'
                     routingKeyExpression '''k1'''
                }
        )
    }
}
@Bean
@ServiceActivator(inputChannel = "amqpClientSendChannel")
AmqpClientMessageHandler amqpClientMessageHandler(RabbitAmqpTemplate rabbitTemplate) {
    AmqpClientMessageHandler messageHandler = new AmqpClientMessageHandler(rabbitTemplate);
    messageHandler.setExchangeExpressionString("headers[exchange]");
    messageHandler.setRoutingKeyExpressionString("headers[routingKey]");
    return messageHandler;
}

AmqpClientMessageHandler 的网关变体可以是

  • Java DSL

  • Kotlin DSL

  • Groovy DSL

  • Java

@Bean
IntegrationFlow requestReplyOutboundFlow(RabbitAmqpTemplate rabbitTemplate) {
    return f -> f
            .handle(AmqpClient.outboundGateway(rabbitTemplate)
                    .queueFunction(m -> "requestReply"));
}
@Bean
fun sendFlow(rabbitTemplate: RabbitAmqpTemplate) =
    integrationFlow {
                handle(AmqpClient.outboundGateway(rabbitTemplate)
    		            .queueFunction { "requestReply" }
                )
    }
@Bean
sendFlow() {
    integrationFlow {
        handle(AmqpClient.outboundGateway(rabbitTemplate)
                .with {
                     queueFunction { 'requestReply' }
                }
        )
    }
}
@Bean
@ServiceActivator(inputChannel = "amqpClientSendAndReceiveChannel")
AmqpClientMessageHandler amqpClientGateway(RabbitAmqpTemplate rabbitTemplate) {
    AmqpClientMessageHandler messageHandler = new AmqpClientMessageHandler(rabbitTemplate);
    messageHandler.setRequiresReply(true);
    messageHandler.setReplyPayloadType(String.class);
    messageHandler.setMessageConverter(new JacksonJsonMessageConverter());
    messageHandler.setQueue("q1");
    return messageHandler;
}

AMQP 1.0 消息驱动通道适配器

AmqpClientMessageProducerMessageProducerSupport 的实现,作为消息驱动通道适配器,通过 RabbitMQ AMQP 1.0 协议从队列消费消息。它需要一个 AmqpConnectionFactory 和至少一个要消费的队列。其内部逻辑基于 RabbitAmqpListenerContainerIntegrationRabbitAmqpMessageListener,用于将消费的 AMQP 消息(转换后)中继到 outputChannelRabbitAmqpListenerContainer 的一些配置选项作为 AmqpClientMessageProducer 的设置器暴露。

默认情况下,MessageConverter 是一个 org.springframework.amqp.support.converter.SimpleMessageConverter,它处理 String、可序列化实例和字节数组。此外,默认的 AmqpHeaderMapper 是一个 DefaultAmqpHeaderMapper.inboundMapper()。可以将 messageConverter 选项设置为 null 以完全跳过转换(包括头部映射),并返回收到的 AMQP 消息作为要生成的 Spring 消息的负载。

此外,AmqpClientMessageProducer 实现了 Pausable 契约,并委托给相应的 RabbitAmqpListenerContainer API。

AmqpClientMessageProducer.setBatchSize() > 1 时,此通道适配器以批处理模式工作。在这种情况下,接收到的消息将被收集,直到达到批处理大小,或 batchReceiveTimeout 期限耗尽。然后,所有批处理的 AMQP 消息将被转换为 Spring 消息,并且结果列表将作为包装消息的负载生成,以发送到 outputChannel。批处理模式由于一次性结算所有批处理消息而带来一些性能提升。

autoSettle 标志设置为 false 时,AcknowledgmentCallback 实例作为 IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK 消息头部提供,以对接收到的消息或整个批处理做出结算决策。

以下示例演示如何将 AmqpClientMessageProducer 配置为简单的入站端点

  • Java DSL

  • Kotlin DSL

  • Groovy DSL

  • Java

@Bean
IntegrationFlow receiveFlow(AmqpConnectionFactory connectionFactory) {
    return IntegrationFlow.from(AmqpClient.inboundChannelAdapter(connectionFactory, "q1"))
            .channel(c -> c.queue("receiveChannel"))
            .get();
}
@Bean
fun receiveFlow(connectionFactory: AmqpConnectionFactory) =
        integrationFlow(AmqpClient.inboundChannelAdapter(connectionFactory, "q1")) {
            channel("inputChannel")
        }
@Bean
receiveFlow(AmqpConnectionFactory connectionFactory) {
    integrationFlow(AmqpClient.inboundChannelAdapter(connectionFactory, 'q1')) {
        channel 'inputChannel'
    }
}
@Bean
AmqpClientMessageProducer batchAmqpClientMessageProducer(AmqpConnectionFactory connectionFactory,
        QueueChannel inputChannel) {

    AmqpClientMessageProducer amqpClientMessageProducer = new AmqpClientMessageProducer(connectionFactory, "q3");
    amqpClientMessageProducer.setOutputChannel(inputChannel);
    amqpClientMessageProducer.setBatchSize(2);
    return amqpClientMessageProducer;
}

AMQP 1.0 入站网关

AmqpClientInboundGatewayMessagingGatewaySupport 的实现,用于通过 RabbitMQ AMQP 1.0 协议接收请求并生成回复。它与上述 AmqpClientMessageProducer 类似,并共享许多 RabbitAmqpListenerContainer 配置选项。此外,为了生成 AMQP 1.0 回复,AmqpClientInboundGateway 内部使用 RabbitAmqpTemplate

为了自动将回复与其请求关联起来,必须提供请求消息的 replyTo 属性。例如,RabbitAmqpTemplate.sendAndReceive() 依赖于 RabbitMQ AMQP 1.0 库中的 RpcClient,该库生成一个独占且自动删除的队列。或者,回复地址可以作为 AmqpClientInboundGateway 上的 replyExchange(和可选的 replyRoutingKey)或 replyQueue(但不能同时设置)来设置,这些选项将委托给 RabbitAmqpTemplate 的默认选项。messageIdcorrelationId 请求消息属性可以用于与回复关联。如果缺失,RabbitAmqpTemplate.sendAndReceive() 中的 RpcClient 会生成一个。AmqpClientInboundGateway 能够将此类关联键映射回回复消息。

以下示例演示如何将 AmqpClientInboundGateway 配置为简单的入站网关

  • Java DSL

  • Kotlin DSL

  • Groovy DSL

  • Java

@Bean
IntegrationFlow amqpClientInboundGatewayFlow(AmqpConnectionFactory connectionFactory) {
    return IntegrationFlow.from(AmqpClient.inboundGateway(connectionFactory, "requestReply"))
            .channel(c -> c.queue("inputChannel"))
            .get();
}
@Bean
fun receiveFlow(connectionFactory: AmqpConnectionFactory) =
        integrationFlow(AmqpClient.inboundGateway(connectionFactory, "requestReply")) {
            channel { queue("inputChannel") }
        }
@Bean
receiveFlow(AmqpConnectionFactory connectionFactory) {
    integrationFlow(AmqpClient.inboundGateway(connectionFactory, 'requestReply')) {
        channel { queue 'inputChannel' }
    }
}
@Bean
AmqpClientInboundGateway amqpClientInboundGateway(AmqpConnectionFactory connectionFactory) {
    AmqpClientInboundGateway amqpClientInboundGateway = new AmqpClientInboundGateway(connectionFactory, "requestReply");
    amqpClientInboundGateway.setRequestChannelName("inputChannel");
    return amqpClientInboundGateway;
}
© . This site is unofficial and not affiliated with VMware.