拆分器

拆分器是一个组件,其作用是将一条消息分成多个部分,并将生成的这些消息独立地发送出去进行处理。通常,它们是管道中的上游生产者,该管道包括一个聚合器。

编程模型

执行拆分的 API 包含一个基类 AbstractMessageSplitter。它是一个 MessageHandler 实现,封装了拆分器共有的功能,例如在生成的的消息上填充适当的消息头(CORRELATION_IDSEQUENCE_SIZESEQUENCE_NUMBER)。这种填充可以跟踪消息及其处理结果(在典型情况下,这些头会被复制到由各种转换端点生成的的消息中)。然后,这些值可以被例如 组合消息处理器 使用。

以下示例展示了 AbstractMessageSplitter 的摘录

public abstract class AbstractMessageSplitter
    extends AbstractReplyProducingMessageConsumer {
    ...
    protected abstract Object splitMessage(Message<?> message);

}

要在应用程序中实现特定的拆分器,可以扩展 AbstractMessageSplitter 并实现 splitMessage 方法,该方法包含拆分消息的逻辑。返回值可以是以下之一

  • 一个 Collection 或消息数组,或者一个 Iterable(或 Iterator),它迭代消息。在这种情况下,消息将作为消息发送(在填充 CORRELATION_IDSEQUENCE_SIZESEQUENCE_NUMBER 之后)。使用这种方法可以让你获得更多控制权,例如,在拆分过程中填充自定义消息头。

  • 一个 Collection 或非消息对象数组,或者一个 Iterable(或 Iterator),它迭代非消息对象。它的工作原理与前一种情况类似,只是每个集合元素都被用作消息有效负载。使用这种方法可以让你专注于域对象,而无需考虑消息系统,并生成更容易测试的代码。

  • 一个 Message 或非消息对象(但不是集合或数组)。它的工作原理与前几种情况类似,只是发送了一条消息。

在 Spring Integration 中,任何 POJO 都可以实现拆分算法,前提是它定义了一个接受单个参数并具有返回值的方法。在这种情况下,方法的返回值将按照前面描述的方式解释。输入参数可以是 Message 或简单的 POJO。在后一种情况下,拆分器接收传入消息的有效负载。我们推荐这种方法,因为它将代码与 Spring Integration API 解耦,并且通常更容易测试。

迭代器

从 4.1 版本开始,AbstractMessageSplitter 支持将 Iterator 类型用作要拆分的 value。请注意,在 Iterator(或 Iterable)的情况下,我们无法访问底层项目的数量,并且 SEQUENCE_SIZE 标头被设置为 0。这意味着 <aggregator> 的默认 SequenceSizeReleaseStrategy 将不起作用,并且来自 splitterCORRELATION_ID 的组将不会被释放;它将保持为 incomplete。在这种情况下,您应该使用适当的自定义 ReleaseStrategy 或依赖于 send-partial-result-on-expiry 以及 group-timeoutMessageGroupStoreReaper

从 5.0 版本开始,AbstractMessageSplitter 提供了 protected obtainSizeIfPossible() 方法,允许在可能的情况下确定 IterableIterator 对象的大小。例如,XPathMessageSplitter 可以确定底层 NodeList 对象的大小。从 5.0.9 版本开始,此方法还正确地返回 com.fasterxml.jackson.core.TreeNode 的大小。

Iterator 对象对于避免在拆分之前在内存中构建整个集合很有用。例如,当底层项目使用迭代或流从某些外部系统(例如数据库或 FTP MGET)填充时。

流和 Flux

从 5.0 版本开始,AbstractMessageSplitter 支持将 Java Stream 和 Reactive Streams Publisher 类型用作要拆分的 value。在这种情况下,目标 Iterator 是基于它们的迭代功能构建的。

