Spring Cloud Stream 集成
任务本身可能很有用,但将任务集成到更大的生态系统中可以使其用于更复杂的处理和编排。本节涵盖 Spring Cloud Task 与 Spring Cloud Stream 的集成选项。
Spring Cloud Task 事件
Spring Cloud Task 提供了在任务通过 Spring Cloud Stream 通道运行时,通过 Spring Cloud Stream 通道发出事件的能力。任务监听器用于在名为 task-events 的消息通道上发布 TaskExecution。此功能会自动装配到任何具有 spring-cloud-stream、spring-cloud-stream-<binder> 并在其类路径上定义了任务的任务中。
要禁用事件发射监听器,请将 spring.cloud.task.events.enabled 属性设置为 false。 |
通过定义适当的类路径,以下任务会在 task-events 通道(在任务开始和结束时)发出 TaskExecution 作为事件
@SpringBootApplication
public class TaskEventsApplication {
public static void main(String[] args) {
SpringApplication.run(TaskEventsApplication.class, args);
}
@Configuration
public static class TaskConfiguration {
@Bean
public ApplicationRunner applicationRunner() {
return new ApplicationRunner() {
@Override
public void run(ApplicationArguments args) {
System.out.println("The ApplicationRunner was executed");
}
};
}
}
}
| 还需要在类路径中包含一个绑定器实现。 |
| Spring Cloud Task 项目的 samples 模块中提供了任务事件示例应用程序,点击此处查看。 |
Spring Batch 事件
当通过任务执行 Spring Batch 作业时,Spring Cloud Task 可以配置为根据 Spring Batch 中可用的 Spring Batch 监听器发出信息性消息。具体来说,当通过 Spring Cloud Task 运行时,以下 Spring Batch 监听器会自动配置到每个批处理作业中,并在关联的 Spring Cloud Stream 通道上发出消息
-
JobExecutionListener监听job-execution-events -
StepExecutionListener监听step-execution-events -
ChunkListener监听chunk-events -
ItemReadListener监听item-read-events -
ItemProcessListener监听item-process-events -
ItemWriteListener监听item-write-events -
SkipListener监听skip-events
当上下文中存在适当的 Bean(一个 Job 和一个 TaskLifecycleListener)时,这些监听器会自动配置到任何 AbstractJob 中。监听这些事件的配置方式与绑定到任何其他 Spring Cloud Stream 通道的方式相同。我们的任务(运行批处理作业的任务)充当 Source,而监听应用程序充当 Processor 或 Sink。
一个示例可以是有一个应用程序监听 job-execution-events 通道以获取作业的开始和停止。要配置监听应用程序,您可以将输入配置为 job-execution-events,如下所示
spring.cloud.stream.bindings.input.destination=job-execution-events
| 还需要在类路径中包含一个绑定器实现。 |
| Spring Cloud Task 项目的 samples 模块中提供了批处理事件示例应用程序,点击此处查看。 |
将批处理事件发送到不同的通道
Spring Cloud Task 为批处理事件提供的选项之一是能够更改特定监听器可以发出消息的通道。为此,请使用以下配置:spring.cloud.stream.bindings.<the channel>.destination=<new destination>。例如,如果 StepExecutionListener 需要将其消息发出到名为 my-step-execution-events 的另一个通道,而不是默认的 step-execution-events,您可以添加以下配置
spring.cloud.task.batch.events.step-execution-events-binding-name=my-step-execution-events
禁用批处理事件
要禁用所有批处理事件的监听器功能,请使用以下配置
spring.cloud.task.batch.events.enabled=false
要禁用特定的批处理事件,请使用以下配置
spring.cloud.task.batch.events.<batch event listener>.enabled=false:
以下列表显示了您可以禁用的各个监听器
spring.cloud.task.batch.events.job-execution.enabled=false
spring.cloud.task.batch.events.step-execution.enabled=false
spring.cloud.task.batch.events.chunk.enabled=false
spring.cloud.task.batch.events.item-read.enabled=false
spring.cloud.task.batch.events.item-process.enabled=false
spring.cloud.task.batch.events.item-write.enabled=false
spring.cloud.task.batch.events.skip.enabled=false
批处理事件的发出顺序
默认情况下,批处理事件具有 Ordered.LOWEST_PRECEDENCE。要更改此值(例如,更改为 5),请使用以下配置
spring.cloud.task.batch.events.job-execution-order=5
spring.cloud.task.batch.events.step-execution-order=5
spring.cloud.task.batch.events.chunk-order=5
spring.cloud.task.batch.events.item-read-order=5
spring.cloud.task.batch.events.item-process-order=5
spring.cloud.task.batch.events.item-write-order=5
spring.cloud.task.batch.events.skip-order=5