配置选项

本节包含 Apache Kafka 绑定器使用的配置选项。

有关绑定器的通用配置选项和属性,请参阅核心文档中的绑定属性

Kafka 绑定器属性

spring.cloud.stream.kafka.binder.brokers

Kafka 绑定器连接的 broker 列表。

默认值:localhost

spring.cloud.stream.kafka.binder.defaultBrokerPort

brokers 允许指定带端口信息或不带端口信息的主机(例如,host1,host2:port2)。当 broker 列表中未配置端口时,此属性设置默认端口。

默认值:9092

spring.cloud.stream.kafka.binder.configuration

传递给绑定器创建的所有客户端的客户端属性(包括生产者和消费者)的键/值映射。由于这些属性同时被生产者和消费者使用,因此应将其用途限制为通用属性,例如安全设置。通过此配置提供的未知 Kafka 生产者或消费者属性会被过滤掉,不允许传播。此处的属性会覆盖 boot 中设置的任何属性。

默认值:空映射。

spring.cloud.stream.kafka.binder.consumerProperties

任意 Kafka 客户端消费者属性的键/值映射。除了支持已知的 Kafka 消费者属性外,此处也允许未知的消费者属性。此处的属性会覆盖 boot 中和上面 configuration 属性中设置的任何属性。

默认值:空映射。

spring.cloud.stream.kafka.binder.headers

由绑定器传输的自定义头列表。仅在与使用 kafka-clients 版本 < 0.11.0.0 的旧版应用程序(⇐ 1.3.x)通信时才需要。更新的版本原生支持头。

默认值:空。

spring.cloud.stream.kafka.binder.healthTimeout

获取分区信息等待时间,单位为秒。如果此计时器到期,则健康状态报告为 down。

默认值:60。

spring.cloud.stream.kafka.binder.requiredAcks

broker 上所需的 acks 数量。请参阅 Kafka 文档中生产者 acks 属性的说明。

默认值:1

spring.cloud.stream.kafka.binder.minPartitionCount

仅当 autoCreateTopicsautoAddPartitions 设置为 true 时才生效。绑定器在其生产或消费数据的 topics 上配置的全局最小分区数。它可以被生产者的 partitionCount 设置或生产者的 instanceCount * concurrency 设置的值(如果两者之一更大)覆盖。

默认值:1

spring.cloud.stream.kafka.binder.producerProperties

任意 Kafka 客户端生产者属性的键/值映射。除了支持已知的 Kafka 生产者属性外,此处也允许未知的生产者属性。此处的属性会覆盖 boot 中和上面 configuration 属性中设置的任何属性。

默认值:空映射。

spring.cloud.stream.kafka.binder.replicationFactor

如果 autoCreateTopics 处于活动状态,则自动创建 topic 的复制因子。可以在每个绑定上覆盖。

如果您使用的 Kafka broker 版本早于 2.4,则此值应设置为至少 1。从 3.0.8 版本开始,绑定器使用 -1 作为默认值,表示将使用 broker 的 'default.replication.factor' 属性来确定副本数量。请与您的 Kafka broker 管理员核实,查看是否存在要求最小复制因子的策略,如果是这样,通常 default.replication.factor 将匹配该值,并且应使用 -1,除非您需要大于最小复制因子的复制因子。

默认值:-1

spring.cloud.stream.kafka.binder.autoCreateTopics

如果设置为 true,绑定器会自动创建新 topic。如果设置为 false,绑定器依赖于已配置的 topic。在后一种情况下,如果 topic 不存在,绑定器将无法启动。

此设置与 broker 的 auto.create.topics.enable 设置无关,也不影响它。如果服务器设置为自动创建 topic,它们可能会作为元数据检索请求的一部分创建,并使用默认的 broker 设置。

默认值:true

spring.cloud.stream.kafka.binder.autoAddPartitions

