Apache Kafka Streams 支持

从 1.1.4 版本开始,Spring for Apache Kafka 提供了对 Kafka Streams 的一流支持。要从 Spring 应用程序中使用它,kafka-streams jar 必须存在于类路径中。它是 Spring for Apache Kafka 项目的一个可选依赖项,不会被传递下载。

基础

参考 Apache Kafka Streams 文档建议以下使用 API 的方式

// Use the builders to define the actual processing topology, e.g. to specify
// from which input topics to read, which stream operations (filter, map, etc.)
// should be called, and so on.

StreamsBuilder builder = ...;  // when using the Kafka Streams DSL

// Use the configuration to tell your application where the Kafka cluster is,
// which serializers/deserializers to use by default, to specify security settings,
// and so on.
StreamsConfig config = ...;

KafkaStreams streams = new KafkaStreams(builder, config);

// Start the Kafka Streams instance
streams.start();

// Stop the Kafka Streams instance
streams.close();

因此,我们有两个主要组件

  • StreamsBuilder:具有构建 KStream(或 KTable)实例的 API。

  • KafkaStreams:管理这些实例的生命周期。

由单个 StreamsBuilder 暴露给 KafkaStreams 实例的所有 KStream 实例,即使它们具有不同的逻辑,也会同时启动和停止。换句话说,由 StreamsBuilder 定义的所有流都与单个生命周期控制绑定。一旦 KafkaStreams 实例通过 streams.close() 关闭,它就不能重新启动。相反,必须创建一个新的 KafkaStreams 实例来重新启动流处理。

Spring 管理

为了简化从 Spring 应用程序上下文角度使用 Kafka Streams 并通过容器使用生命周期管理,Spring for Apache Kafka 引入了 StreamsBuilderFactoryBean。这是一个 AbstractFactoryBean 实现,用于将 StreamsBuilder 单例实例公开为 bean。以下示例创建了这样一个 bean

@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
}
从 2.2 版本开始,流配置现在以 KafkaStreamsConfiguration 对象而不是 StreamsConfig 的形式提供。

StreamsBuilderFactoryBean 也实现了 SmartLifecycle 来管理内部 KafkaStreams 实例的生命周期。与 Kafka Streams API 类似,您必须在启动 KafkaStreams 之前定义 KStream 实例。这也适用于 Kafka Streams 的 Spring API。因此,当您在 StreamsBuilderFactoryBean 上使用默认的 autoStartup = true 时,您必须在应用程序上下文刷新之前在 StreamsBuilder 上声明 KStream 实例。例如,KStream 可以是一个常规的 bean 定义,而 Kafka Streams API 的使用没有任何影响。以下示例展示了如何实现

@Bean
public KStream<?, ?> kStream(StreamsBuilder kStreamBuilder) {
    KStream<Integer, String> stream = kStreamBuilder.stream(STREAMING_TOPIC1);
    // Fluent KStream API
    return stream;
}

如果您想手动控制生命周期(例如,根据某些条件停止和启动),您可以使用工厂 bean (&) 前缀 直接引用 StreamsBuilderFactoryBean bean。由于 StreamsBuilderFactoryBean 使用其内部 KafkaStreams 实例,因此可以安全地停止并重新启动它。每次 start() 都会创建一个新的 KafkaStreams。如果您希望单独控制 KStream 实例的生命周期,您还可以考虑使用不同的 StreamsBuilderFactoryBean 实例。

您还可以在 StreamsBuilderFactoryBean 上指定 KafkaStreams.StateListenerThread.UncaughtExceptionHandlerStateRestoreListener 选项,这些选项将委托给内部 KafkaStreams 实例。

此外,除了间接在 StreamsBuilderFactoryBean 上设置这些选项之外,您还可以使用 KafkaStreamsCustomizer 回调接口来

  1. (从 2.1.5 版本开始)使用 customize(KafkaStreams) 配置内部 KafkaStreams 实例

  2. (从 3.3.0 版本开始)使用 initKafkaStreams(Topology, Properties, KafkaClientSupplier) 实例化 KafkaStreams 的自定义实现

请注意,KafkaStreamsCustomizer 会覆盖 StreamsBuilderFactoryBean 提供的选项。

