MQTT 支持

Spring Integration 提供入站和出站通道适配器以支持消息队列遥测传输 (MQTT) 协议。

您需要将此依赖项包含到您的项目中

  • Maven

  • Gradle

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
    <version>6.3.5</version>
</dependency>
compile "org.springframework.integration:spring-integration-mqtt:6.3.5"

当前实现使用 Eclipse Paho MQTT 客户端 库。

XML 配置和本章的大部分内容都与 MQTT v3.1 协议支持和相应的 Paho 客户端有关。有关相应协议支持,请参阅 MQTT v5 支持 段落。

这两个适配器的配置都是通过使用 DefaultMqttPahoClientFactory 来实现的。有关配置选项的更多信息,请参阅 Paho 文档。

我们建议配置一个 MqttConnectOptions 对象并将其注入工厂,而不是在工厂本身上设置(已弃用)选项。

入站 (消息驱动) 通道适配器

入站通道适配器由 MqttPahoMessageDrivenChannelAdapter 实现。为了方便起见,您可以使用命名空间对其进行配置。最小配置可能如下所示

<bean id="clientFactory"
        class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory">
    <property name="connectionOptions">
        <bean class="org.eclipse.paho.client.mqttv3.MqttConnectOptions">
            <property name="userName" value="${mqtt.username}"/>
            <property name="password" value="${mqtt.password}"/>
        </bean>
    </property>
</bean>

<int-mqtt:message-driven-channel-adapter id="mqttInbound"
    client-id="${mqtt.default.client.id}.src"
    url="${mqtt.url}"
    topics="sometopic"
    client-factory="clientFactory"
    channel="output"/>

以下列表显示了可用的属性

<int-mqtt:message-driven-channel-adapter id="oneTopicAdapter"
    client-id="foo"  (1)
    url="tcp://127.0.0.1:1883"  (2)
    topics="bar,baz"  (3)
    qos="1,2"  (4)
    converter="myConverter"  (5)
    client-factory="clientFactory"  (6)
    send-timeout="123"  (7)
    error-channel="errors"  (8)
    recovery-interval="10000"  (9)
    manual-acks="false" (10)
    channel="out" />
1 客户端 ID。
2 代理 URL。
3 此适配器从中接收消息的主题的逗号分隔列表。
4 QoS 值的逗号分隔列表。它可以是应用于所有主题的单个值,也可以是每个主题的值(在这种情况下,列表的长度必须相同)。
5 一个 MqttMessageConverter(可选)。默认情况下,默认的 DefaultPahoMessageConverter 会生成一条消息,其负载为 String,并包含以下头信息
  • mqtt_topic:接收消息的主题

  • mqtt_duplicate:如果消息是重复消息,则为 true

  • mqtt_qos:服务质量您可以将 DefaultPahoMessageConverter 配置为在负载中返回原始 byte[],方法是将其声明为 <bean/> 并将 payloadAsBytes 属性设置为 true

6 客户端工厂。
7 send() 超时时间。仅当通道可能阻塞时(例如,当前已满的有界 QueueChannel)才适用。
8 错误通道。如果提供,则将下游异常发送到此通道,并以 ErrorMessage 的形式发送。负载是一个 MessagingException,其中包含失败的消息和原因。
9 恢复间隔。它控制适配器在发生故障后尝试重新连接的间隔。默认为 10000ms(十秒)。
10 确认模式;对于手动确认,设置为 true。
从 4.1 版本开始,您可以省略 URL。相反,您可以在 DefaultMqttPahoClientFactoryserverURIs 属性中提供服务器 URI。这样做可以例如连接到高可用 (HA) 集群。

从 4.2.2 版本开始,当适配器成功订阅主题时,会发布 MqttSubscribedEvent。当连接或订阅失败时,会发布 MqttConnectionFailedEvent 事件。实现 ApplicationListener 的 bean 可以接收这些事件。

此外,一个名为 recoveryInterval 的新属性控制适配器在发生故障后尝试重新连接的间隔。默认为 10000ms(十秒)。

