TCP 连接工厂

概述

对于 TCP,底层连接的配置通过使用连接工厂提供。提供了两种类型的连接工厂:客户端连接工厂和服务器连接工厂。客户端连接工厂建立出站连接。服务器连接工厂监听入站连接。

出站通道适配器使用客户端连接工厂,但你也可以向入站通道适配器提供对客户端连接工厂的引用。该适配器接收由出站适配器创建的连接上接收到的任何入站消息。

入站通道适配器或网关使用服务器连接工厂。(实际上,没有它,连接工厂无法工作)。你还可以向出站适配器提供对服务器连接工厂的引用。然后,你可以使用该适配器通过同一连接发送对入站消息的回复。

只有当回复包含由连接工厂插入到原始消息中的 ip_connectionId 标头时,回复消息才会被路由到连接。
这就是在入站和出站适配器之间共享连接工厂时执行的消息关联的程度。这种共享允许通过 TCP 进行异步双向通信。默认情况下,只使用 TCP 传输有效负载信息。因此,任何消息关联必须由下游组件(例如聚合器或其他端点)执行。版本 3.0 中引入了对传输选定标头的支持。有关更多信息,请参阅TCP 消息关联

你可以为每种类型最多一个适配器提供对连接工厂的引用。

Spring Integration 提供了使用 java.net.Socketjava.nio.channel.SocketChannel 的连接工厂。

以下示例显示了一个使用 java.net.Socket 连接的简单服务器连接工厂

<int-ip:tcp-connection-factory id="server"
    type="server"
    port="1234"/>

以下示例显示了一个使用 java.nio.channel.SocketChannel 连接的简单服务器连接工厂

<int-ip:tcp-connection-factory id="server"
    type="server"
    port="1234"
    using-nio="true"/>
从 Spring Integration 4.2 版开始,如果服务器配置为在随机端口(通过将端口设置为 0)上侦听,则可以使用 getPort() 获取操作系统选择的实际端口。此外,getServerSocketAddress() 允许你获取完整的 SocketAddress。有关更多信息,请参阅TcpServerConnectionFactory 接口的 Javadoc
<int-ip:tcp-connection-factory id="client"
    type="client"
    host="localhost"
    port="1234"
    single-use="true"
    so-timeout="10000"/>

以下示例显示了一个使用 java.net.Socket 连接并为每个消息创建新连接的客户端连接工厂

<int-ip:tcp-connection-factory id="client"
    type="client"
    host="localhost"
    port="1234"
    single-use="true"
    so-timeout="10000"
    using-nio=true/>

从 5.2 版本开始,客户端连接工厂支持 connectTimeout 属性,以秒为单位指定,默认为 60。

消息划分(序列化器和反序列化器)

TCP 是一种流协议。这意味着必须为通过 TCP 传输的数据提供一些结构,以便接收方可以将数据划分为离散的消息。连接工厂被配置为使用序列化器和反序列化器在消息有效负载和通过 TCP 发送的位之间进行转换。这通过分别为入站和出站消息提供反序列化器和序列化器来实现。Spring Integration 提供了许多标准的序列化器和反序列化器。

ByteArrayCrlfSerializer* 将字节数组转换为字节流,后跟回车符和换行符 (\r\n)。这是默认的序列化器(和反序列化器),可以与 telnet 作为客户端一起使用(例如)。

ByteArraySingleTerminatorSerializer* 将字节数组转换为字节流,后跟单个终止字符(默认为 0x00)。

ByteArrayLfSerializer* 将字节数组转换为字节流,后跟单个换行符 (0x0a)。

ByteArrayStxEtxSerializer* 将字节数组转换为字节流,前缀为 STX (0x02),后跟 ETX (0x03)。

