© 2009-2020 VMware, Inc. 保留所有权利。

本文档可供您自行使用和分发给他人,但前提是您不得对这些副本收取任何费用,并且无论是以印刷版还是电子版形式分发,每个副本都必须包含本版权声明。

1. 前言

本节简要概述了 Spring Cloud Task 参考文档。可以将其视为本文档其余部分的地图。您可以按顺序阅读本参考指南,如果某些部分不感兴趣,也可以跳过。

1.1. 关于文档

Spring Cloud Task 参考指南提供 htmlpdfepub 版本。最新副本可从 docs.spring.io/spring-cloud-task/docs/current-SNAPSHOT/reference/html/ 获取。

本文档可供您自行使用和分发给他人,但前提是您不得对这些副本收取任何费用,并且无论是以印刷版还是电子版形式分发,每个副本都必须包含本版权声明。

1.2. 获取帮助

使用 Spring Cloud Task 遇到问题?我们乐意提供帮助!

Spring Cloud Task 的所有内容都是开源的,包括文档。如果您发现文档有问题或只想改进它们,请 参与进来

1.3. 第一步

如果您刚开始使用 Spring Cloud Task 或广义上的 'Spring',我们建议阅读入门一章。

要从头开始,请阅读以下章节:

要 mengikuti 教程,请阅读 开发您的第一个 Spring Cloud Task 应用
要运行您的示例,请阅读 运行示例

2. 入门

如果您刚开始使用 Spring Cloud Task,应阅读本节。在此,我们将解答基本的“是什么?”、“如何做?”和“为什么?”等问题。我们首先温和地介绍 Spring Cloud Task。然后,我们将构建一个 Spring Cloud Task 应用,并在过程中讨论一些核心原则。

2.1. Spring Cloud Task 介绍

Spring Cloud Task 使得创建短生命周期的微服务变得容易。它提供了能够在生产环境中按需执行短生命周期的 JVM 进程的功能。

2.2. 系统要求

您需要安装 Java(Java 17 或更高版本)。要进行构建,还需要安装 Maven。

2.2.1. 数据库要求

Spring Cloud Task 使用关系型数据库来存储已执行任务的结果。虽然您可以在没有数据库的情况下开始开发任务(任务状态会作为任务仓库更新的一部分被记录到日志中),但对于生产环境,您需要使用一个受支持的数据库。Spring Cloud Task 当前支持以下数据库:

  • DB2

  • H2

  • HSQLDB

  • MySql

  • Oracle

  • Postgres

  • SqlServer

2.3. 开发您的第一个 Spring Cloud Task 应用

一个好的起点是简单的“Hello, World!”应用,因此我们创建相应的 Spring Cloud Task 应用来突出框架的特性。大多数 IDE 对 Apache Maven 都有良好的支持,所以我们将其用作本项目的构建工具。

spring.io 网站包含许多使用 Spring Boot 的 入门”指南。如果您需要解决特定问题,请先在那里查找。您可以通过访问 Spring Initializr 创建新项目来简化以下步骤。这样做会自动生成新的项目结构,以便您可以立即开始编码。我们建议您尝试使用 Spring Initializr 以熟悉它。

2.3.1. 使用 Spring Initializr 创建 Spring Task 项目

现在我们可以创建一个并测试一个向控制台打印 Hello, World! 的应用。

为此:

  1. 访问 Spring Initialzr 网站。

    1. 创建一个新的 Maven 项目,其中 Group 名称为 io.spring.demoArtifact 名称为 helloworld

    2. 在 Dependencies(依赖项)文本框中,输入 task,然后选择 Cloud Task 依赖项。

    3. 在 Dependencies(依赖项)文本框中,输入 jdbc,然后选择 JDBC 依赖项。

    4. 在 Dependencies(依赖项)文本框中,输入 h2,然后选择 H2。(或您喜欢的数据库)

    5. 点击 Generate Project(生成项目)按钮

  2. 解压 helloworld.zip 文件,并将项目导入您喜欢的 IDE 中。

2.3.2. 编写代码

为了完成我们的应用,我们需要使用以下内容更新生成的 HelloworldApplication,以便它能够启动一个任务。

package io.spring.Helloworld;

import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.task.configuration.EnableTask;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
@EnableTask
public class HelloworldApplication {

    @Bean
    public ApplicationRunner applicationRunner() {
        return new HelloWorldApplicationRunner();
    }

    public static void main(String[] args) {
        SpringApplication.run(HelloworldApplication.class, args);
    }

    public static class HelloWorldApplicationRunner implements ApplicationRunner {

        @Override
        public void run(ApplicationArguments args) throws Exception {
            System.out.println("Hello, World!");

        }
    }
}

虽然代码量可能看起来不大,但其背后发生了很多事情。有关 Spring Boot 的更多细节,请参阅 Spring Boot 参考文档

现在我们可以打开位于 src/main/resources 目录下的 application.properties 文件。我们需要在 application.properties 中配置两个属性:

  • application.name:设置应用名称(它会被转换为任务名称)

  • logging.level:将 Spring Cloud Task 的日志级别设置为 DEBUG,以便查看其运行情况。

以下示例展示了如何同时进行这两项配置:

logging.level.org.springframework.cloud.task=DEBUG
spring.application.name=helloWorld
任务自动配置

包含 Spring Cloud Task Starter 依赖项时,Task 会自动配置所有 bean 以引导其功能。此配置的一部分会注册 TaskRepository 及其使用所需的基础设施。

在我们的演示中,TaskRepository 使用嵌入式 H2 数据库来记录任务的结果。这种嵌入式 H2 数据库对于生产环境来说并不是一个实用的解决方案,因为任务结束后 H2 数据库就会消失。然而,为了快速入门体验,我们可以在示例中使用它,同时将仓库中更新的内容回显到日志中。在本文档后面介绍的配置部分,我们将介绍如何自定义 Spring Cloud Task 提供的各个组件的配置。

当我们的示例应用运行时,Spring Boot 会启动我们的 HelloWorldCommandLineRunner 并将我们的“Hello, World!”消息输出到标准输出。TaskLifecycleListener 会在仓库中记录任务的开始和结束。

main 方法

main 方法作为任何 Java 应用的入口点。我们的 main 方法委托给 Spring Boot 的 SpringApplication 类。

ApplicationRunner

Spring 包含多种引导应用逻辑的方式。Spring Boot 通过其 *Runner 接口(CommandLineRunnerApplicationRunner)提供了一种便捷且组织化的方式。一个设计良好的任务可以通过使用这两种 runner 之一来引导任何逻辑。

任务的生命周期被认为是开始于 *Runner#run 方法执行之前,结束于所有方法执行完毕之后。Spring Boot 允许应用使用多个 *Runner 实现,Spring Cloud Task 也是如此。

任何通过 CommandLineRunnerApplicationRunner 以外的机制(例如使用 InitializingBean#afterPropertiesSet)引导的处理都不会被 Spring Cloud Task 记录。

2.3.3. 运行示例

至此,我们的应用应该可以工作了。由于这个应用基于 Spring Boot,我们可以从应用根目录使用命令行 $ mvn spring-boot:run 来运行它,如下例所示(包括其输出):

$ mvn clean spring-boot:run
....... . . .
....... . . . (Maven log output here)
....... . . .

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.0.3.RELEASE)

