TCP连接工厂
概述
对于TCP,底层连接的配置是通过连接工厂提供的。提供了两种类型的连接工厂:客户端连接工厂和服务器连接工厂。客户端连接工厂建立出站连接。服务器连接工厂监听入站连接。
出站通道适配器使用客户端连接工厂,但是您也可以向入站通道适配器提供对客户端连接工厂的引用。该适配器接收通过出站适配器创建的连接接收到的任何入站消息。
入站通道适配器或网关使用服务器连接工厂。(事实上,如果没有服务器连接工厂,连接工厂就无法工作)。您也可以向出站适配器提供对服务器连接工厂的引用。然后,您可以使用该适配器向同一连接上的入站消息发送回复。
只有当回复包含连接工厂插入到原始消息中的ip_connectionId 标头时,回复消息才会路由到连接。 |
这是在入站和出站适配器之间共享连接工厂时执行的消息关联的全部内容。这种共享允许通过TCP进行异步双向通信。默认情况下,只有有效负载信息使用TCP传输。因此,任何消息关联都必须由下游组件(例如聚合器或其他端点)执行。对传输选定标头的支持是在3.0版中引入的。有关更多信息,请参见TCP消息关联。 |
您可以最多向每种类型的一个适配器提供对连接工厂的引用。
Spring Integration提供使用java.net.Socket
和java.nio.channel.SocketChannel
的连接工厂。
以下示例显示了一个使用java.net.Socket
连接的简单服务器连接工厂
<int-ip:tcp-connection-factory id="server"
type="server"
port="1234"/>
以下示例显示了一个使用java.nio.channel.SocketChannel
连接的简单服务器连接工厂
<int-ip:tcp-connection-factory id="server"
type="server"
port="1234"
using-nio="true"/>
从Spring Integration 4.2版开始,如果服务器配置为监听随机端口(通过将端口设置为0 ),则可以使用getPort() 获取操作系统选择的实际端口。此外,getServerSocketAddress() 允许您获取完整的SocketAddress 。有关更多信息,请参见TcpServerConnectionFactory 接口的Javadoc。 |
<int-ip:tcp-connection-factory id="client"
type="client"
host="localhost"
port="1234"
single-use="true"
so-timeout="10000"/>
以下示例显示了一个使用java.net.Socket
连接并为每条消息创建一个新连接的客户端连接工厂
<int-ip:tcp-connection-factory id="client"
type="client"
host="localhost"
port="1234"
single-use="true"
so-timeout="10000"
using-nio=true/>
从5.2版开始,客户端连接工厂支持以秒为单位指定的属性connectTimeout
,默认为60。
消息分隔符(序列化器和反序列化器)
TCP是一种流协议。这意味着必须为通过TCP传输的数据提供一些结构,以便接收器可以将数据划分为离散的消息。连接工厂配置为使用序列化器和反序列化器在消息有效负载和通过TCP发送的位之间进行转换。这是通过分别为入站和出站消息提供反序列化器和序列化器来实现的。Spring Integration提供许多标准的序列化器和反序列化器。
ByteArrayCrlfSerializer
* 将字节数组转换为字节流,后跟回车符和换行符 (\r\n
)。这是默认的序列化器(和反序列化器),可以(例如)与telnet作为客户端一起使用。
ByteArraySingleTerminatorSerializer
* 将字节数组转换为字节流,后跟单个终止字符(默认为0x00
)。
ByteArrayLfSerializer
* 将字节数组转换为字节流,后跟单个换行符 (0x0a
)。
ByteArrayStxEtxSerializer
* 将字节数组转换为字节流,前面是STX (0x02
),后面是ETX (0x03
)。
ByteArrayLengthHeaderSerializer
将字节数组转换为字节流,前面是网络字节序(大端序)中的二进制长度。这是一个高效的反序列化器,因为它不必解析每个字节来查找终止字符序列。它也可以用于包含二进制数据的有效负载。前面的序列化器只支持有效负载中的文本。长度标头的默认大小为四个字节(一个整数),允许消息最多为 (2^31 - 1) 字节。但是,对于最多255字节的消息,length
标头可以是一个字节(无符号),或者对于最多 (2^16 - 1) 字节的消息,可以是一个无符号短整型(2字节)。如果您需要标头的任何其他格式,您可以对ByteArrayLengthHeaderSerializer
进行子类化,并为readHeader
和writeHeader
方法提供实现。绝对最大数据大小为 (2^31 - 1) 字节。从5.2版开始,标头值除了有效负载外还可以包含标头的长度。设置inclusive
属性以启用该机制(它必须对生产者和消费者设置为相同)。
ByteArrayRawSerializer
* 将字节数组转换为字节流,并且不添加任何其他消息分隔符数据。使用此序列化器(和反序列化器),消息的结尾由客户端以有序的方式关闭套接字指示。使用此序列化器时,消息接收将挂起,直到客户端关闭套接字或发生超时。超时不会导致消息。当使用此序列化器并且客户端是Spring Integration应用程序时,客户端必须使用配置了single-use="true"
的连接工厂。这样做会导致适配器在发送消息后关闭套接字。序列化器本身不会关闭连接。您应该只将此序列化器与通道适配器(而不是网关)使用的连接工厂一起使用,并且连接工厂应该由入站或出站适配器使用,而不是两者都使用。另请参见本节后面的ByteArrayElasticRawDeserializer
。但是,从5.2版开始,出站网关有一个新的属性closeStreamAfterSend
;这允许使用原始序列化器/反序列化器,因为EOF被发送到服务器,同时保持连接打开以接收回复。
在4.2.2版之前,当使用非阻塞I/O(NIO)时,此序列化器将超时(在读取期间)视为文件结尾,并且到目前为止读取的数据作为消息发出。这是不可靠的,不应将其用于分隔消息。它现在将此类条件视为异常。如果您以这种方式使用它(不太可能),则可以通过将treatTimeoutAsEndOfMessage 构造函数参数设置为true 来恢复之前的行为。 |
这些都是AbstractByteArraySerializer
的子类,它同时实现了org.springframework.core.serializer.Serializer
和org.springframework.core.serializer.Deserializer
。为了向后兼容,使用AbstractByteArraySerializer
的任何子类进行序列化的连接也接受首先转换为字节数组的String
。这些序列化器和反序列化器中的每一个都将包含相应格式的输入流转换为字节数组有效负载。
为了避免由于行为不良的客户端(一个不遵守已配置序列化器协议的客户端)导致内存耗尽,这些序列化器会施加最大消息大小。如果传入消息超过此大小,则会引发异常。默认最大消息大小为2048字节。您可以通过设置maxMessageSize
属性来增加它。如果您使用默认序列化器或反序列化器并希望增加最大消息大小,则必须使用已设置maxMessageSize
属性的显式bean声明最大消息大小,并将连接工厂配置为使用该bean。
前面本节中标有*的类使用中间缓冲区并将解码后的数据复制到正确大小的最终缓冲区。从4.3版开始,您可以通过设置poolSize
属性来配置这些缓冲区,以允许重用这些原始缓冲区,而不是为每条消息分配和丢弃它们,这是默认行为。将属性设置为负值会创建一个没有边界的池。如果池是有界的,您还可以设置poolWaitTimeout
属性(以毫秒为单位),在此之后,如果没有缓冲区可用,则会引发异常。它默认为无限大。此类异常会导致套接字关闭。
如果您希望在自定义反序列化器中使用相同的机制,您可以扩展AbstractPooledBufferByteArraySerializer
(而不是其超类AbstractByteArraySerializer
),并实现doDeserialize()
而不是deserialize()
。缓冲区会自动返回到池中。AbstractPooledBufferByteArraySerializer
还提供了一个方便的实用程序方法:copyToSizedArray()
。
5.0版添加了ByteArrayElasticRawDeserializer
。这类似于上面的ByteArrayRawSerializer
的反序列化器端,除了不必设置maxMessageSize
。在内部,它使用一个ByteArrayOutputStream
,允许缓冲区根据需要增长。客户端必须以有序的方式关闭套接字以发出消息结束信号。
只有当对等方可信时才应使用此反序列化器;它容易受到由于内存不足条件导致的DoS攻击。 |
MapJsonSerializer
使用Jackson ObjectMapper
在Map
和JSON之间进行转换。您可以将此序列化器与MessageConvertingTcpMessageMapper
和MapMessageConverter
结合使用,以JSON格式传输选定的标头和有效负载。
Jackson ObjectMapper 无法在流中分隔消息。因此,MapJsonSerializer 需要委托给另一个序列化器或反序列化器来处理消息分隔符。默认情况下,使用ByteArrayLfSerializer ,导致线路上的消息格式为<json><LF> ,但您可以将其配置为使用其他序列化器。(下一个示例显示如何执行此操作。) |
最终的标准序列化器是org.springframework.core.serializer.DefaultSerializer
,您可以使用它来使用Java序列化转换可序列化对象。org.springframework.core.serializer.DefaultDeserializer
用于对包含可序列化对象的流进行入站反序列化。
如果您不希望使用默认的序列化器和反序列化器 (ByteArrayCrLfSerializer
),则必须在连接工厂上设置serializer
和deserializer
属性。以下示例显示了如何执行此操作
<bean id="javaSerializer"
class="org.springframework.core.serializer.DefaultSerializer" />
<bean id="javaDeserializer"
class="org.springframework.core.serializer.DefaultDeserializer" />
<int-ip:tcp-connection-factory id="server"
type="server"
port="1234"
deserializer="javaDeserializer"
serializer="javaSerializer"/>
一个使用java.net.Socket
连接并在线路上传输使用Java序列化的服务器连接工厂。
有关连接工厂上可用属性的完整详细信息,请参见本节结尾处的参考。
默认情况下,不会对入站数据包执行反向DNS查找:在未配置DNS的环境(例如Docker容器)中,这可能会导致连接延迟。为了将IP地址转换为主机名以在消息标头中使用,可以通过将lookup-host
属性设置为true
来覆盖默认行为。
您还可以修改套接字和套接字工厂的属性。有关更多信息,请参见SSL/TLS支持。如其中所述,如果使用或不使用SSL,都可以进行此类修改。 |
自定义序列化器和反序列化器
如果您的数据格式不受任何标准反序列化器支持,您可以实现自己的反序列化器;您还可以实现自定义序列化器。
要实现自定义序列化器和反序列化器对,请实现org.springframework.core.serializer.Deserializer
和org.springframework.core.serializer.Serializer
接口。
当反序列化器检测到消息之间的输入流已关闭时,它必须抛出SoftEndOfStreamException
;这是向框架发出的信号,指示关闭是“正常的”。如果在解码消息时流已关闭,则应改为抛出其他异常。
从5.2版开始,SoftEndOfStreamException
现在是RuntimeException
,而不是扩展IOException
。
TCP故障转移客户端连接工厂
您可以配置一个支持故障转移到一个或多个其他服务器的TCP连接工厂。发送消息时,工厂会遍历其所有已配置的工厂,直到消息可以发送或找不到连接为止。最初,使用配置列表中的第一个工厂。如果连接随后失败,则下一个工厂将成为当前工厂。以下示例显示如何配置故障转移客户端连接工厂。
<bean id="failCF" class="o.s.i.ip.tcp.connection.FailoverClientConnectionFactory">
<constructor-arg>
<list>
<ref bean="clientFactory1"/>
<ref bean="clientFactory2"/>
</list>
</constructor-arg>
</bean>
使用故障转移连接工厂时,singleUse 属性必须在工厂本身及其配置使用的工厂列表之间保持一致。 |
当与共享连接一起使用(singleUse=false
)时,连接工厂有两个与回退相关的属性:
-
refreshSharedInterval
-
closeOnRefresh
考虑以下基于上述配置的场景:假设clientFactory1
无法建立连接,但clientFactory2
可以。当refreshSharedInterval
时间过去后调用failCF
的getConnection()
方法时,我们将再次尝试使用clientFactory1
进行连接;如果成功,则关闭与clientFactory2
的连接。如果closeOnRefresh
为false
,“旧”连接将保持打开状态,如果第一个工厂再次失败,则将来可能会重用。
设置refreshSharedInterval
,仅在该时间过去后才尝试重新连接第一个工厂;如果只想在当前连接失败时回退到第一个工厂,则将其设置为Long.MAX_VALUE
(默认值)。
设置closeOnRefresh
,在刷新实际创建新连接后关闭“旧”连接。
如果任何委托工厂是CachingClientConnectionFactory ,则这些属性不适用,因为连接缓存在那里处理;在这种情况下,将始终查询连接工厂列表以获取连接。 |
从 5.3 版本开始,这些默认值为 Long.MAX_VALUE
和 true
,因此工厂仅在当前连接失败时才尝试回退。要恢复到先前版本的默认行为,请将其设置为 0
和 false
。
另请参见测试连接。
TCP线程关联连接工厂
Spring Integration 5.0 版本引入了此连接工厂。它将连接绑定到调用线程,并且每次该线程发送消息时都会重用相同的连接。这种情况会持续到连接关闭(由服务器或网络关闭)或线程调用releaseConnection()
方法为止。连接本身由另一个客户端工厂实现提供,该实现必须配置为提供非共享(单次使用)连接,以便每个线程都获得一个连接。
以下示例显示如何配置 TCP 线程关联连接工厂。
@Bean
public TcpNetClientConnectionFactory cf() {
TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory("localhost",
Integer.parseInt(System.getProperty(PORT)));
cf.setSingleUse(true);
return cf;
}
@Bean
public ThreadAffinityClientConnectionFactory tacf() {
return new ThreadAffinityClientConnectionFactory(cf());
}
@Bean
@ServiceActivator(inputChannel = "out")
public TcpOutboundGateway outGate() {
TcpOutboundGateway outGate = new TcpOutboundGateway();
outGate.setConnectionFactory(tacf());
outGate.setReplyChannelName("toString");
return outGate;
}