入站通道适配器:控制远程文件抓取
配置入站通道适配器时,应考虑两个属性。与所有轮询器一样,max-messages-per-poll 可用于限制每次轮询发出的消息数量(如果准备好的消息数量超过配置值)。max-fetch-size(从版本 5.0 开始)可以限制每次从远程服务器检索的文件数量。
以下场景假设初始状态是空的本地目录
-
max-messages-per-poll=2和max-fetch-size=1:适配器获取一个文件,发出它,然后获取下一个文件并发出它。然后它休眠直到下一次轮询。 -
max-messages-per-poll=2和max-fetch-size=2:适配器获取两个文件,然后分别发出它们。 -
max-messages-per-poll=2和max-fetch-size=4:适配器获取最多 4 个文件(如果可用),并发出前两个(如果至少有两个)。接下来的两个文件将在下一次轮询中发出。 -
max-messages-per-poll=2且未指定max-fetch-size:适配器获取所有远程文件,并发出前两个(如果至少有两个)。随后的文件将在随后的轮询中发出(每次两个)。当所有文件都被消费后,将再次尝试远程获取以获取任何新文件。
当您部署应用程序的多个实例时,我们建议设置一个较小的 max-fetch-size,以避免一个实例“霸占”所有文件而导致其他实例饥饿。 |
max-fetch-size 的另一个用途是当您希望停止获取远程文件但继续处理已获取的文件时。在 MessageSource 上设置 maxFetchSize 属性(通过编程、JMX 或通过控制总线)有效地阻止适配器获取更多文件,但允许轮询器继续发出先前已获取的文件的消息。如果属性更改时轮询器处于活动状态,则更改将在下一次轮询时生效。
从版本 5.1 开始,同步器可以提供一个 Comparator<?>。这在使用 maxFetchSize 限制获取文件数量时很有用。
从版本 6.4 开始,AbstractRemoteFileStreamingMessageSource 现在有一个方便的 clearFetchedCache() API,用于从缓存中删除未处理的远程文件的引用。这些引用保留在缓存中,因为轮询配置不允许在一个周期内处理所有文件,并且目标 SessionFactory 可能会在轮询周期之间更改,例如,通过 RotatingServerAdvice。
从版本 7.0 开始,AbstractInboundFileSynchronizer 在应用 maxFetchSize 切片后缓存一个过滤的 Session.list(remoteDirectory)。AbstractInboundFileSynchronizer.transferFilesFromRemoteToLocal() 方法的逻辑如下:
-
如果
maxFetchSize > 0,则会针对remoteDirectory获取锁,以避免在缓存工作时不同线程的竞态条件。性能下降最小,因为所有后续同步都只处理内存中缓存的剩余部分; -
如果
remoteDirectory没有缓存条目,则调用Session.list(remoteDirectory)并过滤所有返回的远程文件; -
然后将过滤结果切片到
maxFetchSize; -
然后将这些文件条目传输到本地目录;
-
其余的过滤远程文件被缓存以供后续同步;
-
如果
remoteDirectory有缓存条目,则将此类列表切片到maxFetchSize并迭代以传输到本地目录; -
如果其中一个传输失败,则
filter将从失败的远程文件重置。缓存也会被清除;因此,下一次同步将从一个干净的状态开始。
另请参阅一般 SFTP 入站通道适配器 章节,了解有关 FileListFilter 配置的信息。