SFTP 入站通道适配器
SFTP 入站通道适配器是一种特殊的监听器,它连接到服务器并监听远程目录事件(例如创建新文件),此时它会启动文件传输。以下示例展示了如何配置 SFTP 入站通道适配器。
<int-sftp:inbound-channel-adapter id="sftpAdapterAutoCreate"
session-factory="sftpSessionFactory"
channel="requestChannel"
filename-pattern="*.txt"
remote-directory="/foo/bar"
preserve-timestamp="true"
local-directory="file:target/foo"
auto-create-local-directory="true"
local-filename-generator-expression="#this.toUpperCase() + '.a'"
scanner="myDirScanner"
local-filter="myFilter"
temporary-file-suffix=".writing"
max-fetch-size="-1"
delete-remote-files="false">
<int:poller fixed-rate="1000"/>
</int-sftp:inbound-channel-adapter>
上述配置示例展示了如何为各种属性提供值,包括以下内容:
-
local-directory:文件将被传输到的位置。 -
remote-directory:文件将被传输的远程源目录。 -
session-factory:对我们之前配置的 bean 的引用。
默认情况下,传输的文件与原始文件具有相同的名称。如果您想覆盖此行为,可以设置 local-filename-generator-expression 属性,该属性允许您提供一个 SpEL 表达式来生成本地文件的名称。与出站网关和适配器不同,在出站网关和适配器中,SpEL 评估上下文的根对象是 Message,此入站适配器在评估时还没有消息,因为它最终会生成以传输文件作为其 payload 的消息。因此,SpEL 评估上下文的根对象是远程文件的原始名称(一个 String)。
入站通道适配器首先将文件检索到本地目录,然后根据轮询器配置发出每个文件。从版本 5.0 开始,当需要新的文件检索时,您可以限制从 SFTP 服务器获取的文件数量。当目标文件较大或在具有持久文件列表过滤器的集群系统中运行时,这可能是有益的,本节稍后将讨论。为此,请使用 max-fetch-size。负值(默认值)表示没有限制,并且检索所有匹配的文件。有关更多信息,请参阅入站通道适配器:控制远程文件获取。从版本 5.0 开始,您还可以通过设置 scanner 属性为 inbound-channel-adapter 提供自定义的 DirectoryScanner 实现。
从 Spring Integration 3.0 开始,您可以指定 preserve-timestamp 属性(默认值为 false)。当为 true 时,本地文件的修改时间戳被设置为从服务器检索到的值。否则,它被设置为当前时间。
从版本 4.2 开始,您可以指定 remote-directory-expression 而不是 remote-directory,这允许您在每次轮询时动态确定目录 — 例如,remote-directory-expression="@myBean.determineRemoteDir()"。
有时,基于 filename-pattern 属性指定的简单模式进行文件过滤可能不足。在这种情况下,您可以使用 filename-regex 属性来指定正则表达式,例如 filename-regex=".*\.test$"。如果您需要完全控制,可以使用 filter 属性来提供对 org.springframework.integration.file.filters.FileListFilter 自定义实现的引用,这是一个用于过滤文件列表的策略接口。此过滤器决定哪些远程文件被检索。您还可以通过使用 CompositeFileListFilter 将基于模式的过滤器与其他过滤器(例如 AcceptOnceFileListFilter,以避免同步以前已获取的文件)结合起来。
AcceptOnceFileListFilter 将其状态存储在内存中。如果您希望该状态在系统重新启动后仍然存在,请考虑使用 SftpPersistentAcceptOnceFileListFilter。此过滤器将接受的文件名存储在 MetadataStore 策略的实例中(请参阅元数据存储)。此过滤器根据文件名和远程修改时间进行匹配。
自版本 4.0 以来,此过滤器需要一个 ConcurrentMetadataStore。当与共享数据存储(例如使用 RedisMetadataStore 的 Redis)一起使用时,这允许在多个应用程序或服务器实例之间共享过滤器键。
从版本 5.0 开始,默认情况下,对于 SftpInboundFileSynchronizer,应用带有内存中 SimpleMetadataStore 的 SftpPersistentAcceptOnceFileListFilter。此过滤器也与 XML 配置中的 regex 或 pattern 选项以及 Java DSL 中的 SftpInboundChannelAdapterSpec 一起应用。您可以通过使用 CompositeFileListFilter(或 ChainFileListFilter)来处理任何其他用例。
以上讨论指的是在检索文件之前进行过滤。文件检索后,会有一个额外的过滤器应用于文件系统上的文件。默认情况下,这是一个 AcceptOnceFileListFilter,如本节所述,它将状态保留在内存中,并且不考虑文件的修改时间。除非您的应用程序在处理后删除文件,否则适配器默认会在应用程序重新启动后重新处理磁盘上的文件。
此外,如果您将 filter 配置为使用 SftpPersistentAcceptOnceFileListFilter 并且远程文件时间戳发生更改(导致其被重新获取),则默认的本地过滤器不允许处理此新文件。
有关此过滤器及其使用方式的更多信息,请参阅远程持久文件列表过滤器。
您可以使用 local-filter 属性来配置本地文件系统过滤器的行为。从版本 4.3.8 开始,默认配置了一个 FileSystemPersistentAcceptOnceFileListFilter。此过滤器将接受的文件名和修改时间戳存储在 MetadataStore 策略的实例中(请参阅元数据存储),并检测本地文件修改时间的变化。默认的 MetadataStore 是一个将状态存储在内存中的 SimpleMetadataStore。
自版本 4.1.5 以来,这些过滤器有一个名为 flushOnUpdate 的新属性,它导致它们在每次更新时刷新元数据存储(如果存储实现了 Flushable)。
此外,如果您使用分布式 MetadataStore(例如Redis 元数据存储),您可以拥有同一适配器或应用程序的多个实例,并确保只有一个实例处理一个文件。 |
实际的本地过滤器是一个 ChainFileListFilter,它包含一个模式过滤器,用于防止处理正在下载的文件(基于 temporary-file-suffix)和提供的过滤器。文件以此后缀(默认为 .writing)下载,并在传输完成后将文件重命名为最终名称,使其对过滤器“可见”。
有关这些属性的更多详细信息,请参阅模式。
SFTP 入站通道适配器是一个轮询消费者。因此,您必须配置一个轮询器(可以是全局默认的,也可以是本地元素)。一旦文件被传输到本地目录,就会生成一个以 java.io.File 为其 payload 类型并发送到由 channel 属性标识的通道的消息。
从版本 6.2 开始,您可以使用 SftpLastModifiedFileListFilter 根据最后修改策略过滤 SFTP 文件。此过滤器可以配置一个 age 属性,以便只有早于此值的文件才能通过过滤器。年龄默认为 60 秒,但您应该选择一个足够大的年龄,以避免过早地获取文件(例如,由于网络故障)。有关更多信息,请参阅其 Javadoc。
相反,从版本 6.5 开始,引入了 SftpRecentFileListFilter 以仅接受不早于提供 age 的文件。
更多关于文件过滤和大型文件
有时,刚出现在受监视(远程)目录中的文件不完整。通常,此类文件以某种临时扩展名(例如名为 something.txt.writing 的文件上的 .writing)写入,然后在写入过程完成后重命名。在大多数情况下,开发人员只对完整的文件感兴趣,并希望只过滤这些文件。为了处理这些情况,您可以使用 filename-pattern、filename-regex 和 filter 属性提供的过滤支持。如果您需要自定义过滤器实现,您可以通过设置 filter 属性在适配器中包含一个引用。以下示例展示了如何实现:
<int-sftp:inbound-channel-adapter id="sftpInbondAdapter"
channel="receiveChannel"
session-factory="sftpSessionFactory"
filter="customFilter"
local-directory="file:/local-test-dir"
remote-directory="/remote-test-dir">
<int:poller fixed-rate="1000" max-messages-per-poll="10" task-executor="executor"/>
</int-sftp:inbound-channel-adapter>
<bean id="customFilter" class="org.foo.CustomFilter"/>
从故障中恢复
您应该了解适配器的架构。文件同步器获取文件,FileReadingMessageSource 为每个同步的文件发出消息。如前面讨论的,涉及两个过滤器。filter 属性(和模式)指远程(SFTP)文件列表,以避免获取已经获取的文件。FileReadingMessageSource 使用 local-filter 来确定哪些文件将作为消息发送。
同步器列出远程文件并咨询其过滤器。然后传输文件。如果在文件传输过程中发生 IO 错误,则已添加到过滤器的任何文件都将被删除,以便它们可以在下一次轮询时重新获取。这仅适用于过滤器实现了 ReversibleFileListFilter(例如 AcceptOnceFileListFilter)的情况。
如果同步文件后,在下游流程处理文件时发生错误,则不会自动回滚过滤器,因此默认情况下不会重新处理失败的文件。
如果您希望在故障后重新处理此类文件,您可以使用类似于以下内容的配置来方便地从过滤器中删除失败的文件:
<int-sftp:inbound-channel-adapter id="sftpAdapter"
session-factory="sftpSessionFactory"
channel="requestChannel"
remote-directory-expression="'/sftpSource'"
local-directory="file:myLocalDir"
auto-create-local-directory="true"
filename-pattern="*.txt">
<int:poller fixed-rate="1000">
<int:transactional synchronization-factory="syncFactory" />
</int:poller>
</int-sftp:inbound-channel-adapter>
<bean id="acceptOnceFilter"
class="org.springframework.integration.file.filters.AcceptOnceFileListFilter" />
<int:transaction-synchronization-factory id="syncFactory">
<int:after-rollback expression="payload.delete()" />
</int:transaction-synchronization-factory>
<bean id="transactionManager"
class="org.springframework.integration.transaction.PseudoTransactionManager" />
上述配置适用于任何 ResettableFileListFilter。
从版本 5.0 开始,入站通道适配器可以根据生成的本地文件名在本地构建子目录。这也可以是一个远程子路径。为了能够根据层次结构支持递归读取本地目录以进行修改,您现在可以使用基于 Files.walk() 算法的新 RecursiveDirectoryScanner 向内部 FileReadingMessageSource 提供服务。有关更多信息,请参阅AbstractInboundFileSynchronizingMessageSource.setScanner()。此外,您现在可以通过使用 setUseWatchService() 选项将 AbstractInboundFileSynchronizingMessageSource 切换到基于 WatchService 的 DirectoryScanner。它还配置了所有 WatchEventType 实例,以对本地目录中的任何修改作出反应。前面显示的回处理示例基于 FileReadingMessageSource.WatchServiceDirectoryScanner 的内置功能,该功能在文件从本地目录删除 (StandardWatchEventKinds.ENTRY_DELETE) 时使用 ResettableFileListFilter.remove()。有关更多信息,请参阅WatchServiceDirectoryScanner。
使用 Java 配置
以下 Spring Boot 应用程序展示了如何使用 Java 配置入站适配器的示例
@SpringBootApplication
public class SftpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(SftpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public SessionFactory<SftpClient.DirEntry> sftpSessionFactory() {
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
factory.setHost("localhost");
factory.setPort(port);
factory.setUser("foo");
factory.setPassword("foo");
factory.setAllowUnknownKeys(true);
factory.setTestSession(true);
return new CachingSessionFactory<>(factory);
}
@Bean
public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory());
fileSynchronizer.setDeleteRemoteFiles(false);
fileSynchronizer.setRemoteDirectory("foo");
fileSynchronizer.setFilter(new SftpSimplePatternFileListFilter("*.xml"));
return fileSynchronizer;
}
@Bean
@InboundChannelAdapter(channel = "sftpChannel", poller = @Poller(fixedDelay = "5000"))
public MessageSource<File> sftpMessageSource() {
SftpInboundFileSynchronizingMessageSource source =
new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer());
source.setLocalDirectory(new File("sftp-inbound"));
source.setAutoCreateLocalDirectory(true);
source.setLocalFilter(new AcceptOnceFileListFilter<File>());
source.setMaxFetchSize(1);
return source;
}
@Bean
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
}
};
}
}
使用 Java DSL 进行配置
以下 Spring Boot 应用程序展示了如何使用 Java DSL 配置入站适配器的示例:
@SpringBootApplication
public class SftpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(SftpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow sftpInboundFlow() {
return IntegrationFlow
.from(Sftp.inboundAdapter(this.sftpSessionFactory)
.preserveTimestamp(true)
.remoteDirectory("foo")
.regexFilter(".*\\.txt$")
.localFilenameExpression("#this.toUpperCase() + '.a'")
.localDirectory(new File("sftp-inbound")),
e -> e.id("sftpInboundAdapter")
.autoStartup(true)
.poller(Pollers.fixedDelay(5000)))
.handle(m -> System.out.println(m.getPayload()))
.get();
}
}