混合高级 DSL 和低级 Processor API
Kafka Streams 提供了两种 API 变体。它有一个高级 DSL 类似 API,您可以链式操作,这对于许多函数式程序员来说可能很熟悉。Kafka Streams 还提供了对低级处理器 API 的访问。处理器 API 尽管功能强大,并能够以更低的级别控制事物,但本质上是命令式的。Spring Cloud Stream 的 Kafka Streams 绑定器允许您使用高级 DSL 或混合 DSL 和处理器 API。混合这两种变体为您提供了许多选项来控制应用程序中的各种用例。应用程序可以使用 transform 或 process 方法 API 调用来访问处理器 API。
以下是关于如何在 Spring Cloud Stream 应用程序中使用 process API 结合 DSL 和处理器 API 的示例。
@Bean
public Consumer<KStream<Object, String>> process() {
return input ->
input.process(() -> new Processor<Object, String>() {
@Override
@SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public void process(Object key, String value) {
//business logic
}
@Override
public void close() {
});
}
以下是使用 transform API 的示例。
@Bean
public Consumer<KStream<Object, String>> process() {
return (input, a) ->
input.transform(() -> new Transformer<Object, String, KeyValue<Object, String>>() {
@Override
public void init(ProcessorContext context) {
}
@Override
public void close() {
}
@Override
public KeyValue<Object, String> transform(Object key, String value) {
// business logic - return transformed KStream;
}
});
}
process API 方法调用是一个终止操作,而 transform API 是非终止操作,它为您提供一个可能经过转换的 KStream,您可以使用它通过 DSL 或处理器 API 继续进一步处理。