读取文件
可以使用 FileReadingMessageSource
从文件系统消费文件。这是 MessageSource
的一个实现,它从文件系统目录创建消息。以下示例展示了如何配置 FileReadingMessageSource
:
<bean id="pollableFileSource"
class="org.springframework.integration.file.FileReadingMessageSource"
p:directory="${input.directory}"/>
为了防止为某些文件创建消息,你可以提供一个 FileListFilter
。默认情况下,我们使用以下过滤器:
-
IgnoreHiddenFileListFilter
-
AcceptOnceFileListFilter
IgnoreHiddenFileListFilter
确保隐藏文件不会被处理。请注意,隐藏的确切定义取决于系统。例如,在基于 UNIX 的系统上,以点字符开头的文件被认为是隐藏的。另一方面,Microsoft Windows 有一个专门的文件属性来指示隐藏文件。
4.2 版本引入了 |
AcceptOnceFileListFilter
确保文件只从目录中被选取一次。
自 4.0 版本以来,此过滤器需要 自 4.1.5 版本以来,此过滤器具有一个新属性( |
持久文件列表过滤器现在有一个布尔属性 forRecursion
。将此属性设置为 true
,也会设置 alwaysAcceptDirectories
,这意味着出站网关(ls
和 mget
)上的递归操作现在每次都会遍历完整的目录树。这是为了解决目录树深处更改未被检测到的问题。此外,forRecursion=true
会导致文件的完整路径用作元数据存储的键;这解决了如果具有相同名称的文件出现在不同目录中多次时过滤器无法正常工作的问题。重要提示:这意味着在持久元数据存储中,顶级目录下的文件将找不到现有键。因此,该属性默认为 false
;这可能会在未来的版本中更改。
以下示例配置了带有过滤器的 FileReadingMessageSource
:
<bean id="pollableFileSource"
class="org.springframework.integration.file.FileReadingMessageSource"
p:inputDirectory="${input.directory}"
p:filter-ref="customFilterBean"/>
读取文件的一个常见问题是文件可能在准备好之前就被检测到(即,某些其他进程可能仍在写入文件)。默认的 AcceptOnceFileListFilter
不会阻止这种情况。在大多数情况下,如果文件写入进程在文件准备好读取后立即重命名文件,则可以防止这种情况。一个基于 filename-pattern
或 filename-regex
的过滤器(可能基于已知后缀)接受仅准备好的文件,与默认的 AcceptOnceFileListFilter
组合使用,可以应对这种情况。CompositeFileListFilter
可以实现这种组合,如下例所示:
<bean id="pollableFileSource"
class="org.springframework.integration.file.FileReadingMessageSource"
p:inputDirectory="${input.directory}"
p:filter-ref="compositeFilter"/>
<bean id="compositeFilter"
class="org.springframework.integration.file.filters.CompositeFileListFilter">
<constructor-arg>
<list>
<bean class="o.s.i.file.filters.AcceptOnceFileListFilter"/>
<bean class="o.s.i.file.filters.RegexPatternFileListFilter">
<constructor-arg value="^test.*$"/>
</bean>
</list>
</constructor-arg>
</bean>
如果无法使用临时名称创建文件并重命名为最终名称,Spring Integration 提供了另一种选择。4.2 版本添加了 LastModifiedFileListFilter
。可以为该过滤器配置一个 age
属性,以便只有早于此值的文件才能通过过滤器。默认年龄为 60 秒,但你应该选择一个足够大的年龄,以避免过早地选取文件(例如,由于网络故障)。以下示例展示了如何配置 LastModifiedFileListFilter
:
<bean id="filter" class="org.springframework.integration.file.filters.LastModifiedFileListFilter">
<property name="age" value="120" />
</bean>
从 4.3.7 版本开始,引入了 ChainFileListFilter
(CompositeFileListFilter
的扩展),以便在后续过滤器仅看到前一个过滤器的结果时允许使用场景。(使用 CompositeFileListFilter
时,所有过滤器都会看到所有文件,但它只传递已通过所有过滤器的文件)。需要新行为的一个示例是 LastModifiedFileListFilter
和 AcceptOnceFileListFilter
的组合,在这种情况下,我们不希望在一段时间过去之前接受文件。使用 CompositeFileListFilter
时,由于 AcceptOnceFileListFilter
在第一次扫描时看到所有文件,当另一个过滤器通过时,它以后不会再传递它。CompositeFileListFilter
方法在模式过滤器与自定义过滤器(用于查找辅助文件以指示文件传输完成)结合使用时很有用。模式过滤器可能只传递主文件(例如 something.txt
),但“完成”过滤器需要查看(例如)something.done
是否存在。
假设我们有文件 a.txt
、a.done
和 b.txt
。
模式过滤器仅传递 a.txt
和 b.txt
,而“完成”过滤器看到所有三个文件并仅传递 a.txt
。组合过滤器的最终结果是仅释放 a.txt
。
使用 ChainFileListFilter 时,如果链中的任何过滤器返回空列表,则不会调用剩余的过滤器。 |
5.0 版本引入了 ExpressionFileListFilter
,用于对文件执行 SpEL 表达式作为上下文评估根对象。为此,所有用于文件处理(本地和远程)的 XML 组件,以及现有的 filter
属性,都提供了 filter-expression
选项,如下例所示:
<int-file:inbound-channel-adapter
directory="${inputdir}"
filter-expression="name matches '.text'"
auto-startup="false"/>
5.0.5 版本引入了对拒绝文件感兴趣的 DiscardAwareFileListFilter
实现。为此,应通过 addDiscardCallback(Consumer<File>)
为此类过滤器实现提供回调。在框架中,此功能与 LastModifiedFileListFilter
结合使用,来自 FileReadingMessageSource.WatchServiceDirectoryScanner
。与常规的 DirectoryScanner
不同,WatchService
根据目标文件系统上的事件提供文件进行处理。在轮询带有这些文件的内部队列时,LastModifiedFileListFilter
可能会因为它们相对于其配置的 age
太年轻而丢弃它们。因此,我们丢失了未来可能考虑的文件。丢弃回调钩子允许我们将文件保留在内部队列中,以便在后续轮询中针对 age
进行检查。CompositeFileListFilter
也实现了 DiscardAwareFileListFilter
,并向其所有 DiscardAwareFileListFilter
委托填充丢弃回调。
由于 CompositeFileListFilter 将文件与所有委托进行匹配,因此对于同一文件,discardCallback 可能会被调用多次。 |
从 5.1 版本开始,FileReadingMessageSource
不再检查目录是否存在,也不会创建它,直到其 start()
被调用(通常通过包装 SourcePollingChannelAdapter
)。以前,例如从测试中引用目录时,或者权限稍后应用时,没有简单的方法可以防止操作系统权限错误。
消息头
从 5.0 版本开始,FileReadingMessageSource
(除了作为轮询的 File
的 payload
)还会为出站 Message
填充以下消息头:
-
FileHeaders.FILENAME
:要发送文件的File.getName()
。可用于后续的重命名或复制逻辑。 -
FileHeaders.ORIGINAL_FILE
:File
对象本身。通常,当丢失原始的File
对象时,此消息头由框架组件(例如 分割器 或 转换器)自动填充。然而,为了与任何其他自定义用例保持一致和方便,此消息头对于访问原始文件可能很有用。 -
FileHeaders.RELATIVE_PATH
:引入了一个新的消息头,用于表示文件路径相对于扫描根目录的部分。当需要在其他位置恢复源目录层次结构时,此消息头可能很有用。为此,可以配置DefaultFileNameGenerator
(参见 生成文件名)来使用此消息头。
目录扫描和轮询
FileReadingMessageSource
不会立即为目录中的文件生成消息。它使用内部队列存储由 scanner
返回的“合格文件”。scanEachPoll
选项用于确保内部队列在每次轮询时使用最新的输入目录内容进行刷新。默认情况下(scanEachPoll = false
),FileReadingMessageSource
会在再次扫描目录之前清空其队列。这种默认行为对于减少对目录中大量文件的扫描特别有用。然而,在需要自定义排序的情况下,考虑将此标志设置为 true
的影响非常重要。文件处理的顺序可能与预期不符。默认情况下,队列中的文件按照其自然顺序(path
)处理。扫描添加的新文件,即使队列中已有文件,也会被插入到适当的位置以保持该自然顺序。要自定义顺序,FileReadingMessageSource
可以接受一个 Comparator<File>
作为构造函数参数。它由内部的 PriorityBlockingQueue
使用,以根据业务需求重新排序其内容。因此,要按照特定顺序处理文件,你应该为 FileReadingMessageSource
提供一个比较器,而不是对自定义 DirectoryScanner
生成的列表进行排序。
5.0 版本引入了 RecursiveDirectoryScanner
来执行文件树遍历。该实现基于 Files.walk(Path start, int maxDepth, FileVisitOption… options)
功能。根目录(DirectoryScanner.listFiles(File)
)参数从结果中排除。所有其他子目录的包含和排除都基于目标 FileListFilter
实现。例如,SimplePatternFileListFilter
默认过滤掉目录。更多信息请参见 AbstractDirectoryAwareFileListFilter
及其实现。
从 5.5 版本开始,Java DSL 的 FileInboundChannelAdapterSpec 有一个方便的 recursive(boolean) 选项,可以在目标 FileReadingMessageSource 中使用 RecursiveDirectoryScanner 代替默认的。 |
命名空间支持
使用文件特定的命名空间可以简化文件读取的配置。为此,请使用以下模板:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-file="http://www.springframework.org/schema/integration/file"
xsi:schemaLocation="http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/file
https://www.springframework.org/schema/integration/file/spring-integration-file.xsd">
</beans>
在该命名空间内,你可以简化 FileReadingMessageSource
并将其包装在入站通道适配器中,如下所示:
<int-file:inbound-channel-adapter id="filesIn1"
directory="file:${input.directory}" prevent-duplicates="true" ignore-hidden="true"/>
<int-file:inbound-channel-adapter id="filesIn2"
directory="file:${input.directory}"
filter="customFilterBean" />
<int-file:inbound-channel-adapter id="filesIn3"
directory="file:${input.directory}"
filename-pattern="test*" />
<int-file:inbound-channel-adapter id="filesIn4"
directory="file:${input.directory}"
filename-regex="test[0-9]+\.txt" />
第一个通道适配器示例依赖于默认的 FileListFilter
实现:
-
IgnoreHiddenFileListFilter
(不处理隐藏文件) -
AcceptOnceFileListFilter
(防止重复)
因此,你也可以省略 prevent-duplicates
和 ignore-hidden
属性,因为它们默认都为 true
。
Spring Integration 4.2 引入了 |
第二个通道适配器示例使用自定义过滤器,第三个使用 filename-pattern
属性添加一个基于 AntPathMatcher
的过滤器,第四个使用 filename-regex
属性为 FileReadingMessageSource
添加一个基于正则表达式模式的过滤器。filename-pattern
和 filename-regex
属性彼此与常规的 filter
引用属性互斥。但是,你可以使用 filter
属性引用一个 CompositeFileListFilter
实例,该实例可以组合任意数量的过滤器,包括一个或多个基于模式的过滤器,以满足你的特定需求。
当多个进程从同一目录读取时,你可能希望锁定文件以防止它们被并发拾取。为此,可以使用 FileLocker
。有一个基于 java.nio
的实现可用,但也可以实现自己的锁定方案。可以按如下方式注入 nio
locker:
<int-file:inbound-channel-adapter id="filesIn"
directory="file:${input.directory}" prevent-duplicates="true">
<int-file:nio-locker/>
</int-file:inbound-channel-adapter>
你可以按如下方式配置自定义 locker:
<int-file:inbound-channel-adapter id="filesIn"
directory="file:${input.directory}" prevent-duplicates="true">
<int-file:locker ref="customLocker"/>
</int-file:inbound-channel-adapter>
当文件入站适配器配置了 locker 时,它负责在允许接收文件之前获取锁。它不承担解锁文件的责任。如果你已经处理了文件并保留了悬挂的锁,就会出现内存泄漏。如果这是一个问题,你应该在适当的时候自己调用 FileLocker.unlock(File file) 。 |
当过滤和锁定文件不足时,你可能需要完全控制文件的列出方式。为了实现这种类型的需求,可以使用 DirectoryScanner
的实现。此扫描器允许你精确确定每次轮询时列出哪些文件。这也是 Spring Integration 在内部用于将 FileListFilter
实例和 FileLocker
连接到 FileReadingMessageSource
的接口。你可以在 <int-file:inbound-channel-adapter/>
的 scanner
属性上注入自定义的 DirectoryScanner
,如下例所示:
<int-file:inbound-channel-adapter id="filesIn" directory="file:${input.directory}"
scanner="customDirectoryScanner"/>
这样做可以让你完全自由地选择排序、列出和锁定策略。
同样重要的是要理解过滤器(包括 patterns
、regex
、prevent-duplicates
等)和 locker
实例实际上是由 scanner
使用的。在适配器上设置的任何这些属性随后都会注入到内部 scanner
中。对于外部 scanner
的情况,在 FileReadingMessageSource
上禁止所有过滤器和 locker 属性。它们必须(如果需要)在该自定义 DirectoryScanner
上指定。换句话说,如果你将 scanner
注入到 FileReadingMessageSource
中,你应该在该 scanner
上提供 filter
和 locker
,而不是在 FileReadingMessageSource
上。
默认情况下,DefaultDirectoryScanner 使用一个 IgnoreHiddenFileListFilter 和一个 AcceptOnceFileListFilter 。为了防止它们的使用,你可以配置自己的过滤器(例如 AcceptAllFileListFilter ),甚至将其设置为 null 。 |
WatchServiceDirectoryScanner
FileReadingMessageSource.WatchServiceDirectoryScanner
依赖于当新文件被添加到目录时产生的文件系统事件。在初始化期间,目录会被注册以生成事件。初始文件列表也在初始化期间构建。遍历目录树时,遇到的任何子目录也会被注册以生成事件。在第一次轮询时,返回遍历目录得到的初始文件列表。在随后的轮询中,返回来自新创建事件的文件。如果添加了新的子目录,其创建事件将被用来遍历新的子树以查找现有文件并注册找到的任何新子目录。
当程序的处理速度跟不上目录修改事件的发生速度时,WatchKey 的内部事件队列 queue 可能会出现问题。如果队列大小超出限制,会发出一个 StandardWatchEventKinds.OVERFLOW 事件,表明可能丢失了一些文件系统事件。在这种情况下,根目录会被完全重新扫描。为了避免重复,请考虑使用合适的 FileListFilter (例如 AcceptOnceFileListFilter )或在处理完成后移除文件。 |
可以通过 FileReadingMessageSource.use-watch-service
选项启用 WatchServiceDirectoryScanner
,该选项与 scanner
选项互斥。对于提供的 directory
,会填充一个内部的 FileReadingMessageSource.WatchServiceDirectoryScanner
实例。
此外,现在 WatchService
的轮询逻辑可以跟踪 StandardWatchEventKinds.ENTRY_MODIFY
和 StandardWatchEventKinds.ENTRY_DELETE
事件。
如果你需要跟踪现有文件的修改以及新文件的添加,你应该在 FileListFilter
中实现 ENTRY_MODIFY
事件的逻辑。否则,来自这些事件的文件将被以相同的方式处理。
ResettableFileListFilter
的实现会接收到 ENTRY_DELETE
事件。因此,这些文件会被提供给 remove()
操作。当启用此事件时,像 AcceptOnceFileListFilter
这样的过滤器会将文件移除。结果是,如果出现同名文件,它会通过过滤器并作为消息发送。
为此,引入了 watch-events
属性(FileReadingMessageSource.setWatchEvents(WatchEventType… watchEvents)
)。(WatchEventType
是 FileReadingMessageSource
中的一个公共内部枚举。) 通过此选项,我们可以为新文件使用一种下游流程逻辑,为修改的文件使用另一种逻辑。以下示例展示了如何在同一目录中为创建和修改事件配置不同的逻辑
值得一提的是,ENTRY_DELETE
事件与被监控目录的子目录的重命名操作有关。更具体地说,与旧目录名相关的 ENTRY_DELETE
事件发生在通知新(重命名)目录的 ENTRY_CREATE
事件之前。在某些操作系统(如 Windows)上,必须注册 ENTRY_DELETE
事件来处理这种情况。否则,在文件资源管理器中重命名被监控的子目录可能导致在该子目录中无法检测到新文件。
<int-file:inbound-channel-adapter id="newFiles"
directory="${input.directory}"
use-watch-service="true"/>
<int-file:inbound-channel-adapter id="modifiedFiles"
directory="${input.directory}"
use-watch-service="true"
filter="acceptAllFilter"
watch-events="MODIFY"/> <!-- The default is CREATE. -->
从 6.1 版本开始,FileReadingMessageSource
暴露了两个新的与 WatchService
相关的选项
-
watchMaxDepth
- 作为Files.walkFileTree(Path root, Set attributes, int maxDepth, FileVisitor visitor)
API 的参数; -
watchDirPredicate
- 一个Predicate<Path>
,用于测试扫描树中的目录是否应该被遍历并注册到WatchService
以及配置的监视事件种类。
限制内存消耗
您可以使用 HeadDirectoryScanner
来限制内存中保留的文件数量。这在扫描大型目录时非常有用。通过 XML 配置,可以通过在入站通道适配器上设置 queue-size
属性来启用此功能。
在 4.2 版本之前,此设置与使用任何其他过滤器不兼容。任何其他过滤器(包括 prevent-duplicates="true"
)都会覆盖用于限制大小的过滤器。
使用 通常,在这种情况下,你应该移除已处理的文件,而不是使用 |
使用 Java Configuration 进行配置
以下 Spring Boot 应用程序展示了如何使用 Java Configuration 配置出站适配器的示例
@SpringBootApplication
public class FileReadingJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FileReadingJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public MessageChannel fileInputChannel() {
return new DirectChannel();
}
@Bean
@InboundChannelAdapter(value = "fileInputChannel", poller = @Poller(fixedDelay = "1000"))
public MessageSource<File> fileReadingMessageSource() {
FileReadingMessageSource source = new FileReadingMessageSource();
source.setDirectory(new File(INBOUND_PATH));
source.setFilter(new SimplePatternFileListFilter("*.txt"));
return source;
}
@Bean
@Transformer(inputChannel = "fileInputChannel", outputChannel = "processFileChannel")
public FileToStringTransformer fileToStringTransformer() {
return new FileToStringTransformer();
}
}
使用 Java DSL 进行配置
以下 Spring Boot 应用程序展示了如何使用 Java DSL 配置出站适配器的示例
@SpringBootApplication
public class FileReadingJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FileReadingJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow fileReadingFlow() {
return IntegrationFlow
.from(Files.inboundAdapter(new File(INBOUND_PATH))
.patternFilter("*.txt"),
e -> e.poller(Pollers.fixedDelay(1000)))
.transform(Files.toStringTransformer())
.channel("processFileChannel")
.get();
}
}
文件“跟踪”(tail)
另一个常见的用例是从文件的末尾(或尾部)获取“行”,并在添加新行时捕获它们。提供了两种实现。第一种是 OSDelegatingFileTailingMessageProducer
,它使用原生 tail
命令(在支持此命令的操作系统上)。这通常是这些平台上最有效的实现。对于没有 tail
命令的操作系统,第二种实现 ApacheCommonsFileTailingMessageProducer
使用 Apache commons-io
库的 Tailer
类。
在这两种情况下,文件系统事件(例如文件不可用和其他事件)都通过正常的 Spring 事件发布机制作为 ApplicationEvent
实例发布。此类事件的示例包括以下内容
[message=tail: cannot open '/tmp/somefile' for reading:
No such file or directory, file=/tmp/somefile]
[message=tail: '/tmp/somefile' has become accessible, file=/tmp/somefile]
[message=tail: '/tmp/somefile' has become inaccessible:
No such file or directory, file=/tmp/somefile]
[message=tail: '/tmp/somefile' has appeared;
following end of new file, file=/tmp/somefile]
上述示例中显示的事件序列可能发生,例如,当文件被轮转时。
从 5.0 版本开始,当在 idleEventInterval
期间文件没有数据时,会发出 FileTailingIdleEvent
。以下示例显示了此类事件的样子
[message=Idle timeout, file=/tmp/somefile] [idle time=5438]
并非所有支持 tail 命令的平台都提供这些状态消息。 |
从这些端点发出的消息包含以下头部信息
-
FileHeaders.ORIGINAL_FILE
:File
对象 -
FileHeaders.FILENAME
: 文件名 (File.getName()
)
在 5.0 版本之前,FileHeaders.FILENAME 头部包含文件的绝对路径的字符串表示。现在可以通过对原始文件头部调用 getAbsolutePath() 来获取该字符串表示。 |
以下示例创建一个具有默认选项('-F -n 0',表示从当前文件末尾跟踪文件)的原生适配器。
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
task-executor="exec"
file="/tmp/foo"/>
以下示例创建一个具有 '-F -n +0' 选项(表示跟踪文件,并发出所有现有行)的原生适配器。
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
native-options="-F -n +0"
task-executor="exec"
file-delay=10000
file="/tmp/foo"/>
如果 tail
命令失败(在某些平台上,文件丢失会导致 tail
失败,即使指定了 `-F`),该命令会每 10 秒重试一次。
默认情况下,原生适配器从标准输出捕获内容并将其作为消息发送。它们也从标准错误捕获信息以触发事件。从 4.3.6 版本开始,您可以通过将 enable-status-reader
设置为 false
来丢弃标准错误事件,如下例所示
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
enable-status-reader="false"
task-executor="exec"
file="/tmp/foo"/>
在以下示例中,IdleEventInterval
设置为 5000
,这意味着如果在五秒内没有写入任何行,则每五秒触发一次 FileTailingIdleEvent
。
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
idle-event-interval="5000"
task-executor="exec"
file="/tmp/somefile"/>
这在需要停止适配器时非常有用。
以下示例创建了一个 Apache commons-io
Tailer
适配器,它每两秒检查一次文件是否有新行,并每十秒检查一次丢失文件是否存在。
<int-file:tail-inbound-channel-adapter id="apache"
channel="input"
task-executor="exec"
file="/tmp/bar"
delay="2000"
end="false" (1)
reopen="true" (2)
file-delay="10000"/>
1 | 文件从开头 (end="false" ) 而非末尾(默认值)开始跟踪。 |
2 | 文件对每个块进行重新打开(默认是保持文件打开)。 |
指定 delay 、end 或 reopen 属性会强制使用 Apache commons-io 适配器,并使 native-options 属性不可用。 |