TCP 消息关联
网关
网关自动关联消息。但是,您应该将出站网关用于相对低容量的应用程序。当您将连接工厂配置为对所有消息对使用单个共享连接('single-use="false"')时,一次只能处理一条消息。新消息必须等到收到上一条消息的回复后才能处理。当连接工厂配置为每个新消息使用新连接('single-use="true"')时,此限制将不适用。虽然此设置可以提供比共享连接环境更高的吞吐量,但它会带来为每个消息对打开和关闭新连接的开销。
因此,对于大量消息,请考虑使用一对协作通道适配器。但是,要这样做,您需要提供协作逻辑。
Spring Integration 2.2 中引入的另一种解决方案是使用CachingClientConnectionFactory
,它允许使用共享连接池。
协作出站和入站通道适配器
为了实现高吞吐量(避免使用网关的缺陷,如前面所述),您可以配置一对协作出站和入站通道适配器。您还可以使用协作适配器(服务器端或客户端)进行完全异步通信(而不是使用请求-回复语义)。在服务器端,消息关联由适配器自动处理,因为入站适配器添加了一个标头,允许出站适配器确定在发送回复消息时使用哪个连接。
在服务器端,您必须填充ip_connectionId 标头,因为它用于将消息与连接关联。源自入站适配器的消息会自动设置该标头。如果您希望构建其他要发送的消息,则需要设置该标头。您可以从传入消息中获取标头值。 |
在客户端,如果需要,应用程序必须提供自己的关联逻辑。您可以通过多种方式做到这一点。
如果消息有效负载具有一些自然关联数据(例如事务 ID 或订单号),并且您无需保留来自原始出站消息的任何信息(例如回复通道标头),则关联很简单,并且无论如何都将在应用程序级别完成。
如果消息有效负载具有一些自然关联数据(例如事务 ID 或订单号),但您需要保留来自原始出站消息的一些信息(例如回复通道标头),则可以保留原始出站消息的副本(可能通过使用发布-订阅通道),并使用聚合器重新组合必要的数据。
对于前两种情况中的任何一种,如果有效负载没有自然关联数据,则可以在出站通道适配器上游提供一个转换器来使用此类数据增强有效负载。此类转换器可能会将原始有效负载转换为包含原始有效负载和消息标头子集的新对象。当然,来自标头的活动对象(例如回复通道)不能包含在转换后的有效负载中。
如果您选择这种策略,则需要确保连接工厂具有合适的序列化器-反序列化器对来处理此类有效负载(例如DefaultSerializer
和DefaultDeserializer
,它们使用 Java 序列化,或自定义序列化器和反序列化器)。TCP 连接工厂中提到的ByteArray*Serializer
选项(包括默认的ByteArrayCrLfSerializer
)不支持此类有效负载,除非转换后的有效负载是String
或byte[]
。
在 2.2 版本之前,当协作通道适配器使用客户端连接工厂时, 这种默认行为在真正异步的环境中并不合适,因此它现在默认为无限超时。您可以通过将客户端连接工厂上的 |
从 5.4 版本开始,多个出站通道适配器和一个TcpInboundChannelAdapter
可以共享同一个连接工厂。这允许应用程序同时支持请求/回复和任意服务器→客户端消息传递。有关更多信息,请参阅TCP 网关。
传输消息头
TCP 是一种流协议。序列化器
和反序列化器
在流中分隔消息。在 3.0 之前,只能通过 TCP 传输消息有效负载(String
或byte[]
)。从 3.0 开始,您也可以传输选定的标头以及有效负载。但是,“活动”对象(例如replyChannel
标头)无法序列化。
通过 TCP 发送标头信息需要一些额外的配置。
第一步是为ConnectionFactory
提供一个MessageConvertingTcpMessageMapper
,它使用mapper
属性。此映射器委托给任何MessageConverter
实现,以将消息转换为可以由配置的serializer
和deserializer
序列化和反序列化的某个对象。
Spring Integration 提供了一个MapMessageConverter
,它允许指定要添加到Map
对象(以及有效负载)的标头列表。生成的 Map 包含两个条目:payload
和headers
。headers
条目本身是一个Map
,包含选定的标头。
第二步是提供一个序列化器和一个反序列化器,它们可以在Map
和某些线格式之间进行转换。这可以是自定义Serializer
或Deserializer
,如果您需要的对等系统不是 Spring Integration 应用程序,则通常需要此类序列化器或反序列化器。
Spring Integration 提供了一个MapJsonSerializer
来将Map
转换为 JSON 并从 JSON 转换回来。它使用 Spring Integration JsonObjectMapper
。如果需要,您可以提供自定义JsonObjectMapper
。默认情况下,序列化器在对象之间插入换行符(0x0a
)字符。有关更多信息,请参阅Javadoc。
JsonObjectMapper 使用类路径上的任何版本的Jackson 。 |
您还可以使用Map
的标准 Java 序列化,方法是使用DefaultSerializer
和DefaultDeserializer
。
以下示例显示了连接工厂的配置,该工厂使用 JSON 传输correlationId
、sequenceNumber
和sequenceSize
标头
<int-ip:tcp-connection-factory id="client"
type="client"
host="localhost"
port="12345"
mapper="mapper"
serializer="jsonSerializer"
deserializer="jsonSerializer"/>
<bean id="mapper"
class="o.sf.integration.ip.tcp.connection.MessageConvertingTcpMessageMapper">
<constructor-arg name="messageConverter">
<bean class="o.sf.integration.support.converter.MapMessageConverter">
<property name="headerNames">
<list>
<value>correlationId</value>
<value>sequenceNumber</value>
<value>sequenceSize</value>
</list>
</property>
</bean>
</constructor-arg>
</bean>
<bean id="jsonSerializer" class="o.sf.integration.ip.tcp.serializer.MapJsonSerializer" />
使用上述配置发送的消息,其有效负载为“something”,将在网络上显示如下
{"headers":{"correlationId":"things","sequenceSize":5,"sequenceNumber":1},"payload":"something"}