如果您需要直接执行某些 KafkaStreams 操作,您可以通过使用 StreamsBuilderFactoryBean.getKafkaStreams() 访问该内部 KafkaStreams 实例。

您可以通过类型自动装配 StreamsBuilderFactoryBean bean,但您应该确保在 bean 定义中使用完整类型,如下例所示

@Bean
public StreamsBuilderFactoryBean myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;

或者,如果您使用接口 bean 定义,您可以添加 @Qualifier 以通过名称进行注入。以下示例展示了如何实现

@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
@Qualifier("&myKStreamBuilder")
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;

从 2.4.1 版本开始,工厂 bean 有一个名为 infrastructureCustomizer 的新属性,类型为 KafkaStreamsInfrastructureCustomizer;这允许在流创建之前自定义 StreamsBuilder(例如,添加状态存储)和/或 Topology

public interface KafkaStreamsInfrastructureCustomizer {

    void configureBuilder(StreamsBuilder builder);

    void configureTopology(Topology topology);

}

提供了默认的无操作实现,以避免在不需要时必须实现这两个方法。

提供了一个 CompositeKafkaStreamsInfrastructureCustomizer,用于在您需要应用多个自定义器时使用。

KafkaStreams Micrometer 支持

在 2.5.3 版本中引入,您可以配置一个 KafkaStreamsMicrometerListener,以自动为工厂 bean 管理的 KafkaStreams 对象注册 Micrometer 计量器

streamsBuilderFactoryBean.addListener(new KafkaStreamsMicrometerListener(meterRegistry,
        Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));

流 JSON 序列化和反序列化

为了在以 JSON 格式读写主题或状态存储时序列化和反序列化数据,Spring for Apache Kafka 提供了一个 JacksonJsonSerde 实现,它使用 JSON,委托给 序列化、反序列化和消息转换 中描述的 JacksonJsonSerializerJacksonJsonDeserializerJacksonJsonSerde 实现通过其构造函数(目标类型或 ObjectMapper)提供相同的配置选项。在以下示例中,我们使用 JacksonJsonSerde 序列化和反序列化 Kafka 流的 Cat 有效负载(JacksonJsonSerde 可以在需要实例的任何地方以类似方式使用)

stream.through(Serdes.Integer(), new JacksonJsonSerde<>(Cat.class), "cats");

在以编程方式构建序列化器/反序列化器以在生产者/消费者工厂中使用时,从 2.3 版本开始,您可以使用流式 API,这简化了配置。

stream.through(
    new JacksonJsonSerde<>(MyKeyType.class)
        .forKeys()
        .noTypeInfo(),
    new JacksonJsonSerde<>(MyValueType.class)
        .noTypeInfo(),
    "myTypes");

使用 KafkaStreamBrancher

KafkaStreamBrancher 类引入了一种更方便的方式来在 KStream 之上构建条件分支。

考虑以下不使用 KafkaStreamBrancher 的示例

KStream<String, String>[] branches = builder.stream("source").branch(
        (key, value) -> value.contains("A"),
        (key, value) -> value.contains("B"),
        (key, value) -> true
);
branches[0].to("A");
branches[1].to("B");
branches[2].to("C");

以下示例使用 KafkaStreamBrancher

new KafkaStreamBrancher<String, String>()
        .branch((key, value) -> value.contains("A"), ks -> ks.to("A"))
        .branch((key, value) -> value.contains("B"), ks -> ks.to("B"))
        //default branch should not necessarily be defined in the end of the chain!
        .defaultBranch(ks -> ks.to("C"))
        .onTopOf(builder.stream("source"));
        //onTopOf method returns the provided stream so we can continue with method chaining

配置

为了配置 Kafka Streams 环境,StreamsBuilderFactoryBean 需要一个 KafkaStreamsConfiguration 实例。有关所有可能的选项,请参阅 Apache Kafka 文档

从 2.2 版本开始,流配置现在以 KafkaStreamsConfiguration 对象而不是 StreamsConfig 的形式提供。

