任务执行与调度

如果在上下文中没有 Executor bean,Spring Boot 会自动配置一个 AsyncTaskExecutor。当启用虚拟线程时(使用 Java 21+ 且 spring.threads.virtual.enabled 设置为 true),这将是一个使用虚拟线程的 SimpleAsyncTaskExecutor。否则,它将是一个带有合理默认值的 ThreadPoolTaskExecutor

除非定义了自定义的 Executor bean,否则自动配置的 AsyncTaskExecutor 将用于以下集成:

  • 使用 @EnableAsync 执行异步任务,除非定义了 AsyncConfigurer 类型的 bean。

  • 在 Spring for GraphQL 中异步处理控制器方法返回的 Callable 值。

  • Spring MVC 中的异步请求处理。

  • Spring WebFlux 中的阻塞执行支持。

  • 用于 Spring WebSocket 中的入站和出站消息通道。

  • 基于 JPA 仓库引导模式的 JPA 引导执行器。

  • 用于 ApplicationContext 中 bean 的后台初始化的引导执行器。

虽然这种方法在大多数情况下都有效,但 Spring Boot 允许您覆盖自动配置的 AsyncTaskExecutor。默认情况下,当注册了自定义的 Executor bean 时,自动配置的 AsyncTaskExecutor 会回退,而自定义的 Executor 将用于常规任务执行(通过 @EnableAsync)。

然而,Spring MVC、Spring WebFlux 和 Spring GraphQL 都需要一个名为 applicationTaskExecutor 的 bean。对于 Spring MVC 和 Spring WebFlux,此 bean 必须是 AsyncTaskExecutor 类型,而 Spring GraphQL 不强制执行此类型要求。

如果存在此类型的单个 bean 或定义了名为 applicationTaskExecutor 的 bean,Spring WebSocket 和 JPA 将使用 AsyncTaskExecutor

最后,ApplicationContext 的引导执行器使用名为 applicationTaskExecutor 的 bean,除非定义了名为 bootstrapExecutor 的 bean。

以下代码片段演示了如何注册自定义的 AsyncTaskExecutor 以与 Spring MVC、Spring WebFlux、Spring GraphQL、Spring WebSocket、JPA 以及 bean 的后台初始化一起使用。

  • Java

  • Kotlin

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

@Configuration(proxyBeanMethods = false)
public class MyTaskExecutorConfiguration {

	@Bean("applicationTaskExecutor")
	SimpleAsyncTaskExecutor applicationTaskExecutor() {
		return new SimpleAsyncTaskExecutor("app-");
	}

}
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.core.task.SimpleAsyncTaskExecutor

@Configuration(proxyBeanMethods = false)
class MyTaskExecutorConfiguration {

	@Bean("applicationTaskExecutor")
	fun applicationTaskExecutor(): SimpleAsyncTaskExecutor {
		return SimpleAsyncTaskExecutor("app-")
	}

}

如果应用程序上下文中没有 @Primary bean 或名为 taskExecutor 且类型为 ExecutorAsyncConfigurer 的 bean,则 applicationTaskExecutor bean 也将用于常规任务执行。

如果既未定义自动配置的 AsyncTaskExecutor 也未定义 applicationTaskExecutor bean,则应用程序将默认使用名为 taskExecutor 的 bean 进行常规任务执行(@EnableAsync),遵循 Spring Framework 的行为。然而,此 bean 将不会用于 Spring MVC、Spring WebFlux、Spring GraphQL。但是,如果 bean 的类型是 AsyncTaskExecutor,它可能用于 Spring WebSocket 或 JPA。

如果您的应用程序需要多个 Executor bean 用于不同的集成,例如一个用于使用 @EnableAsync 的常规任务执行,另一个用于 Spring MVC、Spring WebFlux、Spring WebSocket 和 JPA,您可以按如下方式配置它们。

  • Java

  • Kotlin

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration(proxyBeanMethods = false)
public class MyTaskExecutorConfiguration {

	@Bean("applicationTaskExecutor")
	SimpleAsyncTaskExecutor applicationTaskExecutor() {
		return new SimpleAsyncTaskExecutor("app-");
	}

