读取文件
可以使用FileReadingMessageSource
从文件系统读取文件。这是一个MessageSource
的实现,它从文件系统目录创建消息。以下示例演示如何配置FileReadingMessageSource
<bean id="pollableFileSource"
class="org.springframework.integration.file.FileReadingMessageSource"
p:directory="${input.directory}"/>
为了防止为某些文件创建消息,您可以提供一个FileListFilter
。默认情况下,我们使用以下过滤器:
-
IgnoreHiddenFileListFilter
-
AcceptOnceFileListFilter
IgnoreHiddenFileListFilter
确保不处理隐藏文件。请注意,隐藏文件的精确定义取决于系统。例如,在基于UNIX的系统上,以句点字符开头的文件被认为是隐藏文件。另一方面,Microsoft Windows有一个专用的文件属性来指示隐藏文件。
4.2版本引入了 |
AcceptOnceFileListFilter
确保仅从目录中拾取文件一次。
从4.0版本开始,此过滤器需要一个 从4.1.5版本开始,此过滤器新增了一个属性( |
持久性文件列表过滤器现在具有一个布尔属性forRecursion
。将此属性设置为true
,还会设置alwaysAcceptDirectories
,这意味着出站网关(ls
和mget
)上的递归操作现在每次都会遍历整个目录树。这是为了解决目录树深处更改未被检测到的问题。此外,forRecursion=true
导致使用文件的完整路径作为元数据存储键;这解决了如果在不同目录中多次出现同名文件,则过滤器无法正常工作的问题。重要:这意味着持久性元数据存储中存在的键将找不到顶级目录下的文件。因此,此属性默认为false
;这可能会在将来的版本中更改。
以下示例配置具有过滤器的FileReadingMessageSource
:
<bean id="pollableFileSource"
class="org.springframework.integration.file.FileReadingMessageSource"
p:inputDirectory="${input.directory}"
p:filter-ref="customFilterBean"/>
读取文件的一个常见问题是,在文件准备好之前可能会检测到该文件(即,某些其他进程可能仍在写入该文件)。默认的AcceptOnceFileListFilter
不会阻止这种情况。在大多数情况下,如果文件写入进程在准备好读取时立即重命名每个文件,则可以防止这种情况。一个filename-pattern
或filename-regex
过滤器,它只接受已准备好的文件(可能基于已知的后缀),与默认的AcceptOnceFileListFilter
组合使用,允许这种情况。CompositeFileListFilter
启用组合,如下例所示:
<bean id="pollableFileSource"
class="org.springframework.integration.file.FileReadingMessageSource"
p:inputDirectory="${input.directory}"
p:filter-ref="compositeFilter"/>
<bean id="compositeFilter"
class="org.springframework.integration.file.filters.CompositeFileListFilter">
<constructor-arg>
<list>
<bean class="o.s.i.file.filters.AcceptOnceFileListFilter"/>
<bean class="o.s.i.file.filters.RegexPatternFileListFilter">
<constructor-arg value="^test.*$"/>
</bean>
</list>
</constructor-arg>
</bean>
如果无法使用临时名称创建文件并重命名为最终名称,Spring Integration提供了另一种替代方案。4.2版本添加了LastModifiedFileListFilter
。此过滤器可以使用age
属性进行配置,以便只有比此值旧的文件才能通过过滤器。年龄默认为60秒,但您应该选择一个足够大的年龄,以避免过早拾取文件(例如,由于网络故障)。以下示例演示如何配置LastModifiedFileListFilter
:
<bean id="filter" class="org.springframework.integration.file.filters.LastModifiedFileListFilter">
<property name="age" value="120" />
</bean>
从4.3.7版本开始,引入了ChainFileListFilter
(CompositeFileListFilter
的扩展),允许在后续过滤器只应查看先前过滤器的结果的情况下使用。(使用CompositeFileListFilter
,所有过滤器都会看到所有文件,但它只传递已通过所有过滤器的文件)。需要新行为的一个示例是LastModifiedFileListFilter
和AcceptOnceFileListFilter
的组合,当我们不希望在经过一段时间后才接受文件时。使用CompositeFileListFilter
,由于AcceptOnceFileListFilter
在第一次传递时会看到所有文件,因此当其他过滤器这样做时,它不会在稍后传递它。当模式过滤器与查找辅助文件以指示文件传输完成的自定义过滤器组合使用时,CompositeFileListFilter
方法很有用。模式过滤器可能只传递主文件(例如something.txt
),但“完成”过滤器需要查看(例如)something.done
是否存在。
假设我们有文件a.txt
、a.done
和b.txt
。
模式过滤器只传递a.txt
和b.txt
,“完成”过滤器查看所有三个文件,只传递a.txt
。组合过滤器的最终结果是只有a.txt
被释放。
使用ChainFileListFilter ,如果链中的任何过滤器返回空列表,则不会调用其余过滤器。 |
5.0版本引入了ExpressionFileListFilter
,用于针对文件作为上下文评估根对象执行SpEL表达式。为此,所有用于文件处理(本地和远程)的XML组件以及现有的filter
属性都已提供filter-expression
选项,如下例所示:
<int-file:inbound-channel-adapter
directory="${inputdir}"
filter-expression="name matches '.text'"
auto-startup="false"/>
5.0.5版本引入了DiscardAwareFileListFilter
实现,这些实现对被拒绝的文件感兴趣。为此,应通过addDiscardCallback(Consumer<File>)
为这样的过滤器实现提供回调。在框架中,此功能与LastModifiedFileListFilter
结合使用,来自FileReadingMessageSource.WatchServiceDirectoryScanner
。与常规的DirectoryScanner
不同,WatchService
根据目标文件系统上的事件提供要处理的文件。在轮询包含这些文件的内部队列时,LastModifiedFileListFilter
可能会丢弃它们,因为相对于其配置的age
而言,它们太年轻了。因此,我们丢失了将来可能考虑的文件。丢弃回调钩子允许我们将文件保留在内部队列中,以便在后续轮询中检查age
。CompositeFileListFilter
也实现了一个DiscardAwareFileListFilter
,并向其所有DiscardAwareFileListFilter
委托填充丢弃回调。
由于CompositeFileListFilter 根据所有委托匹配文件,因此discardCallback 可能会对同一文件调用多次。 |
从5.1版本开始,FileReadingMessageSource
不会检查目录是否存在,也不会在调用其start()
(通常通过包装SourcePollingChannelAdapter
)之前创建目录。以前,当引用目录时(例如来自测试,或者当稍后应用权限时),没有简单的方法可以防止操作系统权限错误。
消息头
从5.0版本开始,FileReadingMessageSource
(除了将轮询的File
作为payload
之外)还会将以下头填充到出站Message
中:
-
FileHeaders.FILENAME
:要发送文件的File.getName()
。可用于后续重命名或复制逻辑。 -
FileHeaders.ORIGINAL_FILE
:File
对象本身。通常,当我们丢失原始File
对象时,框架组件(例如拆分器或转换器)会自动填充此标头。但是,为了与任何其他自定义用例保持一致性和方便性,此标头对于访问原始文件很有用。 -
FileHeaders.RELATIVE_PATH
:引入的新标头,用于表示相对于扫描的根目录的文件路径的一部分。当需要在其他地方还原源目录层次结构时,此标头非常有用。为此,可以配置DefaultFileNameGenerator
(参见“`生成文件名”)以使用此标头。
目录扫描和轮询
FileReadingMessageSource
不会立即生成目录中文件的的消息。它使用内部队列来存储由scanner
返回的“合格文件”。scanEachPoll
选项用于确保在每次轮询时,内部队列都会使用最新的输入目录内容进行刷新。默认情况下(scanEachPoll = false
),FileReadingMessageSource
会在再次扫描目录之前清空其队列。此默认行为对于减少对目录中大量文件的扫描特别有用。但是,在需要自定义排序的情况下,务必考虑将此标志设置为true
的影响。文件的处理顺序可能与预期不符。默认情况下,队列中的文件按其自然(path
)顺序处理。即使队列中已经有文件,扫描添加的新文件也会插入到适当的位置以保持该自然顺序。要自定义顺序,FileReadingMessageSource
可以接受Comparator<File>
作为构造函数参数。内部(PriorityBlockingQueue
)使用它根据业务需求重新排序其内容。因此,要按特定顺序处理文件,应向FileReadingMessageSource
提供比较器,而不是对自定义DirectoryScanner
生成的列表进行排序。
5.0 版本引入了RecursiveDirectoryScanner
来执行文件树遍历。该实现基于Files.walk(Path start, int maxDepth, FileVisitOption… options)
功能。根目录(DirectoryScanner.listFiles(File)
)参数不包含在结果中。所有其他子目录的包含和排除都基于目标FileListFilter
实现。例如,SimplePatternFileListFilter
默认会过滤掉目录。有关更多信息,请参见AbstractDirectoryAwareFileListFilter
及其实现。
从 5.5 版本开始,Java DSL 的FileInboundChannelAdapterSpec 具有方便的recursive(boolean) 选项,可在目标FileReadingMessageSource 中使用RecursiveDirectoryScanner ,而不是默认的扫描器。 |
命名空间支持
可以使用特定于文件的命名空间简化文件读取的配置。为此,请使用以下模板
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-file="http://www.springframework.org/schema/integration/file"
xsi:schemaLocation="http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/file
https://www.springframework.org/schema/integration/file/spring-integration-file.xsd">
</beans>
在此命名空间内,您可以减少FileReadingMessageSource
并将其包装在入站通道适配器中,如下所示
<int-file:inbound-channel-adapter id="filesIn1"
directory="file:${input.directory}" prevent-duplicates="true" ignore-hidden="true"/>
<int-file:inbound-channel-adapter id="filesIn2"
directory="file:${input.directory}"
filter="customFilterBean" />
<int-file:inbound-channel-adapter id="filesIn3"
directory="file:${input.directory}"
filename-pattern="test*" />
<int-file:inbound-channel-adapter id="filesIn4"
directory="file:${input.directory}"
filename-regex="test[0-9]+\.txt" />
第一个通道适配器示例依赖于默认的FileListFilter
实现
-
IgnoreHiddenFileListFilter
(不处理隐藏文件) -
AcceptOnceFileListFilter
(防止重复)
因此,您也可以省略prevent-duplicates
和ignore-hidden
属性,因为它们默认为true
。
Spring Integration 4.2 引入了 |
第二个通道适配器示例使用自定义过滤器,第三个使用filename-pattern
属性添加基于AntPathMatcher
的过滤器,第四个使用filename-regex
属性向FileReadingMessageSource
添加基于正则表达式的模式过滤器。filename-pattern
和filename-regex
属性与常规filter
引用属性互斥。但是,您可以使用filter
属性引用CompositeFileListFilter
的实例,该实例可以组合任意数量的过滤器,包括一个或多个基于模式的过滤器,以满足您的特定需求。
当多个进程从同一目录读取时,您可能希望锁定文件以防止它们被同时获取。为此,您可以使用FileLocker
。有一个基于java.nio
的实现可用,但也可以实现您自己的锁定方案。可以按如下方式注入nio
锁:
<int-file:inbound-channel-adapter id="filesIn"
directory="file:${input.directory}" prevent-duplicates="true">
<int-file:nio-locker/>
</int-file:inbound-channel-adapter>
您可以按如下方式配置自定义锁:
<int-file:inbound-channel-adapter id="filesIn"
directory="file:${input.directory}" prevent-duplicates="true">
<int-file:locker ref="customLocker"/>
</int-file:inbound-channel-adapter>
当文件入站适配器配置了锁时,它负责在允许接收文件之前获取锁。它不承担解锁文件的责任。如果您已处理文件并让锁保持挂起状态,则会出现内存泄漏。如果这是一个问题,则应在适当的时候自行调用FileLocker.unlock(File file) 。 |
当过滤和锁定文件不足时,您可能需要完全控制文件的列出方式。要实现此类需求,可以使用DirectoryScanner
的实现。此扫描器允许您准确确定每次轮询中列出的文件。这也是 Spring Integration 用于将FileListFilter
实例和FileLocker
连接到FileReadingMessageSource
的内部接口。您可以将自定义DirectoryScanner
注入到<int-file:inbound-channel-adapter/>
的scanner
属性中,如下例所示
<int-file:inbound-channel-adapter id="filesIn" directory="file:${input.directory}"
scanner="customDirectoryScanner"/>
这样做可以让您完全自由地选择排序、列出和锁定策略。
还必须了解,过滤器(包括patterns
、regex
、prevent-duplicates
等)和locker
实例实际上是由scanner
使用的。适配器上设置的任何这些属性随后都会注入到内部scanner
中。对于外部scanner
,在FileReadingMessageSource
上禁止所有过滤器和锁属性。必须在自定义DirectoryScanner
上指定(如果需要)它们。换句话说,如果您将scanner
注入到FileReadingMessageSource
中,则应在该scanner
上提供filter
和locker
,而不是在FileReadingMessageSource
上。
默认情况下,DefaultDirectoryScanner 使用IgnoreHiddenFileListFilter 和AcceptOnceFileListFilter 。要阻止使用它们,您可以配置您自己的过滤器(例如AcceptAllFileListFilter )甚至将其设置为null 。 |
WatchServiceDirectoryScanner
FileReadingMessageSource.WatchServiceDirectoryScanner
依赖于将新文件添加到目录时的文件系统事件。在初始化期间,注册目录以生成事件。初始文件列表也在初始化期间构建。遍历目录树时,遇到的任何子目录也会注册以生成事件。在第一次轮询中,返回遍历目录生成的初始文件列表。在后续轮询中,返回来自新创建事件的文件。如果添加了新的子目录,则使用其创建事件遍历新的子树以查找现有文件并注册找到的任何新的子目录。
当程序无法像目录修改事件发生那样快速地清空其内部事件queue 时,WatchKey 存在问题。如果队列大小超过限制,则会发出StandardWatchEventKinds.OVERFLOW 以指示某些文件系统事件可能丢失。在这种情况下,将完全重新扫描根目录。为了避免重复,请考虑使用合适的FileListFilter (例如AcceptOnceFileListFilter )或在处理完成后删除文件。 |
可以通过FileReadingMessageSource.use-watch-service
选项启用WatchServiceDirectoryScanner
,该选项与scanner
选项互斥。为提供的directory
填充内部FileReadingMessageSource.WatchServiceDirectoryScanner
实例。
此外,现在WatchService
轮询逻辑可以跟踪StandardWatchEventKinds.ENTRY_MODIFY
和StandardWatchEventKinds.ENTRY_DELETE
。
如果您需要跟踪现有文件的修改以及新文件,则应在FileListFilter
中实现ENTRY_MODIFY
事件逻辑。否则,这些事件的文件将以相同的方式处理。
ResettableFileListFilter
实现拾取ENTRY_DELETE
事件。因此,它们的文件将用于remove()
操作。启用此事件时,AcceptOnceFileListFilter
等过滤器会删除该文件。结果,如果出现同名文件,它将通过过滤器并作为消息发送。
为此,引入了watch-events
属性(FileReadingMessageSource.setWatchEvents(WatchEventType… watchEvents)
)。(WatchEventType
是FileReadingMessageSource
中的公共内部枚举。)使用此选项,我们可以对新文件使用一个下游流逻辑,对修改的文件使用其他逻辑。以下示例显示了如何在同一目录中为创建和修改事件配置不同的逻辑
值得一提的是,ENTRY_DELETE
事件参与到被监视目录的子目录的重命名操作中。更具体地说,与先前目录名称相关的ENTRY_DELETE
事件先于ENTRY_CREATE
事件,后者通知新的(已重命名)目录。在某些操作系统(如 Windows)上,必须注册ENTRY_DELETE
事件才能处理这种情况。否则,在文件资源管理器中重命名受监视的子目录可能会导致该子目录中的新文件未被检测到。
<int-file:inbound-channel-adapter id="newFiles"
directory="${input.directory}"
use-watch-service="true"/>
<int-file:inbound-channel-adapter id="modifiedFiles"
directory="${input.directory}"
use-watch-service="true"
filter="acceptAllFilter"
watch-events="MODIFY"/> <!-- The default is CREATE. -->
从 6.1 版本开始,FileReadingMessageSource
公开了两个新的与WatchService
相关的选项
-
watchMaxDepth
-Files.walkFileTree(Path root, Set attributes, int maxDepth, FileVisitor visitor)
API 的参数; -
watchDirPredicate
- 一个Predicate<Path>
,用于测试是否应遍历扫描树中的目录并将其注册到WatchService
和配置的监视事件类型。
限制内存消耗
您可以使用HeadDirectoryScanner
来限制内存中保留的文件数量。这在扫描大型目录时非常有用。使用 XML 配置,可以通过在入站通道适配器上设置queue-size
属性来启用此功能。
在 4.2 版本之前,此设置与使用任何其他过滤器不兼容。任何其他过滤器(包括prevent-duplicates="true"
)都会覆盖用于限制大小的过滤器。
使用 通常,在这种情况下,不要使用 |
使用 Java 配置进行配置
以下 Spring Boot 应用程序显示了如何使用 Java 配置配置出站适配器的示例
@SpringBootApplication
public class FileReadingJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FileReadingJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public MessageChannel fileInputChannel() {
return new DirectChannel();
}
@Bean
@InboundChannelAdapter(value = "fileInputChannel", poller = @Poller(fixedDelay = "1000"))
public MessageSource<File> fileReadingMessageSource() {
FileReadingMessageSource source = new FileReadingMessageSource();
source.setDirectory(new File(INBOUND_PATH));
source.setFilter(new SimplePatternFileListFilter("*.txt"));
return source;
}
@Bean
@Transformer(inputChannel = "fileInputChannel", outputChannel = "processFileChannel")
public FileToStringTransformer fileToStringTransformer() {
return new FileToStringTransformer();
}
}
使用 Java DSL 进行配置
以下 Spring Boot 应用程序显示了如何使用 Java DSL 配置出站适配器的示例
@SpringBootApplication
public class FileReadingJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FileReadingJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow fileReadingFlow() {
return IntegrationFlow
.from(Files.inboundAdapter(new File(INBOUND_PATH))
.patternFilter("*.txt"),
e -> e.poller(Pollers.fixedDelay(1000)))
.transform(Files.toStringTransformer())
.channel("processFileChannel")
.get();
}
}
跟踪文件
另一个常用的案例是从文件的末尾(或尾部)获取“行”,并在添加新行时捕获它们。提供了两种实现方式。第一种,OSDelegatingFileTailingMessageProducer
,使用原生tail
命令(在拥有该命令的操作系统上)。这通常是这些平台上效率最高的实现方式。对于没有tail
命令的操作系统,第二个实现ApacheCommonsFileTailingMessageProducer
使用Apache commons-io
的Tailer
类。
在这两种情况下,文件系统事件(例如文件不可用和其他事件)都将作为ApplicationEvent
实例发布,方法是使用正常的Spring事件发布机制。此类事件的示例包括:
[message=tail: cannot open '/tmp/somefile' for reading:
No such file or directory, file=/tmp/somefile]
[message=tail: '/tmp/somefile' has become accessible, file=/tmp/somefile]
[message=tail: '/tmp/somefile' has become inaccessible:
No such file or directory, file=/tmp/somefile]
[message=tail: '/tmp/somefile' has appeared;
following end of new file, file=/tmp/somefile]
例如,当文件被轮换时,可能会发生前面示例中所示的事件序列。
从5.0版本开始,当在idleEventInterval
期间文件中没有数据时,会发出FileTailingIdleEvent
。以下示例显示了此类事件的外观:
[message=Idle timeout, file=/tmp/somefile] [idle time=5438]
并非所有支持tail 命令的平台都提供这些状态消息。 |
从这些端点发出的消息具有以下头信息:
-
FileHeaders.ORIGINAL_FILE
:File
对象 -
FileHeaders.FILENAME
:文件名 (File.getName()
)
在5.0版本之前,FileHeaders.FILENAME 头包含文件的绝对路径的字符串表示形式。现在,您可以通过调用原始文件头的getAbsolutePath() 方法来获取该字符串表示形式。 |
以下示例使用默认选项('-F -n 0',表示从当前末尾跟踪文件名)创建一个原生适配器。
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
task-executor="exec"
file="/tmp/foo"/>
以下示例使用'-F -n +0'选项(表示跟踪文件名,发出所有现有行)创建一个原生适配器。
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
native-options="-F -n +0"
task-executor="exec"
file-delay=10000
file="/tmp/foo"/>
如果tail
命令失败(在某些平台上,即使指定了-F
,缺少文件也会导致tail
失败),则每10秒重试一次该命令。
默认情况下,原生适配器从标准输出捕获内容并将其作为消息发送。它们还从标准错误捕获内容以引发事件。从4.3.6版本开始,您可以通过将enable-status-reader
设置为false
来丢弃标准错误事件,如下例所示:
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
enable-status-reader="false"
task-executor="exec"
file="/tmp/foo"/>
在以下示例中,IdleEventInterval
设置为5000
,这意味着如果五秒钟内没有写入任何行,则每五秒钟触发一次FileTailingIdleEvent
。
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
idle-event-interval="5000"
task-executor="exec"
file="/tmp/somefile"/>
当您需要停止适配器时,这非常有用。
以下示例创建一个Apache commons-io
Tailer
适配器,该适配器每两秒检查文件是否有新行,并每十秒检查是否存在缺少的文件。
<int-file:tail-inbound-channel-adapter id="apache"
channel="input"
task-executor="exec"
file="/tmp/bar"
delay="2000"
end="false" (1)
reopen="true" (2)
file-delay="10000"/>
1 | 文件从开头(end="false" )而不是从结尾(这是默认值)进行跟踪。 |
2 | 每个块都会重新打开文件(默认情况下保持文件打开)。 |
指定delay 、end 或reopen 属性会强制使用Apache commons-io 适配器,并使native-options 属性不可用。 |