Apache Kafka Streams 支持

从 1.1.4 版本开始,Spring for Apache Kafka 为 Kafka Streams 提供了第一类支持。要在 Spring 应用中使用它,classpath 中必须包含 kafka-streams jar。它是 Spring for Apache Kafka 项目的可选依赖项,不会被传递下载。

基本概念

Apache Kafka Streams 参考文档建议以下使用 API 的方式

// Use the builders to define the actual processing topology, e.g. to specify
// from which input topics to read, which stream operations (filter, map, etc.)
// should be called, and so on.

StreamsBuilder builder = ...;  // when using the Kafka Streams DSL

// Use the configuration to tell your application where the Kafka cluster is,
// which serializers/deserializers to use by default, to specify security settings,
// and so on.
StreamsConfig config = ...;

KafkaStreams streams = new KafkaStreams(builder, config);

// Start the Kafka Streams instance
streams.start();

// Stop the Kafka Streams instance
streams.close();

因此,我们有两个主要组件

  • StreamsBuilder:提供用于构建 KStream(或 KTable)实例的 API。

  • KafkaStreams:用于管理这些实例的生命周期。

单个 StreamsBuilderKafkaStreams 实例公开的所有 KStream 实例会同时启动和停止,即使它们的逻辑不同。换句话说,由 StreamsBuilder 定义的所有流都绑定到同一个生命周期控制。一旦 KafkaStreams 实例通过 streams.close() 关闭,就无法重新启动。相反,必须创建一个新的 KafkaStreams 实例来重新启动流处理。

Spring 管理

为了简化从 Spring 应用上下文角度使用 Kafka Streams 并通过容器进行生命周期管理,Spring for Apache Kafka 引入了 StreamsBuilderFactoryBean。这是一个 AbstractFactoryBean 实现,用于将 StreamsBuilder 单例实例作为 bean 公开。以下示例创建了这样一个 bean

@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
}
从 2.2 版本开始,流配置现在以 KafkaStreamsConfiguration 对象而不是 StreamsConfig 提供。

StreamsBuilderFactoryBean 还实现了 SmartLifecycle 来管理内部 KafkaStreams 实例的生命周期。与 Kafka Streams API 类似,你必须在启动 KafkaStreams 之前定义 KStream 实例。这同样适用于 Spring for Kafka Streams API。因此,当你在 StreamsBuilderFactoryBean 上使用默认的 autoStartup = true 时,你必须在应用上下文刷新之前在 StreamsBuilder 上声明 KStream 实例。例如,KStream 可以是一个常规的 bean 定义,而 Kafka Streams API 的使用不受任何影响。以下示例展示了如何做到这一点

@Bean
public KStream<?, ?> kStream(StreamsBuilder kStreamBuilder) {
    KStream<Integer, String> stream = kStreamBuilder.stream(STREAMING_TOPIC1);
    // Fluent KStream API
    return stream;
}

如果你想手动控制生命周期(例如,根据某些条件停止和启动),你可以使用 factory bean (&) 前缀直接引用 StreamsBuilderFactoryBean bean。由于 StreamsBuilderFactoryBean 使用其内部 KafkaStreams 实例,因此可以安全地再次停止和重新启动它。每次 start() 时都会创建一个新的 KafkaStreams 实例。如果你想分别控制 KStream 实例的生命周期,也可以考虑使用不同的 StreamsBuilderFactoryBean 实例。

你还可以在 StreamsBuilderFactoryBean 上指定 KafkaStreams.StateListenerThread.UncaughtExceptionHandlerStateRestoreListener 选项,这些选项会被委托给内部 KafkaStreams 实例。

此外,除了在 StreamsBuilderFactoryBean 上间接设置这些选项之外,你还可以使用 KafkaStreamsCustomizer 回调接口来

  1. (从 版本 2.1.5 开始)使用 customize(KafkaStreams) 配置内部 KafkaStreams 实例

  2. (从 版本 3.3.0 开始)使用 initKafkaStreams(Topology, Properties, KafkaClientSupplier) 实例化 KafkaStreams 的自定义实现