2018-07-23 17:44:34.426  INFO 1978 --- [           main] i.s.d.helloworld.HelloworldApplication   : Starting HelloworldApplication on Glenns-MBP-2.attlocal.net with PID 1978 (/Users/glennrenfro/project/helloworld/target/classes started by glennrenfro in /Users/glennrenfro/project/helloworld)
2018-07-23 17:44:34.430  INFO 1978 --- [           main] i.s.d.helloworld.HelloworldApplication   : No active profile set, falling back to default profiles: default
2018-07-23 17:44:34.472  INFO 1978 --- [           main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@1d24f32d: startup date [Mon Jul 23 17:44:34 EDT 2018]; root of context hierarchy
2018-07-23 17:44:35.280  INFO 1978 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Starting...
2018-07-23 17:44:35.410  INFO 1978 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Start completed.
2018-07-23 17:44:35.419 DEBUG 1978 --- [           main] o.s.c.t.c.SimpleTaskConfiguration        : Using org.springframework.cloud.task.configuration.DefaultTaskConfigurer TaskConfigurer
2018-07-23 17:44:35.420 DEBUG 1978 --- [           main] o.s.c.t.c.DefaultTaskConfigurer          : No EntityManager was found, using DataSourceTransactionManager
2018-07-23 17:44:35.522 DEBUG 1978 --- [           main] o.s.c.t.r.s.TaskRepositoryInitializer    : Initializing task schema for h2 database
2018-07-23 17:44:35.525  INFO 1978 --- [           main] o.s.jdbc.datasource.init.ScriptUtils     : Executing SQL script from class path resource [org/springframework/cloud/task/schema-h2.sql]
2018-07-23 17:44:35.558  INFO 1978 --- [           main] o.s.jdbc.datasource.init.ScriptUtils     : Executed SQL script from class path resource [org/springframework/cloud/task/schema-h2.sql] in 33 ms.
2018-07-23 17:44:35.728  INFO 1978 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Registering beans for JMX exposure on startup
2018-07-23 17:44:35.730  INFO 1978 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Bean with name 'dataSource' has been autodetected for JMX exposure
2018-07-23 17:44:35.733  INFO 1978 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Located MBean 'dataSource': registering with JMX server as MBean [com.zaxxer.hikari:name=dataSource,type=HikariDataSource]
2018-07-23 17:44:35.738  INFO 1978 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 0
2018-07-23 17:44:35.762 DEBUG 1978 --- [           main] o.s.c.t.r.support.SimpleTaskRepository   : Creating: TaskExecution{executionId=0, parentExecutionId=null, exitCode=null, taskName='application', startTime=Mon Jul 23 17:44:35 EDT 2018, endTime=null, exitMessage='null', externalExecutionId='null', errorMessage='null', arguments=[]}
2018-07-23 17:44:35.772  INFO 1978 --- [           main] i.s.d.helloworld.HelloworldApplication   : Started HelloworldApplication in 1.625 seconds (JVM running for 4.764)
Hello, World!
2018-07-23 17:44:35.782 DEBUG 1978 --- [           main] o.s.c.t.r.support.SimpleTaskRepository   : Updating: TaskExecution with executionId=1 with the following {exitCode=0, endTime=Mon Jul 23 17:44:35 EDT 2018, exitMessage='null', errorMessage='null'}

上述输出有三行是我们需要关注的:

  • SimpleTaskRepository 记录了在 TaskRepository 中创建条目。

  • 我们的 CommandLineRunner 的执行,通过“Hello, World!”输出展示。

  • SimpleTaskRepositoryTaskRepository 中记录了任务的完成。

Spring Cloud Task 项目的 samples 模块中可以找到一个简单的任务应用,点此查看

3. 特性

本节将更详细地介绍 Spring Cloud Task,包括如何使用、如何配置以及适当的扩展点。

3.1. Spring Cloud Task 的生命周期

在大多数情况下,现代云环境的设计围绕着不期望结束的进程的执行。如果它们确实结束了,通常会被重新启动。虽然大多数平台都有某种方式可以运行结束时不会重新启动的进程,但其运行结果通常不会以一种可消费的方式维护。Spring Cloud Task 提供了在环境中执行短生命周期进程并记录结果的能力。这样做可以通过消息集成任务,从而构建围绕短生命周期进程以及长期运行服务的微服务架构。

虽然此功能在云环境中很有用,但同样的问题也可能出现在传统的部署模型中。当使用 cron 等调度器运行 Spring Boot 应用时,能够在应用完成后监控其结果可能会很有用。

Spring Cloud Task 采取的方法是,Spring Boot 应用可以有开始和结束,并且仍然是成功的。批量应用就是期望结束(并且通常是短生命周期)的进程如何有用的一个例子。

Spring Cloud Task 记录给定任务的生命周期事件。大多数长期运行的进程,以大多数 Web 应用为典型代表,不保存其生命周期事件。而 Spring Cloud Task 核心的任务会保存。

生命周期由一个任务执行组成。这是配置为任务的 Spring Boot 应用的一次物理执行(即,它具有 Spring Cloud Task 依赖项)。

在任务开始时,在任何 CommandLineRunnerApplicationRunner 实现运行之前,会在 TaskRepository 中创建一个条目来记录开始事件。此事件通过 Spring Framework 触发 SmartLifecycle#start 来触发。这向系统表明所有 bean 都已准备就绪,并在运行 Spring Boot 提供的任何 CommandLineRunnerApplicationRunner 实现之前发生。

任务的记录仅在 ApplicationContext 成功引导时发生。如果上下文完全无法引导,则任务的运行不会被记录。

在 Spring Boot 的所有 *Runner#run 调用完成后,或者 ApplicationContext 失败(由 ApplicationFailedEvent 指示)时,任务执行结果会在仓库中更新。

如果应用需要在任务完成后(所有 *Runner#run 方法已调用且任务仓库已更新)关闭 ApplicationContext,请将属性 spring.cloud.task.closecontextEnabled 设置为 true。

3.1.1. TaskExecution

存储在 TaskRepository 中的信息建模在 TaskExecution 类中,包含以下信息:

字段 描述

executionid

任务运行的唯一 ID。

exitCode

ExitCodeExceptionMapper 实现生成的退出码。如果没有生成退出码但抛出了 ApplicationFailedEvent,则设置为 1。否则,假定为 0。

taskName

任务的名称,由配置的 TaskNameResolver 确定。

startTime

任务开始的时间,由 SmartLifecycle#start 调用指示。

endTime

任务完成的时间,由 ApplicationReadyEvent 指示。

exitMessage

退出时可用的任何信息。这可以通过 TaskExecutionListener 以编程方式设置。

errorMessage

如果异常是任务结束的原因(如 ApplicationFailedEvent 所示),该异常的堆栈跟踪将存储在此处。

arguments

传递给可执行 boot 应用的字符串命令行参数的 List

3.1.2. 映射退出码

任务完成后,它会尝试向操作系统返回一个退出码。如果我们查看 原始示例,可以看到我们并未控制应用的这方面。因此,如果抛出异常,JVM 返回的代码可能对您的调试有用,也可能没用。

因此,Spring Boot 提供了一个接口 ExitCodeExceptionMapper,允许您将未捕获的异常映射到退出码。这样做可以让您在退出码层面指示出错原因。此外,通过这种方式映射退出码,Spring Cloud Task 会记录返回的退出码。

如果任务以 SIG-INT 或 SIG-TERM 终止,除非在代码中另有指定,否则退出码为零。

任务运行时,退出码在仓库中存储为 null。任务完成后,会根据本节前面所述的指南存储适当的退出码。

3.2. 配置

Spring Cloud Task 提供了一套开箱即用的配置,这些配置定义在 DefaultTaskConfigurerSimpleTaskConfiguration 类中。本节将介绍默认配置以及如何根据您的需求自定义 Spring Cloud Task。

3.2.1. DataSource

Spring Cloud Task 使用一个数据源来存储任务执行的结果。默认情况下,我们提供了一个内存中的 H2 实例,以提供一种简单的开发引导方法。然而,在生产环境中,您可能希望配置自己的 DataSource

如果您的应用仅使用一个 DataSource,并且该数据源同时用于您的业务 schema 和任务仓库,那么您只需提供任何 DataSource 即可(最简单的方法是通过 Spring Boot 的配置约定来提供)。Spring Cloud Task 会自动使用此 DataSource 作为任务仓库。

如果您的应用使用多个 DataSource,您需要使用适当的 DataSource 来配置任务仓库。这种自定义可以通过实现 TaskConfigurer 来完成。

3.2.2. 表前缀

TaskRepository 的一个可修改属性是任务表的表前缀。默认情况下,所有任务表都以 TASK_ 作为前缀。TASK_EXECUTIONTASK_EXECUTION_PARAMS 是两个例子。然而,可能有一些原因需要修改此前缀。如果需要将 schema 名称作为表名的前缀,或者在同一 schema 中需要多组任务表,则必须更改表前缀。您可以通过将 spring.cloud.task.tablePrefix 设置为您需要的前缀来实现,如下所示:

spring.cloud.task.tablePrefix=yourPrefix

通过使用 spring.cloud.task.tablePrefix,用户承担创建满足任务表 schema 要求且根据用户业务需求进行修改的任务表的责任。您可以使用 Spring Cloud Task Schema DDL 作为创建您自己的任务 DDL 的参考,点此查看

3.2.3. 启用/禁用表初始化

在您自行创建任务表,并且不希望 Spring Cloud Task 在任务启动时创建它们的情况下,将属性 spring.cloud.task.initialize-enabled 设置为 false,如下所示:

spring.cloud.task.initialize-enabled=false

默认值为 true

属性 spring.cloud.task.initialize.enable 已被弃用。

3.2.4. 外部生成的任务 ID

在某些情况下,您可能希望考虑到任务被请求与基础设施实际启动它之间的时间差。Spring Cloud Task 允许您在任务被请求时创建一个 TaskExecution。然后将生成的 TaskExecution 的执行 ID 传递给任务,以便任务可以在其生命周期中更新 TaskExecution

通过在引用存储 TaskExecution 对象的 datastore 的 TaskRepository 实现上调用 createTaskExecution 方法,可以创建一个 TaskExecution

为了配置您的任务使用生成的 TaskExecutionId,添加以下属性:

spring.cloud.task.executionid=yourtaskId

3.2.5. 外部任务 ID

Spring Cloud Task 允许您为每个 TaskExecution 存储一个外部任务 ID。为了配置您的任务使用生成的 TaskExecutionId,添加以下属性:

spring.cloud.task.external-execution-id=<externalTaskId>

3.2.6. 父任务 ID

Spring Cloud Task 允许您为每个 TaskExecution 存储一个父任务 ID。一个例子是,一个任务执行另一个或多个任务,并且您想记录是哪个任务启动了每个子任务。为了配置您的子任务设置父 TaskExecutionId,请在子任务中添加以下属性:

spring.cloud.task.parent-execution-id=<parentExecutionTaskId>

3.2.7. TaskConfigurer

TaskConfigurer 是一个策略接口,允许您自定义 Spring Cloud Task 各组件的配置方式。默认情况下,我们提供 DefaultTaskConfigurer,它提供逻辑默认值:基于 Map 的内存组件(在未提供 DataSource 时对开发有用)和基于 JDBC 的组件(在有 DataSource 可用时有用)。

TaskConfigurer 允许您配置三个主要组件:

组件 描述 默认值(由 DefaultTaskConfigurer 提供)

TaskRepository

将使用的 TaskRepository 实现。

SimpleTaskRepository

TaskExplorer

将使用的 TaskExplorer 实现(用于只读访问任务仓库的组件)。

SimpleTaskExplorer

PlatformTransactionManager

用于任务更新时的事务管理器。

如果使用了 DataSource,则为 JdbcTransactionManager。如果未使用,则为 ResourcelessTransactionManager

您可以通过创建 TaskConfigurer 接口的自定义实现来定制前面表格中描述的任何组件。通常,扩展 DefaultTaskConfigurer(如果在找不到 TaskConfigurer 时提供)并重写所需的 getter 就足够了。但是,可能需要从头开始实现您自己的版本。

用户不应直接使用 TaskConfigurer 中的 getter 方法,除非他们是使用它来提供要暴露为 Spring Bean 的实现。

3.2.8. 任务执行监听器

TaskExecutionListener 允许您注册任务生命周期中发生的特定事件的监听器。为此,创建一个实现 TaskExecutionListener 接口的类。实现 TaskExecutionListener 接口的类会收到以下事件的通知:

  • onTaskStartup:在将 TaskExecution 存储到 TaskRepository 之前。

  • onTaskEnd:在更新 TaskExecutionTaskRepository 中的条目并标记任务最终状态之前。

  • onTaskFailed:当任务抛出未处理的异常时,在调用 onTaskEnd 方法之前。

Spring Cloud Task 还允许您使用以下方法注解将 TaskExecution 监听器添加到 bean 中的方法上:

  • @BeforeTask:在将 TaskExecution 存储到 TaskRepository 之前。

  • @AfterTask:在更新 TaskExecutionTaskRepository 中的条目并标记任务最终状态之前。

  • @FailedTask:当任务抛出未处理的异常时,在调用 @AfterTask 方法之前。

以下示例展示了这三个注解的使用:

 public class MyBean {

    @BeforeTask
    public void methodA(TaskExecution taskExecution) {
    }

    @AfterTask
    public void methodB(TaskExecution taskExecution) {
    }

    @FailedTask
    public void methodC(TaskExecution taskExecution, Throwable throwable) {
    }
}
在链中插入一个早于 TaskLifecycleListenerApplicationListener 可能会导致意外的效果。
任务执行监听器抛出的异常

如果 TaskExecutionListener 事件处理器抛出异常,该事件处理器的所有监听器处理都会停止。例如,如果三个 onTaskStartup 监听器已启动,并且第一个 onTaskStartup 事件处理器抛出异常,则不会调用另外两个 onTaskStartup 方法。但是,TaskExecutionListeners 的其他事件处理器(onTaskEndonTaskFailed)会被调用。

TaskExecutionListener 事件处理器抛出异常时返回的退出码是 ExitCodeEvent 报告的退出码。如果没有发出 ExitCodeEvent,则会评估抛出的异常,看其是否属于 ExitCodeGenerator 类型。如果是,则返回 ExitCodeGenerator 的退出码。否则,返回 1

如果在 onTaskStartup 方法中抛出异常,应用的退出码将为 1。如果在 onTaskEndonTaskFailed 方法中抛出异常,应用的退出码将是根据上述规则确定的退出码。

如果在 onTaskStartuponTaskEndonTaskFailed 中抛出异常,您不能使用 ExitCodeExceptionMapper 来覆盖应用的退出码。
退出消息

您可以使用 TaskExecutionListener 以编程方式设置任务的退出消息。这是通过设置 TaskExecutionexitMessage 来完成的,然后该消息会传递给 TaskExecutionListener。以下示例显示了一个使用 @AfterTask ExecutionListener 注解的方法:

@AfterTask
public void afterMe(TaskExecution taskExecution) {
    taskExecution.setExitMessage("AFTER EXIT MESSAGE");
}

可以在任何监听器事件(onTaskStartuponTaskFailedonTaskEnd)中设置 ExitMessage。三个监听器的优先顺序如下:

  1. onTaskEnd

  2. onTaskFailed

  3. onTaskStartup

例如,如果您为 onTaskStartuponTaskFailed 监听器设置了 exitMessage,并且任务在没有失败的情况下结束,则存储在仓库中的是来自 onTaskStartupexitMessage。否则,如果发生失败,则存储来自 onTaskFailedexitMessage。此外,如果您使用 onTaskEnd 监听器设置了 exitMessage,则来自 onTaskEndexitMessage 会覆盖来自 onTaskStartuponTaskFailed 的退出消息。

3.2.9. 限制 Spring Cloud Task 实例

Spring Cloud Task 允许您指定具有给定任务名称的任务一次只能运行一个实例。为此,您需要指定任务名称,并为每个任务执行设置 spring.cloud.task.single-instance-enabled=true。当第一个任务执行正在运行时,如果您尝试使用相同的任务名称spring.cloud.task.single-instance-enabled=true 再次运行任务,任务将失败并显示以下错误消息:Task with name "application" is already running. spring.cloud.task.single-instance-enabled 的默认值为 false。以下示例展示了如何将 spring.cloud.task.single-instance-enabled 设置为 true

spring.cloud.task.single-instance-enabled=true or false

要使用此功能,您必须向应用添加以下 Spring Integration 依赖项:

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-core</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-jdbc</artifactId>
</dependency>
如果任务因启用此功能且有另一个同名任务正在运行而失败,则应用的退出码将为 1。
Spring AOT 和原生编译中的单实例用法

在创建原生编译应用时使用 Spring Cloud Task 的单实例功能,您需要在构建时启用该功能。为此,添加 process-aot 执行,并将 spring.cloud.task.single-step-instance-enabled=true 设置为 JVM 参数,如下所示:

<plugin>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-maven-plugin</artifactId>
    <executions>
        <execution>
            <id>process-aot</id>
            <goals>
                <goal>process-aot</goal>
            </goals>
            <configuration>
                <jvmArguments>
                    -Dspring.cloud.task.single-instance-enabled=true
                </jvmArguments>
            </configuration>
        </execution>
    </executions>
</plugin>

3.2.10. 为 ApplicationRunner 和 CommandLineRunner 启用观测

要为 ApplicationRunnerCommandLineRunner 启用任务观测,请将 spring.cloud.task.observation.enabled 设置为 true。

一个启用了观测并使用 SimpleMeterRegistry 的任务应用示例可在 处找到。

3.2.11. 禁用 Spring Cloud Task 自动配置

在某些情况下,不应为某个实现自动配置 Spring Cloud Task,您可以禁用 Task 的自动配置。这可以通过向您的任务应用添加以下注解来实现:

@EnableAutoConfiguration(exclude={SimpleTaskAutoConfiguration.class})

您也可以通过将属性 spring.cloud.task.autoconfiguration.enabled 设置为 false 来禁用 Task 的自动配置。

3.2.12. 关闭上下文

如果应用需要在任务完成后(所有 *Runner#run 方法已调用且任务仓库已更新)关闭 ApplicationContext,请将属性 spring.cloud.task.closecontextEnabled 设置为 true

另一种需要关闭上下文的情况是当任务执行完成但应用没有终止时。在这些情况下,上下文保持打开状态是因为分配了线程(例如:如果您正在使用 TaskExecutor)。在这种情况下,当启动任务时,将 spring.cloud.task.closecontextEnabled 属性设置为 true。这会在任务完成后关闭应用的上下文。从而允许应用终止。

3.2.13. 启用任务指标

Spring Cloud Task 集成了 Micrometer 并为其执行的任务创建观测。要启用任务可观测性集成,您必须向您的任务应用添加 spring-boot-starter-actuator、您首选的注册表实现(如果您想发布指标)以及 micrometer-tracing(如果您想发布跟踪数据)。使用 Influx 启用任务可观测性和指标的 Maven 依赖示例是:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
    <groupId>io.micrometer</groupId>
    <artifactId>micrometer-registry-influx</artifactId>
    <scope>runtime</scope>
</dependency>

3.2.14. Spring Task 和 Spring Cloud Task 属性

术语 task 是业内一个常用词汇。一个例子是 Spring Boot 提供了 spring.task,而 Spring Cloud Task 提供了 spring.cloud.task 属性。这在过去导致了一些混淆,认为这两组属性直接相关。然而,它们代表了 Spring 生态系统中提供的两组不同的功能。

  • spring.task 指的是配置 ThreadPoolTaskScheduler 的属性。

  • spring.cloud.task 指的是配置 Spring Cloud Task 功能的属性。

4. 批处理

本节详细介绍了 Spring Cloud Task 与 Spring Batch 的集成。本节涵盖了如何跟踪作业执行与其所在任务之间的关联,以及通过 Spring Cloud Deployer 进行远程分区。

4.1. 将作业执行与其所在任务关联

Spring Boot 提供了在 über-jar 中执行批处理作业的功能。Spring Boot 对此功能的支持允许开发人员在该执行中执行多个批处理作业。Spring Cloud Task 提供了将作业执行(即作业执行)与任务执行关联的能力,以便可以相互追溯。

Spring Cloud Task 通过使用 TaskBatchExecutionListener 实现此功能。默认情况下,在同时配置了 Spring Batch 作业(即在上下文中定义了类型为 Job 的 bean)并且类路径中包含 spring-cloud-task-batch jar 的任何上下文中,此监听器都会自动配置。满足这些条件的所有作业都会注入该监听器。

4.1.1. 覆盖 TaskBatchExecutionListener

要阻止监听器被注入到当前上下文中的任何批处理作业中,可以使用标准的 Spring Boot 机制禁用自动配置。

要仅将监听器注入到上下文中的特定作业中,请覆盖 batchTaskExecutionListenerBeanPostProcessor 并提供作业 bean ID 列表,如下例所示

public static TaskBatchExecutionListenerBeanPostProcessor batchTaskExecutionListenerBeanPostProcessor() {
    TaskBatchExecutionListenerBeanPostProcessor postProcessor =
        new TaskBatchExecutionListenerBeanPostProcessor();

    postProcessor.setJobNames(Arrays.asList(new String[] {"job1", "job2"}));

    return postProcessor;
}
您可以在 Spring Cloud Task 项目的 samples 模块中找到批处理应用程序示例,点此查看

4.2. 远程分区

Spring Cloud Deployer 提供了在大多数云基础设施上启动基于 Spring Boot 的应用程序的功能。DeployerPartitionHandlerDeployerStepExecutionHandler 将工作步骤执行的启动委托给 Spring Cloud Deployer。

要配置 DeployerStepExecutionHandler,必须提供表示要执行的 Spring Boot über-jar 的 ResourceTaskLauncherHandlerJobExplorer。您可以配置任何环境属性,以及同时执行的最大工作线程数、轮询结果的间隔(默认为 10 秒)和超时(默认为 -1 或无超时)。以下示例展示了如何配置此 PartitionHandler

@Bean
public PartitionHandler partitionHandler(TaskLauncher taskLauncher,
        JobExplorer jobExplorer) throws Exception {

    MavenProperties mavenProperties = new MavenProperties();
    mavenProperties.setRemoteRepositories(new HashMap<>(Collections.singletonMap("springRepo",
        new MavenProperties.RemoteRepository(repository))));

    Resource resource =
        MavenResource.parse(String.format("%s:%s:%s",
                "io.spring.cloud",
                "partitioned-batch-job",
                "1.1.0.RELEASE"), mavenProperties);

    DeployerPartitionHandler partitionHandler =
        new DeployerPartitionHandler(taskLauncher, jobExplorer, resource, "workerStep");

    List<String> commandLineArgs = new ArrayList<>(3);
    commandLineArgs.add("--spring.profiles.active=worker");
    commandLineArgs.add("--spring.cloud.task.initialize.enable=false");
    commandLineArgs.add("--spring.batch.initializer.enabled=false");

    partitionHandler.setCommandLineArgsProvider(
        new PassThroughCommandLineArgsProvider(commandLineArgs));
    partitionHandler.setEnvironmentVariablesProvider(new NoOpEnvironmentVariablesProvider());
    partitionHandler.setMaxWorkers(2);
    partitionHandler.setApplicationName("PartitionedBatchJobTask");

    return partitionHandler;
}
将环境变量传递给分区时,每个分区可能位于具有不同环境设置的不同机器上。因此,您只应传递那些必需的环境变量。

请注意上例中,我们将最大工作线程数设置为 2。设置最大工作线程数确定了应同时运行的最大分区数。

要执行的 Resource 预期是配置了 DeployerStepExecutionHandler 作为当前上下文中的 CommandLineRunner 的 Spring Boot über-jar。前面示例中列举的仓库应是 über-jar 所在的远程仓库。管理者和工作线程都应能访问用作作业仓库和任务仓库的同一数据存储。一旦底层基础设施引导了 Spring Boot jar 并且 Spring Boot 启动了 DeployerStepExecutionHandler,该步骤处理器就会执行请求的 Step。以下示例展示了如何配置 DeployerStepExecutionHandler

@Bean
public DeployerStepExecutionHandler stepExecutionHandler(JobExplorer jobExplorer) {
    DeployerStepExecutionHandler handler =
        new DeployerStepExecutionHandler(this.context, jobExplorer, this.jobRepository);

    return handler;
}
您可以在 Spring Cloud Task 项目的 samples 模块中找到远程分区应用程序示例,点此查看

4.2.1. 异步启动远程批处理分区

默认情况下,批处理分区是顺序启动的。然而,在某些情况下,这可能会影响性能,因为每次启动都会阻塞,直到资源(例如:在 Kubernetes 中预置 Pod)被预置完成。在这种情况下,您可以向 DeployerPartitionHandler 提供一个 ThreadPoolTaskExecutor。这将根据 ThreadPoolTaskExecutor 的配置启动远程批处理分区。例如

    @Bean
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);
        executor.setThreadNamePrefix("default_task_executor_thread");
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.initialize();
        return executor;
    }

    @Bean
    public PartitionHandler partitionHandler(TaskLauncher taskLauncher, JobExplorer jobExplorer,
        TaskRepository taskRepository, ThreadPoolTaskExecutor executor) throws Exception {
        Resource resource = this.resourceLoader
            .getResource("maven://io.spring.cloud:partitioned-batch-job:2.2.0.BUILD-SNAPSHOT");

        DeployerPartitionHandler partitionHandler =
            new DeployerPartitionHandler(taskLauncher, jobExplorer, resource,
                "workerStep", taskRepository, executor);
    ...
    }
我们需要关闭上下文,因为使用 ThreadPoolTaskExecutor 会留下一个活动的线程,从而导致应用程序不会终止。要适当地关闭应用程序,我们需要将 spring.cloud.task.closecontextEnabled 属性设置为 true

4.2.2. 关于为 Kubernetes 平台开发批处理分区应用程序的注意事项

  • 在 Kubernetes 平台上部署分区应用程序时,必须使用 Spring Cloud Kubernetes Deployer 的以下依赖

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-deployer-kubernetes</artifactId>
    </dependency>
  • 任务应用程序及其分区的应用程序名称需要遵循以下正则表达式模式:[a-z0-9]([-a-z0-9]*[a-z0-9])。否则,将抛出异常。

4.3. 批处理信息消息

Spring Cloud Task 提供了批处理作业发出信息消息的能力。“Spring Batch 事件” 部分详细介绍了此功能。

4.4. 批处理作业退出码

前所述,Spring Cloud Task 应用程序支持记录任务执行的退出码。但是,在任务中运行 Spring Batch 作业的情况下,无论批处理作业执行如何完成,使用默认的 Batch/Boot 行为时,任务的结果始终为零。请记住,任务是引导应用程序,从任务返回的退出码与引导应用程序相同。要覆盖此行为并在批处理作业返回 BatchStatusFAILED 时允许任务返回非零退出码,请将 spring.cloud.task.batch.fail-on-job-failure 设置为 true。然后退出码可以是 1(默认值)或基于指定的 ExitCodeGenerator)