	@Bean("taskExecutor")
	ThreadPoolTaskExecutor taskExecutor() {
		ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
		threadPoolTaskExecutor.setThreadNamePrefix("async-");
		return threadPoolTaskExecutor;
	}

}
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.core.task.SimpleAsyncTaskExecutor
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor


@Configuration(proxyBeanMethods = false)
class MyTaskExecutorConfiguration {

	@Bean("applicationTaskExecutor")
	fun applicationTaskExecutor(): SimpleAsyncTaskExecutor {
		return SimpleAsyncTaskExecutor("app-")
	}

	@Bean("taskExecutor")
	fun taskExecutor(): ThreadPoolTaskExecutor {
		val threadPoolTaskExecutor = ThreadPoolTaskExecutor()
		threadPoolTaskExecutor.setThreadNamePrefix("async-")
		return threadPoolTaskExecutor
	}

}

自动配置的 ThreadPoolTaskExecutorBuilderSimpleAsyncTaskExecutorBuilder 允许您轻松创建 AsyncTaskExecutor 类型的实例,以复制自动配置的默认行为。

  • Java

  • Kotlin

import org.springframework.boot.task.SimpleAsyncTaskExecutorBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

@Configuration(proxyBeanMethods = false)
public class MyTaskExecutorConfiguration {

	@Bean
	SimpleAsyncTaskExecutor taskExecutor(SimpleAsyncTaskExecutorBuilder builder) {
		return builder.build();
	}

}
import org.springframework.boot.task.SimpleAsyncTaskExecutorBuilder
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.core.task.SimpleAsyncTaskExecutor

@Configuration(proxyBeanMethods = false)
class MyTaskExecutorConfiguration {

	@Bean
	fun taskExecutor(builder: SimpleAsyncTaskExecutorBuilder): SimpleAsyncTaskExecutor {
		return builder.build()
	}

}

如果名为 taskExecutor 的 bean 不是一个选项,您可以将您的 bean 标记为 @Primary 或定义一个 AsyncConfigurer bean 来指定负责使用 @EnableAsync 处理常规任务执行的 Executor。以下示例演示了如何实现这一点。

  • Java

  • Kotlin

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;

@Configuration(proxyBeanMethods = false)
public class MyTaskExecutorConfiguration {

	@Bean
	AsyncConfigurer asyncConfigurer(ExecutorService executorService) {
		return new AsyncConfigurer() {

			@Override
			public Executor getAsyncExecutor() {
				return executorService;
			}

		};
	}

	@Bean
	ExecutorService executorService() {
		return Executors.newCachedThreadPool();
	}

}
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.scheduling.annotation.AsyncConfigurer
import java.util.concurrent.Executor
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors

@Configuration(proxyBeanMethods = false)
class MyTaskExecutorConfiguration {

	@Bean
	fun asyncConfigurer(executorService: ExecutorService): AsyncConfigurer {
		return object : AsyncConfigurer {
			override fun getAsyncExecutor(): Executor {
				return executorService
			}
		}
	}

	@Bean
	fun executorService(): ExecutorService {
		return Executors.newCachedThreadPool()
	}

}

要在保留自动配置的 AsyncTaskExecutor 的同时注册自定义的 Executor,您可以创建一个自定义的 Executor bean,并在其 @Bean 注解中设置 defaultCandidate=false 属性,如下例所示:

  • Java

  • Kotlin

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration(proxyBeanMethods = false)
public class MyTaskExecutorConfiguration {

	@Bean(defaultCandidate = false)
	@Qualifier("scheduledExecutorService")
	ScheduledExecutorService scheduledExecutorService() {
		return Executors.newSingleThreadScheduledExecutor();
	}

}
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService

@Configuration(proxyBeanMethods = false)
class MyTaskExecutorConfiguration {

	@Bean(defaultCandidate = false)
	@Qualifier("scheduledExecutorService")
	fun scheduledExecutorService(): ScheduledExecutorService {
		return Executors.newSingleThreadScheduledExecutor()
	}

}