为了避免大多数情况下的样板代码,尤其是在您开发微服务时,Spring for Apache Kafka 提供了 @EnableKafkaStreams 注解,您应该将其放置在 @Configuration 类上。您所需要做的就是声明一个名为 defaultKafkaStreamsConfigKafkaStreamsConfiguration bean。一个名为 defaultKafkaStreamsBuilderStreamsBuilderFactoryBean bean 会自动在应用程序上下文中声明。您还可以声明和使用任何额外的 StreamsBuilderFactoryBean bean。您可以通过提供一个实现 StreamsBuilderFactoryBeanConfigurer 的 bean 来对该 bean 执行额外的自定义。如果有多个这样的 bean,它们将根据其 Ordered.order 属性应用。

清理和停止配置

当工厂停止时,KafkaStreams.close() 会带两个参数调用

  • closeTimeout:等待线程关闭的时间(默认为 DEFAULT_CLOSE_TIMEOUT,设置为 10 秒)。可以使用 StreamsBuilderFactoryBean.setCloseTimeout() 进行配置。

  • leaveGroupOnClose:触发消费者离开组的调用(默认为 false)。可以使用 StreamsBuilderFactoryBean.setLeaveGroupOnClose() 进行配置。

默认情况下,当工厂 bean 停止时,会调用 KafkaStreams.cleanUp() 方法。从 2.1.2 版本开始,工厂 bean 具有额外的构造函数,接受一个 CleanupConfig 对象,该对象具有属性,允许您控制在 start()stop() 期间是否调用 cleanUp() 方法,或者两者都不调用。从 2.7 版本开始,默认情况下永不清理本地状态。

头部增强器

版本 3.0 添加了 ContextualProcessorHeaderEnricherProcessor 扩展;提供了与已弃用的 HeaderEnricher 相同的功能,后者实现了已弃用的 Transformer 接口。这可用于在流处理中添加头部;头部值是 SpEL 表达式;表达式评估的根对象有 3 个属性

  • record - org.apache.kafka.streams.processor.api.Record (key, value, timestamp, headers)

  • key - 当前记录的键

  • value - 当前记录的值

  • context - ProcessorContext,允许访问当前记录元数据

表达式必须返回 byte[]String(将使用 UTF-8 转换为 byte[])。

要在流中使用增强器

.process(() -> new HeaderEnricherProcessor(expressions))

处理器不改变 keyvalue;它只是添加头部。

每个记录都需要一个新的实例。
.process(() -> new HeaderEnricherProcessor<..., ...>(expressionMap))

这是一个简单的示例,添加一个字面量头部和一个变量

Map<String, Expression> headers = new HashMap<>();
headers.put("header1", new LiteralExpression("value1"));
SpelExpressionParser parser = new SpelExpressionParser();
headers.put("header2", parser.parseExpression("record.timestamp() + ' @' + record.offset()"));
ProcessorSupplier supplier = () -> new HeaderEnricher<String, String>(headers);
KStream<String, String> stream = builder.stream(INPUT);
stream
        .process(() -> supplier)
        .to(OUTPUT);

MessagingProcessor

版本 3.0 添加了 ContextualProcessorMessagingProcessor 扩展,提供了与已弃用的 MessagingTransformer 相同的功能,后者实现了已弃用的 Transformer 接口。这允许 Kafka Streams 拓扑与 Spring Messaging 组件(例如 Spring Integration 流)进行交互。转换器需要 MessagingFunction 的实现。

@FunctionalInterface
public interface MessagingFunction {

    Message<?> exchange(Message<?> message);

}

Spring Integration 自动使用其 GatewayProxyFactoryBean 提供了一个实现。它还需要一个 MessagingMessageConverter 来将键、值和元数据(包括头部)转换为 Spring Messaging Message<?>。有关更多信息,请参阅 KStream 调用 Spring Integration 流

从反序列化异常中恢复

版本 2.3 引入了 RecoveringDeserializationExceptionHandler,它可以在发生反序列化异常时采取一些操作。请参阅 Kafka 文档中关于 DeserializationExceptionHandler 的内容,RecoveringDeserializationExceptionHandler 是其实现之一。RecoveringDeserializationExceptionHandler 配置了一个 ConsumerRecordRecoverer 实现。框架提供了 DeadLetterPublishingRecoverer,它将失败的记录发送到死信主题。有关此恢复器的更多信息,请参阅 发布死信记录