此功能使用一个新的 ApplicationRunner,它替换了 Spring Boot 提供的那一个。默认情况下,它的配置顺序与 Spring Boot 提供的相同。但是,如果您想自定义 ApplicationRunner 的运行顺序,可以通过设置 spring.cloud.task.batch.applicationRunnerOrder 属性来设置其顺序。要让您的任务根据批处理作业执行结果返回退出码,您需要编写自己的 CommandLineRunner

5. 单步批处理作业启动器

本节介绍了如何使用 Spring Cloud Task 中包含的启动器开发具有单个 Step 的 Spring Batch Job。该启动器允许您使用配置来定义 ItemReaderItemWriter 或完整的单步 Spring Batch Job。有关 Spring Batch 及其功能的更多信息,请参阅Spring Batch 文档

要获取 Maven 启动器,请将以下内容添加到您的构建中

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-single-step-batch-job</artifactId>
    <version>2.3.0</version>
</dependency>

要获取 Gradle 启动器,请将以下内容添加到您的构建中

compile "org.springframework.cloud:spring-cloud-starter-single-step-batch-job:2.3.0"

5.1. 定义作业

您可以使用此启动器定义一个简单的 ItemReaderItemWriter,也可以定义一个完整的 Job。在本节中,我们将定义配置 Job 所需的属性。