请注意,KafkaStreamsCustomizer 会覆盖 StreamsBuilderFactoryBean 提供的选项。

如果你需要直接执行一些 KafkaStreams 操作,可以通过 StreamsBuilderFactoryBean.getKafkaStreams() 访问该内部 KafkaStreams 实例。

你可以按类型自动装配 StreamsBuilderFactoryBean bean,但必须确保在 bean 定义中使用完整类型,如下例所示

@Bean
public StreamsBuilderFactoryBean myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;

另外,如果使用接口 bean 定义,可以添加 @Qualifier 按名称注入。以下示例展示了如何做到这一点

@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
@Qualifier("&myKStreamBuilder")
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;

从 2.4.1 版本开始,factory bean 有一个新属性 infrastructureCustomizer,类型为 KafkaStreamsInfrastructureCustomizer;这允许在创建流之前自定义 StreamsBuilder(例如,添加状态存储)和/或 Topology

public interface KafkaStreamsInfrastructureCustomizer {

    void configureBuilder(StreamsBuilder builder);

    void configureTopology(Topology topology);

}

提供了默认的无操作(no-op)实现,以避免在不需要实现某个方法时必须实现两者。

提供了 CompositeKafkaStreamsInfrastructureCustomizer,用于需要应用多个自定义器(customizers)的情况。

Kafka Streams Micrometer 支持

从 2.5.3 版本开始引入,你可以配置 KafkaStreamsMicrometerListener 来为 factory bean 管理的 KafkaStreams 对象自动注册 micrometer 指标。

streamsBuilderFactoryBean.addListener(new KafkaStreamsMicrometerListener(meterRegistry,
        Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));

Streams JSON 序列化和反序列化

为了在以 JSON 格式读写主题或状态存储时进行数据序列化和反序列化,Spring for Apache Kafka 提供了一个使用 JSON 的 JsonSerde 实现,它委托给序列化、反序列化和消息转换中描述的 JsonSerializerJsonDeserializerJsonSerde 实现通过其构造函数(目标类型或 ObjectMapper)提供了相同的配置选项。在以下示例中,我们使用 JsonSerde 对 Kafka 流的 Cat 载荷进行序列化和反序列化(JsonSerde 可以在任何需要实例的地方以类似的方式使用)

stream.through(Serdes.Integer(), new JsonSerde<>(Cat.class), "cats");

自 2.3 版本起,在以编程方式构建序列化器/反序列化器用于生产者/消费者工厂时,你可以使用流式 API,这简化了配置。

stream.through(
    new JsonSerde<>(MyKeyType.class)
        .forKeys()
        .noTypeInfo(),
    new JsonSerde<>(MyValueType.class)
        .noTypeInfo(),
    "myTypes");

使用 KafkaStreamBrancher

KafkaStreamBrancher 类引入了一种更方便的方式来在 KStream 之上构建条件分支。

考虑以下不使用 KafkaStreamBrancher 的示例

KStream<String, String>[] branches = builder.stream("source").branch(
        (key, value) -> value.contains("A"),
        (key, value) -> value.contains("B"),
        (key, value) -> true
);
branches[0].to("A");
branches[1].to("B");
branches[2].to("C");

以下示例使用 KafkaStreamBrancher

new KafkaStreamBrancher<String, String>()
        .branch((key, value) -> value.contains("A"), ks -> ks.to("A"))
        .branch((key, value) -> value.contains("B"), ks -> ks.to("B"))
        //default branch should not necessarily be defined in the end of the chain!
        .defaultBranch(ks -> ks.to("C"))
        .onTopOf(builder.stream("source"));
        //onTopOf method returns the provided stream so we can continue with method chaining

配置

要配置 Kafka Streams 环境,StreamsBuilderFactoryBean 需要一个 KafkaStreamsConfiguration 实例。有关所有可能的选项,请参阅 Apache Kafka 文档

从 2.2 版本开始,流配置现在以 KafkaStreamsConfiguration 对象而不是 StreamsConfig 提供。

