消息存储
《企业集成模式》(EIP) 一书指出了几种具有缓冲消息能力的模式。例如,聚合器会缓冲消息直到它们可以被释放,而一个 QueueChannel
会缓冲消息直到消费者明确地从该通道接收这些消息。由于消息流中任何点都可能发生故障,缓冲消息的 EIP 组件也引入了消息可能丢失的点。
为了降低消息丢失的风险,EIP 定义了消息存储模式,该模式允许 EIP 组件存储消息,通常存储在某种类型的持久化存储中(例如 RDBMS)。
Spring Integration 通过以下方式提供对消息存储模式的支持:
-
定义一个
org.springframework.integration.store.MessageStore
策略接口 -
提供该接口的多种实现
-
在所有具有缓冲消息能力的组件上暴露一个
message-store
属性,以便你可以注入实现MessageStore
接口的任何实例。
如何在手册中配置特定的消息存储实现以及如何将 MessageStore
实现注入到特定的缓冲组件的详细信息都在手册的各个部分有描述(参见特定组件,例如QueueChannel、Aggregator、Delayer 等)。下面这对示例展示了如何为 QueueChannel
和聚合器添加消息存储的引用。
<int:channel id="myQueueChannel">
<int:queue message-store="refToMessageStore"/>
<int:channel>
<int:aggregator message-store="refToMessageStore"/>
默认情况下,消息通过使用 o.s.i.store.SimpleMessageStore
(一个 MessageStore
实现)存储在内存中。这对于开发或简单的低流量环境来说可能没问题,在这些环境中,非持久化消息潜在的丢失不是问题。然而,典型的生产应用程序需要一个更健壮的选项,不仅能降低消息丢失的风险,还能避免潜在的内存溢出错误。因此,我们也为各种数据存储提供了 MessageStore
实现。以下是支持的实现的完整列表:
-
Hazelcast Message Store: 使用 Hazelcast 分布式缓存存储消息
-
JDBC Message Store: 使用 RDBMS 存储消息
-
Redis Message Store: 使用 Redis 键/值数据存储存储消息
-
MongoDB Message Store: 使用 MongoDB 文档存储存储消息
然而,在使用 消息数据(payload 和 headers)的序列化和反序列化使用不同的序列化策略,具体取决于 特别注意那些表示某些类型数据的 headers。例如,如果某个 header 包含某个 Spring bean 的实例,反序列化后,你可能会得到该 bean 的不同实例,这会直接影响框架创建的一些隐式 headers(例如 从 Spring Integration 3.0 版本开始,你可以使用配置为在向 另外,考虑当你按如下方式配置消息流时会发生什么:gateway → queue-channel(由持久化 Message Store 支持)→ service-activator。该 gateway 创建了一个临时回复通道,当 service-activator 的 poller 从队列读取时,这个通道就丢失了。同样,你可以使用 header enricher 将 headers 替换为 更多信息,请参阅Header Enricher。 |
Spring Integration 4.0 引入了两个新接口:
-
ChannelMessageStore
: 实现QueueChannel
实例特有的操作 -
PriorityCapableChannelMessageStore
: 标记用于PriorityChannel
实例的MessageStore
实现,并为持久化消息提供优先级排序。
实际行为取决于具体实现。框架提供了以下实现,可用作 QueueChannel
和 PriorityChannel
的持久化 MessageStore
:
关于
SimpleMessageStore 的注意事项从 4.1 版本开始, 现在,在聚合器等组件之外访问组存储的用户会获得聚合器正在使用的组的直接引用,而不是副本。在聚合器外部操作组可能会导致不可预测的结果。 因此,你应避免此类操作,或将 |
使用 MessageGroupFactory
从 4.3 版本开始,一些 MessageGroupStore
实现可以注入自定义的 MessageGroupFactory
策略,用于创建和自定义 MessageGroupStore
所使用的 MessageGroup
实例。默认情况下,这是 SimpleMessageGroupFactory
,它基于内部集合 GroupType.HASH_SET
(LinkedHashSet
) 生成 SimpleMessageGroup
实例。其他可能的选项包括 SYNCHRONISED_SET
和 BLOCKING_QUEUE
,其中最后一个可用于恢复之前的 SimpleMessageGroup
行为。此外,还提供了 PERSISTENT
选项。更多信息请参阅下一节。从 5.0.1 版本开始,当组中消息的顺序和唯一性不重要时,还可以使用 LIST
选项。
持久化 MessageGroupStore
和延迟加载
从 4.3 版本开始,所有持久化 MessageGroupStore
实例都以延迟加载方式从存储中检索 MessageGroup
实例及其 messages
。在大多数情况下,这对于关联 MessageHandler
实例(参见Aggregator和Resequencer)非常有用,因为每次关联操作都从存储中加载整个 MessageGroup
会增加开销。
你可以使用 AbstractMessageGroupStore.setLazyLoadMessageGroups(false)
选项从配置中关闭延迟加载行为。
我们对 MongoDB MessageStore
(MongoDB Message Store)和 <aggregator>
(Aggregator)上的延迟加载进行的性能测试使用了类似于以下的自定义 release-strategy
。
<int:aggregator input-channel="inputChannel"
output-channel="outputChannel"
message-store="mongoStore"
release-strategy-expression="size() == 1000"/>
它为 1000 条简单消息产生了类似于以下的结果:
...
StopWatch 'Lazy-Load Performance': running time (millis) = 38918
-----------------------------------------
ms % Task name
-----------------------------------------
02652 007% Lazy-Load
36266 093% Eager
...
然而,从 5.5 版本开始,所有持久化 MessageGroupStore
实现都提供了一个基于目标数据库流式 API 的 streamMessagesForGroup(Object groupId)
契约。这在存储中的组非常大时提高了资源利用率。在框架内部,这个新 API 在启动时重新调度持久化消息时被 Delayer(例如)使用。返回的 Stream<Message<?>>
必须在处理结束时关闭,例如通过 try-with-resources
的自动关闭。无论何时使用 PersistentMessageGroup
,其 streamMessages()
都委托给 MessageGroupStore.streamMessagesForGroup()
。
消息组条件
从 5.5 版本开始,MessageGroup
抽象提供了一个 condition
字符串选项。此选项的值可以是任何可在后续解析以出于任何原因对组做出决定的内容。例如,来自关联消息处理器的 ReleaseStrategy
可以直接查阅组的此属性,而不是遍历组中的所有消息。MessageGroupStore
暴露了一个 setGroupCondition(Object groupId, String condition)
API。为此,已向 AbstractCorrelatingMessageHandler
添加了一个 setGroupConditionSupplier(BiFunction<Message<?>, String, String>)
选项。此函数在消息被添加到组后以及针对组的现有条件进行评估。实现可以决定返回一个新值、现有值或将目标条件重置为 null
。condition
的值可以是 JSON、SpEL 表达式、数字或任何可以序列化为字符串并在之后解析的内容。例如,来自文件聚合器组件的 FileMarkerReleaseStrategy
,从 FileSplitter.FileMarker.Mark.END
消息的 FileHeaders.LINE_COUNT
header 中填充条件到组中,并在其 canRelease()
方法中查阅此条件,将组大小与此条件中的值进行比较。这样,它就不需要遍历组中的所有消息来查找带有 FileHeaders.LINE_COUNT
header 的 FileSplitter.FileMarker.Mark.END
消息。它还允许结束标记在所有其他记录之前到达聚合器;例如在多线程环境中处理文件时。
此外,为了配置方便,引入了 GroupConditionProvider
契约。AbstractCorrelatingMessageHandler
会检查提供的 ReleaseStrategy
是否实现了此接口,并提取 conditionSupplier
以用于组条件评估逻辑。