5.1.1. 属性

首先,此启动器提供了一组属性,允许您配置具有一个 Step 的 Job 的基本信息

表 1. 作业属性
属性 类型 默认值 描述

spring.batch.job.jobName

String

null

作业的名称。

spring.batch.job.stepName

String

null

步骤的名称。

spring.batch.job.chunkSize

Integer

null

每个事务要处理的条目数。

配置了上述属性后,您就拥有了一个具有单个基于 chunk 的步骤的作业。这个基于 chunk 的步骤以 Map<String, Object> 实例作为条目进行读取、处理和写入。但是,这个步骤还没有执行任何操作。您需要配置一个 ItemReader、一个可选的 ItemProcessor 和一个 ItemWriter 来让它做一些事情。要配置其中之一,您可以使用属性并配置提供了自动配置的选项之一,或者使用标准的 Spring 配置机制配置您自己的。

如果您配置自己的实现,输入和输出类型必须与步骤中的其他实现匹配。此启动器中的 ItemReader 实现和 ItemWriter 实现都使用 Map<String, Object> 作为输入和输出条目。

5.2. ItemReader 实现的自动配置

此启动器为四种不同的 ItemReader 实现提供了自动配置:AmqpItemReaderFlatFileItemReaderJdbcCursorItemReaderKafkaItemReader。在本节中,我们将概述如何使用提供的自动配置来配置它们中的每一个。

