FTP 流式入站通道适配器

版本 4.3 引入了流式入站通道适配器。此适配器生成有效负载类型为InputStream的消息,允许在不写入本地文件系统的情况下获取文件。由于会话保持打开状态,因此在使用完文件后,使用者应用程序负责关闭会话。会话在closeableResource标头(IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE)中提供。标准框架组件(如FileSplitterStreamTransformer)会自动关闭会话。有关这些组件的更多信息,请参阅文件分割器流转换器。以下示例显示如何配置inbound-streaming-channel-adapter

<int-ftp:inbound-streaming-channel-adapter id="ftpInbound"
            channel="ftpChannel"
            session-factory="sessionFactory"
            filename-pattern="*.txt"
            filename-regex=".*\.txt"
            filter="filter"
            filter-expression="@myFilterBean.check(#root)"
            remote-file-separator="/"
            comparator="comparator"
            max-fetch-size="1"
            remote-directory-expression="'foo/bar'">
        <int:poller fixed-rate="1000" />
</int-ftp:inbound-streaming-channel-adapter>

仅允许使用filename-patternfilename-regexfilterfilter-expression中的一个。

从版本 5.0 开始,默认情况下,FtpStreamingMessageSource适配器基于内存中的SimpleMetadataStore使用FtpPersistentAcceptOnceFileListFilter防止远程文件重复。默认情况下,此过滤器也与文件名模式(或正则表达式)一起应用。如果需要允许重复,可以使用AcceptAllFileListFilter。任何其他用例都可以由CompositeFileListFilter(或ChainFileListFilter)处理。Java 配置(文档后面部分)显示了一种在处理后删除远程文件以避免重复的技术。

有关FtpPersistentAcceptOnceFileListFilter及其使用方法的更多信息,请参阅远程持久文件列表过滤器

使用max-fetch-size属性限制每次轮询时获取的文件数量(如果需要获取)。在群集环境中运行时,将其设置为1并使用持久过滤器。有关更多信息,请参阅入站通道适配器:控制远程文件获取

适配器分别将远程目录和文件名放入FileHeaders.REMOTE_DIRECTORYFileHeaders.REMOTE_FILE标头中。从版本 5.0 开始,FileHeaders.REMOTE_FILE_INFO标头提供了其他远程文件信息(默认情况下以 JSON 表示)。如果将FtpStreamingMessageSource上的fileInfoJson属性设置为false,则标头包含一个FtpFileInfo对象。可以通过使用FtpFileInfo.getFileInfo()方法访问底层 Apache Net 库提供的FTPFile对象。当使用 XML 配置时,fileInfoJson属性不可用,但可以通过将FtpStreamingMessageSource注入到您的一个配置类中来设置它。另请参阅远程文件信息

从版本 5.1 开始,comparator的泛型类型为FTPFile。以前,它是AbstractFileInfo<FTPFile>。这是因为排序现在在处理的早期阶段执行,在过滤和应用maxFetch之前。

使用 Java 配置进行配置

以下 Spring Boot 应用程序显示了如何使用 Java 配置配置入站适配器的示例

@SpringBootApplication
public class FtpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(FtpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    @InboundChannelAdapter(channel = "stream")
    public MessageSource<InputStream> ftpMessageSource() {
        FtpStreamingMessageSource messageSource = new FtpStreamingMessageSource(template());
        messageSource.setRemoteDirectory("ftpSource/");
        messageSource.setFilter(new AcceptAllFileListFilter<>());
        messageSource.setMaxFetchSize(1);
        return messageSource;
    }

    @Bean
    @Transformer(inputChannel = "stream", outputChannel = "data")
    public org.springframework.integration.transformer.Transformer transformer() {
        return new StreamTransformer("UTF-8");
    }

    @Bean
    public FtpRemoteFileTemplate template() {
        return new FtpRemoteFileTemplate(ftpSessionFactory());
    }

    @ServiceActivator(inputChannel = "data", adviceChain = "after")
    @Bean
    public MessageHandler handle() {
        return System.out::println;
    }

    @Bean
    public ExpressionEvaluatingRequestHandlerAdvice after() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setOnSuccessExpression(
                "@template.remove(headers['file_remoteDirectory'] + headers['file_remoteFile'])");
        advice.setPropagateEvaluationFailures(true);
        return advice;
    }

}

请注意,在此示例中,转换器下游的消息处理程序具有一个advice,该advice在处理后删除远程文件。