创建自定义 ItemReader 和 ItemWriter

到目前为止,本章讨论了 Spring Batch 中读写操作的基本契约以及一些常见的实现方式。然而,这些实现都比较通用,许多潜在的场景可能无法通过开箱即用的实现来满足。本节将通过一个简单的示例,展示如何创建自定义的 ItemReaderItemWriter 实现,并正确地实现它们的契约。ItemReader 还实现了 ItemStream 接口,以说明如何使 reader 或 writer 具有重启能力 (restartable)。

自定义 ItemReader 示例

出于示例目的,我们创建了一个简单的 ItemReader 实现,它从提供的列表中读取数据。我们首先实现 ItemReader 最基本的契约,即 read 方法,如下面的代码所示:

public class CustomItemReader<T> implements ItemReader<T> {

    List<T> items;

    public CustomItemReader(List<T> items) {
        this.items = items;
    }

    public T read() throws Exception, UnexpectedInputException,
       NonTransientResourceException, ParseException {

        if (!items.isEmpty()) {
            return items.remove(0);
        }
        return null;
    }
}

前面的类接收一个 Item 列表,并逐个返回它们,同时将每个 Item 从列表中移除。当列表为空时,它返回 null,从而满足了 ItemReader 最基本的要求,如下面的测试代码所示:

List<String> items = new ArrayList<>();
items.add("1");
items.add("2");
items.add("3");

ItemReader itemReader = new CustomItemReader<>(items);
assertEquals("1", itemReader.read());
assertEquals("2", itemReader.read());
assertEquals("3", itemReader.read());
assertNull(itemReader.read());

使 ItemReader 具有重启能力 (Restartable)

最后的挑战是使 ItemReader 具有重启能力 (restartable)。目前,如果处理中断并再次开始,ItemReader 必须从头开始。这在许多场景下是有效的,但有时更希望批处理作业 (job) 能从中断的地方继续。主要的区别通常在于 reader 是有状态 (stateful) 还是无状态 (stateless) 的。无状态 reader 不需要担心重启能力,而有状态 reader 则必须在重启时尝试恢复其最后已知状态。因此,我们建议您尽可能保持自定义 reader 为无状态的,这样您就不需要担心重启能力了。

如果您确实需要存储状态,则应使用 ItemStream 接口

public class CustomItemReader<T> implements ItemReader<T>, ItemStream {

    List<T> items;
    int currentIndex = 0;
    private static final String CURRENT_INDEX = "current.index";

    public CustomItemReader(List<T> items) {
        this.items = items;
    }

    public T read() throws Exception, UnexpectedInputException,
        ParseException, NonTransientResourceException {

        if (currentIndex < items.size()) {
            return items.get(currentIndex++);
        }

        return null;
    }

    public void open(ExecutionContext executionContext) throws ItemStreamException {
        if (executionContext.containsKey(CURRENT_INDEX)) {
            currentIndex = new Long(executionContext.getLong(CURRENT_INDEX)).intValue();
        }
        else {
            currentIndex = 0;
        }
    }

    public void update(ExecutionContext executionContext) throws ItemStreamException {
        executionContext.putLong(CURRENT_INDEX, new Long(currentIndex).longValue());
    }

    public void close() throws ItemStreamException {}
}

每次调用 ItemStreamupdate 方法时,ItemReader 的当前索引都会存储在提供的 ExecutionContext 中,键为 'current.index'。当调用 ItemStreamopen 方法时,会检查 ExecutionContext 是否包含该键的条目。如果找到该键,则将当前索引移动到该位置。这是一个相当简单的示例,但它仍然满足了通用契约。

ExecutionContext executionContext = new ExecutionContext();
((ItemStream)itemReader).open(executionContext);
assertEquals("1", itemReader.read());
((ItemStream)itemReader).update(executionContext);

List<String> items = new ArrayList<>();
items.add("1");
items.add("2");
items.add("3");
itemReader = new CustomItemReader<>(items);

((ItemStream)itemReader).open(executionContext);
assertEquals("2", itemReader.read());

大多数 ItemReader 都拥有更为复杂的重启逻辑。例如,JdbcCursorItemReader 会存储游标中最后处理行的行 ID。

另外值得注意的是,在 ExecutionContext 中使用的键不应该是微不足道的。这是因为同一个 ExecutionContext 用于 Step 中的所有 ItemStream。在大多数情况下,只需在键前面加上类名就足以保证唯一性。然而,在极少数情况下,如果同一个 Step 中使用了两个相同类型的 ItemStream(例如,需要输出到两个文件时),则需要一个更唯一的名称。因此,许多 Spring Batch 的 ItemReaderItemWriter 实现都提供了 setName() 属性,允许覆盖此键名。

自定义 ItemWriter 示例

实现自定义 ItemWriter 在很多方面类似于上面的 ItemReader 示例,但在足以需要单独示例的方面有所不同。然而,添加重启能力基本上是相同的,因此本示例不再重复介绍。与 ItemReader 示例一样,这里使用了 List 以使示例尽可能简单。

public class CustomItemWriter<T> implements ItemWriter<T> {

    List<T> output = TransactionAwareProxyFactory.createTransactionalList();

    public void write(Chunk<? extends T> items) throws Exception {
        output.addAll(items);
    }

    public List<T> getOutput() {
        return output;
    }
}

使 ItemWriter 具有重启能力 (Restartable)

要使 ItemWriter 具有重启能力 (restartable),我们将遵循与 ItemReader 相同的过程,添加并实现 ItemStream 接口来同步执行上下文。在示例中,我们可能需要计算已处理的 Item 数量,并将其作为页脚记录添加。如果需要这样做,我们可以在 ItemWriter 中实现 ItemStream,以便在重新打开流时,计数器可以从执行上下文中恢复。

在许多实际场景中,自定义 ItemWriter 也委托给另一个本身具有重启能力的 writer(例如,写入文件时),或者写入事务性资源,因此不需要具备重启能力,因为它是无状态的。当您有一个有状态的 writer 时,您很可能需要确保同时实现 ItemStreamItemWriter。请记住,writer 的客户端需要知道 ItemStream 的存在,因此您可能需要在配置中将其注册为一个 stream。