ZeroMQ 支持

Spring Integration 提供组件来支持应用程序中的 ZeroMQ 通信。该实现基于 JeroMQ 库良好支持的 Java API。所有组件都封装了 ZeroMQ socket 的生命周期并在内部管理它们的线程,从而使这些组件的交互是无锁和线程安全的。

您需要将此依赖项包含到您的项目中

  • Maven

  • Gradle

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-zeromq</artifactId>
    <version>6.4.4</version>
</dependency>
compile "org.springframework.integration:spring-integration-zeromq:6.4.4"

ZeroMQ Proxy

ZeroMqProxy 是内置的 ZMQ.proxy() 函数 的 Spring 友好型包装器。它封装了 socket 生命周期和线程管理。此 proxy 的客户端仍然可以使用标准的 ZeroMQ socket 连接和交互 API。除了标准的 ZContext 外,它还需要一种众所周知的 ZeroMQ proxy 模式:SUB/PUB、PULL/PUSH 或 ROUTER/DEALER。这样,针对 proxy 的前端和后端使用了适当的 ZeroMQ socket 类型对。详情请参阅 ZeroMqProxy.Type

ZeroMqProxy 实现了 SmartLifecycle,用于创建、绑定和配置 socket,并在(如果存在)Executor 的专用线程中启动 ZMQ.proxy()。前端和后端 socket 的绑定是通过 tcp:// 协议完成的,绑定到所有可用的网络接口上的指定端口。否则,它们会绑定到随机端口,随后可以通过相应的 getFrontendPort()getBackendPort() API 方法获取这些端口。

控制 socket 以 SocketType.PAIR 类型通过进程间传输暴露在 "inproc://" + beanName + ".control" 地址上;可以通过 getControlAddress() 获取。它应该与同一应用程序中使用另一个 SocketType.PAIR socket 结合使用,以发送 ZMQ.PROXY_TERMINATEZMQ.PROXY_PAUSE 和/或 ZMQ.PROXY_RESUME 命令。当调用其生命周期的 stop() 方法时,ZeroMqProxy 会执行 ZMQ.PROXY_TERMINATE 命令,以优雅地终止 ZMQ.proxy() 循环并关闭所有绑定的 socket。

setExposeCaptureSocket(boolean) 选项会使此组件绑定一个额外的、带有 SocketType.PUB 的进程间 socket,以捕获和发布前端和后端 socket 之间的所有通信,正如 ZMQ.proxy() 实现所述。此 socket 绑定到 "inproc://" + beanName + ".capture" 地址,并且不期望任何特定的订阅来进行过滤。

前端和后端 socket 可以通过附加属性进行定制,例如读写超时或安全性。这种定制可通过 setFrontendSocketConfigurer(Consumer<ZMQ.Socket>)setBackendSocketConfigurer(Consumer<ZMQ.Socket>) 回调函数分别实现。

ZeroMqProxy 可以作为简单的 bean 提供,如下所示

@Bean
ZeroMqProxy zeroMqProxy() {
    ZeroMqProxy proxy = new ZeroMqProxy(CONTEXT, ZeroMqProxy.Type.SUB_PUB);
    proxy.setExposeCaptureSocket(true);
    proxy.setFrontendPort(6001);
    proxy.setBackendPort(6002);
    return proxy;
}

所有客户端节点应通过 tcp:// 连接到此 proxy 的主机,并使用其感兴趣的相应端口。

ZeroMQ 消息通道

ZeroMqChannel 是一个 SubscribableChannel,它使用一对 ZeroMQ socket 来连接发布者和订阅者进行消息交互。它可以在 PUB/SUB 模式下工作(默认为 PUSH/PULL);它也可以用作本地进程间通道(使用 PAIR socket)——在这种情况下不提供 connectUrl。在分布式模式下,它必须连接到外部管理的 ZeroMQ proxy,在那里它可以与连接到同一 proxy 的其他类似通道交换消息。连接 url 选项是一个标准的 ZeroMQ 连接字符串,包含协议、主机以及 ZeroMQ proxy 前端和后端 socket 的一对用冒号分隔的端口。为了方便起见,如果 ZeroMqProxy 实例与通道配置在同一个应用程序中,则可以向通道提供 ZeroMqProxy 实例而不是连接字符串。

