版本 3.0.4

© 2009-2022 VMware, Inc. 保留所有权利。

您可以为自己使用和分发此文档的副本,前提是您不对这些副本收取任何费用,并且每份副本(无论是印刷版还是电子版)都包含此版权声明。

前言

本节简要概述了 Spring Cloud Task 参考文档。将其视为文档其余部分的地图。您可以按顺序阅读本参考指南,如果某些部分不感兴趣,也可以跳过。

1. 关于文档

Spring Cloud Task 参考指南提供 htmlpdfepub 格式。最新副本可在 docs.spring.io/spring-cloud-task/docs/current-SNAPSHOT/reference/html/ 获取。

您可以为自己使用和分发此文档的副本,前提是您不对这些副本收取任何费用,并且每份副本(无论是印刷版还是电子版)都包含此版权声明。

2. 获取帮助

使用 Spring Cloud Task 遇到问题?我们乐意帮助!

Spring Cloud Task 的所有内容,包括文档,都是开源的。如果您发现文档有问题,或者您只是想改进它们,请 参与进来

3. 第一步

如果您刚开始接触 Spring Cloud Task 或一般的“Spring”,我们建议您阅读 入门 章节。

要从头开始,请阅读以下部分

要学习本教程,请阅读 开发您的第一个 Spring Cloud Task 应用程序
要运行您的示例,请阅读 运行示例

入门

如果您刚开始使用 Spring Cloud Task,您应该阅读本节。在这里,我们将回答基本的“是什么?”、“如何做?”和“为什么?”等问题。我们首先对 Spring Cloud Task 进行简单介绍。然后,我们构建一个 Spring Cloud Task 应用程序,并在过程中讨论一些核心原则。

4. Spring Cloud Task 简介

Spring Cloud Task 使创建短生命周期的微服务变得容易。它提供了在生产环境中按需执行短生命周期 JVM 进程的功能。

5. 系统要求

您需要安装 Java (Java 17 或更高版本)。要构建,您还需要安装 Maven。

5.1. 数据库要求

Spring Cloud Task 使用关系型数据库存储已执行任务的结果。虽然您可以在没有数据库的情况下开始开发任务(任务状态会作为任务仓库更新的一部分进行日志记录),但对于生产环境,您需要使用受支持的数据库。Spring Cloud Task 目前支持以下数据库:

  • DB2

  • H2

  • HSQLDB

  • MySql

  • Oracle

  • Postgres

  • SqlServer

6. 开发您的第一个 Spring Cloud Task 应用程序

一个好的起点是一个简单的“Hello, World!”应用程序,因此我们创建 Spring Cloud Task 等效应用程序以突出框架的功能。大多数 IDE 对 Apache Maven 都有很好的支持,因此我们将其用作此项目的构建工具。

spring.io 网站包含许多使用 Spring Boot 的入门”指南。如果您需要解决特定问题,请先在那里查看。您可以通过访问 Spring Initializr 并创建一个新项目来简化以下步骤。这样做会自动生成新的项目结构,以便您可以立即开始编写代码。我们建议尝试使用 Spring Initializr 来熟悉它。

6.1. 使用 Spring Initializr 创建 Spring Task 项目

现在我们可以创建并测试一个在控制台打印 Hello, World! 的应用程序。

为此:

  1. 访问 Spring Initializr 网站。

    1. 创建一个新的 Maven 项目,Group 名称为 io.spring.demoArtifact 名称为 helloworld

    2. 在 Dependencies 文本框中,键入 task,然后选择 Cloud Task 依赖项。

    3. 在 Dependencies 文本框中,键入 jdbc,然后选择 JDBC 依赖项。

    4. 在 Dependencies 文本框中,键入 h2,然后选择 H2。(或您喜欢的数据库)

    5. 点击 Generate Project 按钮

  2. 解压 helloworld.zip 文件,并将项目导入您喜欢的 IDE。

6.2. 编写代码

为了完成我们的应用程序,我们需要用以下内容更新生成的 HelloworldApplication,以便它启动一个任务。

package io.spring.Helloworld;

import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.task.configuration.EnableTask;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
@EnableTask
public class HelloworldApplication {

    @Bean
    public ApplicationRunner applicationRunner() {
        return new HelloWorldApplicationRunner();
    }

    public static void main(String[] args) {
        SpringApplication.run(HelloworldApplication.class, args);
    }

    public static class HelloWorldApplicationRunner implements ApplicationRunner {

        @Override
        public void run(ApplicationArguments args) throws Exception {
            System.out.println("Hello, World!");

        }
    }
}

虽然看起来很小,但其中包含了许多内容。有关 Spring Boot 的具体细节,请参阅 Spring Boot 参考文档

现在我们可以打开 src/main/resources 中的 application.properties 文件。我们需要在 application.properties 中配置两个属性:

  • application.name:设置应用程序名称(它被转换为任务名称)

  • logging.level:将 Spring Cloud Task 的日志记录设置为 DEBUG,以便查看正在发生的事情。

以下示例展示了如何同时执行这两项操作

logging.level.org.springframework.cloud.task=DEBUG
spring.application.name=helloWorld

6.2.1. 任务自动配置

当包含 Spring Cloud Task Starter 依赖项时,Task 会自动配置所有 bean 以启动其功能。此配置的一部分注册了 TaskRepository 及其基础设施。

在我们的演示中,TaskRepository 使用嵌入式 H2 数据库来记录任务的结果。这个 H2 嵌入式数据库对于生产环境来说不是一个实用的解决方案,因为一旦任务结束,H2 数据库就会消失。然而,为了快速入门体验,我们可以在我们的示例中使用它,同时将该仓库中正在更新的内容回显到日志中。在 配置 部分(本文档的后面),我们介绍了如何自定义 Spring Cloud Task 提供的各个部分的配置。

当我们的示例应用程序运行时,Spring Boot 会启动我们的 HelloWorldCommandLineRunner 并将“Hello, World!”消息输出到标准输出。TaskLifecycleListener 在仓库中记录任务的开始和结束。

6.2.2. main 方法

main 方法作为任何 Java 应用程序的入口点。我们的 main 方法委托给 Spring Boot 的 SpringApplication 类。

6.2.3. ApplicationRunner

Spring 包含多种引导应用程序逻辑的方法。Spring Boot 通过其 *Runner 接口(CommandLineRunnerApplicationRunner)提供了一种方便且有组织的方法。一个行为良好的任务可以通过使用这些运行器中的一个来引导任何逻辑。

任务的生命周期被认为是:在 *Runner#run 方法执行之前开始,直到所有方法都完成。Spring Boot 允许应用程序使用多个 *Runner 实现,Spring Cloud Task 也是如此。

通过 CommandLineRunnerApplicationRunner 以外的机制(例如,使用 InitializingBean#afterPropertiesSet)引导的任何处理都不会被 Spring Cloud Task 记录。

6.3. 运行示例

此时,我们的应用程序应该可以工作了。由于此应用程序基于 Spring Boot,我们可以通过在应用程序根目录中使用 $ mvn spring-boot:run 从命令行运行它,如下例所示(及其输出)

$ mvn clean spring-boot:run
....... . . .
....... . . . (Maven log output here)
....... . . .

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.0.3.RELEASE)