5.2.1. AmqpItemReader

您可以使用 AmqpItemReader 通过 AMQP 从队列或主题读取数据。此 ItemReader 实现的自动配置取决于两组配置。第一组是 AmqpTemplate 的配置。您可以自己配置,也可以使用 Spring Boot 提供的自动配置。请参阅Spring Boot AMQP 文档。配置好 AmqpTemplate 后,您可以通过设置以下属性启用批处理能力来支持它

表 2. AmqpItemReader 属性
属性 类型 默认值 描述

spring.batch.job.amqpitemreader.enabled

boolean

false

如果为 true,则执行自动配置。

spring.batch.job.amqpitemreader.jsonConverterEnabled

boolean

true

指示是否应该注册 Jackson2JsonMessageConverter 来解析消息。

更多信息,请参阅AmqpItemReader 文档

5.2.2. FlatFileItemReader

FlatFileItemReader 允许您从平面文件(如 CSV 和其他文件格式)读取数据。要从文件读取,您可以通过正常的 Spring 配置自行提供一些组件(LineTokenizer, RecordSeparatorPolicy, FieldSetMapper, LineMapper, 或 SkippedLinesCallback)。您也可以使用以下属性来配置读取器

表 3. FlatFileItemReader 属性
属性 类型 默认值 描述

spring.batch.job.flatfileitemreader.saveState

boolean

true

确定是否应为重新启动保存状态。

spring.batch.job.flatfileitemreader.name

String

null

用于在 ExecutionContext 中提供唯一键的名称。

spring.batch.job.flatfileitemreader.maxItemcount

int

Integer.MAX_VALUE

从文件读取的最大条目数。

spring.batch.job.flatfileitemreader.currentItemCount

int

0

已读取的条目数。用于重新启动。

spring.batch.job.flatfileitemreader.comments

List<String>

空列表

一个字符串列表,指示文件中的注释行(要忽略的行)。

spring.batch.job.flatfileitemreader.resource

Resource

null

要读取的资源。

spring.batch.job.flatfileitemreader.strict

boolean

true

如果设置为 true,则如果找不到资源,读取器会抛出异常。

spring.batch.job.flatfileitemreader.encoding

String

FlatFileItemReader.DEFAULT_CHARSET

读取文件时使用的编码。

spring.batch.job.flatfileitemreader.linesToSkip

int

0

指示文件开头要跳过的行数。

spring.batch.job.flatfileitemreader.delimited

boolean

false

指示文件是否为分隔文件(CSV 及其他格式)。此属性或 spring.batch.job.flatfileitemreader.fixedLength 属性中只有一个可以同时为 true

spring.batch.job.flatfileitemreader.delimiter

String

DelimitedLineTokenizer.DELIMITER_COMMA

如果读取分隔文件,指示用于解析的分隔符。

spring.batch.job.flatfileitemreader.quoteCharacter

char

DelimitedLineTokenizer.DEFAULT_QUOTE_CHARACTER

用于确定用于引用值的字符。

spring.batch.job.flatfileitemreader.includedFields

List<Integer>

空列表

一个索引列表,用于确定记录中哪些字段包含在条目中。

spring.batch.job.flatfileitemreader.fixedLength

boolean

false

指示文件的记录是否按列号解析。此属性或 spring.batch.job.flatfileitemreader.delimited 属性中只有一个可以同时为 true

spring.batch.job.flatfileitemreader.ranges

List<Range>

空列表

用于解析定宽记录的列范围列表。请参阅Range 文档

spring.batch.job.flatfileitemreader.names

String []

null

从记录解析出的每个字段的名称列表。这些名称是此 ItemReader 返回的条目中 Map<String, Object> 的键。

spring.batch.job.flatfileitemreader.parsingStrict

boolean

true

如果设置为 true,则如果字段无法映射,映射会失败。

5.2.3. JdbcCursorItemReader

JdbcCursorItemReader 对关系数据库运行查询,并遍历结果游标(ResultSet)以提供结果条目。此自动配置允许您提供一个 PreparedStatementSetter、一个 RowMapper 或两者。您还可以使用以下属性来配置 JdbcCursorItemReader

表 4. JdbcCursorItemReader 属性
属性 类型 默认值 描述

spring.batch.job.jdbccursoritemreader.saveState

boolean

true

确定是否应为重新启动保存状态。

spring.batch.job.jdbccursoritemreader.name

String

null

用于在 ExecutionContext 中提供唯一键的名称。

spring.batch.job.jdbccursoritemreader.maxItemcount

int

Integer.MAX_VALUE

从文件读取的最大条目数。

spring.batch.job.jdbccursoritemreader.currentItemCount

int

0

已读取的条目数。用于重新启动。

spring.batch.job.jdbccursoritemreader.fetchSize

int

给驱动程序的一个提示,指示每次调用数据库系统时检索多少条记录。为了获得最佳性能,通常希望将其设置为与 chunk 大小匹配。

spring.batch.job.jdbccursoritemreader.maxRows

int

从数据库读取的最大条目数。

spring.batch.job.jdbccursoritemreader.queryTimeout

int

查询超时的毫秒数。

spring.batch.job.jdbccursoritemreader.ignoreWarnings

boolean

true

确定读取器在处理时是否应忽略 SQL 警告。

spring.batch.job.jdbccursoritemreader.verifyCursorPosition

boolean

true

指示每次读取后是否应验证游标位置,以验证 RowMapper 未移动游标。

spring.batch.job.jdbccursoritemreader.driverSupportsAbsolute

boolean

false

指示驱动程序是否支持游标的绝对定位。

spring.batch.job.jdbccursoritemreader.useSharedExtendedConnection

boolean

false

指示连接是否与其他处理共享(因此是事务的一部分)。

spring.batch.job.jdbccursoritemreader.sql

String

null

