配置选项

本节包含 Kafka Streams 绑定器使用的配置选项。

有关绑定器相关的通用配置选项和属性,请参阅 核心文档

Kafka Streams 绑定器属性

以下属性在绑定器级别可用,必须以 spring.cloud.stream.kafka.streams.binder. 为前缀。Kafka 绑定器提供的任何在 Kafka Streams 绑定器中重复使用的属性必须以 spring.cloud.stream.kafka.streams.binder 为前缀,而不是 spring.cloud.stream.kafka.binder。此规则的唯一例外是定义 Kafka 引导服务器属性时,在这种情况下,两种前缀都可以使用。

configuration

包含与 Apache Kafka Streams API 相关的属性的键值对映射。此属性必须以 spring.cloud.stream.kafka.streams.binder. 为前缀。以下是一些使用此属性的示例。

spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000

有关可能进入流配置的所有属性的更多信息,请参阅 Apache Kafka Streams 文档中的 StreamsConfig JavaDocs。您可以通过 StreamsConfig 设置的所有配置都可以通过此设置。使用此属性时,它适用于整个应用程序,因为这是一个绑定器级别属性。如果您的应用程序中有多个处理器,它们都将获取这些属性。对于 application.id 之类的属性,这将变得有问题,因此您必须仔细检查如何使用此绑定器级别 configuration 属性映射来自 StreamsConfig 的属性。

functions.<function-bean-name>.applicationId

仅适用于函数式处理器。这可用于在应用程序中为每个函数设置应用程序 ID。在有多个函数的情况下,这是一种设置应用程序 ID 的便捷方法。

functions.<function-bean-name>.configuration

仅适用于函数式处理器。包含与 Apache Kafka Streams API 相关的属性的键值对映射。这类似于上面描述的绑定器级别的 configuration 属性,但此级别的 configuration 属性仅限于命名函数。当您有多个处理器并且想要根据特定函数限制对配置的访问时,您可能需要使用此方法。所有 StreamsConfig 属性都可以在此处使用。

brokers

代理 URL

默认值:localhost

zkNodes

Zookeeper URL

默认值:localhost

deserializationExceptionHandler

反序列化错误处理程序类型。此处理程序在绑定器级别应用,因此应用于应用程序中的所有输入绑定。在消费者绑定级别可以更细粒度地控制它。可能的值为 - logAndContinuelogAndFailskipAndContinuesendToDlq

默认值:logAndFail

applicationId

在绑定器级别全局设置 Kafka Streams 应用程序的 application.id 的便捷方法。如果应用程序包含多个函数,则应以不同的方式设置应用程序 ID。请参阅上面详细讨论设置应用程序 ID 的部分。

默认值:应用程序将生成一个静态应用程序 ID。有关更多详细信息,请参阅应用程序 ID 部分。

stateStoreRetry.maxAttempts

尝试连接到状态存储的最大次数。

默认值:1

stateStoreRetry.backoffPeriod

重试时尝试连接到状态存储的回退时间段。

默认值:1000 毫秒

consumerProperties

绑定器级别的任意消费者属性。

producerProperties

绑定器级别的任意生产者属性。

includeStoppedProcessorsForHealthCheck

当通过执行器停止处理器的绑定时,默认情况下,此处理器将不会参与健康检查。将此属性设置为 true 以启用所有处理器的健康检查,包括当前通过绑定执行器端点停止的处理器。

默认值:false

Kafka Streams 生产者属性

以下属性适用于 Kafka Streams 生产者,并且必须以spring.cloud.stream.kafka.streams.bindings.<绑定名称>.producer.为前缀。为了方便起见,如果有多个输出绑定并且它们都需要一个公共值,则可以使用前缀spring.cloud.stream.kafka.streams.default.producer.进行配置。

keySerde

要使用的键序列化器/反序列化器

默认值:请参阅上面关于消息反序列化/序列化的讨论