在 4.2.3 版本之前,客户端在适配器停止时始终取消订阅。这是不正确的,因为如果客户端 QOS 大于 0,我们需要保持订阅处于活动状态,以便在适配器停止时到达的消息在下次启动时传递。这也需要将客户端工厂上的 cleanSession 属性设置为 false。默认为 true

从 4.2.3 版本开始,如果 cleanSession 属性为 false,则适配器不会(默认)取消订阅。

可以通过在工厂上设置 consumerCloseAction 属性来覆盖此行为。它可以具有以下值:UNSUBSCRIBE_ALWAYSUNSUBSCRIBE_NEVERUNSUBSCRIBE_CLEAN。后者(默认值)仅在 cleanSession 属性为 true 时取消订阅。

要恢复到 4.2.3 之前的行为,请使用 UNSUBSCRIBE_ALWAYS

从 5.0 版本开始,topicqosretained 属性映射到 .RECEIVED_…​ 头信息(MqttHeaders.RECEIVED_TOPICMqttHeaders.RECEIVED_QOSMqttHeaders.RECEIVED_RETAINED),以避免无意中传播到出站消息,该消息(默认情况下)使用 MqttHeaders.TOPICMqttHeaders.QOSMqttHeaders.RETAINED 头信息。

在运行时添加和删除主题

从 4.1 版本开始,您可以以编程方式更改适配器订阅的主题。Spring Integration 提供了 addTopic()removeTopic() 方法。添加主题时,您可以选择指定 QoS(默认值:1)。您还可以通过向 <control-bus/> 发送具有适当负载的消息来修改主题,例如:"myMqttAdapter.addTopic('foo', 1)"

停止和启动适配器对主题列表没有影响(它不会恢复到配置中的原始设置)。这些更改不会在应用程序上下文的生命周期之外保留。新的应用程序上下文会恢复到配置的设置。

在适配器停止(或与代理断开连接)时更改主题将在下次建立连接时生效。

手动确认

从 5.3 版本开始,您可以将 manualAcks 属性设置为 true。通常用于异步确认传递。当设置为 true 时,头信息 (IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK) 将添加到消息中,其值为 SimpleAcknowledgment。您必须调用 acknowledge() 方法来完成传递。有关更多信息,请参阅 IMqttClientsetManualAcks()messageArrivedComplete() 的 Javadoc。为了方便起见,提供了一个头信息访问器

StaticMessageHeaderAccessor.acknowledgment(someMessage).acknowledge();

5.2.11 版本开始,当消息转换器抛出异常或从 MqttMessage 转换中返回 null 时,MqttPahoMessageDrivenChannelAdapter 会将 ErrorMessage 发送到 errorChannel(如果提供)。否则,将此转换错误重新抛出到 MQTT 客户端回调中。

使用 Java 配置进行配置

以下 Spring Boot 应用程序显示了如何使用 Java 配置配置入站适配器的示例

@SpringBootApplication
public class MqttJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MqttJavaApplication.class)
                .web(false)
                .run(args);
    }

    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter("tcp://127.0.0.1:1883", "testClient",
                                                 "topic1", "topic2");
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {

            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                System.out.println(message.getPayload());
            }

        };
    }

}

使用 Java DSL 进行配置

以下 Spring Boot 应用程序提供了使用 Java DSL 配置入站适配器的示例

@SpringBootApplication
public class MqttJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MqttJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public IntegrationFlow mqttInbound() {
        return IntegrationFlow.from(
                         new MqttPahoMessageDrivenChannelAdapter("tcp://127.0.0.1:1883",
                                        "testClient", "topic1", "topic2"))
                .handle(m -> System.out.println(m.getPayload()))
                .get();
    }

}

出站通道适配器

出站通道适配器由 MqttPahoMessageHandler 实现,该适配器包装在一个 ConsumerEndpoint 中。为了方便起见,您可以使用命名空间对其进行配置。

从 4.1 版本开始,适配器支持异步发送操作,避免阻塞直到确认传递。如果需要,您可以发出应用程序事件以使应用程序能够确认传递。

以下列表显示了出站通道适配器可用的属性

