创建自定义 ItemReader 和 ItemWriter
到目前为止,本章讨论了 Spring Batch 中读取和写入的基本契约以及一些常见的实现方式。但是,这些都相当通用,并且可能存在许多现成的实现无法涵盖的潜在场景。本节将通过一个简单的示例说明如何创建自定义 ItemReader
和 ItemWriter
实现,以及如何正确实现其契约。ItemReader
还实现了 ItemStream
,以便说明如何使读取器或写入器可重启。
自定义 ItemReader
示例
出于本示例的目的,我们创建一个简单的 ItemReader
实现,该实现从提供的列表中读取。我们首先实现 ItemReader
最基本契约的 read
方法,如下面的代码所示
public class CustomItemReader<T> implements ItemReader<T> {
List<T> items;
public CustomItemReader(List<T> items) {
this.items = items;
}
public T read() throws Exception, UnexpectedInputException,
NonTransientResourceException, ParseException {
if (!items.isEmpty()) {
return items.remove(0);
}
return null;
}
}
前面的类接收一个项目列表,并一次返回一个项目,同时从列表中移除每个项目。当列表为空时,它返回 null
,从而满足了 ItemReader
最基本的要求,如下面的测试代码所示
List<String> items = new ArrayList<>();
items.add("1");
items.add("2");
items.add("3");
ItemReader itemReader = new CustomItemReader<>(items);
assertEquals("1", itemReader.read());
assertEquals("2", itemReader.read());
assertEquals("3", itemReader.read());
assertNull(itemReader.read());
使 ItemReader
可重启
最后的挑战是使 ItemReader
可重启。目前,如果处理被打断并重新开始,则 ItemReader
必须从头开始。这实际上在许多场景中是有效的,但在某些情况下,最好让批处理作业从中断的地方重新开始。关键的区别通常在于读取器是有状态还是无状态。无状态读取器不需要担心可重启性,但有状态读取器必须在重启时尝试重建其最后已知的状态。因此,我们建议您尽可能地使自定义读取器保持无状态,这样您就不需要担心可重启性。
如果您确实需要存储状态,则应使用 ItemStream
接口
public class CustomItemReader<T> implements ItemReader<T>, ItemStream {
List<T> items;
int currentIndex = 0;
private static final String CURRENT_INDEX = "current.index";
public CustomItemReader(List<T> items) {
this.items = items;
}
public T read() throws Exception, UnexpectedInputException,
ParseException, NonTransientResourceException {
if (currentIndex < items.size()) {
return items.get(currentIndex++);
}
return null;
}
public void open(ExecutionContext executionContext) throws ItemStreamException {
if (executionContext.containsKey(CURRENT_INDEX)) {
currentIndex = new Long(executionContext.getLong(CURRENT_INDEX)).intValue();
}
else {
currentIndex = 0;
}
}
public void update(ExecutionContext executionContext) throws ItemStreamException {
executionContext.putLong(CURRENT_INDEX, new Long(currentIndex).longValue());
}
public void close() throws ItemStreamException {}
}
每次调用 ItemStream
的 update
方法时,ItemReader
的当前索引都会使用“current.index”键存储在提供的 ExecutionContext
中。当调用 ItemStream
的 open
方法时,会检查 ExecutionContext
是否包含具有该键的条目。如果找到该键,则当前索引将移动到该位置。这是一个相当简单的示例,但它仍然满足了通用契约
ExecutionContext executionContext = new ExecutionContext();
((ItemStream)itemReader).open(executionContext);
assertEquals("1", itemReader.read());
((ItemStream)itemReader).update(executionContext);
List<String> items = new ArrayList<>();
items.add("1");
items.add("2");
items.add("3");
itemReader = new CustomItemReader<>(items);
((ItemStream)itemReader).open(executionContext);
assertEquals("2", itemReader.read());
大多数 ItemReaders
具有更复杂的重启逻辑。例如,JdbcCursorItemReader
将最后处理行的行 ID 存储在游标中。
还值得注意的是,ExecutionContext
中使用的键不应过于简单。这是因为同一个 ExecutionContext
用于 Step
中的所有 ItemStreams
。在大多数情况下,只需在键前加上类名就足以保证唯一性。但是,在极少数情况下,如果在同一个步骤中使用了两种相同类型的 ItemStream
(如果输出需要两个文件,则可能发生这种情况),则需要更唯一的名称。因此,许多 Spring Batch ItemReader
和 ItemWriter
实现都具有 setName()
属性,允许覆盖此键名。
自定义 ItemWriter
示例
实现自定义 ItemWriter
在很多方面都类似于上面的 ItemReader
示例,但也有足够的差异,因此需要单独举例说明。但是,添加可重启性与之基本相同,因此在本示例中不作介绍。与 ItemReader
示例一样,使用 List
以使示例尽可能简单
public class CustomItemWriter<T> implements ItemWriter<T> {
List<T> output = TransactionAwareProxyFactory.createTransactionalList();
public void write(Chunk<? extends T> items) throws Exception {
output.addAll(items);
}
public List<T> getOutput() {
return output;
}
}
使 ItemWriter
可重启
要使 ItemWriter
可重启,我们将遵循与 ItemReader
相同的过程,添加并实现 ItemStream
接口以同步执行上下文。在本示例中,我们可能需要计算处理的项目数量,并将其作为页脚记录添加。如果我们需要这样做,我们可以在 ItemWriter
中实现 ItemStream
,以便在重新打开流时从执行上下文中重建计数器。
在许多实际情况下,自定义 ItemWriters
还会委托给另一个本身可重启的写入器(例如,当写入文件时),或者写入事务资源,因此不需要可重启,因为它无状态。当您拥有有状态写入器时,您可能应该确保同时实现 ItemStream
和 ItemWriter
。还请记住,写入器的客户端需要知道 ItemStream
,因此您可能需要在配置中将其注册为流。