混合使用高层级 DSL 和低层级 Processor API

Kafka Streams 提供了两种 API 变体。它有一个更高层级的类 DSL API,你可以在其中链接各种操作,这对于许多函数式编程人员来说可能很熟悉。Kafka Streams 也提供了低层级 Processor API 的访问能力。Processor API 虽然非常强大,并且能够以更低的层级控制事物,但本质上是命令式的。用于 Spring Cloud Stream 的 Kafka Streams Binder 允许你使用高层级 DSL 或混合使用 DSL 和 Processor API。混合使用这两种变体为你提供了许多选项来控制应用程序中的各种用例。应用程序可以使用 `transform` 或 `process` 方法 API 调用来访问 Processor API。

以下是使用 `process` API 在 Spring Cloud Stream 应用程序中如何结合使用 DSL 和 Processor API 的示例。

@Bean
public Consumer<KStream<Object, String>> process() {
    return input ->
        input.process(() -> new Processor<Object, String>() {
            @Override
            @SuppressWarnings("unchecked")
            public void init(ProcessorContext context) {
               this.context = context;
            }

            @Override
            public void process(Object key, String value) {
                //business logic
            }

            @Override
            public void close() {

        });
}

以下是使用 `transform` API 的示例。

@Bean
public Consumer<KStream<Object, String>> process() {
    return (input, a) ->
        input.transform(() -> new Transformer<Object, String, KeyValue<Object, String>>() {
            @Override
            public void init(ProcessorContext context) {

            }

            @Override
            public void close() {

            }

            @Override
            public KeyValue<Object, String> transform(Object key, String value) {
                // business logic - return transformed KStream;
            }
        });
}

`process` API 方法调用是一个终端操作,而 `transform` API 是非终端操作,并给你一个经过潜在转换的 `KStream`,你可以使用它继续使用 DSL 或 Processor API 进行进一步处理。