TCP 连接工厂

概述

对于 TCP,底层连接的配置是通过使用连接工厂提供的。提供了两种类型的连接工厂:客户端连接工厂和服务器连接工厂。客户端连接工厂建立出站连接。服务器连接工厂监听传入连接。

出站通道适配器使用客户端连接工厂,但您也可以将对客户端连接工厂的引用提供给入站通道适配器。该适配器接收通过出站适配器创建的连接接收的任何传入消息。

入站通道适配器或网关使用服务器连接工厂。(实际上,连接工厂如果没有服务器连接工厂就无法正常工作)。您也可以将对服务器连接工厂的引用提供给出站适配器。然后,您可以使用该适配器将回复发送到同一连接上的传入消息。

只有当回复包含由连接工厂插入到原始消息中的ip_connectionId标头时,回复消息才会路由到连接。
当在入站和出站适配器之间共享连接工厂时,这是执行消息关联的范围。这种共享允许通过 TCP 进行异步双向通信。默认情况下,仅使用 TCP 传输有效负载信息。因此,任何消息关联都必须由下游组件(如聚合器或其他端点)执行。版本 3.0 中引入了对传输选定头的支持。有关更多信息,请参见 TCP 消息关联

您可以将连接工厂引用最多提供给每种类型的一个适配器。

Spring Integration 提供使用 java.net.Socketjava.nio.channel.SocketChannel 的连接工厂。

以下示例显示了一个使用 java.net.Socket 连接的简单服务器连接工厂

<int-ip:tcp-connection-factory id="server"
    type="server"
    port="1234"/>

以下示例显示了一个使用 java.nio.channel.SocketChannel 连接的简单服务器连接工厂

<int-ip:tcp-connection-factory id="server"
    type="server"
    port="1234"
    using-nio="true"/>
从 Spring Integration 版本 4.2 开始,如果服务器配置为监听随机端口(通过将端口设置为 0),则可以使用 getPort() 获取操作系统选择的实际端口。此外,getServerSocketAddress() 允许您获取完整的 SocketAddress。有关更多信息,请参见 TcpServerConnectionFactory 接口的 Javadoc
<int-ip:tcp-connection-factory id="client"
    type="client"
    host="localhost"
    port="1234"
    single-use="true"
    so-timeout="10000"/>

以下示例显示了一个使用 java.net.Socket 连接并为每条消息创建新连接的客户端连接工厂

<int-ip:tcp-connection-factory id="client"
    type="client"
    host="localhost"
    port="1234"
    single-use="true"
    so-timeout="10000"
    using-nio=true/>

从版本 5.2 开始,客户端连接工厂支持属性 connectTimeout,以秒为单位指定,默认值为 60。

消息分隔(序列化器和反序列化器)

TCP 是一种流协议。这意味着必须为通过 TCP 传输的数据提供一些结构,以便接收器可以将数据分隔成离散的消息。连接工厂配置为使用序列化器和反序列化器在消息有效负载和通过 TCP 发送的位之间进行转换。这是通过分别为入站和出站消息提供反序列化器和序列化器来实现的。Spring Integration 提供了许多标准序列化器和反序列化器。

ByteArrayCrlfSerializer* 将字节数组转换为字节流,后跟回车符和换行符(\r\n)。这是默认的序列化器(和反序列化器),可以与 telnet 作为客户端一起使用(例如)。

ByteArraySingleTerminatorSerializer* 将字节数组转换为字节流,后跟单个终止字符(默认值为 0x00)。

ByteArrayLfSerializer* 将字节数组转换为字节流,后跟单个换行符(0x0a)。

ByteArrayStxEtxSerializer* 将字节数组转换为以 STX (0x02) 开头,以 ETX (0x03) 结尾的字节流。

ByteArrayLengthHeaderSerializer 将字节数组转换为以网络字节序 (大端序) 的二进制长度开头的字节流。这是一种高效的反序列化器,因为它不需要解析每个字节来查找终止字符序列。它也可以用于包含二进制数据的有效载荷。前面的序列化器只支持有效载荷中的文本。长度头的默认大小为四个字节(一个整数),允许消息大小高达 (2^31 - 1) 字节。但是,length 头可以是一个字节(无符号)用于大小高达 255 字节的消息,或者一个无符号短整型(2 字节)用于大小高达 (2^16 - 1) 字节的消息。如果您需要其他格式的头部,您可以子类化 ByteArrayLengthHeaderSerializer 并为 readHeaderwriteHeader 方法提供实现。绝对最大数据大小为 (2^31 - 1) 字节。从 5.2 版本开始,头部值可以包含头部长度以及有效载荷。设置 inclusive 属性以启用该机制(它必须在生产者和消费者中设置为相同)。