从中读取的 SQL 查询。

您还可以使用以下属性专门为读取器指定 JDBC DataSource:.JdbcCursorItemReader 属性

属性 类型 默认值 描述

spring.batch.job.jdbccursoritemreader.datasource.enable

boolean

false

确定是否应启用 JdbcCursorItemReaderDataSource

jdbccursoritemreader.datasource.url

String

null

数据库的 JDBC URL。

jdbccursoritemreader.datasource.username

String

null

数据库的登录用户名。

jdbccursoritemreader.datasource.password

String

null

数据库的登录密码。

jdbccursoritemreader.datasource.driver-class-name

String

null

JDBC 驱动程序的完全限定名。

如果未指定 jdbccursoritemreader_datasourceJDBCCursorItemReader 将使用默认的 DataSource

5.2.4. KafkaItemReader

从 Kafka 主题摄取分区数据非常有用,这正是 KafkaItemReader 可以做到的。要配置 KafkaItemReader,需要两部分配置。首先,需要使用 Spring Boot 的 Kafka 自动配置来配置 Kafka(请参阅Spring Boot Kafka 文档)。一旦您配置了 Spring Boot 的 Kafka 属性,就可以通过设置以下属性来配置 KafkaItemReader 本身

表 5. KafkaItemReader 属性
属性 类型 默认值 描述

spring.batch.job.kafkaitemreader.name

String

null

用于在 ExecutionContext 中提供唯一键的名称。

spring.batch.job.kafkaitemreader.topic

String

null

要从中读取的主题名称。

spring.batch.job.kafkaitemreader.partitions

List<Integer>

空列表

要从中读取的分区索引列表。

spring.batch.job.kafkaitemreader.pollTimeOutInSeconds

long

30

poll() 操作的超时时间。

spring.batch.job.kafkaitemreader.saveState

boolean

true

确定是否应为重新启动保存状态。

5.2.5. 本地编译

单步批处理的优点是,在使用 JVM 时,您可以在运行时动态选择要使用的读取器和写入器 bean。然而,在使用本地编译时,您必须在构建时而不是运行时确定读取器和写入器。以下示例展示了如何做到这一点

<plugin>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-maven-plugin</artifactId>
    <executions>
        <execution>
            <id>process-aot</id>
            <goals>
                <goal>process-aot</goal>
            </goals>
            <configuration>
                <jvmArguments>
                    -Dspring.batch.job.flatfileitemreader.name=fooReader
                    -Dspring.batch.job.flatfileitemwriter.name=fooWriter
                </jvmArguments>
            </configuration>
        </execution>
    </executions>
</plugin>

5.3. ItemProcessor 配置

如果 ApplicationContext 中有一个 ItemProcessor 可用,单步批处理作业自动配置将接受它。如果找到类型正确(ItemProcessor<Map<String, Object>, Map<String, Object>>)的处理器,它将自动注入到步骤中。

5.4. ItemWriter 实现的自动配置

此启动器为与支持的 ItemReader 实现匹配的 ItemWriter 实现提供自动配置:AmqpItemWriterFlatFileItemWriterJdbcItemWriterKafkaItemWriter。本节介绍了如何使用自动配置来配置支持的 ItemWriter

5.4.1. AmqpItemWriter

要写入 RabbitMQ 队列,您需要两组配置。首先,您需要一个 AmqpTemplate。最简单的方法是使用 Spring Boot 的 RabbitMQ 自动配置。请参阅Spring Boot AMQP 文档

配置好 AmqpTemplate 后,您可以通过设置以下属性来配置 AmqpItemWriter

表 6. AmqpItemWriter 属性
属性 类型 默认值 描述

spring.batch.job.amqpitemwriter.enabled

boolean

false

如果为 true,则运行自动配置。

spring.batch.job.amqpitemwriter.jsonConverterEnabled

boolean

true

指示是否应注册 Jackson2JsonMessageConverter 来转换消息。

5.4.2. FlatFileItemWriter

要将文件作为步骤的输出写入,您可以配置 FlatFileItemWriter。自动配置接受已明确配置的组件(如 LineAggregatorFieldExtractorFlatFileHeaderCallbackFlatFileFooterCallback)以及通过设置以下指定属性配置的组件

表 7. FlatFileItemWriter 属性
属性 类型 默认值 描述

spring.batch.job.flatfileitemwriter.resource

Resource

null

要读取的资源。

spring.batch.job.flatfileitemwriter.delimited

boolean

false

指示输出文件是否为分隔文件。如果为 true,则 spring.batch.job.flatfileitemwriter.formatted 必须为 false

spring.batch.job.flatfileitemwriter.formatted

boolean

false

指示输出文件是否为格式化文件。如果为 true,则 spring.batch.job.flatfileitemwriter.delimited 必须为 false

spring.batch.job.flatfileitemwriter.format

String

null

用于为格式化文件生成输出的格式。格式化通过使用 String.format 完成。

spring.batch.job.flatfileitemwriter.locale

Locale

Locale.getDefault()

生成文件时使用的 Locale

spring.batch.job.flatfileitemwriter.maximumLength

int

0

记录的最大长度。如果为 0,则大小无限制。

spring.batch.job.flatfileitemwriter.minimumLength

int

0

记录的最小长度。

spring.batch.job.flatfileitemwriter.delimiter

String

,

用于分隔分隔文件中字段的 String

spring.batch.job.flatfileitemwriter.encoding

String

FlatFileItemReader.DEFAULT_CHARSET

写入文件时使用的编码。

spring.batch.job.flatfileitemwriter.forceSync

boolean

false

指示文件在刷新时是否应强制同步到磁盘。

spring.batch.job.flatfileitemwriter.names

String []

null

从记录解析出的每个字段的名称列表。这些名称是此 ItemWriter 接收的条目中 Map<String, Object> 的键。

spring.batch.job.flatfileitemwriter.append

boolean

false

指示如果找到输出文件是否应追加到文件中。

spring.batch.job.flatfileitemwriter.lineSeparator

String

FlatFileItemWriter.DEFAULT_LINE_SEPARATOR

用于分隔输出文件中行的 String

spring.batch.job.flatfileitemwriter.name

String

null

用于在 ExecutionContext 中提供唯一键的名称。

spring.batch.job.flatfileitemwriter.saveState

boolean

true

确定是否应为重新启动保存状态。

spring.batch.job.flatfileitemwriter.shouldDeleteIfEmpty

boolean

false

如果设置为 true,则在作业完成时删除空文件(没有输出)。

spring.batch.job.flatfileitemwriter.shouldDeleteIfExists

boolean

true

如果设置为 true 并且在应该生成输出文件的位置找到文件,则在步骤开始前删除该文件。

spring.batch.job.flatfileitemwriter.transactional

boolean

FlatFileItemWriter.DEFAULT_TRANSACTIONAL

指示读取器是否为事务性队列(表示读取的条目在失败时返回队列)。

5.4.3. JdbcBatchItemWriter

要将步骤的输出写入关系数据库,此启动器提供了自动配置 JdbcBatchItemWriter 的能力。通过设置以下属性,自动配置允许您提供自己的 ItemPreparedStatementSetterItemSqlParameterSourceProvider 以及配置选项

表 8. JdbcBatchItemWriter 属性
属性 类型 默认值 描述

spring.batch.job.jdbcbatchitemwriter.name

String

null

用于在 ExecutionContext 中提供唯一键的名称。

spring.batch.job.jdbcbatchitemwriter.sql

String

null

用于插入每个条目的 SQL。

spring.batch.job.jdbcbatchitemwriter.assertUpdates

boolean

true

是否验证每次插入至少更新了一条记录。

您还可以使用以下属性专门为写入器指定 JDBC DataSource:.JdbcBatchItemWriter 属性

属性 类型 默认值 描述

spring.batch.job.jdbcbatchitemwriter.datasource.enable

boolean

false

确定是否应启用 JdbcCursorItemReaderDataSource

jdbcbatchitemwriter.datasource.url

String

null

数据库的 JDBC URL。

jdbcbatchitemwriter.datasource.username

String

null

数据库的登录用户名。

jdbcbatchitemwriter.datasource.password

String

null

数据库的登录密码。

jdbcbatchitemreader.datasource.driver-class-name

String

null

JDBC 驱动程序的完全限定名。

如果未指定 jdbcbatchitemwriter_datasourceJdbcBatchItemWriter 将使用默认的 DataSource

5.4.4. KafkaItemWriter

要将步骤输出写入 Kafka 主题,您需要 KafkaItemWriter。此启动器通过使用两方面的功能为 KafkaItemWriter 提供自动配置。首先是 Spring Boot 的 Kafka 自动配置。(请参阅Spring Boot Kafka 文档。)其次,此启动器允许您配置写入器上的两个属性。

