© 2009-2020 VMware, Inc. 保留所有权利。
本文档可供您自行使用和分发给他人,但前提是您不得对这些副本收取任何费用,并且无论是以印刷版还是电子版形式分发,每个副本都必须包含本版权声明。
1. 前言
本节简要概述了 Spring Cloud Task 参考文档。可以将其视为本文档其余部分的地图。您可以按顺序阅读本参考指南,如果某些部分不感兴趣,也可以跳过。
1.1. 关于文档
Spring Cloud Task 参考指南提供 html 和 pdf、epub 版本。最新副本可从 docs.spring.io/spring-cloud-task/docs/current-SNAPSHOT/reference/html/ 获取。
本文档可供您自行使用和分发给他人,但前提是您不得对这些副本收取任何费用,并且无论是以印刷版还是电子版形式分发,每个副本都必须包含本版权声明。
1.2. 获取帮助
使用 Spring Cloud Task 遇到问题?我们乐意提供帮助!
-
提问。我们会在 stackoverflow.com 监控标记为
spring-cloud-task
的问题。 -
在 github.com/spring-cloud/spring-cloud-task/issues 报告 Spring Cloud Task 的错误。
Spring Cloud Task 的所有内容都是开源的,包括文档。如果您发现文档有问题或只想改进它们,请 参与进来。 |
1.3. 第一步
如果您刚开始使用 Spring Cloud Task 或广义上的 'Spring',我们建议阅读入门一章。
要从头开始,请阅读以下章节:
要 mengikuti 教程,请阅读 开发您的第一个 Spring Cloud Task 应用
要运行您的示例,请阅读 运行示例
2. 入门
如果您刚开始使用 Spring Cloud Task,应阅读本节。在此,我们将解答基本的“是什么?”、“如何做?”和“为什么?”等问题。我们首先温和地介绍 Spring Cloud Task。然后,我们将构建一个 Spring Cloud Task 应用,并在过程中讨论一些核心原则。
2.1. Spring Cloud Task 介绍
Spring Cloud Task 使得创建短生命周期的微服务变得容易。它提供了能够在生产环境中按需执行短生命周期的 JVM 进程的功能。
2.2. 系统要求
您需要安装 Java(Java 17 或更高版本)。要进行构建,还需要安装 Maven。
2.2.1. 数据库要求
Spring Cloud Task 使用关系型数据库来存储已执行任务的结果。虽然您可以在没有数据库的情况下开始开发任务(任务状态会作为任务仓库更新的一部分被记录到日志中),但对于生产环境,您需要使用一个受支持的数据库。Spring Cloud Task 当前支持以下数据库:
-
DB2
-
H2
-
HSQLDB
-
MySql
-
Oracle
-
Postgres
-
SqlServer
2.3. 开发您的第一个 Spring Cloud Task 应用
一个好的起点是简单的“Hello, World!”应用,因此我们创建相应的 Spring Cloud Task 应用来突出框架的特性。大多数 IDE 对 Apache Maven 都有良好的支持,所以我们将其用作本项目的构建工具。
spring.io 网站包含许多使用 Spring Boot 的 “入门 ”指南。如果您需要解决特定问题,请先在那里查找。您可以通过访问 Spring Initializr 创建新项目来简化以下步骤。这样做会自动生成新的项目结构,以便您可以立即开始编码。我们建议您尝试使用 Spring Initializr 以熟悉它。 |
2.3.1. 使用 Spring Initializr 创建 Spring Task 项目
现在我们可以创建一个并测试一个向控制台打印 Hello, World!
的应用。
为此:
-
访问 Spring Initialzr 网站。
-
创建一个新的 Maven 项目,其中 Group 名称为
io.spring.demo
, Artifact 名称为helloworld
。 -
在 Dependencies(依赖项)文本框中,输入
task
,然后选择Cloud Task
依赖项。 -
在 Dependencies(依赖项)文本框中,输入
jdbc
,然后选择JDBC
依赖项。 -
在 Dependencies(依赖项)文本框中,输入
h2
,然后选择H2
。(或您喜欢的数据库) -
点击 Generate Project(生成项目)按钮
-
-
解压 helloworld.zip 文件,并将项目导入您喜欢的 IDE 中。
2.3.2. 编写代码
为了完成我们的应用,我们需要使用以下内容更新生成的 HelloworldApplication
,以便它能够启动一个任务。
package io.spring.Helloworld;
@SpringBootApplication
@EnableTask
public class HelloworldApplication {
@Bean
public ApplicationRunner applicationRunner() {
return new HelloWorldApplicationRunner();
}
public static void main(String[] args) {
SpringApplication.run(HelloworldApplication.class, args);
}
public static class HelloWorldApplicationRunner implements ApplicationRunner {
@Override
public void run(ApplicationArguments args) throws Exception {
System.out.println("Hello, World!");
}
}
}
虽然代码量可能看起来不大,但其背后发生了很多事情。有关 Spring Boot 的更多细节,请参阅 Spring Boot 参考文档。
现在我们可以打开位于 src/main/resources
目录下的 application.properties
文件。我们需要在 application.properties
中配置两个属性:
-
application.name
:设置应用名称(它会被转换为任务名称) -
logging.level
:将 Spring Cloud Task 的日志级别设置为DEBUG
,以便查看其运行情况。
以下示例展示了如何同时进行这两项配置:
logging.level.org.springframework.cloud.task=DEBUG
spring.application.name=helloWorld
任务自动配置
包含 Spring Cloud Task Starter 依赖项时,Task 会自动配置所有 bean 以引导其功能。此配置的一部分会注册 TaskRepository
及其使用所需的基础设施。
在我们的演示中,TaskRepository
使用嵌入式 H2 数据库来记录任务的结果。这种嵌入式 H2 数据库对于生产环境来说并不是一个实用的解决方案,因为任务结束后 H2 数据库就会消失。然而,为了快速入门体验,我们可以在示例中使用它,同时将仓库中更新的内容回显到日志中。在本文档后面介绍的配置部分,我们将介绍如何自定义 Spring Cloud Task 提供的各个组件的配置。
当我们的示例应用运行时,Spring Boot 会启动我们的 HelloWorldCommandLineRunner
并将我们的“Hello, World!”消息输出到标准输出。TaskLifecycleListener
会在仓库中记录任务的开始和结束。
main 方法
main 方法作为任何 Java 应用的入口点。我们的 main 方法委托给 Spring Boot 的 SpringApplication 类。
ApplicationRunner
Spring 包含多种引导应用逻辑的方式。Spring Boot 通过其 *Runner
接口(CommandLineRunner
或 ApplicationRunner
)提供了一种便捷且组织化的方式。一个设计良好的任务可以通过使用这两种 runner 之一来引导任何逻辑。
任务的生命周期被认为是开始于 *Runner#run
方法执行之前,结束于所有方法执行完毕之后。Spring Boot 允许应用使用多个 *Runner
实现,Spring Cloud Task 也是如此。
任何通过 CommandLineRunner 或 ApplicationRunner 以外的机制(例如使用 InitializingBean#afterPropertiesSet )引导的处理都不会被 Spring Cloud Task 记录。 |
2.3.3. 运行示例
至此,我们的应用应该可以工作了。由于这个应用基于 Spring Boot,我们可以从应用根目录使用命令行 $ mvn spring-boot:run
来运行它,如下例所示(包括其输出):
$ mvn clean spring-boot:run
....... . . .
....... . . . (Maven log output here)
....... . . .
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.0.3.RELEASE)
2018-07-23 17:44:34.426 INFO 1978 --- [ main] i.s.d.helloworld.HelloworldApplication : Starting HelloworldApplication on Glenns-MBP-2.attlocal.net with PID 1978 (/Users/glennrenfro/project/helloworld/target/classes started by glennrenfro in /Users/glennrenfro/project/helloworld)
2018-07-23 17:44:34.430 INFO 1978 --- [ main] i.s.d.helloworld.HelloworldApplication : No active profile set, falling back to default profiles: default
2018-07-23 17:44:34.472 INFO 1978 --- [ main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@1d24f32d: startup date [Mon Jul 23 17:44:34 EDT 2018]; root of context hierarchy
2018-07-23 17:44:35.280 INFO 1978 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Starting...
2018-07-23 17:44:35.410 INFO 1978 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Start completed.
2018-07-23 17:44:35.419 DEBUG 1978 --- [ main] o.s.c.t.c.SimpleTaskConfiguration : Using org.springframework.cloud.task.configuration.DefaultTaskConfigurer TaskConfigurer
2018-07-23 17:44:35.420 DEBUG 1978 --- [ main] o.s.c.t.c.DefaultTaskConfigurer : No EntityManager was found, using DataSourceTransactionManager
2018-07-23 17:44:35.522 DEBUG 1978 --- [ main] o.s.c.t.r.s.TaskRepositoryInitializer : Initializing task schema for h2 database
2018-07-23 17:44:35.525 INFO 1978 --- [ main] o.s.jdbc.datasource.init.ScriptUtils : Executing SQL script from class path resource [org/springframework/cloud/task/schema-h2.sql]
2018-07-23 17:44:35.558 INFO 1978 --- [ main] o.s.jdbc.datasource.init.ScriptUtils : Executed SQL script from class path resource [org/springframework/cloud/task/schema-h2.sql] in 33 ms.
2018-07-23 17:44:35.728 INFO 1978 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Registering beans for JMX exposure on startup
2018-07-23 17:44:35.730 INFO 1978 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Bean with name 'dataSource' has been autodetected for JMX exposure
2018-07-23 17:44:35.733 INFO 1978 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Located MBean 'dataSource': registering with JMX server as MBean [com.zaxxer.hikari:name=dataSource,type=HikariDataSource]
2018-07-23 17:44:35.738 INFO 1978 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 0
2018-07-23 17:44:35.762 DEBUG 1978 --- [ main] o.s.c.t.r.support.SimpleTaskRepository : Creating: TaskExecution{executionId=0, parentExecutionId=null, exitCode=null, taskName='application', startTime=Mon Jul 23 17:44:35 EDT 2018, endTime=null, exitMessage='null', externalExecutionId='null', errorMessage='null', arguments=[]}
2018-07-23 17:44:35.772 INFO 1978 --- [ main] i.s.d.helloworld.HelloworldApplication : Started HelloworldApplication in 1.625 seconds (JVM running for 4.764)
Hello, World!
2018-07-23 17:44:35.782 DEBUG 1978 --- [ main] o.s.c.t.r.support.SimpleTaskRepository : Updating: TaskExecution with executionId=1 with the following {exitCode=0, endTime=Mon Jul 23 17:44:35 EDT 2018, exitMessage='null', errorMessage='null'}
上述输出有三行是我们需要关注的:
-
SimpleTaskRepository
记录了在TaskRepository
中创建条目。 -
我们的
CommandLineRunner
的执行,通过“Hello, World!”输出展示。 -
SimpleTaskRepository
在TaskRepository
中记录了任务的完成。
Spring Cloud Task 项目的 samples 模块中可以找到一个简单的任务应用,点此查看。 |
3. 特性
本节将更详细地介绍 Spring Cloud Task,包括如何使用、如何配置以及适当的扩展点。
3.1. Spring Cloud Task 的生命周期
在大多数情况下,现代云环境的设计围绕着不期望结束的进程的执行。如果它们确实结束了,通常会被重新启动。虽然大多数平台都有某种方式可以运行结束时不会重新启动的进程,但其运行结果通常不会以一种可消费的方式维护。Spring Cloud Task 提供了在环境中执行短生命周期进程并记录结果的能力。这样做可以通过消息集成任务,从而构建围绕短生命周期进程以及长期运行服务的微服务架构。
虽然此功能在云环境中很有用,但同样的问题也可能出现在传统的部署模型中。当使用 cron 等调度器运行 Spring Boot 应用时,能够在应用完成后监控其结果可能会很有用。
Spring Cloud Task 采取的方法是,Spring Boot 应用可以有开始和结束,并且仍然是成功的。批量应用就是期望结束(并且通常是短生命周期)的进程如何有用的一个例子。
Spring Cloud Task 记录给定任务的生命周期事件。大多数长期运行的进程,以大多数 Web 应用为典型代表,不保存其生命周期事件。而 Spring Cloud Task 核心的任务会保存。
生命周期由一个任务执行组成。这是配置为任务的 Spring Boot 应用的一次物理执行(即,它具有 Spring Cloud Task 依赖项)。
在任务开始时,在任何 CommandLineRunner
或 ApplicationRunner
实现运行之前,会在 TaskRepository
中创建一个条目来记录开始事件。此事件通过 Spring Framework 触发 SmartLifecycle#start
来触发。这向系统表明所有 bean 都已准备就绪,并在运行 Spring Boot 提供的任何 CommandLineRunner
或 ApplicationRunner
实现之前发生。
任务的记录仅在 ApplicationContext 成功引导时发生。如果上下文完全无法引导,则任务的运行不会被记录。 |
在 Spring Boot 的所有 *Runner#run
调用完成后,或者 ApplicationContext
失败(由 ApplicationFailedEvent
指示)时,任务执行结果会在仓库中更新。
如果应用需要在任务完成后(所有 *Runner#run 方法已调用且任务仓库已更新)关闭 ApplicationContext ,请将属性 spring.cloud.task.closecontextEnabled 设置为 true。 |
3.1.1. TaskExecution
存储在 TaskRepository
中的信息建模在 TaskExecution
类中,包含以下信息:
字段 | 描述 |
---|---|
|
任务运行的唯一 ID。 |
|
由 |
|
任务的名称,由配置的 |
|
任务开始的时间,由 |
|
任务完成的时间,由 |
|
退出时可用的任何信息。这可以通过 |
|
如果异常是任务结束的原因(如 |
|
传递给可执行 boot 应用的字符串命令行参数的 |
3.1.2. 映射退出码
任务完成后,它会尝试向操作系统返回一个退出码。如果我们查看 原始示例,可以看到我们并未控制应用的这方面。因此,如果抛出异常,JVM 返回的代码可能对您的调试有用,也可能没用。
因此,Spring Boot 提供了一个接口 ExitCodeExceptionMapper
,允许您将未捕获的异常映射到退出码。这样做可以让您在退出码层面指示出错原因。此外,通过这种方式映射退出码,Spring Cloud Task 会记录返回的退出码。
如果任务以 SIG-INT 或 SIG-TERM 终止,除非在代码中另有指定,否则退出码为零。
任务运行时,退出码在仓库中存储为 null。任务完成后,会根据本节前面所述的指南存储适当的退出码。 |
3.2. 配置
Spring Cloud Task 提供了一套开箱即用的配置,这些配置定义在 DefaultTaskConfigurer
和 SimpleTaskConfiguration
类中。本节将介绍默认配置以及如何根据您的需求自定义 Spring Cloud Task。
3.2.1. DataSource
Spring Cloud Task 使用一个数据源来存储任务执行的结果。默认情况下,我们提供了一个内存中的 H2 实例,以提供一种简单的开发引导方法。然而,在生产环境中,您可能希望配置自己的 DataSource
。
如果您的应用仅使用一个 DataSource
,并且该数据源同时用于您的业务 schema 和任务仓库,那么您只需提供任何 DataSource
即可(最简单的方法是通过 Spring Boot 的配置约定来提供)。Spring Cloud Task 会自动使用此 DataSource
作为任务仓库。
如果您的应用使用多个 DataSource
,您需要使用适当的 DataSource
来配置任务仓库。这种自定义可以通过实现 TaskConfigurer
来完成。
3.2.2. 表前缀
TaskRepository
的一个可修改属性是任务表的表前缀。默认情况下,所有任务表都以 TASK_
作为前缀。TASK_EXECUTION
和 TASK_EXECUTION_PARAMS
是两个例子。然而,可能有一些原因需要修改此前缀。如果需要将 schema 名称作为表名的前缀,或者在同一 schema 中需要多组任务表,则必须更改表前缀。您可以通过将 spring.cloud.task.tablePrefix
设置为您需要的前缀来实现,如下所示:
spring.cloud.task.tablePrefix=yourPrefix
通过使用 spring.cloud.task.tablePrefix
,用户承担创建满足任务表 schema 要求且根据用户业务需求进行修改的任务表的责任。您可以使用 Spring Cloud Task Schema DDL 作为创建您自己的任务 DDL 的参考,点此查看。
3.2.3. 启用/禁用表初始化
在您自行创建任务表,并且不希望 Spring Cloud Task 在任务启动时创建它们的情况下,将属性 spring.cloud.task.initialize-enabled
设置为 false
,如下所示:
spring.cloud.task.initialize-enabled=false
默认值为 true
。
属性 spring.cloud.task.initialize.enable 已被弃用。 |
3.2.4. 外部生成的任务 ID
在某些情况下,您可能希望考虑到任务被请求与基础设施实际启动它之间的时间差。Spring Cloud Task 允许您在任务被请求时创建一个 TaskExecution
。然后将生成的 TaskExecution
的执行 ID 传递给任务,以便任务可以在其生命周期中更新 TaskExecution
。
通过在引用存储 TaskExecution
对象的 datastore 的 TaskRepository
实现上调用 createTaskExecution
方法,可以创建一个 TaskExecution
。
为了配置您的任务使用生成的 TaskExecutionId
,添加以下属性:
spring.cloud.task.executionid=yourtaskId
3.2.5. 外部任务 ID
Spring Cloud Task 允许您为每个 TaskExecution
存储一个外部任务 ID。为了配置您的任务使用生成的 TaskExecutionId
,添加以下属性:
spring.cloud.task.external-execution-id=<externalTaskId>
3.2.6. 父任务 ID
Spring Cloud Task 允许您为每个 TaskExecution
存储一个父任务 ID。一个例子是,一个任务执行另一个或多个任务,并且您想记录是哪个任务启动了每个子任务。为了配置您的子任务设置父 TaskExecutionId
,请在子任务中添加以下属性:
spring.cloud.task.parent-execution-id=<parentExecutionTaskId>
3.2.7. TaskConfigurer
TaskConfigurer
是一个策略接口,允许您自定义 Spring Cloud Task 各组件的配置方式。默认情况下,我们提供 DefaultTaskConfigurer
,它提供逻辑默认值:基于 Map
的内存组件(在未提供 DataSource
时对开发有用)和基于 JDBC 的组件(在有 DataSource
可用时有用)。
TaskConfigurer
允许您配置三个主要组件:
组件 | 描述 | 默认值(由 DefaultTaskConfigurer 提供) |
---|---|---|
|
将使用的 |
|
|
将使用的 |
|
|
用于任务更新时的事务管理器。 |
如果使用了 |
您可以通过创建 TaskConfigurer
接口的自定义实现来定制前面表格中描述的任何组件。通常,扩展 DefaultTaskConfigurer
(如果在找不到 TaskConfigurer
时提供)并重写所需的 getter 就足够了。但是,可能需要从头开始实现您自己的版本。
用户不应直接使用 TaskConfigurer 中的 getter 方法,除非他们是使用它来提供要暴露为 Spring Bean 的实现。 |
3.2.8. 任务执行监听器
TaskExecutionListener
允许您注册任务生命周期中发生的特定事件的监听器。为此,创建一个实现 TaskExecutionListener
接口的类。实现 TaskExecutionListener
接口的类会收到以下事件的通知:
-
onTaskStartup
:在将TaskExecution
存储到TaskRepository
之前。 -
onTaskEnd
:在更新TaskExecution
在TaskRepository
中的条目并标记任务最终状态之前。 -
onTaskFailed
:当任务抛出未处理的异常时,在调用onTaskEnd
方法之前。
Spring Cloud Task 还允许您使用以下方法注解将 TaskExecution
监听器添加到 bean 中的方法上:
-
@BeforeTask
:在将TaskExecution
存储到TaskRepository
之前。 -
@AfterTask
:在更新TaskExecution
在TaskRepository
中的条目并标记任务最终状态之前。 -
@FailedTask
:当任务抛出未处理的异常时,在调用@AfterTask
方法之前。
以下示例展示了这三个注解的使用:
public class MyBean {
@BeforeTask
public void methodA(TaskExecution taskExecution) {
}
@AfterTask
public void methodB(TaskExecution taskExecution) {
}
@FailedTask
public void methodC(TaskExecution taskExecution, Throwable throwable) {
}
}
在链中插入一个早于 TaskLifecycleListener 的 ApplicationListener 可能会导致意外的效果。 |
任务执行监听器抛出的异常
如果 TaskExecutionListener
事件处理器抛出异常,该事件处理器的所有监听器处理都会停止。例如,如果三个 onTaskStartup
监听器已启动,并且第一个 onTaskStartup
事件处理器抛出异常,则不会调用另外两个 onTaskStartup
方法。但是,TaskExecutionListeners
的其他事件处理器(onTaskEnd
和 onTaskFailed
)会被调用。
当 TaskExecutionListener
事件处理器抛出异常时返回的退出码是 ExitCodeEvent 报告的退出码。如果没有发出 ExitCodeEvent
,则会评估抛出的异常,看其是否属于 ExitCodeGenerator 类型。如果是,则返回 ExitCodeGenerator
的退出码。否则,返回 1
。
如果在 onTaskStartup
方法中抛出异常,应用的退出码将为 1
。如果在 onTaskEnd
或 onTaskFailed
方法中抛出异常,应用的退出码将是根据上述规则确定的退出码。
如果在 onTaskStartup 、onTaskEnd 或 onTaskFailed 中抛出异常,您不能使用 ExitCodeExceptionMapper 来覆盖应用的退出码。 |
退出消息
您可以使用 TaskExecutionListener
以编程方式设置任务的退出消息。这是通过设置 TaskExecution
的 exitMessage
来完成的,然后该消息会传递给 TaskExecutionListener
。以下示例显示了一个使用 @AfterTask
ExecutionListener
注解的方法:
@AfterTask
public void afterMe(TaskExecution taskExecution) {
taskExecution.setExitMessage("AFTER EXIT MESSAGE");
}
可以在任何监听器事件(onTaskStartup
、onTaskFailed
和 onTaskEnd
)中设置 ExitMessage
。三个监听器的优先顺序如下:
-
onTaskEnd
-
onTaskFailed
-
onTaskStartup
例如,如果您为 onTaskStartup
和 onTaskFailed
监听器设置了 exitMessage
,并且任务在没有失败的情况下结束,则存储在仓库中的是来自 onTaskStartup
的 exitMessage
。否则,如果发生失败,则存储来自 onTaskFailed
的 exitMessage
。此外,如果您使用 onTaskEnd
监听器设置了 exitMessage
,则来自 onTaskEnd
的 exitMessage
会覆盖来自 onTaskStartup
和 onTaskFailed
的退出消息。
3.2.9. 限制 Spring Cloud Task 实例
Spring Cloud Task 允许您指定具有给定任务名称的任务一次只能运行一个实例。为此,您需要指定任务名称,并为每个任务执行设置 spring.cloud.task.single-instance-enabled=true
。当第一个任务执行正在运行时,如果您尝试使用相同的任务名称和 spring.cloud.task.single-instance-enabled=true
再次运行任务,任务将失败并显示以下错误消息:Task with name "application" is already running.
spring.cloud.task.single-instance-enabled
的默认值为 false
。以下示例展示了如何将 spring.cloud.task.single-instance-enabled
设置为 true
:
spring.cloud.task.single-instance-enabled=true or false
要使用此功能,您必须向应用添加以下 Spring Integration 依赖项:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-jdbc</artifactId>
</dependency>
如果任务因启用此功能且有另一个同名任务正在运行而失败,则应用的退出码将为 1。 |
Spring AOT 和原生编译中的单实例用法
在创建原生编译应用时使用 Spring Cloud Task 的单实例功能,您需要在构建时启用该功能。为此,添加 process-aot 执行,并将 spring.cloud.task.single-step-instance-enabled=true
设置为 JVM 参数,如下所示:
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<id>process-aot</id>
<goals>
<goal>process-aot</goal>
</goals>
<configuration>
<jvmArguments>
-Dspring.cloud.task.single-instance-enabled=true
</jvmArguments>
</configuration>
</execution>
</executions>
</plugin>
3.2.10. 为 ApplicationRunner 和 CommandLineRunner 启用观测
要为 ApplicationRunner
或 CommandLineRunner
启用任务观测,请将 spring.cloud.task.observation.enabled
设置为 true。
一个启用了观测并使用 SimpleMeterRegistry
的任务应用示例可在 此处找到。
3.2.11. 禁用 Spring Cloud Task 自动配置
在某些情况下,不应为某个实现自动配置 Spring Cloud Task,您可以禁用 Task 的自动配置。这可以通过向您的任务应用添加以下注解来实现:
@EnableAutoConfiguration(exclude={SimpleTaskAutoConfiguration.class})
您也可以通过将属性 spring.cloud.task.autoconfiguration.enabled
设置为 false
来禁用 Task 的自动配置。
3.2.12. 关闭上下文
如果应用需要在任务完成后(所有 *Runner#run
方法已调用且任务仓库已更新)关闭 ApplicationContext
,请将属性 spring.cloud.task.closecontextEnabled
设置为 true
。
另一种需要关闭上下文的情况是当任务执行完成但应用没有终止时。在这些情况下,上下文保持打开状态是因为分配了线程(例如:如果您正在使用 TaskExecutor)。在这种情况下,当启动任务时,将 spring.cloud.task.closecontextEnabled
属性设置为 true
。这会在任务完成后关闭应用的上下文。从而允许应用终止。
3.2.13. 启用任务指标
Spring Cloud Task 集成了 Micrometer 并为其执行的任务创建观测。要启用任务可观测性集成,您必须向您的任务应用添加 spring-boot-starter-actuator
、您首选的注册表实现(如果您想发布指标)以及 micrometer-tracing(如果您想发布跟踪数据)。使用 Influx 启用任务可观测性和指标的 Maven 依赖示例是:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-influx</artifactId>
<scope>runtime</scope>
</dependency>
3.2.14. Spring Task 和 Spring Cloud Task 属性
术语 task
是业内一个常用词汇。一个例子是 Spring Boot 提供了 spring.task
,而 Spring Cloud Task 提供了 spring.cloud.task
属性。这在过去导致了一些混淆,认为这两组属性直接相关。然而,它们代表了 Spring 生态系统中提供的两组不同的功能。
-
spring.task
指的是配置ThreadPoolTaskScheduler
的属性。 -
spring.cloud.task
指的是配置 Spring Cloud Task 功能的属性。
4. 批处理
本节详细介绍了 Spring Cloud Task 与 Spring Batch 的集成。本节涵盖了如何跟踪作业执行与其所在任务之间的关联,以及通过 Spring Cloud Deployer 进行远程分区。
4.1. 将作业执行与其所在任务关联
Spring Boot 提供了在 über-jar 中执行批处理作业的功能。Spring Boot 对此功能的支持允许开发人员在该执行中执行多个批处理作业。Spring Cloud Task 提供了将作业执行(即作业执行)与任务执行关联的能力,以便可以相互追溯。
Spring Cloud Task 通过使用 TaskBatchExecutionListener
实现此功能。默认情况下,在同时配置了 Spring Batch 作业(即在上下文中定义了类型为 Job
的 bean)并且类路径中包含 spring-cloud-task-batch
jar 的任何上下文中,此监听器都会自动配置。满足这些条件的所有作业都会注入该监听器。
4.1.1. 覆盖 TaskBatchExecutionListener
要阻止监听器被注入到当前上下文中的任何批处理作业中,可以使用标准的 Spring Boot 机制禁用自动配置。
要仅将监听器注入到上下文中的特定作业中,请覆盖 batchTaskExecutionListenerBeanPostProcessor
并提供作业 bean ID 列表,如下例所示
public static TaskBatchExecutionListenerBeanPostProcessor batchTaskExecutionListenerBeanPostProcessor() {
TaskBatchExecutionListenerBeanPostProcessor postProcessor =
new TaskBatchExecutionListenerBeanPostProcessor();
postProcessor.setJobNames(Arrays.asList(new String[] {"job1", "job2"}));
return postProcessor;
}
您可以在 Spring Cloud Task 项目的 samples 模块中找到批处理应用程序示例,点此查看。 |
4.2. 远程分区
Spring Cloud Deployer 提供了在大多数云基础设施上启动基于 Spring Boot 的应用程序的功能。DeployerPartitionHandler
和 DeployerStepExecutionHandler
将工作步骤执行的启动委托给 Spring Cloud Deployer。
要配置 DeployerStepExecutionHandler
,必须提供表示要执行的 Spring Boot über-jar 的 Resource
、TaskLauncherHandler
和 JobExplorer
。您可以配置任何环境属性,以及同时执行的最大工作线程数、轮询结果的间隔(默认为 10 秒)和超时(默认为 -1 或无超时)。以下示例展示了如何配置此 PartitionHandler
@Bean
public PartitionHandler partitionHandler(TaskLauncher taskLauncher,
JobExplorer jobExplorer) throws Exception {
MavenProperties mavenProperties = new MavenProperties();
mavenProperties.setRemoteRepositories(new HashMap<>(Collections.singletonMap("springRepo",
new MavenProperties.RemoteRepository(repository))));
Resource resource =
MavenResource.parse(String.format("%s:%s:%s",
"io.spring.cloud",
"partitioned-batch-job",
"1.1.0.RELEASE"), mavenProperties);
DeployerPartitionHandler partitionHandler =
new DeployerPartitionHandler(taskLauncher, jobExplorer, resource, "workerStep");
List<String> commandLineArgs = new ArrayList<>(3);
commandLineArgs.add("--spring.profiles.active=worker");
commandLineArgs.add("--spring.cloud.task.initialize.enable=false");
commandLineArgs.add("--spring.batch.initializer.enabled=false");
partitionHandler.setCommandLineArgsProvider(
new PassThroughCommandLineArgsProvider(commandLineArgs));
partitionHandler.setEnvironmentVariablesProvider(new NoOpEnvironmentVariablesProvider());
partitionHandler.setMaxWorkers(2);
partitionHandler.setApplicationName("PartitionedBatchJobTask");
return partitionHandler;
}
将环境变量传递给分区时,每个分区可能位于具有不同环境设置的不同机器上。因此,您只应传递那些必需的环境变量。 |
请注意上例中,我们将最大工作线程数设置为 2。设置最大工作线程数确定了应同时运行的最大分区数。
要执行的 Resource
预期是配置了 DeployerStepExecutionHandler
作为当前上下文中的 CommandLineRunner
的 Spring Boot über-jar。前面示例中列举的仓库应是 über-jar 所在的远程仓库。管理者和工作线程都应能访问用作作业仓库和任务仓库的同一数据存储。一旦底层基础设施引导了 Spring Boot jar 并且 Spring Boot 启动了 DeployerStepExecutionHandler
,该步骤处理器就会执行请求的 Step
。以下示例展示了如何配置 DeployerStepExecutionHandler
@Bean
public DeployerStepExecutionHandler stepExecutionHandler(JobExplorer jobExplorer) {
DeployerStepExecutionHandler handler =
new DeployerStepExecutionHandler(this.context, jobExplorer, this.jobRepository);
return handler;
}
您可以在 Spring Cloud Task 项目的 samples 模块中找到远程分区应用程序示例,点此查看。 |
4.2.1. 异步启动远程批处理分区
默认情况下,批处理分区是顺序启动的。然而,在某些情况下,这可能会影响性能,因为每次启动都会阻塞,直到资源(例如:在 Kubernetes 中预置 Pod)被预置完成。在这种情况下,您可以向 DeployerPartitionHandler
提供一个 ThreadPoolTaskExecutor
。这将根据 ThreadPoolTaskExecutor
的配置启动远程批处理分区。例如
@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setThreadNamePrefix("default_task_executor_thread");
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.initialize();
return executor;
}
@Bean
public PartitionHandler partitionHandler(TaskLauncher taskLauncher, JobExplorer jobExplorer,
TaskRepository taskRepository, ThreadPoolTaskExecutor executor) throws Exception {
Resource resource = this.resourceLoader
.getResource("maven://io.spring.cloud:partitioned-batch-job:2.2.0.BUILD-SNAPSHOT");
DeployerPartitionHandler partitionHandler =
new DeployerPartitionHandler(taskLauncher, jobExplorer, resource,
"workerStep", taskRepository, executor);
...
}
我们需要关闭上下文,因为使用 ThreadPoolTaskExecutor 会留下一个活动的线程,从而导致应用程序不会终止。要适当地关闭应用程序,我们需要将 spring.cloud.task.closecontextEnabled 属性设置为 true 。 |
4.2.2. 关于为 Kubernetes 平台开发批处理分区应用程序的注意事项
-
在 Kubernetes 平台上部署分区应用程序时,必须使用 Spring Cloud Kubernetes Deployer 的以下依赖
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-deployer-kubernetes</artifactId> </dependency>
-
任务应用程序及其分区的应用程序名称需要遵循以下正则表达式模式:
[a-z0-9]([-a-z0-9]*[a-z0-9])
。否则,将抛出异常。
4.3. 批处理信息消息
Spring Cloud Task 提供了批处理作业发出信息消息的能力。“Spring Batch 事件” 部分详细介绍了此功能。
4.4. 批处理作业退出码
如前所述,Spring Cloud Task 应用程序支持记录任务执行的退出码。但是,在任务中运行 Spring Batch 作业的情况下,无论批处理作业执行如何完成,使用默认的 Batch/Boot 行为时,任务的结果始终为零。请记住,任务是引导应用程序,从任务返回的退出码与引导应用程序相同。要覆盖此行为并在批处理作业返回 BatchStatus 为 FAILED
时允许任务返回非零退出码,请将 spring.cloud.task.batch.fail-on-job-failure
设置为 true
。然后退出码可以是 1(默认值)或基于指定的 ExitCodeGenerator
)
此功能使用一个新的 ApplicationRunner
,它替换了 Spring Boot 提供的那一个。默认情况下,它的配置顺序与 Spring Boot 提供的相同。但是,如果您想自定义 ApplicationRunner
的运行顺序,可以通过设置 spring.cloud.task.batch.applicationRunnerOrder
属性来设置其顺序。要让您的任务根据批处理作业执行结果返回退出码,您需要编写自己的 CommandLineRunner
。
5. 单步批处理作业启动器
本节介绍了如何使用 Spring Cloud Task 中包含的启动器开发具有单个 Step
的 Spring Batch Job
。该启动器允许您使用配置来定义 ItemReader
、ItemWriter
或完整的单步 Spring Batch Job
。有关 Spring Batch 及其功能的更多信息,请参阅Spring Batch 文档。
要获取 Maven 启动器,请将以下内容添加到您的构建中
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-single-step-batch-job</artifactId>
<version>2.3.0</version>
</dependency>
要获取 Gradle 启动器,请将以下内容添加到您的构建中
compile "org.springframework.cloud:spring-cloud-starter-single-step-batch-job:2.3.0"
5.1. 定义作业
您可以使用此启动器定义一个简单的 ItemReader
或 ItemWriter
,也可以定义一个完整的 Job
。在本节中,我们将定义配置 Job
所需的属性。
5.1.1. 属性
首先,此启动器提供了一组属性,允许您配置具有一个 Step 的 Job 的基本信息
属性 | 类型 | 默认值 | 描述 |
---|---|---|---|
|
|
|
作业的名称。 |
|
|
|
步骤的名称。 |
|
|
|
每个事务要处理的条目数。 |
配置了上述属性后,您就拥有了一个具有单个基于 chunk 的步骤的作业。这个基于 chunk 的步骤以 Map<String, Object>
实例作为条目进行读取、处理和写入。但是,这个步骤还没有执行任何操作。您需要配置一个 ItemReader
、一个可选的 ItemProcessor
和一个 ItemWriter
来让它做一些事情。要配置其中之一,您可以使用属性并配置提供了自动配置的选项之一,或者使用标准的 Spring 配置机制配置您自己的。
如果您配置自己的实现,输入和输出类型必须与步骤中的其他实现匹配。此启动器中的 ItemReader 实现和 ItemWriter 实现都使用 Map<String, Object> 作为输入和输出条目。 |
5.2. ItemReader 实现的自动配置
此启动器为四种不同的 ItemReader
实现提供了自动配置:AmqpItemReader
、FlatFileItemReader
、JdbcCursorItemReader
和 KafkaItemReader
。在本节中,我们将概述如何使用提供的自动配置来配置它们中的每一个。
5.2.1. AmqpItemReader
您可以使用 AmqpItemReader
通过 AMQP 从队列或主题读取数据。此 ItemReader
实现的自动配置取决于两组配置。第一组是 AmqpTemplate
的配置。您可以自己配置,也可以使用 Spring Boot 提供的自动配置。请参阅Spring Boot AMQP 文档。配置好 AmqpTemplate
后,您可以通过设置以下属性启用批处理能力来支持它
属性 | 类型 | 默认值 | 描述 |
---|---|---|---|
|
|
|
如果为 |
|
|
|
指示是否应该注册 |
更多信息,请参阅AmqpItemReader
文档。
5.2.2. FlatFileItemReader
FlatFileItemReader
允许您从平面文件(如 CSV 和其他文件格式)读取数据。要从文件读取,您可以通过正常的 Spring 配置自行提供一些组件(LineTokenizer
, RecordSeparatorPolicy
, FieldSetMapper
, LineMapper
, 或 SkippedLinesCallback
)。您也可以使用以下属性来配置读取器
属性 | 类型 | 默认值 | 描述 |
---|---|---|---|
|
|
|
确定是否应为重新启动保存状态。 |
|
|
|
用于在 |
|
|
|
从文件读取的最大条目数。 |
|
|
0 |
已读取的条目数。用于重新启动。 |
|
|
空列表 |
一个字符串列表,指示文件中的注释行(要忽略的行)。 |
|
|
|
要读取的资源。 |
|
|
|
如果设置为 |
|
|
|
读取文件时使用的编码。 |
|
|
0 |
指示文件开头要跳过的行数。 |
|
|
|
指示文件是否为分隔文件(CSV 及其他格式)。此属性或 |
|
|
|
如果读取分隔文件,指示用于解析的分隔符。 |
|
|
|
用于确定用于引用值的字符。 |
|
|
空列表 |
一个索引列表,用于确定记录中哪些字段包含在条目中。 |
|
|
|
指示文件的记录是否按列号解析。此属性或 |
|
|
空列表 |
用于解析定宽记录的列范围列表。请参阅Range 文档。 |
|
|
|
从记录解析出的每个字段的名称列表。这些名称是此 |
|
|
|
如果设置为 |
5.2.3. JdbcCursorItemReader
JdbcCursorItemReader
对关系数据库运行查询,并遍历结果游标(ResultSet
)以提供结果条目。此自动配置允许您提供一个 PreparedStatementSetter
、一个 RowMapper
或两者。您还可以使用以下属性来配置 JdbcCursorItemReader
属性 | 类型 | 默认值 | 描述 |
---|---|---|---|
|
|
|
确定是否应为重新启动保存状态。 |
|
|
|
用于在 |
|
|
|
从文件读取的最大条目数。 |
|
|
0 |
已读取的条目数。用于重新启动。 |
|
|
给驱动程序的一个提示,指示每次调用数据库系统时检索多少条记录。为了获得最佳性能,通常希望将其设置为与 chunk 大小匹配。 |
|
|
|
从数据库读取的最大条目数。 |
|
|
|
查询超时的毫秒数。 |
|
|
|
|
确定读取器在处理时是否应忽略 SQL 警告。 |
|
|
|
指示每次读取后是否应验证游标位置,以验证 |
|
|
|
指示驱动程序是否支持游标的绝对定位。 |
|
|
|
指示连接是否与其他处理共享(因此是事务的一部分)。 |
|
|
|
从中读取的 SQL 查询。 |
您还可以使用以下属性专门为读取器指定 JDBC DataSource:.JdbcCursorItemReader
属性
属性 | 类型 | 默认值 | 描述 |
---|---|---|---|
|
|
|
确定是否应启用 |
|
|
|
数据库的 JDBC URL。 |
|
|
|
数据库的登录用户名。 |
|
|
|
数据库的登录密码。 |
|
|
|
JDBC 驱动程序的完全限定名。 |
如果未指定 jdbccursoritemreader_datasource ,JDBCCursorItemReader 将使用默认的 DataSource 。 |
5.2.4. KafkaItemReader
从 Kafka 主题摄取分区数据非常有用,这正是 KafkaItemReader
可以做到的。要配置 KafkaItemReader
,需要两部分配置。首先,需要使用 Spring Boot 的 Kafka 自动配置来配置 Kafka(请参阅Spring Boot Kafka 文档)。一旦您配置了 Spring Boot 的 Kafka 属性,就可以通过设置以下属性来配置 KafkaItemReader
本身
属性 | 类型 | 默认值 | 描述 |
---|---|---|---|
|
|
|
用于在 |
|
|
|
要从中读取的主题名称。 |
|
|
空列表 |
要从中读取的分区索引列表。 |
|
|
30 |
|
|
|
|
确定是否应为重新启动保存状态。 |
5.2.5. 本地编译
单步批处理的优点是,在使用 JVM 时,您可以在运行时动态选择要使用的读取器和写入器 bean。然而,在使用本地编译时,您必须在构建时而不是运行时确定读取器和写入器。以下示例展示了如何做到这一点
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<id>process-aot</id>
<goals>
<goal>process-aot</goal>
</goals>
<configuration>
<jvmArguments>
-Dspring.batch.job.flatfileitemreader.name=fooReader
-Dspring.batch.job.flatfileitemwriter.name=fooWriter
</jvmArguments>
</configuration>
</execution>
</executions>
</plugin>
5.3. ItemProcessor 配置
如果 ApplicationContext
中有一个 ItemProcessor
可用,单步批处理作业自动配置将接受它。如果找到类型正确(ItemProcessor<Map<String, Object>, Map<String, Object>>
)的处理器,它将自动注入到步骤中。
5.4. ItemWriter 实现的自动配置
此启动器为与支持的 ItemReader
实现匹配的 ItemWriter
实现提供自动配置:AmqpItemWriter
、FlatFileItemWriter
、JdbcItemWriter
和 KafkaItemWriter
。本节介绍了如何使用自动配置来配置支持的 ItemWriter
。
5.4.1. AmqpItemWriter
要写入 RabbitMQ 队列,您需要两组配置。首先,您需要一个 AmqpTemplate
。最简单的方法是使用 Spring Boot 的 RabbitMQ 自动配置。请参阅Spring Boot AMQP 文档。
配置好 AmqpTemplate
后,您可以通过设置以下属性来配置 AmqpItemWriter
属性 | 类型 | 默认值 | 描述 |
---|---|---|---|
|
|
|
如果为 |
|
|
|
指示是否应注册 |
5.4.2. FlatFileItemWriter
要将文件作为步骤的输出写入,您可以配置 FlatFileItemWriter
。自动配置接受已明确配置的组件(如 LineAggregator
、FieldExtractor
、FlatFileHeaderCallback
或 FlatFileFooterCallback
)以及通过设置以下指定属性配置的组件
属性 | 类型 | 默认值 | 描述 |
---|---|---|---|
|
|
|
要读取的资源。 |
|
|
|
指示输出文件是否为分隔文件。如果为 |
|
|
|
指示输出文件是否为格式化文件。如果为 |
|
|
|
用于为格式化文件生成输出的格式。格式化通过使用 |
|
|
|
生成文件时使用的 |
|
|
0 |
记录的最大长度。如果为 0,则大小无限制。 |
|
|
0 |
记录的最小长度。 |
|
|
|
用于分隔分隔文件中字段的 |
|
|
|
写入文件时使用的编码。 |
|
|
|
指示文件在刷新时是否应强制同步到磁盘。 |
|
|
|
从记录解析出的每个字段的名称列表。这些名称是此 |
|
|
|
指示如果找到输出文件是否应追加到文件中。 |
|
|
|
用于分隔输出文件中行的 |
|
|
|
用于在 |
|
|
|
确定是否应为重新启动保存状态。 |
|
|
|
如果设置为 |
|
|
|
如果设置为 |
|
|
|
指示读取器是否为事务性队列(表示读取的条目在失败时返回队列)。 |
5.4.3. JdbcBatchItemWriter
要将步骤的输出写入关系数据库,此启动器提供了自动配置 JdbcBatchItemWriter
的能力。通过设置以下属性,自动配置允许您提供自己的 ItemPreparedStatementSetter
或 ItemSqlParameterSourceProvider
以及配置选项
属性 | 类型 | 默认值 | 描述 |
---|---|---|---|
|
|
|
用于在 |
|
|
|
用于插入每个条目的 SQL。 |
|
|
|
是否验证每次插入至少更新了一条记录。 |
您还可以使用以下属性专门为写入器指定 JDBC DataSource:.JdbcBatchItemWriter
属性
属性 | 类型 | 默认值 | 描述 |
---|---|---|---|
|
|
|
确定是否应启用 |
|
|
|
数据库的 JDBC URL。 |
|
|
|
数据库的登录用户名。 |
|
|
|
数据库的登录密码。 |
|
|
|
JDBC 驱动程序的完全限定名。 |
如果未指定 jdbcbatchitemwriter_datasource ,JdbcBatchItemWriter 将使用默认的 DataSource 。 |
5.4.4. KafkaItemWriter
要将步骤输出写入 Kafka 主题,您需要 KafkaItemWriter
。此启动器通过使用两方面的功能为 KafkaItemWriter
提供自动配置。首先是 Spring Boot 的 Kafka 自动配置。(请参阅Spring Boot Kafka 文档。)其次,此启动器允许您配置写入器上的两个属性。
属性 | 类型 | 默认值 | 描述 |
---|---|---|---|
|
|
|
要写入的 Kafka 主题。 |
|
|
|
传递给写入器的条目是否都作为删除事件发送到主题。 |
有关 KafkaItemWriter
配置选项的更多信息,请参阅KafkaItemWiter
文档。
5.4.5. Spring AOT
使用 Spring AOT 和单步批处理启动器时,必须在编译时设置读取器和写入器的名称属性(除非为读取器和/或写入器创建 bean)。为此,您必须在 boot maven 插件或 gradle 插件中将您希望使用的读取器和写入器名称作为参数或环境变量包含进来。例如,如果您希望在 Maven 中启用 FlatFileItemReader
和 FlatFileItemWriter
,它看起来像这样
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<id>process-aot</id>
<goals>
<goal>process-aot</goal>
</goals>
</execution>
</executions>
<configuration>
<arguments>
<argument>--spring.batch.job.flatfileitemreader.name=foobar</argument>
<argument>--spring.batch.job.flatfileitemwriter.name=fooWriter</argument>
</arguments>
</configuration>
</plugin>
6. Spring Cloud Stream 集成
任务本身可能有用,但将任务集成到更大的生态系统中可以使其在更复杂的处理和编排中发挥作用。本节涵盖了 Spring Cloud Task 与 Spring Cloud Stream 的集成选项。
6.1. 从 Spring Cloud Stream 启动任务
您可以从流中启动任务。为此,请创建一个侦听包含 TaskLaunchRequest
作为其有效载荷的消息的 sink。TaskLaunchRequest
包含
-
uri
:指向要执行的任务 artifact。 -
applicationName
:与任务关联的名称。如果未设置 applicationName,TaskLaunchRequest
将生成一个由以下内容组成的任务名称:Task-<UUID>
。 -
commandLineArguments
:一个包含任务命令行参数的列表。 -
environmentProperties
:一个包含任务要使用的环境变量的 Map。 -
deploymentProperties
:一个包含部署器用于部署任务的属性的 Map。
如果有效载荷是不同类型,则 sink 将抛出异常。 |
例如,可以创建一个流,该流具有一个处理器,该处理器从 HTTP 源接收数据并创建包含 TaskLaunchRequest
的 GenericMessage
,然后将消息发送到其输出通道。任务 sink 将从其输入通道接收消息,然后启动任务。
要创建 taskSink,只需创建一个包含 EnableTaskLauncher
注解的 Spring Boot 应用程序,如下例所示
@SpringBootApplication
@EnableTaskLauncher
public class TaskSinkApplication {
public static void main(String[] args) {
SpringApplication.run(TaskSinkApplication.class, args);
}
}
Spring Cloud Task 项目的 samples 模块包含一个 Sink 和 Processor 示例。要将这些示例安装到您的本地 Maven 仓库,请在 spring-cloud-task-samples
目录中运行 Maven 构建,并将 skipInstall
属性设置为 false
,如下例所示
mvn clean install
必须将 maven.remoteRepositories.springRepo.url 属性设置为 über-jar 所在的远程仓库位置。如果未设置,则没有远程仓库,因此仅依赖本地仓库。 |
6.1.1. Spring Cloud Data Flow
要在 Spring Cloud Data Flow 中创建流,必须先注册我们创建的 Task Sink 应用程序。在下面的示例中,我们使用 Spring Cloud Data Flow shell 注册 Processor 和 Sink 示例应用程序。
app register --name taskSink --type sink --uri maven://io.spring.cloud:tasksink:<version>
app register --name taskProcessor --type processor --uri maven:io.spring.cloud:taskprocessor:<version>
以下示例展示了如何从 Spring Cloud Data Flow shell 创建流。
stream create foo --definition "http --server.port=9000|taskProcessor|taskSink" --deploy
6.2. Spring Cloud Task 事件
Spring Cloud Task 提供了当任务通过 Spring Cloud Stream 通道运行时,可以通过 Spring Cloud Stream 通道发出事件的能力。任务监听器用于将 TaskExecution
发布到名为 task-events
的消息通道上。此功能会自动装配到任何 classpath 中包含 spring-cloud-stream
、spring-cloud-stream-<binder>
和已定义任务的应用程序中。
要禁用事件发送监听器,请将 spring.cloud.task.events.enabled 属性设置为 false 。 |
定义了适当的 classpath 后,以下任务会在 task-events
通道上将 TaskExecution
作为事件发出(在任务开始和结束时都会发出)。
@SpringBootApplication
public class TaskEventsApplication {
public static void main(String[] args) {
SpringApplication.run(TaskEventsApplication.class, args);
}
@Configuration
public static class TaskConfiguration {
@Bean
public ApplicationRunner applicationRunner() {
return new ApplicationRunner() {
@Override
public void run(ApplicationArguments args) {
System.out.println("The ApplicationRunner was executed");
}
};
}
}
}
classpath 中还需要包含绑定器实现。 |
Spring Cloud Task 项目的 samples 模块中提供了一个任务事件示例应用程序,请参见此处。 |
6.2.1. 禁用特定任务事件
要禁用任务事件,可以将 spring.cloud.task.events.enabled
属性设置为 false
。
6.3. Spring Batch 事件
当通过任务执行 Spring Batch 作业时,Spring Cloud Task 可以配置为根据 Spring Batch 中可用的 Spring Batch 监听器发出信息性消息。具体来说,当通过 Spring Cloud Task 运行时,以下 Spring Batch 监听器会自动配置到每个批处理作业中,并在相关的 Spring Cloud Stream 通道上发出消息。
-
JobExecutionListener
监听job-execution-events
-
StepExecutionListener
监听step-execution-events
-
ChunkListener
监听chunk-events
-
ItemReadListener
监听item-read-events
-
ItemProcessListener
监听item-process-events
-
ItemWriteListener
监听item-write-events
-
SkipListener
监听skip-events
当上下文中存在适当的 Bean(Job
和 TaskLifecycleListener
)时,这些监听器会自动配置到任何 AbstractJob
中。监听这些事件的配置方式与绑定任何其他 Spring Cloud Stream 通道相同。我们的任务(运行批处理作业的任务)充当 Source
,而监听应用程序充当 Processor
或 Sink
。
一个例子是有一个应用程序监听 job-execution-events
通道,以获取作业的开始和停止事件。要配置监听应用程序,您可以如下配置输入为 job-execution-events
。
spring.cloud.stream.bindings.input.destination=job-execution-events
classpath 中还需要包含绑定器实现。 |
Spring Cloud Task 项目的 samples 模块中提供了一个批处理事件示例应用程序,请参见此处。 |
6.3.1. 将批处理事件发送到不同的通道
Spring Cloud Task 为批处理事件提供的一个选项是更改特定监听器发出消息的通道的能力。为此,请使用以下配置:spring.cloud.stream.bindings.<the channel>.destination=<new destination>
。例如,如果 StepExecutionListener
需要将消息发送到名为 my-step-execution-events
的另一个通道,而不是默认的 step-execution-events
,您可以添加以下配置。
spring.cloud.task.batch.events.step-execution-events-binding-name=my-step-execution-events
6.3.2. 禁用批处理事件
要禁用所有批处理事件的监听器功能,请使用以下配置。
spring.cloud.task.batch.events.enabled=false
要禁用特定的批处理事件,请使用以下配置。
spring.cloud.task.batch.events.<batch event listener>.enabled=false
:
以下列表显示了您可以禁用的单个监听器。
spring.cloud.task.batch.events.job-execution.enabled=false
spring.cloud.task.batch.events.step-execution.enabled=false
spring.cloud.task.batch.events.chunk.enabled=false
spring.cloud.task.batch.events.item-read.enabled=false
spring.cloud.task.batch.events.item-process.enabled=false
spring.cloud.task.batch.events.item-write.enabled=false
spring.cloud.task.batch.events.skip.enabled=false
6.3.3. 批处理事件的发送顺序
默认情况下,批处理事件具有 Ordered.LOWEST_PRECEDENCE
。要更改此值(例如,改为 5),请使用以下配置。
spring.cloud.task.batch.events.job-execution-order=5
spring.cloud.task.batch.events.step-execution-order=5
spring.cloud.task.batch.events.chunk-order=5
spring.cloud.task.batch.events.item-read-order=5
spring.cloud.task.batch.events.item-process-order=5
spring.cloud.task.batch.events.item-write-order=5
spring.cloud.task.batch.events.skip-order=5
7. 附录
7.1. 任务仓库 Schema
本附录提供了任务仓库中使用的数据库 schema 的 ERD(实体关系图)。

7.1.1. 表信息
存储任务执行信息。
列名 | 必需 | 类型 | 字段长度 | 备注 |
---|---|---|---|---|
TASK_EXECUTION_ID |
TRUE |
BIGINT |
X |
Spring Cloud Task Framework 在应用程序启动时从 |
START_TIME |
FALSE |
DATETIME(6) |
X |
Spring Cloud Task Framework 在应用程序启动时建立该值。 |
END_TIME |
FALSE |
DATETIME(6) |
X |
Spring Cloud Task Framework 在应用程序退出时建立该值。 |
TASK_NAME |
FALSE |
VARCHAR |
100 |
Spring Cloud Task Framework 在应用程序启动时会将其设置为 "Application",除非用户使用 |
EXIT_CODE |
FALSE |
INTEGER |
X |
遵循 Spring Boot 默认值,除非用户如此处所述进行覆盖。 |
EXIT_MESSAGE |
FALSE |
VARCHAR |
2500 |
用户定义,如此处所述。 |
ERROR_MESSAGE |
FALSE |
VARCHAR |
2500 |
Spring Cloud Task Framework 在应用程序退出时建立该值。 |
LAST_UPDATED |
TRUE |
TIMESTAMP |
X |
Spring Cloud Task Framework 在应用程序启动时建立该值。或者如果在任务之外创建记录,则必须在记录创建时填充此值。 |
EXTERNAL_EXECUTION_ID |
FALSE |
VARCHAR |
250 |
如果设置了 |
PARENT_TASK_EXECUTION_ID |
FALSE |
BIGINT |
X |
如果设置了 |
存储任务执行使用的参数。
列名 | 必需 | 类型 | 字段长度 |
---|---|---|---|
TASK_EXECUTION_ID |
TRUE |
BIGINT |
X |
TASK_PARAM |
FALSE |
VARCHAR |
2500 |
用于将任务执行链接到批处理执行。
列名 | 必需 | 类型 | 字段长度 |
---|---|---|---|
TASK_EXECUTION_ID |
TRUE |
BIGINT |
X |
JOB_EXECUTION_ID |
TRUE |
BIGINT |
X |
用于此处讨论的 single-instance-enabled
特性。
列名 | 必需 | 类型 | 字段长度 | 备注 |
---|---|---|---|---|
LOCK_KEY |
TRUE |
CHAR |
36 |
此锁的 UUID |
REGION |
TRUE |
VARCHAR |
100 |
用户可以使用此字段建立一组锁。 |
CLIENT_ID |
TRUE |
CHAR |
36 |
包含要锁定应用程序名称的任务执行 ID。 |
CREATED_DATE |
TRUE |
DATETIME |
X |
创建条目的日期 |
每种数据库类型的表设置 DDL 可以在此处找到。 |
7.1.2. SQL Server
默认情况下,Spring Cloud Task 使用序列表来确定 TASK_EXECUTION
表的 TASK_EXECUTION_ID
。但是,当在使用 SQL Server 时同时启动多个任务,这可能会导致 TASK_SEQ
表发生死锁。解决方法是删除 TASK_EXECUTION_SEQ
表并创建一个同名的 sequence。例如:
DROP TABLE TASK_SEQ;
CREATE SEQUENCE [DBO].[TASK_SEQ] AS BIGINT
START WITH 1
INCREMENT BY 1;
将 START WITH 设置为您当前执行 ID 的更高值。 |
7.2. 构建本文档
本项目使用 Maven 生成本文档。要自己生成它,请运行以下命令:$ mvn clean install -DskipTests -P docs
。
7.3. 可观测性元数据
7.3.1. 可观测性 - 指标
下面您可以找到此项目声明的所有指标列表。
活跃任务
围绕任务执行创建的指标。
指标名称 spring.cloud.task
(由约定类 org.springframework.cloud.task.listener.DefaultTaskExecutionObservationConvention
定义)。类型 timer
。
指标名称 spring.cloud.task.active
(由约定类 org.springframework.cloud.task.listener.DefaultTaskExecutionObservationConvention
定义)。类型 long task timer
。
在开始观测后添加的 KeyValue 可能不会出现在 *.active 指标中。 |
Micrometer 在内部使用 nanoseconds 作为基本单位。然而,每个后端决定实际的基本单位。(例如 Prometheus 使用秒) |
包含类 org.springframework.cloud.task.listener.TaskExecutionObservation
的完全限定名称。
所有标签必须以 spring.cloud.task 前缀开头! |
名称 |
描述 |
|
CF 云的应用程序 ID。 |
|
CF 云的应用程序名称。 |
|
CF 云的应用程序版本。 |
|
CF 云的实例索引。 |
|
CF 云的组织名称。 |
|
CF 云的空间 ID。 |
|
CF 云的空间名称。 |
|
任务执行 ID。 |
|
任务退出码。 |
|
任务的外部执行 ID。 |
|
任务名称度量。 |
|
任务父执行 ID。 |
|
任务状态。可以是 success 或 failure。 |
任务运行器观测
任务运行器执行时创建的观测。
指标名称 spring.cloud.task.runner
(由约定类 org.springframework.cloud.task.configuration.observation.DefaultTaskObservationConvention
定义)。类型 timer
。
指标名称 spring.cloud.task.runner.active
(由约定类 org.springframework.cloud.task.configuration.observation.DefaultTaskObservationConvention
定义)。类型 long task timer
。
在开始观测后添加的 KeyValue 可能不会出现在 *.active 指标中。 |
Micrometer 在内部使用 nanoseconds 作为基本单位。然而,每个后端决定实际的基本单位。(例如 Prometheus 使用秒) |
包含类 org.springframework.cloud.task.configuration.observation.TaskDocumentedObservation
的完全限定名称。
所有标签必须以 spring.cloud.task 前缀开头! |
名称 |
描述 |
|
由 Spring Cloud Task 执行的 Bean 的名称。 |
7.3.2. 可观测性 - Span
下面您可以找到此项目声明的所有 span 列表。
活跃任务 Span
围绕任务执行创建的指标。
Span 名称 spring.cloud.task
(由约定类 org.springframework.cloud.task.listener.DefaultTaskExecutionObservationConvention
定义)。
包含类 org.springframework.cloud.task.listener.TaskExecutionObservation
的完全限定名称。
所有标签必须以 spring.cloud.task 前缀开头! |
名称 |
描述 |
|
CF 云的应用程序 ID。 |
|
CF 云的应用程序名称。 |
|
CF 云的应用程序版本。 |
|
CF 云的实例索引。 |
|
CF 云的组织名称。 |
|
CF 云的空间 ID。 |
|
CF 云的空间名称。 |
|
任务执行 ID。 |
|
任务退出码。 |
|
任务的外部执行 ID。 |
|
任务名称度量。 |
|
任务父执行 ID。 |
|
任务状态。可以是 success 或 failure。 |
任务运行器观测 Span
任务运行器执行时创建的观测。
Span 名称 spring.cloud.task.runner
(由约定类 org.springframework.cloud.task.configuration.observation.DefaultTaskObservationConvention
定义)。
包含类 org.springframework.cloud.task.configuration.observation.TaskDocumentedObservation
的完全限定名称。
所有标签必须以 spring.cloud.task 前缀开头! |
名称 |
描述 |
|
由 Spring Cloud Task 执行的 Bean 的名称。 |