异步请求
DeferredResult
一旦在 Servlet 容器中启用了异步请求处理功能,控制器方法就可以用 DeferredResult 包装任何支持的控制器方法返回值,如下例所示
-
Java
-
Kotlin
@GetMapping("/quotes")
@ResponseBody
public DeferredResult<String> quotes() {
DeferredResult<String> deferredResult = new DeferredResult<>();
// Save the deferredResult somewhere..
return deferredResult;
}
// From some other thread...
deferredResult.setResult(result);
@GetMapping("/quotes")
@ResponseBody
fun quotes(): DeferredResult<String> {
val deferredResult = DeferredResult<String>()
// Save the deferredResult somewhere..
return deferredResult
}
// From some other thread...
deferredResult.setResult(result)
控制器可以从不同的线程异步产生返回值,例如,响应外部事件 (JMS 消息)、计划任务或其他事件。
Callable
控制器可以用 java.util.concurrent.Callable 包装任何支持的返回值,如下例所示
-
Java
-
Kotlin
@PostMapping
public Callable<String> processUpload(final MultipartFile file) {
return () -> "someView";
}
@PostMapping
fun processUpload(file: MultipartFile) = Callable<String> {
// ...
"someView"
}
然后可以通过配置的 AsyncTaskExecutor 运行给定任务来获取返回值。
处理流程
以下是 Servlet 异步请求处理的非常简洁的概览
-
通过调用
request.startAsync()可以将ServletRequest置于异步模式。这样做主要的效果是 Servlet (以及任何过滤器) 可以退出,但响应保持打开状态以允许稍后完成处理。 -
调用
request.startAsync()会返回AsyncContext,您可以使用它来进一步控制异步处理。例如,它提供了dispatch方法,类似于 Servlet API 中的 forward,不同之处在于它允许应用程序在 Servlet 容器线程上恢复请求处理。 -
ServletRequest提供了对当前DispatcherType的访问,您可以使用它来区分处理初始请求、异步分派、forward 和其他分派类型。
DeferredResult 处理流程如下
-
控制器返回一个
DeferredResult并将其保存在某个内存队列或列表中,以便可以访问它。 -
Spring MVC 调用
request.startAsync()。 -
同时,
DispatcherServlet和所有配置的过滤器退出请求处理线程,但响应保持打开状态。 -
应用程序从某个线程设置
DeferredResult,然后 Spring MVC 将请求分派回 Servlet 容器。 -
DispatcherServlet再次被调用,处理流程继续进行,使用异步产生的返回值。
Callable 处理流程如下
-
控制器返回一个
Callable。 -
Spring MVC 调用
request.startAsync()并将Callable提交给AsyncTaskExecutor以在单独的线程中进行处理。 -
同时,
DispatcherServlet和所有过滤器退出 Servlet 容器线程,但响应保持打开状态。 -
最终
Callable产生结果,Spring MVC 将请求分派回 Servlet 容器以完成处理。 -
DispatcherServlet再次被调用,处理流程继续进行,使用从Callable异步产生的返回值。
有关更多背景和上下文,您还可以阅读在 Spring MVC 3.2 中引入异步请求处理支持的博客文章。
异常处理
使用 DeferredResult 时,您可以选择调用 setResult 或 setErrorResult 并带上异常。在两种情况下,Spring MVC 都会将请求分派回 Servlet 容器以完成处理。然后,它被视为控制器方法返回了给定值,或者视为产生了给定异常。异常会通过常规的异常处理机制(例如,调用 @ExceptionHandler 方法)进行处理。
使用 Callable 时,会发生类似的处理逻辑,主要区别在于结果是从 Callable 返回,或者由它抛出异常。
拦截
HandlerInterceptor 实例可以是 AsyncHandlerInterceptor 类型,以在启动异步处理的初始请求上接收 afterConcurrentHandlingStarted 回调(而不是 postHandle 和 afterCompletion)。
HandlerInterceptor 实现还可以注册一个 CallableProcessingInterceptor 或 DeferredResultProcessingInterceptor,以更深入地集成到异步请求的生命周期中(例如,处理超时事件)。有关更多详细信息,请参见AsyncHandlerInterceptor 的 javadoc。
DeferredResult 提供了 onTimeout(Runnable) 和 onCompletion(Runnable) 回调。有关更多详细信息,请参见DeferredResult 的 javadoc。Callable 可以替换为 WebAsyncTask,后者暴露了超时和完成回调的附加方法。
异步 Spring MVC 与 WebFlux 对比
Servlet API 最初设计用于在 Filter-Servlet 链中进行单次传递。异步请求处理允许应用程序退出 Filter-Servlet 链但保留响应处于打开状态以供进一步处理。Spring MVC 的异步支持就是围绕这一机制构建的。当控制器返回 DeferredResult 时,Filter-Servlet 链被退出,Servlet 容器线程被释放。稍后,当 DeferredResult 被设置时,会进行一次 ASYNC 分派(到相同的 URL),在此期间控制器再次被映射,但不是调用它,而是使用 DeferredResult 值(就像控制器返回了它一样)来恢复处理。
相比之下,Spring WebFlux 既不是基于 Servlet API 构建的,也不需要这样的异步请求处理功能,因为它从设计上就是异步的。异步处理内置于所有框架契约中,并在请求处理的所有阶段都得到内在支持。
从编程模型的角度来看,Spring MVC 和 Spring WebFlux 都支持将异步和响应式类型作为控制器方法的返回值。Spring MVC 甚至支持流式传输,包括响应式背压。然而,对响应的单次写入仍然是阻塞的(并在单独的线程上执行),这与 WebFlux 不同,WebFlux 依赖非阻塞 I/O,并且每次写入都不需要额外的线程。
另一个根本区别是,Spring MVC 不支持在控制器方法参数中使用异步或响应式类型(例如,@RequestBody、@RequestPart 等),也没有明确支持将异步和响应式类型作为模型属性。Spring WebFlux 支持所有这些。
最后,从配置的角度来看,必须在Servlet 容器级别启用异步请求处理功能。
HTTP 流式传输
您可以使用 DeferredResult 和 Callable 处理单个异步返回值。如果您想产生多个异步值并将其写入响应,该怎么办?本节描述了如何做到这一点。
对象
您可以使用 ResponseBodyEmitter 返回值产生对象流,其中每个对象都使用HttpMessageConverter 进行序列化并写入响应,如下例所示
-
Java
-
Kotlin
@GetMapping("/events")
public ResponseBodyEmitter handle() {
ResponseBodyEmitter emitter = new ResponseBodyEmitter();
// Save the emitter somewhere..
return emitter;
}
// In some other thread
emitter.send("Hello once");
// and again later on
emitter.send("Hello again");
// and done at some point
emitter.complete();
@GetMapping("/events")
fun handle() = ResponseBodyEmitter().apply {
// Save the emitter somewhere..
}
// In some other thread
emitter.send("Hello once")
// and again later on
emitter.send("Hello again")
// and done at some point
emitter.complete()
您还可以将 ResponseBodyEmitter 用作 ResponseEntity 中的 body,允许您自定义响应的状态和头部。
当 emitter 抛出 IOException(例如,远程客户端断开连接)时,应用程序无需负责清理连接,也不应调用 emitter.complete 或 emitter.completeWithError。相反,servlet 容器会自动启动 AsyncListener 错误通知,Spring MVC 在此通知中调用 completeWithError。这个调用进而向应用程序执行最后一次 ASYNC 分派,在此期间 Spring MVC 调用配置的异常解析器并完成请求。
SSE
SseEmitter(ResponseBodyEmitter 的子类)提供了对Server-Sent Events 的支持,其中从服务器发送的事件按照 W3C SSE 规范进行格式化。要从控制器生成 SSE 流,返回 SseEmitter,如下例所示
-
Java
-
Kotlin
@GetMapping(path="/events", produces=MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter handle() {
SseEmitter emitter = new SseEmitter();
// Save the emitter somewhere..
return emitter;
}
// In some other thread
emitter.send("Hello once");
// and again later on
emitter.send("Hello again");
// and done at some point
emitter.complete();
@GetMapping("/events", produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
fun handle() = SseEmitter().apply {
// Save the emitter somewhere..
}
// In some other thread
emitter.send("Hello once")
// and again later on
emitter.send("Hello again")
// and done at some point
emitter.complete()
虽然 SSE 是流式传输到浏览器的主要选项,但请注意 Internet Explorer 不支持 Server-Sent Events。考虑使用 Spring 的WebSocket 消息以及SockJS 回退传输(包括 SSE),这些传输目标广泛的浏览器。
另请参阅上一节关于异常处理的说明。
原始数据
有时,跳过消息转换并直接流式传输到响应的 OutputStream 非常有用(例如,用于文件下载)。您可以使用 StreamingResponseBody 返回值类型来实现此目的,如下例所示
-
Java
-
Kotlin
@GetMapping("/download")
public StreamingResponseBody handle() {
return new StreamingResponseBody() {
@Override
public void writeTo(OutputStream outputStream) throws IOException {
// write...
}
};
}
@GetMapping("/download")
fun handle() = StreamingResponseBody {
// write...
}
您可以使用 StreamingResponseBody 作为 ResponseEntity 中的 body,以自定义响应的状态和头部。
响应式类型
Spring MVC 支持在控制器中使用响应式客户端库(另请阅读 WebFlux 部分中的响应式库)。这包括来自 spring-webflux 的 WebClient 以及其他库,例如 Spring Data 响应式数据仓库。在这种情况下,能够从控制器方法返回响应式类型非常方便。
响应式返回值的处理方式如下
-
单值 promise 会被适配,类似于使用
DeferredResult。示例包括Mono(Reactor) 或Single(RxJava)。 -
具有流式媒体类型(例如
application/x-ndjson或text/event-stream)的多值流会被适配,类似于使用ResponseBodyEmitter或SseEmitter。示例包括Flux(Reactor) 或Observable(RxJava)。应用程序还可以返回Flux<ServerSentEvent>或Observable<ServerSentEvent>。 -
具有任何其他媒体类型(例如
application/json)的多值流会被适配,类似于使用DeferredResult<List<?>>。
Spring MVC 通过来自 spring-core 的 ReactiveAdapterRegistry 支持 Reactor 和 RxJava,这使得它可以适配来自多个响应式库的类型。 |
对于流式传输到响应,支持响应式背压,但对响应的写入仍然是阻塞的,并通过配置的 AsyncTaskExecutor 在单独的线程上运行,以避免阻塞上游源,例如从 WebClient 返回的 Flux。
上下文传播
通常通过 java.lang.ThreadLocal 传播上下文。这对于在同一线程上的处理是透明工作的,但对于跨多个线程的异步处理需要额外的工作。Micrometer Context Propagation 库简化了跨线程以及跨上下文机制(如 ThreadLocal 值、Reactor 上下文、GraphQL Java 上下文等)的上下文传播。
如果 Micrometer Context Propagation 存在于类路径上,当控制器方法返回响应式类型(如 Flux 或 Mono)时,所有已注册了 io.micrometer.ThreadLocalAccessor 的 ThreadLocal 值都会以键值对的形式写入 Reactor Context,使用由 ThreadLocalAccessor 分配的键。
对于其他异步处理场景,您可以直接使用 Context Propagation 库。例如
// Capture ThreadLocal values from the main thread ...
ContextSnapshot snapshot = ContextSnapshot.captureAll();
// On a different thread: restore ThreadLocal values
try (ContextSnapshot.Scope scope = snapshot.setThreadLocals()) {
// ...
}
提供了以下现成的 ThreadLocalAccessor 实现
-
LocaleContextThreadLocalAccessor— 通过LocaleContextHolder传播LocaleContext -
RequestAttributesThreadLocalAccessor— 通过RequestContextHolder传播RequestAttributes
以上实现不会自动注册。您需要在启动时通过 ContextRegistry.getInstance() 进行注册。
有关更多详细信息,请参见 Micrometer Context Propagation 库的文档。
断开连接
Servlet API 不提供远程客户端断开连接时的任何通知。因此,在流式传输到响应时,无论是通过SseEmitter 还是响应式类型,定期发送数据非常重要,因为如果客户端已断开连接,写入会失败。发送的数据可以采用空的(仅注释)SSE 事件形式,或者任何其他对方需要解释为心跳并忽略的数据。
或者,考虑使用具有内置心跳机制的 Web 消息解决方案(例如基于 WebSocket 的 STOMP 或带有SockJS 的 WebSocket)。
配置
必须在 Servlet 容器级别启用异步请求处理功能。MVC 配置也暴露了一些异步请求的选项。
Servlet 容器
Filter 和 Servlet 声明有一个 asyncSupported 标志,需要设置为 true 以启用异步请求处理。此外,应声明 Filter mapping 来处理 ASYNC 类型的 jakarta.servlet.DispatchType。
在 Java 配置中,当您使用 AbstractAnnotationConfigDispatcherServletInitializer 初始化 Servlet 容器时,这会自动完成。
在 web.xml 配置中,您可以将 <async-supported>true</async-supported> 添加到 DispatcherServlet 和 Filter 声明中,并将 <dispatcher>ASYNC</dispatcher> 添加到 filter mapping 中。
Spring MVC
MVC 配置暴露了以下异步请求处理选项
-
Java 配置:在
WebMvcConfigurer上使用configureAsyncSupport回调。 -
XML 命名空间:在
<mvc:annotation-driven>下使用<async-support>元素。
您可以配置以下内容
-
异步请求的默认超时值取决于底层的 Servlet 容器,除非明确设置。
-
用于在使用响应式类型进行流式传输时进行阻塞写入以及执行从控制器方法返回的
Callable实例的AsyncTaskExecutor。默认使用的 executor 不适合在负载下的生产环境中使用。 -
DeferredResultProcessingInterceptor实现和CallableProcessingInterceptor实现。
注意,你也可以在 DeferredResult、ResponseBodyEmitter 和 SseEmitter 上设置默认的超时值。对于 Callable,你可以使用 WebAsyncTask 来提供超时值。