<int-mqtt:outbound-channel-adapter id="withConverter"
    client-id="foo"  (1)
    url="tcp://127.0.0.1:1883"  (2)
    converter="myConverter"  (3)
    client-factory="clientFactory"  (4)
    default-qos="1"  (5)
    qos-expression="" (6)
    default-retained="true"  (7)
    retained-expression="" (8)
    default-topic="bar"  (9)
    topic-expression="" (10)
    async="false"  (11)
    async-events="false"  (12)
    channel="target" />
1 客户端 ID。
2 代理 URL。
3 一个 MqttMessageConverter(可选)。默认的 DefaultPahoMessageConverter 识别以下头信息
  • mqtt_topic:将消息发送到的主题

  • mqtt_retained:如果消息要保留,则为 true

  • mqtt_qos:服务质量

4 客户端工厂。
5 默认的服务质量。如果未找到 mqtt_qos 头信息或 qos-expression 返回 null,则使用它。如果您提供自定义 converter,则不使用它。
6 用于评估以确定 qos 的表达式。默认为 headers[mqtt_qos]
7 保留标志的默认值。如果未找到 mqtt_retained 头信息,则使用它。如果提供自定义 converter,则不使用它。
8 用于评估以确定保留布尔值的表达式。默认为 headers[mqtt_retained]
9 将消息发送到的默认主题(如果未找到 mqtt_topic 头信息)。
10 用于评估以确定目标主题的表达式。默认为 headers['mqtt_topic']
11 true 时,调用者不会阻塞。相反,它在发送消息时等待传递确认。默认为 false(发送操作会阻塞直到确认传递)。
12 asyncasync-events 均为 true 时,会发出 MqttMessageSentEvent(请参阅 事件)。它包含消息、主题、客户端库生成的 messageIdclientIdclientInstance(每次客户端连接时都会递增)。当客户端库确认传递时,会发出 MqttMessageDeliveredEvent。它包含 messageIdclientIdclientInstance,使传递能够与 send() 相关联。任何 ApplicationListener 或事件入站通道适配器都可以接收这些事件。请注意,MqttMessageDeliveredEvent 可能在 MqttMessageSentEvent 之前接收。默认为 false
从 4.1 版本开始,可以省略 URL。相反,可以在 DefaultMqttPahoClientFactoryserverURIs 属性中提供服务器 URI。这可以例如连接到高可用 (HA) 集群。

使用 Java 配置进行配置

以下 Spring Boot 应用程序展示了如何使用 Java 配置来配置出站适配器的示例。

@SpringBootApplication
@IntegrationComponentScan
public class MqttJavaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context =
                new SpringApplicationBuilder(MqttJavaApplication.class)
                        .web(false)
                        .run(args);
        MyGateway gateway = context.getBean(MyGateway.class);
        gateway.sendToMqtt("foo");
    }

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[] { "tcp://host1:1883", "tcp://host2:1883" });
        options.setUserName("username");
        options.setPassword("password".toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =
                       new MqttPahoMessageHandler("testClient", mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic("testTopic");
        return messageHandler;
    }

    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    public interface MyGateway {

        void sendToMqtt(String data);

    }

}

使用 Java DSL 配置

以下 Spring Boot 应用程序提供了一个使用 Java DSL 配置出站适配器的示例。

@SpringBootApplication
public class MqttJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MqttJavaApplication.class)
            .web(false)
            .run(args);
    }

   	@Bean
   	public IntegrationFlow mqttOutboundFlow() {
   	    return f -> f.handle(new MqttPahoMessageHandler("tcp://host1:1883", "someMqttClient"));
    }

}

事件

适配器会发布某些应用程序事件。

  • MqttConnectionFailedEvent - 如果连接失败或随后连接丢失,则由两个适配器发布。对于 MQTT v5 Paho 客户端,当服务器执行正常断开连接时,也会发出此事件,在这种情况下,连接丢失的 causenull

  • MqttMessageSentEvent - 如果以异步模式运行,则在消息发送后由出站适配器发布。

  • MqttMessageDeliveredEvent - 如果以异步模式运行,则在客户端指示消息已传递后由出站适配器发布。

  • MqttSubscribedEvent - 在订阅主题后由入站适配器发布。