ByteArrayRawSerializer* 将字节数组转换为字节流,不添加任何额外的消息分隔数据。使用此序列化器(和反序列化器)时,消息的结束由客户端以有序方式关闭套接字来指示。使用此序列化器时,消息接收将挂起,直到客户端关闭套接字或发生超时。超时不会导致消息。当使用此序列化器并且客户端是 Spring Integration 应用程序时,客户端必须使用配置了 single-use="true" 的连接工厂。这样做会导致适配器在发送消息后关闭套接字。序列化器本身不会关闭连接。您应该仅将此序列化器与通道适配器(而不是网关)使用的连接工厂一起使用,并且连接工厂应该由入站或出站适配器使用,而不是两者都使用。另请参阅本节后面的 ByteArrayElasticRawDeserializer。但是,从 5.2 版本开始,出站网关有一个新的属性 closeStreamAfterSend;这允许使用原始序列化器/反序列化器,因为 EOF 会向服务器发出信号,同时保持连接打开以接收回复。

在 4.2.2 版本之前,当使用非阻塞 I/O (NIO) 时,此序列化器将超时(在读取期间)视为文件结束,并且到目前为止读取的数据将作为消息发出。这是不可靠的,不应用于分隔消息。现在它将此类条件视为异常。在您以这种方式使用它的极不可能的情况下,您可以通过将 treatTimeoutAsEndOfMessage 构造函数参数设置为 true 来恢复之前的行为。

这些都是 AbstractByteArraySerializer 的子类,它同时实现了 org.springframework.core.serializer.Serializerorg.springframework.core.serializer.Deserializer。为了向后兼容,使用 AbstractByteArraySerializer 的任何子类进行序列化的连接也接受一个首先转换为字节数组的 String。这些序列化器和反序列化器中的每一个都将包含相应格式的输入流转换为字节数组有效负载。

为了避免由于行为不端的客户端(不遵守配置的序列化器的协议的客户端)导致的内存耗尽,这些序列化器会强制执行最大消息大小。如果传入的消息超过此大小,则会抛出异常。默认最大消息大小为 2048 字节。您可以通过设置 maxMessageSize 属性来增加它。如果您使用默认的序列化器或反序列化器并希望增加最大消息大小,则必须将最大消息大小声明为具有 maxMessageSize 属性设置的显式 bean,并将连接工厂配置为使用该 bean。

在本节前面标记为 * 的类使用中间缓冲区并将解码后的数据复制到正确大小的最终缓冲区。从版本 4.3 开始,您可以通过设置 poolSize 属性来配置这些缓冲区,以让这些原始缓冲区被重用,而不是为每条消息分配和丢弃,这是默认行为。将属性设置为负值将创建一个没有边界的池。如果池是有界的,您还可以设置 poolWaitTimeout 属性(以毫秒为单位),在此之后,如果没有任何缓冲区可用,则会抛出异常。它默认为无穷大。此类异常会导致套接字关闭。

如果您希望在自定义反序列化器中使用相同的机制,您可以扩展 AbstractPooledBufferByteArraySerializer(而不是它的超类 AbstractByteArraySerializer)并实现 doDeserialize() 而不是 deserialize()。缓冲区会自动返回到池中。AbstractPooledBufferByteArraySerializer 还提供了一个方便的实用程序方法:copyToSizedArray()

版本 5.0 添加了 ByteArrayElasticRawDeserializer。这类似于上面 ByteArrayRawSerializer 的反序列化器端,只是不需要设置 maxMessageSize。在内部,它使用一个 ByteArrayOutputStream,它允许缓冲区根据需要增长。客户端必须以有序的方式关闭套接字以发出消息结束的信号。

此反序列化器应该只在对等方可信时使用;它容易受到由于内存不足条件导致的 DoS 攻击。

MapJsonSerializer 使用 Jackson ObjectMapperMap 和 JSON 之间进行转换。您可以将此序列化器与 MessageConvertingTcpMessageMapperMapMessageConverter 结合使用,以在 JSON 中传输选定的标头和有效负载。

Jackson 的 ObjectMapper 无法在流中划分消息。因此,MapJsonSerializer 需要委托给其他序列化器或反序列化器来处理消息划分。默认情况下,使用 ByteArrayLfSerializer,导致消息在网络上的格式为 <json><LF>,但您可以配置它使用其他序列化器。 (下一个示例展示了如何操作。)

最终的标准序列化器是 org.springframework.core.serializer.DefaultSerializer,您可以使用它将可序列化对象转换为 Java 序列化格式。org.springframework.core.serializer.DefaultDeserializer 用于对包含可序列化对象的流进行入站反序列化。

如果您不想使用默认的序列化器和反序列化器 (ByteArrayCrLfSerializer),则必须在连接工厂上设置 serializerdeserializer 属性。以下示例展示了如何操作。

<bean id="javaSerializer"
      class="org.springframework.core.serializer.DefaultSerializer" />
<bean id="javaDeserializer"
      class="org.springframework.core.serializer.DefaultDeserializer" />

<int-ip:tcp-connection-factory id="server"
    type="server"
    port="1234"
    deserializer="javaDeserializer"
    serializer="javaSerializer"/>

一个使用 java.net.Socket 连接并在网络上使用 Java 序列化的服务器连接工厂。