为了避免大多数情况下的样板代码,尤其是在开发微服务时,Spring for Apache Kafka 提供了 @EnableKafkaStreams 注解,你应该将其放在 @Configuration 类上。你只需要声明一个名为 defaultKafkaStreamsConfigKafkaStreamsConfiguration bean。一个名为 defaultKafkaStreamsBuilderStreamsBuilderFactoryBean bean 会自动在应用上下文中声明。你也可以声明和使用任何额外的 StreamsBuilderFactoryBean bean。通过提供一个实现 StreamsBuilderFactoryBeanConfigurer 的 bean,你可以对该 bean 进行额外的自定义。如果存在多个这样的 bean,它们将按照其 Ordered.order 属性进行应用。

清理 & 停止配置

当 factory 停止时,会调用 KafkaStreams.close() 方法,带有 2 个参数

  • closeTimeout:等待线程关闭的时间(默认为 DEFAULT_CLOSE_TIMEOUT,设置为 10 秒)。可以通过 StreamsBuilderFactoryBean.setCloseTimeout() 进行配置。

  • leaveGroupOnClose:是否触发消费者离开组的调用(默认为 false)。可以通过 StreamsBuilderFactoryBean.setLeaveGroupOnClose() 进行配置。

默认情况下,当 factory bean 停止时,会调用 KafkaStreams.cleanUp() 方法。从 2.1.2 版本开始,factory bean 提供了额外的构造函数,接受一个 CleanupConfig 对象,该对象包含属性,允许你控制 cleanUp() 方法是在 start()stop() 期间调用,还是都不调用。从 2.7 版本开始,默认情况下永远不清理本地状态。

头部丰富器

3.0 版本添加了 ContextualProcessor 的扩展 HeaderEnricherProcessor;提供了与已废弃的 HeaderEnricher 相同的功能,后者实现了已废弃的 Transformer 接口。这可用于在流处理中添加头部;头部值是 SpEL 表达式;表达式评估的根对象具有 3 个属性

  • record - org.apache.kafka.streams.processor.api.Recordkeyvaluetimestampheaders

  • key - 当前记录的键

  • value - 当前记录的值

  • context - ProcessorContext,允许访问当前记录元数据

表达式必须返回 byte[]String(后者将使用 UTF-8 转换为 byte[])。

要在流中使用 enricher

.process(() -> new HeaderEnricherProcessor(expressions))

该处理器不会改变 keyvalue;它仅添加头部。

你需要为每条记录创建一个新实例。
.process(() -> new HeaderEnricherProcessor<..., ...>(expressionMap))

这里有一个简单示例,添加一个字面量头部和一个变量

Map<String, Expression> headers = new HashMap<>();
headers.put("header1", new LiteralExpression("value1"));
SpelExpressionParser parser = new SpelExpressionParser();
headers.put("header2", parser.parseExpression("record.timestamp() + ' @' + record.offset()"));
ProcessorSupplier supplier = () -> new HeaderEnricher<String, String>(headers);
KStream<String, String> stream = builder.stream(INPUT);
stream
        .process(() -> supplier)
        .to(OUTPUT);

MessagingProcessor

3.0 版本添加了 ContextualProcessor 的扩展 MessagingProcessor,提供了与已废弃的 MessagingTransformer 相同的功能,后者实现了已废弃的 Transformer 接口。这允许 Kafka Streams 拓扑与 Spring Messaging 组件(例如 Spring Integration 流)进行交互。该 transformer 需要 MessagingFunction 的实现。

@FunctionalInterface
public interface MessagingFunction {

    Message<?> exchange(Message<?> message);

}

Spring Integration 自动使用其 GatewayProxyFactoryBean 提供一个实现。它还需要一个 MessagingMessageConverter 将 key、value 和元数据(包括头部)与 Spring Messaging Message 相互转换。有关更多信息,请参阅[从 KStream 调用 Spring Integration 流]

从反序列化异常中恢复

2.3 版本引入了 RecoveringDeserializationExceptionHandler,它可以在发生反序列化异常时采取一些行动。请参考 Kafka 关于 DeserializationExceptionHandler 的文档,RecoveringDeserializationExceptionHandler 是它的一个实现。RecoveringDeserializationExceptionHandler 配置了一个 ConsumerRecordRecoverer 实现。框架提供了 DeadLetterPublishingRecoverer,它将失败的记录发送到死信主题。有关此 recoverer 的更多信息,请参阅发布死信记录