发送和接收 socket 都在各自的专用线程中进行管理,使得此通道具有并发友好性。这样,我们可以在不同的线程中发布消息到 ZeroMqChannel 并从中消费消息,而无需同步。

默认情况下,ZeroMqChannel 使用 EmbeddedJsonHeadersMessageMapper 来使用 Jackson JSON 处理器将 Message (包括头信息) 序列化/反序列化为 byte[]。此逻辑可以通过 setMessageMapper(BytesMessageMapper) 进行配置。

发送和接收 socket 可以通过各自的 setSendSocketConfigurer(Consumer<ZMQ.Socket>)setSubscribeSocketConfigurer(Consumer<ZMQ.Socket>) 回调函数定制任何选项(读写超时、安全性等)。

ZeroMqChannel 的内部逻辑基于通过 Project Reactor 的 FluxMono 操作符实现的响应式流。这提供了更容易的线程控制,并允许从通道进行无锁并发发布和消费。本地 PUB/SUB 逻辑实现为 Flux.publish() 操作符,允许连接到此通道的所有本地订阅者接收相同的已发布消息,就像连接到 PUB socket 的分布式订阅者一样。

以下是一个 ZeroMqChannel 配置的简单示例

@Bean
ZeroMqChannel zeroMqPubSubChannel(ZContext context) {
    ZeroMqChannel channel = new ZeroMqChannel(context, true);
    channel.setConnectUrl("tcp://localhost:6001:6002");
    channel.setConsumeDelay(Duration.ofMillis(100));
    return channel;
}

ZeroMQ 入站通道适配器

ZeroMqMessageProducer 是一个具有响应式语义的 MessageProducerSupport 实现。它以非阻塞方式持续从 ZeroMQ socket 读取数据,并将消息发布到一个无限的 Flux 中,该 FluxFluxMessageChannel 订阅,如果输出通道不是响应式的,则在 start() 方法中显式订阅。当 socket 上没有接收到数据时,在下一次读取尝试之前会应用一个 consumeDelay(默认为 1 秒)。

ZeroMqMessageProducer 仅支持 SocketType.PAIRSocketType.PULLSocketType.SUB。此组件可以连接到远程 socket,或者通过提供的或随机端口绑定到 TCP 协议。在组件启动且 ZeroMQ socket 绑定后,实际端口可以通过 getBoundPort() 获取。socket 选项(例如,安全性或写入超时)可以通过 setSocketConfigurer(Consumer<ZMQ.Socket> socketConfigurer) 回调进行配置。

如果 receiveRaw 选项设置为 true,则从 socket 消费的 ZMsg 会原样作为生成 Message 的 payload 发送:由下游流负责解析和转换 ZMsg。否则,使用 InboundMessageMapper 将消费的数据转换为 Message。如果收到的 ZMsg 是多帧的,则第一帧被视为此 ZeroMQ 消息发布到的 ZeroMqHeaders.TOPIC 头。

如果 unwrapTopic 选项设置为 false,则认为入站消息由两帧组成:主题和 ZeroMQ 消息。否则,默认情况下,认为 ZMsg 由三帧组成:第一帧包含主题,最后一帧包含消息,中间有一个空帧。

对于 SocketType.SUBZeroMqMessageProducer 使用提供的 topics 选项进行订阅;默认为订阅所有主题。可以使用 subscribeToTopics()unsubscribeFromTopics() @ManagedOperation 在运行时调整订阅。

以下是 ZeroMqMessageProducer 配置的示例

@Bean
ZeroMqMessageProducer zeroMqMessageProducer(ZContext context, MessageChannel outputChannel) {
    ZeroMqMessageProducer messageProducer = new ZeroMqMessageProducer(context, SocketType.SUB);
    messageProducer.setOutputChannel(outputChannel);
    messageProducer.setTopics("some");
    messageProducer.setReceiveRaw(true);
    messageProducer.setBindPort(7070);
    messageProducer.setConsumeDelay(Duration.ofMillis(100));
    return messageProducer;
}