ByteArrayLengthHeaderSerializer 将字节数组转换为字节流,前缀为网络字节顺序(大端)的二进制长度。这是一种高效的反序列化器,因为它不必解析每个字节来查找终止字符序列。它还可以用于包含二进制数据的有效负载。前面的序列化器仅支持有效负载中的文本。长度标头的默认大小为四个字节(一个整数),允许消息最大为 (2^31 - 1) 字节。但是,对于最大为 255 字节的消息,length 标头可以是单个字节(无符号),或者对于最大为 (2^16 - 1) 字节的消息,可以是无符号短整型(2 字节)。如果需要其他任何标头格式,你可以子类化 ByteArrayLengthHeaderSerializer 并为 readHeaderwriteHeader 方法提供实现。绝对最大数据大小为 (2^31 - 1) 字节。从 5.2 版本开始,标头值可以包含标头本身的长度以及有效负载的长度。设置 inclusive 属性以启用该机制(它必须在生产者和消费者之间设置为相同)。

ByteArrayRawSerializer* 将字节数组转换为字节流,并且不添加任何额外的消息划分数据。使用此序列化器(和反序列化器),消息的结束由客户端以有序方式关闭套接字来指示。使用此序列化器时,消息接收会一直挂起,直到客户端关闭套接字或发生超时。超时不会导致消息。当使用此序列化器并且客户端是 Spring Integration 应用程序时,客户端必须使用配置了 single-use="true" 的连接工厂。这样做会导致适配器在发送消息后关闭套接字。序列化器本身不会关闭连接。你应仅将此序列化器与通道适配器(而非网关)使用的连接工厂一起使用,并且连接工厂应由入站或出站适配器之一使用,而不是两者都使用。另请参阅本节后面的 ByteArrayElasticRawDeserializer。但是,从 5.2 版本开始,出站网关有一个新的属性 closeStreamAfterSend;这允许使用原始序列化器/反序列化器,因为 EOF 会发送给服务器,同时保持连接打开以接收回复。

在 4.2.2 版本之前,当使用非阻塞 I/O (NIO) 时,此序列化器将(读取期间的)超时视为文件结束,并且到目前为止读取的数据作为消息发出。这是不可靠的,不应用于分隔消息。它现在将此类情况视为异常。在极少数情况下,如果你以这种方式使用它,你可以通过将 treatTimeoutAsEndOfMessage 构造函数参数设置为 true 来恢复以前的行为。

这些都是 AbstractByteArraySerializer 的子类,它实现了 org.springframework.core.serializer.Serializerorg.springframework.core.serializer.Deserializer。为了向后兼容,使用 AbstractByteArraySerializer 的任何子类进行序列化的连接也接受首先转换为字节数组的 String。这些序列化器和反序列化器都将包含相应格式的输入流转换为字节数组有效负载。

为了避免由于行为不当的客户端(不遵守配置的序列化器协议的客户端)导致内存耗尽,这些序列化器施加了最大消息大小。如果传入消息超过此大小,则会抛出异常。默认最大消息大小为 2048 字节。你可以通过设置 maxMessageSize 属性来增加它。如果你使用默认的序列化器或反序列化器并希望增加最大消息大小,则必须将最大消息大小声明为具有设置 maxMessageSize 属性的显式 bean,并配置连接工厂以使用该 bean。

本节前面标记为*的类使用中间缓冲区并将解码数据复制到正确大小的最终缓冲区。从 4.3 版本开始,你可以通过设置 poolSize 属性来配置这些缓冲区,以使这些原始缓冲区可以重用,而不是为每个消息分配和丢弃,这是默认行为。将属性设置为负值会创建一个没有边界的池。如果池有边界,你还可以设置 poolWaitTimeout 属性(以毫秒为单位),在此之后,如果没有可用的缓冲区,则会抛出异常。它默认为无限。此类异常会导致套接字关闭。

如果你希望在自定义反序列化器中使用相同的机制,你可以扩展 AbstractPooledBufferByteArraySerializer(而不是其超类 AbstractByteArraySerializer)并实现 doDeserialize() 而不是 deserialize()。缓冲区会自动返回到池中。AbstractPooledBufferByteArraySerializer 还提供了一个方便的实用方法:copyToSizedArray()