要配置 recoverer,请将以下属性添加到你的流配置中

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
    Map<String, Object> props = new HashMap<>();
    ...
    props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
            RecoveringDeserializationExceptionHandler.class);
    props.put(RecoveringDeserializationExceptionHandler.KSTREAM_DESERIALIZATION_RECOVERER, recoverer());
    ...
    return new KafkaStreamsConfiguration(props);
}

@Bean
public DeadLetterPublishingRecoverer recoverer() {
    return new DeadLetterPublishingRecoverer(kafkaTemplate(),
            (record, ex) -> new TopicPartition("recovererDLQ", -1));
}

当然,recoverer() bean 可以是你自己的 ConsumerRecordRecoverer 实现。

交互式查询支持

从 3.2 版本开始,Spring for Apache Kafka 提供了 Kafka Streams 交互式查询所需的基本功能。交互式查询在有状态的 Kafka Streams 应用中非常有用,因为它们提供了一种持续查询应用中状态存储的方式。因此,如果应用想要实现系统中当前视图的实体化,交互式查询提供了一种实现方式。要了解更多关于交互式查询的信息,请参阅这篇文章。Spring for Apache Kafka 中的支持围绕一个名为 KafkaStreamsInteractiveQueryService 的 API,它是 Kafka Streams 库中交互式查询 API 的一个外观(facade)。应用可以将此服务实例创建为一个 bean,然后使用它按名称检索状态存储。

以下代码片段展示了一个示例。

@Bean
public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
    final KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService =
            new KafkaStreamsInteractiveQueryService(streamsBuilderFactoryBean);
    return kafkaStreamsInteractiveQueryService;
}

假设 Kafka Streams 应用有一个名为 app-store 的状态存储,那么可以通过如下所示的 KafkStreamsInteractiveQuery API 检索该存储。

@Autowired
private KafkaStreamsInteractiveQueryService interactiveQueryService;

ReadOnlyKeyValueStore<Object, Object>  appStore = interactiveQueryService.retrieveQueryableStore("app-store", QueryableStoreTypes.keyValueStore());

一旦应用访问到状态存储,就可以从中查询键值信息。

在这种情况下,应用使用的状态存储是一个只读的键值存储。Kafka Streams 应用还可以使用其他类型的状态存储。例如,如果应用希望查询基于窗口的存储,可以在 Kafka Streams 应用业务逻辑中构建该存储,然后稍后检索它。因此,KafkaStreamsInteractiveQueryService 中检索可查询存储的 API 具有通用存储类型签名,以便最终用户可以指定正确的类型。

这是 API 中的类型签名。

public <T> T retrieveQueryableStore(String storeName, QueryableStoreType<T> storeType)

调用此方法时,用户可以像我们在上面的示例中那样,明确请求正确的状态存储类型。

重试状态存储检索

尝试使用 KafkaStreamsInteractiveQueryService 检索状态存储时,状态存储可能因各种原因找不到。如果这些原因是暂时的,KafkaStreamsInteractiveQueryService 提供了重试检索状态存储的选项,允许注入自定义的 RetryTemplate。默认情况下,KafkaStreamsInteractiveQueryService 中使用的 RetryTemplate 最大尝试次数为三次,固定退避时间为一秒。

以下是如何将最大尝试次数设置为十的自定义 RetryTemplate 注入到 KafkaStreamsInteractiveQueryService 中。

@Bean
public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
    final KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService =
            new KafkaStreamsInteractiveQueryService(streamsBuilderFactoryBean);
    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setBackOffPolicy(new FixedBackOffPolicy());
    RetryPolicy retryPolicy = new SimpleRetryPolicy(10);
    retryTemplate.setRetryPolicy(retryPolicy);
    kafkaStreamsInteractiveQueryService.setRetryTemplate(retryTemplate);
    return kafkaStreamsInteractiveQueryService;
}

查询远程状态存储