如果设置为 true,绑定器会在需要时创建新分区。如果设置为 false,绑定器依赖于已配置的 topic 分区大小。如果目标 topic 的分区数小于预期值,绑定器将无法启动。

默认值:false

spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix

在绑定器中启用事务。请参阅 Kafka 文档中的 transaction.idspring-kafka 文档中的事务。启用事务时,单个 producer 属性将被忽略,所有生产者都使用 spring.cloud.stream.kafka.binder.transaction.producer.* 属性。

默认值 null (无事务)

spring.cloud.stream.kafka.binder.transaction.producer.*

事务绑定器中生产者的全局生产者属性。请参阅 spring.cloud.stream.kafka.binder.transaction.transactionIdPrefixKafka 生产者属性以及所有绑定器支持的通用生产者属性。

默认值:请参阅单独的生产者属性。

spring.cloud.stream.kafka.binder.headerMapperBeanName

用于将 spring-messaging 头映射到 Kafka 头以及从 Kafka 头映射的 KafkaHeaderMapper 的 bean 名称。例如,如果您希望自定义使用 JSON 反序列化头的 BinderHeaderMapper bean 中的受信任包,请使用此属性。如果此自定义 BinderHeaderMapper bean 未通过此属性提供给绑定器,则绑定器将查找名为 kafkaBinderHeaderMapper 且类型为 BinderHeaderMapper 的头映射器 bean,然后回退到绑定器创建的默认 BinderHeaderMapper

默认值:无。

spring.cloud.stream.kafka.binder.considerDownWhenAnyPartitionHasNoLeader

当 topic 上的任何分区(无论哪个消费者正在从中接收数据)被发现没有 leader 时,将绑定器健康状态设置为 down 的标志。

默认值:true

spring.cloud.stream.kafka.binder.certificateStoreDirectory

当信任库或密钥库证书位置作为非本地文件系统资源(org.springframework.core.io.Resource 支持的资源,例如 CLASSPATH、HTTP 等)提供时,绑定器会将资源从路径(可转换为 org.springframework.core.io.Resource)复制到文件系统上的某个位置。这对于 broker 级别的证书(ssl.truststore.locationssl.keystore.location)和用于 schema 注册表的证书(schema.registry.ssl.truststore.locationschema.registry.ssl.keystore.location)都适用。请记住,信任库和密钥库位置路径必须在 spring.cloud.stream.kafka.binder.configuration…​ 下提供。例如,spring.cloud.stream.kafka.binder.configuration.ssl.truststore.locationspring.cloud.stream.kafka.binder.configuration.schema.registry.ssl.truststore.location 等。文件将复制到此属性值指定的位置,该位置必须是文件系统上应用程序运行进程可写的现有目录。如果此值未设置且证书文件是非本地文件系统资源,则它将复制到 System.getProperty("java.io.tmpdir") 返回的系统临时目录。如果此值存在但找不到目录或不可写,则也适用。

默认值:无。

spring.cloud.stream.kafka.binder.metrics.defaultOffsetLagMetricsEnabled

当设置为 true 时,每个消费者 topic 的 offset 滞后指标会在每次访问指标时计算。当设置为 false 时,仅使用定期计算的 offset 滞后。

默认值:true

spring.cloud.stream.kafka.binder.metrics.offsetLagMetricsInterval

计算每个消费者 topic 的 offset 滞后的间隔。当 metrics.defaultOffsetLagMetricsEnabled 被禁用或其计算时间过长时,将使用此值。

默认值:60 秒

spring.cloud.stream.kafka.binder.enableObservation

在此绑定器中的所有绑定上启用 Micrometer 观察注册表。

默认值:false

spring.cloud.stream.kafka.binder.healthIndicatorConsumerGroup

KafkaHealthIndicator 元数据消费者 group.id。此消费者由 HealthIndicator 用于查询所用 topic 的元数据。

默认值:无。

Kafka 消费者属性

