Kotlin 支持

框架也已改进以支持用于函数的 Kotlin Lambda 表达式,因此您现在可以使用 Kotlin 语言和 Spring Integration 流定义结合进行开发

@Bean
@Transformer(inputChannel = "functionServiceChannel")
fun kotlinFunction(): (String) -> String {
    return { it.toUpperCase() }
}

@Bean
@ServiceActivator(inputChannel = "messageConsumerServiceChannel")
fun kotlinConsumer(): (Message<Any>) -> Unit {
    return { print(it) }
}

@Bean
@InboundChannelAdapter(value = "counterChannel",
        poller = Poller(fixedRate = "10", maxMessagesPerPoll = "1"))
fun kotlinSupplier(): () -> String {
    return { "baz" }
}

Kotlin Coroutines

从 6.0 版本开始,Spring Integration 提供了对 Kotlin Coroutines 的支持。现在,suspend 函数以及 kotlinx.coroutines.Deferredkotlinx.coroutines.flow.Flow 返回类型可用于服务方法

@ServiceActivator(inputChannel = "suspendServiceChannel", outputChannel = "resultChannel")
suspend fun suspendServiceFunction(payload: String) = payload.uppercase()

@ServiceActivator(inputChannel = "flowServiceChannel", outputChannel = "resultChannel", async = "true")
fun flowServiceFunction(payload: String) =
    flow {
        for (i in 1..3) {
            emit("$payload #$i")
        }
    }

框架将其视为 Reactive Streams 交互,并使用 ReactiveAdapterRegistry 转换为相应的 MonoFlux Reactor 类型。然后,此类函数回复会在回复通道(如果是 ReactiveStreamsSubscribableChannel)或相应的回调中作为 CompletableFuture 的结果进行处理。

默认情况下,具有 Flow 结果的函数在 @ServiceActivator 上不是异步的,因此 Flow 实例作为回复消息的 payload 生成。目标应用程序负责分别将此对象作为协程处理或将其转换为 Flux

@MessagingGateway 接口方法在 Kotlin 中声明时,也可以使用 suspend 修饰符标记。框架内部使用 Mono 来通过下游流执行请求-回复。此类 Mono 结果由内部的 MonoKt.awaitSingleOrNull() API 处理,以满足被调用的网关 suspend 函数的 kotlin.coroutines.Continuation 参数

@MessagingGateway(defaultRequestChannel = "suspendRequestChannel")
interface SuspendFunGateway {

    suspend fun suspendGateway(payload: String): String

}

根据 Kotlin 语言要求,必须将此方法作为协程调用

@Autowired
private lateinit var suspendFunGateway: SuspendFunGateway

fun someServiceMethod() {
    runBlocking {
        val reply = suspendFunGateway.suspendGateway("test suspend gateway")
    }
}