ZeroMQ 出站通道适配器

ZeroMqMessageHandler 是一个 ReactiveMessageHandler 实现,用于将消息发布到 ZeroMQ socket。仅支持 SocketType.PAIRSocketType.PUSHSocketType.PUB。此组件可以连接到远程 socket,或者通过提供的或随机端口绑定到 TCP 协议。在组件启动且 ZeroMQ socket 绑定后,实际端口可以通过 getBoundPort() 获取。

当使用 SocketType.PUB 时,会针对请求消息评估 topicExpression,以便在 ZeroMQ 消息中注入主题帧(如果不是 null)。订阅者端(SocketType.SUB)必须先接收主题帧,然后才能解析实际数据。

如果 wrapTopic 选项设置为 false,则 ZeroMQ 消息帧会在注入的主题(如果存在)之后发送。默认情况下,在主题和消息之间会发送一个额外的空帧。

当请求消息的 payload 是 ZMsg 时,不执行转换或主题提取:ZMsg 原样发送到 socket,并且不会被销毁以便后续重用。否则,使用 OutboundMessageMapper<byte[]> 将请求消息(或其 payload)转换为 ZeroMQ 帧进行发布。默认情况下,使用带有 ConfigurableCompositeMessageConverterConvertingBytesMessageMapper。socket 选项(例如,安全性或写入超时)可以通过 setSocketConfigurer(Consumer<ZMQ.Socket> socketConfigurer) 回调进行配置。

以下是连接到 socket 的 ZeroMqMessageHandler 配置示例

@Bean
@ServiceActivator(inputChannel = "zeroMqPublisherChannel")
ZeroMqMessageHandler zeroMqMessageHandler(ZContext context) {
    ZeroMqMessageHandler messageHandler =
                  new ZeroMqMessageHandler(context, "tcp://localhost:6060", SocketType.PUB);
    messageHandler.setTopicExpression(
                  new FunctionExpression<Message<?>>((message) -> message.getHeaders().get("topic")));
    messageHandler.setMessageMapper(new EmbeddedJsonHeadersMessageMapper());
}

以下是绑定到指定端口的 ZeroMqMessageHandler 配置示例

@Bean
@ServiceActivator(inputChannel = "zeroMqPublisherChannel")
ZeroMqMessageHandler zeroMqMessageHandler(ZContext context) {
    ZeroMqMessageHandler messageHandler =
                  new ZeroMqMessageHandler(context, 7070, SocketType.PUB);
    messageHandler.setTopicExpression(
                  new FunctionExpression<Message<?>>((message) -> message.getHeaders().get("topic")));
    messageHandler.setMessageMapper(new EmbeddedJsonHeadersMessageMapper());
}

ZeroMQ Java DSL 支持

spring-integration-zeromq 通过 ZeroMq 工厂和 IntegrationComponentSpec 实现为上述组件提供了方便的 Java DSL 流畅 API。

这是 ZeroMqChannel 的 Java DSL 示例

.channel(ZeroMq.zeroMqChannel(this.context)
            .connectUrl("tcp://localhost:6001:6002")
            .consumeDelay(Duration.ofMillis(100)))
}

用于 ZeroMQ Java DSL 的入站通道适配器是

IntegrationFlow.from(
            ZeroMq.inboundChannelAdapter(this.context, SocketType.SUB)
                        .connectUrl("tcp://localhost:9000")
                        .topics("someTopic")
                        .receiveRaw(true)
                        .consumeDelay(Duration.ofMillis(100)))
}

用于 ZeroMQ Java DSL 的出站通道适配器是

.handle(ZeroMq.outboundChannelAdapter(this.context, "tcp://localhost:9001", SocketType.PUB)
                  .topicFunction(message -> message.getHeaders().get("myTopic")))
}