配置选项

本节包含 Kafka Streams 绑定器使用的配置选项。

有关绑定器的通用配置选项和属性,请参阅 核心文档

Kafka Streams 绑定器属性

以下属性在绑定器级别可用,并且必须以 spring.cloud.stream.kafka.streams.binder. 为前缀。Kafka Streams 绑定器中重复使用的任何 Kafka 绑定器提供的属性必须以 spring.cloud.stream.kafka.streams.binder 为前缀,而不是 spring.cloud.stream.kafka.binder。此规则的唯一例外是当定义 Kafka 引导服务器属性时,此时任一前缀都有效。

configuration

包含与 Apache Kafka Streams API 相关的键/值对的映射。此属性必须以 spring.cloud.stream.kafka.streams.binder. 为前缀。以下是使用此属性的一些示例。

spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000

有关可用于流配置的所有属性的更多信息,请参阅 Apache Kafka Streams 文档中的 StreamsConfig JavaDocs。您可以通过此属性设置 StreamsConfig 中的所有配置。当使用此属性时,它适用于整个应用程序,因为这是一个绑定器级别的属性。如果应用程序中有多个处理器,所有处理器都将获取这些属性。在像 application.id 这样的属性情况下,这会变得有问题,因此您必须仔细检查如何使用此绑定器级别的 configuration 属性映射 StreamsConfig 中的属性。

functions.<function-bean-name>.applicationId

仅适用于函数式处理器。这可以用于在应用程序中为每个函数设置应用程序 ID。在有多个函数的情况下,这是一种方便的设置应用程序 ID 的方法。

functions.<function-bean-name>.configuration

仅适用于函数式处理器。包含与 Apache Kafka Streams API 相关的键/值对的映射。这类似于上面描述的绑定器级别的 configuration 属性,但此级别的 configuration 属性仅限于命名函数。当您有多个处理器并且希望根据特定函数限制对配置的访问时,您可能需要使用此属性。此处可以使用所有 StreamsConfig 属性。

brokers

Broker URL

默认值:localhost

zkNodes

Zookeeper URL

默认值:localhost

deserializationExceptionHandler

反序列化错误处理程序类型。此处理程序在绑定器级别应用,因此适用于应用程序中的所有输入绑定。有一种方法可以在消费者绑定级别以更精细的方式控制它。可能的值为 - logAndContinuelogAndFailskipAndContinuesendToDlq

默认值:logAndFail

applicationId

方便地在绑定器级别为 Kafka Streams 应用程序全局设置 application.id。如果应用程序包含多个函数,则应不同地设置应用程序 ID。有关设置应用程序 ID 的详细讨论,请参阅上面。

默认值:应用程序将生成一个静态应用程序 ID。有关更多详细信息,请参阅应用程序 ID 部分。

stateStoreRetry.maxAttempts

尝试连接状态存储的最大次数。

默认值:1

stateStoreRetry.backoffPeriod

重试连接状态存储时的退避周期。

默认值:1000 毫秒

consumerProperties

绑定器级别的任意消费者属性。

producerProperties

绑定器级别的任意生产者属性。

includeStoppedProcessorsForHealthCheck

当处理器绑定通过执行器停止时,此处理器默认不参与健康检查。将此属性设置为 true 以启用所有处理器的健康检查,包括那些当前通过绑定执行器端点停止的处理器。

默认值:false

Kafka Streams 生产者属性

以下属性适用于 Kafka Streams 生产者,并且必须以 spring.cloud.stream.kafka.streams.bindings.<binding name>.producer. 为前缀。为方便起见,如果存在多个输出绑定且它们都需要一个公共值,则可以使用 spring.cloud.stream.kafka.streams.default.producer. 前缀进行配置。

keySerde

要使用的键序列化/反序列化器

默认值:请参阅上面关于消息反序列化/序列化的讨论

valueSerde

要使用的值序列化/反序列化器

默认值:请参阅上面关于消息反序列化/序列化的讨论

useNativeEncoding

启用/禁用原生编码的标志

默认值:true

