SFTP 入站通道适配器

SFTP 入站通道适配器是一个特殊的监听器,它连接到服务器并监听远程目录事件(例如创建新文件),此时它会启动文件传输。以下示例展示了如何配置 SFTP 入站通道适配器

<int-sftp:inbound-channel-adapter id="sftpAdapterAutoCreate"
              session-factory="sftpSessionFactory"
            channel="requestChannel"
            filename-pattern="*.txt"
            remote-directory="/foo/bar"
            preserve-timestamp="true"
            local-directory="file:target/foo"
            auto-create-local-directory="true"
            local-filename-generator-expression="#this.toUpperCase() + '.a'"
            scanner="myDirScanner"
            local-filter="myFilter"
            temporary-file-suffix=".writing"
            max-fetch-size="-1"
            delete-remote-files="false">
        <int:poller fixed-rate="1000"/>
</int-sftp:inbound-channel-adapter>

前面的配置示例展示了如何为各种属性提供值,包括以下内容

  • local-directory:文件将要传输到的位置

  • remote-directory:文件将要从中传输的远程源目录

  • session-factory:对我们之前配置的 bean 的引用

默认情况下,传输的文件与原始文件同名。如果要覆盖此行为,可以设置 local-filename-generator-expression 属性,它允许您提供一个 SpEL 表达式来生成本地文件的名称。与出站网关和适配器不同,在这些情况下 SpEL 评估上下文的根对象是 Message,而此入站适配器在评估时还没有消息,因为这是它最终用传输的文件作为其负载生成的内容。因此,SpEL 评估上下文的根对象是远程文件的原始名称(一个 String)。

入站通道适配器首先将文件检索到本地目录,然后根据轮询器配置发出每个文件。从 5.0 版本开始,您可以在需要检索新文件时限制从 SFTP 服务器获取的文件数量。当目标文件较大或在带有持久化文件列表过滤器的集群系统中运行时,这会很有益,本节后面会讨论此过滤器。为此目的使用 max-fetch-size。负值(默认值)表示没有限制,并且会检索所有匹配的文件。有关更多信息,请参阅 入站通道适配器:控制远程文件获取。自 5.0 版本以来,您还可以通过设置 scanner 属性向 inbound-channel-adapter 提供自定义的 DirectoryScanner 实现。

从 Spring Integration 3.0 开始,您可以指定 preserve-timestamp 属性(默认为 false)。当设置为 true 时,本地文件的修改时间戳会设置为从服务器检索到的值。否则,它会设置为当前时间。

从 4.2 版本开始,您可以指定 remote-directory-expression 代替 remote-directory,它允许您在每次轮询时动态确定目录 — 例如,remote-directory-expression="@myBean.determineRemoteDir()"

有时,基于通过 filename-pattern 属性指定的简单模式进行的文件过滤可能不够用。如果出现这种情况,您可以使用 filename-regex 属性来指定正则表达式(例如,filename-regex=".*\.test$")。如果您需要完全控制,可以使用 filter 属性提供对 org.springframework.integration.file.filters.FileListFilter 自定义实现的引用,这是一个用于过滤文件列表的策略接口。此过滤器确定哪些远程文件会被检索。您还可以通过使用 CompositeFileListFilter 将基于模式的过滤器与其他过滤器(例如 AcceptOnceFileListFilter,以避免同步之前已获取的文件)结合使用。

AcceptOnceFileListFilter 将其状态存储在内存中。如果您希望状态在系统重启后仍然存在,请考虑使用 SftpPersistentAcceptOnceFileListFilter。此过滤器会将接受的文件名存储在 MetadataStore 策略的实例中(请参阅 元数据存储)。此过滤器根据文件名和远程修改时间进行匹配。

自 4.0 版本以来,此过滤器需要一个 ConcurrentMetadataStore。当与共享数据存储(例如使用 RedisMetadataStoreRedis)一起使用时,这允许在多个应用程序或服务器实例之间共享过滤键。

从 5.0 版本开始,默认情况下,对于 SftpInboundFileSynchronizer 会应用带有内存中的 SimpleMetadataStoreSftpPersistentAcceptOnceFileListFilter。此过滤器也会与 XML 配置中的 regexpattern 选项以及 Java DSL 中的 SftpInboundChannelAdapterSpec 一起应用。您可以通过使用 CompositeFileListFilter(或 ChainFileListFilter)处理任何其他用例。