这些事件可以通过 ApplicationListener<MqttIntegrationEvent> 或使用 @EventListener 方法接收。

要确定事件的来源,请使用以下方法;您可以检查 bean 名称和/或连接选项(以访问服务器 URI 等)。

MqttPahoComponent source = event.getSourceAsType();
String beanName = source.getBeanName();
MqttConnectOptions options = source.getConnectionInfo();

MQTT v5 支持

从 5.5.5 版本开始,spring-integration-mqtt 模块提供了用于 MQTT v5 协议的通道适配器实现。org.eclipse.paho:org.eclipse.paho.mqttv5.client 是一个 optional 依赖项,因此必须在目标项目中显式包含。

由于 MQTT v5 协议支持 MQTT 消息中的额外任意属性,因此引入了 MqttHeaderMapper 实现来映射发布和接收操作的标头。默认情况下(通过 * 模式),它映射所有接收到的 PUBLISH 帧属性(包括用户属性)。在出站端,它为 PUBLISH 帧映射此标头子集:contentTypemqtt_messageExpiryIntervalmqtt_responseTopicmqtt_correlationData

MQTT v5 协议的出站通道适配器以 Mqttv5PahoMessageHandler 的形式存在。它需要一个 clientId 和 MQTT 代理 URL 或 MqttConnectionOptions 引用。它支持 MqttClientPersistence 选项,可以是 async,并且在这种情况下可以发出 MqttIntegrationEvent 对象(请参阅 asyncEvents 选项)。如果请求消息有效负载是 org.eclipse.paho.mqttv5.common.MqttMessage,则会通过内部 IMqttAsyncClient 原样发布。如果有效负载是 byte[],则会原样用作目标 MqttMessage 有效负载以进行发布。如果有效负载是 String,则将其转换为 byte[] 以进行发布。其余用例委托给提供的 MessageConverter,该转换器是来自应用程序上下文的 IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME ConfigurableCompositeMessageConverter bean。注意:当请求的消息有效负载已经是 MqttMessage 时,不会使用提供的 HeaderMapper<MqttProperties>。以下 Java DSL 配置示例演示了如何在集成流中使用此通道适配器。

@Bean
public IntegrationFlow mqttOutFlow() {
    Mqttv5PahoMessageHandler messageHandler = new Mqttv5PahoMessageHandler(MQTT_URL, "mqttv5SIout");
    MqttHeaderMapper mqttHeaderMapper = new MqttHeaderMapper();
    mqttHeaderMapper.setOutboundHeaderNames("some_user_header", MessageHeaders.CONTENT_TYPE);
    messageHandler.setHeaderMapper(mqttHeaderMapper);
    messageHandler.setAsync(true);
    messageHandler.setAsyncEvents(true);
    messageHandler.setConverter(mqttStringToBytesConverter());

    return f -> f.handle(messageHandler);
}
org.springframework.integration.mqtt.support.MqttMessageConverter 不能与 Mqttv5PahoMessageHandler 一起使用,因为它的契约仅针对 MQTT v3 协议。

如果连接在启动时或运行时失败,Mqttv5PahoMessageHandler 会尝试在发送到此处理程序的下一条消息时重新连接。如果此手动重新连接失败,则连接异常将抛回给调用方。在这种情况下,将应用标准的 Spring Integration 错误处理程序,包括请求处理程序建议,例如重试或断路器。

有关更多信息,请参阅 Mqttv5PahoMessageHandler javadocs 及其超类。

