文件分割器
FileSplitter
在 4.1.2 版本中添加,其命名空间支持在 4.2 版本中添加。FileSplitter
基于 BufferedReader.readLine()
将文本文件拆分为单个行。默认情况下,拆分器使用 Iterator
以一次读取一行的方式从文件中读取并发出行。将 iterator
属性设置为 false
将导致它在将所有行作为消息发出之前将其全部读入内存。这样做的一个用例可能是,如果希望在发送包含行的任何消息之前检测文件的 I/O 错误。但是,这仅适用于相对较短的文件。
入站有效负载可以是 File
、String
(文件路径)、InputStream
或 Reader
。其他有效负载类型将保持不变。
以下清单显示了配置 FileSplitter
的可能方法
-
Java DSL
-
Kotlin DSL
-
Java
-
XML
@SpringBootApplication
public class FileSplitterApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FileSplitterApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow fileSplitterFlow() {
return IntegrationFlow
.from(Files.inboundAdapter(tmpDir.getRoot())
.filter(new ChainFileListFilter<File>()
.addFilter(new AcceptOnceFileListFilter<>())
.addFilter(new ExpressionFileListFilter<>(
new FunctionExpression<File>(f -> "foo.tmp".equals(f.getName()))))))
.split(Files.splitter()
.markers()
.charset(StandardCharsets.US_ASCII)
.firstLineAsHeader("fileHeader")
.applySequence(true))
.channel(c -> c.queue("fileSplittingResultChannel"))
.get();
}
}
@Bean
fun fileSplitterFlow() =
integrationFlow(
Files.inboundAdapter(tmpDir.getRoot())
.filter(
ChainFileListFilter<File?>()
.addFilter(AcceptOnceFileListFilter())
.addFilter(ExpressionFileListFilter(FunctionExpression { f: File? -> "foo.tmp" == f!!.name }))
)
) {
split(
Files.splitter()
.markers()
.charset(StandardCharsets.US_ASCII)
.firstLineAsHeader("fileHeader")
.applySequence(true)
)
channel { queue("fileSplittingResultChannel") }
}
@Splitter(inputChannel="toSplitter")
@Bean
public MessageHandler fileSplitter() {
FileSplitter splitter = new FileSplitter(true, true);
splitter.setApplySequence(true);
splitter.setOutputChannel(outputChannel);
return splitter;
}
<int-file:splitter id="splitter" (1)
iterator="" (2)
markers="" (3)
markers-json="" (4)
apply-sequence="" (5)
requires-reply="" (6)
charset="" (7)
first-line-as-header="" (8)
input-channel="" (9)
output-channel="" (10)
send-timeout="" (11)
auto-startup="" (12)
order="" (13)
phase="" /> (14)
1 | 拆分器的 bean 名称。 |
2 | 设置为 true (默认值)以使用迭代器,或设置为 false 以在发送行之前将文件加载到内存中。 |
3 | 设置为 true 以在文件数据之前和之后发出文件开头和文件结尾标记消息。标记是具有 FileSplitter.FileMarker 有效负载的消息(在 mark 属性中具有 START 和 END 值)。在按顺序处理下游流中的文件(其中某些行被过滤)时,您可能会使用标记。它们使下游处理能够知道何时已完全处理文件。此外,还会将包含 START 或 END 的 file_marker 标头添加到这些消息中。END 标记包含行数。如果文件为空,则仅发出 START 和 END 标记,并将 0 作为 lineCount 。默认为 false 。当为 true 时,apply-sequence 默认情况下为 false 。另请参阅 markers-json (下一个属性)。 |
4 | 当 markers 为 true 时,将此设置为 true 以使 FileMarker 对象转换为 JSON 字符串。(在底层使用 SimpleJsonSerializer )。 |
5 | 设置为 false 以禁用在消息中包含 sequenceSize 和 sequenceNumber 标头。默认为 true ,除非 markers 为 true 。当 true 且 markers 为 true 时,标记包含在排序中。当 true 且 iterator 为 true 时,sequenceSize 标头设置为 0 ,因为大小未知。 |
6 | 设置为 true 以导致如果文件中没有行则抛出 RequiresReplyException 。默认为 false 。 |
7 | 设置读取文本数据到 String 有效负载时要使用的字符集名称。默认为平台字符集。 |
8 | 第一行标头名称,作为为剩余行发出的消息中的标头携带。自 5.0 版起。 |
9 | 设置用于将消息发送到拆分器的输入通道。 |
10 | 设置发送消息的目标输出通道。 |
11 | 设置发送超时。仅当 output-channel 可以阻塞时适用,例如满的 QueueChannel 。 |
12 | 设置为 false 以禁用在上下文刷新时自动启动拆分器。默认为 true 。 |
13 | 如果 input-channel 是 <publish-subscribe-channel/> ,则设置此端点的顺序。 |
14 | 设置拆分器的启动阶段(在 auto-startup 为 true 时使用)。 |
FileSplitter
还会将任何基于文本的 InputStream
拆分为行。从 4.3 版开始,当与 FTP 或 SFTP 流式入站通道适配器或使用 stream
选项检索文件的 FTP 或 SFTP 出站网关结合使用时,拆分器会在文件完全使用后自动关闭支持该流的会话。有关这些功能的更多信息,请参阅FTP 流式入站通道适配器和SFTP 流式入站通道适配器,以及FTP 出站网关和SFTP 出站网关。
在使用 Java 配置时,可以使用其他构造函数,如下例所示
public FileSplitter(boolean iterator, boolean markers, boolean markersJson)
当 markersJson
为 true 时,标记表示为 JSON 字符串(使用 SimpleJsonSerializer
)。
5.0 版引入了 firstLineAsHeader
选项,用于指定内容的第一行是标头(例如 CSV 文件中的列名)。传递给此属性的参数是标头名称,在该名称下,第一行作为为剩余行发出的消息中的标头携带。此行不包含在序列标头中(如果 applySequence
为 true),也不包含在与 FileMarker.END
关联的 lineCount
中。注意:从 5.5 版开始,lineCount
也作为 FileHeaders.LINE_COUNT
包含在 FileMarker.END
消息的标头中,因为 FileMarker
可以序列化为 JSON。如果文件仅包含标题行,则该文件将被视为为空,因此,在拆分期间仅发出 FileMarker
实例(如果启用了标记,否则不发出任何消息)。默认情况下(如果未设置标头名称),第一行被视为数据并成为发出的第一个消息的有效负载。
如果您需要有关从文件内容中提取标头的更复杂的逻辑(不是第一行,不是行的全部内容,不是一个特定的标头等),请考虑在 FileSplitter
之前使用标头增强器。请注意,已移至标头的行可能会在下游从正常的正文处理中过滤掉。
幂等下游处理拆分文件
当 apply-sequence
为 true 时,拆分器会在 SEQUENCE_NUMBER
标头中添加行号(当 markers
为 true 时,标记计为行)。行号可与幂等接收器一起使用,以避免在重新启动后重新处理行。
例如
@Bean
public ConcurrentMetadataStore store() {
return new ZookeeperMetadataStore();
}
@Bean
public MetadataStoreSelector selector() {
return new MetadataStoreSelector(
message -> message.getHeaders().get(FileHeaders.ORIGINAL_FILE, File.class)
.getAbsolutePath(),
message -> message.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER)
.toString(),
store())
.compareValues(
(oldVal, newVal) -> Integer.parseInt(oldVal) < Integer.parseInt(newVal));
}
@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
return new IdempotentReceiverInterceptor(selector());
}
@Bean
public IntegrationFlow flow() {
...
.split(new FileSplitter())
...
.handle("lineHandler", e -> e.advice(idempotentReceiverInterceptor()))
...
}