FTP 流式入站通道适配器
版本 4.3 引入了流式入站通道适配器。此适配器生成的Payload类型为 InputStream
的消息,允许在不写入本地文件系统的情况下获取文件。由于会话保持打开状态,消费应用程序负责在文件被消费后关闭会话。会话在 closeableResource
消息头 (IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE
) 中提供。标准框架组件,例如 FileSplitter
和 StreamTransformer
,会自动关闭会话。有关这些组件的更多信息,请参阅 文件分割器 和 流转换器。以下示例展示了如何配置 inbound-streaming-channel-adapter
<int-ftp:inbound-streaming-channel-adapter id="ftpInbound"
channel="ftpChannel"
session-factory="sessionFactory"
filename-pattern="*.txt"
filename-regex=".*\.txt"
filter="filter"
filter-expression="@myFilterBean.check(#root)"
remote-file-separator="/"
comparator="comparator"
max-fetch-size="1"
remote-directory-expression="'foo/bar'">
<int:poller fixed-rate="1000" />
</int-ftp:inbound-streaming-channel-adapter>
只能指定 filename-pattern
, filename-regex
, filter
或 filter-expression
中的一个。
从版本 5.0 开始,默认情况下,FtpStreamingMessageSource 适配器使用基于内存 SimpleMetadataStore 的 FtpPersistentAcceptOnceFileListFilter 来防止远程文件重复。默认情况下,此过滤器也适用于文件名模式(或正则表达式)。如果需要允许重复,可以使用 AcceptAllFileListFilter 。任何其他用例可以通过 CompositeFileListFilter (或 ChainFileListFilter )处理。Java 配置(文档后面部分)展示了一种在处理后删除远程文件以避免重复的技术。 |
有关 FtpPersistentAcceptOnceFileListFilter
及其使用方式的更多信息,请参阅 远程持久文件列表过滤器。
使用 max-fetch-size
属性来限制每次轮询时(需要获取文件时)获取的文件数量。在集群环境中运行时,将其设置为 1
并使用持久过滤器。有关更多信息,请参阅 入站通道适配器:控制远程文件获取。
适配器分别将远程目录和文件名放在 FileHeaders.REMOTE_DIRECTORY
和 FileHeaders.REMOTE_FILE
消息头中。从版本 5.0 开始,FileHeaders.REMOTE_FILE_INFO
消息头提供了额外的远程文件信息(默认以 JSON 格式表示)。如果将 FtpStreamingMessageSource
上的 fileInfoJson
属性设置为 false
,则消息头包含一个 FtpFileInfo
对象。可以通过底层 Apache Net 库提供的 FTPFile
对象,使用 FtpFileInfo.getFileInfo()
方法进行访问。在使用 XML 配置时,fileInfoJson
属性不可用,但可以通过将 FtpStreamingMessageSource
注入到配置类中来设置它。另请参阅 远程文件信息。
从版本 5.1 开始,comparator
的泛型类型是 FTPFile
。之前是 AbstractFileInfo<FTPFile>
。这是因为排序现在在处理过程中的更早阶段执行,即在过滤和应用 maxFetch
之前。
使用 Java 配置进行配置
以下 Spring Boot 应用程序展示了如何使用 Java 配置来配置入站适配器的示例
@SpringBootApplication
public class FtpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
@InboundChannelAdapter(channel = "stream")
public MessageSource<InputStream> ftpMessageSource() {
FtpStreamingMessageSource messageSource = new FtpStreamingMessageSource(template());
messageSource.setRemoteDirectory("ftpSource/");
messageSource.setFilter(new AcceptAllFileListFilter<>());
messageSource.setMaxFetchSize(1);
return messageSource;
}
@Bean
@Transformer(inputChannel = "stream", outputChannel = "data")
public org.springframework.integration.transformer.Transformer transformer() {
return new StreamTransformer("UTF-8");
}
@Bean
public FtpRemoteFileTemplate template() {
return new FtpRemoteFileTemplate(ftpSessionFactory());
}
@ServiceActivator(inputChannel = "data", adviceChain = "after")
@Bean
public MessageHandler handle() {
return System.out::println;
}
@Bean
public ExpressionEvaluatingRequestHandlerAdvice after() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setOnSuccessExpression(
"@template.remove(headers['file_remoteDirectory'] + headers['file_remoteFile'])");
advice.setPropagateEvaluationFailures(true);
return advice;
}
}
注意,在此示例中,转换器下游的消息处理器包含一个 advice
,用于在处理后删除远程文件。