单步批处理作业启动器

本节详细介绍如何使用 Spring Cloud Task 中包含的启动器开发一个包含单个 Step 的 Spring Batch Job。此启动器允许您使用配置来定义一个 ItemReader、一个 ItemWriter 或一个完整的单步 Spring Batch Job。有关 Spring Batch 及其功能的更多信息,请参阅 Spring Batch 文档

要获取 Maven 启动器,请将以下内容添加到您的构建中

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-single-step-batch-job</artifactId>
    <version>2.3.0</version>
</dependency>

要获取 Gradle 启动器,请将以下内容添加到您的构建中

compile "org.springframework.cloud:spring-cloud-starter-single-step-batch-job:2.3.0"

定义 Job

您可以使用该启动器定义最小的 ItemReaderItemWriter,或者定义完整的 Job。在本节中,我们将定义配置 Job 所需的属性。

属性

首先,此启动器提供了一组属性,允许您配置包含一个 Step 的 Job 的基本信息

表 1. Job 属性
属性 类型 默认值 描述

spring.batch.job.jobName

String

null

Job 的名称。

spring.batch.job.stepName

String

null

Step 的名称。

spring.batch.job.chunkSize

Integer

null

每个事务要处理的项数。

配置上述属性后,您将获得一个包含单个、基于块的 Step 的 Job。这个基于块的 Step 会读取、处理和写入 Map<String, Object> 实例作为项。但是,该 Step 尚未执行任何操作。您需要配置一个 ItemReader、一个可选的 ItemProcessor 和一个 ItemWriter 来使其执行某些操作。要配置其中之一,您可以使用属性并配置提供了自动配置的选项之一,或者您可以使用标准 Spring 配置机制配置自己的实现。

如果您自行配置,输入和输出类型必须与 Step 中的其他部分匹配。此启动器中的 ItemReader 实现和 ItemWriter 实现都使用 Map<String, Object> 作为输入和输出项。

ItemReader 实现的自动配置

此启动器为四种不同的 ItemReader 实现提供了自动配置:AmqpItemReaderFlatFileItemReaderJdbcCursorItemReaderKafkaItemReader。本节概述了如何使用提供的自动配置来配置其中每种实现。

AmqpItemReader

您可以使用 AmqpItemReader 从 AMQP 队列或主题读取数据。此 ItemReader 实现的自动配置取决于两组配置。第一组是 AmqpTemplate 的配置。您可以自行配置,也可以使用 Spring Boot 提供的自动配置。请参阅 Spring Boot AMQP 文档。配置 AmqpTemplate 后,您可以通过设置以下属性来启用批处理功能以支持它

表 2. AmqpItemReader 属性
属性 类型 默认值 描述

spring.batch.job.amqpitemreader.enabled

boolean

false

如果为 true,将执行自动配置。

spring.batch.job.amqpitemreader.jsonConverterEnabled

boolean

true

指示是否应注册 Jackson2JsonMessageConverter 来解析消息。

有关更多信息,请参阅 AmqpItemReader 文档

FlatFileItemReader

FlatFileItemReader 允许您从平面文件(例如 CSV 和其他文件格式)读取数据。要从文件读取,您可以通过正常的 Spring 配置自行提供一些组件(LineTokenizerRecordSeparatorPolicyFieldSetMapperLineMapperSkippedLinesCallback)。您还可以使用以下属性来配置读取器

表 3. FlatFileItemReader 属性
属性 类型 默认值 描述

spring.batch.job.flatfileitemreader.saveState

boolean

true

确定是否应为重启保存状态。

spring.batch.job.flatfileitemreader.name

String

null

用于在 ExecutionContext 中提供唯一键的名称。

spring.batch.job.flatfileitemreader.maxItemcount

int

Integer.MAX_VALUE

从文件读取的最大项数。

spring.batch.job.flatfileitemreader.currentItemCount

int

0

已读取的项数。用于重启。

spring.batch.job.flatfileitemreader.comments

List<String>

空列表

表示文件中注释行(将被忽略的行)的 String 列表。

spring.batch.job.flatfileitemreader.resource

Resource

null

要读取的资源。

spring.batch.job.flatfileitemreader.strict

boolean

true

如果设置为 true,则在找不到资源时读取器会抛出异常。

spring.batch.job.flatfileitemreader.encoding

String

FlatFileItemReader.DEFAULT_CHARSET

读取文件时使用的编码。

spring.batch.job.flatfileitemreader.linesToSkip

int

0

指示在文件开头要跳过的行数。

