配置选项
本节包含 Kafka Streams 绑定器使用的配置选项。
有关绑定器的通用配置选项和属性,请参阅 核心文档。
Kafka Streams 绑定器属性
以下属性在绑定器级别可用,并且必须以 spring.cloud.stream.kafka.streams.binder. 为前缀。Kafka Streams 绑定器中重复使用的任何 Kafka 绑定器提供的属性必须以 spring.cloud.stream.kafka.streams.binder 为前缀,而不是 spring.cloud.stream.kafka.binder。此规则的唯一例外是当定义 Kafka 引导服务器属性时,此时任一前缀都有效。
- configuration
-
包含与 Apache Kafka Streams API 相关的键/值对的映射。此属性必须以
spring.cloud.stream.kafka.streams.binder.为前缀。以下是使用此属性的一些示例。
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000
有关可用于流配置的所有属性的更多信息,请参阅 Apache Kafka Streams 文档中的 StreamsConfig JavaDocs。您可以通过此属性设置 StreamsConfig 中的所有配置。当使用此属性时,它适用于整个应用程序,因为这是一个绑定器级别的属性。如果应用程序中有多个处理器,所有处理器都将获取这些属性。在像 application.id 这样的属性情况下,这会变得有问题,因此您必须仔细检查如何使用此绑定器级别的 configuration 属性映射 StreamsConfig 中的属性。
- functions.<function-bean-name>.applicationId
-
仅适用于函数式处理器。这可以用于在应用程序中为每个函数设置应用程序 ID。在有多个函数的情况下,这是一种方便的设置应用程序 ID 的方法。
- functions.<function-bean-name>.configuration
-
仅适用于函数式处理器。包含与 Apache Kafka Streams API 相关的键/值对的映射。这类似于上面描述的绑定器级别的
configuration属性,但此级别的configuration属性仅限于命名函数。当您有多个处理器并且希望根据特定函数限制对配置的访问时,您可能需要使用此属性。此处可以使用所有StreamsConfig属性。 - brokers
-
Broker URL
默认值:
localhost - zkNodes
-
Zookeeper URL
默认值:
localhost - deserializationExceptionHandler
-
反序列化错误处理程序类型。此处理程序在绑定器级别应用,因此适用于应用程序中的所有输入绑定。有一种方法可以在消费者绑定级别以更精细的方式控制它。可能的值为 -
logAndContinue、logAndFail、skipAndContinue或sendToDlq默认值:
logAndFail - applicationId
-
方便地在绑定器级别为 Kafka Streams 应用程序全局设置 application.id。如果应用程序包含多个函数,则应不同地设置应用程序 ID。有关设置应用程序 ID 的详细讨论,请参阅上面。
默认值:应用程序将生成一个静态应用程序 ID。有关更多详细信息,请参阅应用程序 ID 部分。
- stateStoreRetry.maxAttempts
-
尝试连接状态存储的最大次数。
默认值:1
- stateStoreRetry.backoffPeriod
-
重试连接状态存储时的退避周期。
默认值:1000 毫秒
- consumerProperties
-
绑定器级别的任意消费者属性。
- producerProperties
-
绑定器级别的任意生产者属性。
- includeStoppedProcessorsForHealthCheck
-
当处理器绑定通过执行器停止时,此处理器默认不参与健康检查。将此属性设置为
true以启用所有处理器的健康检查,包括那些当前通过绑定执行器端点停止的处理器。默认值:false
Kafka Streams 生产者属性
以下属性仅适用于 Kafka Streams 生产者,并且必须以 spring.cloud.stream.kafka.streams.bindings.<binding name>.producer. 为前缀。为方便起见,如果存在多个输出绑定且它们都需要一个公共值,则可以使用 spring.cloud.stream.kafka.streams.default.producer. 前缀进行配置。
- keySerde
-
要使用的键序列化/反序列化器
默认值:请参阅上面关于消息反序列化/序列化的讨论
- valueSerde
-
要使用的值序列化/反序列化器
默认值:请参阅上面关于消息反序列化/序列化的讨论
- useNativeEncoding
-
启用/禁用原生编码的标志
默认值:
true。 - streamPartitionerBeanName
-
要在消费者端使用的自定义出站分区器 bean 名称。应用程序可以提供自定义的
StreamPartitioner作为 Spring bean,并且此 bean 的名称可以提供给生产者以替代默认分区器。默认值:请参阅上面关于出站分区支持的讨论。
- producedAs
-
处理器生产到的 Sink 组件的自定义名称。
默认值:
none(由 Kafka Streams 生成)
Kafka Streams 消费者属性
以下属性适用于 Kafka Streams 消费者,并且必须以 spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer. 为前缀。为方便起见,如果存在多个输入绑定且它们都需要一个公共值,则可以使用 spring.cloud.stream.kafka.streams.default.consumer. 前缀进行配置。
- applicationId
-
为每个输入绑定设置 application.id。
默认值:如上所述。
- keySerde
-
要使用的键序列化/反序列化器
默认值:请参阅上面关于消息反序列化/序列化的讨论
- valueSerde
-
要使用的值序列化/反序列化器
默认值:请参阅上面关于消息反序列化/序列化的讨论
- materializedAs
-
当使用传入 KTable 类型时要具体化的状态存储
默认值:
none。 - cachingDisabled
-
禁用具体化 KTable 的缓存。设置为
true时,调用Materialized对象的withCachingDisabled()。设置为false时,调用Materialized对象的withCachingEnabled()。默认值:
false。 - loggingDisabled
-
禁用具体化 KTable 的日志记录。设置为
true时,调用Materialized对象的withLoggingDisabled()。默认值:
false。 - useNativeDecoding
-
启用/禁用原生解码的标志
默认值:
true。 - dlqName
-
DLQ 主题名称。
默认值:请参阅上面关于错误处理和 DLQ 的讨论。
- startOffset
-
如果没有已提交的偏移量可供消费,则从该偏移量开始消费。这主要用于消费者首次从主题消费时。Kafka Streams 使用
earliest作为默认策略,绑定器也使用相同的默认策略。可以使用此属性将其覆盖为latest。默认值:
earliest。
注意:在消费者上使用 resetOffsets 对 Kafka Streams 绑定器没有任何影响。与基于消息通道的绑定器不同,Kafka Streams 绑定器不会按需跳转到开头或结尾。
- deserializationExceptionHandler
-
反序列化错误处理程序类型。此处理程序按每个消费者绑定应用,而不是之前描述的绑定器级别属性。可能的值为 -
logAndContinue、logAndFail、skipAndContinue或sendToDlq默认值:
logAndFail - timestampExtractorBeanName
-
要在消费者端使用的特定时间戳提取器 bean 名称。应用程序可以提供
TimestampExtractor作为 Spring bean,并且此 bean 的名称可以提供给消费者以替代默认时间戳提取器。默认值:请参阅上面关于时间戳提取器的讨论。
- eventTypes
-
此绑定支持的事件类型,以逗号分隔。
默认值:
none - eventTypeHeaderKey
-
通过此绑定传入的每条记录上的事件类型头键。
默认值:
event_type - consumedAs
-
处理器消费的源组件的自定义名称。
默认值:
none(由 Kafka Streams 生成)
关于并发的特别说明
在 Kafka Streams 中,您可以使用 num.stream.threads 属性控制处理器可以创建的线程数。这可以通过上面描述的绑定器、函数、生产者或消费者级别的各种 configuration 选项来完成。您还可以为此目的使用核心 Spring Cloud Stream 提供的 concurrency 属性。使用此属性时,您需要在消费者上使用它。当您有多个输入绑定时,请将其设置在第一个输入绑定上。例如,当设置 spring.cloud.stream.bindings.process-in-0.consumer.concurrency 时,它将被绑定器转换为 num.stream.threads。如果您有多个处理器,并且一个处理器定义了绑定级别并发,而其他处理器没有,则没有绑定级别并发的处理器将回退到通过 spring.cloud.stream.kafka.streams.binder.configuration.num.stream.threads 指定的绑定器范围属性。如果此绑定器配置不可用,则应用程序将使用 Kafka Streams 设置的默认值。