5.0 版本添加了 ByteArrayElasticRawDeserializer。这类似于上述 ByteArrayRawSerializer 的反序列化器端,只是它不需要设置 maxMessageSize。在内部,它使用 ByteArrayOutputStream,允许缓冲区根据需要增长。客户端必须以有序方式关闭套接字以表示消息结束。

此反序列化器仅应在信任对等方时使用;它容易受到由于内存不足条件导致的 DoS 攻击。

MapJsonSerializer 使用 Jackson ObjectMapperMap 和 JSON 之间进行转换。你可以将此序列化器与 MessageConvertingTcpMessageMapperMapMessageConverter 结合使用,以 JSON 格式传输选定的标头和有效负载。

Jackson ObjectMapper 无法在流中划分消息。因此,MapJsonSerializer 需要委托给另一个序列化器或反序列化器来处理消息划分。默认情况下,使用 ByteArrayLfSerializer,导致消息以 <json><LF> 格式在线传输,但你可以将其配置为使用其他序列化器。(下一个示例显示了如何做到这一点。)

最后一个标准序列化器是 org.springframework.core.serializer.DefaultSerializer,你可以使用它通过 Java 序列化转换可序列化对象。org.springframework.core.serializer.DefaultDeserializer 用于入站流的反序列化,这些流包含可序列化对象。

如果你不希望使用默认的序列化器和反序列化器 (ByteArrayCrLfSerializer),则必须在连接工厂上设置 serializerdeserializer 属性。以下示例显示了如何做到这一点

<bean id="javaSerializer"
      class="org.springframework.core.serializer.DefaultSerializer" />
<bean id="javaDeserializer"
      class="org.springframework.core.serializer.DefaultDeserializer" />

<int-ip:tcp-connection-factory id="server"
    type="server"
    port="1234"
    deserializer="javaDeserializer"
    serializer="javaSerializer"/>

一个服务器连接工厂,它使用 java.net.Socket 连接并在网络上使用 Java 序列化。

有关连接工厂上可用属性的完整详细信息,请参阅本节末尾的参考

默认情况下,对入站数据包不执行反向 DNS 查找:在未配置 DNS 的环境中(例如 Docker 容器),这可能导致连接延迟。为了将 IP 地址转换为用于消息头的 hostname,可以通过将 lookup-host 属性设置为 true 来覆盖默认行为。

你还可以修改套接字和套接字工厂的属性。有关更多信息,请参阅SSL/TLS 支持。如那里所述,无论是否使用 SSL,都可以进行此类修改。

主机验证

从 5.1.0 版本开始,默认启用主机验证以增强安全性。此功能确保在 TCP 连接期间验证服务器的身份。

如果你遇到需要禁用主机验证的场景(不推荐),你可以在 tcp-connection-factory 中配置 socket-support 属性。

<int-ip:tcp-connection-factory id="client"
                                type="client"
                                host="localhost"
                                port="0"
                                socket-support="customSocketSupport"
                                single-use="true"
                                so-timeout="10000"/>

<bean id="customSocketSupport" class="org.springframework.integration.ip.tcp.connection.DefaultTcpSocketSupport">
	<constructor-arg value="false" />
</bean>

自定义序列化器和反序列化器

如果你的数据格式不受任何标准反序列化器支持,你可以实现自己的;你也可以实现自定义序列化器。

要实现自定义序列化器和反序列化器对,请实现 org.springframework.core.serializer.Deserializerorg.springframework.core.serializer.Serializer 接口。

当反序列化器在消息之间检测到已关闭的输入流时,它必须抛出 SoftEndOfStreamException;这是向框架发出的信号,表明关闭是“正常”的。如果在解码消息时流关闭,则应抛出其他异常。

