配置选项
本节包含 Kafka Streams 绑定器使用的配置选项。
有关绑定器相关的通用配置选项和属性,请参阅 核心文档。
Kafka Streams 绑定器属性
以下属性在绑定器级别可用,必须以 spring.cloud.stream.kafka.streams.binder.
为前缀。Kafka 绑定器提供的任何在 Kafka Streams 绑定器中重复使用的属性必须以 spring.cloud.stream.kafka.streams.binder
为前缀,而不是 spring.cloud.stream.kafka.binder
。此规则的唯一例外是定义 Kafka 引导服务器属性时,在这种情况下,两种前缀都可以使用。
- configuration
-
包含与 Apache Kafka Streams API 相关的属性的键值对映射。此属性必须以
spring.cloud.stream.kafka.streams.binder.
为前缀。以下是一些使用此属性的示例。
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000
有关可能进入流配置的所有属性的更多信息,请参阅 Apache Kafka Streams 文档中的 StreamsConfig
JavaDocs。您可以通过 StreamsConfig
设置的所有配置都可以通过此设置。使用此属性时,它适用于整个应用程序,因为这是一个绑定器级别属性。如果您的应用程序中有多个处理器,它们都将获取这些属性。对于 application.id
之类的属性,这将变得有问题,因此您必须仔细检查如何使用此绑定器级别 configuration
属性映射来自 StreamsConfig
的属性。
- functions.<function-bean-name>.applicationId
-
仅适用于函数式处理器。这可用于在应用程序中为每个函数设置应用程序 ID。在有多个函数的情况下,这是一种设置应用程序 ID 的便捷方法。
- functions.<function-bean-name>.configuration
-
仅适用于函数式处理器。包含与 Apache Kafka Streams API 相关的属性的键值对映射。这类似于上面描述的绑定器级别的
configuration
属性,但此级别的configuration
属性仅限于命名函数。当您有多个处理器并且想要根据特定函数限制对配置的访问时,您可能需要使用此方法。所有StreamsConfig
属性都可以在此处使用。 - brokers
-
代理 URL
默认值:
localhost
- zkNodes
-
Zookeeper URL
默认值:
localhost
- deserializationExceptionHandler
-
反序列化错误处理程序类型。此处理程序在绑定器级别应用,因此应用于应用程序中的所有输入绑定。在消费者绑定级别可以更细粒度地控制它。可能的值为 -
logAndContinue
、logAndFail
、skipAndContinue
或sendToDlq
默认值:
logAndFail
- applicationId
-
在绑定器级别全局设置 Kafka Streams 应用程序的 application.id 的便捷方法。如果应用程序包含多个函数,则应以不同的方式设置应用程序 ID。请参阅上面详细讨论设置应用程序 ID 的部分。
默认值:应用程序将生成一个静态应用程序 ID。有关更多详细信息,请参阅应用程序 ID 部分。
- stateStoreRetry.maxAttempts
-
尝试连接到状态存储的最大次数。
默认值:1
- stateStoreRetry.backoffPeriod
-
重试时尝试连接到状态存储的回退时间段。
默认值:1000 毫秒
- consumerProperties
-
绑定器级别的任意消费者属性。
- producerProperties
-
绑定器级别的任意生产者属性。
- includeStoppedProcessorsForHealthCheck
-
当通过执行器停止处理器的绑定时,默认情况下,此处理器将不会参与健康检查。将此属性设置为
true
以启用所有处理器的健康检查,包括当前通过绑定执行器端点停止的处理器。默认值:false
Kafka Streams 生产者属性
以下属性仅适用于 Kafka Streams 生产者,并且必须以spring.cloud.stream.kafka.streams.bindings.<绑定名称>.producer.
为前缀。为了方便起见,如果有多个输出绑定并且它们都需要一个公共值,则可以使用前缀spring.cloud.stream.kafka.streams.default.producer.
进行配置。
- keySerde
-
要使用的键序列化器/反序列化器
默认值:请参阅上面关于消息反序列化/序列化的讨论
- valueSerde
-
要使用的值序列化器/反序列化器
默认值:请参阅上面关于消息反序列化/序列化的讨论
- useNativeEncoding
-
启用/禁用原生编码的标志
默认值:
true
。 - streamPartitionerBeanName
-
要在消费者处使用的自定义出站分区器 Bean 名称。应用程序可以提供自定义的
StreamPartitioner
作为 Spring Bean,并且可以将此 Bean 的名称提供给生产者以代替默认分区器。默认值:请参阅上面关于出站分区支持的讨论。
- producedAs
-
处理器生产到的接收器组件的自定义名称。
默认值:
none
(由 Kafka Streams 生成)
Kafka Streams 消费者属性
以下属性适用于 Kafka Streams 消费者,并且必须以spring.cloud.stream.kafka.streams.bindings.<绑定名称>.consumer.
为前缀。为了方便起见,如果有多个输入绑定并且它们都需要一个公共值,则可以使用前缀spring.cloud.stream.kafka.streams.default.consumer.
进行配置。
- applicationId
-
为每个输入绑定设置 application.id。
默认值:请参阅上面。
- keySerde
-
要使用的键序列化器/反序列化器
默认值:请参阅上面关于消息反序列化/序列化的讨论
- valueSerde
-
要使用的值序列化器/反序列化器
默认值:请参阅上面关于消息反序列化/序列化的讨论
- materializedAs
-
使用传入 KTable 类型时要物化的状态存储
默认值:
none
。 - useNativeDecoding
-
启用/禁用原生解码的标志
默认值:
true
。 - dlqName
-
DLQ 主题名称。
默认值:请参阅上面关于错误处理和 DLQ 的讨论。
- startOffset
-
如果不存在已提交的偏移量用于消费,则要从中开始的偏移量。这主要用于消费者首次从主题中消费时。Kafka Streams 使用
earliest
作为默认策略,绑定器使用相同的默认策略。可以使用此属性将其覆盖为latest
。默认值:
earliest
。
注意:在消费者上使用resetOffsets
不会对 Kafka Streams 绑定器产生任何影响。与基于消息通道的绑定器不同,Kafka Streams 绑定器不会按需寻求开始或结束。
- deserializationExceptionHandler
-
反序列化错误处理程序类型。此处理程序应用于每个消费者绑定,而不是之前描述的绑定器级别属性。可能的值包括 -
logAndContinue
、logAndFail
、skipAndContinue
或sendToDlq
默认值:
logAndFail
- timestampExtractorBeanName
-
在消费者端使用的特定时间戳提取器 Bean 名称。应用程序可以提供
TimestampExtractor
作为 Spring Bean,并将此 Bean 的名称提供给消费者以供使用,而不是使用默认的 Bean。默认值:请参阅上面关于时间戳提取器的讨论。
- eventTypes
-
此绑定支持的事件类型列表,以逗号分隔。
默认值:
none
- eventTypeHeaderKey
-
通过此绑定传入的每个记录上的事件类型标头键。
默认值:
event_type
- consumedAs
-
处理器从中消费的源组件的自定义名称。
默认值:
none
(由 Kafka Streams 生成)
关于并发性的特别说明
在 Kafka Streams 中,您可以使用num.stream.threads
属性控制处理器可以创建的线程数。您可以使用上面在绑定器、函数、生产者或消费者级别描述的各种configuration
选项来实现这一点。您还可以使用核心 Spring Cloud Stream 为此目的提供的concurrency
属性。使用此属性时,您需要在消费者端使用它。当您有多个输入绑定时,请在第一个输入绑定上设置此属性。例如,当设置spring.cloud.stream.bindings.process-in-0.consumer.concurrency
时,它将被绑定器转换为num.stream.threads
。如果您有多个处理器,其中一个处理器定义了绑定级别的并发性,而其他处理器没有定义,那么那些没有绑定级别并发性的处理器将默认回退到通过spring.cloud.stream.kafka.streams.binder.configuration.num.stream.threads
指定的绑定器范围属性。如果此绑定器配置不可用,则应用程序将使用 Kafka Streams 设置的默认值。