上面的讨论是指在检索文件之前对文件进行过滤。文件检索后,会对文件系统上的文件应用额外的过滤器。默认情况下,这是一个 AcceptOnceFileListFilter,如本节所述,它将状态保留在内存中,并且不考虑文件的修改时间。除非您的应用程序在处理后删除文件,否则适配器在应用程序重启后默认会重新处理磁盘上的文件。

此外,如果您将 filter 配置为使用 SftpPersistentAcceptOnceFileListFilter 并且远程文件时间戳发生变化(导致其被重新获取),则默认的本地过滤器不允许处理此新文件。

有关此过滤器的更多信息以及如何使用它,请参阅 远程持久化文件列表过滤器

您可以使用 local-filter 属性来配置本地文件系统过滤器的行为。从 4.3.8 版本开始,默认配置了一个 FileSystemPersistentAcceptOnceFileListFilter。此过滤器将接受的文件名和修改时间戳存储在 MetadataStore 策略的实例中(请参阅 元数据存储),并检测本地文件修改时间的变化。默认的 MetadataStore 是一个 SimpleMetadataStore,它将状态存储在内存中。

自 4.1.5 版本以来,这些过滤器新增了一个名为 flushOnUpdate 的属性,它会导致它们在每次更新时刷新元数据存储(如果存储实现了 Flushable)。

此外,如果您使用分布式 MetadataStore(例如 Redis 元数据存储),您可以拥有同一适配器或应用程序的多个实例,并确保只有一个实例处理文件。

实际的本地过滤器是一个 CompositeFileListFilter,它包含提供的过滤器和一个模式过滤器,用于阻止处理正在下载中的文件(基于 temporary-file-suffix)。文件会以该后缀(默认为 .writing)下载,并在传输完成后重命名为最终名称,使其对过滤器“可见”。

有关这些属性的更多详细信息,请参阅 schema

SFTP 入站通道适配器是一个轮询消费者。因此,您必须配置一个轮询器(可以是全局默认的或本地元素)。文件传输到本地目录后,会生成一个以 java.io.File 作为负载类型的消息,并发送到由 channel 属性标识的通道。

从 6.2 版本开始,您可以使用 SftpLastModifiedFileListFilter 基于最后修改策略来过滤 SFTP 文件。此过滤器可以使用 age 属性进行配置,以便只有早于此值的文件才通过过滤器。默认年龄为 60 秒,但您应选择足够大的年龄以避免过早获取文件(例如由于网络故障)。有关更多信息,请查看其 Javadoc。

更多关于文件过滤和大型文件的信息

有时,刚出现在被监控(远程)目录中的文件可能不完整。通常,此类文件会写入带有临时扩展名(例如文件名为 something.txt.writing 的文件上的 .writing),然后在写入过程完成后重命名。在大多数情况下,开发人员只对完整的文件感兴趣,并希望仅过滤这些文件。为了处理这些情况,您可以使用 filename-patternfilename-regexfilter 属性提供的过滤支持。如果您需要自定义过滤器实现,可以通过设置 filter 属性在您的适配器中包含一个引用。以下示例展示了如何实现

<int-sftp:inbound-channel-adapter id="sftpInbondAdapter"
            channel="receiveChannel"
            session-factory="sftpSessionFactory"
            filter="customFilter"
            local-directory="file:/local-test-dir"
            remote-directory="/remote-test-dir">
        <int:poller fixed-rate="1000" max-messages-per-poll="10" task-executor="executor"/>
</int-sftp:inbound-channel-adapter>

<bean id="customFilter" class="org.foo.CustomFilter"/>

从故障中恢复

您应该了解适配器的架构。文件同步器获取文件,而 FileReadingMessageSource 为每个同步文件发出一条消息。如前所述,涉及两个过滤器。filter 属性(和模式)指代远程 (SFTP) 文件列表,以避免获取已经获取的文件。FileReadingMessageSource 使用 local-filter 来确定哪些文件将作为消息发送。

同步器列出远程文件并咨询其过滤器。然后传输文件。如果在文件传输过程中发生 IO 错误,任何已添加到过滤器的文件都会被移除,以便它们在下一次轮询时有资格被重新获取。这仅适用于过滤器实现了 ReversibleFileListFilter(例如 AcceptOnceFileListFilter)的情况。

如果在同步文件后,下游流处理文件时发生错误,则不会自动回滚过滤器,因此默认情况下失败的文件不会被重新处理。

如果您希望在故障后重新处理此类文件,可以使用类似于以下的配置来方便地从过滤器中移除失败的文件

