流支持

在许多情况下,应用程序数据是从流中获取的。不建议将流的引用作为消息载荷发送给消费者。相反,消息是从输入流读取的数据创建的,消息载荷则逐个写入输出流。

您需要在项目中包含此依赖项

  • Maven

  • Gradle

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
    <version>6.4.4</version>
</dependency>
compile "org.springframework.integration:spring-integration-stream:6.4.4"

从流中读取

Spring Integration 提供了两种流适配器。`ByteStreamReadingMessageSource` 和 `CharacterStreamReadingMessageSource` 都实现了 `MessageSource` 接口。通过在 channel-adapter 元素中配置其中之一,可以配置轮询周期,并且消息总线可以自动检测并调度它们。字节流版本需要一个 `InputStream` 作为唯一的构造函数参数,字符流版本需要一个 `Reader` 作为唯一的构造函数参数。`ByteStreamReadingMessageSource` 还接受 'bytesPerMessage' 属性来确定它尝试读取到每个 `Message` 中的字节数。默认值为 `1024`。以下示例创建了一个输入流,该输入流创建的消息每个包含 2048 字节。

<bean class="org.springframework.integration.stream.ByteStreamReadingMessageSource">
  <constructor-arg ref="someInputStream"/>
  <property name="bytesPerMessage" value="2048"/>
</bean>

<bean class="org.springframework.integration.stream.CharacterStreamReadingMessageSource">
  <constructor-arg ref="someReader"/>
</bean>

CharacterStreamReadingMessageSource 会将读取器包装在 BufferedReader 中(如果它本身不是的话)。您可以在第二个构造函数参数中设置缓冲读取器使用的缓冲区大小。从 5.0 版本开始,第三个构造函数参数(`blockToDetectEOF`)控制 CharacterStreamReadingMessageSource 的行为。当设置为 false(默认值)时,`receive()` 方法会检查读取器是否 ready(),如果不是则返回 null。在这种情况下不会检测到 EOF(文件末尾)。当设置为 true 时,`receive()` 方法会阻塞,直到数据可用或在底层流上检测到 EOF。当检测到 EOF 时,会发布一个 `StreamClosedEvent`(应用程序事件)。您可以使用实现 ApplicationListener<StreamClosedEvent> 的 bean 来消费此事件。

为了便于 EOF 检测,轮询线程会阻塞在 `receive()` 方法中,直到数据到达或检测到 EOF。
一旦检测到 EOF,轮询器会在每次轮询时继续发布事件。应用程序监听器可以停止适配器以阻止此行为。事件在轮询线程上发布。停止适配器会导致线程中断。如果您打算在停止适配器后执行一些可中断的任务,您必须要么在不同的线程上执行 `stop()`,要么为下游活动使用不同的线程。请注意,发送到 QueueChannel 是可中断的,因此,如果您希望从监听器发送消息,请在停止适配器之前进行。

这有助于“管道化”或将数据重定向到 `stdin`,如下面两个示例所示

cat myfile.txt | java -jar my.jar
java -jar my.jar < foo.txt

这种方法使得当管道关闭时应用程序能够停止。

提供了四种方便的工厂方法

public static final CharacterStreamReadingMessageSource stdin() { ... }

public static final CharacterStreamReadingMessageSource stdin(String charsetName) { ... }

public static final CharacterStreamReadingMessageSource stdinPipe() { ... }

public static final CharacterStreamReadingMessageSource stdinPipe(String charsetName) { ... }

写入流

对于目标流,您可以使用 `ByteStreamWritingMessageHandler` 或 `CharacterStreamWritingMessageHandler` 这两种实现之一。每种实现都需要一个构造函数参数(字节流为 `OutputStream`,字符流为 `Writer`),并且每种实现都提供了第二个构造函数,用于添加可选的 'bufferSize'。由于它们最终都实现了 `MessageHandler` 接口,您可以在 `channel-adapter` 配置中引用它们,如通道适配器中所述。

<bean class="org.springframework.integration.stream.ByteStreamWritingMessageHandler">
  <constructor-arg ref="someOutputStream"/>
  <constructor-arg value="1024"/>
</bean>

<bean class="org.springframework.integration.stream.CharacterStreamWritingMessageHandler">
  <constructor-arg ref="someWriter"/>
</bean>

流命名空间支持

Spring Integration 定义了一个命名空间,以减少流相关通道适配器所需的配置。使用它需要以下 schema 位置

<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns:int-stream="http://www.springframework.org/schema/integration/stream"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:beans="http://www.springframework.org/schema/beans"
  xsi:schemaLocation="http://www.springframework.org/schema/beans
      https://www.springframework.org/schema/beans/spring-beans.xsd
      http://www.springframework.org/schema/integration/stream
      https://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd">

以下代码片段显示了支持配置入站通道适配器的不同配置选项

<int-stream:stdin-channel-adapter id="adapterWithDefaultCharset"/>

<int-stream:stdin-channel-adapter id="adapterWithProvidedCharset" charset="UTF-8"/>

从 5.0 版本开始,您可以设置 `detect-eof` 属性,该属性设置 `blockToDetectEOF` 属性。更多信息请参见从流中读取

要配置出站通道适配器,您也可以使用命名空间支持。以下示例展示了出站通道适配器的不同配置

<int-stream:stdout-channel-adapter id="stdoutAdapterWithDefaultCharset"
    channel="testChannel"/>

<int-stream:stdout-channel-adapter id="stdoutAdapterWithProvidedCharset" charset="UTF-8"
    channel="testChannel"/>

<int-stream:stderr-channel-adapter id="stderrAdapter" channel="testChannel"/>

<int-stream:stdout-channel-adapter id="newlineAdapter" append-newline="true"
    channel="testChannel"/>