2018-07-23 17:44:34.426  INFO 1978 --- [           main] i.s.d.helloworld.HelloworldApplication   : Starting HelloworldApplication on Glenns-MBP-2.attlocal.net with PID 1978 (/Users/glennrenfro/project/helloworld/target/classes started by glennrenfro in /Users/glennrenfro/project/helloworld)
2018-07-23 17:44:34.430  INFO 1978 --- [           main] i.s.d.helloworld.HelloworldApplication   : No active profile set, falling back to default profiles: default
2018-07-23 17:44:34.472  INFO 1978 --- [           main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@1d24f32d: startup date [Mon Jul 23 17:44:34 EDT 2018]; root of context hierarchy
2018-07-23 17:44:35.280  INFO 1978 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Starting...
2018-07-23 17:44:35.410  INFO 1978 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Start completed.
2018-07-23 17:44:35.419 DEBUG 1978 --- [           main] o.s.c.t.c.SimpleTaskConfiguration        : Using org.springframework.cloud.task.configuration.DefaultTaskConfigurer TaskConfigurer
2018-07-23 17:44:35.420 DEBUG 1978 --- [           main] o.s.c.t.c.DefaultTaskConfigurer          : No EntityManager was found, using DataSourceTransactionManager
2018-07-23 17:44:35.522 DEBUG 1978 --- [           main] o.s.c.t.r.s.TaskRepositoryInitializer    : Initializing task schema for h2 database
2018-07-23 17:44:35.525  INFO 1978 --- [           main] o.s.jdbc.datasource.init.ScriptUtils     : Executing SQL script from class path resource [org/springframework/cloud/task/schema-h2.sql]
2018-07-23 17:44:35.558  INFO 1978 --- [           main] o.s.jdbc.datasource.init.ScriptUtils     : Executed SQL script from class path resource [org/springframework/cloud/task/schema-h2.sql] in 33 ms.
2018-07-23 17:44:35.728  INFO 1978 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Registering beans for JMX exposure on startup
2018-07-23 17:44:35.730  INFO 1978 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Bean with name 'dataSource' has been autodetected for JMX exposure
2018-07-23 17:44:35.733  INFO 1978 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Located MBean 'dataSource': registering with JMX server as MBean [com.zaxxer.hikari:name=dataSource,type=HikariDataSource]
2018-07-23 17:44:35.738  INFO 1978 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 0
2018-07-23 17:44:35.762 DEBUG 1978 --- [           main] o.s.c.t.r.support.SimpleTaskRepository   : Creating: TaskExecution{executionId=0, parentExecutionId=null, exitCode=null, taskName='application', startTime=Mon Jul 23 17:44:35 EDT 2018, endTime=null, exitMessage='null', externalExecutionId='null', errorMessage='null', arguments=[]}
2018-07-23 17:44:35.772  INFO 1978 --- [           main] i.s.d.helloworld.HelloworldApplication   : Started HelloworldApplication in 1.625 seconds (JVM running for 4.764)
Hello, World!
2018-07-23 17:44:35.782 DEBUG 1978 --- [           main] o.s.c.t.r.support.SimpleTaskRepository   : Updating: TaskExecution with executionId=1 with the following {exitCode=0, endTime=Mon Jul 23 17:44:35 EDT 2018, exitMessage='null', errorMessage='null'}

前面的输出有三行我们感兴趣的内容

  • SimpleTaskRepository 记录了 TaskRepository 中条目的创建。

  • 我们的 CommandLineRunner 的执行,由“Hello, World!”输出演示。

  • SimpleTaskRepository 记录了 TaskRepository 中任务的完成。

一个简单的任务应用程序可以在 Spring Cloud Task 项目的 samples 模块中找到这里

功能

本节将更详细地介绍 Spring Cloud Task,包括如何使用它、如何配置它以及适当的扩展点。

7. Spring Cloud Task 的生命周期

在大多数情况下,现代云环境是围绕不期望结束的进程执行而设计的。如果它们确实结束了,它们通常会被重新启动。虽然大多数平台确实有某种方法可以运行一个在结束后不重新启动的进程,但该运行的结果通常不会以可消费的方式维护。Spring Cloud Task 提供了在环境中执行短生命周期进程并记录结果的能力。这样做允许围绕短生命周期进程以及通过消息集成任务来实现长时间运行服务的微服务架构。

虽然此功能在云环境中很有用,但在传统部署模型中也可能出现同样的问题。当使用 cron 等调度程序运行 Spring Boot 应用程序时,能够在应用程序完成之后监视其结果会很有用。

Spring Cloud Task 采取的方法是,Spring Boot 应用程序可以有开始和结束,并且仍然成功。批处理应用程序是预期结束(并且通常是短生命周期)的进程如何提供帮助的一个示例。

Spring Cloud Task 记录给定任务的生命周期事件。大多数长时间运行的进程(以大多数 Web 应用程序为典型)不保存其生命周期事件。Spring Cloud Task 的核心任务则会。

生命周期由单个任务执行组成。这是配置为任务(即具有 Sprint Cloud Task 依赖项)的 Spring Boot 应用程序的物理执行。

在任务开始时,在任何 CommandLineRunnerApplicationRunner 实现运行之前,会在 TaskRepository 中创建一个记录开始事件的条目。此事件通过 Spring Framework 触发 SmartLifecycle#start 来触发。这表示系统所有 Bean 都已准备好使用,并且在运行 Spring Boot 提供的任何 CommandLineRunnerApplicationRunner 实现之前发生。

任务的记录仅在 ApplicationContext 成功引导时发生。如果上下文完全无法引导,则不记录任务的运行。

在 Spring Boot 的所有 *Runner#run 调用完成后或 ApplicationContext 失败(由 ApplicationFailedEvent 指示)时,任务执行的结果会在存储库中更新。

如果应用程序要求在任务完成时(所有 *Runner#run 方法已调用且任务存储库已更新)关闭 ApplicationContext,请将属性 spring.cloud.task.closecontextEnabled 设置为 true。

7.1. TaskExecution

存储在 TaskRepository 中的信息在 TaskExecution 类中建模,并包含以下信息

字段 描述

executionid

任务运行的唯一 ID。

exitCode

ExitCodeExceptionMapper 实现生成的退出代码。如果没有生成退出代码但抛出 ApplicationFailedEvent,则设置为 1。否则,假定为 0。

taskName

任务的名称,由配置的 TaskNameResolver 确定。

startTime

任务开始时间,由 SmartLifecycle#start 调用指示。

endTime

任务完成时间,由 ApplicationReadyEvent 指示。

exitMessage

退出时可用的任何信息。这可以通过 TaskExecutionListener 以编程方式设置。

errorMessage

如果异常是任务结束的原因(由 ApplicationFailedEvent 指示),则该异常的堆栈跟踪存储在此处。

arguments

作为命令行参数传递给可执行引导应用程序的字符串命令行参数列表。

7.2. 映射退出代码

当任务完成时,它会尝试向操作系统返回一个退出代码。如果我们查看我们的 原始示例,我们可以看到我们没有控制应用程序的这方面。因此,如果抛出异常,JVM 会返回一个可能对您调试有用也可能没用的代码。

因此,Spring Boot 提供了一个接口 ExitCodeExceptionMapper,它允许您将未捕获的异常映射到退出代码。这样做可以让您在退出代码级别指示出了什么问题。此外,通过以这种方式映射退出代码,Spring Cloud Task 会记录返回的退出代码。

如果任务因 SIG-INT 或 SIG-TERM 终止,则除非代码中另有规定,否则退出代码为零。

任务运行时,退出代码在存储库中存储为 null。任务完成后,根据本节前面所述的准则存储相应的退出代码。

8. 配置

Spring Cloud Task 提供了开箱即用的配置,如 DefaultTaskConfigurerSimpleTaskConfiguration 类中定义。本节将介绍默认值以及如何根据您的需求自定义 Spring Cloud Task。

8.1. DataSource

Spring Cloud Task 使用数据源来存储任务执行的结果。默认情况下,我们提供了一个 H2 内存实例,以提供一种简单的开发引导方法。但是,在生产环境中,您可能希望配置自己的 DataSource

如果您的应用程序只使用一个 DataSource,并且该数据源既用作您的业务模式又用作任务存储库,则您只需提供任何 DataSource(最简单的方法是通过 Spring Boot 的配置约定)。此 DataSource 将由 Spring Cloud Task 自动用于存储库。

如果您的应用程序使用多个 DataSource,则需要使用适当的 DataSource 配置任务存储库。此自定义可以通过 TaskConfigurer 的实现来完成。

8.2. 表前缀

TaskRepository 的一个可修改属性是任务表的表前缀。默认情况下,它们都以 TASK_ 为前缀。TASK_EXECUTIONTASK_EXECUTION_PARAMS 是两个示例。但是,修改此前缀可能有一些原因。如果需要将模式名称添加到表名称前面,或者在同一模式中需要多组任务表,则必须更改表前缀。您可以通过将 spring.cloud.task.tablePrefix 设置为您需要的前缀来完成此操作,如下所示

spring.cloud.task.tablePrefix=yourPrefix

通过使用 spring.cloud.task.tablePrefix,用户承担创建符合任务表模式标准但又根据用户业务需求进行修改的任务表的责任。您可以将 Spring Cloud Task Schema DDL 作为创建您自己的任务 DDL 的指南,如此处所示。

8.3. 启用/禁用表初始化

如果您正在创建任务表并且不希望 Spring Cloud Task 在任务启动时创建它们,请将 spring.cloud.task.initialize-enabled 属性设置为 false,如下所示

spring.cloud.task.initialize-enabled=false

它默认为 true

属性 spring.cloud.task.initialize.enable 已弃用。

8.4. 外部生成的任务 ID

在某些情况下,您可能希望允许任务请求时间与基础设施实际启动任务时间之间的差异。Spring Cloud Task 允许您在请求任务时创建 TaskExecution。然后将生成的 TaskExecution 的执行 ID 传递给任务,以便任务可以通过任务的生命周期更新 TaskExecution

可以通过在引用存储 TaskExecution 对象的数据库的 TaskRepository 实现上调用 createTaskExecution 方法来创建 TaskExecution

要将您的任务配置为使用生成的 TaskExecutionId,请添加以下属性

spring.cloud.task.executionid=yourtaskId

8.5. 外部任务 ID

Spring Cloud Task 允许您为每个 TaskExecution 存储一个外部任务 ID。要将您的任务配置为使用生成的 TaskExecutionId,请添加以下属性

spring.cloud.task.external-execution-id=<externalTaskId>

8.6. 父任务 ID

Spring Cloud Task 允许您为每个 TaskExecution 存储一个父任务 ID。例如,一个任务执行另一个任务或多个任务,并且您希望记录哪个任务启动了每个子任务。要将您的任务配置为设置父 TaskExecutionId,请在子任务上添加以下属性

spring.cloud.task.parent-execution-id=<parentExecutionTaskId>

8.7. TaskConfigurer

TaskConfigurer 是一个策略接口,允许您自定义 Spring Cloud Task 组件的配置方式。默认情况下,我们提供了 DefaultTaskConfigurer,它提供逻辑默认值:基于 Map 的内存中组件(如果没有提供 DataSource,则适用于开发)和基于 JDBC 的组件(如果 DataSource 可用,则适用)。

TaskConfigurer 允许您配置三个主要组件

组件 描述 默认(由 DefaultTaskConfigurer 提供)

TaskRepository

要使用的 TaskRepository 的实现。

SimpleTaskRepository

TaskExplorer

要使用的 TaskExplorer(用于只读访问任务存储库的组件)的实现。

SimpleTaskExplorer

PlatformTransactionManager

在运行任务更新时使用的事务管理器。

如果使用了 DataSource,则为 JdbcTransactionManager。如果未使用,则为 ResourcelessTransactionManager

您可以通过创建 TaskConfigurer 接口的自定义实现来定制上表中描述的任何组件。通常,扩展 DefaultTaskConfigurer(如果未找到 TaskConfigurer 则提供)并重写所需的 getter 就足够了。但是,可能需要从头开始实现您自己的。

除非用户将其用于提供要作为 Spring Bean 公开的实现,否则不应直接使用 TaskConfigurer 中的 getter 方法。

8.8. 任务执行监听器

TaskExecutionListener 允许您注册在任务生命周期期间发生的特定事件的监听器。为此,请创建一个实现 TaskExecutionListener 接口的类。实现 TaskExecutionListener 接口的类将收到以下事件的通知

  • onTaskStartup:在将 TaskExecution 存储到 TaskRepository 之前。

  • onTaskEnd:在更新 TaskRepository 中的 TaskExecution 条目并标记任务的最终状态之前。

  • onTaskFailed:当任务抛出未处理的异常时,在调用 onTaskEnd 方法之前。

Spring Cloud Task 还允许您使用以下方法注解将 TaskExecution 监听器添加到 Bean 中的方法

  • @BeforeTask:在将 TaskExecution 存储到 TaskRepository 之前。

  • @AfterTask:在更新 TaskRepository 中的 TaskExecution 条目并标记任务的最终状态之前。

  • @FailedTask:当任务抛出未处理的异常时,在调用 @AfterTask 方法之前。

以下示例显示了正在使用的三个注解

 public class MyBean {

    @BeforeTask
    public void methodA(TaskExecution taskExecution) {
    }

    @AfterTask
    public void methodB(TaskExecution taskExecution) {
    }

    @FailedTask
    public void methodC(TaskExecution taskExecution, Throwable throwable) {
    }
}
在链中比 TaskLifecycleListener 存在更早地插入 ApplicationListener 可能会导致意外的影响。

8.8.1. 任务执行监听器抛出的异常

如果 TaskExecutionListener 事件处理程序抛出异常,则该事件处理程序的所有监听器处理都将停止。例如,如果三个 onTaskStartup 监听器已启动,并且第一个 onTaskStartup 事件处理程序抛出异常,则其他两个 onTaskStartup 方法不会被调用。但是,TaskExecutionListeners 的其他事件处理程序(onTaskEndonTaskFailed)将被调用。

TaskExecutionListener 事件处理程序抛出异常时返回的退出代码是 ExitCodeEvent 报告的退出代码。如果没有发出 ExitCodeEvent,则评估抛出的异常以查看其是否为 ExitCodeGenerator 类型。如果是,则返回 ExitCodeGenerator 的退出代码。否则,返回 1

如果在 onTaskStartup 方法中抛出异常,则应用程序的退出代码将为 1。如果在 onTaskEndonTaskFailed 方法中抛出异常,则应用程序的退出代码将是根据上述规则确定的退出代码。

如果 onTaskStartuponTaskEndonTaskFailed 中抛出异常,您不能使用 ExitCodeExceptionMapper 覆盖应用程序的退出代码。

8.8.2. 退出消息

您可以通过使用 TaskExecutionListener 以编程方式设置任务的退出消息。这是通过设置 TaskExecutionexitMessage 来完成的,该消息随后会传递给 TaskExecutionListener。以下示例展示了一个使用 @AfterTask ExecutionListener 注解的方法

@AfterTask
public void afterMe(TaskExecution taskExecution) {
    taskExecution.setExitMessage("AFTER EXIT MESSAGE");
}

可以在任何监听器事件(onTaskStartuponTaskFailedonTaskEnd)中设置 ExitMessage。三个监听器的优先级顺序如下

  1. onTaskEnd

  2. onTaskFailed

  3. onTaskStartup

例如,如果您为 onTaskStartuponTaskFailed 监听器设置了 exitMessage,并且任务在没有失败的情况下结束,则 onTaskStartupexitMessage 将存储在存储库中。否则,如果发生故障,则存储 onTaskFailedexitMessage。此外,如果您使用 onTaskEnd 监听器设置 exitMessage,则 onTaskEndexitMessage 将覆盖 onTaskStartuponTaskFailed 的退出消息。

8.9. 限制 Spring Cloud Task 实例

Spring Cloud Task 允许您确定给定任务名称的某个任务只能同时运行一个实例。为此,您需要建立 任务名称 并为每个任务执行设置 spring.cloud.task.single-instance-enabled=true。当第一个任务执行正在运行时,任何其他尝试运行具有相同 任务名称spring.cloud.task.single-instance-enabled=true 的任务都将失败,并出现以下错误消息:Task with name "application" is already running. spring.cloud.task.single-instance-enabled 的默认值为 false。以下示例展示了如何将 spring.cloud.task.single-instance-enabled 设置为 true

spring.cloud.task.single-instance-enabled=true or false

要使用此功能,您必须将以下 Spring Integration 依赖项添加到您的应用程序中

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-core</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-jdbc</artifactId>
</dependency>
如果任务因启用此功能且另一个任务正在运行具有相同任务名称而失败,则应用程序的退出代码将为 1。

8.9.1. Spring AOT 和本机编译的单实例用法

在创建原生编译应用程序时使用 Spring Cloud Task 的单实例功能,您需要在构建时启用该功能。为此,请添加 process-aot 执行并将 spring.cloud.task.single-step-instance-enabled=true 设置为 JVM 参数,如下所示

<plugin>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-maven-plugin</artifactId>
    <executions>
        <execution>
            <id>process-aot</id>
            <goals>
                <goal>process-aot</goal>
            </goals>
            <configuration>
                <jvmArguments>
                    -Dspring.cloud.task.single-instance-enabled=true
                </jvmArguments>
            </configuration>
        </execution>
    </executions>
</plugin>

8.10. 启用 ApplicationRunner 和 CommandLineRunner 的观察

要为 ApplicationRunnerCommandLineRunner 启用任务观测,请将 spring.cloud.task.observation.enabled 设置为 true。

使用 SimpleMeterRegistry 启用观测的示例任务应用程序可以在此处找到。

8.11. 禁用 Spring Cloud Task 自动配置

在某些情况下,Spring Cloud Task 不应该为某个实现自动配置,您可以禁用 Task 的自动配置。这可以通过将以下注解添加到您的任务应用程序来完成

@EnableAutoConfiguration(exclude={SimpleTaskAutoConfiguration.class})

您还可以通过将 spring.cloud.task.autoconfiguration.enabled 属性设置为 false 来禁用任务自动配置。

8.12. 关闭上下文

如果应用程序要求在任务完成时(所有 *Runner#run 方法已调用且任务存储库已更新)关闭 ApplicationContext,请将属性 spring.cloud.task.closecontextEnabled 设置为 true

另一种关闭上下文的情况是当任务执行完成但应用程序不终止时。在这些情况下,上下文保持打开状态,因为已分配了一个线程(例如:如果您正在使用 TaskExecutor)。在这些情况下,在启动任务时将 spring.cloud.task.closecontextEnabled 属性设置为 true。这将关闭应用程序的上下文,一旦任务完成。从而允许应用程序终止。

8.13. 启用任务指标

Spring Cloud Task 与 Micrometer 集成,并为其执行的任务创建观测。要启用任务可观测性集成,您必须将 spring-boot-starter-actuator、您首选的注册表实现(如果您想发布指标)和 micrometer-tracing(如果您想发布跟踪数据)添加到您的任务应用程序中。以下是使用 Influx 启用任务可观测性和指标的 Maven 依赖项示例

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
    <groupId>io.micrometer</groupId>
    <artifactId>micrometer-registry-influx</artifactId>
    <scope>runtime</scope>
</dependency>

8.14. Spring Task 和 Spring Cloud Task 属性

“任务”一词在行业中经常使用。例如,Spring Boot 提供了 spring.task,而 Spring Cloud Task 提供了 spring.cloud.task 属性。这在过去造成了一些混淆,认为这两组属性直接相关。然而,它们代表了 Spring 生态系统中提供的两组不同的功能。

  • spring.task 指的是配置 ThreadPoolTaskScheduler 的属性。

  • spring.cloud.task 指的是配置 Spring Cloud Task 功能的属性。

批处理

本节将详细介绍 Spring Cloud Task 与 Spring Batch 的集成。本节涵盖了跟踪作业执行与其执行的任务之间的关联,以及通过 Spring Cloud Deployer 进行远程分区。

9. 将作业执行与执行它的任务关联

Spring Boot 提供了在 uber-jar 中执行批处理作业的便利。Spring Boot 对此功能的支持使开发人员可以在该执行中执行多个批处理作业。Spring Cloud Task 提供了将作业的执行(作业执行)与任务的执行关联起来的能力,以便可以相互追溯。

Spring Cloud Task 通过使用 TaskBatchExecutionListener 来实现此功能。默认情况下,此监听器会在任何同时配置了 Spring Batch 作业(通过在上下文中定义了一个 Job 类型的 bean)和类路径上存在 spring-cloud-task-batch jar 的上下文中自动配置。该监听器会被注入到所有符合这些条件的作业中。

9.1. 覆盖 TaskBatchExecutionListener

为了防止监听器被注入到当前上下文中的任何批处理作业中,您可以使用标准的 Spring Boot 机制禁用自动配置。

要仅将监听器注入到上下文中的特定作业中,请覆盖 batchTaskExecutionListenerBeanPostProcessor 并提供作业 bean ID 列表,如下例所示

public static TaskBatchExecutionListenerBeanPostProcessor batchTaskExecutionListenerBeanPostProcessor() {
    TaskBatchExecutionListenerBeanPostProcessor postProcessor =
        new TaskBatchExecutionListenerBeanPostProcessor();

    postProcessor.setJobNames(Arrays.asList(new String[] {"job1", "job2"}));

    return postProcessor;
}
您可以在 Spring Cloud Task 项目的 samples 模块中找到一个示例批处理应用程序,此处

10. 远程分区

Spring Cloud Deployer 提供了在大多数云基础设施上启动基于 Spring Boot 的应用程序的功能。DeployerPartitionHandlerDeployerStepExecutionHandler 将工作者步骤执行的启动委托给 Spring Cloud Deployer。

要配置 DeployerStepExecutionHandler,您必须提供一个表示要执行的 Spring Boot uber-jar 的 Resource、一个 TaskLauncherHandler 和一个 JobExplorer。您可以配置任何环境属性以及一次执行的最大工作者数量、轮询结果的间隔(默认为 10 秒)和超时(默认为 -1 或无超时)。以下示例展示了如何配置此 PartitionHandler

@Bean
public PartitionHandler partitionHandler(TaskLauncher taskLauncher,
        JobExplorer jobExplorer) throws Exception {

    MavenProperties mavenProperties = new MavenProperties();
    mavenProperties.setRemoteRepositories(new HashMap<>(Collections.singletonMap("springRepo",
        new MavenProperties.RemoteRepository(repository))));

    Resource resource =
        MavenResource.parse(String.format("%s:%s:%s",
                "io.spring.cloud",
                "partitioned-batch-job",
                "1.1.0.RELEASE"), mavenProperties);

    DeployerPartitionHandler partitionHandler =
        new DeployerPartitionHandler(taskLauncher, jobExplorer, resource, "workerStep");

    List<String> commandLineArgs = new ArrayList<>(3);
    commandLineArgs.add("--spring.profiles.active=worker");
    commandLineArgs.add("--spring.cloud.task.initialize.enable=false");
    commandLineArgs.add("--spring.batch.initializer.enabled=false");

    partitionHandler.setCommandLineArgsProvider(
        new PassThroughCommandLineArgsProvider(commandLineArgs));
    partitionHandler.setEnvironmentVariablesProvider(new NoOpEnvironmentVariablesProvider());
    partitionHandler.setMaxWorkers(2);
    partitionHandler.setApplicationName("PartitionedBatchJobTask");

    return partitionHandler;
}
将环境变量传递给分区时,每个分区可能位于具有不同环境设置的不同机器上。因此,您应该只传递所需的那些环境变量。

请注意,在上面的示例中,我们已将最大工作者数量设置为 2。设置最大工作者数量可确定应同时运行的最大分区数量。

要执行的 Resource 预计是一个 Spring Boot uber-jar,其中 DeployerStepExecutionHandler 在当前上下文中配置为 CommandLineRunner。前面示例中列举的仓库应该是 uber-jar 所在的远程仓库。管理器和工作者都应该能够访问用作作业仓库和任务仓库的相同数据存储。一旦底层基础设施引导了 Spring Boot jar 并且 Spring Boot 启动了 DeployerStepExecutionHandler,步骤处理程序就会执行请求的 Step。以下示例展示了如何配置 DeployerStepExecutionHandler

@Bean
public DeployerStepExecutionHandler stepExecutionHandler(JobExplorer jobExplorer) {
    DeployerStepExecutionHandler handler =
        new DeployerStepExecutionHandler(this.context, jobExplorer, this.jobRepository);

    return handler;
}
您可以在 Spring Cloud Task 项目的 samples 模块中找到一个示例远程分区应用程序,此处

10.1. 异步启动远程批处理分区

默认情况下,批处理分区是按顺序启动的。但是,在某些情况下,这可能会影响性能,因为每次启动都会阻塞,直到资源(例如:在 Kubernetes 中提供 Pod)被提供。在这些情况下,您可以为 DeployerPartitionHandler 提供一个 ThreadPoolTaskExecutor。这将根据 ThreadPoolTaskExecutor 的配置启动远程批处理分区。例如

    @Bean
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);
        executor.setThreadNamePrefix("default_task_executor_thread");
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.initialize();
        return executor;
    }

    @Bean
    public PartitionHandler partitionHandler(TaskLauncher taskLauncher, JobExplorer jobExplorer,
        TaskRepository taskRepository, ThreadPoolTaskExecutor executor) throws Exception {
        Resource resource = this.resourceLoader
            .getResource("maven://io.spring.cloud:partitioned-batch-job:2.2.0.BUILD-SNAPSHOT");

        DeployerPartitionHandler partitionHandler =
            new DeployerPartitionHandler(taskLauncher, jobExplorer, resource,
                "workerStep", taskRepository, executor);
    ...
    }
我们需要关闭上下文,因为使用 ThreadPoolTaskExecutor 会使线程处于活动状态,因此应用程序不会终止。要适当关闭应用程序,我们需要将 spring.cloud.task.closecontextEnabled 属性设置为 true

10.2. 在 Kubernetes 平台开发批处理分区应用程序的注意事项

  • 在 Kubernetes 平台上部署分区应用程序时,您必须使用以下依赖项来获取 Spring Cloud Kubernetes Deployer

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-deployer-kubernetes</artifactId>
    </dependency>
  • 任务应用程序及其分区的应用程序名称需要遵循以下正则表达式模式:[a-z0-9]([-a-z0-9]*[a-z0-9])。否则,将抛出异常。

11. 批处理信息消息

Spring Cloud Task 提供了批处理作业发出信息消息的功能。“Spring Batch 事件”部分详细介绍了此功能。

12. 批处理作业退出代码

前文 所述,Spring Cloud Task 应用程序支持记录任务执行的退出代码。但是,在您在任务中运行 Spring Batch 作业的情况下,无论 Batch Job Execution 如何完成,使用默认的 Batch/Boot 行为时,任务的结果始终为零。请记住,任务是一个引导应用程序,并且从任务返回的退出代码与引导应用程序相同。要覆盖此行为并允许任务在批处理作业返回 FAILEDBatchStatus 时返回非零退出代码,请将 spring.cloud.task.batch.fail-on-job-failure 设置为 true。然后退出代码可以是 1(默认值)或基于 指定的 ExitCodeGenerator)。

