消息存储

企业集成模式》(EIP)一书中识别了几种能够缓冲消息的模式。例如,聚合器会缓冲消息,直到可以释放它们,而QueueChannel会缓冲消息,直到消费者显式地从该通道接收这些消息。由于在消息流中的任何点都可能发生故障,因此缓冲消息的EIP组件也引入了消息可能丢失的点。

为了降低消息丢失的风险,EIP 定义了消息存储模式,该模式允许EIP组件存储消息,通常存储在某种类型的持久存储(例如RDBMS)中。

Spring Integration 通过以下方式支持消息存储模式:

  • 定义一个org.springframework.integration.store.MessageStore策略接口

  • 提供此接口的多个实现

  • 在所有具有缓冲消息能力的组件上公开一个message-store属性,以便您可以注入任何实现MessageStore接口的实例。

关于如何配置特定消息存储实现以及如何将MessageStore实现注入到特定缓冲组件中的详细信息在整个手册中都有描述(请参阅特定组件,例如QueueChannelAggregatorDelayer等)。以下两个示例展示了如何为QueueChannel和聚合器添加对消息存储的引用

QueueChannel
<int:channel id="myQueueChannel">
    <int:queue message-store="refToMessageStore"/>
<int:channel>
聚合器
<int:aggregator message-store="refToMessageStore"/>

默认情况下,消息使用o.s.i.store.SimpleMessageStoreMessageStore的一个实现)存储在内存中。对于开发或简单的低容量环境(其中非持久消息的潜在丢失不是问题)来说,这可能就足够了。但是,典型的生产应用程序需要更强大的选项,不仅是为了降低消息丢失的风险,而且是为了避免潜在的内存不足错误。因此,我们还为各种数据存储提供了MessageStore实现。以下是受支持实现的完整列表

但是,请注意在使用MessageStore的持久实现时存在一些限制。

消息数据(有效负载和标头)使用不同的序列化策略进行序列化和反序列化,具体取决于MessageStore的实现。例如,当使用JdbcMessageStore时,默认情况下仅持久化Serializable数据。在这种情况下,在序列化发生之前会删除不可序列化的标头。此外,请注意传输适配器(如FTP、HTTP、JMS等)注入的协议特定标头。例如,<http:inbound-channel-adapter/>将HTTP标头映射到消息标头,其中之一是不可序列化的org.springframework.http.MediaType实例的ArrayList。但是,您可以将您自己的SerializerDeserializer策略接口实现注入到某些MessageStore实现(如JdbcMessageStore)中,以更改序列化和反序列化的行为。

请特别注意表示某些类型数据的标头。例如,如果其中一个标头包含某个Spring Bean的实例,则在反序列化后,您最终可能会得到该Bean的不同实例,这会直接影响框架创建的一些隐式标头(如REPLY_CHANNELERROR_CHANNEL)。目前,它们不可序列化,但是,即使它们可以序列化,反序列化的通道也不会表示预期的实例。

从Spring Integration 3.0版本开始,您可以通过配置标头丰富器来解决此问题,该丰富器在使用HeaderChannelRegistry注册通道后将这些标头替换为名称。

此外,请考虑当您按如下方式配置消息流时会发生什么情况:网关→队列通道(由持久消息存储支持)→服务激活器。该网关创建了一个临时回复通道,该通道在服务激活器的轮询器从队列读取时会丢失。同样,您可以使用标头丰富器将标头替换为String表示形式。

有关更多信息,请参阅标头丰富器

Spring Integration 4.0 引入了两个新接口

  • ChannelMessageStore:实现特定于QueueChannel实例的操作

  • PriorityCapableChannelMessageStore:标记要用于PriorityChannel实例的MessageStore实现,并为持久消息提供优先级顺序。

实际行为取决于实现。框架提供以下实现,可用作QueueChannelPriorityChannel的持久MessageStore

关于SimpleMessageStore的注意事项

从4.1版本开始,SimpleMessageStore在调用getMessageGroup()时不再复制消息组。对于大型消息组,这是一个重大的性能问题。4.0.1引入了布尔型copyOnGet属性,允许您控制此行为。当聚合器在内部使用时,此属性设置为false以提高性能。现在默认值为false