<int-sftp:inbound-channel-adapter id="sftpAdapter"
        session-factory="sftpSessionFactory"
        channel="requestChannel"
        remote-directory-expression="'/sftpSource'"
        local-directory="file:myLocalDir"
        auto-create-local-directory="true"
        filename-pattern="*.txt">
    <int:poller fixed-rate="1000">
        <int:transactional synchronization-factory="syncFactory" />
    </int:poller>
</int-sftp:inbound-channel-adapter>

<bean id="acceptOnceFilter"
    class="org.springframework.integration.file.filters.AcceptOnceFileListFilter" />

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-rollback expression="payload.delete()" />
</int:transaction-synchronization-factory>

<bean id="transactionManager"
    class="org.springframework.integration.transaction.PseudoTransactionManager" />

前面的配置适用于任何 ResettableFileListFilter

从 5.0 版本开始,入站通道适配器可以根据生成的本地文件名在本地构建子目录。这也可以是远程子路径。为了能够根据层次结构支持递归读取本地目录进行修改,您现在可以使用基于 Files.walk() 算法的新 RecursiveDirectoryScanner 为内部的 FileReadingMessageSource 提供支持。有关更多信息,请参阅 AbstractInboundFileSynchronizingMessageSource.setScanner()。此外,您现在可以通过使用 setUseWatchService() 选项将 AbstractInboundFileSynchronizingMessageSource 切换到基于 WatchServiceDirectoryScanner。它也配置为所有 WatchEventType 实例对本地目录中的任何修改做出反应。前面展示的重新处理示例基于 FileReadingMessageSource.WatchServiceDirectoryScanner 的内置功能,该功能在文件从本地目录中删除时(StandardWatchEventKinds.ENTRY_DELETE)使用 ResettableFileListFilter.remove()。有关更多信息,请参阅 WatchServiceDirectoryScanner

使用 Java 配置

以下 Spring Boot 应用程序展示了如何使用 Java 配置入站适配器的示例

@SpringBootApplication
public class SftpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(SftpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public SessionFactory<SftpClient.DirEntry> sftpSessionFactory() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
        factory.setHost("localhost");
        factory.setPort(port);
        factory.setUser("foo");
        factory.setPassword("foo");
        factory.setAllowUnknownKeys(true);
        factory.setTestSession(true);
        return new CachingSessionFactory<>(factory);
    }

    @Bean
    public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
        SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory());
        fileSynchronizer.setDeleteRemoteFiles(false);
        fileSynchronizer.setRemoteDirectory("foo");
        fileSynchronizer.setFilter(new SftpSimplePatternFileListFilter("*.xml"));
        return fileSynchronizer;
    }

    @Bean
    @InboundChannelAdapter(channel = "sftpChannel", poller = @Poller(fixedDelay = "5000"))
    public MessageSource<File> sftpMessageSource() {
        SftpInboundFileSynchronizingMessageSource source =
                new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer());
        source.setLocalDirectory(new File("sftp-inbound"));
        source.setAutoCreateLocalDirectory(true);
        source.setLocalFilter(new AcceptOnceFileListFilter<File>());
        source.setMaxFetchSize(1);
        return source;
    }

    @Bean
    @ServiceActivator(inputChannel = "sftpChannel")
    public MessageHandler handler() {
        return new MessageHandler() {

            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                System.out.println(message.getPayload());
            }

        };
    }

}

使用 Java DSL 配置

以下 Spring Boot 应用程序展示了如何使用 Java DSL 配置入站适配器的示例

@SpringBootApplication
public class SftpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(SftpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public IntegrationFlow sftpInboundFlow() {
        return IntegrationFlow
            .from(Sftp.inboundAdapter(this.sftpSessionFactory)
                    .preserveTimestamp(true)
                    .remoteDirectory("foo")
                    .regexFilter(".*\\.txt$")
                    .localFilenameExpression("#this.toUpperCase() + '.a'")
                    .localDirectory(new File("sftp-inbound")),
                 e -> e.id("sftpInboundAdapter")
                    .autoStartup(true)
                    .poller(Pollers.fixedDelay(5000)))
            .handle(m -> System.out.println(m.getPayload()))
            .get();
    }
}

处理不完整数据

请参阅 处理不完整数据

提供了 SftpSystemMarkerFilePresentFileListFilter 以过滤远程系统上没有相应标记文件的远程文件。有关配置信息,请参阅其 Javadoc