以下属性仅适用于 Kafka 消费者,并且必须以 spring.cloud.stream.kafka.bindings.<channelName>.consumer. 为前缀。

为避免重复,Spring Cloud Stream 支持以 spring.cloud.stream.kafka.default.consumer.<property>=<value> 的格式设置所有通道的值。
admin.configuration

自 2.1.1 版本起,此属性已弃用,取而代之的是 topic.properties,并将在未来版本中移除对其的支持。

admin.replicas-assignment

自 2.1.1 版本起,此属性已弃用,取而代之的是 topic.replicas-assignment,并将在未来版本中移除对其的支持。

admin.replication-factor

自 2.1.1 版本起,此属性已弃用,取而代之的是 topic.replication-factor,并将在未来版本中移除对其的支持。

autoRebalanceEnabled

true 时,topic 分区会在消费者组的成员之间自动重新平衡。当 false 时,每个消费者根据 spring.cloud.stream.instanceCountspring.cloud.stream.instanceIndex 分配一组固定的分区。这要求在每个启动的实例上适当地设置 spring.cloud.stream.instanceCountspring.cloud.stream.instanceIndex 属性。在这种情况下,spring.cloud.stream.instanceCount 属性的值通常必须大于 1。

默认值:true

ackEachRecord

autoCommitOffsettrue 时,此设置决定是否在处理每个记录后提交 offset。默认情况下,在处理完 consumer.poll() 返回的批次中的所有记录后提交 offset。通过消费者 configuration 属性设置的 max.poll.records Kafka 属性可以控制 poll 返回的记录数。将其设置为 true 可能会导致性能下降,但这样做可以减少发生故障时重新传递记录的可能性。另请参阅绑定器 requiredAcks 属性,它也影响提交 offset 的性能。此属性自 3.1 版本起已弃用,取而代之的是使用 ackMode。如果未设置 ackMode 且未启用批处理模式,则将使用 RECORD 确认模式。

默认值:false

autoCommitOffset

从 3.1 版本开始,此属性已弃用。有关替代方案的更多详细信息,请参阅 ackMode。消息处理后是否自动提交 offset。如果设置为 false,则入站消息中会存在一个键为 kafka_acknowledgment 类型为 org.springframework.kafka.support.Acknowledgment 的头。应用程序可以使用此头来确认消息。有关详细信息,请参阅示例部分。当此属性设置为 false 时,Kafka 绑定器会将 ack 模式设置为 org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.MANUAL,并且应用程序负责确认记录。另请参阅 ackEachRecord

默认值:true

ackMode

指定容器确认模式。这基于 Spring Kafka 中定义的 AckMode 枚举。如果 ackEachRecord 属性设置为 true 且消费者不处于批处理模式,则这将使用 RECORD 的确认模式,否则,使用通过此属性提供的确认模式。

autoCommitOnError

在可轮询消费者中,如果设置为 true,则始终在出错时自动提交。如果未设置(默认)或为 false,则在可轮询消费者中不会自动提交。请注意,此属性仅适用于可轮询消费者。

默认值:未设置。

resetOffsets

是否将消费者上的 offset 重置为由 startOffset 提供的值。如果提供了 KafkaBindingRebalanceListener,则必须为 false;请参阅重新平衡监听器。有关此属性的更多信息,请参阅重置 offset

默认值:false

startOffset

新组的起始 offset。允许的值:earliestlatest。如果为消费者 'binding' 显式设置了消费者组(通过 spring.cloud.stream.bindings.<channelName>.group),则 'startOffset' 设置为 earliest。否则,对于 anonymous 消费者组,它设置为 latest。有关此属性的更多信息,请参阅重置 offset

默认值:null(相当于 earliest)。

enableDlq