除了聚合器等组件之外,访问组存储的用户现在会获得对聚合器正在使用的组的直接引用,而不是副本。在聚合器外部操作组可能会导致不可预测的结果。

因此,您应该要么不执行此类操作,要么将copyOnGet属性设置为true

使用MessageGroupFactory

从4.3版本开始,一些MessageGroupStore实现可以注入自定义的MessageGroupFactory策略来创建和自定义MessageGroupStore使用的MessageGroup实例。这默认为SimpleMessageGroupFactory,它基于GroupType.HASH_SETLinkedHashSet)内部集合生成SimpleMessageGroup实例。其他可能的选项是SYNCHRONISED_SETBLOCKING_QUEUE,其中最后一个选项可用于恢复以前的SimpleMessageGroup行为。此外,PERSISTENT选项可用。有关更多信息,请参阅下一节。从5.0.1版本开始,LIST选项也可用于消息在组中的顺序和唯一性无关紧要的情况。

持久MessageGroupStore和延迟加载

从4.3版本开始,所有持久MessageGroupStore实例都以延迟加载的方式从存储中检索MessageGroup实例及其messages。在大多数情况下,这对于相关MessageHandler实例(请参阅AggregatorResequencer)很有用,因为在每次相关操作中从存储中加载整个MessageGroup会增加开销。

您可以使用AbstractMessageGroupStore.setLazyLoadMessageGroups(false)选项从配置中关闭延迟加载行为。

我们针对MongoDB MessageStoreMongoDB消息存储)和<aggregator>Aggregator)的延迟加载性能测试使用类似于以下内容的自定义release-strategy

<int:aggregator input-channel="inputChannel"
                output-channel="outputChannel"
                message-store="mongoStore"
                release-strategy-expression="size() == 1000"/>

它会为1000条简单消息生成类似于以下的结果

...
StopWatch 'Lazy-Load Performance': running time (millis) = 38918
-----------------------------------------
ms     %     Task name
-----------------------------------------
02652  007%  Lazy-Load
36266  093%  Eager
...

但是,从5.5版本开始,所有持久MessageGroupStore实现都提供了基于目标数据库流式API的streamMessagesForGroup(Object groupId)契约。当存储中的组非常大时,这可以提高资源利用率。在框架内部,此新API用于Delayer(例如)在启动时重新调度持久消息。返回的Stream<Message<?>>必须在处理结束时关闭,例如通过try-with-resources自动关闭。每当使用PersistentMessageGroup时,其streamMessages()都会委托给MessageGroupStore.streamMessagesForGroup()

消息组条件

从 5.5 版本开始,MessageGroup 抽象引入了 condition 字符串选项。此选项的值可以是任何稍后出于任何原因可解析的内容,以针对该组做出决策。例如,来自关联消息处理器ReleaseStrategy可能会查询组中的此属性,而不是迭代组中的所有消息。MessageGroupStore 公开了 setGroupCondition(Object groupId, String condition) API。为此,已向 AbstractCorrelatingMessageHandler 添加了一个 setGroupConditionSupplier(BiFunction<Message<?>, String, String>) 选项。此函数在将每条消息添加到组后以及针对组的现有条件进行评估。实现可以决定返回一个新值、现有值或将目标条件重置为nullcondition 的值可以是 JSON、SpEL 表达式、数字或任何可以序列化为字符串并在之后解析的内容。例如,来自文件聚合器组件的FileMarkerReleaseStrategyFileSplitter.FileMarker.Mark.END消息的FileHeaders.LINE_COUNT标头中将条件填充到组中,并在其canRelease()中与之进行比较,以比较组大小与此条件中的值。这样,它就不需要迭代组中的所有消息来查找具有FileHeaders.LINE_COUNT标头的FileSplitter.FileMarker.Mark.END消息。它还允许结束标记在所有其他记录之前到达聚合器;例如,在多线程环境中处理文件时。

此外,为了方便配置,引入了 GroupConditionProvider 接口。AbstractCorrelatingMessageHandler 检查提供的 ReleaseStrategy 是否实现了此接口,并提取用于组条件评估逻辑的 conditionSupplier