valueSerde

要使用的值序列化器/反序列化器

默认值:请参阅上面关于消息反序列化/序列化的讨论

useNativeEncoding

启用/禁用原生编码的标志

默认值:true

streamPartitionerBeanName

要在消费者处使用的自定义出站分区器 Bean 名称。应用程序可以提供自定义的StreamPartitioner作为 Spring Bean,并且可以将此 Bean 的名称提供给生产者以代替默认分区器。

默认值:请参阅上面关于出站分区支持的讨论。

producedAs

处理器生产到的接收器组件的自定义名称。

默认值:none(由 Kafka Streams 生成)

Kafka Streams 消费者属性

以下属性适用于 Kafka Streams 消费者,并且必须以spring.cloud.stream.kafka.streams.bindings.<绑定名称>.consumer.为前缀。为了方便起见,如果有多个输入绑定并且它们都需要一个公共值,则可以使用前缀spring.cloud.stream.kafka.streams.default.consumer.进行配置。

applicationId

为每个输入绑定设置 application.id。

默认值:请参阅上面。

keySerde

要使用的键序列化器/反序列化器

默认值:请参阅上面关于消息反序列化/序列化的讨论

valueSerde

要使用的值序列化器/反序列化器

默认值:请参阅上面关于消息反序列化/序列化的讨论

materializedAs

使用传入 KTable 类型时要物化的状态存储

默认值:none

useNativeDecoding

启用/禁用原生解码的标志

默认值:true

dlqName

DLQ 主题名称。

默认值:请参阅上面关于错误处理和 DLQ 的讨论。

startOffset

如果不存在已提交的偏移量用于消费,则要从中开始的偏移量。这主要用于消费者首次从主题中消费时。Kafka Streams 使用earliest作为默认策略,绑定器使用相同的默认策略。可以使用此属性将其覆盖为latest

默认值:earliest

注意:在消费者上使用resetOffsets不会对 Kafka Streams 绑定器产生任何影响。与基于消息通道的绑定器不同,Kafka Streams 绑定器不会按需寻求开始或结束。

deserializationExceptionHandler

反序列化错误处理程序类型。此处理程序应用于每个消费者绑定,而不是之前描述的绑定器级别属性。可能的值包括 - logAndContinuelogAndFailskipAndContinuesendToDlq

默认值:logAndFail

timestampExtractorBeanName

在消费者端使用的特定时间戳提取器 Bean 名称。应用程序可以提供TimestampExtractor作为 Spring Bean,并将此 Bean 的名称提供给消费者以供使用,而不是使用默认的 Bean。

默认值:请参阅上面关于时间戳提取器的讨论。

eventTypes

此绑定支持的事件类型列表,以逗号分隔。

默认值:none

eventTypeHeaderKey

通过此绑定传入的每个记录上的事件类型标头键。

默认值:event_type

consumedAs

处理器从中消费的源组件的自定义名称。

默认值:none(由 Kafka Streams 生成)

关于并发性的特别说明

在 Kafka Streams 中,您可以使用num.stream.threads属性控制处理器可以创建的线程数。您可以使用上面在绑定器、函数、生产者或消费者级别描述的各种configuration选项来实现这一点。您还可以使用核心 Spring Cloud Stream 为此目的提供的concurrency属性。使用此属性时,您需要在消费者端使用它。当您有多个输入绑定时,请在第一个输入绑定上设置此属性。例如,当设置spring.cloud.stream.bindings.process-in-0.consumer.concurrency时,它将被绑定器转换为num.stream.threads。如果您有多个处理器,其中一个处理器定义了绑定级别的并发性,而其他处理器没有定义,那么那些没有绑定级别并发性的处理器将默认回退到通过spring.cloud.stream.kafka.streams.binder.configuration.num.stream.threads指定的绑定器范围属性。如果此绑定器配置不可用,则应用程序将使用 Kafka Streams 设置的默认值。