表 9. KafkaItemWriter 属性
属性 类型 默认值 描述

spring.batch.job.kafkaitemwriter.topic

String

null

要写入的 Kafka 主题。

spring.batch.job.kafkaitemwriter.delete

boolean

false

传递给写入器的条目是否都作为删除事件发送到主题。

有关 KafkaItemWriter 配置选项的更多信息,请参阅KafkaItemWiter 文档

5.4.5. Spring AOT

使用 Spring AOT 和单步批处理启动器时,必须在编译时设置读取器和写入器的名称属性(除非为读取器和/或写入器创建 bean)。为此,您必须在 boot maven 插件或 gradle 插件中将您希望使用的读取器和写入器名称作为参数或环境变量包含进来。例如,如果您希望在 Maven 中启用 FlatFileItemReaderFlatFileItemWriter,它看起来像这样

    <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
        <executions>
            <execution>
            <id>process-aot</id>
            <goals>
                <goal>process-aot</goal>
            </goals>
            </execution>
        </executions>
        <configuration>
            <arguments>
                <argument>--spring.batch.job.flatfileitemreader.name=foobar</argument>
                <argument>--spring.batch.job.flatfileitemwriter.name=fooWriter</argument>
            </arguments>
        </configuration>
    </plugin>

6. Spring Cloud Stream 集成

任务本身可能有用,但将任务集成到更大的生态系统中可以使其在更复杂的处理和编排中发挥作用。本节涵盖了 Spring Cloud Task 与 Spring Cloud Stream 的集成选项。

6.1. 从 Spring Cloud Stream 启动任务

您可以从流中启动任务。为此,请创建一个侦听包含 TaskLaunchRequest 作为其有效载荷的消息的 sink。TaskLaunchRequest 包含

  • uri:指向要执行的任务 artifact。

  • applicationName:与任务关联的名称。如果未设置 applicationName,TaskLaunchRequest 将生成一个由以下内容组成的任务名称:Task-<UUID>

  • commandLineArguments:一个包含任务命令行参数的列表。

  • environmentProperties:一个包含任务要使用的环境变量的 Map。

  • deploymentProperties:一个包含部署器用于部署任务的属性的 Map。

如果有效载荷是不同类型,则 sink 将抛出异常。

例如,可以创建一个流,该流具有一个处理器,该处理器从 HTTP 源接收数据并创建包含 TaskLaunchRequestGenericMessage,然后将消息发送到其输出通道。任务 sink 将从其输入通道接收消息,然后启动任务。

要创建 taskSink,只需创建一个包含 EnableTaskLauncher 注解的 Spring Boot 应用程序,如下例所示

@SpringBootApplication
@EnableTaskLauncher
public class TaskSinkApplication {
    public static void main(String[] args) {
        SpringApplication.run(TaskSinkApplication.class, args);
    }
}

Spring Cloud Task 项目的 samples 模块包含一个 Sink 和 Processor 示例。要将这些示例安装到您的本地 Maven 仓库,请在 spring-cloud-task-samples 目录中运行 Maven 构建,并将 skipInstall 属性设置为 false,如下例所示

mvn clean install

必须将 maven.remoteRepositories.springRepo.url 属性设置为 über-jar 所在的远程仓库位置。如果未设置,则没有远程仓库,因此仅依赖本地仓库。

6.1.1. Spring Cloud Data Flow

要在 Spring Cloud Data Flow 中创建流,必须先注册我们创建的 Task Sink 应用程序。在下面的示例中,我们使用 Spring Cloud Data Flow shell 注册 Processor 和 Sink 示例应用程序。

app register --name taskSink --type sink --uri maven://io.spring.cloud:tasksink:<version>
app register --name taskProcessor --type processor --uri maven:io.spring.cloud:taskprocessor:<version>

以下示例展示了如何从 Spring Cloud Data Flow shell 创建流。

stream create foo --definition "http --server.port=9000|taskProcessor|taskSink" --deploy

6.2. Spring Cloud Task 事件

Spring Cloud Task 提供了当任务通过 Spring Cloud Stream 通道运行时,可以通过 Spring Cloud Stream 通道发出事件的能力。任务监听器用于将 TaskExecution 发布到名为 task-events 的消息通道上。此功能会自动装配到任何 classpath 中包含 spring-cloud-streamspring-cloud-stream-<binder> 和已定义任务的应用程序中。

要禁用事件发送监听器,请将 spring.cloud.task.events.enabled 属性设置为 false

定义了适当的 classpath 后,以下任务会在 task-events 通道上将 TaskExecution 作为事件发出(在任务开始和结束时都会发出)。

@SpringBootApplication
public class TaskEventsApplication {

    public static void main(String[] args) {
        SpringApplication.run(TaskEventsApplication.class, args);
    }

    @Configuration
    public static class TaskConfiguration {

        @Bean
        public ApplicationRunner applicationRunner() {
            return new ApplicationRunner() {
                @Override
                public void run(ApplicationArguments args) {
                    System.out.println("The ApplicationRunner was executed");
                }
            };
        }
    }
}
classpath 中还需要包含绑定器实现。
Spring Cloud Task 项目的 samples 模块中提供了一个任务事件示例应用程序,请参见此处

6.2.1. 禁用特定任务事件

要禁用任务事件,可以将 spring.cloud.task.events.enabled 属性设置为 false

6.3. Spring Batch 事件

当通过任务执行 Spring Batch 作业时,Spring Cloud Task 可以配置为根据 Spring Batch 中可用的 Spring Batch 监听器发出信息性消息。具体来说,当通过 Spring Cloud Task 运行时,以下 Spring Batch 监听器会自动配置到每个批处理作业中,并在相关的 Spring Cloud Stream 通道上发出消息。

  • JobExecutionListener 监听 job-execution-events

  • StepExecutionListener 监听 step-execution-events

  • ChunkListener 监听 chunk-events

  • ItemReadListener 监听 item-read-events

  • ItemProcessListener 监听 item-process-events

  • ItemWriteListener 监听 item-write-events

  • SkipListener 监听 skip-events

当上下文中存在适当的 Bean(JobTaskLifecycleListener)时,这些监听器会自动配置到任何 AbstractJob 中。监听这些事件的配置方式与绑定任何其他 Spring Cloud Stream 通道相同。我们的任务(运行批处理作业的任务)充当 Source,而监听应用程序充当 ProcessorSink

一个例子是有一个应用程序监听 job-execution-events 通道,以获取作业的开始和停止事件。要配置监听应用程序,您可以如下配置输入为 job-execution-events

spring.cloud.stream.bindings.input.destination=job-execution-events

classpath 中还需要包含绑定器实现。
Spring Cloud Task 项目的 samples 模块中提供了一个批处理事件示例应用程序,请参见此处

6.3.1. 将批处理事件发送到不同的通道

Spring Cloud Task 为批处理事件提供的一个选项是更改特定监听器发出消息的通道的能力。为此,请使用以下配置:spring.cloud.stream.bindings.<the channel>.destination=<new destination>。例如,如果 StepExecutionListener 需要将消息发送到名为 my-step-execution-events 的另一个通道,而不是默认的 step-execution-events,您可以添加以下配置。

spring.cloud.task.batch.events.step-execution-events-binding-name=my-step-execution-events

6.3.2. 禁用批处理事件

要禁用所有批处理事件的监听器功能,请使用以下配置。

spring.cloud.task.batch.events.enabled=false

要禁用特定的批处理事件,请使用以下配置。

spring.cloud.task.batch.events.<batch event listener>.enabled=false:

以下列表显示了您可以禁用的单个监听器。

spring.cloud.task.batch.events.job-execution.enabled=false
spring.cloud.task.batch.events.step-execution.enabled=false
spring.cloud.task.batch.events.chunk.enabled=false
spring.cloud.task.batch.events.item-read.enabled=false
spring.cloud.task.batch.events.item-process.enabled=false
spring.cloud.task.batch.events.item-write.enabled=false
spring.cloud.task.batch.events.skip.enabled=false

6.3.3. 批处理事件的发送顺序

默认情况下,批处理事件具有 Ordered.LOWEST_PRECEDENCE。要更改此值(例如,改为 5),请使用以下配置。

spring.cloud.task.batch.events.job-execution-order=5
spring.cloud.task.batch.events.step-execution-order=5
spring.cloud.task.batch.events.chunk-order=5
spring.cloud.task.batch.events.item-read-order=5
spring.cloud.task.batch.events.item-process-order=5
spring.cloud.task.batch.events.item-write-order=5
spring.cloud.task.batch.events.skip-order=5