当设置为 true 时,它为消费者启用 DLQ 行为。默认情况下,导致错误的消息会转发到名为 error.<destination>.<group> 的 topic。DLQ topic 名称可以通过设置 dlqName 属性或定义类型为 DlqDestinationResolver@Bean 来配置。这为更常见的 Kafka 重放场景提供了一个替代选项,适用于错误数量相对较少且重放整个原始 topic 可能过于繁琐的情况。有关更多信息,请参阅kafka dlq 处理。从 2.0 版本开始,发送到 DLQ topic 的消息会增强以下头:x-original-topicx-exception-messagex-exception-stacktrace,类型为 byte[]。默认情况下,失败的记录会发送到 DLQ topic 中与原始记录相同的分区号。有关如何更改此行为的信息,请参阅dlq 分区选择destinationIsPatterntrue 时不允许。

默认值:false

dlqPartitions

enableDlq 为 true 且未设置此属性时,将创建一个死信 topic,其分区数与主 topic 相同。通常,死信记录会发送到死信 topic 中与原始记录相同的分区。此行为可以更改;请参阅dlq 分区选择。如果此属性设置为 1 并且没有 DqlPartitionFunction bean,则所有死信记录都将写入分区 0。如果此属性大于 1,则必须提供 DlqPartitionFunction bean。请注意,实际分区计数受绑定器 minPartitionCount 属性的影响。

默认值:none

configuration

包含通用 Kafka 消费者属性的键/值对映射。除了 Kafka 消费者属性外,此处还可以传递其他配置属性。例如,应用程序需要的一些属性,如 spring.cloud.stream.kafka.bindings.input.consumer.configuration.foo=bar。此处不能设置 bootstrap.servers 属性;如果需要连接到多个集群,请使用多绑定器支持。

默认值:空映射。

dlqName

接收错误消息的 DLQ topic 的名称。

默认值:null(如果未指定,则导致错误的消息会转发到名为 error.<destination>.<group> 的 topic)。

dlqProducerProperties

使用此属性可以设置 DLQ 特定的生产者属性。所有通过 Kafka 生产者属性可用的属性都可以通过此属性设置。当消费者上启用原生解码(即 useNativeDecoding: true)时,应用程序必须为 DLQ 提供相应的键/值序列化器。这必须以 dlqProducerProperties.configuration.key.serializerdlqProducerProperties.configuration.value.serializer 的形式提供。

默认值:默认 Kafka 生产者属性。

standardHeaders

指示入站通道适配器填充哪些标准头。允许的值:noneidtimestampboth。如果使用原生反序列化且接收消息的第一个组件需要 id(例如配置为使用 JDBC 消息存储的聚合器),则此属性非常有用。

默认值:none

converterBeanName

实现 RecordMessageConverter 的 bean 名称。在入站通道适配器中使用,以替换默认的 MessagingMessageConverter

默认值:null

idleEventInterval

表示最近没有收到消息的事件之间的间隔,单位为毫秒。使用 ApplicationListener<ListenerContainerIdleEvent> 接收这些事件。有关使用示例,请参阅暂停-恢复

默认值:30000

destinationIsPattern

当 true 时,目标被视为正则表达式 Pattern,用于由 broker 匹配 topic 名称。当 true 时,不提供 topic,并且不允许 enableDlq,因为绑定器在 provision 阶段不知道 topic 名称。请注意,检测匹配模式的新 topic 所花费的时间由消费者属性 metadata.max.age.ms 控制,该属性(在撰写本文时)默认为 300,000ms(5 分钟)。这可以使用上面的 configuration 属性进行配置。

默认值:false

topic.properties

一个 Kafka topic 属性的 Map,用于 provision 新 topic,例如 spring.cloud.stream.kafka.bindings.input.consumer.topic.properties.message.format.version=0.9.0.0

默认值:无。

topic.replicas-assignment

一个 Map<Integer, List<Integer>> 的副本分配,其中键是分区,值是分配。用于 provision 新 topic。请参阅 kafka-clients jar 中的 NewTopic Javadoc。

默认值:无。

topic.replication-factor

provision topic 时使用的复制因子。覆盖绑定器范围的设置。如果存在 replicas-assignments,则忽略。