要配置恢复器,请将以下属性添加到您的流配置中

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
    Map<String, Object> props = new HashMap<>();
    ...
    props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
            RecoveringDeserializationExceptionHandler.class);
    props.put(RecoveringDeserializationExceptionHandler.KSTREAM_DESERIALIZATION_RECOVERER, recoverer());
    ...
    return new KafkaStreamsConfiguration(props);
}

@Bean
public DeadLetterPublishingRecoverer recoverer() {
    return new DeadLetterPublishingRecoverer(kafkaTemplate(),
            (record, ex) -> new TopicPartition("recovererDLQ", -1));
}

当然,recoverer() bean 可以是您自己的 ConsumerRecordRecoverer 实现。

交互式查询支持

从 3.2 版本开始,Spring for Apache Kafka 提供了 Kafka Streams 中交互式查询所需的基本设施。交互式查询在有状态的 Kafka Streams 应用程序中非常有用,因为它们提供了一种持续查询应用程序中有状态存储的方法。因此,如果应用程序想要具体化所考虑系统的当前视图,交互式查询提供了一种实现方式。要了解有关交互式查询的更多信息,请参阅这篇文章。Spring for Apache Kafka 中的支持以一个名为 KafkaStreamsInteractiveQueryService 的 API 为中心,该 API 是 Kafka Streams 库中交互式查询 API 的一个外观。应用程序可以创建此服务的一个实例作为 bean,然后使用它通过名称检索状态存储。

以下代码片段显示了一个示例。

@Bean
public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
    final KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService =
            new KafkaStreamsInteractiveQueryService(streamsBuilderFactoryBean);
    return kafkaStreamsInteractiveQueryService;
}

假设一个 Kafka Streams 应用程序有一个名为 app-store 的状态存储,那么该存储可以通过 KafkaStreamsInteractiveQuery API 检索,如下所示。

@Autowired
private KafkaStreamsInteractiveQueryService interactiveQueryService;

ReadOnlyKeyValueStore<Object, Object>  appStore = interactiveQueryService.retrieveQueryableStore("app-store", QueryableStoreTypes.keyValueStore());

一旦应用程序获得了对状态存储的访问权限,它就可以从中查询键值信息。

在这种情况下,应用程序使用的状态存储是一个只读的键值存储。Kafka Streams 应用程序可以使用其他类型的状态存储。例如,如果应用程序更喜欢查询基于窗口的存储,它可以在 Kafka Streams 应用程序业务逻辑中构建该存储,然后稍后检索它。因此,KafkaStreamsInteractiveQueryService 中检索可查询存储的 API 具有通用存储类型签名,以便最终用户可以分配适当的类型。

这是 API 的类型签名。

public <T> T retrieveQueryableStore(String storeName, QueryableStoreType<T> storeType)

调用此方法时,用户可以明确请求适当的状态存储类型,就像我们在上面的示例中所做的那样。

重试状态存储检索

尝试使用 KafkaStreamsInteractiveQueryService 检索状态存储时,状态存储可能会因各种原因而未找到。如果这些原因是暂时的,KafkaStreamsInteractiveQueryService 提供了一个选项,通过允许注入自定义的 RetryTemplate 来重试检索状态存储。默认情况下,KafkaStreamsInteractiveQueryService 中使用的 RetryTemplate 使用最大三次尝试,并具有一秒的固定回退。

您可以像这样将自定义的 RetryTemplate 注入到 KafkaStreamsInteractiveQueryService 中,最大尝试次数为十次。

@Bean
public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
    final KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService =
            new KafkaStreamsInteractiveQueryService(streamsBuilderFactoryBean);
    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setBackOffPolicy(new FixedBackOffPolicy());
    RetryPolicy retryPolicy = new SimpleRetryPolicy(10);
    retryTemplate.setRetryPolicy(retryPolicy);
    kafkaStreamsInteractiveQueryService.setRetryTemplate(retryTemplate);
    return kafkaStreamsInteractiveQueryService;
}

查询远程状态存储

