分流器
分拣器是一个组件,其作用是将消息分成几个部分,并将生成的消息发送出去进行独立处理。通常,它们是包含聚合器的管道中的上游生产者。
编程模型
执行分拣的 API 由一个基类 AbstractMessageSplitter 组成。它是一个 MessageHandler 实现,封装了分拣器共有的特性,例如在生成的消息中填充适当的消息头(CORRELATION_ID、SEQUENCE_SIZE 和 SEQUENCE_NUMBER)。此填充功能可以跟踪消息及其处理结果(在典型场景中,这些头会被复制到由各种转换端点生成的消息中)。这些值可以被例如 组合消息处理器 使用。
以下示例显示了 AbstractMessageSplitter 的一个摘录
public abstract class AbstractMessageSplitter
extends AbstractReplyProducingMessageConsumer {
...
protected abstract Object splitMessage(Message<?> message);
}
要在应用程序中实现特定的分拣器,您可以扩展 AbstractMessageSplitter 并实现 splitMessage 方法,该方法包含分拣消息的逻辑。返回值可以是以下之一:
-
Collection或消息数组,或迭代消息的Iterable(或Iterator)。在这种情况下,消息以消息的形式发送(在填充CORRELATION_ID、SEQUENCE_SIZE和SEQUENCE_NUMBER之后)。使用此方法可以为您提供更多控制权——例如,在分拣过程中填充自定义消息头。 -
非消息对象的
Collection或数组,或迭代非消息对象的Iterable(或Iterator)。它与前一种情况类似,只是每个集合元素都用作消息负载。使用此方法可以让您专注于领域对象,而无需考虑消息系统,并生成更易于测试的代码。 -
一个
Message或非消息对象(但不是集合或数组)。它与前几种情况类似,只是发送一条消息。
在 Spring Integration 中,任何 POJO 都可以实现分拣算法,前提是它定义了一个接受单个参数并具有返回值的 方法。在这种情况下,方法的返回值将如前所述进行解释。输入参数可以是 Message 或简单的 POJO。在后一种情况下,分拣器接收传入消息的有效负载。我们推荐这种方法,因为它将代码与 Spring Integration API 解耦,并且通常更容易测试。
迭代器
从 4.1 版本开始,AbstractMessageSplitter 支持 Iterator 类型来分拣 value。请注意,在 Iterator(或 Iterable)的情况下,我们无法访问底层项的数量,并且 SEQUENCE_SIZE 消息头被设置为 0。这意味着 <aggregator> 的默认 SequenceSizeReleaseStrategy 将不起作用,并且来自 splitter 的 CORRELATION_ID 对应的组将不会被释放;它将保持为 incomplete。在这种情况下,您应该使用适当的自定义 ReleaseStrategy 或依赖 send-partial-result-on-expiry 以及 group-timeout 或 MessageGroupStoreReaper。
从 5.0 版本开始,AbstractMessageSplitter 提供了 protected obtainSizeIfPossible() 方法,如果可能的话,可以确定 Iterable 和 Iterator 对象的大小。例如,XPathMessageSplitter 可以确定底层 NodeList 对象的大小。从 5.0.9 版本开始,此方法也正确返回 com.fasterxml.jackson.core.TreeNode 的大小。
Iterator 对象有助于避免在分拣之前需要在内存中构建整个集合。例如,当底层项通过迭代或流从某些外部系统(例如数据库或 FTP MGET)填充时。
Stream 和 Flux
从 5.0 版本开始,AbstractMessageSplitter 支持 Java Stream 和 Reactive Streams Publisher 类型来分拣 value。在这种情况下,目标 Iterator 是基于它们的迭代功能构建的。
此外,如果分拣器的输出通道是 ReactiveStreamsSubscribableChannel 的实例,则 AbstractMessageSplitter 会生成 Flux 结果而不是 Iterator,并且输出通道将订阅此 Flux,以根据下游流需求进行基于背压的分拣。
从 5.2 版本开始,分拣器支持 discardChannel 选项,用于发送那些拆分函数返回空容器(集合、数组、流、Flux 等)的请求消息。在这种情况下,没有要迭代发送到 outputChannel 的项。null 拆分结果仍然是流结束的指示。
使用 Java、Groovy 和 Kotlin DSL 配置分拣器
基于 Message 及其可迭代有效负载的简单分拣器示例,使用 DSL 配置
-
Java DSL
-
Kotlin DSL
-
Groovy DSL
@Bean
public IntegrationFlow someFlow() {
return f -> f.split(Message.class, Message::getPayload);
}
@Bean
fun someFlow() =
integrationFlow {
split<Message<*>> { it.payload }
}
@Bean
someFlow() {
integrationFlow {
splitWith {
expectedType Message<?>
function { it.payload }
}
}
}
有关 DSL 的更多信息,请参阅相关章节
使用 XML 配置分拣器
分拣器可以通过 XML 如下配置
<int:channel id="inputChannel"/>
<int:splitter id="splitter" (1)
ref="splitterBean" (2)
method="split" (3)
input-channel="inputChannel" (4)
output-channel="outputChannel" (5)
discard-channel="discardChannel" /> (6)
<int:channel id="outputChannel"/>
<beans:bean id="splitterBean" class="sample.PojoSplitter"/>
| 1 | 分拣器的 ID 是可选的。 |
| 2 | 对应用程序上下文中定义的 bean 的引用。该 bean 必须实现分拣逻辑,如前一节所述。可选。如果未提供对 bean 的引用,则假定到达 input-channel 的消息的有效负载是 java.util.Collection 的实现,并将默认分拣逻辑应用于该集合,将每个单独的元素合并到一条消息中并将其发送到 output-channel。 |
| 3 | 实现分拣逻辑的方法(在 bean 上定义)。可选。 |
| 4 | 分拣器的输入通道。必需。 |
| 5 | 分拣器将传入消息的分拣结果发送到的通道。可选(因为传入消息本身可以指定回复通道)。 |
| 6 | 如果拆分结果为空,请求消息将被发送到的通道。可选(它们将像 null 结果一样停止)。 |
如果自定义分拣器实现可以在其他 <splitter> 定义中引用,我们建议使用 ref 属性。但是,如果自定义分拣器处理程序实现应仅限于 <splitter> 的单个定义,您可以配置内部 bean 定义,如下例所示
<int:splitter id="testSplitter" input-channel="inChannel" method="split"
output-channel="outChannel">
<beans:bean class="org.foo.TestSplitter"/>
</int:splitter>
在同一个 <int:splitter> 配置中同时使用 ref 属性和内部处理程序定义是不允许的,因为它会创建模糊条件并导致抛出异常。 |
如果 ref 属性引用了一个扩展 AbstractMessageProducingHandler 的 bean(例如框架本身提供的分拣器),则通过将输出通道直接注入处理程序来优化配置。在这种情况下,每个 ref 必须是独立的 bean 实例(或 prototype 作用域的 bean),或者使用内部 <bean/> 配置类型。但是,此优化仅在您未在分拣器 XML 定义中提供任何分拣器特定属性时才适用。如果您不小心从多个 bean 引用了同一个消息处理程序,您将得到一个配置异常。 |