SFTP 流式入站通道适配器
4.3 版本引入了流式入站通道适配器。此适配器生成负载类型为 InputStream
的消息,允许您在不写入本地文件系统的情况下抓取文件。由于 session 保持打开状态,因此使用方应用程序负责在文件消费后关闭 session。session 会在 closeableResource
头部 (IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE
) 中提供。标准框架组件,如 FileSplitter
和 StreamTransformer
,会自动关闭 session。有关这些组件的更多信息,请参见文件分割器和流转换器。以下示例展示了如何配置 SFTP 流式入站通道适配器。
<int-sftp:inbound-streaming-channel-adapter id="ftpInbound"
channel="ftpChannel"
session-factory="sessionFactory"
filename-pattern="*.txt"
filename-regex=".*\.txt"
filter="filter"
filter-expression="@myFilterBean.check(#root)"
remote-file-separator="/"
comparator="comparator"
max-fetch-size="1"
remote-directory-expression="'foo/bar'">
<int:poller fixed-rate="1000" />
</int-sftp:inbound-streaming-channel-adapter>
您只能使用 filename-pattern
、filename-regex
、filter
或 filter-expression
中的一个。
从 5.0 版本开始,SftpStreamingMessageSource 适配器默认使用基于内存 SimpleMetadataStore 的 SftpPersistentAcceptOnceFileListFilter 来防止远程文件重复。默认情况下,此过滤器也与文件名模式(或正则表达式)一起应用。如果需要允许重复,可以使用 AcceptAllFileListFilter 。您可以使用 CompositeFileListFilter (或 ChainFileListFilter )来处理任何其他用例。稍后展示的 Java 配置展示了一种在处理后删除远程文件以避免重复的技术。 |
有关 SftpPersistentAcceptOnceFileListFilter
的更多信息及其使用方法,请参见远程持久化文件列表过滤器。
您可以使用 max-fetch-size
属性来限制每次轮询时需要抓取的文件数量。在集群环境中运行时,将其设置为 1
并使用持久化过滤器。有关更多信息,请参见入站通道适配器:控制远程文件抓取。
适配器会将远程目录和文件名分别放入头部(FileHeaders.REMOTE_DIRECTORY
和 FileHeaders.REMOTE_FILE
)。从 5.0 版本开始,FileHeaders.REMOTE_FILE_INFO
头部提供了额外的远程文件信息(JSON 格式)。如果您将 SftpStreamingMessageSource
上的 fileInfoJson
属性设置为 false
,则头部包含一个 SftpFileInfo
对象。您可以通过使用 SftpFileInfo.getFileInfo()
方法访问底层 SftpClient
提供的 SftpClient.DirEntry
对象。在使用 XML 配置时,fileInfoJson
属性不可用,但您可以通过将 SftpStreamingMessageSource
注入到您的配置类中来设置它。另请参见远程文件信息。
使用 Java 配置进行配置
以下 Spring Boot 应用程序展示了如何使用 Java 配置入站适配器的示例。
@SpringBootApplication
public class SftpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(SftpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
@InboundChannelAdapter(channel = "stream")
public MessageSource<InputStream> ftpMessageSource() {
SftpStreamingMessageSource messageSource = new SftpStreamingMessageSource(template());
messageSource.setRemoteDirectory("sftpSource/");
messageSource.setFilter(new AcceptAllFileListFilter<>());
messageSource.setMaxFetchSize(1);
return messageSource;
}
@Bean
@Transformer(inputChannel = "stream", outputChannel = "data")
public org.springframework.integration.transformer.Transformer transformer() {
return new StreamTransformer("UTF-8");
}
@Bean
public SftpRemoteFileTemplate template() {
return new SftpRemoteFileTemplate(sftpSessionFactory());
}
@ServiceActivator(inputChannel = "data", adviceChain = "after")
@Bean
public MessageHandler handle() {
return System.out::println;
}
@Bean
public ExpressionEvaluatingRequestHandlerAdvice after() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setOnSuccessExpression(
"@template.remove(headers['file_remoteDirectory'] + '/' + headers['file_remoteFile'])");
advice.setPropagateEvaluationFailures(true);
return advice;
}
}
注意,在此示例中,转换器下游的消息处理器有一个 `advice`,用于在处理后删除远程文件。