上面显示用于检索状态存储的 API - retrieveQueryableStore 用于本地可用的键值状态存储。在生产环境中,Kafka Streams 应用程序很可能根据分区数量进行分布式。如果一个主题有四个分区,并且有四个相同 Kafka Streams 处理器实例正在运行,那么每个实例可能负责处理主题中的单个分区。在这种情况下,调用 retrieveQueryableStore 可能无法提供实例正在寻找的正确结果,尽管它可能返回一个有效的存储。让我们假设一个有四个分区的主题包含各种键的数据,并且单个分区始终负责特定的键。如果调用 retrieveQueryableStore 的实例正在查找此实例不托管的键的信息,那么它将不会收到任何数据。这是因为当前的 Kafka Streams 实例对该键一无所知。要解决此问题,调用实例首先需要确保它们拥有 Kafka Streams 处理器实例的主机信息,该实例托管特定键。这可以从同一 application.id 下的任何 Kafka Streams 实例中检索,如下所示。

@Autowired
private KafkaStreamsInteractiveQueryService interactiveQueryService;

HostInfo kafkaStreamsApplicationHostInfo = this.interactiveQueryService.getKafkaStreamsApplicationHostInfo("app-store", 12345, new IntegerSerializer());

在上面的示例代码中,调用实例正在从名为 app-store 的状态存储中查询特定键 12345。API 还需要一个相应的键序列化器,在本例中为 IntegerSerializer。Kafka Streams 会遍历同一 application.id 下的所有实例,并尝试查找哪个实例托管此特定键。一旦找到,它会以 HostInfo 对象的形式返回该主机信息。

API 如下所示

public <K> HostInfo getKafkaStreamsApplicationHostInfo(String store, K key, Serializer<K> serializer)

当以这种分布式方式使用同一 application.id 的多个 Kafka Streams 处理器实例时,应用程序应该提供一个 RPC 层,状态存储可以通过 RPC 端点(例如 REST 端点)进行查询。有关此内容的更多详细信息,请参阅这篇文章。当使用 Spring for Apache Kafka 时,通过使用 spring-web 技术添加基于 Spring 的 REST 端点非常容易。一旦有了 REST 端点,就可以使用它从任何 Kafka Streams 实例查询状态存储,前提是实例知道托管键的 HostInfo

如果托管密钥的实例是当前实例,则应用程序无需调用 RPC 机制,而是进行 JVM 内部调用。然而,问题是应用程序可能不知道进行调用的实例就是托管密钥的实例,因为特定服务器可能会由于消费者重新平衡而丢失分区。为了解决这个问题,KafkaStreamsInteractiveQueryService 提供了一个方便的 API,通过 API 方法 getCurrentKafkaStreamsApplicationHostInfo() 查询当前主机信息,该方法返回当前的 HostInfo。其思想是应用程序可以首先获取有关密钥所在位置的信息,然后将 HostInfo 与当前实例的 HostInfo 进行比较。如果 HostInfo 数据匹配,则可以通过 retrieveQueryableStore 进行简单的 JVM 调用,否则选择 RPC 选项。

Kafka Streams 示例

以下示例结合了本章中涵盖的各种主题

@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaStreamsConfig {

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
        return new KafkaStreamsConfiguration(props);
    }

    @Bean
    public StreamsBuilderFactoryBeanConfigurer configurer() {
        return fb -> fb.setStateListener((newState, oldState) -> {
            System.out.println("State transition from " + oldState + " to " + newState);
        });
    }

    @Bean
    public KStream<Integer, String> kStream(StreamsBuilder kStreamBuilder) {
        KStream<Integer, String> stream = kStreamBuilder.stream("streamingTopic1");
        stream
                .mapValues((ValueMapper<String, String>) String::toUpperCase)
                .groupByKey()
                .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(1_000)))
                .reduce((String value1, String value2) -> value1 + value2,
                		Named.as("windowStore"))
                .toStream()
                .map((windowedId, value) -> new KeyValue<>(windowedId.key(), value))
                .filter((i, s) -> s.length() > 40)
                .to("streamingTopic2");

        stream.print(Printed.toSysOut());

        return stream;
    }

}
© . This site is unofficial and not affiliated with VMware.