版本 3.0.4

© 2009-2022 VMware, Inc. 保留所有权利。

本文档可供您自己使用和分发给他人,前提是您不收取任何费用,并且无论以印刷品还是电子形式分发,每个副本都必须包含此版权声明。

序言

本节简要概述了 Spring Cloud Task 参考文档。可以将其视为文档其余部分的地图。您可以按顺序阅读本参考指南,或者如果某些内容不感兴趣,可以跳过某些章节。

1. 关于文档

Spring Cloud Task 参考指南提供 htmlpdfepub 格式。最新版本位于 docs.spring.io/spring-cloud-task/docs/current-SNAPSHOT/reference/html/

本文档可供您自己使用和分发给他人,前提是您不收取任何费用,并且无论以印刷品还是电子形式分发,每个副本都必须包含此版权声明。

2. 获取帮助

使用 Spring Cloud Task 遇到问题了吗?我们乐于提供帮助!

Spring Cloud Task 的所有内容,包括文档,都是开源的。如果您发现文档有问题或只是想改进它们,请参与进来

3. 入门

如果您刚开始接触 Spring Cloud Task 或一般的 'Spring',我们建议阅读入门章节。

从零开始入门,请阅读以下章节

要按照教程进行操作,请阅读开发您的第一个 Spring Cloud Task 应用
要运行您的示例,请阅读运行示例

入门

如果您刚刚开始使用 Spring Cloud Task,应该阅读本节。在这里,我们回答基本的“是什么?”、“如何做?”和“为什么?”等问题。我们从 Spring Cloud Task 的温和介绍开始。然后构建一个 Spring Cloud Task 应用,在此过程中讨论一些核心原则。

4. Spring Cloud Task 简介

Spring Cloud Task 使得创建短生命周期微服务变得容易。它提供了按需在生产环境中执行短生命周期 JVM 进程的功能。

5. 系统要求

您需要安装 Java(Java 17 或更高版本)。要构建,您还需要安装 Maven。

5.1. 数据库要求

Spring Cloud Task 使用关系型数据库存储已执行任务的结果。虽然您可以在没有数据库的情况下开始开发任务(任务状态会作为任务仓库更新的一部分被记录到日志中),但在生产环境中,您会希望使用受支持的数据库。Spring Cloud Task 目前支持以下数据库

  • DB2

  • H2

  • HSQLDB

  • MySql

  • Oracle

  • Postgres

  • SqlServer

6. 开发您的第一个 Spring Cloud Task 应用

一个好的开始是使用一个简单的“Hello, World!”应用,因此我们创建一个等效的 Spring Cloud Task 应用来突出框架的特性。大多数 IDE 对 Apache Maven 有良好的支持,因此我们将其用作本项目构建工具。

spring.io 网站包含许多使用 Spring Boot 的入门”指南。如果您需要解决特定问题,请先在那里查找。您可以通过访问Spring Initializr并创建新项目来简化以下步骤。这样做会自动生成一个新的项目结构,以便您可以立即开始编码。我们建议尝试使用 Spring Initializr 来熟悉它。

6.1. 使用 Spring Initializr 创建 Spring Task 项目

现在我们可以创建一个并测试一个将 Hello, World! 打印到控制台的应用。

为此

  1. 访问Spring Initialzr 网站。

    1. 创建一个新的 Maven 项目,Group 名称为 io.spring.demoArtifact 名称为 helloworld

    2. 在 Dependencies 文本框中,输入 task,然后选择 Cloud Task 依赖项。

    3. 在 Dependencies 文本框中,输入 jdbc,然后选择 JDBC 依赖项。

    4. 在 Dependencies 文本框中,输入 h2,然后选择 H2。(或您喜欢的数据库)

    5. 点击 Generate Project 按钮

  2. 解压 helloworld.zip 文件,并将项目导入您喜欢的 IDE。

6.2. 编写代码

为了完成我们的应用,我们需要用以下内容更新生成的 HelloworldApplication,以便它能启动一个任务。

package io.spring.Helloworld;

import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.task.configuration.EnableTask;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
@EnableTask
public class HelloworldApplication {

    @Bean
    public ApplicationRunner applicationRunner() {
        return new HelloWorldApplicationRunner();
    }

    public static void main(String[] args) {
        SpringApplication.run(HelloworldApplication.class, args);
    }

    public static class HelloWorldApplicationRunner implements ApplicationRunner {

        @Override
        public void run(ApplicationArguments args) throws Exception {
            System.out.println("Hello, World!");

        }
    }
}

虽然看起来很小,但其中涉及不少内容。有关 Spring Boot 特定内容的更多信息,请参阅Spring Boot 参考文档

现在我们可以打开 src/main/resources 目录下的 application.properties 文件。我们需要在 application.properties 中配置两个属性

  • application.name: 设置应用名称(它会被转换成任务名称)

  • logging.level: 将 Spring Cloud Task 的日志级别设置为 DEBUG,以便查看正在发生的情况。

以下示例展示了如何同时进行这两项配置

logging.level.org.springframework.cloud.task=DEBUG
spring.application.name=helloWorld

6.2.1. 任务自动配置

当包含 Spring Cloud Task Starter 依赖项时,Task 会自动配置所有 bean 来启动其功能。此配置的一部分是注册 TaskRepository 及其使用所需的基础设施。

在我们的示例中,TaskRepository 使用嵌入式 H2 数据库记录任务结果。对于生产环境来说,嵌入式 H2 数据库不是一个实际的解决方案,因为任务结束后 H2 DB 就会消失。然而,对于快速入门体验,我们可以在示例中使用它,并将仓库中的更新信息回显到日志中。在本文档后面的配置章节中,我们将介绍如何定制 Spring Cloud Task 提供的各个部分的配置。

当我们的示例应用运行时,Spring Boot 会启动我们的 HelloWorldCommandLineRunner 并将“Hello, World!”消息输出到标准输出。TaskLifecycleListener 在仓库中记录任务的开始和结束。

6.2.2. main 方法

main 方法是任何 Java 应用的入口点。我们的 main 方法委托给 Spring Boot 的SpringApplication 类。

6.2.3. ApplicationRunner

Spring 包含多种启动应用逻辑的方式。Spring Boot 通过其 *Runner 接口(CommandLineRunnerApplicationRunner)提供了一种方便且有条理的方式来做到这一点。一个表现良好的任务可以使用这两个 runner 中的一个来启动任何逻辑。

任务的生命周期被认为是从 *Runner#run 方法执行之前开始,直到它们全部完成后结束。Spring Boot 允许一个应用使用多个 *Runner 实现,Spring Cloud Task 也是如此。

任何不是通过 CommandLineRunnerApplicationRunner 启动的机制(例如使用 InitializingBean#afterPropertiesSet)启动的处理都不会被 Spring Cloud Task 记录。

6.3. 运行示例

此时,我们的应用应该可以工作了。由于此应用基于 Spring Boot,我们可以从应用根目录使用命令行命令 $ mvn spring-boot:run 来运行它,如下例所示(及其输出)

$ mvn clean spring-boot:run
....... . . .
....... . . . (Maven log output here)
....... . . .

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.0.3.RELEASE)

