配置选项
本节包含 Kafka Streams binder 使用的配置选项。
有关 Binder 的通用配置选项和属性,请参阅核心文档。
Kafka Streams Binder 属性
以下属性可在 Binder 级别使用,并且必须以 spring.cloud.stream.kafka.streams.binder.
为前缀。在 Kafka Streams binder 中重复使用的任何 Kafka binder 提供的属性必须以 spring.cloud.stream.kafka.streams.binder
为前缀,而不是 spring.cloud.stream.kafka.binder
。此规则的唯一例外是定义 Kafka 引导服务器属性时,此时任何一个前缀都可以使用。
- configuration
-
包含与 Apache Kafka Streams API 相关属性的键/值对 Map。此属性必须以
spring.cloud.stream.kafka.streams.binder.
为前缀。以下是一些使用此属性的示例。
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000
有关可能包含在 Streams 配置中的所有属性的更多信息,请参阅 Apache Kafka Streams 文档中的 StreamsConfig
JavaDocs。您可以从 StreamsConfig
设置的所有配置都可以通过此属性进行设置。使用此属性时,它适用于整个应用程序,因为它是一个 Binder 级别的属性。如果应用程序中有多个处理器,所有这些处理器都将获取这些属性。对于像 application.id
这样的属性,这将成为问题,因此您必须仔细检查如何使用此 Binder 级别的 configuration
属性来映射来自 StreamsConfig
的属性。
- functions.<function-bean-name>.applicationId
-
仅适用于函数式处理器。此属性可用于在应用程序中为每个函数设置 application ID。在多个函数的情况下,这是设置 application ID 的便捷方式。
- functions.<function-bean-name>.configuration
-
仅适用于函数式处理器。包含与 Apache Kafka Streams API 相关属性的键/值对 Map。这类似于上面描述的 Binder 级别的
configuration
属性,但此级别的configuration
属性仅针对指定函数进行限制。当您有多个处理器并且希望根据特定函数限制对配置的访问时,您可能希望使用此属性。所有StreamsConfig
属性都可以在这里使用。 - brokers
-
Broker URL
默认值:
localhost
- zkNodes
-
Zookeeper URL
默认值:
localhost
- deserializationExceptionHandler
-
反序列化错误处理器类型。此处理器应用于 Binder 级别,因此应用于应用程序中的所有输入绑定。可以在消费者绑定级别以更精细的方式对其进行控制。可能的值包括 -
logAndContinue
,logAndFail
,skipAndContinue
或sendToDlq
默认值:
logAndFail
- applicationId
-
一种方便的方式,可在 Binder 级别全局设置 Kafka Streams 应用的 application.id。如果应用程序包含多个函数,则 application id 应设置不同。详细信息请参见上文关于设置 application id 的讨论。
默认值: 应用程序将生成一个静态 application ID。有关更多详细信息,请参阅 application ID 部分。
- stateStoreRetry.maxAttempts
-
尝试连接状态存储的最大次数。
默认值: 1
- stateStoreRetry.backoffPeriod
-
重试时尝试连接状态存储的回退周期。
默认值: 1000 ms
- consumerProperties
-
Binder 级别的任意消费者属性。
- producerProperties
-
Binder 级别的任意生产者属性。
- includeStoppedProcessorsForHealthCheck
-
当通过 actuator 停止处理器绑定时,默认情况下该处理器将不参与健康检查。将此属性设置为
true
以启用所有处理器的健康检查,包括当前通过 bindings actuator endpoint 停止的处理器。默认值: false
Kafka Streams 生产者属性
以下属性*仅*适用于 Kafka Streams 生产者,且必须以 spring.cloud.stream.kafka.streams.bindings.<binding name>.producer.
为前缀。为方便起见,如果存在多个输出绑定且它们都需要一个公共值,则可以使用前缀 spring.cloud.stream.kafka.streams.default.producer.
进行配置。
- keySerde
-
要使用的 key serde
默认值: 请参阅上面关于消息序列化/反序列化的讨论
- valueSerde
-
要使用的 value serde
默认值: 请参阅上面关于消息序列化/反序列化的讨论
- useNativeEncoding
-
启用/禁用原生编码的标志
默认值:
true
。 - streamPartitionerBeanName
-
要在消费者端使用的自定义出站分区器 Bean 名称。应用程序可以将自定义
StreamPartitioner
作为 Spring Bean 提供,并将此 Bean 的名称提供给生产者使用,而不是使用默认分区器。默认值: 请参阅上面关于出站分区支持的讨论。
- producedAs
-
处理器生成到的 Sink 组件的自定义名称。
默认值:
none
(由 Kafka Streams 生成)
Kafka Streams 消费者属性
以下属性适用于 Kafka Streams 消费者,且必须以 spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer.
为前缀。为方便起见,如果存在多个输入绑定且它们都需要一个公共值,则可以使用前缀 spring.cloud.stream.kafka.streams.default.consumer.
进行配置。
- applicationId
-
为每个输入绑定设置 application.id。
默认值: 请参阅上文。
- keySerde
-
要使用的 key serde
默认值: 请参阅上面关于消息序列化/反序列化的讨论
- valueSerde
-
要使用的 value serde
默认值: 请参阅上面关于消息序列化/反序列化的讨论
- materializedAs
-
使用传入 KTable 类型时要物化(materialize)的状态存储
默认值:
none
。 - useNativeDecoding
-
启用/禁用原生解码的标志
默认值:
true
。 - dlqName
-
DLQ 主题名称。
默认值: 请参阅上文关于错误处理和 DLQ 的讨论。
- startOffset
-
如果没有已提交的 offset 可供消费,则从此 offset 开始消费。这主要用于消费者首次消费主题时。Kafka Streams 使用
earliest
作为默认策略,Binder 也使用相同的默认值。可以使用此属性将其覆盖为latest
。默认值:
earliest
。
注意: 在消费者上使用 resetOffsets
对 Kafka Streams binder 没有影响。与基于消息通道的 binder 不同,Kafka Streams binder 不会按需寻址到开头或结尾。
- deserializationExceptionHandler
-
反序列化错误处理器类型。此处理器应用于每个消费者绑定,而不是应用于之前描述的 Binder 级别属性。可能的值包括 -
logAndContinue
,logAndFail
,skipAndContinue
或sendToDlq
默认值:
logAndFail
- timestampExtractorBeanName
-
要在消费者端使用的特定时间戳提取器 Bean 名称。应用程序可以将
TimestampExtractor
作为 Spring Bean 提供,并将此 Bean 的名称提供给消费者使用,而不是使用默认提取器。默认值: 请参阅上文关于时间戳提取器的讨论。
- eventTypes
-
此绑定支持的事件类型列表,以逗号分隔。
默认值:
none
- eventTypeHeaderKey
-
通过此绑定的每个传入记录上的事件类型头键。
默认值:
event_type
- consumedAs
-
处理器消费来源的 Source 组件的自定义名称。
默认值:
none
(由 Kafka Streams 生成)
关于并发性的特别说明
在 Kafka Streams 中,您可以使用 num.stream.threads
属性控制处理器可以创建的线程数。您可以使用上述在 Binder、函数、生产者或消费者级别描述的各种 configuration
选项来实现这一点。您也可以使用核心 Spring Cloud Stream 为此目的提供的 concurrency
属性。使用此属性时,需要在消费者上进行设置。当您有多个输入绑定时,将其设置在第一个输入绑定上。例如,设置 spring.cloud.stream.bindings.process-in-0.consumer.concurrency
时,它将被 Binder 转换为 num.stream.threads
。如果您有多个处理器,并且一个处理器定义了绑定级别的并发性,而其他处理器没有,那么那些没有绑定级别并发性的处理器将回退到通过 spring.cloud.stream.kafka.streams.binder.configuration.num.stream.threads
指定的 Binder 范围属性。如果此 Binder 配置不可用,则应用程序将使用 Kafka Streams 设置的默认值。