创建自定义 ItemReaders 和 ItemWriters

到目前为止,本章讨论了 Spring Batch 中读取和写入的基本契约,以及一些执行此操作的常见实现。但是,这些都是相当通用的,并且有许多潜在的场景可能无法通过开箱即用的实现来涵盖。本节通过一个简单的示例展示如何创建自定义 ItemReaderItemWriter 实现并正确实现其契约。 ItemReader 还实现了 ItemStream,以便说明如何使读取器或写入器可重新启动。

自定义 ItemReader 示例

为了本示例的目的,我们创建一个简单的 ItemReader 实现,从提供的列表中读取。我们首先实现 ItemReader 最基本的契约,即 read 方法,如下面的代码所示

public class CustomItemReader<T> implements ItemReader<T> {

    List<T> items;

    public CustomItemReader(List<T> items) {
        this.items = items;
    }

    public T read() throws Exception, UnexpectedInputException,
       NonTransientResourceException, ParseException {

        if (!items.isEmpty()) {
            return items.remove(0);
        }
        return null;
    }
}

前面的类采用一个项目列表,一次返回一个项目,从列表中删除每个项目。当列表为空时,它返回 null,从而满足 ItemReader 的最基本要求,如下面的测试代码所示

List<String> items = new ArrayList<>();
items.add("1");
items.add("2");
items.add("3");

ItemReader itemReader = new CustomItemReader<>(items);
assertEquals("1", itemReader.read());
assertEquals("2", itemReader.read());
assertEquals("3", itemReader.read());
assertNull(itemReader.read());

使 ItemReader 可重启

最后的挑战是使 ItemReader 可重启。当前,如果处理中断并重新开始,则 ItemReader 必须从头开始。这在许多场景中实际上是有效的,但有时最好从上次中断处重新启动批处理作业。关键区别通常在于读取器是有状态还是无状态。无状态读取器无需担心可重启性,但有状态读取器必须尝试在重新启动时重建其最后已知状态。因此,我们建议您尽可能保持自定义读取器无状态,这样您不必担心可重启性。

如果您确实需要存储状态,则应使用 ItemStream 接口

public class CustomItemReader<T> implements ItemReader<T>, ItemStream {

    List<T> items;
    int currentIndex = 0;
    private static final String CURRENT_INDEX = "current.index";

    public CustomItemReader(List<T> items) {
        this.items = items;
    }

    public T read() throws Exception, UnexpectedInputException,
        ParseException, NonTransientResourceException {

        if (currentIndex < items.size()) {
            return items.get(currentIndex++);
        }

        return null;
    }

    public void open(ExecutionContext executionContext) throws ItemStreamException {
        if (executionContext.containsKey(CURRENT_INDEX)) {
            currentIndex = new Long(executionContext.getLong(CURRENT_INDEX)).intValue();
        }
        else {
            currentIndex = 0;
        }
    }

    public void update(ExecutionContext executionContext) throws ItemStreamException {
        executionContext.putLong(CURRENT_INDEX, new Long(currentIndex).longValue());
    }

    public void close() throws ItemStreamException {}
}

每次调用 ItemStream update 方法时,ItemReader 的当前索引都会存储在提供的 ExecutionContext 中,键为“current.index”。当调用 ItemStream open 方法时,将检查 ExecutionContext 以查看它是否包含具有该键的条目。如果找到该键,则当前索引将移动到该位置。这是一个相当简单的示例,但它仍然满足一般合同

ExecutionContext executionContext = new ExecutionContext();
((ItemStream)itemReader).open(executionContext);
assertEquals("1", itemReader.read());
((ItemStream)itemReader).update(executionContext);

List<String> items = new ArrayList<>();
items.add("1");
items.add("2");
items.add("3");
itemReader = new CustomItemReader<>(items);

((ItemStream)itemReader).open(executionContext);
assertEquals("2", itemReader.read());

大多数 ItemReaders 具有更复杂的重新启动逻辑。例如,JdbcCursorItemReader 会存储游标中最后处理的行行 ID。

还值得注意的是,在 ExecutionContext 中使用的键不应简单。这是因为同一个 ExecutionContext 用于 Step 中的所有 ItemStreams。在大多数情况下,只需用类名作为前缀即可保证唯一性。但是,在同一步骤中使用两种相同类型的 ItemStream 的罕见情况下(如果输出需要两个文件,则可能会发生这种情况),则需要更唯一的名称。因此,许多 Spring Batch ItemReaderItemWriter 实现都具有 setName() 属性,可覆盖此键名。

自定义 ItemWriter 示例

实现自定义 ItemWriter 在许多方面与上述 ItemReader 示例类似,但在足够多的方面有所不同,以至于需要自己的示例。但是,添加可重启性本质上是相同的,因此本示例中未涉及。与 ItemReader 示例一样,使用 List 是为了使示例尽可能简单

public class CustomItemWriter<T> implements ItemWriter<T> {

    List<T> output = TransactionAwareProxyFactory.createTransactionalList();

    public void write(Chunk<? extends T> items) throws Exception {
        output.addAll(items);
    }

    public List<T> getOutput() {
        return output;
    }
}

使 ItemWriter 可重启

若要使 ItemWriter 可重启,我们将遵循与 ItemReader 相同的过程,添加并实现 ItemStream 接口以同步执行上下文。在此示例中,我们可能必须计算已处理的项目数,并将其添加为页脚记录。如果需要这样做,我们可以在 ItemWriter 中实现 ItemStream,以便在重新打开流时从执行上下文中重建计数器。

在许多实际情况下,自定义 ItemWriters 还会委托给另一个本身可重启的编写器(例如,写入文件时),或者它写入事务资源,因此不需要可重启,因为它无状态。当您有状态编写器时,您可能应该确保实现 ItemStreamItemWriter。还要记住,编写器的客户端需要了解 ItemStream,因此您可能需要在配置中将其注册为流。