MQTT v5 协议的入站通道适配器以 Mqttv5PahoMessageDrivenChannelAdapter 的形式存在。它需要一个 clientId 和 MQTT 代理 URL 或 MqttConnectionOptions 引用,以及要订阅和从中消费的主题。它支持 MqttClientPersistence 选项,默认情况下为内存中持久化。可以配置预期的 payloadType(默认为 byte[]),它会传播到提供的 SmartMessageConverter 以从接收到的 MqttMessagebyte[] 进行转换。如果设置了 manualAck 选项,则 IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK 标头将添加到要生成的作为 SimpleAcknowledgment 实例的消息中。HeaderMapper<MqttProperties> 用于将 PUBLISH 帧属性(包括用户属性)映射到目标消息标头。标准的 MqttMessage 属性,如 qosiddupretained 以及接收到的主题始终映射到标头。有关更多信息,请参阅 MqttHeaders

从 6.3 版本开始,Mqttv5PahoMessageDrivenChannelAdapter 提供了基于 MqttSubscription 的构造函数,用于进行细粒度配置,而不是简单的主题名称。当提供这些订阅时,不能使用通道适配器的 qos 选项,因为此类 qos 模式是 MqttSubscription API 的一部分。

以下 Java DSL 配置示例演示了如何在集成流中使用此通道适配器。

@Bean
public IntegrationFlow mqttInFlow() {
    Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
        new Mqttv5PahoMessageDrivenChannelAdapter(MQTT_URL, "mqttv5SIin", "siTest");
    messageProducer.setPayloadType(String.class);
    messageProducer.setMessageConverter(mqttStringToBytesConverter());
    messageProducer.setManualAcks(true);

    return IntegrationFlow.from(messageProducer)
            .channel(c -> c.queue("fromMqttChannel"))
            .get();
}
org.springframework.integration.mqtt.support.MqttMessageConverter 不能与 Mqttv5PahoMessageDrivenChannelAdapter 一起使用,因为它的契约仅针对 MQTT v3 协议。

有关更多信息,请参阅 Mqttv5PahoMessageDrivenChannelAdapter javadocs 及其超类。

建议将 MqttConnectionOptions#setAutomaticReconnect(boolean) 设置为 true,以允许内部 IMqttAsyncClient 实例处理重新连接。否则,只有 Mqttv5PahoMessageDrivenChannelAdapter 的手动重启才能处理重新连接,例如通过断开连接时的 MqttConnectionFailedEvent 处理。

共享 MQTT 客户端支持

如果多个集成需要单个 MQTT ClientID,则不能使用多个 MQTT 客户端实例,因为 MQTT 代理可能对每个 ClientID 的连接数量有限制(通常,只允许单个连接)。为了让单个客户端重复用于不同的通道适配器,可以使用 org.springframework.integration.mqtt.core.ClientManager 组件并将其传递给所需的任何通道适配器。它将管理 MQTT 连接生命周期,并在需要时进行自动重新连接。此外,可以向客户端管理器提供自定义连接选项和 MqttClientPersistence,就像目前可以为通道适配器组件执行的那样。

请注意,MQTT v5 和 v3 通道适配器都受支持。

以下 Java DSL 配置示例演示了如何在集成流中使用此客户端管理器。

@Bean
public ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager() {
    MqttConnectionOptions connectionOptions = new MqttConnectionOptions();
    connectionOptions.setServerURIs(new String[]{ "tcp://127.0.0.1:1883" });
    connectionOptions.setConnectionTimeout(30000);
    connectionOptions.setMaxReconnectDelay(1000);
    connectionOptions.setAutomaticReconnect(true);
    Mqttv5ClientManager clientManager = new Mqttv5ClientManager(connectionOptions, "client-manager-client-id-v5");
    clientManager.setPersistence(new MqttDefaultFilePersistence());
    return clientManager;
}

@Bean
public IntegrationFlow mqttInFlowTopic1(
        ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {

    Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
        new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, "topic1");
    return IntegrationFlow.from(messageProducer)
            .channel(c -> c.queue("fromMqttChannel"))
            .get();
}

@Bean
public IntegrationFlow mqttInFlowTopic2(
        ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {

    Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
        new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, "topic2");
    return IntegrationFlow.from(messageProducer)
            .channel(c -> c.queue("fromMqttChannel"))
            .get();
}

@Bean
public IntegrationFlow mqttOutFlow(
        ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {

    return f -> f.handle(new Mqttv5PahoMessageHandler(clientManager));
}