此功能使用了一个新的 ApplicationRunner,它取代了 Spring Boot 提供的那个。默认情况下,它配置为相同的顺序。但是,如果您想自定义 ApplicationRunner 运行的顺序,可以通过设置 spring.cloud.task.batch.applicationRunnerOrder 属性来设置其顺序。要使任务根据批处理作业执行的结果返回退出代码,您需要编写自己的 CommandLineRunner

单步批处理作业启动器

本节将介绍如何使用 Spring Cloud Task 中包含的启动器,开发一个包含单个 Step 的 Spring Batch Job。此启动器允许您通过配置定义 ItemReaderItemWriter 或完整的单步 Spring Batch Job。有关 Spring Batch 及其功能的更多信息,请参阅 Spring Batch 文档

要获取 Maven 启动器,请将以下内容添加到您的构建中

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-single-step-batch-job</artifactId>
    <version>2.3.0</version>
</dependency>

要获取 Gradle 启动器,请将以下内容添加到您的构建中

compile "org.springframework.cloud:spring-cloud-starter-single-step-batch-job:2.3.0"

13. 定义作业

您可以使用启动器来定义最少的 ItemReaderItemWriter,或者定义完整的 Job。在本节中,我们将定义配置 Job 所需的属性。

13.1. 属性

首先,启动器提供了一组属性,让您可以配置包含一个 Step 的 Job 的基本信息