spring.batch.job.flatfileitemreader.delimited

boolean

false

指示文件是否为分隔符文件(CSV 和其他格式)。此属性或 spring.batch.job.flatfileitemreader.fixedLength 只能有一个同时为 true

spring.batch.job.flatfileitemreader.delimiter

String

DelimitedLineTokenizer.DELIMITER_COMMA

如果读取分隔符文件,指示用于解析的分隔符。

spring.batch.job.flatfileitemreader.quoteCharacter

char

DelimitedLineTokenizer.DEFAULT_QUOTE_CHARACTER

用于确定用于引用值的字符。

spring.batch.job.flatfileitemreader.includedFields

List<Integer>

空列表

一个索引列表,用于确定记录中要包含在项中的字段。

spring.batch.job.flatfileitemreader.fixedLength

boolean

false

指示文件的记录是否按列号解析。此属性或 spring.batch.job.flatfileitemreader.delimited 只能有一个同时为 true

spring.batch.job.flatfileitemreader.ranges

List<Range>

空列表

用于解析固定宽度记录的列范围列表。请参阅 Range 文档

spring.batch.job.flatfileitemreader.names

String []

null

从记录解析出的每个字段的名称列表。这些名称是此 ItemReader 返回的项中 Map<String, Object> 中的键。

spring.batch.job.flatfileitemreader.parsingStrict

boolean

true

如果设置为 true,则如果字段无法映射,映射将失败。

JdbcCursorItemReader

JdbcCursorItemReader 对关系数据库运行查询,并遍历结果游标(ResultSet)以提供结果项。此自动配置允许您提供一个 PreparedStatementSetter、一个 RowMapper 或两者。您还可以使用以下属性来配置 JdbcCursorItemReader

表 4. JdbcCursorItemReader 属性
属性 类型 默认值 描述

spring.batch.job.jdbccursoritemreader.saveState

boolean

true

确定是否应为重启保存状态。

spring.batch.job.jdbccursoritemreader.name

String

null

用于在 ExecutionContext 中提供唯一键的名称。

spring.batch.job.jdbccursoritemreader.maxItemcount

int

Integer.MAX_VALUE

从文件读取的最大项数。

spring.batch.job.jdbccursoritemreader.currentItemCount

int

0

已读取的项数。用于重启。

spring.batch.job.jdbccursoritemreader.fetchSize

int

给驱动程序的提示,指示每次调用数据库系统时要检索多少条记录。为了获得最佳性能,通常希望将其设置为与块大小匹配。

spring.batch.job.jdbccursoritemreader.maxRows

int

从数据库读取的最大项数。

spring.batch.job.jdbccursoritemreader.queryTimeout

int

查询超时的时间(毫秒)。

spring.batch.job.jdbccursoritemreader.ignoreWarnings

boolean

true

确定读取器在处理时是否应忽略 SQL 警告。

spring.batch.job.jdbccursoritemreader.verifyCursorPosition

boolean

true

指示每次读取后是否应验证游标位置,以确认 RowMapper 未移动游标。

spring.batch.job.jdbccursoritemreader.driverSupportsAbsolute

boolean

false

指示驱动程序是否支持游标的绝对定位。

spring.batch.job.jdbccursoritemreader.useSharedExtendedConnection

boolean

false

指示连接是否与其他处理共享(因此是事务的一部分)。

spring.batch.job.jdbccursoritemreader.sql

String

null

用于读取的 SQL 查询。

您还可以使用以下属性专门为读取器指定 JDBC DataSource:.`JdbcCursorItemReader` 属性

属性 类型 默认值 描述

spring.batch.job.jdbccursoritemreader.datasource.enable

boolean

false

确定是否应启用 JdbcCursorItemReaderDataSource

jdbccursoritemreader.datasource.url

String

null

数据库的 JDBC URL。

jdbccursoritemreader.datasource.username

String

null

数据库的登录用户名。

jdbccursoritemreader.datasource.password

String

null

数据库的登录密码。

jdbccursoritemreader.datasource.driver-class-name

String

null

JDBC 驱动程序的完全限定名。

如果未指定 jdbccursoritemreader_datasourceJDBCCursorItemReader 将使用默认的 DataSource

KafkaItemReader

从 Kafka 主题摄取分区数据非常有用,而这正是 KafkaItemReader 可以做到的。要配置 KafkaItemReader,需要两部分配置。首先,需要使用 Spring Boot 的 Kafka 自动配置来配置 Kafka(请参阅 Spring Boot Kafka 文档)。配置 Spring Boot 中的 Kafka 属性后,您可以通过设置以下属性来配置 KafkaItemReader 本身

