版本 3.0.4
© 2009-2022 VMware, Inc. 保留所有权利。
本文档可供您自己使用和分发给他人,前提是您不收取任何费用,并且无论以印刷品还是电子形式分发,每个副本都必须包含此版权声明。
序言
1. 关于文档
Spring Cloud Task 参考指南提供 html、pdf 和 epub 格式。最新版本位于 docs.spring.io/spring-cloud-task/docs/current-SNAPSHOT/reference/html/。
本文档可供您自己使用和分发给他人,前提是您不收取任何费用,并且无论以印刷品还是电子形式分发,每个副本都必须包含此版权声明。
2. 获取帮助
使用 Spring Cloud Task 遇到问题了吗?我们乐于提供帮助!
-
提问。我们监控 stackoverflow.com 上标记为
spring-cloud-task
的问题。 -
在 github.com/spring-cloud/spring-cloud-task/issues 报告 Spring Cloud Task 的错误。
Spring Cloud Task 的所有内容,包括文档,都是开源的。如果您发现文档有问题或只是想改进它们,请参与进来。 |
3. 入门
如果您刚开始接触 Spring Cloud Task 或一般的 'Spring',我们建议阅读入门章节。
从零开始入门,请阅读以下章节
要按照教程进行操作,请阅读开发您的第一个 Spring Cloud Task 应用
要运行您的示例,请阅读运行示例
入门
如果您刚刚开始使用 Spring Cloud Task,应该阅读本节。在这里,我们回答基本的“是什么?”、“如何做?”和“为什么?”等问题。我们从 Spring Cloud Task 的温和介绍开始。然后构建一个 Spring Cloud Task 应用,在此过程中讨论一些核心原则。
4. Spring Cloud Task 简介
Spring Cloud Task 使得创建短生命周期微服务变得容易。它提供了按需在生产环境中执行短生命周期 JVM 进程的功能。
5. 系统要求
您需要安装 Java(Java 17 或更高版本)。要构建,您还需要安装 Maven。
5.1. 数据库要求
Spring Cloud Task 使用关系型数据库存储已执行任务的结果。虽然您可以在没有数据库的情况下开始开发任务(任务状态会作为任务仓库更新的一部分被记录到日志中),但在生产环境中,您会希望使用受支持的数据库。Spring Cloud Task 目前支持以下数据库
-
DB2
-
H2
-
HSQLDB
-
MySql
-
Oracle
-
Postgres
-
SqlServer
6. 开发您的第一个 Spring Cloud Task 应用
一个好的开始是使用一个简单的“Hello, World!”应用,因此我们创建一个等效的 Spring Cloud Task 应用来突出框架的特性。大多数 IDE 对 Apache Maven 有良好的支持,因此我们将其用作本项目构建工具。
spring.io 网站包含许多使用 Spring Boot 的“入门 ”指南。如果您需要解决特定问题,请先在那里查找。您可以通过访问Spring Initializr并创建新项目来简化以下步骤。这样做会自动生成一个新的项目结构,以便您可以立即开始编码。我们建议尝试使用 Spring Initializr 来熟悉它。 |
6.1. 使用 Spring Initializr 创建 Spring Task 项目
现在我们可以创建一个并测试一个将 Hello, World!
打印到控制台的应用。
为此
-
访问Spring Initialzr 网站。
-
创建一个新的 Maven 项目,Group 名称为
io.spring.demo
,Artifact 名称为helloworld
。 -
在 Dependencies 文本框中,输入
task
,然后选择Cloud Task
依赖项。 -
在 Dependencies 文本框中,输入
jdbc
,然后选择JDBC
依赖项。 -
在 Dependencies 文本框中,输入
h2
,然后选择H2
。(或您喜欢的数据库) -
点击 Generate Project 按钮
-
-
解压 helloworld.zip 文件,并将项目导入您喜欢的 IDE。
6.2. 编写代码
为了完成我们的应用,我们需要用以下内容更新生成的 HelloworldApplication
,以便它能启动一个任务。
package io.spring.Helloworld;
@SpringBootApplication
@EnableTask
public class HelloworldApplication {
@Bean
public ApplicationRunner applicationRunner() {
return new HelloWorldApplicationRunner();
}
public static void main(String[] args) {
SpringApplication.run(HelloworldApplication.class, args);
}
public static class HelloWorldApplicationRunner implements ApplicationRunner {
@Override
public void run(ApplicationArguments args) throws Exception {
System.out.println("Hello, World!");
}
}
}
虽然看起来很小,但其中涉及不少内容。有关 Spring Boot 特定内容的更多信息,请参阅Spring Boot 参考文档。
现在我们可以打开 src/main/resources
目录下的 application.properties
文件。我们需要在 application.properties
中配置两个属性
-
application.name
: 设置应用名称(它会被转换成任务名称) -
logging.level
: 将 Spring Cloud Task 的日志级别设置为DEBUG
,以便查看正在发生的情况。
以下示例展示了如何同时进行这两项配置
logging.level.org.springframework.cloud.task=DEBUG
spring.application.name=helloWorld
6.2.1. 任务自动配置
当包含 Spring Cloud Task Starter 依赖项时,Task 会自动配置所有 bean 来启动其功能。此配置的一部分是注册 TaskRepository
及其使用所需的基础设施。
在我们的示例中,TaskRepository
使用嵌入式 H2 数据库记录任务结果。对于生产环境来说,嵌入式 H2 数据库不是一个实际的解决方案,因为任务结束后 H2 DB 就会消失。然而,对于快速入门体验,我们可以在示例中使用它,并将仓库中的更新信息回显到日志中。在本文档后面的配置章节中,我们将介绍如何定制 Spring Cloud Task 提供的各个部分的配置。
当我们的示例应用运行时,Spring Boot 会启动我们的 HelloWorldCommandLineRunner
并将“Hello, World!”消息输出到标准输出。TaskLifecycleListener
在仓库中记录任务的开始和结束。
6.2.2. main 方法
main 方法是任何 Java 应用的入口点。我们的 main 方法委托给 Spring Boot 的SpringApplication 类。
6.2.3. ApplicationRunner
Spring 包含多种启动应用逻辑的方式。Spring Boot 通过其 *Runner
接口(CommandLineRunner
或 ApplicationRunner
)提供了一种方便且有条理的方式来做到这一点。一个表现良好的任务可以使用这两个 runner 中的一个来启动任何逻辑。
任务的生命周期被认为是从 *Runner#run
方法执行之前开始,直到它们全部完成后结束。Spring Boot 允许一个应用使用多个 *Runner
实现,Spring Cloud Task 也是如此。
任何不是通过 CommandLineRunner 或 ApplicationRunner 启动的机制(例如使用 InitializingBean#afterPropertiesSet )启动的处理都不会被 Spring Cloud Task 记录。 |
6.3. 运行示例
此时,我们的应用应该可以工作了。由于此应用基于 Spring Boot,我们可以从应用根目录使用命令行命令 $ mvn spring-boot:run
来运行它,如下例所示(及其输出)
$ mvn clean spring-boot:run
....... . . .
....... . . . (Maven log output here)
....... . . .
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.0.3.RELEASE)
2018-07-23 17:44:34.426 INFO 1978 --- [ main] i.s.d.helloworld.HelloworldApplication : Starting HelloworldApplication on Glenns-MBP-2.attlocal.net with PID 1978 (/Users/glennrenfro/project/helloworld/target/classes started by glennrenfro in /Users/glennrenfro/project/helloworld)
2018-07-23 17:44:34.430 INFO 1978 --- [ main] i.s.d.helloworld.HelloworldApplication : No active profile set, falling back to default profiles: default
2018-07-23 17:44:34.472 INFO 1978 --- [ main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@1d24f32d: startup date [Mon Jul 23 17:44:34 EDT 2018]; root of context hierarchy
2018-07-23 17:44:35.280 INFO 1978 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Starting...
2018-07-23 17:44:35.410 INFO 1978 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Start completed.
2018-07-23 17:44:35.419 DEBUG 1978 --- [ main] o.s.c.t.c.SimpleTaskConfiguration : Using org.springframework.cloud.task.configuration.DefaultTaskConfigurer TaskConfigurer
2018-07-23 17:44:35.420 DEBUG 1978 --- [ main] o.s.c.t.c.DefaultTaskConfigurer : No EntityManager was found, using DataSourceTransactionManager
2018-07-23 17:44:35.522 DEBUG 1978 --- [ main] o.s.c.t.r.s.TaskRepositoryInitializer : Initializing task schema for h2 database
2018-07-23 17:44:35.525 INFO 1978 --- [ main] o.s.jdbc.datasource.init.ScriptUtils : Executing SQL script from class path resource [org/springframework/cloud/task/schema-h2.sql]
2018-07-23 17:44:35.558 INFO 1978 --- [ main] o.s.jdbc.datasource.init.ScriptUtils : Executed SQL script from class path resource [org/springframework/cloud/task/schema-h2.sql] in 33 ms.
2018-07-23 17:44:35.728 INFO 1978 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Registering beans for JMX exposure on startup
2018-07-23 17:44:35.730 INFO 1978 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Bean with name 'dataSource' has been autodetected for JMX exposure
2018-07-23 17:44:35.733 INFO 1978 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Located MBean 'dataSource': registering with JMX server as MBean [com.zaxxer.hikari:name=dataSource,type=HikariDataSource]
2018-07-23 17:44:35.738 INFO 1978 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 0
2018-07-23 17:44:35.762 DEBUG 1978 --- [ main] o.s.c.t.r.support.SimpleTaskRepository : Creating: TaskExecution{executionId=0, parentExecutionId=null, exitCode=null, taskName='application', startTime=Mon Jul 23 17:44:35 EDT 2018, endTime=null, exitMessage='null', externalExecutionId='null', errorMessage='null', arguments=[]}
2018-07-23 17:44:35.772 INFO 1978 --- [ main] i.s.d.helloworld.HelloworldApplication : Started HelloworldApplication in 1.625 seconds (JVM running for 4.764)
Hello, World!
2018-07-23 17:44:35.782 DEBUG 1978 --- [ main] o.s.c.t.r.support.SimpleTaskRepository : Updating: TaskExecution with executionId=1 with the following {exitCode=0, endTime=Mon Jul 23 17:44:35 EDT 2018, exitMessage='null', errorMessage='null'}
前面的输出中有三行我们感兴趣的内容
-
SimpleTaskRepository
记录了在TaskRepository
中条目的创建。 -
我们的
CommandLineRunner
的执行,由“Hello, World!”输出展示。 -
SimpleTaskRepository
记录了任务在TaskRepository
中的完成。
Spring Cloud Task 项目的 samples 模块中可以找到一个简单的任务应用,这里。 |
特性
本节更详细地介绍了 Spring Cloud Task,包括如何使用它、如何配置它以及适当的扩展点。
7. Spring Cloud Task 的生命周期
在大多数情况下,现代云环境围绕着预期不会结束的进程执行进行设计。如果它们结束了,通常会被重新启动。虽然大多数平台确实有某种方式来运行结束后不会重新启动的进程,但该运行的结果通常不会以可消费的方式维护。Spring Cloud Task 提供了在一个环境中执行短生命周期进程并记录结果的能力。这样做允许构建围绕短生命周期进程以及通过消息集成任务来实现的长生命周期服务的微服务架构。
虽然此功能在云环境中很有用,但在传统部署模型中也可能出现同样的问题。当使用 cron 等调度器运行 Spring Boot 应用时,能够在应用完成后监控其结果会很有用。
Spring Cloud Task 采取了一种方法,即 Spring Boot 应用可以有开始和结束,并且仍然是成功的。批处理应用就是预期会结束(且通常是短生命周期)的进程如何发挥作用的一个例子。
Spring Cloud Task 记录给定任务的生命周期事件。大多数长生命周期进程,典型的如大多数 Web 应用,不保存其生命周期事件。Spring Cloud Task 核心的任务则会保存。
生命周期由一次任务执行组成。这是一次配置为任务(即,包含 Spring Cloud Task 依赖项)的 Spring Boot 应用的物理执行。
在任务开始时,在任何 CommandLineRunner
或 ApplicationRunner
实现运行之前,会在 TaskRepository
中创建一个记录开始事件的条目。此事件是通过 Spring Framework 触发的 SmartLifecycle#start
来触发的。这表示系统中的所有 bean 已准备就绪,并且发生在运行 Spring Boot 提供的任何 CommandLineRunner
或 ApplicationRunner
实现之前。
任务的记录仅在 ApplicationContext 成功启动后发生。如果上下文根本无法启动,任务的运行将不会被记录。 |
在 Spring Boot 的所有 *Runner#run
调用完成后,或者 ApplicationContext
失败时(由 ApplicationFailedEvent
指示),任务执行结果会在仓库中更新。
如果应用要求在任务完成后(所有 *Runner#run 方法都被调用并且任务仓库已更新)关闭 ApplicationContext ,请将属性 spring.cloud.task.closecontextEnabled 设置为 true。 |
7.1. TaskExecution
存储在 TaskRepository
中的信息在 TaskExecution
类中建模,包含以下信息
字段 | 描述 |
---|---|
|
任务运行的唯一 ID。 |
|
由 |
|
任务名称,由配置的 |
|
任务启动的时间,由 |
|
任务完成的时间,由 |
|
退出时可用的任何信息。这可以通过 |
|
如果异常是任务结束的原因(由 |
|
字符串命令行参数的 |
7.2. 映射退出码
任务完成时,它会尝试向操作系统返回一个退出码。如果我们查看我们的原始示例,我们可以看到我们没有控制应用的这一方面。因此,如果抛出异常,JVM 返回的代码可能对您的调试有用,也可能没用。
因此,Spring Boot 提供了一个接口 ExitCodeExceptionMapper
,允许您将未捕获的异常映射到退出码。这样做可以让您在退出码级别指示出了什么问题。此外,通过这种方式映射退出码,Spring Cloud Task 会记录返回的退出码。
如果任务因 SIG-INT 或 SIG-TERM 终止,除非代码中另有指定,否则退出码为零。
任务运行时,退出码在仓库中存储为 null。任务完成后,根据本节前面描述的指南存储适当的退出码。 |
8. 配置
Spring Cloud Task 提供了开箱即用的配置,在 DefaultTaskConfigurer
和 SimpleTaskConfiguration
类中定义。本节将介绍默认配置以及如何根据您的需求定制 Spring Cloud Task。
8.1. DataSource
Spring Cloud Task 使用一个数据源来存储任务执行的结果。默认情况下,我们提供一个内存中的 H2 实例,以提供一种简单的开发引导方法。然而,在生产环境中,您可能希望配置自己的 DataSource
。
如果您的应用只使用一个 DataSource
,并且它同时作为您的业务 schema 和任务仓库,您只需要提供任何 DataSource
(最简单的方法是通过 Spring Boot 的配置约定)。Spring Cloud Task 会自动将此 DataSource
用于仓库。
如果您的应用使用多个 DataSource
,您需要使用适当的 DataSource
配置任务仓库。此定制可以通过实现 TaskConfigurer
来完成。
8.2. 表前缀
TaskRepository
的一个可修改属性是任务表的表前缀。默认情况下,它们都以 TASK_
为前缀。TASK_EXECUTION
和 TASK_EXECUTION_PARAMS
是两个示例。然而,修改此前缀可能有一些原因。如果需要在表名前添加 schema 名称,或者在同一 schema 内需要多组任务表,您必须更改表前缀。您可以通过将 spring.cloud.task.tablePrefix
设置为您需要的前缀来做到这一点,如下所示
spring.cloud.task.tablePrefix=yourPrefix
通过使用 spring.cloud.task.tablePrefix
,用户承担创建满足任务表 schema 标准但根据用户业务需求进行修改的任务表的责任。在创建自己的任务 DDL 时,可以将 Spring Cloud Task Schema DDL 作为参考,如这里所示。
8.3. 启用/禁用表初始化
如果您自己创建了任务表,并且不希望 Spring Cloud Task 在任务启动时创建它们,请将 spring.cloud.task.initialize-enabled
属性设置为 false
,如下所示
spring.cloud.task.initialize-enabled=false
它默认为 true
。
属性 spring.cloud.task.initialize.enable 已弃用。 |
8.4. 外部生成的任务 ID
在某些情况下,您可能希望考虑任务被请求与基础设施实际启动它之间的时间差。Spring Cloud Task 允许您在任务被请求时创建一个 TaskExecution
。然后将生成的 TaskExecution
的执行 ID 传递给任务,以便它可以在任务的生命周期中更新 TaskExecution
。
可以通过在引用存储 TaskExecution
对象的 datastore 的 TaskRepository
实现上调用 createTaskExecution
方法来创建一个 TaskExecution
。
为了配置您的任务以使用生成的 TaskExecutionId
,请添加以下属性
spring.cloud.task.executionid=yourtaskId
8.5. 外部任务 Id
Spring Cloud Task 允许您为每个 TaskExecution
存储一个外部任务 ID。为了配置您的任务以使用生成的 TaskExecutionId
,请添加以下属性
spring.cloud.task.external-execution-id=<externalTaskId>
8.6. 父任务 Id
Spring Cloud Task 允许您为每个 TaskExecution
存储一个父任务 ID。例如,一个任务执行另一个或多个任务,并且您想记录哪个任务启动了每个子任务。为了配置您的任务以设置父 TaskExecutionId
,请在子任务上添加以下属性
spring.cloud.task.parent-execution-id=<parentExecutionTaskId>
8.7. TaskConfigurer
TaskConfigurer
是一个策略接口,允许您定制 Spring Cloud Task 组件的配置方式。默认情况下,我们提供 DefaultTaskConfigurer
,它提供逻辑默认值:基于 Map
的内存组件(如果未提供 DataSource
,则对开发有用)和基于 JDBC 的组件(如果提供了 DataSource
,则有用)。
TaskConfigurer
允许您配置三个主要组件
组件 | 描述 | 默认值(由 DefaultTaskConfigurer 提供) |
---|---|---|
|
要使用的 |
|
|
要使用的 |
|
|
用于运行任务更新时的事务管理器。 |
如果使用了 |
您可以通过创建 TaskConfigurer
接口的自定义实现来定制前面表格中描述的任何组件。通常,继承 DefaultTaskConfigurer
(如果在找不到 TaskConfigurer
时提供)并覆盖所需的 getter 就足够了。然而,可能需要从头开始实现自己的。
用户不应直接使用 TaskConfigurer 的 getter 方法,除非他们使用它来提供作为 Spring Bean 暴露的实现。 |
8.8. 任务执行监听器
TaskExecutionListener
允许您为任务生命周期期间发生的特定事件注册监听器。为此,创建一个实现 TaskExecutionListener
接口的类。实现 TaskExecutionListener
接口的类会收到以下事件的通知
-
onTaskStartup
: 在将TaskExecution
存储到TaskRepository
之前。 -
onTaskEnd
: 在更新TaskRepository
中的TaskExecution
条目并标记任务最终状态之前。 -
onTaskFailed
: 在任务抛出未处理异常时调用onTaskEnd
方法之前。
Spring Cloud Task 还允许您使用以下方法注解将 TaskExecution
监听器添加到 bean 中的方法
-
@BeforeTask
: 在将TaskExecution
存储到TaskRepository
之前 -
@AfterTask
: 在更新TaskRepository
中的TaskExecution
条目并标记任务最终状态之前。 -
@FailedTask
: 在任务抛出未处理异常时调用@AfterTask
方法之前。
以下示例展示了这三个注解的使用
public class MyBean {
@BeforeTask
public void methodA(TaskExecution taskExecution) {
}
@AfterTask
public void methodB(TaskExecution taskExecution) {
}
@FailedTask
public void methodC(TaskExecution taskExecution, Throwable throwable) {
}
}
在 TaskLifecycleListener 之前插入 ApplicationListener 可能会导致意想不到的效果。 |
8.8.1. 任务执行监听器抛出的异常
如果 TaskExecutionListener
事件处理器抛出异常,则该事件处理器的所有监听处理都会停止。例如,如果三个 onTaskStartup
监听器已启动,并且第一个 onTaskStartup
事件处理器抛出异常,则不会调用其他两个 onTaskStartup
方法。然而,TaskExecutionListeners
的其他事件处理器(onTaskEnd
和 onTaskFailed
)仍会被调用。
当 TaskExecutionListener
事件处理器抛出异常时返回的退出码是 ExitCodeEvent 报告的退出码。如果没有发出 ExitCodeEvent
,则评估抛出的异常,查看其是否属于 ExitCodeGenerator 类型。如果是,则返回 ExitCodeGenerator
的退出码。否则,返回 1
。
如果在 onTaskStartup
方法中抛出异常,应用的退出码将是 1
。如果在 onTaskEnd
或 onTaskFailed
方法中抛出异常,应用的退出码将是使用上述规则确定的那个。
如果在 onTaskStartup 、onTaskEnd 或 onTaskFailed 中抛出异常,您不能使用 ExitCodeExceptionMapper 覆盖应用的退出码。 |
8.8.2. 退出消息
您可以使用 TaskExecutionListener
以编程方式设置任务的退出消息。这通过设置 TaskExecution
的 exitMessage
来完成,该 exitMessage
随后传递给 TaskExecutionListener
。以下示例显示了一个使用 @AfterTask
ExecutionListener
注解的方法
@AfterTask
public void afterMe(TaskExecution taskExecution) {
taskExecution.setExitMessage("AFTER EXIT MESSAGE");
}
可以在任何监听器事件(onTaskStartup
、onTaskFailed
和 onTaskEnd
)中设置 ExitMessage
。三个监听器的优先顺序如下
-
onTaskEnd
-
onTaskFailed
-
onTaskStartup
例如,如果您为 onTaskStartup
和 onTaskFailed
监听器设置了 exitMessage
,并且任务在没有失败的情况下结束,则 onTaskStartup
中的 exitMessage
将存储在仓库中。否则,如果发生失败,则存储 onTaskFailed
中的 exitMessage
。此外,如果您使用 onTaskEnd
监听器设置了 exitMessage
,则 onTaskEnd
中的 exitMessage
会覆盖 onTaskStartup
和 onTaskFailed
中的 exit messages。
8.9. 限制 Spring Cloud Task 实例
Spring Cloud Task 允许您设定具有给定任务名称的任务一次只能运行一个实例。为此,您需要设定 任务名称 并为每个任务执行设置 spring.cloud.task.single-instance-enabled=true
。当第一个任务执行正在运行时,任何其他尝试运行具有相同 任务名称 和 spring.cloud.task.single-instance-enabled=true
的任务都会失败,并显示以下错误消息:Task with name "application" is already running.
spring.cloud.task.single-instance-enabled
的默认值为 false
。以下示例展示了如何将 spring.cloud.task.single-instance-enabled
设置为 true
。
spring.cloud.task.single-instance-enabled=true or false
要使用此功能,您必须将以下 Spring Integration 依赖项添加到您的应用程序中
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-jdbc</artifactId>
</dependency>
如果任务由于此功能被启用且另一个同名任务正在运行而失败,应用程序的退出码将为 1。 |
8.9.1. Spring AOT 和原生编译的单实例用法
在创建原生编译应用程序时使用 Spring Cloud Task 的单实例功能,您需要在构建时启用此功能。为此,请添加 process-aot execution 并将 spring.cloud.task.single-step-instance-enabled=true
作为 JVM 参数设置,如下所示
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<id>process-aot</id>
<goals>
<goal>process-aot</goal>
</goals>
<configuration>
<jvmArguments>
-Dspring.cloud.task.single-instance-enabled=true
</jvmArguments>
</configuration>
</execution>
</executions>
</plugin>
8.10. 为 ApplicationRunner 和 CommandLineRunner 启用观测
要为 ApplicationRunner
或 CommandLineRunner
启用任务观测,请将 spring.cloud.task.observation.enabled
设置为 true。
一个使用 SimpleMeterRegistry
启用观测功能的示例任务应用程序可以在这里找到。
8.11. 禁用 Spring Cloud Task 自动配置
在某些实现中,如果不需要 Spring Cloud Task 自动配置,您可以禁用任务的自动配置。可以通过将以下注解添加到您的任务应用程序来实现这一点
@EnableAutoConfiguration(exclude={SimpleTaskAutoConfiguration.class})
您也可以通过将 spring.cloud.task.autoconfiguration.enabled
属性设置为 false
来禁用任务自动配置。
8.12. 关闭上下文
如果应用程序需要在任务完成时关闭 ApplicationContext
(所有 *Runner#run
方法已被调用且任务仓库已更新),请将属性 spring.cloud.task.closecontextEnabled
设置为 true
。
另一种需要关闭上下文的情况是,任务执行完成但应用程序并未终止。在这些情况下,上下文之所以保持打开是因为分配了一个线程(例如:如果您正在使用一个 TaskExecutor
)。在这些情况下,当启动任务时,将 spring.cloud.task.closecontextEnabled
属性设置为 true
。这将在任务完成后关闭应用程序的上下文。从而允许应用程序终止。
8.13. 启用任务指标
Spring Cloud Task 与 Micrometer 集成,并为其执行的任务创建观测。要启用任务可观测性集成,您必须将 spring-boot-starter-actuator
、您首选的注册表实现(如果您想发布指标)以及 micrometer-tracing(如果您想发布跟踪数据)添加到您的任务应用程序中。一个使用 Influx 启用任务可观测性和指标的 Maven 依赖项示例是
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-influx</artifactId>
<scope>runtime</scope>
</dependency>
8.14. Spring Task 和 Spring Cloud Task 属性
术语 task
在行业中是一个常用词。例如,Spring Boot 提供了 spring.task
,而 Spring Cloud Task 提供了 spring.cloud.task
属性。这在过去引起了一些混淆,认为这两组属性直接相关。然而,它们代表了 Spring 生态系统中提供的两组不同的功能。
-
spring.task
指的是配置ThreadPoolTaskScheduler
的属性。 -
spring.cloud.task
指的是配置 Spring Cloud Task 功能的属性。
批处理
本节将更详细地介绍 Spring Cloud Task 与 Spring Batch 的集成。本节涵盖了如何跟踪作业执行与其执行的任务之间的关联,以及如何通过 Spring Cloud Deployer 进行远程分区。
9. 将作业执行关联到执行它的任务
Spring Boot 提供了在 über-jar 中执行批处理作业的功能。Spring Boot 对此功能的支持允许开发人员在该执行中执行多个批处理作业。Spring Cloud Task 提供了将作业的执行(即作业执行)与任务的执行关联起来的能力,以便可以相互追溯。
Spring Cloud Task 通过使用 TaskBatchExecutionListener
实现此功能。默认情况下,此监听器在任何同时配置了 Spring Batch Job(通过在上下文中定义类型为 Job
的 bean)并且类路径中有 spring-cloud-task-batch
jar 的上下文中自动配置。该监听器会被注入到所有符合这些条件的作业中。
9.1. 覆盖 TaskBatchExecutionListener
为了防止该监听器被注入到当前上下文中的任何批处理作业中,您可以使用标准的 Spring Boot 机制禁用自动配置。
要只将该监听器注入到上下文中的特定作业中,请覆盖 batchTaskExecutionListenerBeanPostProcessor
并提供作业 bean ID 列表,如下例所示
public static TaskBatchExecutionListenerBeanPostProcessor batchTaskExecutionListenerBeanPostProcessor() {
TaskBatchExecutionListenerBeanPostProcessor postProcessor =
new TaskBatchExecutionListenerBeanPostProcessor();
postProcessor.setJobNames(Arrays.asList(new String[] {"job1", "job2"}));
return postProcessor;
}
您可以在 Spring Cloud Task 项目的 samples 模块中找到一个示例批处理应用程序,链接在这里。 |
10. 远程分区
Spring Cloud Deployer 提供了在大多数云基础设施上启动基于 Spring Boot 的应用程序的功能。DeployerPartitionHandler
和 DeployerStepExecutionHandler
将工作步骤执行的启动委托给 Spring Cloud Deployer。
要配置 DeployerStepExecutionHandler
,您必须提供一个代表要执行的 Spring Boot über-jar 的 Resource
,一个 TaskLauncherHandler
和一个 JobExplorer
。您可以配置任何环境变量属性,以及同时执行的最大工作进程数、轮询结果的间隔(默认为 10 秒)以及超时时间(默认为 -1 或无超时)。以下示例展示了如何配置此 PartitionHandler
。
@Bean
public PartitionHandler partitionHandler(TaskLauncher taskLauncher,
JobExplorer jobExplorer) throws Exception {
MavenProperties mavenProperties = new MavenProperties();
mavenProperties.setRemoteRepositories(new HashMap<>(Collections.singletonMap("springRepo",
new MavenProperties.RemoteRepository(repository))));
Resource resource =
MavenResource.parse(String.format("%s:%s:%s",
"io.spring.cloud",
"partitioned-batch-job",
"1.1.0.RELEASE"), mavenProperties);
DeployerPartitionHandler partitionHandler =
new DeployerPartitionHandler(taskLauncher, jobExplorer, resource, "workerStep");
List<String> commandLineArgs = new ArrayList<>(3);
commandLineArgs.add("--spring.profiles.active=worker");
commandLineArgs.add("--spring.cloud.task.initialize.enable=false");
commandLineArgs.add("--spring.batch.initializer.enabled=false");
partitionHandler.setCommandLineArgsProvider(
new PassThroughCommandLineArgsProvider(commandLineArgs));
partitionHandler.setEnvironmentVariablesProvider(new NoOpEnvironmentVariablesProvider());
partitionHandler.setMaxWorkers(2);
partitionHandler.setApplicationName("PartitionedBatchJobTask");
return partitionHandler;
}
在向分区传递环境变量时,每个分区可能位于不同的机器上,具有不同的环境设置。因此,您应该只传递必需的环境变量。 |
请注意,在上面的示例中,我们将最大工作进程数设置为 2。设置最大工作进程数确定了应同时运行的最大分区数。
要执行的 Resource
预期是一个 Spring Boot über-jar,其中在当前上下文中将 DeployerStepExecutionHandler
配置为 CommandLineRunner
。前面示例中列出的仓库应该是 uber-jar 所在的远程仓库。管理器和工作进程都应该能够访问用作作业仓库和任务仓库的相同数据存储。一旦底层基础设施引导了 Spring Boot jar 并且 Spring Boot 启动了 DeployerStepExecutionHandler
,步骤处理器就会执行请求的 Step
。以下示例展示了如何配置 DeployerStepExecutionHandler
。
@Bean
public DeployerStepExecutionHandler stepExecutionHandler(JobExplorer jobExplorer) {
DeployerStepExecutionHandler handler =
new DeployerStepExecutionHandler(this.context, jobExplorer, this.jobRepository);
return handler;
}
您可以在 Spring Cloud Task 项目的 samples 模块中找到一个示例远程分区应用程序,链接在这里。 |
10.1. 异步启动远程批处理分区
默认情况下,批处理分区是按顺序启动的。然而,在某些情况下,这可能会影响性能,因为每次启动都会阻塞直到资源(例如:在 Kubernetes 中 provision 一个 pod)被 provision 完成。在这些情况下,您可以向 DeployerPartitionHandler
提供一个 ThreadPoolTaskExecutor
。这将根据 ThreadPoolTaskExecutor
的配置启动远程批处理分区。例如
@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setThreadNamePrefix("default_task_executor_thread");
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.initialize();
return executor;
}
@Bean
public PartitionHandler partitionHandler(TaskLauncher taskLauncher, JobExplorer jobExplorer,
TaskRepository taskRepository, ThreadPoolTaskExecutor executor) throws Exception {
Resource resource = this.resourceLoader
.getResource("maven://io.spring.cloud:partitioned-batch-job:2.2.0.BUILD-SNAPSHOT");
DeployerPartitionHandler partitionHandler =
new DeployerPartitionHandler(taskLauncher, jobExplorer, resource,
"workerStep", taskRepository, executor);
...
}
由于使用 ThreadPoolTaskExecutor 会留下一个活动线程,应用程序将不会终止,因此我们需要关闭上下文。为了正确关闭应用程序,我们需要将 spring.cloud.task.closecontextEnabled 属性设置为 true 。 |
10.2. 关于为 Kubernetes 平台开发批处理分区应用程序的注意事项
-
在 Kubernetes 平台上部署分区应用程序时,您必须使用 Spring Cloud Kubernetes Deployer 的以下依赖项
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-deployer-kubernetes</artifactId> </dependency>
-
任务应用程序及其分区的应用程序名称需要遵循以下正则表达式模式:
[a-z0-9]([-a-z0-9]*[a-z0-9])
。否则,会抛出异常。
11. 批处理信息消息
Spring Cloud Task 提供了批处理作业发出信息消息的能力。“Spring Batch 事件”部分详细介绍了此功能。
12. 批处理作业退出码
正如之前讨论的,Spring Cloud Task 应用程序支持记录任务执行的退出码。然而,在任务中运行 Spring Batch Job 的情况下,无论 Batch Job Execution 如何完成,使用默认的 Batch/Boot 行为时,任务的结果总是零。请记住,任务是一个 boot 应用程序,从任务返回的退出码与 boot 应用程序的退出码相同。要覆盖此行为并允许任务在批处理作业返回 FAILED
的 BatchStatus 时返回非零退出码,请将 spring.cloud.task.batch.fail-on-job-failure
设置为 true
。此时,退出码可以是 1(默认值),也可以基于指定的 ExitCodeGenerator
。
此功能使用一个新的 ApplicationRunner
,它替换了 Spring Boot 提供的那个。默认情况下,它配置了相同的顺序。但是,如果您想自定义 ApplicationRunner
运行的顺序,可以通过设置 spring.cloud.task.batch.applicationRunnerOrder
属性来设置其顺序。要让您的任务根据批处理作业执行的结果返回退出码,您需要编写自己的 CommandLineRunner
。
单步批处理作业 Starter
本节介绍如何使用 Spring Cloud Task 中包含的 starter 开发具有单个 Step
的 Spring Batch Job
。此 starter 允许您使用配置来定义 ItemReader
、ItemWriter
或一个完整的单步 Spring Batch Job
。有关 Spring Batch 及其功能的更多信息,请参阅Spring Batch 文档。
要获取 Maven 的 starter,请将以下内容添加到您的构建中
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-single-step-batch-job</artifactId>
<version>2.3.0</version>
</dependency>
要获取 Gradle 的 starter,请将以下内容添加到您的构建中
compile "org.springframework.cloud:spring-cloud-starter-single-step-batch-job:2.3.0"
13. 定义作业
您可以使用此 starter 定义最少的(例如 ItemReader
或 ItemWriter
)或最多的(例如完整的 Job
)。在本节中,我们定义了配置 Job
所需的属性。
13.1. 属性
首先,此 starter 提供了一组属性,允许您配置包含一个 Step 的 Job 的基本信息
属性 | 类型 | 默认值 | 描述 |
---|---|---|---|
|
|
|
作业的名称。 |
|
|
|
步骤的名称。 |
|
|
|
每事务处理的项数。 |
配置了上述属性后,您将拥有一个包含单个基于 chunk 的步骤的作业。此基于 chunk 的步骤将 Map<String, Object>
实例作为项读取、处理和写入。然而,此步骤尚无任何实际操作。您需要配置一个 ItemReader
、一个可选的 ItemProcessor
和一个 ItemWriter
来使其执行某些操作。要配置其中之一,您可以使用属性并配置已提供自动配置的选项之一,或者使用标准的 Spring 配置机制配置您自己的。
如果您自行配置,输入和输出类型必须与步骤中的其他类型匹配。此 starter 中的 ItemReader 实现和 ItemWriter 实现都使用 Map<String, Object> 作为输入和输出项。 |
14. ItemReader 实现的自动配置
此 starter 为四种不同的 ItemReader
实现提供了自动配置:AmqpItemReader
、FlatFileItemReader
、JdbcCursorItemReader
和 KafkaItemReader
。在本节中,我们将概述如何使用提供的自动配置来配置这些实现。
14.1. AmqpItemReader
您可以使用 AmqpItemReader
通过 AMQP 从队列或主题读取数据。此 ItemReader
实现的自动配置依赖于两组配置。第一组是 AmqpTemplate
的配置。您可以自行配置,也可以使用 Spring Boot 提供的自动配置。请参阅Spring Boot AMQP 文档。配置好 AmqpTemplate
后,您可以通过设置以下属性来启用支持其的批处理功能。
属性 | 类型 | 默认值 | 描述 |
---|---|---|---|
|
|
|
如果为 |
|
|
|
指示是否应注册 |
有关更多信息,请参阅AmqpItemReader
文档。
14.2. FlatFileItemReader
FlatFileItemReader
允许您从平面文件(例如 CSV 和其他文件格式)读取数据。要从文件读取,您可以通过普通的 Spring 配置(LineTokenizer
、RecordSeparatorPolicy
、FieldSetMapper
、LineMapper
或 SkippedLinesCallback
)自行提供一些组件。您也可以使用以下属性来配置读取器
属性 | 类型 | 默认值 | 描述 |
---|---|---|---|
|
|
|
确定是否应为重启保存状态。 |
|
|
|
在 |
|
|
|
从文件读取的最大项数。 |
|
|
0 |
已读取的项数。用于重启。 |
|
|
|
文件中指示注释行(要忽略的行)的字符串列表。 |
|
|
|
要读取的资源。 |
|
|
|
如果设置为 |
|
|
|
读取文件时使用的编码。 |
|
|
0 |
指示在文件开头要跳过的行数。 |
|
|
|
指示文件是否为分隔文件(CSV 和其他格式)。此属性或 |
|
|
|
如果读取分隔文件,指示用于解析的分隔符。 |
|
|
|
用于确定引用值的字符。 |
|
|
|
用于确定记录中哪些字段应包含在项中的索引列表。 |
|
|
|
指示文件的记录是否按列号解析。此属性或 |
|
|
|
用于解析固定宽度记录的列范围列表。请参阅Range 文档。 |
|
|
|
从记录解析出的每个字段的名称列表。这些名称是此 |
|
|
|
如果设置为 |
14.3. JdbcCursorItemReader
JdbcCursorItemReader
对关系数据库运行查询,并遍历结果游标(ResultSet
)以提供结果项。此自动配置允许您提供 PreparedStatementSetter
、RowMapper
或两者。您也可以使用以下属性来配置 JdbcCursorItemReader
。
属性 | 类型 | 默认值 | 描述 |
---|---|---|---|
|
|
|
确定是否应为重启保存状态。 |
|
|
|
在 |
|
|
|
从文件读取的最大项数。 |
|
|
0 |
已读取的项数。用于重启。 |
|
|
对驱动程序的提示,指示每次调用数据库系统时检索多少条记录。为了获得最佳性能,通常希望将其设置为与 chunk size 匹配。 |
|
|
|
从数据库读取的最大项数。 |
|
|
|
查询超时前的毫秒数。 |
|
|
|
|
确定读取器在处理时是否应忽略 SQL 警告。 |
|
|
|
指示在每次读取后是否应验证游标位置,以确认 |
|
|
|
指示驱动程序是否支持游标的绝对定位。 |
|
|
|
指示连接是否与其他处理共享(因此是事务的一部分)。 |
|
|
|
从中读取的 SQL 查询。 |
您还可以使用以下属性专门为读取器指定 JDBC DataSource:.JdbcCursorItemReader
属性
属性 | 类型 | 默认值 | 描述 |
---|---|---|---|
|
|
|
确定是否应启用 |
|
|
|
数据库的 |
|
|
|
数据库的登录用户名。 |
|
|
|
数据库的登录密码。 |
|
|
|
|
如果未指定 jdbccursoritemreader_datasource ,JDBCCursorItemReader 将使用默认的 DataSource 。 |
14.4. KafkaItemReader
从 Kafka topic 摄取数据分区非常有用,这正是 KafkaItemReader
可以做到的。要配置 KafkaItemReader
,需要两个配置项。首先,需要使用 Spring Boot 的 Kafka 自动配置来配置 Kafka(请参阅Spring Boot Kafka 文档)。配置好 Spring Boot 中的 Kafka 属性后,您可以通过设置以下属性来配置 KafkaItemReader
本身。
属性 | 类型 | 默认值 | 描述 |
---|---|---|---|
|
|
|
在 |
|
|
|
要从中读取的 topic 的名称。 |
|
|
|
要从中读取的分区索引列表。 |
|
|
30 |
|
|
|
|
确定是否应为重启保存状态。 |
14.5. 原生编译
单步批处理的优点是,在使用 JVM 时,它允许您在运行时动态选择要使用的 reader 和 writer beans。但是,在使用原生编译时,您必须在构建时而不是运行时确定 reader 和 writer。以下示例展示了如何做到这一点
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<id>process-aot</id>
<goals>
<goal>process-aot</goal>
</goals>
<configuration>
<jvmArguments>
-Dspring.batch.job.flatfileitemreader.name=fooReader
-Dspring.batch.job.flatfileitemwriter.name=fooWriter
</jvmArguments>
</configuration>
</execution>
</executions>
</plugin>
15. ItemProcessor 配置
如果 ApplicationContext
中存在 ItemProcessor
,单步批处理作业自动配置会接受它。如果找到类型正确(ItemProcessor<Map<String, Object>, Map<String, Object>>
)的 ItemProcessor,它将被自动注入到步骤中。
16. ItemWriter 实现的自动配置
此 starter 为与支持的 ItemReader
实现匹配的 ItemWriter
实现提供了自动配置:AmqpItemWriter
、FlatFileItemWriter
、JdbcItemWriter
和 KafkaItemWriter
。本节介绍如何使用自动配置来配置受支持的 ItemWriter
。
16.1. AmqpItemWriter
要写入 RabbitMQ 队列,您需要两组配置。首先,您需要一个 AmqpTemplate
。获取此对象的最简单方法是使用 Spring Boot 的 RabbitMQ 自动配置。请参阅Spring Boot AMQP 文档。
配置好 AmqpTemplate
后,您可以通过设置以下属性来配置 AmqpItemWriter
。
属性 | 类型 | 默认值 | 描述 |
---|---|---|---|
|
|
|
如果为 |
|
|
|
指示是否应注册 |
16.2. FlatFileItemWriter
要将文件作为步骤的输出写入,可以配置 FlatFileItemWriter
。自动配置接受已明确配置的组件(例如 LineAggregator
、FieldExtractor
、FlatFileHeaderCallback
或 FlatFileFooterCallback
)以及通过设置以下指定属性配置的组件。
属性 | 类型 | 默认值 | 描述 |
---|---|---|---|
|
|
|
要读取的资源。 |
|
|
|
指示输出文件是否为分隔文件。如果为 |
|
|
|
指示输出文件是否为格式化文件。如果为 |
|
|
|
用于为格式化文件生成输出的格式。格式化是使用 |
|
|
|
生成文件时要使用的 |
|
|
0 |
记录的最大长度。如果为 0,则大小无限制。 |
|
|
0 |
记录的最小长度。 |
|
|
|
用于分隔分隔文件中字段的 |
|
|
|
写入文件时使用的编码。 |
|
|
|
指示文件在刷新时是否应强制同步到磁盘。 |
|
|
|
从记录解析出的每个字段的名称列表。这些名称是此 |
|
|
|
指示如果找到输出文件,是否应向其追加内容。 |
|
|
|
用于分隔输出文件中行的 |
|
|
|
在 |
|
|
|
确定是否应为重启保存状态。 |
|
|
|
如果设置为 |
|
|
|
如果设置为 |
|
|
|
指示读取器是否是事务性队列(表示读取的项在失败时会返回到队列)。 |
16.3. JdbcBatchItemWriter
要将步骤的输出写入关系数据库,此 starter 提供了自动配置 JdbcBatchItemWriter
的能力。自动配置允许您通过设置以下属性来提供自己的 ItemPreparedStatementSetter
或 ItemSqlParameterSourceProvider
以及配置选项。
属性 | 类型 | 默认值 | 描述 |
---|---|---|---|
|
|
|
在 |
|
|
|
用于插入每个项的 |
|
|
|
是否验证每次插入至少更新一条记录。 |
您还可以使用以下属性专门为写入器指定 JDBC DataSource
:.JdbcBatchItemWriter
属性
属性 | 类型 | 默认值 | 描述 |
---|---|---|---|
|
|
|
确定是否应启用 |
|
|
|
数据库的 |
|
|
|
数据库的登录用户名。 |
|
|
|
数据库的登录密码。 |
|
|
|
|
如果未指定 jdbcbatchitemwriter_datasource ,JdbcBatchItemWriter 将使用默认的 DataSource 。 |
16.4. KafkaItemWriter
要将步骤输出写入 Kafka topic,您需要 KafkaItemWriter
。此 starter 利用两个地方的功能为 KafkaItemWriter
提供自动配置。第一个是 Spring Boot 的 Kafka 自动配置。(请参阅Spring Boot Kafka 文档。)其次,此 starter 允许您配置写入器的两个属性。
属性 | 类型 | 默认值 | 描述 |
---|---|---|---|
|
|
|
要写入的 Kafka topic。 |
|
|
|
传递给写入器的项是否全部作为删除事件发送到 topic。 |
有关 KafkaItemWriter
配置选项的更多信息,请参阅KafkaItemWiter
文档。
16.5. Spring AOT
将 Spring AOT 与 Single Step Batch Starter 结合使用时,您必须在编译时设置 reader 和 writer 名称属性(除非您为 reader 和/或 writer 创建 bean(s))。为此,您必须将您希望使用的 reader 和 writer 的名称作为参数或环境变量包含在 boot maven plugin 或 gradle plugin 中。例如,如果您希望在 Maven 中启用 FlatFileItemReader
和 FlatFileItemWriter
,它会是这样
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<id>process-aot</id>
<goals>
<goal>process-aot</goal>
</goals>
</execution>
</executions>
<configuration>
<arguments>
<argument>--spring.batch.job.flatfileitemreader.name=foobar</argument>
<argument>--spring.batch.job.flatfileitemwriter.name=fooWriter</argument>
</arguments>
</configuration>
</plugin>
Spring Cloud Stream 集成
任务本身可能很有用,但将任务集成到更大的生态系统中,可以使其在更复杂的处理和编排中发挥作用。本节介绍了 Spring Cloud Task 与 Spring Cloud Stream 的集成选项。
17. 从 Spring Cloud Stream 启动任务
您可以从 stream 启动任务。为此,创建一个 sink,监听包含 TaskLaunchRequest
作为 payload 的消息。TaskLaunchRequest
包含
-
uri
:要执行的任务 artifact 的 URI。 -
applicationName
:与任务关联的名称。如果未设置 applicationName,TaskLaunchRequest
将生成一个包含以下内容的任务名称:Task-<UUID>
。 -
commandLineArguments
:包含任务命令行参数的列表。 -
environmentProperties
:包含任务要使用的环境变量的 map。 -
deploymentProperties
:包含 deployer 用于部署任务的属性的 map。
如果 payload 类型不同,sink 会抛出异常。 |
例如,可以创建一个 stream,其中包含一个 processor,该 processor 接收来自 HTTP 源的数据并创建一个包含 TaskLaunchRequest
的 GenericMessage
,然后将消息发送到其输出通道。任务 sink 将从其输入通道接收该消息,然后启动任务。
要创建一个 taskSink,您只需要创建一个包含 EnableTaskLauncher
注解的 Spring Boot 应用程序,如下例所示
@SpringBootApplication
@EnableTaskLauncher
public class TaskSinkApplication {
public static void main(String[] args) {
SpringApplication.run(TaskSinkApplication.class, args);
}
}
Spring Cloud Task 项目的 samples 模块 包含了一个示例 Sink 和 Processor。要将这些示例安装到你的本地 maven 仓库,请在 spring-cloud-task-samples
目录中运行 maven 构建,并将 skipInstall
属性设置为 false
,如以下示例所示
mvn clean install
必须将 maven.remoteRepositories.springRepo.url 属性设置为 über-jar 所在的远程仓库位置。如果未设置,则没有远程仓库,因此它将仅依赖本地仓库。 |
17.1. Spring Cloud Data Flow
要在 Spring Cloud Data Flow 中创建流,必须首先注册我们创建的 Task Sink 应用程序。在以下示例中,我们使用 Spring Cloud Data Flow shell 注册 Processor 和 Sink 示例应用程序。
app register --name taskSink --type sink --uri maven://io.spring.cloud:tasksink:<version>
app register --name taskProcessor --type processor --uri maven:io.spring.cloud:taskprocessor:<version>
以下示例展示了如何使用 Spring Cloud Data Flow shell 创建流。
stream create foo --definition "http --server.port=9000|taskProcessor|taskSink" --deploy
18. Spring Cloud Task 事件
Spring Cloud Task 提供了在通过 Spring Cloud Stream 通道运行任务时,通过 Spring Cloud Stream 通道发出事件的能力。任务监听器用于将 TaskExecution
发布到名为 task-events
的消息通道上。此功能会自动注入到任何类路径中包含 spring-cloud-stream
、spring-cloud-stream-<binder>
和已定义任务的任务中。
要禁用事件发出监听器,请将 spring.cloud.task.events.enabled 属性设置为 false 。 |
定义了适当的类路径后,以下任务将在 task-events
通道上将 TaskExecution
作为事件发出(在任务开始和结束时)。
@SpringBootApplication
public class TaskEventsApplication {
public static void main(String[] args) {
SpringApplication.run(TaskEventsApplication.class, args);
}
@Configuration
public static class TaskConfiguration {
@Bean
public ApplicationRunner applicationRunner() {
return new ApplicationRunner() {
@Override
public void run(ApplicationArguments args) {
System.out.println("The ApplicationRunner was executed");
}
};
}
}
}
类路径中还需要包含一个绑定器实现。 |
可以在 Spring Cloud Task 项目的 samples 模块中找到一个示例任务事件应用程序,点击这里。 |
18.1. 禁用特定任务事件
要禁用任务事件,可以将 spring.cloud.task.events.enabled
属性设置为 false
。
19. Spring Batch 事件
通过任务执行 Spring Batch 作业时,Spring Cloud Task 可以配置为根据 Spring Batch 中可用的 Spring Batch 监听器发出信息性消息。具体来说,当通过 Spring Cloud Task 运行时,以下 Spring Batch 监听器会自动配置到每个批处理作业中,并在相关的 Spring Cloud Stream 通道上发出消息。
-
JobExecutionListener
监听job-execution-events
-
StepExecutionListener
监听step-execution-events
-
ChunkListener
监听chunk-events
-
ItemReadListener
监听item-read-events
-
ItemProcessListener
监听item-process-events
-
ItemWriteListener
监听item-write-events
-
SkipListener
监听skip-events
当上下文中存在适当的 bean(一个 Job
和一个 TaskLifecycleListener
)时,这些监听器会自动配置到任何 AbstractJob
中。监听这些事件的配置与绑定到任何其他 Spring Cloud Stream 通道的方式相同。我们的任务(运行批处理作业的任务)充当 Source
,而监听应用程序充当 Processor
或 Sink
。
一个示例是有一个应用程序监听 job-execution-events
通道,以了解作业的启动和停止。要配置监听应用程序,您可以按如下方式将输入配置为 job-execution-events
。
spring.cloud.stream.bindings.input.destination=job-execution-events
类路径中还需要包含一个绑定器实现。 |
可以在 Spring Cloud Task 项目的 samples 模块中找到一个示例批处理事件应用程序,点击这里。 |
19.1. 将批处理事件发送到不同的通道
Spring Cloud Task 为批处理事件提供的一个选项是更改特定监听器可以发出消息的通道的能力。为此,请使用以下配置:spring.cloud.stream.bindings.<the channel>.destination=<new destination>
。例如,如果 StepExecutionListener
需要将其消息发出到名为 my-step-execution-events
的另一个通道,而不是默认的 step-execution-events
,您可以添加以下配置。
spring.cloud.task.batch.events.step-execution-events-binding-name=my-step-execution-events
19.2. 禁用批处理事件
要禁用所有批处理事件的监听器功能,请使用以下配置
spring.cloud.task.batch.events.enabled=false
要禁用特定的批处理事件,请使用以下配置
spring.cloud.task.batch.events.<batch event listener>.enabled=false
:
以下列表显示了可以禁用的各个监听器
spring.cloud.task.batch.events.job-execution.enabled=false
spring.cloud.task.batch.events.step-execution.enabled=false
spring.cloud.task.batch.events.chunk.enabled=false
spring.cloud.task.batch.events.item-read.enabled=false
spring.cloud.task.batch.events.item-process.enabled=false
spring.cloud.task.batch.events.item-write.enabled=false
spring.cloud.task.batch.events.skip.enabled=false
19.3. 批处理事件的发出顺序
默认情况下,批处理事件具有 Ordered.LOWEST_PRECEDENCE
。要更改此值(例如,更改为 5),请使用以下配置。
spring.cloud.task.batch.events.job-execution-order=5
spring.cloud.task.batch.events.step-execution-order=5
spring.cloud.task.batch.events.chunk-order=5
spring.cloud.task.batch.events.item-read-order=5
spring.cloud.task.batch.events.item-process-order=5
spring.cloud.task.batch.events.item-write-order=5
spring.cloud.task.batch.events.skip-order=5
附录
20. 任务仓库 Schema
本附录提供了任务仓库中使用的数据库 schema 的 ERD。

20.1. 表信息
存储任务执行信息。
列名 | 必需 | 类型 | 字段长度 | 备注 |
---|---|---|---|---|
TASK_EXECUTION_ID |
是 |
BIGINT |
X |
Spring Cloud Task Framework 在应用程序启动时根据 |
START_TIME |
否 |
DATETIME(6) |
X |
Spring Cloud Task Framework 在应用程序启动时设置该值。 |
END_TIME |
否 |
DATETIME(6) |
X |
Spring Cloud Task Framework 在应用程序退出时设置该值。 |
TASK_NAME |
否 |
VARCHAR |
100 |
Spring Cloud Task Framework 在应用程序启动时会将其设置为 "Application",除非用户使用 |
EXIT_CODE |
否 |
INTEGER |
X |
遵循 Spring Boot 默认值,除非用户如这里讨论的那样覆盖。 |
EXIT_MESSAGE |
否 |
VARCHAR |
2500 |
用户定义,如这里讨论的那样。 |
ERROR_MESSAGE |
否 |
VARCHAR |
2500 |
Spring Cloud Task Framework 在应用程序退出时设置该值。 |
LAST_UPDATED |
是 |
TIMESTAMP |
X |
Spring Cloud Task Framework 在应用程序启动时设置该值。如果记录是在任务外部创建的,则必须在记录创建时填充该值。 |
EXTERNAL_EXECUTION_ID |
否 |
VARCHAR |
250 |
如果设置了 |
PARENT_TASK_EXECUTION_ID |
否 |
BIGINT |
X |
如果设置了 |
存储任务执行使用的参数
列名 | 必需 | 类型 | 字段长度 |
---|---|---|---|
TASK_EXECUTION_ID |
是 |
BIGINT |
X |
TASK_PARAM |
否 |
VARCHAR |
2500 |
用于将任务执行与批处理执行链接起来。
列名 | 必需 | 类型 | 字段长度 |
---|---|---|---|
TASK_EXECUTION_ID |
是 |
BIGINT |
X |
JOB_EXECUTION_ID |
是 |
BIGINT |
X |
用于这里讨论的 single-instance-enabled
功能。
列名 | 必需 | 类型 | 字段长度 | 备注 |
---|---|---|---|---|
LOCK_KEY |
是 |
CHAR |
36 |
此锁的 UUID |
REGION |
是 |
VARCHAR |
100 |
用户可以使用此字段建立一组锁。 |
CLIENT_ID |
是 |
CHAR |
36 |
包含要锁定应用程序名称的任务执行 ID。 |
CREATED_DATE |
是 |
DATETIME |
X |
条目创建日期 |
每种数据库类型设置表的 DDL 可以在这里找到。 |
20.2. SQL Server
默认情况下,Spring Cloud Task 使用序列表来确定 TASK_EXECUTION
表的 TASK_EXECUTION_ID
。然而,在使用 SQL Server 同时启动多个任务时,这可能导致 TASK_SEQ
表发生死锁。解决方法是删除 TASK_EXECUTION_SEQ
表并使用相同的名称创建一个序列。例如:
DROP TABLE TASK_SEQ;
CREATE SEQUENCE [DBO].[TASK_SEQ] AS BIGINT
START WITH 1
INCREMENT BY 1;
将 START WITH 设置为比您当前执行 ID 更高的值。 |
21. 构建此文档
本项目使用 Maven 生成此文档。要为您自己生成它,请运行以下命令:$ mvn clean install -DskipTests -P docs
。
22. 可观测性元数据
22.1. 可观测性 - 指标
下面您可以找到此项目声明的所有指标列表。
22.1.1. 活跃任务
围绕任务执行创建的指标。
指标名称 spring.cloud.task
(由约定类 org.springframework.cloud.task.listener.DefaultTaskExecutionObservationConvention
定义)。类型 timer
。
指标名称 spring.cloud.task.active
(由约定类 org.springframework.cloud.task.listener.DefaultTaskExecutionObservationConvention
定义)。类型 long task timer
。
启动 Observation 后添加的 KeyValue 可能不会出现在 *.active 指标中。 |
Micrometer 内部使用 nanoseconds 作为基本单位。然而,每个后端会确定实际的基本单位。(例如 Prometheus 使用 seconds) |
封闭类的完全限定名 org.springframework.cloud.task.listener.TaskExecutionObservation
。
所有标签必须以 spring.cloud.task 前缀开头! |
名称 |
描述 |
|
CF 云的 App ID。 |
|
CF 云的 App 名称。 |
|
CF 云的 App 版本。 |
|
CF 云的实例索引。 |
|
CF 云的组织名称。 |
|
CF 云的空间 ID。 |
|
CF 云的空间名称。 |
|
任务执行 ID。 |
|
任务退出码。 |
|
任务的外部执行 ID。 |
|
任务名称度量。 |
|
任务父执行 ID。 |
|
任务状态。可以是 success 或 failure。 |
22.1.2. 任务运行器观测
任务运行器执行时创建的观测。
指标名称 spring.cloud.task.runner
(由约定类 org.springframework.cloud.task.configuration.observation.DefaultTaskObservationConvention
定义)。类型 timer
。
指标名称 spring.cloud.task.runner.active
(由约定类 org.springframework.cloud.task.configuration.observation.DefaultTaskObservationConvention
定义)。类型 long task timer
。
启动 Observation 后添加的 KeyValue 可能不会出现在 *.active 指标中。 |
Micrometer 内部使用 nanoseconds 作为基本单位。然而,每个后端会确定实际的基本单位。(例如 Prometheus 使用 seconds) |
封闭类的完全限定名 org.springframework.cloud.task.configuration.observation.TaskDocumentedObservation
。
所有标签必须以 spring.cloud.task 前缀开头! |
名称 |
描述 |
|
由 Spring Cloud Task 执行的 bean 的名称。 |
22.2. 可观测性 - Spans
下面您可以找到此项目声明的所有 spans 列表。
22.2.1. 活跃任务 Span
围绕任务执行创建的指标。
Span 名称 spring.cloud.task
(由约定类 org.springframework.cloud.task.listener.DefaultTaskExecutionObservationConvention
定义)。
封闭类的完全限定名 org.springframework.cloud.task.listener.TaskExecutionObservation
。
所有标签必须以 spring.cloud.task 前缀开头! |
名称 |
描述 |
|
CF 云的 App ID。 |
|
CF 云的 App 名称。 |
|
CF 云的 App 版本。 |
|
CF 云的实例索引。 |
|
CF 云的组织名称。 |
|
CF 云的空间 ID。 |
|
CF 云的空间名称。 |
|
任务执行 ID。 |
|
任务退出码。 |
|
任务的外部执行 ID。 |
|
任务名称度量。 |
|
任务父执行 ID。 |
|
任务状态。可以是 success 或 failure。 |
22.2.2. 任务运行器观测 Span
任务运行器执行时创建的观测。
Span 名称 spring.cloud.task.runner
(由约定类 org.springframework.cloud.task.configuration.observation.DefaultTaskObservationConvention
定义)。
封闭类的完全限定名 org.springframework.cloud.task.configuration.observation.TaskDocumentedObservation
。
所有标签必须以 spring.cloud.task 前缀开头! |
名称 |
描述 |
|
由 Spring Cloud Task 执行的 bean 的名称。 |