Spring Cloud Stream 集成

任务本身可能很有用,但将任务集成到更大的生态系统中,可以使其对更复杂的处理和编排更加有用。本节介绍 Spring Cloud Task 与 Spring Cloud Stream 的集成选项。

从 Spring Cloud Stream 启动任务

您可以从流中启动任务。为此,您需要创建一个 sink,它监听包含 TaskLaunchRequest 作为其有效载荷的消息。TaskLaunchRequest 包含

  • uri: 要执行的任务 artifact 的 URI。

  • applicationName: 与任务关联的名称。如果未设置 applicationName,TaskLaunchRequest 会生成一个由以下部分组成的任务名称:Task-<UUID>

  • commandLineArguments: 一个包含任务命令行参数的列表。

  • environmentProperties: 一个包含任务要使用的环境变量的 Map。

  • deploymentProperties: 一个包含部署器用于部署任务的属性的 Map。

如果有效载荷是不同类型,sink 会抛出异常。

例如,可以创建一个流,其中包含一个处理器,该处理器从 HTTP 源获取数据,创建一个包含 TaskLaunchRequestGenericMessage,然后将消息发送到其输出通道。然后,任务 sink 会从其输入通道接收消息并启动任务。

要创建 taskSink,您只需创建一个包含 EnableTaskLauncher 注解的 Spring Boot 应用,如下例所示

@SpringBootApplication
@EnableTaskLauncher
public class TaskSinkApplication {
	public static void main(String[] args) {
		SpringApplication.run(TaskSinkApplication.class, args);
	}
}

Spring Cloud Task 项目的示例模块包含一个示例 Sink 和 Processor。要将这些示例安装到您的本地 Maven 仓库,请从 spring-cloud-task-samples 目录运行 Maven 构建,并将 skipInstall 属性设置为 false,如下例所示

mvn clean install

必须将 maven.remoteRepositories.springRepo.url 属性设置为 Spring Boot Uber-jar 所在的远程仓库位置。如果未设置,则没有远程仓库,因此仅依赖本地仓库。

Spring Cloud Data Flow

要在 Spring Cloud Data Flow 中创建流,您必须首先注册我们创建的 Task Sink Application。在以下示例中,我们使用 Spring Cloud Data Flow shell 注册 Processor 和 Sink 示例应用

app register --name taskSink --type sink --uri maven://io.spring.cloud:tasksink:<version>
app register --name taskProcessor --type processor --uri maven:io.spring.cloud:taskprocessor:<version>

以下示例展示如何使用 Spring Cloud Data Flow shell 创建流

stream create foo --definition "http --server.port=9000|taskProcessor|taskSink" --deploy

Spring Cloud Task 事件

当任务通过 Spring Cloud Stream 通道运行时,Spring Cloud Task 提供了通过 Spring Cloud Stream 通道发出事件的能力。使用任务监听器将 TaskExecution 发布到名为 task-events 的消息通道上。此功能会自动注入到 classpath 中包含 spring-cloud-streamspring-cloud-stream-<binder> 以及定义任务的任何任务中。

要禁用事件发出监听器,请将 spring.cloud.task.events.enabled 属性设置为 false

定义适当的 classpath 后,以下任务会将 TaskExecution 作为事件发送到 task-events 通道(在任务开始和结束时)

@SpringBootApplication
public class TaskEventsApplication {

	public static void main(String[] args) {
		SpringApplication.run(TaskEventsApplication.class, args);
	}

	@Configuration
	public static class TaskConfiguration {

		@Bean
		public ApplicationRunner applicationRunner() {
			return new ApplicationRunner() {
				@Override
				public void run(ApplicationArguments args) {
					System.out.println("The ApplicationRunner was executed");
				}
			};
		}
	}
}
还需要在 classpath 中包含 Binder 实现。
可以在 Spring Cloud Task 项目的示例模块中找到示例任务事件应用,此处

禁用特定任务事件

要禁用任务事件,可以将 spring.cloud.task.events.enabled 属性设置为 false

Spring Batch 事件

通过任务执行 Spring Batch 作业时,可以配置 Spring Cloud Task 根据 Spring Batch 中可用的 Spring Batch 监听器发出信息性消息。具体来说,以下 Spring Batch 监听器会被自动配置到每个批处理作业中,并通过 Spring Cloud Task 运行时在关联的 Spring Cloud Stream 通道上发出消息

  • JobExecutionListener 监听 job-execution-events

  • StepExecutionListener 监听 step-execution-events

  • ChunkListener 监听 chunk-events

  • ItemReadListener 监听 item-read-events

  • ItemProcessListener 监听 item-process-events

  • ItemWriteListener 监听 item-write-events

  • SkipListener 监听 skip-events

当上下文中存在适当的 bean(一个 Job 和一个 TaskLifecycleListener)时,这些监听器会被自动配置到任何 AbstractJob 中。监听这些事件的配置方式与绑定到任何其他 Spring Cloud Stream 通道的方式相同。我们的任务(运行批处理作业的任务)充当 Source,而监听应用程序充当 ProcessorSink

例如,可以有一个应用监听 job-execution-events 通道以获取作业的开始和停止事件。要配置监听应用,您可以如下配置输入为 job-execution-events

spring.cloud.stream.bindings.input.destination=job-execution-events

还需要在 classpath 中包含 Binder 实现。
可以在 Spring Cloud Task 项目的示例模块中找到示例批处理事件应用,此处

将批处理事件发送到不同通道

Spring Cloud Task 为批处理事件提供的一个选项是更改特定监听器发出消息的通道的能力。为此,请使用以下配置:spring.cloud.stream.bindings.<the channel>.destination=<new destination>。例如,如果 StepExecutionListener 需要将消息发送到名为 my-step-execution-events 的另一个通道,而不是默认的 step-execution-events,您可以添加以下配置

spring.cloud.task.batch.events.step-execution-events-binding-name=my-step-execution-events

禁用批处理事件

要禁用所有批处理事件的监听器功能,请使用以下配置

spring.cloud.task.batch.events.enabled=false

要禁用特定批处理事件,请使用以下配置

spring.cloud.task.batch.events.<batch event listener>.enabled=false:

以下列表显示您可以禁用的单个监听器

spring.cloud.task.batch.events.job-execution.enabled=false
spring.cloud.task.batch.events.step-execution.enabled=false
spring.cloud.task.batch.events.chunk.enabled=false
spring.cloud.task.batch.events.item-read.enabled=false
spring.cloud.task.batch.events.item-process.enabled=false
spring.cloud.task.batch.events.item-write.enabled=false
spring.cloud.task.batch.events.skip.enabled=false

批处理事件的发出顺序

默认情况下,批处理事件的排序为 Ordered.LOWEST_PRECEDENCE。要更改此值(例如,更改为 5),请使用以下配置

spring.cloud.task.batch.events.job-execution-order=5
spring.cloud.task.batch.events.step-execution-order=5
spring.cloud.task.batch.events.chunk-order=5
spring.cloud.task.batch.events.item-read-order=5
spring.cloud.task.batch.events.item-process-order=5
spring.cloud.task.batch.events.item-write-order=5
spring.cloud.task.batch.events.skip-order=5