表 5. KafkaItemReader 属性
属性 类型 默认值 描述

spring.batch.job.kafkaitemreader.name

String

null

用于在 ExecutionContext 中提供唯一键的名称。

spring.batch.job.kafkaitemreader.topic

String

null

要从中读取的主题名称。

spring.batch.job.kafkaitemreader.partitions

List<Integer>

空列表

要从中读取的分区索引列表。

spring.batch.job.kafkaitemreader.pollTimeOutInSeconds

long

30

poll() 操作的超时时间。

spring.batch.job.kafkaitemreader.saveState

boolean

true

确定是否应为重启保存状态。

请参阅 KafkaItemReader 文档

本机编译

单步批处理的优势在于,当您使用 JVM 时,它允许您在运行时动态选择要使用的读取器和写入器 Bean。但是,当您使用本机编译时,必须在构建时而不是运行时确定读取器和写入器。以下示例演示了如何做到这一点

<plugin>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-maven-plugin</artifactId>
    <executions>
        <execution>
            <id>process-aot</id>
            <goals>
                <goal>process-aot</goal>
            </goals>
            <configuration>
                <jvmArguments>
                    -Dspring.batch.job.flatfileitemreader.name=fooReader
                    -Dspring.batch.job.flatfileitemwriter.name=fooWriter
                </jvmArguments>
            </configuration>
        </execution>
    </executions>
</plugin>

ItemProcessor 配置

单步批处理作业自动配置会接受一个 ItemProcessor(如果在 ApplicationContext 中可用)。如果找到类型正确的 ItemProcessorItemProcessor<Map<String, Object>, Map<String, Object>>),它将被自动注入到 Step 中。

ItemWriter 实现的自动配置

此启动器为与受支持的 ItemReader 实现匹配的 ItemWriter 实现提供了自动配置:AmqpItemWriterFlatFileItemWriterJdbcItemWriterKafkaItemWriter。本节介绍了如何使用自动配置来配置受支持的 ItemWriter

AmqpItemWriter

要写入 RabbitMQ 队列,您需要两组配置。首先,您需要一个 AmqpTemplate。最简单的方法是使用 Spring Boot 的 RabbitMQ 自动配置。请参阅 Spring Boot AMQP 文档

配置 AmqpTemplate 后,您可以通过设置以下属性来配置 AmqpItemWriter

表 6. AmqpItemWriter 属性
属性 类型 默认值 描述

spring.batch.job.amqpitemwriter.enabled

boolean

false

如果为 true,将运行自动配置。

spring.batch.job.amqpitemwriter.jsonConverterEnabled

boolean

true

指示是否应注册 Jackson2JsonMessageConverter 来转换消息。

FlatFileItemWriter

要将文件作为 Step 的输出写入,您可以配置 FlatFileItemWriter。自动配置接受已显式配置的组件(例如 LineAggregatorFieldExtractorFlatFileHeaderCallbackFlatFileFooterCallback)以及通过设置以下指定属性配置的组件

表 7. FlatFileItemWriter 属性
属性 类型 默认值 描述

spring.batch.job.flatfileitemwriter.resource

Resource

null

要读取的资源。

spring.batch.job.flatfileitemwriter.delimited

boolean

false

指示输出文件是否为分隔符文件。如果为 true,则 spring.batch.job.flatfileitemwriter.formatted 必须为 false

spring.batch.job.flatfileitemwriter.formatted

boolean

false

指示输出文件是否为格式化文件。如果为 true,则 spring.batch.job.flatfileitemwriter.delimited 必须为 false

spring.batch.job.flatfileitemwriter.format

String

null

用于生成格式化文件输出的格式。格式化通过使用 String.format 完成。

spring.batch.job.flatfileitemwriter.locale

Locale

Locale.getDefault()

生成文件时使用的 Locale

spring.batch.job.flatfileitemwriter.maximumLength

int

0

记录的最大长度。如果为 0,则大小不受限制。

spring.batch.job.flatfileitemwriter.minimumLength

int

0

最小记录长度。

spring.batch.job.flatfileitemwriter.delimiter

String

,

用于分隔分隔符文件中字段的 String

spring.batch.job.flatfileitemwriter.encoding

String

FlatFileItemReader.DEFAULT_CHARSET

写入文件时使用的编码。

spring.batch.job.flatfileitemwriter.forceSync

boolean

false