表 1. Job 属性
财产 类型 默认值 描述

spring.batch.job.jobName

字符串

null

作业名称。

spring.batch.job.stepName

字符串

null

步骤名称。

spring.batch.job.chunkSize

整数

null

每个事务处理的项数。

配置上述属性后,您将拥有一个包含单个基于块的步骤的作业。此基于块的步骤读取、处理并写入 Map<String, Object> 实例作为项。但是,该步骤尚未执行任何操作。您需要配置一个 ItemReader、一个可选的 ItemProcessor 和一个 ItemWriter 以使其执行某些操作。要配置其中之一,您可以使用属性并配置已提供自动配置的选项之一,或者您可以使用标准的 Spring 配置机制配置您自己的。

如果您配置自己的,则输入和输出类型必须与步骤中的其他类型匹配。此启动器中的 ItemReader 实现和 ItemWriter 实现都使用 Map<String, Object> 作为输入和输出项。

14. ItemReader 实现的自动配置

此启动器为四种不同的 ItemReader 实现提供了自动配置:AmqpItemReaderFlatFileItemReaderJdbcCursorItemReaderKafkaItemReader。在本节中,我们将概述如何使用提供的自动配置来配置这些实现。

14.1. AmqpItemReader

您可以使用 AmqpItemReader 从 AMQP 队列或主题读取数据。此 ItemReader 实现的自动配置取决于两组配置。首先是 AmqpTemplate 的配置。您可以自己配置此项,也可以使用 Spring Boot 提供的自动配置。请参阅 Spring Boot AMQP 文档。配置 AmqpTemplate 后,您可以通过设置以下属性来启用批处理功能以支持它