streamPartitionerBeanName

要在消费者端使用的自定义出站分区器 bean 名称。应用程序可以提供自定义的 StreamPartitioner 作为 Spring bean,并且此 bean 的名称可以提供给生产者以替代默认分区器。

默认值:请参阅上面关于出站分区支持的讨论。

producedAs

处理器生产到的 Sink 组件的自定义名称。

默认值:none (由 Kafka Streams 生成)

Kafka Streams 消费者属性

以下属性适用于 Kafka Streams 消费者,并且必须以 spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer. 为前缀。为方便起见,如果存在多个输入绑定且它们都需要一个公共值,则可以使用 spring.cloud.stream.kafka.streams.default.consumer. 前缀进行配置。

applicationId

为每个输入绑定设置 application.id。

默认值:如上所述。

keySerde

要使用的键序列化/反序列化器

默认值:请参阅上面关于消息反序列化/序列化的讨论

valueSerde

要使用的值序列化/反序列化器

默认值:请参阅上面关于消息反序列化/序列化的讨论

materializedAs

当使用传入 KTable 类型时要具体化的状态存储

默认值:none

cachingDisabled

禁用具体化 KTable 的缓存。设置为 true 时,调用 Materialized 对象的 withCachingDisabled()。设置为 false 时,调用 Materialized 对象的 withCachingEnabled()

默认值:false

loggingDisabled

禁用具体化 KTable 的日志记录。设置为 true 时,调用 Materialized 对象的 withLoggingDisabled()

默认值:false

useNativeDecoding

启用/禁用原生解码的标志

默认值:true

dlqName

DLQ 主题名称。

默认值:请参阅上面关于错误处理和 DLQ 的讨论。

startOffset

如果没有已提交的偏移量可供消费,则从该偏移量开始消费。这主要用于消费者首次从主题消费时。Kafka Streams 使用 earliest 作为默认策略,绑定器也使用相同的默认策略。可以使用此属性将其覆盖为 latest

默认值:earliest

注意:在消费者上使用 resetOffsets 对 Kafka Streams 绑定器没有任何影响。与基于消息通道的绑定器不同,Kafka Streams 绑定器不会按需跳转到开头或结尾。

deserializationExceptionHandler

反序列化错误处理程序类型。此处理程序按每个消费者绑定应用,而不是之前描述的绑定器级别属性。可能的值为 - logAndContinuelogAndFailskipAndContinuesendToDlq

默认值:logAndFail

timestampExtractorBeanName

要在消费者端使用的特定时间戳提取器 bean 名称。应用程序可以提供 TimestampExtractor 作为 Spring bean,并且此 bean 的名称可以提供给消费者以替代默认时间戳提取器。

默认值:请参阅上面关于时间戳提取器的讨论。

eventTypes

此绑定支持的事件类型,以逗号分隔。

默认值:none

eventTypeHeaderKey

通过此绑定传入的每条记录上的事件类型头键。

默认值:event_type

consumedAs

处理器消费的源组件的自定义名称。

默认值:none (由 Kafka Streams 生成)

关于并发的特别说明

在 Kafka Streams 中,您可以使用 num.stream.threads 属性控制处理器可以创建的线程数。这可以通过上面描述的绑定器、函数、生产者或消费者级别的各种 configuration 选项来完成。您还可以为此目的使用核心 Spring Cloud Stream 提供的 concurrency 属性。使用此属性时,您需要在消费者上使用它。当您有多个输入绑定时,请将其设置在第一个输入绑定上。例如,当设置 spring.cloud.stream.bindings.process-in-0.consumer.concurrency 时,它将被绑定器转换为 num.stream.threads。如果您有多个处理器,并且一个处理器定义了绑定级别并发,而其他处理器没有,则没有绑定级别并发的处理器将回退到通过 spring.cloud.stream.kafka.streams.binder.configuration.num.stream.threads 指定的绑定器范围属性。如果此绑定器配置不可用,则应用程序将使用 Kafka Streams 设置的默认值。

© . This site is unofficial and not affiliated with VMware.