SFTP 流式入站通道适配器

4.3 版本引入了流式入站通道适配器。此适配器生成负载类型为 InputStream 的消息,允许您在不写入本地文件系统的情况下抓取文件。由于 session 保持打开状态,因此使用方应用程序负责在文件消费后关闭 session。session 会在 closeableResource 头部 (IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE) 中提供。标准框架组件,如 FileSplitterStreamTransformer,会自动关闭 session。有关这些组件的更多信息,请参见文件分割器流转换器。以下示例展示了如何配置 SFTP 流式入站通道适配器。

<int-sftp:inbound-streaming-channel-adapter id="ftpInbound"
            channel="ftpChannel"
            session-factory="sessionFactory"
            filename-pattern="*.txt"
            filename-regex=".*\.txt"
            filter="filter"
            filter-expression="@myFilterBean.check(#root)"
            remote-file-separator="/"
            comparator="comparator"
            max-fetch-size="1"
            remote-directory-expression="'foo/bar'">
        <int:poller fixed-rate="1000" />
</int-sftp:inbound-streaming-channel-adapter>

您只能使用 filename-patternfilename-regexfilterfilter-expression 中的一个。

从 5.0 版本开始,SftpStreamingMessageSource 适配器默认使用基于内存 SimpleMetadataStoreSftpPersistentAcceptOnceFileListFilter 来防止远程文件重复。默认情况下,此过滤器也与文件名模式(或正则表达式)一起应用。如果需要允许重复,可以使用 AcceptAllFileListFilter。您可以使用 CompositeFileListFilter(或 ChainFileListFilter)来处理任何其他用例。稍后展示的 Java 配置展示了一种在处理后删除远程文件以避免重复的技术。

有关 SftpPersistentAcceptOnceFileListFilter 的更多信息及其使用方法,请参见远程持久化文件列表过滤器

您可以使用 max-fetch-size 属性来限制每次轮询时需要抓取的文件数量。在集群环境中运行时,将其设置为 1 并使用持久化过滤器。有关更多信息,请参见入站通道适配器:控制远程文件抓取

适配器会将远程目录和文件名分别放入头部(FileHeaders.REMOTE_DIRECTORYFileHeaders.REMOTE_FILE)。从 5.0 版本开始,FileHeaders.REMOTE_FILE_INFO 头部提供了额外的远程文件信息(JSON 格式)。如果您将 SftpStreamingMessageSource 上的 fileInfoJson 属性设置为 false,则头部包含一个 SftpFileInfo 对象。您可以通过使用 SftpFileInfo.getFileInfo() 方法访问底层 SftpClient 提供的 SftpClient.DirEntry 对象。在使用 XML 配置时,fileInfoJson 属性不可用,但您可以通过将 SftpStreamingMessageSource 注入到您的配置类中来设置它。另请参见远程文件信息

使用 Java 配置进行配置

以下 Spring Boot 应用程序展示了如何使用 Java 配置入站适配器的示例。

@SpringBootApplication
public class SftpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(SftpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    @InboundChannelAdapter(channel = "stream")
    public MessageSource<InputStream> ftpMessageSource() {
        SftpStreamingMessageSource messageSource = new SftpStreamingMessageSource(template());
        messageSource.setRemoteDirectory("sftpSource/");
        messageSource.setFilter(new AcceptAllFileListFilter<>());
        messageSource.setMaxFetchSize(1);
        return messageSource;
    }

    @Bean
    @Transformer(inputChannel = "stream", outputChannel = "data")
    public org.springframework.integration.transformer.Transformer transformer() {
        return new StreamTransformer("UTF-8");
    }

    @Bean
    public SftpRemoteFileTemplate template() {
        return new SftpRemoteFileTemplate(sftpSessionFactory());
    }

    @ServiceActivator(inputChannel = "data", adviceChain = "after")
    @Bean
    public MessageHandler handle() {
        return System.out::println;
    }

    @Bean
    public ExpressionEvaluatingRequestHandlerAdvice after() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setOnSuccessExpression(
                "@template.remove(headers['file_remoteDirectory'] + '/' +  headers['file_remoteFile'])");
        advice.setPropagateEvaluationFailures(true);
        return advice;
    }

}

注意,在此示例中,转换器下游的消息处理器有一个 `advice`,用于在处理后删除远程文件。