TCP 连接工厂
概览
对于 TCP,通过使用连接工厂来配置底层连接。提供了两种类型的连接工厂:客户端连接工厂和服务器连接工厂。客户端连接工厂建立出站连接,服务器连接工厂监听入站连接。
出站通道适配器使用客户端连接工厂,但您也可以将客户端连接工厂的引用提供给入站通道适配器。该适配器接收由出站适配器创建的连接上收到的任何入站消息。
入站通道适配器或网关使用服务器连接工厂。(事实上,连接工厂必须配合其中之一才能工作)。您也可以将服务器连接工厂的引用提供给出站适配器。然后,您可以使用该适配器在同一连接上向入站消息发送回复。
回复消息仅在回复包含连接工厂插入到原始消息中的 ip_connectionId 头部时才路由到该连接。 |
这是在入站和出站适配器之间共享连接工厂时执行的消息关联的程度。这种共享允许通过 TCP 进行异步双向通信。默认情况下,仅通过 TCP 传输载荷信息。因此,任何消息关联必须由下游组件(如聚合器或其他端点)执行。从 3.0 版本开始引入了传输选定头部的支持。更多信息请参见TCP 消息关联。 |
您最多可以将连接工厂的引用提供给每种类型的一个适配器。
Spring Integration 提供了使用 java.net.Socket
和 java.nio.channel.SocketChannel
的连接工厂。
以下示例展示了一个使用 java.net.Socket
连接的简单服务器连接工厂
<int-ip:tcp-connection-factory id="server"
type="server"
port="1234"/>
以下示例展示了一个使用 java.nio.channel.SocketChannel
连接的简单服务器连接工厂
<int-ip:tcp-connection-factory id="server"
type="server"
port="1234"
using-nio="true"/>
从 Spring Integration 4.2 版本开始,如果服务器配置为监听随机端口(通过将端口设置为 0 ),您可以使用 getPort() 获取操作系统选择的实际端口。此外,getServerSocketAddress() 允许您获取完整的 SocketAddress 。更多信息请参见TcpServerConnectionFactory 接口的 Javadoc。 |
<int-ip:tcp-connection-factory id="client"
type="client"
host="localhost"
port="1234"
single-use="true"
so-timeout="10000"/>
以下示例展示了一个使用 java.net.Socket
连接并为每条消息创建新连接的客户端连接工厂
<int-ip:tcp-connection-factory id="client"
type="client"
host="localhost"
port="1234"
single-use="true"
so-timeout="10000"
using-nio=true/>
从 5.2 版本开始,客户端连接工厂支持以秒为单位指定的 connectTimeout
属性,该属性默认为 60。
消息边界划分(序列化器和反序列化器)
TCP 是一个流协议。这意味着需要为通过 TCP 传输的数据提供一些结构,以便接收方可以将数据划分为独立的消息。连接工厂配置为使用序列化器和反序列化器,分别用于在消息载荷与通过 TCP 发送的字节之间进行转换。这通过分别为入站和出站消息提供反序列化器和序列化器来完成。Spring Integration 提供了许多标准序列化器和反序列化器。
ByteArrayCrlfSerializer
* 将字节数组转换为字节流,后跟回车符和换行符 (\r\n
)。这是默认的序列化器(和反序列化器),例如可以与 telnet 作为客户端一起使用。
ByteArraySingleTerminatorSerializer
* 将字节数组转换为字节流,后跟单个终止字符(默认为 0x00
)。
ByteArrayLfSerializer
* 将字节数组转换为字节流,后跟单个换行符 (0x0a
)。
ByteArrayStxEtxSerializer
* 将字节数组转换为字节流,前面是 STX (0x02
),后面是 ETX (0x03
)。
ByteArrayLengthHeaderSerializer
将字节数组转换为字节流,前面是一个网络字节序(大端序)的二进制长度。这是一种高效的反序列化器,因为它无需解析每个字节来查找终止字符序列。它也可用于包含二进制数据的载荷。前面的序列化器仅支持载荷中的文本。长度头部默认大小为四个字节(一个 Integer),允许的消息最大可达 (2^31 - 1) 字节。但是,length
头部可以是单字节(无符号),用于最大 255 字节的消息,或无符号短整型(2 字节),用于最大 (2^16 - 1) 字节的消息。如果您需要其他格式的头部,可以子类化 ByteArrayLengthHeaderSerializer
并实现 readHeader
和 writeHeader
方法。绝对最大数据大小是 (2^31 - 1) 字节。从 5.2 版本开始,头部值可以包含头部自身的长度以及载荷长度。设置 inclusive
属性启用该机制(生产者和消费者必须设置相同)。
ByteArrayRawSerializer
* 将字节数组转换为字节流,并且不添加额外的消息边界划分数据。使用此序列化器(和反序列化器)时,消息的结束由客户端有序关闭套接字来指示。使用此序列化器时,消息接收将一直阻塞,直到客户端关闭套接字或发生超时。超时不会产生消息。当使用此序列化器且客户端是 Spring Integration 应用时,客户端必须使用配置了 single-use="true"
的连接工厂。这样做会导致适配器在发送消息后关闭套接字。序列化器本身不会关闭连接。您应该仅在通道适配器(而非网关)使用的连接工厂中使用此序列化器,并且连接工厂应该由入站或出站适配器之一使用,但不能两者都用。另请参见本节后面的 ByteArrayElasticRawDeserializer
。然而,从 5.2 版本开始,出站网关有一个新属性 closeStreamAfterSend
;这允许使用原始序列化器/反序列化器,因为 EOF 会被发送到服务器,同时保持连接打开以接收回复。
在 4.2.2 版本之前,使用非阻塞 I/O (NIO) 时,此序列化器会将(读取期间的)超时视为文件结束,并将已读取的数据作为消息发出。这是不可靠的,不应该用于划分消息边界。现在将此类情况视为异常。如果您在这种情况下使用它(尽管不推荐),可以通过将 treatTimeoutAsEndOfMessage 构造函数参数设置为 true 来恢复以前的行为。 |
它们都是 AbstractByteArraySerializer
的子类,实现了 org.springframework.core.serializer.Serializer
和 org.springframework.core.serializer.Deserializer
接口。为了向后兼容,使用 AbstractByteArraySerializer
的任何子类进行序列化的连接也接受 String
,该 String
会先转换为字节数组。每个序列化器和反序列化器都将包含相应格式的输入流转换为字节数组载荷。
为避免因行为不端的客户端(不遵守配置的序列化器协议的客户端)导致内存耗尽,这些序列化器强制设置了最大消息大小。如果入站消息超出此大小,将抛出异常。默认最大消息大小为 2048 字节。您可以通过设置 maxMessageSize
属性来增加它。如果您使用默认序列化器或反序列化器并希望增加最大消息大小,必须将最大消息大小声明为一个显式的 bean,设置好 maxMessageSize
属性,并配置连接工厂使用该 bean。
本节前面标记有 * 的类使用中间缓冲区,并将解码后的数据复制到正确大小的最终缓冲区。从 4.3 版本开始,可以通过设置 poolSize
属性来配置这些缓冲区,让这些原始缓冲区被重用,而不是为每条消息分配和丢弃,这是默认行为。将属性设置为负值会创建一个无界的池。如果池有界,您还可以设置 poolWaitTimeout
属性(以毫秒为单位),超过此时间后,如果没有可用缓冲区,则抛出异常。它默认为无限。此类异常会导致套接字关闭。
如果您希望在自定义反序列化器中使用相同的机制,可以扩展 AbstractPooledBufferByteArraySerializer
(而不是其超类 AbstractByteArraySerializer
)并实现 doDeserialize()
而不是 deserialize()
。缓冲区会自动返回到池中。AbstractPooledBufferByteArraySerializer
还提供了一个便捷的实用方法:copyToSizedArray()
。
5.0 版本添加了 ByteArrayElasticRawDeserializer
。这类似于上面的 ByteArrayRawSerializer
的反序列化器部分,不同之处在于无需设置 maxMessageSize
。在内部,它使用 ByteArrayOutputStream
,允许缓冲区按需增长。客户端必须有序关闭套接字以指示消息结束。
此反序列化器仅应在对等方受信任时使用;它容易受到因内存不足条件导致的 DoS 攻击。 |
MapJsonSerializer
使用 Jackson ObjectMapper
在 Map
和 JSON 之间进行转换。可以结合 MessageConvertingTcpMessageMapper
和 MapMessageConverter
使用此序列化器,以 JSON 格式传输选定的头部和载荷。
Jackson ObjectMapper 无法在流中划分消息边界。因此,MapJsonSerializer 需要委托给另一个序列化器或反序列化器来处理消息边界划分。默认情况下,使用 ByteArrayLfSerializer ,导致在线上传输的消息格式为 <json><LF> ,但您可以配置它使用其他序列化器。(下一个示例将展示如何操作。) |
最后一个标准序列化器是 org.springframework.core.serializer.DefaultSerializer
,可用于通过 Java 序列化转换可序列化对象。提供了 org.springframework.core.serializer.DefaultDeserializer
用于入站反序列化包含可序列化对象的流。
如果您不想使用默认序列化器和反序列化器 (ByteArrayCrLfSerializer
),必须在连接工厂上设置 serializer
和 deserializer
属性。以下示例展示了如何操作
<bean id="javaSerializer"
class="org.springframework.core.serializer.DefaultSerializer" />
<bean id="javaDeserializer"
class="org.springframework.core.serializer.DefaultDeserializer" />
<int-ip:tcp-connection-factory id="server"
type="server"
port="1234"
deserializer="javaDeserializer"
serializer="javaSerializer"/>
一个使用 java.net.Socket
连接并在传输中使用 Java 序列化的服务器连接工厂。
有关连接工厂可用属性的完整详细信息,请参见本节末尾的参考。
默认情况下,不对入站数据包执行反向 DNS 查找:在未配置 DNS 的环境(例如 Docker 容器)中,这可能导致连接延迟。要将 IP 地址转换为用于消息头的主机名,可以通过将 lookup-host
属性设置为 true
来覆盖默认行为。
您还可以修改套接字和套接字工厂的属性。更多信息请参见SSL/TLS 支持。如该部分所述,无论是否使用 SSL,此类修改都是可能的。 |
主机验证
从 5.1.0 版本开始,默认启用主机验证以增强安全性。此功能确保在 TCP 连接期间验证服务器的身份。
如果您遇到需要禁用主机验证的场景(不推荐),可以在 tcp-connection-factory 中配置 socket-support 属性。
<int-ip:tcp-connection-factory id="client"
type="client"
host="localhost"
port="0"
socket-support="customSocketSupport"
single-use="true"
so-timeout="10000"/>
<bean id="customSocketSupport" class="org.springframework.integration.ip.tcp.connection.DefaultTcpSocketSupport">
<constructor-arg value="false" />
</bean>
自定义序列化器和反序列化器
如果您的数据格式不受标准反序列化器支持,您可以实现自定义反序列化器;您也可以实现自定义序列化器。
要实现自定义序列化器和反序列化器对,实现 org.springframework.core.serializer.Deserializer
和 org.springframework.core.serializer.Serializer
接口。
当反序列化器在消息之间检测到输入流已关闭时,必须抛出 SoftEndOfStreamException
;这是向框架发出的信号,表明关闭是“正常”的。如果在解码消息时流关闭,则应抛出其他异常。
从 5.2 版本开始,SoftEndOfStreamException
现在是 RuntimeException
的子类,而不是继承自 IOException
。
TCP 故障转移客户端连接工厂
可以配置支持故障转移到一个或多个其他服务器的 TCP 连接工厂。发送消息时,工厂会遍历其配置的所有工厂,直到消息可以发送或找不到连接。最初,使用配置列表中的第一个工厂。如果后续连接失败,下一个工厂将成为当前工厂。以下示例展示了如何配置故障转移客户端连接工厂
<bean id="failCF" class="o.s.i.ip.tcp.connection.FailoverClientConnectionFactory">
<constructor-arg>
<list>
<ref bean="clientFactory1"/>
<ref bean="clientFactory2"/>
</list>
</constructor-arg>
</bean>
使用故障转移连接工厂时,工厂本身的 singleUse 属性必须与其配置使用的工厂列表保持一致。 |
与共享连接 (singleUse=false
) 一起使用时,连接工厂有两个与故障恢复相关的属性
-
refreshSharedInterval
-
closeOnRefresh
考虑基于上述配置的以下场景:假设 clientFactory1
无法建立连接,但 clientFactory2
可以。当在 refreshSharedInterval
超时后调用 failCF
的 getConnection()
方法时,我们将再次尝试使用 clientFactory1
连接;如果成功,与 clientFactory2
的连接将被关闭。如果 closeOnRefresh
为 false
,则“旧”连接将保持打开,如果第一个工厂再次失败,将来可能会被重用。
设置 refreshSharedInterval
,以便仅在该时间过期后才尝试重新连接到第一个工厂;将其设置为 Long.MAX_VALUE
(默认),如果您只希望在当前连接失败时才故障恢复到第一个工厂。
设置 closeOnRefresh
,以便在刷新实际创建新连接后关闭“旧”连接。
如果任何委托工厂是 CachingClientConnectionFactory ,则这些属性不适用,因为连接缓存是在那里处理的;在这种情况下,总是会查阅连接工厂列表来获取连接。 |
从 5.3 版本开始,这些属性默认为 Long.MAX_VALUE
和 true
,因此工厂仅在当前连接失败时才尝试故障恢复。要恢复到之前版本的默认行为,将它们设置为 0
和 false
。
另请参见测试连接。
TCP 线程亲和性连接工厂
Spring Integration 5.0 版本引入了此连接工厂。它将连接绑定到调用线程,该线程每次发送消息时都会重用相同的连接。这会一直持续,直到连接被关闭(由服务器或网络)或直到线程调用 releaseConnection()
方法。连接本身由另一个客户端工厂实现提供,该实现必须配置为提供非共享(单次使用)连接,以便每个线程获取一个连接。
以下示例展示了如何配置 TCP 线程亲和性连接工厂
@Bean
public TcpNetClientConnectionFactory cf() {
TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory("localhost",
Integer.parseInt(System.getProperty(PORT)));
cf.setSingleUse(true);
return cf;
}
@Bean
public ThreadAffinityClientConnectionFactory tacf() {
return new ThreadAffinityClientConnectionFactory(cf());
}
@Bean
@ServiceActivator(inputChannel = "out")
public TcpOutboundGateway outGate() {
TcpOutboundGateway outGate = new TcpOutboundGateway();
outGate.setConnectionFactory(tacf());
outGate.setReplyChannelName("toString");
return outGate;
}