ZeroMQ 支持
Spring 集成提供组件以支持应用程序中的 ZeroMQ 通信。该实现基于 JeroMQ 库的良好支持的 Java API。所有组件都封装了 ZeroMQ 套接字生命周期,并在内部为它们管理线程,从而使与这些组件的交互无锁且线程安全。
您需要将此依赖项包含到您的项目中
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-zeromq</artifactId>
<version>6.3.0</version>
</dependency>
compile "org.springframework.integration:spring-integration-zeromq:6.3.0"
ZeroMQ 代理
ZeroMqProxy
是一个 Spring 友好的包装器,用于内置的 ZMQ.proxy()
函数。它封装了套接字生命周期和线程管理。此代理的客户端仍然可以使用标准 ZeroMQ 套接字连接和交互 API。除了标准的 ZContext
之外,它还需要一个众所周知的 ZeroMQ 代理模式:SUB/PUB、PULL/PUSH 或 ROUTER/DEALER。这样,一对合适的 ZeroMQ 套接字类型将用于代理的前端和后端。有关详细信息,请参阅 ZeroMqProxy.Type
。
ZeroMqProxy
实现 SmartLifecycle
来创建、绑定和配置套接字,并在专用线程中从 Executor
(如果有)启动 ZMQ.proxy()
。前端和后端套接字的绑定通过 tcp://
协议完成,绑定到所有可用的网络接口,并使用提供的端口。否则,它们将绑定到随机端口,这些端口可以通过相应的 getFrontendPort()
和 getBackendPort()
API 方法在稍后获取。
控制套接字作为 SocketType.PAIR
公开,在 "inproc://" + beanName + ".control"
地址上具有线程间传输;可以通过 getControlAddress()
获取。它应该与来自另一个 SocketType.PAIR
套接字的同一应用程序一起使用,以发送 ZMQ.PROXY_TERMINATE
、ZMQ.PROXY_PAUSE
和/或 ZMQ.PROXY_RESUME
命令。ZeroMqProxy
在调用其生命周期的 stop()
以终止 ZMQ.proxy()
循环并优雅地关闭所有绑定套接字时,执行 ZMQ.PROXY_TERMINATE
命令。
setExposeCaptureSocket(boolean)
选项会导致此组件绑定一个额外的线程间套接字,该套接字使用 SocketType.PUB
来捕获和发布前端和后端套接字之间所有通信,如 ZMQ.proxy()
实现所述。此套接字绑定到 "inproc://" + beanName + ".capture"
地址,并且不期望任何特定订阅进行过滤。
前端和后端套接字可以使用其他属性进行自定义,例如读/写超时或安全性。此自定义可以通过 setFrontendSocketConfigurer(Consumer<ZMQ.Socket>)
和 setBackendSocketConfigurer(Consumer<ZMQ.Socket>)
回调分别提供。
ZeroMqProxy
可以作为简单的 bean 提供,如下所示
@Bean
ZeroMqProxy zeroMqProxy() {
ZeroMqProxy proxy = new ZeroMqProxy(CONTEXT, ZeroMqProxy.Type.SUB_PUB);
proxy.setExposeCaptureSocket(true);
proxy.setFrontendPort(6001);
proxy.setBackendPort(6002);
return proxy;
}
所有客户端节点都应通过 tcp://
连接到此代理的主机,并使用其感兴趣的相应端口。
ZeroMQ 消息通道
ZeroMqChannel
是一个 SubscribableChannel
,它使用一对 ZeroMQ 套接字来连接发布者和订阅者以进行消息交互。它可以在 PUB/SUB 模式下工作(默认为 PUSH/PULL);它也可以用作本地线程间通道(使用 PAIR
套接字) - 在这种情况下不提供 connectUrl
。在分布式模式下,它必须连接到外部管理的 ZeroMQ 代理,在那里它可以与连接到同一代理的其他类似通道交换消息。连接 URL 选项是标准 ZeroMQ 连接字符串,其中包含协议和主机以及用于 ZeroMQ 代理的前端和后端套接字的一对端口(以冒号分隔)。为了方便起见,如果通道在与代理相同的应用程序中配置,则可以为通道提供 ZeroMqProxy
实例,而不是连接字符串。
发送和接收套接字都在各自的专用线程中管理,这使得该通道对并发友好。这样,我们就可以从不同的线程发布和消费到/从 ZeroMqChannel
中,而无需同步。
默认情况下,ZeroMqChannel
使用 EmbeddedJsonHeadersMessageMapper
将 Message
(包括头信息)从/到 byte[]
进行(反)序列化,使用 Jackson JSON 处理器。此逻辑可以通过 setMessageMapper(BytesMessageMapper)
进行配置。
发送和接收套接字可以通过各自的 setSendSocketConfigurer(Consumer<ZMQ.Socket>)
和 setSubscribeSocketConfigurer(Consumer<ZMQ.Socket>)
回调针对任何选项(读/写超时、安全等)进行自定义。
ZeroMqChannel
的内部逻辑基于 Project Reactor 的 Flux
和 Mono
运算符的响应式流。这提供了更轻松的线程控制,并允许对通道进行无锁并发发布和消费。本地 PUB/SUB 逻辑实现为 Flux.publish()
运算符,以允许所有本地订阅者接收与分布式订阅者到 PUB
套接字相同的已发布消息。
以下是 ZeroMqChannel
配置的简单示例
@Bean
ZeroMqChannel zeroMqPubSubChannel(ZContext context) {
ZeroMqChannel channel = new ZeroMqChannel(context, true);
channel.setConnectUrl("tcp://127.0.0.1:6001:6002");
channel.setConsumeDelay(Duration.ofMillis(100));
return channel;
}
ZeroMQ 入站通道适配器
ZeroMqMessageProducer
是一个具有响应式语义的 MessageProducerSupport
实现。它以非阻塞方式持续从 ZeroMQ 套接字读取数据,并将消息发布到一个无限的 Flux
,该 Flux
由 FluxMessageChannel
订阅,或者在 start()
方法中显式订阅,如果输出通道不是响应式的。当套接字上没有收到数据时,将在下一次读取尝试之前应用 consumeDelay
(默认为 1 秒)。
ZeroMqMessageProducer
仅支持 SocketType.PAIR
、SocketType.PULL
和 SocketType.SUB
。此组件可以连接到远程套接字或绑定到 TCP 协议,使用提供的或随机端口。实际端口可以在此组件启动并绑定 ZeroMQ 套接字后通过 getBoundPort()
获取。套接字选项(例如安全或写超时)可以通过 setSocketConfigurer(Consumer<ZMQ.Socket> socketConfigurer)
回调进行配置。
如果 receiveRaw
选项设置为 true
,则从套接字中消费的 ZMsg
将按原样发送到已生成 Message
的有效负载中:由下游流解析和转换 ZMsg
。否则,将使用 InboundMessageMapper
将消费的数据转换为 Message
。如果接收到的 ZMsg
是多帧的,则第一帧将被视为发布到此 ZeroMQ 消息的 ZeroMqHeaders.TOPIC
头信息。
使用SocketType.SUB
时,ZeroMqMessageProducer
使用提供的topics
选项进行订阅;默认订阅所有主题。可以使用subscribeToTopics()
和unsubscribeFromTopics()
@ManagedOperation
在运行时调整订阅。
以下是一个ZeroMqMessageProducer
配置示例
@Bean
ZeroMqMessageProducer zeroMqMessageProducer(ZContext context, MessageChannel outputChannel) {
ZeroMqMessageProducer messageProducer = new ZeroMqMessageProducer(context, SocketType.SUB);
messageProducer.setOutputChannel(outputChannel);
messageProducer.setTopics("some");
messageProducer.setReceiveRaw(true);
messageProducer.setBindPort(7070);
messageProducer.setConsumeDelay(Duration.ofMillis(100));
return messageProducer;
}
ZeroMQ 出站通道适配器
ZeroMqMessageHandler
是ReactiveMessageHandler
的实现,用于将发布消息发送到 ZeroMQ 套接字。仅支持SocketType.PAIR
、SocketType.PUSH
和SocketType.PUB
。ZeroMqMessageHandler
仅支持连接 ZeroMQ 套接字;不支持绑定。当使用SocketType.PUB
时,topicExpression
会针对请求消息进行评估,以将主题帧注入 ZeroMQ 消息(如果它不为空)。订阅方(SocketType.SUB
)必须先接收主题帧,然后才能解析实际数据。当请求消息的有效负载是ZMsg
时,不会执行转换或主题提取:ZMsg
按原样发送到套接字,并且不会被销毁以供进一步重用。否则,将使用OutboundMessageMapper<byte[]>
将请求消息(或仅其有效负载)转换为 ZeroMQ 帧以进行发布。默认情况下,将使用ConvertingBytesMessageMapper
,并提供ConfigurableCompositeMessageConverter
。可以通过setSocketConfigurer(Consumer<ZMQ.Socket> socketConfigurer)
回调配置套接字选项(例如安全或写入超时)。
以下是一个ZeroMqMessageHandler
配置示例
@Bean
@ServiceActivator(inputChannel = "zeroMqPublisherChannel")
ZeroMqMessageHandler zeroMqMessageHandler(ZContext context) {
ZeroMqMessageHandler messageHandler =
new ZeroMqMessageHandler(context, "tcp://127.0.0.1:6060", SocketType.PUB);
messageHandler.setTopicExpression(
new FunctionExpression<Message<?>>((message) -> message.getHeaders().get("topic")));
messageHandler.setMessageMapper(new EmbeddedJsonHeadersMessageMapper());
}
ZeroMQ Java DSL 支持
spring-integration-zeromq
通过ZeroMq
工厂和IntegrationComponentSpec
实现,为上述组件提供了一个方便的 Java DSL 流式 API。
这是一个ZeroMqChannel
的 Java DSL 示例
.channel(ZeroMq.zeroMqChannel(this.context)
.connectUrl("tcp://127.0.0.1:6001:6002")
.consumeDelay(Duration.ofMillis(100)))
}
ZeroMQ Java DSL 的入站通道适配器是
IntegrationFlow.from(
ZeroMq.inboundChannelAdapter(this.context, SocketType.SUB)
.connectUrl("tcp://127.0.0.1:9000")
.topics("someTopic")
.receiveRaw(true)
.consumeDelay(Duration.ofMillis(100)))
}
ZeroMQ Java DSL 的出站通道适配器是
.handle(ZeroMq.outboundChannelAdapter(this.context, "tcp://127.0.0.1:9001", SocketType.PUB)
.topicFunction(message -> message.getHeaders().get("myTopic")))
}