7. 附录

7.1. 任务仓库 Schema

本附录提供了任务仓库中使用的数据库 schema 的 ERD(实体关系图)。

task schema

7.1.1. 表信息

TASK_EXECUTION

存储任务执行信息。

列名 必需 类型 字段长度 备注

TASK_EXECUTION_ID

TRUE

BIGINT

X

Spring Cloud Task Framework 在应用程序启动时从 TASK_SEQ 获取下一个可用 ID 并建立。或者如果在任务之外创建记录,则必须在记录创建时填充此值。

START_TIME

FALSE

DATETIME(6)

X

Spring Cloud Task Framework 在应用程序启动时建立该值。

END_TIME

FALSE

DATETIME(6)

X

Spring Cloud Task Framework 在应用程序退出时建立该值。

TASK_NAME

FALSE

VARCHAR

100

Spring Cloud Task Framework 在应用程序启动时会将其设置为 "Application",除非用户使用 spring.application.name 指定了名称。

EXIT_CODE

FALSE

INTEGER

X

遵循 Spring Boot 默认值,除非用户如此处所述进行覆盖。

EXIT_MESSAGE

FALSE

VARCHAR

2500

用户定义,如此处所述。

ERROR_MESSAGE

FALSE

VARCHAR

2500

Spring Cloud Task Framework 在应用程序退出时建立该值。

LAST_UPDATED

TRUE

TIMESTAMP

X

Spring Cloud Task Framework 在应用程序启动时建立该值。或者如果在任务之外创建记录,则必须在记录创建时填充此值。

EXTERNAL_EXECUTION_ID

FALSE

VARCHAR

250

如果设置了 spring.cloud.task.external-execution-id 属性,则 Spring Cloud Task Framework 在应用程序启动时会将其设置为指定的值。更多信息请参见此处

PARENT_TASK_EXECUTION_ID

FALSE

BIGINT

X

如果设置了 spring.cloud.task.parent-execution-id 属性,则 Spring Cloud Task Framework 在应用程序启动时会将其设置为指定的值。更多信息请参见此处

TASK_EXECUTION_PARAMS

存储任务执行使用的参数。

列名 必需 类型 字段长度

TASK_EXECUTION_ID

TRUE

BIGINT

X

TASK_PARAM

FALSE

VARCHAR

2500

TASK_TASK_BATCH

用于将任务执行链接到批处理执行。

列名 必需 类型 字段长度

TASK_EXECUTION_ID

TRUE

BIGINT

X

JOB_EXECUTION_ID

TRUE

BIGINT

X

TASK_LOCK

用于此处讨论的 single-instance-enabled 特性。

列名 必需 类型 字段长度 备注

LOCK_KEY

TRUE

CHAR

36

此锁的 UUID

REGION

TRUE

VARCHAR

100

用户可以使用此字段建立一组锁。

CLIENT_ID

TRUE

CHAR

36

包含要锁定应用程序名称的任务执行 ID。

CREATED_DATE

TRUE

DATETIME

X

创建条目的日期

每种数据库类型的表设置 DDL 可以在此处找到。

7.1.2. SQL Server

默认情况下,Spring Cloud Task 使用序列表来确定 TASK_EXECUTION 表的 TASK_EXECUTION_ID。但是,当在使用 SQL Server 时同时启动多个任务,这可能会导致 TASK_SEQ 表发生死锁。解决方法是删除 TASK_EXECUTION_SEQ 表并创建一个同名的 sequence。例如:

DROP TABLE TASK_SEQ;

CREATE SEQUENCE [DBO].[TASK_SEQ] AS BIGINT
 START WITH 1
 INCREMENT BY 1;
START WITH 设置为您当前执行 ID 的更高值。

7.2. 构建本文档

本项目使用 Maven 生成本文档。要自己生成它,请运行以下命令:$ mvn clean install -DskipTests -P docs

7.3. 可观测性元数据

7.3.1. 可观测性 - 指标

下面您可以找到此项目声明的所有指标列表。

活跃任务

围绕任务执行创建的指标。

指标名称 spring.cloud.task (由约定类 org.springframework.cloud.task.listener.DefaultTaskExecutionObservationConvention 定义)。类型 timer

指标名称 spring.cloud.task.active (由约定类 org.springframework.cloud.task.listener.DefaultTaskExecutionObservationConvention 定义)。类型 long task timer

在开始观测后添加的 KeyValue 可能不会出现在 *.active 指标中。
Micrometer 在内部使用 nanoseconds 作为基本单位。然而,每个后端决定实际的基本单位。(例如 Prometheus 使用秒)

包含类 org.springframework.cloud.task.listener.TaskExecutionObservation 的完全限定名称。

所有标签必须以 spring.cloud.task 前缀开头!
表 10. 低基数键

名称

描述

spring.cloud.task.cf.app.id (必需)

CF 云的应用程序 ID。

spring.cloud.task.cf.app.name (必需)

CF 云的应用程序名称。

spring.cloud.task.cf.app.version (必需)

CF 云的应用程序版本。

spring.cloud.task.cf.instance.index (必需)

CF 云的实例索引。

spring.cloud.task.cf.org.name (必需)

CF 云的组织名称。

spring.cloud.task.cf.space.id (必需)

CF 云的空间 ID。

spring.cloud.task.cf.space.name (必需)

CF 云的空间名称。

spring.cloud.task.execution.id (必需)

任务执行 ID。

spring.cloud.task.exit.code (必需)

任务退出码。

spring.cloud.task.external.execution.id (必需)

任务的外部执行 ID。

spring.cloud.task.name (必需)

任务名称度量。

spring.cloud.task.parent.execution.id (必需)

任务父执行 ID。

spring.cloud.task.status (必需)

任务状态。可以是 success 或 failure。

任务运行器观测

任务运行器执行时创建的观测。

指标名称 spring.cloud.task.runner (由约定类 org.springframework.cloud.task.configuration.observation.DefaultTaskObservationConvention 定义)。类型 timer

指标名称 spring.cloud.task.runner.active (由约定类 org.springframework.cloud.task.configuration.observation.DefaultTaskObservationConvention 定义)。类型 long task timer

在开始观测后添加的 KeyValue 可能不会出现在 *.active 指标中。
Micrometer 在内部使用 nanoseconds 作为基本单位。然而,每个后端决定实际的基本单位。(例如 Prometheus 使用秒)

包含类 org.springframework.cloud.task.configuration.observation.TaskDocumentedObservation 的完全限定名称。

所有标签必须以 spring.cloud.task 前缀开头!
表 11. 低基数键

名称

描述

spring.cloud.task.runner.bean-name (必需)

由 Spring Cloud Task 执行的 Bean 的名称。

7.3.2. 可观测性 - Span

下面您可以找到此项目声明的所有 span 列表。

活跃任务 Span

围绕任务执行创建的指标。

Span 名称 spring.cloud.task (由约定类 org.springframework.cloud.task.listener.DefaultTaskExecutionObservationConvention 定义)。

包含类 org.springframework.cloud.task.listener.TaskExecutionObservation 的完全限定名称。

所有标签必须以 spring.cloud.task 前缀开头!
表 12. 标签键

名称

描述

spring.cloud.task.cf.app.id (必需)

CF 云的应用程序 ID。

spring.cloud.task.cf.app.name (必需)

CF 云的应用程序名称。

spring.cloud.task.cf.app.version (必需)

CF 云的应用程序版本。

spring.cloud.task.cf.instance.index (必需)

CF 云的实例索引。

spring.cloud.task.cf.org.name (必需)

CF 云的组织名称。

spring.cloud.task.cf.space.id (必需)

CF 云的空间 ID。

spring.cloud.task.cf.space.name (必需)

CF 云的空间名称。

spring.cloud.task.execution.id (必需)

任务执行 ID。

spring.cloud.task.exit.code (必需)

任务退出码。

spring.cloud.task.external.execution.id (必需)

任务的外部执行 ID。

spring.cloud.task.name (必需)

任务名称度量。

spring.cloud.task.parent.execution.id (必需)

任务父执行 ID。

spring.cloud.task.status (必需)

任务状态。可以是 success 或 failure。

任务运行器观测 Span

任务运行器执行时创建的观测。

Span 名称 spring.cloud.task.runner (由约定类 org.springframework.cloud.task.configuration.observation.DefaultTaskObservationConvention 定义)。

包含类 org.springframework.cloud.task.configuration.observation.TaskDocumentedObservation 的完全限定名称。

所有标签必须以 spring.cloud.task 前缀开头!
表 13. 标签键

名称

描述

spring.cloud.task.runner.bean-name (必需)

由 Spring Cloud Task 执行的 Bean 的名称。