表 2. AmqpItemReader 属性
财产 类型 默认值 描述

spring.batch.job.amqpitemreader.enabled

布尔值

如果为 true,则执行自动配置。

spring.batch.job.amqpitemreader.jsonConverterEnabled

布尔值

true

指示是否应注册 Jackson2JsonMessageConverter 以解析消息。

有关更多信息,请参阅 AmqpItemReader 文档

14.2. FlatFileItemReader

FlatFileItemReader 允许您从平面文件(例如 CSV 和其他文件格式)读取数据。要从文件读取数据,您可以通过正常的 Spring 配置(LineTokenizerRecordSeparatorPolicyFieldSetMapperLineMapperSkippedLinesCallback)自行提供一些组件。您还可以使用以下属性配置读取器

表 3. FlatFileItemReader 属性
财产 类型 默认值 描述

spring.batch.job.flatfileitemreader.saveState

布尔值

true

确定是否应保存状态以进行重新启动。

spring.batch.job.flatfileitemreader.name

字符串

null

用于在 ExecutionContext 中提供唯一键的名称。

spring.batch.job.flatfileitemreader.maxItemcount

int

Integer.MAX_VALUE

要从文件中读取的最大项数。

spring.batch.job.flatfileitemreader.currentItemCount

int

0

已读取的项数。在重新启动时使用。

spring.batch.job.flatfileitemreader.comments

List<String>

空列表

指示文件中注释行(要忽略的行)的字符串列表。

spring.batch.job.flatfileitemreader.resource

资源

null

要读取的资源。

spring.batch.job.flatfileitemreader.strict

布尔值

true

如果设置为 true,则如果找不到资源,读取器将抛出异常。

spring.batch.job.flatfileitemreader.encoding

字符串

FlatFileItemReader.DEFAULT_CHARSET

读取文件时使用的编码。

spring.batch.job.flatfileitemreader.linesToSkip

int

0

指示在文件开头要跳过的行数。

spring.batch.job.flatfileitemreader.delimited

布尔值

指示文件是否为分隔文件(CSV 和其他格式)。此属性或 spring.batch.job.flatfileitemreader.fixedLength 只能同时为 true

spring.batch.job.flatfileitemreader.delimiter

字符串

DelimitedLineTokenizer.DELIMITER_COMMA

如果读取分隔文件,则指示要解析的分隔符。

spring.batch.job.flatfileitemreader.quoteCharacter

char

DelimitedLineTokenizer.DEFAULT_QUOTE_CHARACTER

用于确定用于引用值的字符。

spring.batch.job.flatfileitemreader.includedFields

List<Integer>

空列表

用于确定记录中要包含在项中的字段的索引列表。

spring.batch.job.flatfileitemreader.fixedLength

布尔值

指示文件的记录是否按列号解析。此属性或 spring.batch.job.flatfileitemreader.delimited 只能同时为 true

spring.batch.job.flatfileitemreader.ranges

List<Range>

空列表

用于解析固定宽度记录的列范围列表。请参阅 Range 文档

spring.batch.job.flatfileitemreader.names

String []

null

从记录中解析出的每个字段的名称列表。这些名称是此 ItemReader 返回的项中 Map<String, Object> 的键。

spring.batch.job.flatfileitemreader.parsingStrict

布尔值

true

如果设置为 true,则如果字段无法映射,则映射失败。

14.3. JdbcCursorItemReader

JdbcCursorItemReader 对关系数据库运行查询,并迭代结果游标(ResultSet)以提供结果项。此自动配置允许您提供 PreparedStatementSetterRowMapper 或两者。您还可以使用以下属性配置 JdbcCursorItemReader

表 4. JdbcCursorItemReader 属性
财产 类型 默认值 描述

spring.batch.job.jdbccursoritemreader.saveState

布尔值

true

确定是否应保存状态以进行重新启动。

spring.batch.job.jdbccursoritemreader.name

字符串

null

用于在 ExecutionContext 中提供唯一键的名称。

spring.batch.job.jdbccursoritemreader.maxItemcount

int

Integer.MAX_VALUE

要从文件中读取的最大项数。

spring.batch.job.jdbccursoritemreader.currentItemCount

int

0

已读取的项数。在重新启动时使用。

spring.batch.job.jdbccursoritemreader.fetchSize

int

向驱动程序提供的提示,指示每次调用数据库系统时要检索的记录数。为了获得最佳性能,您通常希望将其设置为与块大小匹配。

spring.batch.job.jdbccursoritemreader.maxRows

int

要从数据库读取的最大项数。

spring.batch.job.jdbccursoritemreader.queryTimeout

int

查询超时前的毫秒数。

spring.batch.job.jdbccursoritemreader.ignoreWarnings

布尔值

true

确定读取器在处理时是否应忽略 SQL 警告。

spring.batch.job.jdbccursoritemreader.verifyCursorPosition

布尔值

true

指示是否应在每次读取后验证游标位置,以验证 RowMapper 是否未移动游标。

spring.batch.job.jdbccursoritemreader.driverSupportsAbsolute

布尔值

指示驱动程序是否支持游标的绝对定位。

spring.batch.job.jdbccursoritemreader.useSharedExtendedConnection

布尔值

指示连接是否与其他处理共享(因此是事务的一部分)。

spring.batch.job.jdbccursoritemreader.sql

字符串

null

要读取的 SQL 查询。

