异步 @KafkaListener
返回类型
从 3.2 版本开始,可以为 @KafkaListener
(和 @KafkaHandler
)方法指定异步返回类型,从而允许异步发送回复。返回类型包括 CompletableFuture<?>
、Mono<?>
和 Kotlin suspend
函数。
@KafkaListener(id = "myListener", topics = "myTopic")
public CompletableFuture<String> listen(String data) {
...
CompletableFuture<String> future = new CompletableFuture<>();
future.complete("done");
return future;
}
@KafkaListener(id = "myListener", topics = "myTopic")
public Mono<Void> listen(String data) {
...
return Mono.empty();
}
检测到异步返回类型时,AckMode 将自动设置为 MANUAL 并启用乱序提交;取而代之的是,异步操作完成后将进行 ack。当异步结果以错误完成时,消息是否恢复取决于容器错误处理器。如果在监听器方法中发生阻止创建异步结果对象的异常,您必须捕获该异常并返回适当的返回对象,该对象将导致消息被 ack 或恢复。 |
如果在具有异步返回类型(包括 Kotlin suspend 函数)的监听器上配置了 KafkaListenerErrorHandler
,则在失败后会调用该错误处理器。有关此错误处理器及其用途的更多信息,请参阅异常处理。