拆分器
拆分器是一个组件,其作用是将一个消息分割成多个部分,并将分割后的消息发送出去进行独立处理。通常,它们是包含聚合器(aggregator)的管道中的上游生产者。
编程模型
执行拆分的 API 由一个基类组成:AbstractMessageSplitter
。它是一个 MessageHandler
实现,封装了拆分器的通用功能,例如在生成的消息上填充相应的消息头(CORRELATION_ID
, SEQUENCE_SIZE
, 和 SEQUENCE_NUMBER
)。这些信息头能够跟踪消息及其处理结果(在典型场景中,这些信息头会被复制到由各种转换端点生成的消息中)。然后,这些值可以被使用,例如,由一个复合消息处理器使用。
以下示例展示了 AbstractMessageSplitter
的一个片段
public abstract class AbstractMessageSplitter
extends AbstractReplyProducingMessageConsumer {
...
protected abstract Object splitMessage(Message<?> message);
}
要在应用程序中实现特定的拆分器,你可以扩展 AbstractMessageSplitter
并实现 splitMessage
方法,该方法包含拆分消息的逻辑。返回值可以是以下之一
-
一个消息的
Collection
或数组,或者一个迭代消息的Iterable
(或Iterator
)。在这种情况下,消息会作为消息发送(在填充了CORRELATION_ID
,SEQUENCE_SIZE
和SEQUENCE_NUMBER
之后)。使用这种方法可以让你有更多的控制权,例如,可以在拆分过程中填充自定义消息头。 -
一个非消息对象的
Collection
或数组,或者一个迭代非消息对象的Iterable
(或Iterator
)。它的工作方式与前一种情况类似,不同之处在于每个集合元素都被用作消息的 payload。使用这种方法可以让你专注于领域对象,而无需考虑消息系统,并且可以生成更易于测试的代码。 -
一个
Message
或非消息对象(但不是集合或数组)。它的工作方式与之前的情况类似,不同之处在于只发送一个消息。
在 Spring Integration 中,任何 POJO 都可以实现拆分算法,前提是它定义了一个接受单个参数并有返回值的方法。在这种情况下,方法的返回值会按照前面的描述进行解释。输入参数可以是 Message
或简单的 POJO。在后一种情况下,拆分器接收入站消息的 payload。我们推荐这种方法,因为它将代码与 Spring Integration API 解耦,并且通常更容易测试。
迭代器
从版本 4.1 开始,AbstractMessageSplitter
支持 Iterator
类型用于拆分的值。请注意,在使用 Iterator
(或 Iterable
)时,我们无法获取底层元素的数量,并且 SEQUENCE_SIZE
头会被设置为 0
。这意味着 <aggregator>
的默认 SequenceSizeReleaseStrategy
将无法工作,并且拆分器生成的 CORRELATION_ID
对应的组将不会被释放;它会保持在 incomplete
状态。在这种情况下,你应该使用适当的自定义 ReleaseStrategy
,或者依赖 send-partial-result-on-expiry
以及 group-timeout
或 MessageGroupStoreReaper
。
从版本 5.0 开始,AbstractMessageSplitter
提供了受保护的 obtainSizeIfPossible()
方法,如果可能的话,允许确定 Iterable
和 Iterator
对象的大小。例如 XPathMessageSplitter
可以确定底层 NodeList
对象的大小。从版本 5.0.9 开始,此方法也能正确返回 com.fasterxml.jackson.core.TreeNode
的大小。
Iterator
对象非常有用,可以避免在拆分之前必须在内存中构建整个集合。例如,当使用迭代或流从外部系统(例如数据库或 FTP MGET
)填充底层元素时。
Stream 和 Flux
从版本 5.0 开始,AbstractMessageSplitter
支持 Java Stream
和 Reactive Streams Publisher
类型用于拆分的值。在这种情况下,目标 Iterator
是基于它们的迭代功能构建的。
此外,如果拆分器的输出通道是 ReactiveStreamsSubscribableChannel
的实例,AbstractMessageSplitter
会生成一个 Flux
结果而不是 Iterator
,并且输出通道会订阅此 Flux
,以便根据下游流的需求进行基于背压的拆分。
从版本 5.2 开始,拆分器支持 discardChannel
选项,用于发送那些拆分函数返回空容器(集合、数组、流、Flux
等)的请求消息。在这种情况下,没有要发送到 outputChannel
的元素可供迭代。null
的拆分结果仍然是流结束的指示符。
使用 Java、Groovy 和 Kotlin DSL 配置拆分器
基于 Message 及其可迭代 payload 的简单拆分器示例,使用 DSL 配置
-
Java DSL
-
Kotlin DSL
-
Groovy DSL
@Bean
public IntegrationFlow someFlow() {
return f -> f.split(Message.class, Message::getPayload);
}
@Bean
fun someFlow() =
integrationFlow {
split<Message<*>> { it.payload }
}
@Bean
someFlow() {
integrationFlow {
splitWith {
expectedType Message<?>
function { it.payload }
}
}
}
请参阅相关章节中关于 DSL 的更多信息
使用 XML 配置拆分器
拆分器可以通过 XML 进行如下配置
<int:channel id="inputChannel"/>
<int:splitter id="splitter" (1)
ref="splitterBean" (2)
method="split" (3)
input-channel="inputChannel" (4)
output-channel="outputChannel" (5)
discard-channel="discardChannel" /> (6)
<int:channel id="outputChannel"/>
<beans:bean id="splitterBean" class="sample.PojoSplitter"/>
1 | 拆分器的 ID 是可选的。 |
2 | 引用应用上下文(application context)中定义的一个 bean。该 bean 必须实现拆分逻辑,如前面章节所述。可选。如果未提供对 bean 的引用,则假定到达 input-channel 的消息的 payload 是 java.util.Collection 的实现,并将默认的拆分逻辑应用于该集合,将每个单独的元素合并到一个消息中并发送到 output-channel 。 |
3 | 实现拆分逻辑的方法(在 bean 上定义)。可选。 |
4 | 拆分器的入站通道。必需。 |
5 | 拆分器将入站消息拆分结果发送到的通道。可选(因为入站消息本身可以指定回复通道)。 |
6 | 在拆分结果为空时发送请求消息的通道。可选(在这种情况下,它们会像 null 结果一样停止)。 |
如果自定义拆分器实现可以在其他 <splitter>
定义中引用,我们建议使用 ref
属性。但是,如果自定义拆分器处理器实现应仅限于单个 <splitter>
定义的范围,你可以配置一个内部 bean 定义,如下例所示
<int:splitter id="testSplitter" input-channel="inChannel" method="split"
output-channel="outChannel">
<beans:bean class="org.foo.TestSplitter"/>
</int:splitter>
在同一个 <int:splitter> 配置中同时使用 ref 属性和内部处理器定义是不允许的,因为它会创建模糊条件并导致抛出异常。 |
如果 ref 属性引用了一个扩展自 AbstractMessageProducingHandler 的 bean(例如框架本身提供的拆分器),则通过直接将输出通道注入到处理器中来优化配置。在这种情况下,每个 ref 必须是单独的 bean 实例(或 prototype 作用域的 bean),或者使用内部 <bean/> 配置类型。但是,此优化仅在你未在拆分器 XML 定义中提供任何拆分器特定属性时适用。如果你不小心从多个 bean 引用了同一个消息处理器,你会得到一个配置异常。 |
使用注解配置拆分器
@Splitter
注解适用于期望 Message
类型或消息 payload 类型作为参数,并且方法返回值应为任何类型的 Collection
的方法。如果返回的值不是实际的 Message
对象,则每个元素将被包装在一个 Message
中作为消息的 payload。每个结果消息都会发送到定义了 @Splitter
的端点指定的输出通道。
以下示例展示了如何使用 @Splitter
注解配置拆分器
@Splitter
List<LineItem> extractItems(Order order) {
return order.getItems()
}
另请参阅使用注解为端点提供 Advice 和文件拆分器。