您还可以使用以下属性专门为读取器指定 JDBC DataSource:。JdbcCursorItemReader 属性

财产 类型 默认值 描述

spring.batch.job.jdbccursoritemreader.datasource.enable

布尔值

确定是否应启用 JdbcCursorItemReader DataSource

jdbccursoritemreader.datasource.url

字符串

null

数据库的 JDBC URL。

jdbccursoritemreader.datasource.username

字符串

null

数据库的登录用户名。

jdbccursoritemreader.datasource.password

字符串

null

数据库的登录密码。

jdbccursoritemreader.datasource.driver-class-name

字符串

null

JDBC 驱动程序的完全限定名称。

如果未指定 jdbccursoritemreader_datasource,则 JDBCCursorItemReader 将使用默认的 DataSource

14.4. KafkaItemReader

从 Kafka 主题摄取分区数据非常有用,这正是 KafkaItemReader 可以做到的。要配置 KafkaItemReader,需要两部分配置。首先,需要使用 Spring Boot 的 Kafka 自动配置来配置 Kafka(请参阅 Spring Boot Kafka 文档)。配置 Spring Boot 的 Kafka 属性后,您可以通过设置以下属性来配置 KafkaItemReader 本身

表 5. KafkaItemReader 属性
财产 类型 默认值 描述

spring.batch.job.kafkaitemreader.name

字符串

null

用于在 ExecutionContext 中提供唯一键的名称。

spring.batch.job.kafkaitemreader.topic

字符串

null

要读取的主题名称。

spring.batch.job.kafkaitemreader.partitions

List<Integer>

空列表

要读取的分区索引列表。

spring.batch.job.kafkaitemreader.pollTimeOutInSeconds

long

30

poll() 操作的超时时间。

spring.batch.job.kafkaitemreader.saveState

布尔值

true

确定是否应保存状态以进行重新启动。

请参阅 KafkaItemReader 文档

14.5. 本机编译

单步批处理的优势在于,当您使用 JVM 时,它允许您在运行时动态选择要使用的读取器和写入器 Bean。但是,当您使用原生编译时,必须在构建时而不是在运行时确定读取器和写入器。以下示例演示了如何实现这一点

<plugin>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-maven-plugin</artifactId>
    <executions>
        <execution>
            <id>process-aot</id>
            <goals>
                <goal>process-aot</goal>
            </goals>
            <configuration>
                <jvmArguments>
                    -Dspring.batch.job.flatfileitemreader.name=fooReader
                    -Dspring.batch.job.flatfileitemwriter.name=fooWriter
                </jvmArguments>
            </configuration>
        </execution>
    </executions>
</plugin>

15. ItemProcessor 配置

如果 ApplicationContext 中有一个 ItemProcessor 可用,则单步批处理作业自动配置将接受该 ItemProcessor。如果找到正确类型(ItemProcessor<Map<String, Object>, Map<String, Object>>)的处理器,它将被自动注入到步骤中。

16. ItemWriter 实现的自动配置

此启动器为与支持的 ItemReader 实现相匹配的 ItemWriter 实现提供自动配置:AmqpItemWriterFlatFileItemWriterJdbcItemWriterKafkaItemWriter。本节介绍如何使用自动配置来配置受支持的 ItemWriter

16.1. AmqpItemWriter

要写入 RabbitMQ 队列,您需要两组配置。首先,您需要一个 AmqpTemplate。最简单的方法是使用 Spring Boot 的 RabbitMQ 自动配置。请参阅 Spring Boot AMQP 文档

配置 AmqpTemplate 后,您可以通过设置以下属性来配置 AmqpItemWriter

表 6. AmqpItemWriter 属性
财产 类型 默认值 描述

spring.batch.job.amqpitemwriter.enabled

布尔值

如果为 true,则自动配置运行。

spring.batch.job.amqpitemwriter.jsonConverterEnabled

布尔值

true

指示是否应注册 Jackson2JsonMessageConverter 以转换消息。

16.2. FlatFileItemWriter

要将文件作为步骤的输出写入,您可以配置 FlatFileItemWriter。自动配置接受已明确配置的组件(例如 LineAggregatorFieldExtractorFlatFileHeaderCallbackFlatFileFooterCallback)以及通过设置以下指定属性配置的组件

表 7. FlatFileItemWriter 属性
财产 类型 默认值 描述

spring.batch.job.flatfileitemwriter.resource

资源

null

要读取的资源。

spring.batch.job.flatfileitemwriter.delimited

布尔值

指示输出文件是否为分隔文件。如果为 true,则 spring.batch.job.flatfileitemwriter.formatted 必须为 false

spring.batch.job.flatfileitemwriter.formatted

布尔值

指示输出文件是否为格式化文件。如果为 true,则 spring.batch.job.flatfileitemwriter.delimited 必须为 false

spring.batch.job.flatfileitemwriter.format

字符串

null

用于为格式化文件生成输出的格式。格式化通过 String.format 执行。

spring.batch.job.flatfileitemwriter.locale

Locale

Locale.getDefault()

生成文件时使用的 Locale

spring.batch.job.flatfileitemwriter.maximumLength

int

0

记录的最大长度。如果为 0,则大小无限制。

spring.batch.job.flatfileitemwriter.minimumLength

int

0

最小记录长度。

spring.batch.job.flatfileitemwriter.delimiter

字符串

,

用于分隔分隔文件中字段的 String

spring.batch.job.flatfileitemwriter.encoding

字符串

FlatFileItemReader.DEFAULT_CHARSET

写入文件时使用的编码。

spring.batch.job.flatfileitemwriter.forceSync

布尔值

指示文件在刷新时是否应强制同步到磁盘。

spring.batch.job.flatfileitemwriter.names

String []

null

从记录中解析出的每个字段的名称列表。这些名称是此 ItemWriter 接收的项中 Map<String, Object> 的键。

spring.batch.job.flatfileitemwriter.append

布尔值

指示如果找到输出文件,是否应追加到文件。

spring.batch.job.flatfileitemwriter.lineSeparator

字符串

FlatFileItemWriter.DEFAULT_LINE_SEPARATOR

用于在输出文件中分隔行的 String

spring.batch.job.flatfileitemwriter.name

字符串

null

用于在 ExecutionContext 中提供唯一键的名称。

spring.batch.job.flatfileitemwriter.saveState

布尔值

true

确定是否应保存状态以进行重新启动。

spring.batch.job.flatfileitemwriter.shouldDeleteIfEmpty

布尔值

如果设置为 true,则在作业完成时删除空文件(无输出)。

spring.batch.job.flatfileitemwriter.shouldDeleteIfExists

布尔值

true

如果设置为 true,并且在输出文件应存在的位置找到文件,则在步骤开始前删除该文件。

spring.batch.job.flatfileitemwriter.transactional

布尔值

FlatFileItemWriter.DEFAULT_TRANSACTIONAL

指示读取器是否为事务队列(指示读取的项在失败时返回到队列)。

16.3. JdbcBatchItemWriter

要将步骤的输出写入关系数据库,此启动器提供了自动配置 JdbcBatchItemWriter 的能力。自动配置允许您通过设置以下属性提供您自己的 ItemPreparedStatementSetterItemSqlParameterSourceProvider 以及配置选项

表 8. JdbcBatchItemWriter 属性
财产 类型 默认值 描述

spring.batch.job.jdbcbatchitemwriter.name

字符串

null

用于在 ExecutionContext 中提供唯一键的名称。

spring.batch.job.jdbcbatchitemwriter.sql

字符串

null

用于插入每个项的 SQL。

spring.batch.job.jdbcbatchitemwriter.assertUpdates

布尔值

true

是否验证每次插入是否至少更新一条记录。

您还可以使用以下属性专门为写入器指定 JDBC DataSource:。JdbcBatchItemWriter 属性

财产 类型 默认值 描述

spring.batch.job.jdbcbatchitemwriter.datasource.enable

布尔值

确定是否应启用 JdbcCursorItemReader DataSource

jdbcbatchitemwriter.datasource.url

字符串

null

数据库的 JDBC URL。

jdbcbatchitemwriter.datasource.username

字符串

null

数据库的登录用户名。

jdbcbatchitemwriter.datasource.password

字符串

null

数据库的登录密码。

jdbcbatchitemreader.datasource.driver-class-name

字符串

null

JDBC 驱动程序的完全限定名称。

如果未指定 jdbcbatchitemwriter_datasource,则 JdbcBatchItemWriter 将使用默认的 DataSource

16.4. KafkaItemWriter

要将步骤输出写入 Kafka 主题,您需要 KafkaItemWriter。此启动器通过使用两个来源的功能为 KafkaItemWriter 提供自动配置。首先是 Spring Boot 的 Kafka 自动配置。(请参阅 Spring Boot Kafka 文档。)其次,此启动器允许您配置写入器上的两个属性。