默认值:无(使用绑定器范围的默认值 -1)。

pollTimeout

可轮询消费者中用于轮询的超时时间。

默认值:5 秒。

事务管理器

KafkaAwareTransactionManager 的 Bean 名称,用于覆盖此绑定的绑定器事务管理器。如果您想使用 ChainedKafkaTransactionManaager 将另一个事务与 Kafka 事务同步,通常需要此属性。为了实现记录的精确一次消费和生产,消费者和生产者绑定都必须配置相同的事务管理器。

默认值:无。

txCommitRecovered

当使用事务绑定器时,恢复的记录的 offset(例如,当重试耗尽且记录被发送到死信 topic 时)将通过新事务提交,这是默认行为。将此属性设置为 false 会阻止提交恢复记录的 offset。

默认值:true。

commonErrorHandlerBeanName

每个消费者绑定使用的 CommonErrorHandler bean 名称。当存在时,此用户提供的 CommonErrorHandler 优先于绑定器定义的任何其他错误处理器。如果应用程序不想使用 ListenerContainerCustomizer 然后检查目标/组组合来设置错误处理器,这是一种方便表达错误处理器的方式。

默认值:无。

Kafka 生产者属性

以下属性仅适用于 Kafka 生产者,并且必须以 spring.cloud.stream.kafka.bindings.<channelName>.producer. 为前缀。

为避免重复,Spring Cloud Stream 支持以 spring.cloud.stream.kafka.default.producer.<property>=<value> 的格式设置所有通道的值。
admin.configuration

自 2.1.1 版本起,此属性已弃用,取而代之的是 topic.properties,并将在未来版本中移除对其的支持。

admin.replicas-assignment

自 2.1.1 版本起,此属性已弃用,取而代之的是 topic.replicas-assignment,并将在未来版本中移除对其的支持。

admin.replication-factor

自 2.1.1 版本起,此属性已弃用,取而代之的是 topic.replication-factor,并将在未来版本中移除对其的支持。

bufferSize

Kafka 生产者在发送前尝试批量处理的数据的上限,单位为字节。

默认值:16384

sync

生产者是否是同步的。

默认值:false

sendTimeoutExpression

一个针对出站消息进行评估的 SpEL 表达式,用于在启用同步发布时评估等待 ack 的时间,例如 headers['mySendTimeout']。超时时间的值以毫秒为单位。在 3.0 版本之前,除非使用原生编码,否则无法使用 payload,因为在评估此表达式时,payload 已经采用 byte[] 的形式。现在,在转换 payload 之前评估表达式。

默认值:none

batchTimeout

生产者等待允许更多消息在同一批次中累积然后发送消息的时间。(通常,生产者根本不等待,只需发送在前一次发送正在进行时累积的所有消息。)非零值可能会以延迟为代价提高吞吐量。

默认值:0

messageKeyExpression

一个针对出站消息进行评估的 SpEL 表达式,用于填充生成的 Kafka 消息的键,例如 headers['myKey']。在 3.0 版本之前,除非使用原生编码,否则无法使用 payload,因为在评估此表达式时,payload 已经采用 byte[] 的形式。现在,在转换 payload 之前评估表达式。对于常规处理器(Function<String, String>Function<Message<?>, Message<?>),如果生成的键需要与来自 topic 的入站键相同,则可以按如下方式设置此属性:spring.cloud.stream.kafka.bindings.<output-binding-name>.producer.messageKeyExpression: headers['kafka_receivedMessageKey'] 对于反应式函数,需要记住一个重要的注意事项。在这种情况下,应用程序需要手动将头从入站消息复制到出站消息。您可以设置头,例如 myKey 并使用上面建议的 headers['myKey'],或者为了方便起见,只需设置 KafkaHeaders.MESSAGE_KEY 头,您根本不需要设置此属性。

默认值:none

headerPatterns

