配置选项
本节包含 Apache Kafka Binder 使用的配置选项。
有关 Binder 的通用配置选项和属性,请参阅核心文档中的绑定属性。
Kafka Binder 属性
- spring.cloud.stream.kafka.binder.brokers
-
Kafka Binder 连接的 Broker 列表。
默认值:
localhost
。 - spring.cloud.stream.kafka.binder.defaultBrokerPort
-
brokers
允许指定带端口或不带端口信息的主机(例如,host1,host2:port2
)。此属性在 Broker 列表中未配置端口时设置默认端口。默认值:
9092
。 - spring.cloud.stream.kafka.binder.configuration
-
Key/Value Map,包含传递给 Binder 创建的所有客户端(生产者和消费者)的客户端属性。由于这些属性同时用于生产者和消费者,因此其使用应限于通用属性——例如,安全设置。通过此配置提供的未知 Kafka 生产者或消费者属性会被过滤掉,不允许传播。这里的属性会覆盖在 Boot 中设置的任何属性。
默认值:空 Map。
- spring.cloud.stream.kafka.binder.consumerProperties
-
Key/Value Map,包含任意 Kafka 客户端消费者属性。除了支持已知的 Kafka 消费者属性外,此处也允许使用未知消费者属性。这里的属性会覆盖在 Boot 中以及上述
configuration
属性中设置的任何属性。默认值:空 Map。
- spring.cloud.stream.kafka.binder.headers
-
Binder 传输的自定义 header 列表。仅在使用旧版本应用 (⇐ 1.3.x) 且
kafka-clients
版本低于 0.11.0.0 时需要。更新版本原生支持 header。默认值:空。
- spring.cloud.stream.kafka.binder.healthTimeout
-
获取分区信息等待的时间,单位为秒。如果此计时器过期,健康状态将报告为 down。
默认值:60。
- spring.cloud.stream.kafka.binder.requiredAcks
-
Broker 上所需的 acks 数量。参见 Kafka 文档中关于生产者
acks
属性的说明。默认值:
1
。 - spring.cloud.stream.kafka.binder.minPartitionCount
-
仅在设置了
autoCreateTopics
或autoAddPartitions
时生效。Binder 在其生产或消费数据的 Topic 上配置的全局最小分区数。它可能被生产者的partitionCount
设置或生产者的instanceCount * concurrency
设置(如果其中一个更大)所覆盖。默认值:
1
。 - spring.cloud.stream.kafka.binder.producerProperties
-
Key/Value Map,包含任意 Kafka 客户端生产者属性。除了支持已知的 Kafka 生产者属性外,此处也允许使用未知生产者属性。这里的属性会覆盖在 Boot 中以及上述
configuration
属性中设置的任何属性。默认值:空 Map。
- spring.cloud.stream.kafka.binder.replicationFactor
-
如果
autoCreateTopics
启用,则为自动创建的 Topic 的副本因子。可以在每个 Binding 上覆盖。如果您使用的是 2.4 之前的 Kafka Broker 版本,则此值应至少设置为 1
。从 3.0.8 版本开始,Binder 使用-1
作为默认值,这表示将使用 Broker 的 'default.replication.factor' 属性来确定副本数量。请与您的 Kafka Broker 管理员确认是否存在需要最小副本因子的策略,如果是,则通常default.replication.factor
将与该值匹配,应使用-1
,除非您需要大于最小值的副本因子。默认值:
-1
。 - spring.cloud.stream.kafka.binder.autoCreateTopics
-
如果设置为
true
,Binder 会自动创建新的 Topic。如果设置为false
,Binder 则依赖于已配置的 Topic。在后一种情况下,如果 Topic 不存在,Binder 将无法启动。此设置独立于 Broker 的 auto.create.topics.enable
设置,并且不会影响它。如果服务器设置为自动创建 Topic,它们可能会作为元数据检索请求的一部分创建,并使用默认的 Broker 设置。默认值:
true
。 - spring.cloud.stream.kafka.binder.autoAddPartitions
-
如果设置为
true
,Binder 会在需要时创建新的分区。如果设置为false
,Binder 则依赖于已配置 Topic 的分区大小。如果目标 Topic 的分区数小于预期值,Binder 将无法启动。默认值:
false
。 - spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix
-
启用 Binder 中的事务。参见 Kafka 文档中的
transaction.id
和spring-kafka
文档中的事务。启用事务后,单个producer
属性将被忽略,所有生产者都使用spring.cloud.stream.kafka.binder.transaction.producer.*
属性。默认值
null
(无事务) - spring.cloud.stream.kafka.binder.transaction.producer.*
-
事务性 Binder 中生产者的全局生产者属性。参见
spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix
和Kafka 生产者属性以及所有 Binder 支持的通用生产者属性。默认值:参见各个生产者属性。
- spring.cloud.stream.kafka.binder.headerMapperBeanName
-
用于将
spring-messaging
header 与 Kafka header 之间进行映射的KafkaHeaderMapper
Bean 名称。例如,如果您希望定制使用 JSON 反序列化 header 的BinderHeaderMapper
Bean 中的受信任包,可以使用此属性。如果没有通过此属性将自定义的BinderHeaderMapper
Bean 提供给 Binder,那么 Binder 将查找名称为kafkaBinderHeaderMapper
且类型为BinderHeaderMapper
的 header mapper Bean,然后再回退到 Binder 创建的默认BinderHeaderMapper
。默认值:无。
- spring.cloud.stream.kafka.binder.considerDownWhenAnyPartitionHasNoLeader
-
标志,用于在 Topic 上的任何分区(无论接收数据的消费者是哪个)被发现没有 Leader 时,将 Binder 健康状态设置为
down
。默认值:
true
。 - spring.cloud.stream.kafka.binder.certificateStoreDirectory
-
当 truststore 或 keystore 证书位置以非本地文件系统资源(
org.springframework.core.io.Resource
支持的资源,例如 CLASSPATH, HTTP 等)形式给出时,Binder 会将资源从路径(可转换为org.springframework.core.io.Resource
)复制到文件系统上的某个位置。这对于 Broker 级别的证书(ssl.truststore.location
和ssl.keystore.location
)和用于 Schema Registry 的证书(schema.registry.ssl.truststore.location
和schema.registry.ssl.keystore.location
)都适用。请注意,truststore 和 keystore 的位置路径必须在spring.cloud.stream.kafka.binder.configuration…
下提供。例如,spring.cloud.stream.kafka.binder.configuration.ssl.truststore.location
、spring.cloud.stream.kafka.binder.configuration.schema.registry.ssl.truststore.location
等。文件将复制到为此属性值指定的目录,该目录必须是文件系统上已存在且运行应用程序的进程可写入的目录。如果未设置此值且证书文件是非本地文件系统资源,则它将被复制到通过System.getProperty("java.io.tmpdir")
返回的系统临时目录。即使此值存在,但目录无法在文件系统上找到或不可写入时,情况也是如此。默认值:无。
- spring.cloud.stream.kafka.binder.metrics.defaultOffsetLagMetricsEnabled
-
当设置为 true 时,每次访问度量指标时都会计算每个消费者主题的偏移量滞后度量。当设置为 false 时,仅使用周期性计算的偏移量滞后。
默认值:true
- spring.cloud.stream.kafka.binder.metrics.offsetLagMetricsInterval
-
计算每个消费者主题偏移量滞后的间隔。当
metrics.defaultOffsetLagMetricsEnabled
被禁用或计算时间过长时,使用此值。默认值:60 秒
- spring.cloud.stream.kafka.binder.enableObservation
-
在此 Binder 中的所有绑定上启用 Micrometer 观察注册表。
默认值:false
- spring.cloud.stream.kafka.binder.healthIndicatorConsumerGroup
-
KafkaHealthIndicator
元数据消费者group.id
。HealthIndicator
使用此消费者查询正在使用的 Topic 的元数据。默认值:无。
Kafka 消费者属性
以下属性仅适用于 Kafka 消费者,并且必须以 spring.cloud.stream.kafka.bindings.<channelName>.consumer.
为前缀。
为了避免重复,Spring Cloud Stream 支持为所有通道设置值,格式为 spring.cloud.stream.kafka.default.consumer.<property>=<value> 。 |
- admin.configuration
-
自版本 2.1.1 起,此属性已被弃用,推荐使用
topic.properties
,未来版本将移除对它的支持。 - admin.replicas-assignment
-
自版本 2.1.1 起,此属性已被弃用,推荐使用
topic.replicas-assignment
,未来版本将移除对它的支持。 - admin.replication-factor
-
自版本 2.1.1 起,此属性已被弃用,推荐使用
topic.replication-factor
,未来版本将移除对它的支持。 - autoRebalanceEnabled
-
当为
true
时,Topic 分区会在消费者组成员之间自动重新平衡。当为false
时,每个消费者根据spring.cloud.stream.instanceCount
和spring.cloud.stream.instanceIndex
分配一组固定的分区。这要求在每个启动的实例上适当地设置spring.cloud.stream.instanceCount
和spring.cloud.stream.instanceIndex
属性。在这种情况下,spring.cloud.stream.instanceCount
属性的值通常必须大于 1。默认值:
true
。 - ackEachRecord
-
当
autoCommitOffset
为true
时,此设置决定是否在处理每个记录后提交偏移量。默认情况下,在处理完consumer.poll()
返回的批次中的所有记录后提交偏移量。Poll 返回的记录数量可以通过max.poll.records
Kafka 属性控制,该属性通过消费者configuration
属性设置。将其设置为true
可能会导致性能下降,但这样做可以减少发生故障时重新投递记录的可能性。此外,请参阅 Binder 的requiredAcks
属性,它也影响提交偏移量的性能。此属性自 3.1 版本起已被弃用,推荐使用ackMode
。如果未设置ackMode
且未启用批量模式,将使用RECORD
确认模式。默认值:
false
。 - autoCommitOffset
-
从版本 3.1 开始,此属性已被弃用。有关替代方案的详细信息,请参见
ackMode
。消息处理完成后是否自动提交偏移量。如果设置为false
,则入站消息中将存在一个 key 为kafka_acknowledgment
、类型为org.springframework.kafka.support.Acknowledgment
的 header。应用程序可以使用此 header 来确认消息。有关详细信息,请参见示例部分。当此属性设置为false
时,Kafka Binder 将确认模式设置为org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.MANUAL
,并且应用程序负责确认记录。另请参阅ackEachRecord
。默认值:
true
。 - ackMode
-
指定容器确认模式。这基于 Spring Kafka 中定义的
AckMode
枚举。如果ackEachRecord
属性设置为true
且消费者未处于批量模式,则将使用RECORD
确认模式,否则,使用通过此属性提供的确认模式。 - autoCommitOnError
-
在 Poll 式消费者中,如果设置为
true
,则在发生错误时始终自动提交。如果未设置(默认值)或为 false,则在 Poll 式消费者中不会自动提交。请注意,此属性仅适用于 Poll 式消费者。默认值:未设置。
- resetOffsets
-
是否将消费者上的偏移量重置为
startOffset
提供的值。如果提供了KafkaBindingRebalanceListener
,则必须设置为 false;参见Rebalance 监听器。有关此属性的更多信息,请参见重置偏移量。默认值:
false
。 - startOffset
-
新组的起始偏移量。允许的值:
earliest
和latest
。如果通过spring.cloud.stream.bindings.<channelName>.group
为消费者“绑定”显式设置了消费者组,则 'startOffset' 设置为earliest
。否则,对于匿名消费者组,它设置为latest
。有关此属性的更多信息,请参见重置偏移量。默认值:
null
(等同于earliest
)。 - enableDlq
-
当设置为 true 时,为消费者启用 DLQ 行为。默认情况下,导致错误的消息会被转发到名为
error.<destination>.<group>
的主题。可以通过设置dlqName
属性或定义类型为DlqDestinationResolver
的@Bean
来配置 DLQ 主题名称。当错误数量相对较少且重放整个原始主题可能过于繁琐时,这提供了一种替代 Kafka 更常见的重放场景的选项。有关更多信息,请参见Kafka DLQ 处理。从 2.0 版本开始,发送到 DLQ 主题的消息会增加以下 header:x-original-topic
、x-exception-message
和x-exception-stacktrace
,类型为byte[]
。默认情况下,失败的记录会发送到 DLQ 主题中与原始记录相同的分区号。有关如何更改此行为的信息,请参见DLQ 分区选择。当destinationIsPattern
为true
时不允许设置此属性。默认值:
false
。 - dlqPartitions
-
当
enableDlq
为 true 且未设置此属性时,将创建一个与主主题拥有相同分区数的死信主题。通常,死信记录会发送到死信主题中与原始记录相同的分区。此行为可以更改;参见DLQ 分区选择。如果此属性设置为1
且没有DqlPartitionFunction
bean,所有死信记录将写入分区0
。如果此属性大于1
,则必须提供一个DlqPartitionFunction
bean。请注意,实际分区数受 Binder 的minPartitionCount
属性影响。默认值:无
- configuration
-
Map,包含通用 Kafka 消费者属性的 Key/Value 对。除了 Kafka 消费者属性外,此处还可以传递其他配置属性。例如,应用程序需要的一些属性,如
spring.cloud.stream.kafka.bindings.input.consumer.configuration.foo=bar
。此处不能设置bootstrap.servers
属性;如果需要连接到多个集群,请使用多 Binder 支持。默认值:空 Map。
- dlqName
-
接收错误消息的 DLQ 主题名称。
默认值:
null
(如果未指定,导致错误的消息将转发到名为error.<destination>.<group>
的主题)。 - dlqProducerProperties
-
使用此属性,可以设置特定于 DLQ 的生产者属性。所有通过 Kafka 生产者属性可用的属性都可以通过此属性设置。当消费者上启用原生解码(即
useNativeDecoding: true
)时,应用程序必须为 DLQ 提供相应的 key/value 序列化器。这必须以dlqProducerProperties.configuration.key.serializer
和dlqProducerProperties.configuration.value.serializer
的形式提供。默认值:默认 Kafka 生产者属性。
- standardHeaders
-
指示入站通道适配器填充哪些标准 header。允许的值:
none
、id
、timestamp
或both
。在使用原生反序列化且第一个接收消息的组件需要一个 ID(例如配置为使用 JDBC 消息存储的聚合器)时很有用。默认值:无
- converterBeanName
-
实现
RecordMessageConverter
的 Bean 名称。用于入站通道适配器中,替换默认的MessagingMessageConverter
。默认值:
null
- idleEventInterval
-
事件之间的间隔,单位为毫秒,这些事件指示最近没有收到消息。使用
ApplicationListener<ListenerContainerIdleEvent>
来接收这些事件。有关使用示例,请参见暂停-恢复。默认值:
30000
- destinationIsPattern
-
当为 true 时,目标被视为由 Broker 用于匹配 Topic 名称的正则表达式
Pattern
。当为 true 时,Topic 不会被 Provision,并且不允许使用enableDlq
,因为 Binder 在 Provision 阶段不知道 Topic 名称。请注意,检测匹配模式的新 Topic 所花费的时间由消费者属性metadata.max.age.ms
控制,该属性(在撰写本文时)默认为 300,000 毫秒(5 分钟)。可以使用上述configuration
属性进行配置。默认值:
false
- topic.properties
-
Map,用于 Provision 新 Topic 时使用的 Kafka Topic 属性——例如,
spring.cloud.stream.kafka.bindings.input.consumer.topic.properties.message.format.version=0.9.0.0
默认值:无。
- topic.replicas-assignment
-
Map<Integer, List<Integer>>,用于副本分配,其中 key 是分区,value 是分配。用于 Provision 新 Topic 时。参见
kafka-clients
jar 中NewTopic
的 Javadoc。默认值:无。
- topic.replication-factor
-
Provision Topic 时使用的副本因子。覆盖 Binder 范围的设置。如果存在
replicas-assignments
,则忽略。默认值:无(使用 Binder 范围的默认值 -1)。
- pollTimeout
-
Poll 式消费者中用于 Polling 的超时时间。
默认值:5 秒。
- transactionManager
-
KafkaAwareTransactionManager
的 Bean 名称,用于覆盖此绑定的 Binder 事务管理器。通常在需要使用ChainedKafkaTransactionManaager
将另一个事务与 Kafka 事务同步时需要。为了实现记录的精确一次消费和生产,消费者和生产者绑定都必须配置相同的事务管理器。默认值:无。
- txCommitRecovered
-
使用事务性 Binder 时,默认情况下,恢复的记录(例如,重试耗尽并将记录发送到死信主题时)的偏移量将通过新事务提交。将此属性设置为
false
可禁止提交恢复记录的偏移量。默认值:true。
- commonErrorHandlerBeanName
-
每个消费者绑定使用的
CommonErrorHandler
Bean 名称。如果存在,此用户提供的CommonErrorHandler
将优先于 Binder 定义的任何其他错误处理器。如果应用程序不想使用ListenerContainerCustomizer
并随后检查目标/组组合来设置错误处理器,这是一种方便的方式来表达错误处理器。默认值:无。
Kafka 生产者属性
以下属性仅适用于 Kafka 生产者,并且必须以 spring.cloud.stream.kafka.bindings.<channelName>.producer.
为前缀。
为了避免重复,Spring Cloud Stream 支持为所有通道设置值,格式为 spring.cloud.stream.kafka.default.producer.<property>=<value> 。 |
- admin.configuration
-
自版本 2.1.1 起,此属性已被弃用,推荐使用
topic.properties
,未来版本将移除对它的支持。 - admin.replicas-assignment
-
自版本 2.1.1 起,此属性已被弃用,推荐使用
topic.replicas-assignment
,未来版本将移除对它的支持。 - admin.replication-factor
-
自版本 2.1.1 起,此属性已被弃用,推荐使用
topic.replication-factor
,未来版本将移除对它的支持。 - bufferSize
-
Kafka 生产者在发送前尝试批量处理的数据量的上限,单位为字节。
默认值:
16384
。 - sync
-
生产者是否是同步的。
默认值:
false
。 - sendTimeoutExpression
-
一个 SpEL 表达式,针对出站消息进行评估,用于评估启用同步发布时等待 ack 的时间——例如,
headers['mySendTimeout']
。超时值以毫秒为单位。在 3.0 版本之前,除非使用原生编码,否则无法使用 payload,因为评估此表达式时,payload 已经采用byte[]
的形式。现在,表达式在转换 payload 之前进行评估。默认值:无。
- batchTimeout
-
生产者在发送消息之前等待多长时间,以便在同一批次中累积更多消息。(通常,生产者根本不等待,只是简单地发送在上次发送进行时累积的所有消息。)非零值可能会增加吞吐量,但会牺牲延迟。
默认值:
0
。 - messageKeyExpression
-
一个 SpEL 表达式,针对出站消息进行评估,用于填充生产的 Kafka 消息的 key——例如,
headers['myKey']
。在 3.0 版本之前,除非使用原生编码,否则无法使用 payload,因为评估此表达式时,payload 已经采用byte[]
的形式。现在,表达式在转换 payload 之前进行评估。对于常规处理器(Function<String, String>
或Function<Message<?>, Message<?>
),如果生产的 key 需要与来自 Topic 的入站 key 相同,可以如下设置此属性:spring.cloud.stream.kafka.bindings.<output-binding-name>.producer.messageKeyExpression: headers['kafka_receivedMessageKey']
对于反应式函数,需要注意一个重要的警告。在这种情况下,由应用程序负责手动将 header 从入站消息复制到出站消息。您可以设置 header,例如myKey
并像上面建议的那样使用headers['myKey']
,或者为了方便,只需设置KafkaHeaders.MESSAGE_KEY
header,就完全不需要设置此属性了。默认值:无。
- headerPatterns
-
逗号分隔的简单模式列表,用于匹配 Spring messaging header 并将其映射到
ProducerRecord
中的 Kafka Header。模式可以以通配符(星号)开头或结尾。模式可以通过前缀!
来否定。匹配在第一个匹配(正向或负向)后停止。例如,!ask,as*
将通过ash
但不会通过ask
。id
和timestamp
永远不会被映射。默认值:
*
(所有头信息 - 除了id
和timestamp
) - configuration
-
包含通用 Kafka 生产者属性的键/值对 Map。
bootstrap.servers
属性不能在此处设置;如果需要连接到多个集群,请使用多绑定器支持。默认值:空 Map。
- topic.properties
-
一个
Map
,包含在配置新主题时使用的 Kafka 主题属性,例如:spring.cloud.stream.kafka.bindings.output.producer.topic.properties.message.format.version=0.9.0.0
- topic.replicas-assignment
-
Map<Integer, List<Integer>>,用于副本分配,其中 key 是分区,value 是分配。用于 Provision 新 Topic 时。参见
kafka-clients
jar 中NewTopic
的 Javadoc。默认值:无。
- topic.replication-factor
-
Provision Topic 时使用的副本因子。覆盖 Binder 范围的设置。如果存在
replicas-assignments
,则忽略。默认值:无(使用 Binder 范围的默认值 -1)。
- useTopicHeader
-
设置为
true
以使用出站消息中KafkaHeaders.TOPIC
消息头的值覆盖默认绑定目标(主题名称)。如果头信息不存在,则使用默认绑定目标。默认值:
false
。 - recordMetadataChannel
-
一个
MessageChannel
的 bean 名称,用于发送成功的发送结果;该 bean 必须存在于应用程序上下文中。发送到该通道的消息是已发送的消息(如果经过转换),并附带一个额外的头信息KafkaHeaders.RECORD_METADATA
。该头信息包含一个由 Kafka 客户端提供的RecordMetadata
对象;它包括记录写入主题的分区和偏移量。ResultMetadata meta = sendResultMsg.getHeaders().get(KafkaHeaders.RECORD_METADATA, RecordMetadata.class)
失败的发送会进入生产者错误通道(如果已配置);请参阅 Kafka 错误通道。
默认值:null。
Kafka 绑定器使用生产者的 partitionCount 设置作为提示,以创建具有给定分区数的主题(结合 minPartitionCount ,两者中的最大值将被使用)。在为绑定器配置 minPartitionCount 并为应用程序配置 partitionCount 时请谨慎,因为会使用较大的值。如果已存在一个分区数较少的主题且 autoAddPartitions 被禁用(默认设置),则绑定器启动失败。如果已存在一个分区数较少的主题且 autoAddPartitions 被启用,则会添加新分区。如果已存在一个分区数大于 (minPartitionCount 或 partitionCount ) 中较大值的主题,则使用现有分区数。 |
- compression
-
设置
compression.type
生产者属性。支持的值包括none
、gzip
、snappy
、lz4
和zstd
。如果您将kafka-clients
jar 版本覆盖到 2.1.0(或更高版本),如 Spring for Apache Kafka 文档 中所述,并希望使用zstd
压缩,请使用spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.compression.type=zstd
。默认值:无。
- transactionManager
-
KafkaAwareTransactionManager
的 Bean 名称,用于覆盖此绑定的 Binder 事务管理器。通常在需要使用ChainedKafkaTransactionManaager
将另一个事务与 Kafka 事务同步时需要。为了实现记录的精确一次消费和生产,消费者和生产者绑定都必须配置相同的事务管理器。默认值:无。
- closeTimeout
-
关闭生产者时等待的超时时间,单位为秒。
默认值:
30
- allowNonTransactional
-
通常,与事务性绑定器关联的所有输出绑定将在新事务中发布(如果当前没有进行中的事务)。此属性允许您覆盖该行为。如果设置为 true,则发布到此输出绑定的记录将不会在事务中运行,除非当前已有进行中的事务。
默认值:
false