表 9. KafkaItemWriter 属性
财产 类型 默认值 描述

spring.batch.job.kafkaitemwriter.topic

字符串

null

要写入的 Kafka 主题。

spring.batch.job.kafkaitemwriter.delete

布尔值

传递给写入器的所有项是否都作为删除事件发送到主题。

有关 KafkaItemWriter 的配置选项的更多信息,请参阅 KafkaItemWiter 文档

16.5. Spring AOT

当您将 Spring AOT 与单步批处理启动器一起使用时,您必须在编译时设置读取器和写入器名称属性(除非您为读取器和/或写入器创建 Bean)。为此,您必须在 Maven 插件或 Gradle 插件的启动参数或环境变量中包含您希望使用的读取器和写入器名称。例如,如果您希望在 Maven 中启用 FlatFileItemReaderFlatFileItemWriter,它将如下所示

    <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
        <executions>
            <execution>
            <id>process-aot</id>
            <goals>
                <goal>process-aot</goal>
            </goals>
            </execution>
        </executions>
        <configuration>
            <arguments>
                <argument>--spring.batch.job.flatfileitemreader.name=foobar</argument>
                <argument>--spring.batch.job.flatfileitemwriter.name=fooWriter</argument>
            </arguments>
        </configuration>
    </plugin>

Spring Cloud Stream 集成

任务本身可能很有用,但将任务集成到更大的生态系统中可以使其用于更复杂的处理和编排。本节涵盖 Spring Cloud Task 与 Spring Cloud Stream 的集成选项。

17. 从 Spring Cloud Stream 启动任务

您可以从流中启动任务。为此,创建一个侦听包含 TaskLaunchRequest 作为其有效负载的消息的接收器。TaskLaunchRequest 包含

  • uri: 要执行的任务工件的 URI。

  • applicationName: 与任务关联的名称。如果未设置 applicationName,TaskLaunchRequest 将生成一个由以下内容组成的任务名称:Task-<UUID>

  • commandLineArguments: 包含任务命令行参数的列表。

  • environmentProperties: 包含任务要使用的环境变量的映射。

  • deploymentProperties: 包含部署器用于部署任务的属性的映射。

如果有效负载是不同类型,则接收器会抛出异常。

例如,可以创建一个流,其中包含一个处理器,该处理器从 HTTP 源获取数据并创建包含 TaskLaunchRequestGenericMessage,并将消息发送到其输出通道。然后任务接收器将从其输入通道接收消息并启动任务。

要创建任务接收器,您只需要创建一个包含 EnableTaskLauncher 注解的 Spring Boot 应用程序,如下例所示

@SpringBootApplication
@EnableTaskLauncher
public class TaskSinkApplication {
    public static void main(String[] args) {
        SpringApplication.run(TaskSinkApplication.class, args);
    }
}

Spring Cloud Task 项目的 示例模块 包含一个示例 Sink 和 Processor。要将这些示例安装到您的本地 maven 仓库中,请从 spring-cloud-task-samples 目录运行 maven 构建,并将 skipInstall 属性设置为 false,如下例所示

mvn clean install

maven.remoteRepositories.springRepo.url 属性必须设置为 uber-jar 所在的远程仓库的位置。如果未设置,则没有远程仓库,因此它只依赖于本地仓库。

17.1. Spring Cloud Data Flow

要在 Spring Cloud Data Flow 中创建流,您必须首先注册我们创建的任务接收器应用程序。在以下示例中,我们使用 Spring Cloud Data Flow shell 注册 Processor 和 Sink 示例应用程序

app register --name taskSink --type sink --uri maven://io.spring.cloud:tasksink:<version>
app register --name taskProcessor --type processor --uri maven:io.spring.cloud:taskprocessor:<version>

以下示例展示了如何从 Spring Cloud Data Flow shell 创建流

stream create foo --definition "http --server.port=9000|taskProcessor|taskSink" --deploy

18. Spring Cloud Task 事件

Spring Cloud Task 提供了在任务通过 Spring Cloud Stream 通道运行时,通过 Spring Cloud Stream 通道发出事件的能力。任务监听器用于在名为 task-events 的消息通道上发布 TaskExecution。此功能会自动装配到任何具有 spring-cloud-streamspring-cloud-stream-<binder> 并在其类路径上定义了任务的任务中。

要禁用事件发射监听器,请将 spring.cloud.task.events.enabled 属性设置为 false

通过定义适当的类路径,以下任务会在 task-events 通道(在任务开始和结束时)发出 TaskExecution 作为事件

@SpringBootApplication
public class TaskEventsApplication {

    public static void main(String[] args) {
        SpringApplication.run(TaskEventsApplication.class, args);
    }

    @Configuration
    public static class TaskConfiguration {

        @Bean
        public ApplicationRunner applicationRunner() {
            return new ApplicationRunner() {
                @Override
                public void run(ApplicationArguments args) {
                    System.out.println("The ApplicationRunner was executed");
                }
            };
        }
    }
}
还需要在类路径中包含一个绑定器实现。
Spring Cloud Task 项目的 samples 模块中提供了任务事件示例应用程序,点击此处查看。

18.1. 禁用特定任务事件

要禁用任务事件,可以将 spring.cloud.task.events.enabled 属性设置为 false

19. Spring Batch 事件

当通过任务执行 Spring Batch 作业时,Spring Cloud Task 可以配置为根据 Spring Batch 中可用的 Spring Batch 监听器发出信息性消息。具体来说,当通过 Spring Cloud Task 运行时,以下 Spring Batch 监听器会自动配置到每个批处理作业中,并在关联的 Spring Cloud Stream 通道上发出消息

  • JobExecutionListener 监听 job-execution-events

  • StepExecutionListener 监听 step-execution-events

  • ChunkListener 监听 chunk-events

  • ItemReadListener 监听 item-read-events

  • ItemProcessListener 监听 item-process-events

  • ItemWriteListener 监听 item-write-events

  • SkipListener 监听 skip-events

当上下文中存在适当的 Bean(一个 Job 和一个 TaskLifecycleListener)时,这些监听器会自动配置到任何 AbstractJob 中。监听这些事件的配置方式与绑定到任何其他 Spring Cloud Stream 通道的方式相同。我们的任务(运行批处理作业的任务)充当 Source,而监听应用程序充当 ProcessorSink

一个示例可以是有一个应用程序监听 job-execution-events 通道以获取作业的开始和停止。要配置监听应用程序,您可以将输入配置为 job-execution-events,如下所示

spring.cloud.stream.bindings.input.destination=job-execution-events

还需要在类路径中包含一个绑定器实现。
Spring Cloud Task 项目的 samples 模块中提供了批处理事件示例应用程序,点击此处查看。

19.1. 将批处理事件发送到不同通道

Spring Cloud Task 为批处理事件提供的选项之一是能够更改特定监听器可以发出消息的通道。为此,请使用以下配置:spring.cloud.stream.bindings.<the channel>.destination=<new destination>。例如,如果 StepExecutionListener 需要将其消息发出到名为 my-step-execution-events 的另一个通道,而不是默认的 step-execution-events,您可以添加以下配置

spring.cloud.task.batch.events.step-execution-events-binding-name=my-step-execution-events

19.2. 禁用批处理事件

要禁用所有批处理事件的监听器功能,请使用以下配置

spring.cloud.task.batch.events.enabled=false

要禁用特定的批处理事件,请使用以下配置

spring.cloud.task.batch.events.<batch event listener>.enabled=false:

以下列表显示了您可以禁用的各个监听器

spring.cloud.task.batch.events.job-execution.enabled=false
spring.cloud.task.batch.events.step-execution.enabled=false
spring.cloud.task.batch.events.chunk.enabled=false
spring.cloud.task.batch.events.item-read.enabled=false
spring.cloud.task.batch.events.item-process.enabled=false
spring.cloud.task.batch.events.item-write.enabled=false
spring.cloud.task.batch.events.skip.enabled=false

19.3. 批处理事件的发出顺序

默认情况下,批处理事件具有 Ordered.LOWEST_PRECEDENCE。要更改此值(例如,更改为 5),请使用以下配置

spring.cloud.task.batch.events.job-execution-order=5
spring.cloud.task.batch.events.step-execution-order=5
spring.cloud.task.batch.events.chunk-order=5
spring.cloud.task.batch.events.item-read-order=5
spring.cloud.task.batch.events.item-process-order=5
spring.cloud.task.batch.events.item-write-order=5
spring.cloud.task.batch.events.skip-order=5

