Spring Cloud Stream 集成
任务本身可能很有用,但将任务集成到更大的生态系统中,可以使其对更复杂的处理和编排更加有用。本节介绍 Spring Cloud Task 与 Spring Cloud Stream 的集成选项。
从 Spring Cloud Stream 启动任务
您可以从流中启动任务。为此,您需要创建一个 sink,它监听包含 TaskLaunchRequest
作为其有效载荷的消息。TaskLaunchRequest
包含
-
uri
: 要执行的任务 artifact 的 URI。 -
applicationName
: 与任务关联的名称。如果未设置 applicationName,TaskLaunchRequest
会生成一个由以下部分组成的任务名称:Task-<UUID>
。 -
commandLineArguments
: 一个包含任务命令行参数的列表。 -
environmentProperties
: 一个包含任务要使用的环境变量的 Map。 -
deploymentProperties
: 一个包含部署器用于部署任务的属性的 Map。
如果有效载荷是不同类型,sink 会抛出异常。 |
例如,可以创建一个流,其中包含一个处理器,该处理器从 HTTP 源获取数据,创建一个包含 TaskLaunchRequest
的 GenericMessage
,然后将消息发送到其输出通道。然后,任务 sink 会从其输入通道接收消息并启动任务。
要创建 taskSink,您只需创建一个包含 EnableTaskLauncher
注解的 Spring Boot 应用,如下例所示
@SpringBootApplication
@EnableTaskLauncher
public class TaskSinkApplication {
public static void main(String[] args) {
SpringApplication.run(TaskSinkApplication.class, args);
}
}
Spring Cloud Task 项目的示例模块包含一个示例 Sink 和 Processor。要将这些示例安装到您的本地 Maven 仓库,请从 spring-cloud-task-samples
目录运行 Maven 构建,并将 skipInstall
属性设置为 false
,如下例所示
mvn clean install
必须将 maven.remoteRepositories.springRepo.url 属性设置为 Spring Boot Uber-jar 所在的远程仓库位置。如果未设置,则没有远程仓库,因此仅依赖本地仓库。 |
Spring Cloud Data Flow
要在 Spring Cloud Data Flow 中创建流,您必须首先注册我们创建的 Task Sink Application。在以下示例中,我们使用 Spring Cloud Data Flow shell 注册 Processor 和 Sink 示例应用
app register --name taskSink --type sink --uri maven://io.spring.cloud:tasksink:<version>
app register --name taskProcessor --type processor --uri maven:io.spring.cloud:taskprocessor:<version>
以下示例展示如何使用 Spring Cloud Data Flow shell 创建流
stream create foo --definition "http --server.port=9000|taskProcessor|taskSink" --deploy
Spring Cloud Task 事件
当任务通过 Spring Cloud Stream 通道运行时,Spring Cloud Task 提供了通过 Spring Cloud Stream 通道发出事件的能力。使用任务监听器将 TaskExecution
发布到名为 task-events
的消息通道上。此功能会自动注入到 classpath 中包含 spring-cloud-stream
、spring-cloud-stream-<binder>
以及定义任务的任何任务中。
要禁用事件发出监听器,请将 spring.cloud.task.events.enabled 属性设置为 false 。 |
定义适当的 classpath 后,以下任务会将 TaskExecution
作为事件发送到 task-events
通道(在任务开始和结束时)
@SpringBootApplication
public class TaskEventsApplication {
public static void main(String[] args) {
SpringApplication.run(TaskEventsApplication.class, args);
}
@Configuration
public static class TaskConfiguration {
@Bean
public ApplicationRunner applicationRunner() {
return new ApplicationRunner() {
@Override
public void run(ApplicationArguments args) {
System.out.println("The ApplicationRunner was executed");
}
};
}
}
}
还需要在 classpath 中包含 Binder 实现。 |
可以在 Spring Cloud Task 项目的示例模块中找到示例任务事件应用,此处。 |
Spring Batch 事件
通过任务执行 Spring Batch 作业时,可以配置 Spring Cloud Task 根据 Spring Batch 中可用的 Spring Batch 监听器发出信息性消息。具体来说,以下 Spring Batch 监听器会被自动配置到每个批处理作业中,并通过 Spring Cloud Task 运行时在关联的 Spring Cloud Stream 通道上发出消息
-
JobExecutionListener
监听job-execution-events
-
StepExecutionListener
监听step-execution-events
-
ChunkListener
监听chunk-events
-
ItemReadListener
监听item-read-events
-
ItemProcessListener
监听item-process-events
-
ItemWriteListener
监听item-write-events
-
SkipListener
监听skip-events
当上下文中存在适当的 bean(一个 Job
和一个 TaskLifecycleListener
)时,这些监听器会被自动配置到任何 AbstractJob
中。监听这些事件的配置方式与绑定到任何其他 Spring Cloud Stream 通道的方式相同。我们的任务(运行批处理作业的任务)充当 Source
,而监听应用程序充当 Processor
或 Sink
。
例如,可以有一个应用监听 job-execution-events
通道以获取作业的开始和停止事件。要配置监听应用,您可以如下配置输入为 job-execution-events
spring.cloud.stream.bindings.input.destination=job-execution-events
还需要在 classpath 中包含 Binder 实现。 |
可以在 Spring Cloud Task 项目的示例模块中找到示例批处理事件应用,此处。 |
将批处理事件发送到不同通道
Spring Cloud Task 为批处理事件提供的一个选项是更改特定监听器发出消息的通道的能力。为此,请使用以下配置:spring.cloud.stream.bindings.<the channel>.destination=<new destination>
。例如,如果 StepExecutionListener
需要将消息发送到名为 my-step-execution-events
的另一个通道,而不是默认的 step-execution-events
,您可以添加以下配置
spring.cloud.task.batch.events.step-execution-events-binding-name=my-step-execution-events
禁用批处理事件
要禁用所有批处理事件的监听器功能,请使用以下配置
spring.cloud.task.batch.events.enabled=false
要禁用特定批处理事件,请使用以下配置
spring.cloud.task.batch.events.<batch event listener>.enabled=false
:
以下列表显示您可以禁用的单个监听器
spring.cloud.task.batch.events.job-execution.enabled=false
spring.cloud.task.batch.events.step-execution.enabled=false
spring.cloud.task.batch.events.chunk.enabled=false
spring.cloud.task.batch.events.item-read.enabled=false
spring.cloud.task.batch.events.item-process.enabled=false
spring.cloud.task.batch.events.item-write.enabled=false
spring.cloud.task.batch.events.skip.enabled=false
批处理事件的发出顺序
默认情况下,批处理事件的排序为 Ordered.LOWEST_PRECEDENCE
。要更改此值(例如,更改为 5),请使用以下配置
spring.cloud.task.batch.events.job-execution-order=5
spring.cloud.task.batch.events.step-execution-order=5
spring.cloud.task.batch.events.chunk-order=5
spring.cloud.task.batch.events.item-read-order=5
spring.cloud.task.batch.events.item-process-order=5
spring.cloud.task.batch.events.item-write-order=5
spring.cloud.task.batch.events.skip-order=5