有关连接工厂上可用属性的完整详细信息,请参阅本节末尾的 参考

默认情况下,不会对入站数据包执行反向 DNS 查询:在未配置 DNS 的环境(例如 Docker 容器)中,这会导致连接延迟。要将 IP 地址转换为主机名以用于消息头,可以通过将 lookup-host 属性设置为 true 来覆盖默认行为。

您还可以修改套接字和套接字工厂的属性。有关更多信息,请参阅 SSL/TLS 支持。如那里所述,如果使用 SSL,或者不使用 SSL,都可以进行此类修改。

自定义序列化器和反序列化器

如果您的数据格式不受任何标准反序列化器支持,您可以实现自己的反序列化器;您也可以实现自定义序列化器。

要实现自定义序列化器和反序列化器对,请实现 org.springframework.core.serializer.Deserializerorg.springframework.core.serializer.Serializer 接口。

当反序列化器检测到消息之间的输入流关闭时,它必须抛出 SoftEndOfStreamException;这是向框架发出的信号,表明关闭是“正常的”。如果在解码消息时流关闭,则应抛出其他异常。

从 5.2 版本开始,SoftEndOfStreamException 现在是 RuntimeException,而不是扩展 IOException

TCP 缓存客户端连接工厂

前面所述,TCP 套接字可以是“单次使用”(一次请求或响应)或共享的。在高流量环境中,共享套接字在出站网关中表现不佳,因为套接字一次只能处理一个请求或响应。

为了提高性能,您可以使用协作通道适配器而不是网关,但这需要应用程序级别的消息关联。有关更多信息,请参见 TCP 消息关联

Spring Integration 2.2 引入了一个缓存客户端连接工厂,它使用一个共享套接字池,允许网关使用一个共享连接池处理多个并发请求。

TCP 故障转移客户端连接工厂

您可以配置一个支持故障转移到一个或多个其他服务器的 TCP 连接工厂。发送消息时,工厂会遍历所有配置的工厂,直到消息能够发送或找不到连接为止。最初,使用配置列表中的第一个工厂。如果连接随后失败,下一个工厂将成为当前工厂。以下示例展示了如何配置故障转移客户端连接工厂

<bean id="failCF" class="o.s.i.ip.tcp.connection.FailoverClientConnectionFactory">
    <constructor-arg>
        <list>
            <ref bean="clientFactory1"/>
            <ref bean="clientFactory2"/>
        </list>
    </constructor-arg>
</bean>
使用故障转移连接工厂时,singleUse 属性必须在工厂本身及其配置的工厂列表之间保持一致。

连接工厂有两个与回退相关的属性,当与共享连接(singleUse=false)一起使用时

  • refreshSharedInterval

  • closeOnRefresh

考虑以下基于上述配置的场景:假设 clientFactory1 无法建立连接,但 clientFactory2 可以。当 refreshSharedInterval 过后调用 failCFgetConnection() 方法时,我们将再次尝试使用 clientFactory1 建立连接;如果成功,将关闭与 clientFactory2 的连接。如果 closeOnRefreshfalse,则“旧”连接将保持打开状态,如果第一个工厂再次失败,则可以在将来重用该连接。

refreshSharedInterval 设置为仅在该时间到期后才尝试重新连接到第一个工厂;如果只想在当前连接失败时才回退到第一个工厂,则将其设置为 Long.MAX_VALUE(默认值)。

closeOnRefresh 设置为在刷新实际创建新连接后关闭“旧”连接。

如果任何委托工厂是 CachingClientConnectionFactory,则这些属性不适用,因为连接缓存是在那里处理的;在这种情况下,将始终查询连接工厂列表以获取连接。

从 5.3 版开始,这些默认设置为 Long.MAX_VALUEtrue,因此工厂仅在当前连接失败时才尝试回退。要恢复到先前版本的默认行为,请将其设置为 0false

另请参见 测试连接

TCP 线程亲和力连接工厂

Spring Integration 5.0 版引入了此连接工厂。它将连接绑定到调用线程,并且每次该线程发送消息时都会重用相同的连接。这将持续到连接关闭(由服务器或网络关闭)或直到线程调用 releaseConnection() 方法为止。连接本身由另一个客户端工厂实现提供,该实现必须配置为提供非共享(单次使用)连接,以便每个线程都获得一个连接。

以下示例展示了如何配置 TCP 线程亲和力连接工厂

@Bean
public TcpNetClientConnectionFactory cf() {
    TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory("localhost",
            Integer.parseInt(System.getProperty(PORT)));
    cf.setSingleUse(true);
    return cf;
}

@Bean
public ThreadAffinityClientConnectionFactory tacf() {
    return new ThreadAffinityClientConnectionFactory(cf());
}

@Bean
@ServiceActivator(inputChannel = "out")
public TcpOutboundGateway outGate() {
    TcpOutboundGateway outGate = new TcpOutboundGateway();
    outGate.setConnectionFactory(tacf());
    outGate.setReplyChannelName("toString");
    return outGate;
}