指示在刷新时文件是否应强制同步到磁盘。

spring.batch.job.flatfileitemwriter.names

String []

null

从记录解析出的每个字段的名称列表。这些名称是此 ItemWriter 接收的项中 Map<String, Object> 的键。

spring.batch.job.flatfileitemwriter.append

boolean

false

指示如果找到输出文件,是否应追加写入。

spring.batch.job.flatfileitemwriter.lineSeparator

String

FlatFileItemWriter.DEFAULT_LINE_SEPARATOR

用于分隔输出文件中行的 String

spring.batch.job.flatfileitemwriter.name

String

null

用于在 ExecutionContext 中提供唯一键的名称。

spring.batch.job.flatfileitemwriter.saveState

boolean

true

确定是否应为重启保存状态。

spring.batch.job.flatfileitemwriter.shouldDeleteIfEmpty

boolean

false

如果设置为 true,则在 Job 完成时删除空文件(没有输出)。

spring.batch.job.flatfileitemwriter.shouldDeleteIfExists

boolean

true

如果设置为 true 并且在应有的位置找到输出文件,则在 Step 开始之前将其删除。

spring.batch.job.flatfileitemwriter.transactional

boolean

FlatFileItemWriter.DEFAULT_TRANSACTIONAL

指示读取器是否是事务性队列(表示读取的项在失败时会返回到队列)。

JdbcBatchItemWriter

要将 Step 的输出写入关系数据库,此启动器提供了自动配置 JdbcBatchItemWriter 的能力。自动配置允许您提供自己的 ItemPreparedStatementSetterItemSqlParameterSourceProvider,并通过设置以下属性来配置选项

表 8. JdbcBatchItemWriter 属性
属性 类型 默认值 描述

spring.batch.job.jdbcbatchitemwriter.name

String

null

用于在 ExecutionContext 中提供唯一键的名称。

spring.batch.job.jdbcbatchitemwriter.sql

String

null

用于插入每个项的 SQL。

spring.batch.job.jdbcbatchitemwriter.assertUpdates

boolean

true

是否验证每次插入都会导致至少一条记录被更新。

您还可以使用以下属性专门为写入器指定 JDBC DataSource:.`JdbcBatchItemWriter` 属性

属性 类型 默认值 描述

spring.batch.job.jdbcbatchitemwriter.datasource.enable

boolean

false

确定是否应启用 JdbcCursorItemReaderDataSource

jdbcbatchitemwriter.datasource.url

String

null

数据库的 JDBC URL。

jdbcbatchitemwriter.datasource.username

String

null

数据库的登录用户名。

jdbcbatchitemwriter.datasource.password

String

null

数据库的登录密码。

jdbcbatchitemreader.datasource.driver-class-name

String

null

JDBC 驱动程序的完全限定名。

如果未指定 jdbcbatchitemwriter_datasourceJdbcBatchItemWriter 将使用默认的 DataSource

KafkaItemWriter

要将 Step 输出写入 Kafka 主题,您需要 KafkaItemWriter。此启动器通过利用两个来源的能力为 KafkaItemWriter 提供了自动配置。首先是 Spring Boot 的 Kafka 自动配置。(请参阅 Spring Boot Kafka 文档。)其次,此启动器允许您配置写入器上的两个属性。

表 9. KafkaItemWriter 属性
属性 类型 默认值 描述

spring.batch.job.kafkaitemwriter.topic

String

null

要写入的 Kafka 主题。

spring.batch.job.kafkaitemwriter.delete

boolean

false

传递给写入器的项是否都应作为删除事件发送到主题。

有关 KafkaItemWriter 配置选项的更多信息,请参阅 KafkaItemWiter 文档

Spring AOT

当将 Spring AOT 与单步批处理启动器一起使用时,您必须在编译时设置读取器和写入器的名称属性(除非您为读取器或写入器创建 Bean)。为此,您必须在 boot maven 插件或 gradle 插件中将您希望使用的读取器和写入器名称作为参数或环境变量包含进去。例如,如果您希望在 Maven 中启用 FlatFileItemReaderFlatFileItemWriter,它将如下所示

    <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
        <executions>
            <execution>
            <id>process-aot</id>
            <goals>
                <goal>process-aot</goal>
            </goals>
            </execution>
        </executions>
        <configuration>
            <arguments>
                <argument>--spring.batch.job.flatfileitemreader.name=foobar</argument>
                <argument>--spring.batch.job.flatfileitemwriter.name=fooWriter</argument>
            </arguments>
        </configuration>
    </plugin>