从 5.2 版本开始,SoftEndOfStreamException 现在是一个 RuntimeException,而不是扩展 IOException

TCP 缓存客户端连接工厂

前所述,TCP 套接字可以是“一次性使用”(一个请求或响应)或共享的。共享套接字在高并发环境中与出站网关配合不良,因为套接字一次只能处理一个请求或响应。

为了提高性能,你可以使用协作通道适配器而不是网关,但这需要应用程序级别的消息关联。有关更多信息,请参阅TCP 消息关联

Spring Integration 2.2 引入了缓存客户端连接工厂,它使用共享套接字池,允许网关使用共享连接池处理多个并发请求。

TCP 故障转移客户端连接工厂

你可以配置一个支持故障转移到一个或多个其他服务器的 TCP 连接工厂。发送消息时,工厂会遍历其所有配置的工厂,直到可以发送消息或找不到连接为止。最初,使用配置列表中的第一个工厂。如果连接随后失败,则下一个工厂成为当前工厂。以下示例显示了如何配置故障转移客户端连接工厂

<bean id="failCF" class="o.s.i.ip.tcp.connection.FailoverClientConnectionFactory">
    <constructor-arg>
        <list>
            <ref bean="clientFactory1"/>
            <ref bean="clientFactory2"/>
        </list>
    </constructor-arg>
</bean>
使用故障转移连接工厂时,singleUse 属性必须在工厂本身和它配置使用的工厂列表之间保持一致。

当与共享连接(singleUse=false)一起使用时,连接工厂有两个与回退相关的属性

  • refreshSharedInterval

  • closeOnRefresh

考虑以下基于上述配置的场景:假设 clientFactory1 无法建立连接,但 clientFactory2 可以。当 refreshSharedInterval 过去后调用 failCFgetConnection() 方法时,我们将再次尝试使用 clientFactory1 连接;如果成功,与 clientFactory2 的连接将被关闭。如果 closeOnRefreshfalse,则“旧”连接将保持打开状态,如果第一个工厂再次失败,将来可能会被重用。

refreshSharedInterval 设置为仅在该时间过期后尝试重新连接到第一个工厂;如果你只想在当前连接失败时回退到第一个工厂,则将其设置为 Long.MAX_VALUE(默认值)。

设置 closeOnRefresh 以在刷新实际创建新连接后关闭“旧”连接。

如果任何委托工厂是 CachingClientConnectionFactory,则这些属性不适用,因为连接缓存是在那里处理的;在这种情况下,将始终咨询连接工厂列表以获取连接。

从 5.3 版本开始,这些属性默认为 Long.MAX_VALUEtrue,因此工厂仅在当前连接失败时尝试回退。要恢复到以前版本的默认行为,请将它们设置为 0false

另请参阅测试连接

TCP 线程亲和性连接工厂

Spring Integration 5.0 版本引入了此连接工厂。它将连接绑定到调用线程,并且每次该线程发送消息时都会重用相同的连接。这会一直持续到连接关闭(由服务器或网络)或线程调用 releaseConnection() 方法为止。连接本身由另一个客户端工厂实现提供,该实现必须配置为提供非共享(一次性使用)连接,以便每个线程获得一个连接。

以下示例显示了如何配置 TCP 线程亲和性连接工厂

@Bean
public TcpNetClientConnectionFactory cf() {
    TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory("localhost",
            Integer.parseInt(System.getProperty(PORT)));
    cf.setSingleUse(true);
    return cf;
}

@Bean
public ThreadAffinityClientConnectionFactory tacf() {
    return new ThreadAffinityClientConnectionFactory(cf());
}

@Bean
@ServiceActivator(inputChannel = "out")
public TcpOutboundGateway outGate() {
    TcpOutboundGateway outGate = new TcpOutboundGateway();
    outGate.setConnectionFactory(tacf());
    outGate.setReplyChannelName("toString");
    return outGate;
}
© . This site is unofficial and not affiliated with VMware.