2018-07-23 17:44:34.426  INFO 1978 --- [           main] i.s.d.helloworld.HelloworldApplication   : Starting HelloworldApplication on Glenns-MBP-2.attlocal.net with PID 1978 (/Users/glennrenfro/project/helloworld/target/classes started by glennrenfro in /Users/glennrenfro/project/helloworld)
2018-07-23 17:44:34.430  INFO 1978 --- [           main] i.s.d.helloworld.HelloworldApplication   : No active profile set, falling back to default profiles: default
2018-07-23 17:44:34.472  INFO 1978 --- [           main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@1d24f32d: startup date [Mon Jul 23 17:44:34 EDT 2018]; root of context hierarchy
2018-07-23 17:44:35.280  INFO 1978 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Starting...
2018-07-23 17:44:35.410  INFO 1978 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Start completed.
2018-07-23 17:44:35.419 DEBUG 1978 --- [           main] o.s.c.t.c.SimpleTaskConfiguration        : Using org.springframework.cloud.task.configuration.DefaultTaskConfigurer TaskConfigurer
2018-07-23 17:44:35.420 DEBUG 1978 --- [           main] o.s.c.t.c.DefaultTaskConfigurer          : No EntityManager was found, using DataSourceTransactionManager
2018-07-23 17:44:35.522 DEBUG 1978 --- [           main] o.s.c.t.r.s.TaskRepositoryInitializer    : Initializing task schema for h2 database
2018-07-23 17:44:35.525  INFO 1978 --- [           main] o.s.jdbc.datasource.init.ScriptUtils     : Executing SQL script from class path resource [org/springframework/cloud/task/schema-h2.sql]
2018-07-23 17:44:35.558  INFO 1978 --- [           main] o.s.jdbc.datasource.init.ScriptUtils     : Executed SQL script from class path resource [org/springframework/cloud/task/schema-h2.sql] in 33 ms.
2018-07-23 17:44:35.728  INFO 1978 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Registering beans for JMX exposure on startup
2018-07-23 17:44:35.730  INFO 1978 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Bean with name 'dataSource' has been autodetected for JMX exposure
2018-07-23 17:44:35.733  INFO 1978 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Located MBean 'dataSource': registering with JMX server as MBean [com.zaxxer.hikari:name=dataSource,type=HikariDataSource]
2018-07-23 17:44:35.738  INFO 1978 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 0
2018-07-23 17:44:35.762 DEBUG 1978 --- [           main] o.s.c.t.r.support.SimpleTaskRepository   : Creating: TaskExecution{executionId=0, parentExecutionId=null, exitCode=null, taskName='application', startTime=Mon Jul 23 17:44:35 EDT 2018, endTime=null, exitMessage='null', externalExecutionId='null', errorMessage='null', arguments=[]}
2018-07-23 17:44:35.772  INFO 1978 --- [           main] i.s.d.helloworld.HelloworldApplication   : Started HelloworldApplication in 1.625 seconds (JVM running for 4.764)
Hello, World!
2018-07-23 17:44:35.782 DEBUG 1978 --- [           main] o.s.c.t.r.support.SimpleTaskRepository   : Updating: TaskExecution with executionId=1 with the following {exitCode=0, endTime=Mon Jul 23 17:44:35 EDT 2018, exitMessage='null', errorMessage='null'}

前面的输出中有三行我们感兴趣的内容

  • SimpleTaskRepository 记录了在 TaskRepository 中条目的创建。

  • 我们的 CommandLineRunner 的执行,由“Hello, World!”输出展示。

  • SimpleTaskRepository 记录了任务在 TaskRepository 中的完成。

Spring Cloud Task 项目的 samples 模块中可以找到一个简单的任务应用,这里

特性

本节更详细地介绍了 Spring Cloud Task,包括如何使用它、如何配置它以及适当的扩展点。

7. Spring Cloud Task 的生命周期

在大多数情况下,现代云环境围绕着预期不会结束的进程执行进行设计。如果它们结束了,通常会被重新启动。虽然大多数平台确实有某种方式来运行结束后不会重新启动的进程,但该运行的结果通常不会以可消费的方式维护。Spring Cloud Task 提供了在一个环境中执行短生命周期进程并记录结果的能力。这样做允许构建围绕短生命周期进程以及通过消息集成任务来实现的长生命周期服务的微服务架构。

虽然此功能在云环境中很有用,但在传统部署模型中也可能出现同样的问题。当使用 cron 等调度器运行 Spring Boot 应用时,能够在应用完成后监控其结果会很有用。

Spring Cloud Task 采取了一种方法,即 Spring Boot 应用可以有开始和结束,并且仍然是成功的。批处理应用就是预期会结束(且通常是短生命周期)的进程如何发挥作用的一个例子。

Spring Cloud Task 记录给定任务的生命周期事件。大多数长生命周期进程,典型的如大多数 Web 应用,不保存其生命周期事件。Spring Cloud Task 核心的任务则会保存。

生命周期由一次任务执行组成。这是一次配置为任务(即,包含 Spring Cloud Task 依赖项)的 Spring Boot 应用的物理执行。

在任务开始时,在任何 CommandLineRunnerApplicationRunner 实现运行之前,会在 TaskRepository 中创建一个记录开始事件的条目。此事件是通过 Spring Framework 触发的 SmartLifecycle#start 来触发的。这表示系统中的所有 bean 已准备就绪,并且发生在运行 Spring Boot 提供的任何 CommandLineRunnerApplicationRunner 实现之前。

任务的记录仅在 ApplicationContext 成功启动后发生。如果上下文根本无法启动,任务的运行将不会被记录。

在 Spring Boot 的所有 *Runner#run 调用完成后,或者 ApplicationContext 失败时(由 ApplicationFailedEvent 指示),任务执行结果会在仓库中更新。

如果应用要求在任务完成后(所有 *Runner#run 方法都被调用并且任务仓库已更新)关闭 ApplicationContext,请将属性 spring.cloud.task.closecontextEnabled 设置为 true。

7.1. TaskExecution

存储在 TaskRepository 中的信息在 TaskExecution 类中建模,包含以下信息

字段 描述

executionid

任务运行的唯一 ID。

exitCode

ExitCodeExceptionMapper 实现生成的退出码。如果没有生成退出码,但抛出了 ApplicationFailedEvent,则设置为 1。否则,假定为 0。

taskName

任务名称,由配置的 TaskNameResolver 决定。

startTime

任务启动的时间,由 SmartLifecycle#start 调用指示。

endTime

任务完成的时间,由 ApplicationReadyEvent 指示。

exitMessage

退出时可用的任何信息。这可以通过 TaskExecutionListener 以编程方式设置。

errorMessage

如果异常是任务结束的原因(由 ApplicationFailedEvent 指示),则该异常的堆栈跟踪存储在此处。

arguments

字符串命令行参数的 List,它们被传递到可执行的 boot 应用中。

7.2. 映射退出码

任务完成时,它会尝试向操作系统返回一个退出码。如果我们查看我们的原始示例,我们可以看到我们没有控制应用的这一方面。因此,如果抛出异常,JVM 返回的代码可能对您的调试有用,也可能没用。

因此,Spring Boot 提供了一个接口 ExitCodeExceptionMapper,允许您将未捕获的异常映射到退出码。这样做可以让您在退出码级别指示出了什么问题。此外,通过这种方式映射退出码,Spring Cloud Task 会记录返回的退出码。

如果任务因 SIG-INT 或 SIG-TERM 终止,除非代码中另有指定,否则退出码为零。

任务运行时,退出码在仓库中存储为 null。任务完成后,根据本节前面描述的指南存储适当的退出码。

8. 配置

Spring Cloud Task 提供了开箱即用的配置,在 DefaultTaskConfigurerSimpleTaskConfiguration 类中定义。本节将介绍默认配置以及如何根据您的需求定制 Spring Cloud Task。

8.1. DataSource

Spring Cloud Task 使用一个数据源来存储任务执行的结果。默认情况下,我们提供一个内存中的 H2 实例,以提供一种简单的开发引导方法。然而,在生产环境中,您可能希望配置自己的 DataSource

如果您的应用只使用一个 DataSource,并且它同时作为您的业务 schema 和任务仓库,您只需要提供任何 DataSource(最简单的方法是通过 Spring Boot 的配置约定)。Spring Cloud Task 会自动将此 DataSource 用于仓库。

如果您的应用使用多个 DataSource,您需要使用适当的 DataSource 配置任务仓库。此定制可以通过实现 TaskConfigurer 来完成。

8.2. 表前缀

TaskRepository 的一个可修改属性是任务表的表前缀。默认情况下,它们都以 TASK_ 为前缀。TASK_EXECUTIONTASK_EXECUTION_PARAMS 是两个示例。然而,修改此前缀可能有一些原因。如果需要在表名前添加 schema 名称,或者在同一 schema 内需要多组任务表,您必须更改表前缀。您可以通过将 spring.cloud.task.tablePrefix 设置为您需要的前缀来做到这一点,如下所示

spring.cloud.task.tablePrefix=yourPrefix

通过使用 spring.cloud.task.tablePrefix,用户承担创建满足任务表 schema 标准但根据用户业务需求进行修改的任务表的责任。在创建自己的任务 DDL 时,可以将 Spring Cloud Task Schema DDL 作为参考,如这里所示。

8.3. 启用/禁用表初始化

如果您自己创建了任务表,并且不希望 Spring Cloud Task 在任务启动时创建它们,请将 spring.cloud.task.initialize-enabled 属性设置为 false,如下所示

spring.cloud.task.initialize-enabled=false

它默认为 true

属性 spring.cloud.task.initialize.enable 已弃用。

8.4. 外部生成的任务 ID

在某些情况下,您可能希望考虑任务被请求与基础设施实际启动它之间的时间差。Spring Cloud Task 允许您在任务被请求时创建一个 TaskExecution。然后将生成的 TaskExecution 的执行 ID 传递给任务,以便它可以在任务的生命周期中更新 TaskExecution

可以通过在引用存储 TaskExecution 对象的 datastore 的 TaskRepository 实现上调用 createTaskExecution 方法来创建一个 TaskExecution

为了配置您的任务以使用生成的 TaskExecutionId,请添加以下属性

spring.cloud.task.executionid=yourtaskId

8.5. 外部任务 Id

Spring Cloud Task 允许您为每个 TaskExecution 存储一个外部任务 ID。为了配置您的任务以使用生成的 TaskExecutionId,请添加以下属性

spring.cloud.task.external-execution-id=<externalTaskId>

8.6. 父任务 Id

Spring Cloud Task 允许您为每个 TaskExecution 存储一个父任务 ID。例如,一个任务执行另一个或多个任务,并且您想记录哪个任务启动了每个子任务。为了配置您的任务以设置父 TaskExecutionId,请在子任务上添加以下属性

spring.cloud.task.parent-execution-id=<parentExecutionTaskId>

8.7. TaskConfigurer

TaskConfigurer 是一个策略接口,允许您定制 Spring Cloud Task 组件的配置方式。默认情况下,我们提供 DefaultTaskConfigurer,它提供逻辑默认值:基于 Map 的内存组件(如果未提供 DataSource,则对开发有用)和基于 JDBC 的组件(如果提供了 DataSource,则有用)。

TaskConfigurer 允许您配置三个主要组件

组件 描述 默认值(由 DefaultTaskConfigurer 提供)

TaskRepository

要使用的 TaskRepository 实现。

SimpleTaskRepository

TaskExplorer

要使用的 TaskExplorer 实现(用于对任务仓库进行只读访问的组件)。

SimpleTaskExplorer

PlatformTransactionManager

用于运行任务更新时的事务管理器。

如果使用了 DataSource,则为 JdbcTransactionManager。如果没有使用 DataSource,则为 ResourcelessTransactionManager

您可以通过创建 TaskConfigurer 接口的自定义实现来定制前面表格中描述的任何组件。通常,继承 DefaultTaskConfigurer(如果在找不到 TaskConfigurer 时提供)并覆盖所需的 getter 就足够了。然而,可能需要从头开始实现自己的。

用户不应直接使用 TaskConfigurer 的 getter 方法,除非他们使用它来提供作为 Spring Bean 暴露的实现。

8.8. 任务执行监听器

TaskExecutionListener 允许您为任务生命周期期间发生的特定事件注册监听器。为此,创建一个实现 TaskExecutionListener 接口的类。实现 TaskExecutionListener 接口的类会收到以下事件的通知

  • onTaskStartup: 在将 TaskExecution 存储到 TaskRepository 之前。

  • onTaskEnd: 在更新 TaskRepository 中的 TaskExecution 条目并标记任务最终状态之前。

  • onTaskFailed: 在任务抛出未处理异常时调用 onTaskEnd 方法之前。

Spring Cloud Task 还允许您使用以下方法注解将 TaskExecution 监听器添加到 bean 中的方法

  • @BeforeTask: 在将 TaskExecution 存储到 TaskRepository 之前

  • @AfterTask: 在更新 TaskRepository 中的 TaskExecution 条目并标记任务最终状态之前。

  • @FailedTask: 在任务抛出未处理异常时调用 @AfterTask 方法之前。

以下示例展示了这三个注解的使用

 public class MyBean {

    @BeforeTask
    public void methodA(TaskExecution taskExecution) {
    }

    @AfterTask
    public void methodB(TaskExecution taskExecution) {
    }

    @FailedTask
    public void methodC(TaskExecution taskExecution, Throwable throwable) {
    }
}
TaskLifecycleListener 之前插入 ApplicationListener 可能会导致意想不到的效果。

8.8.1. 任务执行监听器抛出的异常

如果 TaskExecutionListener 事件处理器抛出异常,则该事件处理器的所有监听处理都会停止。例如,如果三个 onTaskStartup 监听器已启动,并且第一个 onTaskStartup 事件处理器抛出异常,则不会调用其他两个 onTaskStartup 方法。然而,TaskExecutionListeners 的其他事件处理器(onTaskEndonTaskFailed)仍会被调用。

TaskExecutionListener 事件处理器抛出异常时返回的退出码是 ExitCodeEvent 报告的退出码。如果没有发出 ExitCodeEvent,则评估抛出的异常,查看其是否属于 ExitCodeGenerator 类型。如果是,则返回 ExitCodeGenerator 的退出码。否则,返回 1

如果在 onTaskStartup 方法中抛出异常,应用的退出码将是 1。如果在 onTaskEndonTaskFailed 方法中抛出异常,应用的退出码将是使用上述规则确定的那个。

如果在 onTaskStartuponTaskEndonTaskFailed 中抛出异常,您不能使用 ExitCodeExceptionMapper 覆盖应用的退出码。

8.8.2. 退出消息

您可以使用 TaskExecutionListener 以编程方式设置任务的退出消息。这通过设置 TaskExecutionexitMessage 来完成,该 exitMessage 随后传递给 TaskExecutionListener。以下示例显示了一个使用 @AfterTask ExecutionListener 注解的方法

@AfterTask
public void afterMe(TaskExecution taskExecution) {
    taskExecution.setExitMessage("AFTER EXIT MESSAGE");
}

可以在任何监听器事件(onTaskStartuponTaskFailedonTaskEnd)中设置 ExitMessage。三个监听器的优先顺序如下

  1. onTaskEnd

  2. onTaskFailed

  3. onTaskStartup

例如,如果您为 onTaskStartuponTaskFailed 监听器设置了 exitMessage,并且任务在没有失败的情况下结束,则 onTaskStartup 中的 exitMessage 将存储在仓库中。否则,如果发生失败,则存储 onTaskFailed 中的 exitMessage。此外,如果您使用 onTaskEnd 监听器设置了 exitMessage,则 onTaskEnd 中的 exitMessage 会覆盖 onTaskStartuponTaskFailed 中的 exit messages。

8.9. 限制 Spring Cloud Task 实例

Spring Cloud Task 允许您设定具有给定任务名称的任务一次只能运行一个实例。为此,您需要设定 任务名称 并为每个任务执行设置 spring.cloud.task.single-instance-enabled=true。当第一个任务执行正在运行时,任何其他尝试运行具有相同 任务名称spring.cloud.task.single-instance-enabled=true 的任务都会失败,并显示以下错误消息:Task with name "application" is already running. spring.cloud.task.single-instance-enabled 的默认值为 false。以下示例展示了如何将 spring.cloud.task.single-instance-enabled 设置为 true

spring.cloud.task.single-instance-enabled=true or false

要使用此功能,您必须将以下 Spring Integration 依赖项添加到您的应用程序中

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-core</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-jdbc</artifactId>
</dependency>
如果任务由于此功能被启用且另一个同名任务正在运行而失败,应用程序的退出码将为 1。

8.9.1. Spring AOT 和原生编译的单实例用法

在创建原生编译应用程序时使用 Spring Cloud Task 的单实例功能,您需要在构建时启用此功能。为此,请添加 process-aot execution 并将 spring.cloud.task.single-step-instance-enabled=true 作为 JVM 参数设置,如下所示

<plugin>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-maven-plugin</artifactId>
    <executions>
        <execution>
            <id>process-aot</id>
            <goals>
                <goal>process-aot</goal>
            </goals>
            <configuration>
                <jvmArguments>
                    -Dspring.cloud.task.single-instance-enabled=true
                </jvmArguments>
            </configuration>
        </execution>
    </executions>
</plugin>

8.10. 为 ApplicationRunner 和 CommandLineRunner 启用观测

要为 ApplicationRunnerCommandLineRunner 启用任务观测,请将 spring.cloud.task.observation.enabled 设置为 true。

一个使用 SimpleMeterRegistry 启用观测功能的示例任务应用程序可以在这里找到。

8.11. 禁用 Spring Cloud Task 自动配置

在某些实现中,如果不需要 Spring Cloud Task 自动配置,您可以禁用任务的自动配置。可以通过将以下注解添加到您的任务应用程序来实现这一点

@EnableAutoConfiguration(exclude={SimpleTaskAutoConfiguration.class})

您也可以通过将 spring.cloud.task.autoconfiguration.enabled 属性设置为 false 来禁用任务自动配置。

8.12. 关闭上下文

如果应用程序需要在任务完成时关闭 ApplicationContext(所有 *Runner#run 方法已被调用且任务仓库已更新),请将属性 spring.cloud.task.closecontextEnabled 设置为 true

另一种需要关闭上下文的情况是,任务执行完成但应用程序并未终止。在这些情况下,上下文之所以保持打开是因为分配了一个线程(例如:如果您正在使用一个 TaskExecutor)。在这些情况下,当启动任务时,将 spring.cloud.task.closecontextEnabled 属性设置为 true。这将在任务完成后关闭应用程序的上下文。从而允许应用程序终止。

8.13. 启用任务指标

Spring Cloud Task 与 Micrometer 集成,并为其执行的任务创建观测。要启用任务可观测性集成,您必须将 spring-boot-starter-actuator、您首选的注册表实现(如果您想发布指标)以及 micrometer-tracing(如果您想发布跟踪数据)添加到您的任务应用程序中。一个使用 Influx 启用任务可观测性和指标的 Maven 依赖项示例是

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
    <groupId>io.micrometer</groupId>
    <artifactId>micrometer-registry-influx</artifactId>
    <scope>runtime</scope>
</dependency>

8.14. Spring Task 和 Spring Cloud Task 属性

术语 task 在行业中是一个常用词。例如,Spring Boot 提供了 spring.task,而 Spring Cloud Task 提供了 spring.cloud.task 属性。这在过去引起了一些混淆,认为这两组属性直接相关。然而,它们代表了 Spring 生态系统中提供的两组不同的功能。

  • spring.task 指的是配置 ThreadPoolTaskScheduler 的属性。

  • spring.cloud.task 指的是配置 Spring Cloud Task 功能的属性。

批处理

本节将更详细地介绍 Spring Cloud Task 与 Spring Batch 的集成。本节涵盖了如何跟踪作业执行与其执行的任务之间的关联,以及如何通过 Spring Cloud Deployer 进行远程分区。

9. 将作业执行关联到执行它的任务

Spring Boot 提供了在 über-jar 中执行批处理作业的功能。Spring Boot 对此功能的支持允许开发人员在该执行中执行多个批处理作业。Spring Cloud Task 提供了将作业的执行(即作业执行)与任务的执行关联起来的能力,以便可以相互追溯。

Spring Cloud Task 通过使用 TaskBatchExecutionListener 实现此功能。默认情况下,此监听器在任何同时配置了 Spring Batch Job(通过在上下文中定义类型为 Job 的 bean)并且类路径中有 spring-cloud-task-batch jar 的上下文中自动配置。该监听器会被注入到所有符合这些条件的作业中。

9.1. 覆盖 TaskBatchExecutionListener

为了防止该监听器被注入到当前上下文中的任何批处理作业中,您可以使用标准的 Spring Boot 机制禁用自动配置。

要只将该监听器注入到上下文中的特定作业中,请覆盖 batchTaskExecutionListenerBeanPostProcessor 并提供作业 bean ID 列表,如下例所示

public static TaskBatchExecutionListenerBeanPostProcessor batchTaskExecutionListenerBeanPostProcessor() {
    TaskBatchExecutionListenerBeanPostProcessor postProcessor =
        new TaskBatchExecutionListenerBeanPostProcessor();

    postProcessor.setJobNames(Arrays.asList(new String[] {"job1", "job2"}));

    return postProcessor;
}
您可以在 Spring Cloud Task 项目的 samples 模块中找到一个示例批处理应用程序,链接在这里

10. 远程分区

Spring Cloud Deployer 提供了在大多数云基础设施上启动基于 Spring Boot 的应用程序的功能。DeployerPartitionHandlerDeployerStepExecutionHandler 将工作步骤执行的启动委托给 Spring Cloud Deployer。

要配置 DeployerStepExecutionHandler,您必须提供一个代表要执行的 Spring Boot über-jar 的 Resource,一个 TaskLauncherHandler 和一个 JobExplorer。您可以配置任何环境变量属性,以及同时执行的最大工作进程数、轮询结果的间隔(默认为 10 秒)以及超时时间(默认为 -1 或无超时)。以下示例展示了如何配置此 PartitionHandler

@Bean
public PartitionHandler partitionHandler(TaskLauncher taskLauncher,
        JobExplorer jobExplorer) throws Exception {

    MavenProperties mavenProperties = new MavenProperties();
    mavenProperties.setRemoteRepositories(new HashMap<>(Collections.singletonMap("springRepo",
        new MavenProperties.RemoteRepository(repository))));

    Resource resource =
        MavenResource.parse(String.format("%s:%s:%s",
                "io.spring.cloud",
                "partitioned-batch-job",
                "1.1.0.RELEASE"), mavenProperties);

    DeployerPartitionHandler partitionHandler =
        new DeployerPartitionHandler(taskLauncher, jobExplorer, resource, "workerStep");

    List<String> commandLineArgs = new ArrayList<>(3);
    commandLineArgs.add("--spring.profiles.active=worker");
    commandLineArgs.add("--spring.cloud.task.initialize.enable=false");
    commandLineArgs.add("--spring.batch.initializer.enabled=false");

    partitionHandler.setCommandLineArgsProvider(
        new PassThroughCommandLineArgsProvider(commandLineArgs));
    partitionHandler.setEnvironmentVariablesProvider(new NoOpEnvironmentVariablesProvider());
    partitionHandler.setMaxWorkers(2);
    partitionHandler.setApplicationName("PartitionedBatchJobTask");

    return partitionHandler;
}
在向分区传递环境变量时,每个分区可能位于不同的机器上,具有不同的环境设置。因此,您应该只传递必需的环境变量。

请注意,在上面的示例中,我们将最大工作进程数设置为 2。设置最大工作进程数确定了应同时运行的最大分区数。

要执行的 Resource 预期是一个 Spring Boot über-jar,其中在当前上下文中将 DeployerStepExecutionHandler 配置为 CommandLineRunner。前面示例中列出的仓库应该是 uber-jar 所在的远程仓库。管理器和工作进程都应该能够访问用作作业仓库和任务仓库的相同数据存储。一旦底层基础设施引导了 Spring Boot jar 并且 Spring Boot 启动了 DeployerStepExecutionHandler,步骤处理器就会执行请求的 Step。以下示例展示了如何配置 DeployerStepExecutionHandler

@Bean
public DeployerStepExecutionHandler stepExecutionHandler(JobExplorer jobExplorer) {
    DeployerStepExecutionHandler handler =
        new DeployerStepExecutionHandler(this.context, jobExplorer, this.jobRepository);

    return handler;
}
您可以在 Spring Cloud Task 项目的 samples 模块中找到一个示例远程分区应用程序,链接在这里

10.1. 异步启动远程批处理分区

默认情况下,批处理分区是按顺序启动的。然而,在某些情况下,这可能会影响性能,因为每次启动都会阻塞直到资源(例如:在 Kubernetes 中 provision 一个 pod)被 provision 完成。在这些情况下,您可以向 DeployerPartitionHandler 提供一个 ThreadPoolTaskExecutor。这将根据 ThreadPoolTaskExecutor 的配置启动远程批处理分区。例如

    @Bean
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);
        executor.setThreadNamePrefix("default_task_executor_thread");
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.initialize();
        return executor;
    }

    @Bean
    public PartitionHandler partitionHandler(TaskLauncher taskLauncher, JobExplorer jobExplorer,
        TaskRepository taskRepository, ThreadPoolTaskExecutor executor) throws Exception {
        Resource resource = this.resourceLoader
            .getResource("maven://io.spring.cloud:partitioned-batch-job:2.2.0.BUILD-SNAPSHOT");

        DeployerPartitionHandler partitionHandler =
            new DeployerPartitionHandler(taskLauncher, jobExplorer, resource,
                "workerStep", taskRepository, executor);
    ...
    }
由于使用 ThreadPoolTaskExecutor 会留下一个活动线程,应用程序将不会终止,因此我们需要关闭上下文。为了正确关闭应用程序,我们需要将 spring.cloud.task.closecontextEnabled 属性设置为 true

10.2. 关于为 Kubernetes 平台开发批处理分区应用程序的注意事项

  • 在 Kubernetes 平台上部署分区应用程序时,您必须使用 Spring Cloud Kubernetes Deployer 的以下依赖项

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-deployer-kubernetes</artifactId>
    </dependency>
  • 任务应用程序及其分区的应用程序名称需要遵循以下正则表达式模式:[a-z0-9]([-a-z0-9]*[a-z0-9])。否则,会抛出异常。

11. 批处理信息消息

Spring Cloud Task 提供了批处理作业发出信息消息的能力。“Spring Batch 事件”部分详细介绍了此功能。

12. 批处理作业退出码

正如之前讨论的,Spring Cloud Task 应用程序支持记录任务执行的退出码。然而,在任务中运行 Spring Batch Job 的情况下,无论 Batch Job Execution 如何完成,使用默认的 Batch/Boot 行为时,任务的结果总是零。请记住,任务是一个 boot 应用程序,从任务返回的退出码与 boot 应用程序的退出码相同。要覆盖此行为并允许任务在批处理作业返回 FAILEDBatchStatus 时返回非零退出码,请将 spring.cloud.task.batch.fail-on-job-failure 设置为 true。此时,退出码可以是 1(默认值),也可以基于指定的 ExitCodeGenerator

此功能使用一个新的 ApplicationRunner,它替换了 Spring Boot 提供的那个。默认情况下,它配置了相同的顺序。但是,如果您想自定义 ApplicationRunner 运行的顺序,可以通过设置 spring.cloud.task.batch.applicationRunnerOrder 属性来设置其顺序。要让您的任务根据批处理作业执行的结果返回退出码,您需要编写自己的 CommandLineRunner

单步批处理作业 Starter

本节介绍如何使用 Spring Cloud Task 中包含的 starter 开发具有单个 Step 的 Spring Batch Job。此 starter 允许您使用配置来定义 ItemReaderItemWriter 或一个完整的单步 Spring Batch Job。有关 Spring Batch 及其功能的更多信息,请参阅Spring Batch 文档

要获取 Maven 的 starter,请将以下内容添加到您的构建中

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-single-step-batch-job</artifactId>
    <version>2.3.0</version>
</dependency>

要获取 Gradle 的 starter,请将以下内容添加到您的构建中

compile "org.springframework.cloud:spring-cloud-starter-single-step-batch-job:2.3.0"

13. 定义作业

您可以使用此 starter 定义最少的(例如 ItemReaderItemWriter)或最多的(例如完整的 Job)。在本节中,我们定义了配置 Job 所需的属性。

13.1. 属性

首先,此 starter 提供了一组属性,允许您配置包含一个 Step 的 Job 的基本信息

表 1. Job 属性
属性 类型 默认值 描述

spring.batch.job.jobName

String

null

作业的名称。

spring.batch.job.stepName

String

null

步骤的名称。

spring.batch.job.chunkSize

Integer

null

每事务处理的项数。

配置了上述属性后,您将拥有一个包含单个基于 chunk 的步骤的作业。此基于 chunk 的步骤将 Map<String, Object> 实例作为项读取、处理和写入。然而,此步骤尚无任何实际操作。您需要配置一个 ItemReader、一个可选的 ItemProcessor 和一个 ItemWriter 来使其执行某些操作。要配置其中之一,您可以使用属性并配置已提供自动配置的选项之一,或者使用标准的 Spring 配置机制配置您自己的。

如果您自行配置,输入和输出类型必须与步骤中的其他类型匹配。此 starter 中的 ItemReader 实现和 ItemWriter 实现都使用 Map<String, Object> 作为输入和输出项。

14. ItemReader 实现的自动配置

此 starter 为四种不同的 ItemReader 实现提供了自动配置:AmqpItemReaderFlatFileItemReaderJdbcCursorItemReaderKafkaItemReader。在本节中,我们将概述如何使用提供的自动配置来配置这些实现。

14.1. AmqpItemReader

您可以使用 AmqpItemReader 通过 AMQP 从队列或主题读取数据。此 ItemReader 实现的自动配置依赖于两组配置。第一组是 AmqpTemplate 的配置。您可以自行配置,也可以使用 Spring Boot 提供的自动配置。请参阅Spring Boot AMQP 文档。配置好 AmqpTemplate 后,您可以通过设置以下属性来启用支持其的批处理功能。

表 2. AmqpItemReader 属性
属性 类型 默认值 描述

spring.batch.job.amqpitemreader.enabled

boolean

false

如果为 true,则自动配置将执行。

spring.batch.job.amqpitemreader.jsonConverterEnabled

boolean

true

指示是否应注册 Jackson2JsonMessageConverter 来解析消息。

有关更多信息,请参阅AmqpItemReader 文档

14.2. FlatFileItemReader

FlatFileItemReader 允许您从平面文件(例如 CSV 和其他文件格式)读取数据。要从文件读取,您可以通过普通的 Spring 配置(LineTokenizerRecordSeparatorPolicyFieldSetMapperLineMapperSkippedLinesCallback)自行提供一些组件。您也可以使用以下属性来配置读取器

表 3. FlatFileItemReader 属性
属性 类型 默认值 描述

spring.batch.job.flatfileitemreader.saveState

boolean

true

确定是否应为重启保存状态。

spring.batch.job.flatfileitemreader.name

String

null

ExecutionContext 中用于提供唯一键的名称。

spring.batch.job.flatfileitemreader.maxItemcount

int

Integer.MAX_VALUE

从文件读取的最大项数。

spring.batch.job.flatfileitemreader.currentItemCount

int

0

已读取的项数。用于重启。

spring.batch.job.flatfileitemreader.comments

List<String>

empty List

文件中指示注释行(要忽略的行)的字符串列表。

spring.batch.job.flatfileitemreader.resource

Resource

null

要读取的资源。

spring.batch.job.flatfileitemreader.strict

boolean

true

如果设置为 true,如果找不到资源,读取器将抛出异常。

spring.batch.job.flatfileitemreader.encoding

String

FlatFileItemReader.DEFAULT_CHARSET

读取文件时使用的编码。

spring.batch.job.flatfileitemreader.linesToSkip

int

0

指示在文件开头要跳过的行数。

spring.batch.job.flatfileitemreader.delimited

boolean

false

指示文件是否为分隔文件(CSV 和其他格式)。此属性或 spring.batch.job.flatfileitemreader.fixedLength 只能同时有一个为 true

spring.batch.job.flatfileitemreader.delimiter

String

DelimitedLineTokenizer.DELIMITER_COMMA

如果读取分隔文件,指示用于解析的分隔符。

spring.batch.job.flatfileitemreader.quoteCharacter

char

DelimitedLineTokenizer.DEFAULT_QUOTE_CHARACTER

用于确定引用值的字符。

spring.batch.job.flatfileitemreader.includedFields

List<Integer>

empty list

用于确定记录中哪些字段应包含在项中的索引列表。

spring.batch.job.flatfileitemreader.fixedLength

boolean

false

指示文件的记录是否按列号解析。此属性或 spring.batch.job.flatfileitemreader.delimited 只能同时有一个为 true

spring.batch.job.flatfileitemreader.ranges

List<Range>

empty list

用于解析固定宽度记录的列范围列表。请参阅Range 文档

spring.batch.job.flatfileitemreader.names

String []

null

从记录解析出的每个字段的名称列表。这些名称是此 ItemReader 返回的项中 Map<String, Object> 的键。

spring.batch.job.flatfileitemreader.parsingStrict

boolean

true

如果设置为 true,如果无法映射字段,则映射将失败。

14.3. JdbcCursorItemReader

JdbcCursorItemReader 对关系数据库运行查询,并遍历结果游标(ResultSet)以提供结果项。此自动配置允许您提供 PreparedStatementSetterRowMapper 或两者。您也可以使用以下属性来配置 JdbcCursorItemReader

表 4. JdbcCursorItemReader 属性
属性 类型 默认值 描述

spring.batch.job.jdbccursoritemreader.saveState

boolean

true

确定是否应为重启保存状态。

spring.batch.job.jdbccursoritemreader.name

String

null

ExecutionContext 中用于提供唯一键的名称。

spring.batch.job.jdbccursoritemreader.maxItemcount

int

Integer.MAX_VALUE

从文件读取的最大项数。

spring.batch.job.jdbccursoritemreader.currentItemCount

int

0

已读取的项数。用于重启。

spring.batch.job.jdbccursoritemreader.fetchSize

int

对驱动程序的提示,指示每次调用数据库系统时检索多少条记录。为了获得最佳性能,通常希望将其设置为与 chunk size 匹配。

spring.batch.job.jdbccursoritemreader.maxRows

int

从数据库读取的最大项数。

spring.batch.job.jdbccursoritemreader.queryTimeout

int

查询超时前的毫秒数。

spring.batch.job.jdbccursoritemreader.ignoreWarnings

boolean

true

确定读取器在处理时是否应忽略 SQL 警告。

spring.batch.job.jdbccursoritemreader.verifyCursorPosition

boolean

true

指示在每次读取后是否应验证游标位置,以确认 RowMapper 没有移动游标。

spring.batch.job.jdbccursoritemreader.driverSupportsAbsolute

boolean

false

指示驱动程序是否支持游标的绝对定位。

spring.batch.job.jdbccursoritemreader.useSharedExtendedConnection

boolean

false

指示连接是否与其他处理共享(因此是事务的一部分)。

spring.batch.job.jdbccursoritemreader.sql

String

null

从中读取的 SQL 查询。

您还可以使用以下属性专门为读取器指定 JDBC DataSource:.JdbcCursorItemReader 属性

属性 类型 默认值 描述

spring.batch.job.jdbccursoritemreader.datasource.enable

boolean

false

确定是否应启用 JdbcCursorItemReaderDataSource

jdbccursoritemreader.datasource.url

String

null

数据库的 JDBC URL

jdbccursoritemreader.datasource.username

String

null

数据库的登录用户名。

jdbccursoritemreader.datasource.password

String

null

数据库的登录密码。

jdbccursoritemreader.datasource.driver-class-name

String

null

JDBC driver 的完全限定名。

如果未指定 jdbccursoritemreader_datasourceJDBCCursorItemReader 将使用默认的 DataSource

14.4. KafkaItemReader

从 Kafka topic 摄取数据分区非常有用,这正是 KafkaItemReader 可以做到的。要配置 KafkaItemReader,需要两个配置项。首先,需要使用 Spring Boot 的 Kafka 自动配置来配置 Kafka(请参阅Spring Boot Kafka 文档)。配置好 Spring Boot 中的 Kafka 属性后,您可以通过设置以下属性来配置 KafkaItemReader 本身。

表 5. KafkaItemReader 属性
属性 类型 默认值 描述

spring.batch.job.kafkaitemreader.name

String

null

ExecutionContext 中用于提供唯一键的名称。

spring.batch.job.kafkaitemreader.topic

String

null

要从中读取的 topic 的名称。

spring.batch.job.kafkaitemreader.partitions

List<Integer>

empty list

要从中读取的分区索引列表。

spring.batch.job.kafkaitemreader.pollTimeOutInSeconds

long

30

poll() 操作的超时时间。

spring.batch.job.kafkaitemreader.saveState

boolean

true

确定是否应为重启保存状态。

14.5. 原生编译

单步批处理的优点是,在使用 JVM 时,它允许您在运行时动态选择要使用的 reader 和 writer beans。但是,在使用原生编译时,您必须在构建时而不是运行时确定 reader 和 writer。以下示例展示了如何做到这一点

<plugin>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-maven-plugin</artifactId>
    <executions>
        <execution>
            <id>process-aot</id>
            <goals>
                <goal>process-aot</goal>
            </goals>
            <configuration>
                <jvmArguments>
                    -Dspring.batch.job.flatfileitemreader.name=fooReader
                    -Dspring.batch.job.flatfileitemwriter.name=fooWriter
                </jvmArguments>
            </configuration>
        </execution>
    </executions>
</plugin>

15. ItemProcessor 配置

如果 ApplicationContext 中存在 ItemProcessor,单步批处理作业自动配置会接受它。如果找到类型正确(ItemProcessor<Map<String, Object>, Map<String, Object>>)的 ItemProcessor,它将被自动注入到步骤中。

16. ItemWriter 实现的自动配置

此 starter 为与支持的 ItemReader 实现匹配的 ItemWriter 实现提供了自动配置:AmqpItemWriterFlatFileItemWriterJdbcItemWriterKafkaItemWriter。本节介绍如何使用自动配置来配置受支持的 ItemWriter

16.1. AmqpItemWriter

要写入 RabbitMQ 队列,您需要两组配置。首先,您需要一个 AmqpTemplate。获取此对象的最简单方法是使用 Spring Boot 的 RabbitMQ 自动配置。请参阅Spring Boot AMQP 文档

配置好 AmqpTemplate 后,您可以通过设置以下属性来配置 AmqpItemWriter

表 6. AmqpItemWriter 属性
属性 类型 默认值 描述

spring.batch.job.amqpitemwriter.enabled

boolean

false

如果为 true,则自动配置运行。

spring.batch.job.amqpitemwriter.jsonConverterEnabled

boolean

true

指示是否应注册 Jackson2JsonMessageConverter 来转换消息。

16.2. FlatFileItemWriter

要将文件作为步骤的输出写入,可以配置 FlatFileItemWriter。自动配置接受已明确配置的组件(例如 LineAggregatorFieldExtractorFlatFileHeaderCallbackFlatFileFooterCallback)以及通过设置以下指定属性配置的组件。

表 7. FlatFileItemWriter 属性
属性 类型 默认值 描述

spring.batch.job.flatfileitemwriter.resource

Resource

null

要读取的资源。

spring.batch.job.flatfileitemwriter.delimited

boolean

false

指示输出文件是否为分隔文件。如果为 true,则 spring.batch.job.flatfileitemwriter.formatted 必须为 false

spring.batch.job.flatfileitemwriter.formatted

boolean

false

指示输出文件是否为格式化文件。如果为 true,则 spring.batch.job.flatfileitemwriter.delimited 必须为 false

spring.batch.job.flatfileitemwriter.format

String

null

用于为格式化文件生成输出的格式。格式化是使用 String.format 执行的。

spring.batch.job.flatfileitemwriter.locale

Locale

Locale.getDefault()

生成文件时要使用的 Locale

spring.batch.job.flatfileitemwriter.maximumLength

int

0

记录的最大长度。如果为 0,则大小无限制。

spring.batch.job.flatfileitemwriter.minimumLength

int

0

记录的最小长度。

spring.batch.job.flatfileitemwriter.delimiter

String

,

用于分隔分隔文件中字段的 String

spring.batch.job.flatfileitemwriter.encoding

String

FlatFileItemReader.DEFAULT_CHARSET

写入文件时使用的编码。

spring.batch.job.flatfileitemwriter.forceSync

boolean

false

指示文件在刷新时是否应强制同步到磁盘。

spring.batch.job.flatfileitemwriter.names

String []

null

从记录解析出的每个字段的名称列表。这些名称是此 ItemWriter 接收的项中 Map<String, Object> 的键。

spring.batch.job.flatfileitemwriter.append

boolean

false

指示如果找到输出文件,是否应向其追加内容。

spring.batch.job.flatfileitemwriter.lineSeparator

String

FlatFileItemWriter.DEFAULT_LINE_SEPARATOR

用于分隔输出文件中行的 String

spring.batch.job.flatfileitemwriter.name

String

null

ExecutionContext 中用于提供唯一键的名称。

spring.batch.job.flatfileitemwriter.saveState

boolean

true

确定是否应为重启保存状态。

spring.batch.job.flatfileitemwriter.shouldDeleteIfEmpty

boolean

false

如果设置为 true,作业完成时会删除空文件(没有输出)。

spring.batch.job.flatfileitemwriter.shouldDeleteIfExists

boolean

true

如果设置为 true 并且在应该输出文件的位置找到了文件,则在步骤开始前会删除该文件。

spring.batch.job.flatfileitemwriter.transactional

boolean

FlatFileItemWriter.DEFAULT_TRANSACTIONAL

指示读取器是否是事务性队列(表示读取的项在失败时会返回到队列)。

16.3. JdbcBatchItemWriter

要将步骤的输出写入关系数据库,此 starter 提供了自动配置 JdbcBatchItemWriter 的能力。自动配置允许您通过设置以下属性来提供自己的 ItemPreparedStatementSetterItemSqlParameterSourceProvider 以及配置选项。

表 8. JdbcBatchItemWriter 属性
属性 类型 默认值 描述

spring.batch.job.jdbcbatchitemwriter.name

String

null

ExecutionContext 中用于提供唯一键的名称。

spring.batch.job.jdbcbatchitemwriter.sql

String

null

用于插入每个项的 SQL

spring.batch.job.jdbcbatchitemwriter.assertUpdates

boolean

true

是否验证每次插入至少更新一条记录。

您还可以使用以下属性专门为写入器指定 JDBC DataSource:.JdbcBatchItemWriter 属性

属性 类型 默认值 描述

spring.batch.job.jdbcbatchitemwriter.datasource.enable

boolean

false

确定是否应启用 JdbcCursorItemReaderDataSource

jdbcbatchitemwriter.datasource.url

String

null

数据库的 JDBC URL

jdbcbatchitemwriter.datasource.username

String

null

数据库的登录用户名。

jdbcbatchitemwriter.datasource.password

String

null

数据库的登录密码。

jdbcbatchitemreader.datasource.driver-class-name

String

null

JDBC driver 的完全限定名。

如果未指定 jdbcbatchitemwriter_datasourceJdbcBatchItemWriter 将使用默认的 DataSource

16.4. KafkaItemWriter

要将步骤输出写入 Kafka topic,您需要 KafkaItemWriter。此 starter 利用两个地方的功能为 KafkaItemWriter 提供自动配置。第一个是 Spring Boot 的 Kafka 自动配置。(请参阅Spring Boot Kafka 文档。)其次,此 starter 允许您配置写入器的两个属性。

表 9. KafkaItemWriter 属性
属性 类型 默认值 描述

spring.batch.job.kafkaitemwriter.topic

String

null

要写入的 Kafka topic。

spring.batch.job.kafkaitemwriter.delete

boolean

false

传递给写入器的项是否全部作为删除事件发送到 topic。

有关 KafkaItemWriter 配置选项的更多信息,请参阅KafkaItemWiter 文档

16.5. Spring AOT

将 Spring AOT 与 Single Step Batch Starter 结合使用时,您必须在编译时设置 reader 和 writer 名称属性(除非您为 reader 和/或 writer 创建 bean(s))。为此,您必须将您希望使用的 reader 和 writer 的名称作为参数或环境变量包含在 boot maven plugin 或 gradle plugin 中。例如,如果您希望在 Maven 中启用 FlatFileItemReaderFlatFileItemWriter,它会是这样

    <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
        <executions>
            <execution>
            <id>process-aot</id>
            <goals>
                <goal>process-aot</goal>
            </goals>
            </execution>
        </executions>
        <configuration>
            <arguments>
                <argument>--spring.batch.job.flatfileitemreader.name=foobar</argument>
                <argument>--spring.batch.job.flatfileitemwriter.name=fooWriter</argument>
            </arguments>
        </configuration>
    </plugin>

Spring Cloud Stream 集成

任务本身可能很有用,但将任务集成到更大的生态系统中,可以使其在更复杂的处理和编排中发挥作用。本节介绍了 Spring Cloud Task 与 Spring Cloud Stream 的集成选项。

17. 从 Spring Cloud Stream 启动任务

您可以从 stream 启动任务。为此,创建一个 sink,监听包含 TaskLaunchRequest 作为 payload 的消息。TaskLaunchRequest 包含

  • uri:要执行的任务 artifact 的 URI。

  • applicationName:与任务关联的名称。如果未设置 applicationName,TaskLaunchRequest 将生成一个包含以下内容的任务名称:Task-<UUID>

  • commandLineArguments:包含任务命令行参数的列表。

  • environmentProperties:包含任务要使用的环境变量的 map。

  • deploymentProperties:包含 deployer 用于部署任务的属性的 map。

如果 payload 类型不同,sink 会抛出异常。

例如,可以创建一个 stream,其中包含一个 processor,该 processor 接收来自 HTTP 源的数据并创建一个包含 TaskLaunchRequestGenericMessage,然后将消息发送到其输出通道。任务 sink 将从其输入通道接收该消息,然后启动任务。

要创建一个 taskSink,您只需要创建一个包含 EnableTaskLauncher 注解的 Spring Boot 应用程序,如下例所示

@SpringBootApplication
@EnableTaskLauncher
public class TaskSinkApplication {
    public static void main(String[] args) {
        SpringApplication.run(TaskSinkApplication.class, args);
    }
}

Spring Cloud Task 项目的 samples 模块 包含了一个示例 Sink 和 Processor。要将这些示例安装到你的本地 maven 仓库,请在 spring-cloud-task-samples 目录中运行 maven 构建,并将 skipInstall 属性设置为 false,如以下示例所示

mvn clean install

必须将 maven.remoteRepositories.springRepo.url 属性设置为 über-jar 所在的远程仓库位置。如果未设置,则没有远程仓库,因此它将仅依赖本地仓库。

17.1. Spring Cloud Data Flow

要在 Spring Cloud Data Flow 中创建流,必须首先注册我们创建的 Task Sink 应用程序。在以下示例中,我们使用 Spring Cloud Data Flow shell 注册 Processor 和 Sink 示例应用程序。

app register --name taskSink --type sink --uri maven://io.spring.cloud:tasksink:<version>
app register --name taskProcessor --type processor --uri maven:io.spring.cloud:taskprocessor:<version>

以下示例展示了如何使用 Spring Cloud Data Flow shell 创建流。

stream create foo --definition "http --server.port=9000|taskProcessor|taskSink" --deploy

18. Spring Cloud Task 事件

Spring Cloud Task 提供了在通过 Spring Cloud Stream 通道运行任务时,通过 Spring Cloud Stream 通道发出事件的能力。任务监听器用于将 TaskExecution 发布到名为 task-events 的消息通道上。此功能会自动注入到任何类路径中包含 spring-cloud-streamspring-cloud-stream-<binder> 和已定义任务的任务中。

要禁用事件发出监听器,请将 spring.cloud.task.events.enabled 属性设置为 false

定义了适当的类路径后,以下任务将在 task-events 通道上将 TaskExecution 作为事件发出(在任务开始和结束时)。

@SpringBootApplication
public class TaskEventsApplication {

    public static void main(String[] args) {
        SpringApplication.run(TaskEventsApplication.class, args);
    }

    @Configuration
    public static class TaskConfiguration {

        @Bean
        public ApplicationRunner applicationRunner() {
            return new ApplicationRunner() {
                @Override
                public void run(ApplicationArguments args) {
                    System.out.println("The ApplicationRunner was executed");
                }
            };
        }
    }
}
类路径中还需要包含一个绑定器实现。
可以在 Spring Cloud Task 项目的 samples 模块中找到一个示例任务事件应用程序,点击这里

18.1. 禁用特定任务事件

要禁用任务事件,可以将 spring.cloud.task.events.enabled 属性设置为 false

19. Spring Batch 事件

通过任务执行 Spring Batch 作业时,Spring Cloud Task 可以配置为根据 Spring Batch 中可用的 Spring Batch 监听器发出信息性消息。具体来说,当通过 Spring Cloud Task 运行时,以下 Spring Batch 监听器会自动配置到每个批处理作业中,并在相关的 Spring Cloud Stream 通道上发出消息。

  • JobExecutionListener 监听 job-execution-events

  • StepExecutionListener 监听 step-execution-events

  • ChunkListener 监听 chunk-events

  • ItemReadListener 监听 item-read-events

  • ItemProcessListener 监听 item-process-events

  • ItemWriteListener 监听 item-write-events

  • SkipListener 监听 skip-events

当上下文中存在适当的 bean(一个 Job 和一个 TaskLifecycleListener)时,这些监听器会自动配置到任何 AbstractJob 中。监听这些事件的配置与绑定到任何其他 Spring Cloud Stream 通道的方式相同。我们的任务(运行批处理作业的任务)充当 Source,而监听应用程序充当 ProcessorSink

一个示例是有一个应用程序监听 job-execution-events 通道,以了解作业的启动和停止。要配置监听应用程序,您可以按如下方式将输入配置为 job-execution-events

spring.cloud.stream.bindings.input.destination=job-execution-events

类路径中还需要包含一个绑定器实现。
可以在 Spring Cloud Task 项目的 samples 模块中找到一个示例批处理事件应用程序,点击这里

19.1. 将批处理事件发送到不同的通道

Spring Cloud Task 为批处理事件提供的一个选项是更改特定监听器可以发出消息的通道的能力。为此,请使用以下配置:spring.cloud.stream.bindings.<the channel>.destination=<new destination>。例如,如果 StepExecutionListener 需要将其消息发出到名为 my-step-execution-events 的另一个通道,而不是默认的 step-execution-events,您可以添加以下配置。

spring.cloud.task.batch.events.step-execution-events-binding-name=my-step-execution-events

19.2. 禁用批处理事件

要禁用所有批处理事件的监听器功能,请使用以下配置

spring.cloud.task.batch.events.enabled=false

要禁用特定的批处理事件,请使用以下配置

spring.cloud.task.batch.events.<batch event listener>.enabled=false:

以下列表显示了可以禁用的各个监听器

spring.cloud.task.batch.events.job-execution.enabled=false
spring.cloud.task.batch.events.step-execution.enabled=false
spring.cloud.task.batch.events.chunk.enabled=false
spring.cloud.task.batch.events.item-read.enabled=false
spring.cloud.task.batch.events.item-process.enabled=false
spring.cloud.task.batch.events.item-write.enabled=false
spring.cloud.task.batch.events.skip.enabled=false

19.3. 批处理事件的发出顺序

默认情况下,批处理事件具有 Ordered.LOWEST_PRECEDENCE。要更改此值(例如,更改为 5),请使用以下配置。

spring.cloud.task.batch.events.job-execution-order=5
spring.cloud.task.batch.events.step-execution-order=5
spring.cloud.task.batch.events.chunk-order=5
spring.cloud.task.batch.events.item-read-order=5
spring.cloud.task.batch.events.item-process-order=5
spring.cloud.task.batch.events.item-write-order=5
spring.cloud.task.batch.events.skip-order=5

附录

20. 任务仓库 Schema

本附录提供了任务仓库中使用的数据库 schema 的 ERD。

task schema

20.1. 表信息

TASK_EXECUTION

存储任务执行信息。

列名 必需 类型 字段长度 备注

TASK_EXECUTION_ID

BIGINT

X

Spring Cloud Task Framework 在应用程序启动时根据 TASK_SEQ 获取并确定下一个可用的 ID。如果记录是在任务外部创建的,则必须在记录创建时填充该值。

START_TIME

DATETIME(6)

X

Spring Cloud Task Framework 在应用程序启动时设置该值。

END_TIME

DATETIME(6)

X

Spring Cloud Task Framework 在应用程序退出时设置该值。

TASK_NAME

VARCHAR

100

Spring Cloud Task Framework 在应用程序启动时会将其设置为 "Application",除非用户使用 spring.application.name 设置了名称。

EXIT_CODE

INTEGER

X

遵循 Spring Boot 默认值,除非用户如这里讨论的那样覆盖。

EXIT_MESSAGE

VARCHAR

2500

用户定义,如这里讨论的那样。

ERROR_MESSAGE

VARCHAR

2500

Spring Cloud Task Framework 在应用程序退出时设置该值。

LAST_UPDATED

TIMESTAMP

X

Spring Cloud Task Framework 在应用程序启动时设置该值。如果记录是在任务外部创建的,则必须在记录创建时填充该值。

EXTERNAL_EXECUTION_ID

VARCHAR

250

如果设置了 spring.cloud.task.external-execution-id 属性,Spring Cloud Task Framework 在应用程序启动时会将其设置为指定的值。更多信息请见这里

PARENT_TASK_EXECUTION_ID

BIGINT

X

如果设置了 spring.cloud.task.parent-execution-id 属性,Spring Cloud Task Framework 在应用程序启动时会将其设置为指定的值。更多信息请见这里

TASK_EXECUTION_PARAMS

存储任务执行使用的参数

列名 必需 类型 字段长度

TASK_EXECUTION_ID

BIGINT

X

TASK_PARAM

VARCHAR

2500

TASK_TASK_BATCH

用于将任务执行与批处理执行链接起来。

列名 必需 类型 字段长度

TASK_EXECUTION_ID

BIGINT

X

JOB_EXECUTION_ID

BIGINT

X

TASK_LOCK

用于这里讨论的 single-instance-enabled 功能。

列名 必需 类型 字段长度 备注

LOCK_KEY

CHAR

36

此锁的 UUID

REGION

VARCHAR

100

用户可以使用此字段建立一组锁。

CLIENT_ID

CHAR

36

包含要锁定应用程序名称的任务执行 ID。

CREATED_DATE

DATETIME

X

条目创建日期

每种数据库类型设置表的 DDL 可以在这里找到。

20.2. SQL Server

默认情况下,Spring Cloud Task 使用序列表来确定 TASK_EXECUTION 表的 TASK_EXECUTION_ID。然而,在使用 SQL Server 同时启动多个任务时,这可能导致 TASK_SEQ 表发生死锁。解决方法是删除 TASK_EXECUTION_SEQ 表并使用相同的名称创建一个序列。例如:

DROP TABLE TASK_SEQ;

CREATE SEQUENCE [DBO].[TASK_SEQ] AS BIGINT
 START WITH 1
 INCREMENT BY 1;
START WITH 设置为比您当前执行 ID 更高的值。

21. 构建此文档

本项目使用 Maven 生成此文档。要为您自己生成它,请运行以下命令:$ mvn clean install -DskipTests -P docs

22. 可观测性元数据

22.1. 可观测性 - 指标

下面您可以找到此项目声明的所有指标列表。

22.1.1. 活跃任务

围绕任务执行创建的指标。

指标名称 spring.cloud.task (由约定类 org.springframework.cloud.task.listener.DefaultTaskExecutionObservationConvention 定义)。类型 timer

指标名称 spring.cloud.task.active (由约定类 org.springframework.cloud.task.listener.DefaultTaskExecutionObservationConvention 定义)。类型 long task timer

启动 Observation 后添加的 KeyValue 可能不会出现在 *.active 指标中。
Micrometer 内部使用 nanoseconds 作为基本单位。然而,每个后端会确定实际的基本单位。(例如 Prometheus 使用 seconds)

封闭类的完全限定名 org.springframework.cloud.task.listener.TaskExecutionObservation

所有标签必须以 spring.cloud.task 前缀开头!
表 10. 低基数键

名称

描述

spring.cloud.task.cf.app.id (必需)

CF 云的 App ID。

spring.cloud.task.cf.app.name (必需)

CF 云的 App 名称。

spring.cloud.task.cf.app.version (必需)

CF 云的 App 版本。

spring.cloud.task.cf.instance.index (必需)

CF 云的实例索引。

spring.cloud.task.cf.org.name (必需)

CF 云的组织名称。

spring.cloud.task.cf.space.id (必需)

CF 云的空间 ID。

spring.cloud.task.cf.space.name (必需)

CF 云的空间名称。

spring.cloud.task.execution.id (必需)

任务执行 ID。

spring.cloud.task.exit.code (必需)

任务退出码。

spring.cloud.task.external.execution.id (必需)

任务的外部执行 ID。

spring.cloud.task.name (必需)

任务名称度量。

spring.cloud.task.parent.execution.id (必需)

任务父执行 ID。

spring.cloud.task.status (必需)

任务状态。可以是 success 或 failure。

22.1.2. 任务运行器观测

任务运行器执行时创建的观测。

指标名称 spring.cloud.task.runner (由约定类 org.springframework.cloud.task.configuration.observation.DefaultTaskObservationConvention 定义)。类型 timer

指标名称 spring.cloud.task.runner.active (由约定类 org.springframework.cloud.task.configuration.observation.DefaultTaskObservationConvention 定义)。类型 long task timer

启动 Observation 后添加的 KeyValue 可能不会出现在 *.active 指标中。
Micrometer 内部使用 nanoseconds 作为基本单位。然而,每个后端会确定实际的基本单位。(例如 Prometheus 使用 seconds)

封闭类的完全限定名 org.springframework.cloud.task.configuration.observation.TaskDocumentedObservation

所有标签必须以 spring.cloud.task 前缀开头!
表 11. 低基数键

名称

描述

spring.cloud.task.runner.bean-name (必需)

由 Spring Cloud Task 执行的 bean 的名称。

22.2. 可观测性 - Spans

下面您可以找到此项目声明的所有 spans 列表。

22.2.1. 活跃任务 Span

围绕任务执行创建的指标。

Span 名称 spring.cloud.task (由约定类 org.springframework.cloud.task.listener.DefaultTaskExecutionObservationConvention 定义)。

封闭类的完全限定名 org.springframework.cloud.task.listener.TaskExecutionObservation

所有标签必须以 spring.cloud.task 前缀开头!
表 12. 标签键

名称

描述

spring.cloud.task.cf.app.id (必需)

CF 云的 App ID。

spring.cloud.task.cf.app.name (必需)

CF 云的 App 名称。

spring.cloud.task.cf.app.version (必需)

CF 云的 App 版本。

spring.cloud.task.cf.instance.index (必需)

CF 云的实例索引。

spring.cloud.task.cf.org.name (必需)

CF 云的组织名称。

spring.cloud.task.cf.space.id (必需)

CF 云的空间 ID。

spring.cloud.task.cf.space.name (必需)

CF 云的空间名称。

spring.cloud.task.execution.id (必需)

任务执行 ID。

spring.cloud.task.exit.code (必需)

任务退出码。

spring.cloud.task.external.execution.id (必需)

任务的外部执行 ID。

spring.cloud.task.name (必需)

任务名称度量。

spring.cloud.task.parent.execution.id (必需)

任务父执行 ID。

spring.cloud.task.status (必需)

任务状态。可以是 success 或 failure。

22.2.2. 任务运行器观测 Span

任务运行器执行时创建的观测。

Span 名称 spring.cloud.task.runner (由约定类 org.springframework.cloud.task.configuration.observation.DefaultTaskObservationConvention 定义)。

封闭类的完全限定名 org.springframework.cloud.task.configuration.observation.TaskDocumentedObservation

所有标签必须以 spring.cloud.task 前缀开头!
表 13. 标签键

名称

描述

spring.cloud.task.runner.bean-name (必需)

由 Spring Cloud Task 执行的 bean 的名称。