上面展示的用于检索状态存储的 API - retrieveQueryableStore,主要用于本地可用的键值状态存储。在生产环境中,Kafka Streams 应用很可能根据分区数量进行分布式部署。如果一个主题有四个分区,并且有四个相同的 Kafka Streams 处理器实例正在运行,那么每个实例可能负责处理该主题的一个分区。在这种情况下,调用 retrieveQueryableStore 可能不会得到实例正在寻找的正确结果,即使它可能返回一个有效的存储。假设具有四个分区的主题包含关于各种键的数据,并且一个分区总是负责特定的键。如果调用 retrieveQueryableStore 的实例正在查找该实例未托管的键的信息,那么它将不会收到任何数据。这是因为当前的 Kafka Streams 实例对此键一无所知。为了解决这个问题,调用实例首先需要确保它们拥有托管特定键的 Kafka Streams 处理器实例的主机信息。这可以从同一 application.id 下的任何 Kafka Streams 实例检索,如下所示。

@Autowired
private KafkaStreamsInteractiveQueryService interactiveQueryService;

HostInfo kafkaStreamsApplicationHostInfo = this.interactiveQueryService.getKafkaStreamsApplicationHostInfo("app-store", 12345, new IntegerSerializer());

在上面的示例代码中,调用实例正在从名为 app-store 的状态存储中查询特定键 12345。该 API 还需要相应的键序列化器,在本例中是 IntegerSerializer。Kafka Streams 会在其同一 application.id 下的所有实例中查找哪个实例托管了此特定键,找到后,它将该主机信息作为 HostInfo 对象返回。

该 API 如下所示

public <K> HostInfo getKafkaStreamsApplicationHostInfo(String store, K key, Serializer<K> serializer)

在这种分布式方式下使用相同 application.id 的多个 Kafka Streams 处理器实例时,应用应提供一个 RPC 层,以便可以通过 RPC 端点(例如 REST)查询状态存储。有关此内容的更多详细信息,请参阅这篇文章。使用 Spring for Apache Kafka 时,利用 spring-web 技术添加基于 Spring 的 REST 端点非常容易。一旦有了 REST 端点,就可以从任何 Kafka Streams 实例查询状态存储,前提是该实例知道托管该键的 HostInfo

如果托管键的实例是当前实例,那么应用无需调用 RPC 机制,而是进行 JVM 内部调用。然而,问题在于应用可能不知道发起调用的实例就是托管键的实例,因为特定服务器可能由于消费者再平衡而失去一个分区。为了解决这个问题,KafkaStreamsInteractiveQueryService 提供了一个方便的 API,通过 API 方法 getCurrentKafkaStreamsApplicationHostInfo() 查询当前主机信息,该方法返回当前的 HostInfo。其思想是应用可以先获取关于键所在位置的信息,然后将该 HostInfo 与当前实例的 HostInfo 进行比较。如果 HostInfo 数据匹配,则可以通过 retrieveQueryableStore 进行简单的 JVM 调用,否则选择 RPC 选项。

Kafka Streams 示例

以下示例结合了我们在本章中介绍的各种主题

@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaStreamsConfig {

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
        return new KafkaStreamsConfiguration(props);
    }

    @Bean
    public StreamsBuilderFactoryBeanConfigurer configurer() {
        return fb -> fb.setStateListener((newState, oldState) -> {
            System.out.println("State transition from " + oldState + " to " + newState);
        });
    }

    @Bean
    public KStream<Integer, String> kStream(StreamsBuilder kStreamBuilder) {
        KStream<Integer, String> stream = kStreamBuilder.stream("streamingTopic1");
        stream
                .mapValues((ValueMapper<String, String>) String::toUpperCase)
                .groupByKey()
                .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(1_000)))
                .reduce((String value1, String value2) -> value1 + value2,
                		Named.as("windowStore"))
                .toStream()
                .map((windowedId, value) -> new KeyValue<>(windowedId.key(), value))
                .filter((i, s) -> s.length() > 40)
                .to("streamingTopic2");

        stream.print(Printed.toSysOut());

        return stream;
    }

}