附录

20. 任务仓库模式

本附录提供了任务仓库中使用的数据库模式的 ERD。

task schema

20.1. 表信息

TASK_EXECUTION

存储任务执行信息。

列名 必需 类型 字段长度 备注

TASK_EXECUTION_ID

TRUE

BIGINT

X

Spring Cloud Task 框架在应用程序启动时,从 TASK_SEQ 获取下一个可用的 ID。如果记录是在任务外部创建的,则必须在记录创建时填充该值。

START_TIME

FALSE

DATETIME(6)

X

Spring Cloud Task 框架在应用程序启动时设定该值。

END_TIME

FALSE

DATETIME(6)

X

Spring Cloud Task 框架在应用程序退出时设定该值。

TASK_NAME

FALSE

VARCHAR

100

除非用户使用 spring.application.name 建立名称,否则 Spring Cloud Task 框架在应用程序启动时会将其设置为“Application”。

EXIT_CODE

FALSE

INTEGER

X

遵循 Spring Boot 默认值,除非用户根据此处讨论的进行覆盖。

EXIT_MESSAGE

FALSE

VARCHAR

2500

用户自定义,如此处讨论。

ERROR_MESSAGE

FALSE

VARCHAR

2500

Spring Cloud Task 框架在应用程序退出时设定该值。

LAST_UPDATED

TRUE

TIMESTAMP

X

Spring Cloud Task 框架在应用程序启动时设定该值。如果记录是在任务外部创建的,则必须在记录创建时填充该值。

EXTERNAL_EXECUTION_ID

FALSE

VARCHAR

250

如果设置了 spring.cloud.task.external-execution-id 属性,则 Spring Cloud Task 框架在应用程序启动时会将其设置为指定的值。更多信息可以在 此处 找到

PARENT_TASK_EXECUTION_ID

FALSE

BIGINT

X

如果设置了 spring.cloud.task.parent-execution-id 属性,则 Spring Cloud Task 框架在应用程序启动时会将其设置为指定的值。更多信息可以在 此处 找到

TASK_EXECUTION_PARAMS

存储任务执行使用的参数

列名 必需 类型 字段长度

TASK_EXECUTION_ID

TRUE

BIGINT

X

TASK_PARAM

FALSE

VARCHAR

2500

TASK_TASK_BATCH

用于将任务执行链接到批处理执行。

列名 必需 类型 字段长度

TASK_EXECUTION_ID

TRUE

BIGINT

X

JOB_EXECUTION_ID

TRUE

BIGINT

X

TASK_LOCK

用于 此处 讨论的 single-instance-enabled 功能。

列名 必需 类型 字段长度 备注

LOCK_KEY

TRUE

CHAR

36

此锁的 UUID

REGION

TRUE

VARCHAR

100

用户可以使用此字段建立一组锁。

CLIENT_ID

TRUE

CHAR

36

包含要锁定应用程序名称的任务执行 ID。

CREATED_DATE

TRUE

DATETIME

X

条目创建日期

每种数据库类型设置表的 DDL 可以在此处找到。

20.2. SQL Server

默认情况下,Spring Cloud Task 使用序列表来确定 TASK_EXECUTION 表的 TASK_EXECUTION_ID。但是,当在 SQL Server 上同时启动多个任务时,这可能会导致 TASK_SEQ 表死锁。解决方案是删除 TASK_EXECUTION_SEQ 表并使用相同的名称创建一个序列。例如:

DROP TABLE TASK_SEQ;

CREATE SEQUENCE [DBO].[TASK_SEQ] AS BIGINT
 START WITH 1
 INCREMENT BY 1;
START WITH 设置为高于当前执行 ID 的值。

21. 构建此文档

本项目使用 Maven 生成此文档。要自行生成,请运行以下命令:$ mvn clean install -DskipTests -P docs

22. 可观察性元数据

22.1. 可观察性 - 指标

以下是本项目声明的所有指标列表。

22.1.1. 任务活跃

围绕任务执行创建的指标。

指标名称 spring.cloud.task(由约定类 org.springframework.cloud.task.listener.DefaultTaskExecutionObservationConvention 定义)。 类型 timer

指标名称 spring.cloud.task.active(由约定类 org.springframework.cloud.task.listener.DefaultTaskExecutionObservationConvention 定义)。 类型 long task timer

在启动观测后添加的键值可能会从 *.active 指标中缺失。
Micrometer 内部使用 纳秒 作为基本单位。但是,每个后端确定实际的基本单位。(即 Prometheus 使用秒)

封闭类的完全限定名称 org.springframework.cloud.task.listener.TaskExecutionObservation

所有标签必须以 spring.cloud.task 前缀开头!
表 10. 低基数键

名称

描述

spring.cloud.task.cf.app.id (必需)

CF 云的应用程序 ID。

spring.cloud.task.cf.app.name (必需)

CF 云的应用程序名称。

spring.cloud.task.cf.app.version (必需)

CF 云的应用程序版本。

spring.cloud.task.cf.instance.index (必需)

CF 云的实例索引。

spring.cloud.task.cf.org.name (必需)

CF 云的组织名称。

spring.cloud.task.cf.space.id (必需)

CF 云的空间 ID。

spring.cloud.task.cf.space.name (必需)

CF 云的空间名称。

spring.cloud.task.execution.id (必需)

任务执行 ID。

spring.cloud.task.exit.code (必需)

任务退出代码。

spring.cloud.task.external.execution.id (必需)

任务的外部执行 ID。

spring.cloud.task.name (必需)

任务名称测量。

spring.cloud.task.parent.execution.id (必需)

任务父级执行 ID。

spring.cloud.task.status (必需)

任务状态。可以是成功或失败。

22.1.2. 任务运行器观察

任务运行器执行时创建的观测。

指标名称 spring.cloud.task.runner(由约定类 org.springframework.cloud.task.configuration.observation.DefaultTaskObservationConvention 定义)。 类型 timer

指标名称 spring.cloud.task.runner.active(由约定类 org.springframework.cloud.task.configuration.observation.DefaultTaskObservationConvention 定义)。 类型 long task timer

在启动观测后添加的键值可能会从 *.active 指标中缺失。
Micrometer 内部使用 纳秒 作为基本单位。但是,每个后端确定实际的基本单位。(即 Prometheus 使用秒)

封闭类的完全限定名称 org.springframework.cloud.task.configuration.observation.TaskDocumentedObservation

所有标签必须以 spring.cloud.task 前缀开头!
表11. 低基数键

名称

描述

spring.cloud.task.runner.bean-name (必需)

由 Spring Cloud Task 执行的 bean 的名称。

22.2. 可观察性 - 跨度

以下是本项目声明的所有 Span 列表。

22.2.1. 任务活跃跨度

围绕任务执行创建的指标。

跨度名称 spring.cloud.task(由约定类 org.springframework.cloud.task.listener.DefaultTaskExecutionObservationConvention 定义)。

封闭类的完全限定名称 org.springframework.cloud.task.listener.TaskExecutionObservation

所有标签必须以 spring.cloud.task 前缀开头!
表 12. 标签键

名称

描述

spring.cloud.task.cf.app.id (必需)

CF 云的应用程序 ID。

spring.cloud.task.cf.app.name (必需)

CF 云的应用程序名称。

spring.cloud.task.cf.app.version (必需)

CF 云的应用程序版本。

spring.cloud.task.cf.instance.index (必需)

CF 云的实例索引。

spring.cloud.task.cf.org.name (必需)

CF 云的组织名称。

spring.cloud.task.cf.space.id (必需)

CF 云的空间 ID。

spring.cloud.task.cf.space.name (必需)

CF 云的空间名称。

spring.cloud.task.execution.id (必需)

任务执行 ID。

spring.cloud.task.exit.code (必需)

任务退出代码。

spring.cloud.task.external.execution.id (必需)

任务的外部执行 ID。

spring.cloud.task.name (必需)

任务名称测量。

spring.cloud.task.parent.execution.id (必需)

任务父级执行 ID。

spring.cloud.task.status (必需)

任务状态。可以是成功或失败。

22.2.2. 任务运行器观察跨度

任务运行器执行时创建的观测。

跨度名称 spring.cloud.task.runner(由约定类 org.springframework.cloud.task.configuration.observation.DefaultTaskObservationConvention 定义)。

封闭类的完全限定名称 org.springframework.cloud.task.configuration.observation.TaskDocumentedObservation

所有标签必须以 spring.cloud.task 前缀开头!
表 13. 标签键

名称

描述

spring.cloud.task.runner.bean-name (必需)

由 Spring Cloud Task 执行的 bean 的名称。

© . This site is unofficial and not affiliated with VMware.