一个逗号分隔的简单模式列表,用于匹配 Spring messaging 头,以映射到 ProducerRecord 中的 Kafka Headers。模式可以以通配符(星号)开头或结尾。模式可以通过加前缀 ! 来否定。匹配在第一个匹配(正或负)之后停止。例如 !ask,as* 将通过 ash 但不会通过 askidtimestamp 永远不会被映射。

默认值:*(所有头 - 除了 idtimestamp

configuration

包含通用 Kafka 生产者属性的键/值对映射。此处不能设置 bootstrap.servers 属性;如果需要连接到多个集群,请使用多绑定器支持。

默认值:空映射。

topic.properties

一个 Kafka topic 属性的 Map,用于 provision 新 topic,例如 spring.cloud.stream.kafka.bindings.output.producer.topic.properties.message.format.version=0.9.0.0

topic.replicas-assignment

一个 Map<Integer, List<Integer>> 的副本分配,其中键是分区,值是分配。用于 provision 新 topic。请参阅 kafka-clients jar 中的 NewTopic Javadoc。

默认值:无。

topic.replication-factor

provision topic 时使用的复制因子。覆盖绑定器范围的设置。如果存在 replicas-assignments,则忽略。

默认值:无(使用绑定器范围的默认值 -1)。

useTopicHeader

设置为 true 以使用出站消息中 KafkaHeaders.TOPIC 消息头的值覆盖默认的绑定目标(topic 名称)。如果头不存在,则使用默认的绑定目标。

默认值:false

recordMetadataChannel

一个 MessageChannel 的 bean 名称,用于发送成功的发送结果;该 bean 必须存在于应用程序上下文中。发送到通道的消息是已发送的消息(如果进行了任何转换,则为转换后的消息),并带有一个附加头 KafkaHeaders.RECORD_METADATA。该头包含 Kafka 客户端提供的 RecordMetadata 对象;它包括记录写入 topic 的分区和 offset。

ResultMetadata meta = sendResultMsg.getHeaders().get(KafkaHeaders.RECORD_METADATA, RecordMetadata.class)

失败的发送会转到生产者错误通道(如果已配置);请参阅Kafka 错误通道

默认值:null。

Kafka 绑定器使用生产者的 partitionCount 设置作为提示,以创建具有给定分区数的 topic(结合 minPartitionCount,两者中的最大值将作为所使用的值)。在为绑定器配置 minPartitionCount 和为应用程序配置 partitionCount 时要谨慎,因为将使用较大的值。如果已存在分区数较小的 topic 且 autoAddPartitions 被禁用(默认),则绑定器将无法启动。如果已存在分区数较小的 topic 且 autoAddPartitions 被启用,则会添加新分区。如果已存在分区数大于 (minPartitionCountpartitionCount) 最大值的 topic,则使用现有分区数。
compression

设置 compression.type 生产者属性。支持的值为 nonegzipsnappylz4zstd。如果您将 kafka-clients jar 覆盖到 2.1.0(或更高版本),如 Spring for Apache Kafka 文档中所述,并希望使用 zstd 压缩,请使用 spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.compression.type=zstd

默认值:none

事务管理器

KafkaAwareTransactionManager 的 Bean 名称,用于覆盖此绑定的绑定器事务管理器。如果您想使用 ChainedKafkaTransactionManaager 将另一个事务与 Kafka 事务同步,通常需要此属性。为了实现记录的精确一次消费和生产,消费者和生产者绑定都必须配置相同的事务管理器。

默认值:无。

closeTimeout

关闭生产者时等待的超时时间,单位为秒。

默认值:30

allowNonTransactional

通常,与事务绑定器关联的所有输出绑定都将在新事务中发布,如果尚未进行事务。此属性允许您覆盖该行为。如果设置为 true,则发布到此输出绑定的记录将不会在事务中运行,除非事务已经在进行中。

默认值:false

© . This site is unofficial and not affiliated with VMware.