TCP 消息关联

IP 端点的一个目标是与非 Spring Integration 应用程序的系统进行通信。因此,默认情况下只发送和接收消息载荷。从 3.0 版本开始,您可以使用 JSON、Java 序列化或自定义序列化器和反序列化器来传输消息头。更多信息请参见传输消息头。框架本身(除了使用网关时)或服务器端的协作通道适配器不提供消息关联。在本文档后面部分,我们将讨论应用程序可用的各种关联技术。在大多数情况下,即使消息载荷包含一些自然的关联数据(例如订单号),这也需要特定的应用程序级消息关联。

网关

网关会自动关联消息。但是,对于流量相对较低的应用,您应该使用出站网关。当您将连接工厂配置为对所有消息对使用单个共享连接('single-use="false"')时,一次只能处理一个消息。新消息必须等待接收到前一个消息的回复后才能处理。当连接工厂配置为每个新消息使用一个新的连接('single-use="true"')时,此限制不适用。尽管此设置可以提供比共享连接环境更高的吞吐量,但它会带来为每对消息打开和关闭新连接的开销。

因此,对于高流量消息,考虑使用一对协作的通道适配器。然而,这样做您需要提供协作逻辑。

另一种解决方案,在 Spring Integration 2.2 中引入,是使用 CachingClientConnectionFactory,它允许使用共享连接池。

协作的出站和入站通道适配器

为了实现高吞吐量(避免使用网关的陷阱,如前面提到的),您可以配置一对协作的出站和入站通道适配器。您还可以使用协作适配器(服务器端或客户端)进行完全异步通信(而不是请求-回复语义)。在服务器端,消息关联由适配器自动处理,因为入站适配器添加了一个消息头,允许出站适配器确定发送回复消息时使用哪个连接。

在服务器端,您必须填充 ip_connectionId 消息头,因为它用于将消息与连接关联起来。源自入站适配器的消息会自动设置此消息头。如果您希望构建其他消息来发送,则需要设置此消息头。您可以从入站消息中获取消息头值。

在客户端,如果需要,应用程序必须提供自己的关联逻辑。

您可以通过多种方式实现。如果消息载荷包含一些自然的关联数据(例如事务 ID 或订单号),并且您无需保留原始出站消息中的任何信息(例如回复通道消息头),则关联很简单,并且无论如何都将在应用程序级别完成。

如果消息载荷包含一些自然的关联数据(例如事务 ID 或订单号),但您需要保留原始出站消息中的一些信息(例如回复通道消息头),您可以保留原始出站消息的副本(例如使用发布-订阅通道),并使用聚合器重新组合所需的数据。

对于前面两种情况中的任何一种,如果载荷没有自然的关联数据,您可以在出站通道适配器的上游提供一个转换器来增强载荷,添加此类数据。这样的转换器可以将原始载荷转换为一个新对象,该对象包含原始载荷和部分消息头。当然,消息头中的“实时”对象(例如回复通道)不能包含在转换后的载荷中。

如果您选择这种策略,需要确保连接工厂具有适当的序列化器-反序列化器对来处理此类载荷(例如使用 Java 序列化的 DefaultSerializerDefaultDeserializer,或自定义序列化器和反序列化器)。TCP 连接工厂中提到的 ByteArray*Serializer 选项,包括默认的 ByteArrayCrLfSerializer,除非转换后的载荷是 Stringbyte[],否则不支持此类载荷。

在 2.2 版本之前,当协作通道适配器使用客户端连接工厂时,so-timeout 属性默认为默认回复超时(10 秒)。这意味着,如果入站适配器在此期间没有收到数据,则套接字将被关闭。

此默认行为在真正的异步环境中不适用,因此现在默认为无限超时。您可以通过在客户端连接工厂上将 so-timeout 属性设置为 10000 毫秒来恢复之前的默认行为。

从 5.4 版本开始,多个出站通道适配器和一个 TcpInboundChannelAdapter 可以共享同一个连接工厂。这允许应用程序同时支持请求/回复和任意的服务器 → 客户端消息传递。更多信息请参见TCP 网关

传输消息头

TCP 是一种流协议。SerializersDeserializers 在流中分隔消息。在 3.0 之前,只能通过 TCP 传输消息载荷(Stringbyte[])。从 3.0 版本开始,您也可以传输选定的消息头以及载荷。然而,“实时”对象,例如 replyChannel 消息头,无法序列化。

通过 TCP 发送消息头信息需要一些额外的配置。

第一步是为 ConnectionFactory 提供一个使用 mapper 属性的 MessageConvertingTcpMessageMapper。此 mapper 委托给任何 MessageConverter 实现,将消息转换为可通过配置的 serializerdeserializer 进行序列化和反序列化的对象,反之亦然。

Spring Integration 提供了一个 MapMessageConverter,它允许指定一个消息头列表,这些消息头连同载荷一起添加到 Map 对象中。生成的 Map 有两个条目:payloadheadersheaders 条目本身是一个 Map,包含选定的消息头。

第二步是提供一个序列化器和反序列化器,可以在 Map 和某种线路格式之间进行转换。这可以是一个自定义的 SerializerDeserializer,如果对端系统不是 Spring Integration 应用程序,通常需要这样做。

Spring Integration 提供了一个 MapJsonSerializer,用于将 Map 转换为 JSON,反之亦然。它使用一个 Spring Integration JsonObjectMapper。如果需要,您可以提供一个自定义的 JsonObjectMapper。默认情况下,序列化器在对象之间插入换行符(0x0a)。更多信息请参见Javadoc

JsonObjectMapper 使用类路径中可用的任何版本的 Jackson

您也可以使用标准的 Java 序列化 Map,通过使用 DefaultSerializerDefaultDeserializer

以下示例展示了一个连接工厂的配置,该配置使用 JSON 传输 correlationIdsequenceNumbersequenceSize 消息头

<int-ip:tcp-connection-factory id="client"
    type="client"
    host="localhost"
    port="12345"
    mapper="mapper"
    serializer="jsonSerializer"
    deserializer="jsonSerializer"/>

<bean id="mapper"
      class="o.sf.integration.ip.tcp.connection.MessageConvertingTcpMessageMapper">
    <constructor-arg name="messageConverter">
        <bean class="o.sf.integration.support.converter.MapMessageConverter">
            <property name="headerNames">
                <list>
                    <value>correlationId</value>
                    <value>sequenceNumber</value>
                    <value>sequenceSize</value>
                </list>
            </property>
        </bean>
    </constructor-arg>
</bean>

<bean id="jsonSerializer" class="o.sf.integration.ip.tcp.serializer.MapJsonSerializer" />

使用前面配置发送的消息,载荷为 'something',在网络线路上将如下显示

{"headers":{"correlationId":"things","sequenceSize":5,"sequenceNumber":1},"payload":"something"}