在这种情况下,您将能够将自定义的 Executor 自动装配到其他组件中,同时保留自动配置的 AsyncTaskExecutor。但是,请记住在 @Autowired 旁边使用 @Qualifier 注解。

如果这对您来说不可能,您可以要求 Spring Boot 无论如何都自动配置一个 AsyncTaskExecutor,如下所示:

  • 属性

  • YAML

spring.task.execution.mode=force
spring:
  task:
    execution:
      mode: force

自动配置的 AsyncTaskExecutor 将自动用于所有集成,即使注册了自定义的 Executor bean,包括那些标记为 @Primary 的。这些集成包括:

  • 异步任务执行(@EnableAsync),除非存在 AsyncConfigurer bean。

  • Spring for GraphQL 异步处理控制器方法返回的 Callable 值。

  • Spring MVC 的异步请求处理。

  • Spring WebFlux 的阻塞执行支持。

  • 用于 Spring WebSocket 中的入站和出站消息通道。

  • 基于 JPA 仓库引导模式的 JPA 引导执行器。

  • 用于 ApplicationContext 中 bean 的后台初始化的引导执行器,除非定义了名为 bootstrapExecutor 的 bean。

根据您的目标安排,您可以将 spring.task.execution.mode 设置为 force 来自动配置 applicationTaskExecutor,将您的 Executor 更改为 AsyncTaskExecutor,或者同时定义一个 AsyncTaskExecutor 和一个封装了您的自定义 ExecutorAsyncConfigurer

当启用 force 模式时,即使存在 @Primary bean 或名为 taskExecutor 且类型为 Executor 的 bean,applicationTaskExecutor 也将配置为使用 @EnableAsync 进行常规任务执行。覆盖常规任务的 Executor 的唯一方法是注册一个 AsyncConfigurer bean。

当自动配置 ThreadPoolTaskExecutor 时,线程池使用 8 个核心线程,这些线程可以根据负载增长和收缩。这些默认设置可以使用 spring.task.execution 命名空间进行微调,如下例所示:

  • 属性

  • YAML

spring.task.execution.pool.max-size=16
spring.task.execution.pool.queue-capacity=100
spring.task.execution.pool.keep-alive=10s
spring:
  task:
    execution:
      pool:
        max-size: 16
        queue-capacity: 100
        keep-alive: "10s"

这会将线程池更改为使用有界队列,因此当队列满(100 个任务)时,线程池会增加到最大 16 个线程。池的收缩更具侵略性,因为线程在空闲 10 秒后(而不是默认的 60 秒)就会被回收。

如果调度器需要与计划任务执行关联(例如使用 @EnableScheduling),也可以自动配置调度器。

如果启用虚拟线程(使用 Java 21+ 且 spring.threads.virtual.enabled 设置为 true),这将是一个使用虚拟线程的 SimpleAsyncTaskScheduler。此 SimpleAsyncTaskScheduler 将忽略任何与池相关的属性。

如果未启用虚拟线程,它将是一个带有合理默认值的 ThreadPoolTaskSchedulerThreadPoolTaskScheduler 默认使用一个线程,其设置可以使用 spring.task.scheduling 命名空间进行微调,如下例所示:

  • 属性

  • YAML

spring.task.scheduling.thread-name-prefix=scheduling-
spring.task.scheduling.pool.size=2
spring:
  task:
    scheduling:
      thread-name-prefix: "scheduling-"
      pool:
        size: 2

如果需要创建自定义执行器或调度器,上下文中将提供一个 ThreadPoolTaskExecutorBuilder bean、一个 SimpleAsyncTaskExecutorBuilder bean、一个 ThreadPoolTaskSchedulerBuilder bean 和一个 SimpleAsyncTaskSchedulerBuilder。如果启用了虚拟线程(使用 Java 21+ 且 spring.threads.virtual.enabled 设置为 true),则 SimpleAsyncTaskExecutorBuilderSimpleAsyncTaskSchedulerBuilder bean 将自动配置为使用虚拟线程。

© . This site is unofficial and not affiliated with VMware.