此外,如果拆分器的输出通道是 ReactiveStreamsSubscribableChannel 的实例,则 AbstractMessageSplitter 会生成 Flux 结果而不是 Iterator,并且输出通道会订阅此 Flux,以便在对下游流需求进行基于背压的拆分。

从 5.2 版本开始,拆分器支持 discardChannel 选项,用于发送那些拆分函数返回空容器(集合、数组、流、Flux 等)的请求消息。在这种情况下,没有要迭代的项目可以发送到 outputChannelnull 拆分结果仍然是流结束的指示器。

使用 Java、Groovy 和 Kotlin DSL 配置拆分器

一个基于Message及其可迭代有效负载的简单拆分器的示例,使用 DSL 配置

  • Java DSL

  • Kotlin DSL

  • Groovy DSL

@Bean
public IntegrationFlow someFlow() {
    return f -> f.split(Message.class, Message::getPayload);
}
@Bean
fun someFlow() =
    integrationFlow {
        split<Message<*>> { it.payload }
    }
@Bean
someFlow() {
    integrationFlow {
        splitWith {
		    expectedType Message<?>
		    function { it.payload }
        }
    }
}

有关 DSL 的更多信息,请参阅相应章节

使用 XML 配置拆分器

拆分器可以通过 XML 配置,如下所示

<int:channel id="inputChannel"/>

<int:splitter id="splitter"           (1)
  ref="splitterBean"                  (2)
  method="split"                      (3)
  input-channel="inputChannel"        (4)
  output-channel="outputChannel"      (5)
  discard-channel="discardChannel" /> (6)

<int:channel id="outputChannel"/>

<beans:bean id="splitterBean" class="sample.PojoSplitter"/>
1 拆分器的 ID 是可选的。
2 对应用程序上下文中定义的 Bean 的引用。该 Bean 必须实现拆分逻辑,如前面部分所述。可选。如果未提供对 Bean 的引用,则假定到达input-channel的消息的有效负载是java.util.Collection的实现,并且默认拆分逻辑应用于集合,将每个单独的元素合并到消息中并将其发送到output-channel
3 实现拆分逻辑的方法(在 Bean 上定义)。可选。
4 拆分器的输入通道。必需。
5 拆分器将拆分传入消息的结果发送到的通道。可选(因为传入消息本身可以指定回复通道)。
6 在拆分结果为空的情况下将请求消息发送到的通道。可选(它们将像null结果一样停止)。

我们建议使用ref属性,如果自定义拆分器实现可以在其他<splitter>定义中引用。但是,如果自定义拆分器处理程序实现应该限定为<splitter>的单个定义,则可以配置内部 Bean 定义,如下例所示

<int:splitter id="testSplitter" input-channel="inChannel" method="split"
                output-channel="outChannel">
  <beans:bean class="org.foo.TestSplitter"/>
</int:splitter>
在同一个<int:splitter>配置中同时使用ref属性和内部处理程序定义是不允许的,因为它会创建一个模棱两可的条件,并导致抛出异常。
如果ref属性引用扩展AbstractMessageProducingHandler的 Bean(例如框架本身提供的拆分器),则通过将输出通道直接注入处理程序来优化配置。在这种情况下,每个ref必须是单独的 Bean 实例(或prototype范围的 Bean)或使用内部<bean/>配置类型。但是,这种优化仅适用于您未在拆分器 XML 定义中提供任何特定于拆分器的属性。如果您无意中从多个 Bean 引用同一个消息处理程序,您将获得配置异常。

使用注解配置拆分器

@Splitter 注解适用于期望接收 Message 类型或消息有效负载类型的函数,函数的返回值应为任何类型的 Collection。如果返回值不是实际的 Message 对象,则每个项目都会被包装在一个 Message 中作为 Message 的有效负载。每个生成的 Message 都将发送到定义 @Splitter 的端点的指定输出通道。

以下示例展示了如何使用 @Splitter 注解配置拆分器。

@Splitter
List<LineItem> extractItems(Order order) {
    return order.getItems()
}