拆分器
拆分器是一个组件,其作用是将一条消息分成多个部分,并将生成的这些消息独立地发送出去进行处理。通常,它们是管道中的上游生产者,该管道包括一个聚合器。
编程模型
执行拆分的 API 包含一个基类 AbstractMessageSplitter
。它是一个 MessageHandler
实现,封装了拆分器共有的功能,例如在生成的的消息上填充适当的消息头(CORRELATION_ID
、SEQUENCE_SIZE
和 SEQUENCE_NUMBER
)。这种填充可以跟踪消息及其处理结果(在典型情况下,这些头会被复制到由各种转换端点生成的的消息中)。然后,这些值可以被例如 组合消息处理器 使用。
以下示例展示了 AbstractMessageSplitter
的摘录
public abstract class AbstractMessageSplitter
extends AbstractReplyProducingMessageConsumer {
...
protected abstract Object splitMessage(Message<?> message);
}
要在应用程序中实现特定的拆分器,可以扩展 AbstractMessageSplitter
并实现 splitMessage
方法,该方法包含拆分消息的逻辑。返回值可以是以下之一
-
一个
Collection
或消息数组,或者一个Iterable
(或Iterator
),它迭代消息。在这种情况下,消息将作为消息发送(在填充CORRELATION_ID
、SEQUENCE_SIZE
和SEQUENCE_NUMBER
之后)。使用这种方法可以让你获得更多控制权,例如,在拆分过程中填充自定义消息头。 -
一个
Collection
或非消息对象数组,或者一个Iterable
(或Iterator
),它迭代非消息对象。它的工作原理与前一种情况类似,只是每个集合元素都被用作消息有效负载。使用这种方法可以让你专注于域对象,而无需考虑消息系统,并生成更容易测试的代码。 -
一个
Message
或非消息对象(但不是集合或数组)。它的工作原理与前几种情况类似,只是发送了一条消息。
在 Spring Integration 中,任何 POJO 都可以实现拆分算法,前提是它定义了一个接受单个参数并具有返回值的方法。在这种情况下,方法的返回值将按照前面描述的方式解释。输入参数可以是 Message
或简单的 POJO。在后一种情况下,拆分器接收传入消息的有效负载。我们推荐这种方法,因为它将代码与 Spring Integration API 解耦,并且通常更容易测试。
迭代器
从 4.1 版本开始,AbstractMessageSplitter
支持将 Iterator
类型用作要拆分的 value
。请注意,在 Iterator
(或 Iterable
)的情况下,我们无法访问底层项目的数量,并且 SEQUENCE_SIZE
标头被设置为 0
。这意味着 <aggregator>
的默认 SequenceSizeReleaseStrategy
将不起作用,并且来自 splitter
的 CORRELATION_ID
的组将不会被释放;它将保持为 incomplete
。在这种情况下,您应该使用适当的自定义 ReleaseStrategy
或依赖于 send-partial-result-on-expiry
以及 group-timeout
或 MessageGroupStoreReaper
。
从 5.0 版本开始,AbstractMessageSplitter
提供了 protected obtainSizeIfPossible()
方法,允许在可能的情况下确定 Iterable
和 Iterator
对象的大小。例如,XPathMessageSplitter
可以确定底层 NodeList
对象的大小。从 5.0.9 版本开始,此方法还正确地返回 com.fasterxml.jackson.core.TreeNode
的大小。
Iterator
对象对于避免在拆分之前在内存中构建整个集合很有用。例如,当底层项目使用迭代或流从某些外部系统(例如数据库或 FTP MGET
)填充时。
流和 Flux
从 5.0 版本开始,AbstractMessageSplitter
支持将 Java Stream
和 Reactive Streams Publisher
类型用作要拆分的 value
。在这种情况下,目标 Iterator
是基于它们的迭代功能构建的。
此外,如果拆分器的输出通道是 ReactiveStreamsSubscribableChannel
的实例,则 AbstractMessageSplitter
会生成 Flux
结果而不是 Iterator
,并且输出通道会订阅此 Flux
,以便在对下游流需求进行基于背压的拆分。
从 5.2 版本开始,拆分器支持 discardChannel
选项,用于发送那些拆分函数返回空容器(集合、数组、流、Flux
等)的请求消息。在这种情况下,没有要迭代的项目可以发送到 outputChannel
。null
拆分结果仍然是流结束的指示器。
使用 Java、Groovy 和 Kotlin DSL 配置拆分器
一个基于Message
及其可迭代有效负载的简单拆分器的示例,使用 DSL 配置
-
Java DSL
-
Kotlin DSL
-
Groovy DSL
@Bean
public IntegrationFlow someFlow() {
return f -> f.split(Message.class, Message::getPayload);
}
@Bean
fun someFlow() =
integrationFlow {
split<Message<*>> { it.payload }
}
@Bean
someFlow() {
integrationFlow {
splitWith {
expectedType Message<?>
function { it.payload }
}
}
}
有关 DSL 的更多信息,请参阅相应章节
使用 XML 配置拆分器
拆分器可以通过 XML 配置,如下所示
<int:channel id="inputChannel"/>
<int:splitter id="splitter" (1)
ref="splitterBean" (2)
method="split" (3)
input-channel="inputChannel" (4)
output-channel="outputChannel" (5)
discard-channel="discardChannel" /> (6)
<int:channel id="outputChannel"/>
<beans:bean id="splitterBean" class="sample.PojoSplitter"/>
1 | 拆分器的 ID 是可选的。 |
2 | 对应用程序上下文中定义的 Bean 的引用。该 Bean 必须实现拆分逻辑,如前面部分所述。可选。如果未提供对 Bean 的引用,则假定到达input-channel 的消息的有效负载是java.util.Collection 的实现,并且默认拆分逻辑应用于集合,将每个单独的元素合并到消息中并将其发送到output-channel 。 |
3 | 实现拆分逻辑的方法(在 Bean 上定义)。可选。 |
4 | 拆分器的输入通道。必需。 |
5 | 拆分器将拆分传入消息的结果发送到的通道。可选(因为传入消息本身可以指定回复通道)。 |
6 | 在拆分结果为空的情况下将请求消息发送到的通道。可选(它们将像null 结果一样停止)。 |
我们建议使用ref
属性,如果自定义拆分器实现可以在其他<splitter>
定义中引用。但是,如果自定义拆分器处理程序实现应该限定为<splitter>
的单个定义,则可以配置内部 Bean 定义,如下例所示
<int:splitter id="testSplitter" input-channel="inChannel" method="split"
output-channel="outChannel">
<beans:bean class="org.foo.TestSplitter"/>
</int:splitter>
在同一个<int:splitter> 配置中同时使用ref 属性和内部处理程序定义是不允许的,因为它会创建一个模棱两可的条件,并导致抛出异常。
|
如果ref 属性引用扩展AbstractMessageProducingHandler 的 Bean(例如框架本身提供的拆分器),则通过将输出通道直接注入处理程序来优化配置。在这种情况下,每个ref 必须是单独的 Bean 实例(或prototype 范围的 Bean)或使用内部<bean/> 配置类型。但是,这种优化仅适用于您未在拆分器 XML 定义中提供任何特定于拆分器的属性。如果您无意中从多个 Bean 引用同一个消息处理程序,您将获得配置异常。
|
使用注解配置拆分器
@Splitter
注解适用于期望接收 Message
类型或消息有效负载类型的函数,函数的返回值应为任何类型的 Collection
。如果返回值不是实际的 Message
对象,则每个项目都会被包装在一个 Message
中作为 Message
的有效负载。每个生成的 Message
都将发送到定义 @Splitter
的端点的指定输出通道。
以下示例展示了如何使用 @Splitter
注解配置拆分器。
@Splitter
List<LineItem> extractItems(Order order) {
return order.getItems()
}
另请参阅 使用注解为端点提供建议 和 文件拆分器。