入站通道适配器:轮询多个服务器和目录

从 5.0.7 版本开始,RotatingServerAdvice 可用;当配置为轮询器 advice 时,入站适配器可以轮询多个服务器和目录。配置 advice 并将其正常添加到轮询器的 advice 链中。DelegatingSessionFactory 用于选择服务器,详情请参见Delegating Session Factory。该 advice 的配置包含一个 RotationPolicy.KeyDirectory 对象列表。

示例
@Bean
public RotatingServerAdvice advice() {
    List<RotationPolicy.KeyDirectory> keyDirectories = new ArrayList<>();
    keyDirectories.add(new RotationPolicy.KeyDirectory("one", "foo"));
    keyDirectories.add(new RotationPolicy.KeyDirectory("one", "bar"));
    keyDirectories.add(new RotationPolicy.KeyDirectory("two", "baz"));
    keyDirectories.add(new RotationPolicy.KeyDirectory("two", "qux"));
    keyDirectories.add(new RotationPolicy.KeyDirectory("three", "fiz"));
    keyDirectories.add(new RotationPolicy.KeyDirectory("three", "buz"));
    return new RotatingServerAdvice(delegatingSf(), keyDirectories);
}

此 advice 将轮询服务器 one 上的目录 foo,直到没有新文件为止,然后转移到服务器 two 上的目录 bar,接着是目录 baz,依此类推。

可以通过 fair 构造函数参数修改此默认行为

fair
@Bean
public RotatingServerAdvice advice() {
    ...
    return new RotatingServerAdvice(delegatingSf(), keyDirectories, true);
}

在这种情况下,无论上次轮询是否返回文件,该 advice 都将移至下一个服务器/目录。

或者,您可以提供自己的 RotationPolicy,根据需要重新配置消息源

policy
public interface RotationPolicy {

    void beforeReceive(MessageSource<?> source);

    void afterReceive(boolean messageReceived, MessageSource<?> source);

}

custom
@Bean
public RotatingServerAdvice advice() {
    return new RotatingServerAdvice(myRotationPolicy());
}

local-filename-generator-expression 属性(在 synchronizer 上是 localFilenameGeneratorExpression)现在可以包含 #remoteDirectory 变量。这允许将从不同目录检索的文件下载到本地相似的目录中

@Bean
public IntegrationFlow flow() {
    return IntegrationFlow.from(Ftp.inboundAdapter(sf())
                    .filter(new FtpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "rotate"))
                    .localDirectory(new File(tmpDir))
                    .localFilenameExpression("#remoteDirectory + T(java.io.File).separator + #root")
                    .remoteDirectory("."),
                e -> e.poller(Pollers.fixedDelay(1).advice(advice())))
            .channel(MessageChannels.queue("files"))
            .get();
}
使用此 advice 时,请勿在 poller 上配置 TaskExecutor;更多信息请参见消息源的条件轮询器