配置选项

本节包含 Apache Kafka Binder 使用的配置选项。

有关 Binder 的通用配置选项和属性,请参阅核心文档中的绑定属性

Kafka Binder 属性

spring.cloud.stream.kafka.binder.brokers

Kafka Binder 连接的 Broker 列表。

默认值:localhost

spring.cloud.stream.kafka.binder.defaultBrokerPort

brokers 允许指定带端口或不带端口信息的主机(例如,host1,host2:port2)。此属性在 Broker 列表中未配置端口时设置默认端口。

默认值:9092

spring.cloud.stream.kafka.binder.configuration

Key/Value Map,包含传递给 Binder 创建的所有客户端(生产者和消费者)的客户端属性。由于这些属性同时用于生产者和消费者,因此其使用应限于通用属性——例如,安全设置。通过此配置提供的未知 Kafka 生产者或消费者属性会被过滤掉,不允许传播。这里的属性会覆盖在 Boot 中设置的任何属性。

默认值:空 Map。

spring.cloud.stream.kafka.binder.consumerProperties

Key/Value Map,包含任意 Kafka 客户端消费者属性。除了支持已知的 Kafka 消费者属性外,此处也允许使用未知消费者属性。这里的属性会覆盖在 Boot 中以及上述 configuration 属性中设置的任何属性。

默认值:空 Map。

spring.cloud.stream.kafka.binder.headers

Binder 传输的自定义 header 列表。仅在使用旧版本应用 (⇐ 1.3.x) 且 kafka-clients 版本低于 0.11.0.0 时需要。更新版本原生支持 header。

默认值:空。

spring.cloud.stream.kafka.binder.healthTimeout

获取分区信息等待的时间,单位为秒。如果此计时器过期,健康状态将报告为 down。

默认值:60。

spring.cloud.stream.kafka.binder.requiredAcks

Broker 上所需的 acks 数量。参见 Kafka 文档中关于生产者 acks 属性的说明。

默认值:1

spring.cloud.stream.kafka.binder.minPartitionCount

仅在设置了 autoCreateTopicsautoAddPartitions 时生效。Binder 在其生产或消费数据的 Topic 上配置的全局最小分区数。它可能被生产者的 partitionCount 设置或生产者的 instanceCount * concurrency 设置(如果其中一个更大)所覆盖。

默认值:1

spring.cloud.stream.kafka.binder.producerProperties

Key/Value Map,包含任意 Kafka 客户端生产者属性。除了支持已知的 Kafka 生产者属性外,此处也允许使用未知生产者属性。这里的属性会覆盖在 Boot 中以及上述 configuration 属性中设置的任何属性。

默认值:空 Map。

spring.cloud.stream.kafka.binder.replicationFactor

如果 autoCreateTopics 启用,则为自动创建的 Topic 的副本因子。可以在每个 Binding 上覆盖。

如果您使用的是 2.4 之前的 Kafka Broker 版本,则此值应至少设置为 1。从 3.0.8 版本开始,Binder 使用 -1 作为默认值,这表示将使用 Broker 的 'default.replication.factor' 属性来确定副本数量。请与您的 Kafka Broker 管理员确认是否存在需要最小副本因子的策略,如果是,则通常 default.replication.factor 将与该值匹配,应使用 -1,除非您需要大于最小值的副本因子。

默认值:-1

spring.cloud.stream.kafka.binder.autoCreateTopics

如果设置为 true,Binder 会自动创建新的 Topic。如果设置为 false,Binder 则依赖于已配置的 Topic。在后一种情况下,如果 Topic 不存在,Binder 将无法启动。

此设置独立于 Broker 的 auto.create.topics.enable 设置,并且不会影响它。如果服务器设置为自动创建 Topic,它们可能会作为元数据检索请求的一部分创建,并使用默认的 Broker 设置。

默认值:true

spring.cloud.stream.kafka.binder.autoAddPartitions

如果设置为 true,Binder 会在需要时创建新的分区。如果设置为 false,Binder 则依赖于已配置 Topic 的分区大小。如果目标 Topic 的分区数小于预期值,Binder 将无法启动。

默认值:false

spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix

启用 Binder 中的事务。参见 Kafka 文档中的 transaction.idspring-kafka 文档中的事务。启用事务后,单个 producer 属性将被忽略,所有生产者都使用 spring.cloud.stream.kafka.binder.transaction.producer.* 属性。

默认值 null(无事务)

spring.cloud.stream.kafka.binder.transaction.producer.*

事务性 Binder 中生产者的全局生产者属性。参见 spring.cloud.stream.kafka.binder.transaction.transactionIdPrefixKafka 生产者属性以及所有 Binder 支持的通用生产者属性。

默认值:参见各个生产者属性。

spring.cloud.stream.kafka.binder.headerMapperBeanName

用于将 spring-messaging header 与 Kafka header 之间进行映射的 KafkaHeaderMapper Bean 名称。例如,如果您希望定制使用 JSON 反序列化 header 的 BinderHeaderMapper Bean 中的受信任包,可以使用此属性。如果没有通过此属性将自定义的 BinderHeaderMapper Bean 提供给 Binder,那么 Binder 将查找名称为 kafkaBinderHeaderMapper 且类型为 BinderHeaderMapper 的 header mapper Bean,然后再回退到 Binder 创建的默认 BinderHeaderMapper

默认值:无。

spring.cloud.stream.kafka.binder.considerDownWhenAnyPartitionHasNoLeader

标志,用于在 Topic 上的任何分区(无论接收数据的消费者是哪个)被发现没有 Leader 时,将 Binder 健康状态设置为 down

默认值:true

spring.cloud.stream.kafka.binder.certificateStoreDirectory

当 truststore 或 keystore 证书位置以非本地文件系统资源(org.springframework.core.io.Resource 支持的资源,例如 CLASSPATH, HTTP 等)形式给出时,Binder 会将资源从路径(可转换为 org.springframework.core.io.Resource)复制到文件系统上的某个位置。这对于 Broker 级别的证书(ssl.truststore.locationssl.keystore.location)和用于 Schema Registry 的证书(schema.registry.ssl.truststore.locationschema.registry.ssl.keystore.location)都适用。请注意,truststore 和 keystore 的位置路径必须在 spring.cloud.stream.kafka.binder.configuration…​ 下提供。例如,spring.cloud.stream.kafka.binder.configuration.ssl.truststore.locationspring.cloud.stream.kafka.binder.configuration.schema.registry.ssl.truststore.location 等。文件将复制到为此属性值指定的目录,该目录必须是文件系统上已存在且运行应用程序的进程可写入的目录。如果未设置此值且证书文件是非本地文件系统资源,则它将被复制到通过 System.getProperty("java.io.tmpdir") 返回的系统临时目录。即使此值存在,但目录无法在文件系统上找到或不可写入时,情况也是如此。

默认值:无。

spring.cloud.stream.kafka.binder.metrics.defaultOffsetLagMetricsEnabled

当设置为 true 时,每次访问度量指标时都会计算每个消费者主题的偏移量滞后度量。当设置为 false 时,仅使用周期性计算的偏移量滞后。

默认值:true

spring.cloud.stream.kafka.binder.metrics.offsetLagMetricsInterval

计算每个消费者主题偏移量滞后的间隔。当 metrics.defaultOffsetLagMetricsEnabled 被禁用或计算时间过长时,使用此值。

默认值:60 秒

spring.cloud.stream.kafka.binder.enableObservation

在此 Binder 中的所有绑定上启用 Micrometer 观察注册表。

默认值:false

spring.cloud.stream.kafka.binder.healthIndicatorConsumerGroup

KafkaHealthIndicator 元数据消费者 group.idHealthIndicator 使用此消费者查询正在使用的 Topic 的元数据。

默认值:无。

Kafka 消费者属性

以下属性仅适用于 Kafka 消费者,并且必须以 spring.cloud.stream.kafka.bindings.<channelName>.consumer. 为前缀。

为了避免重复,Spring Cloud Stream 支持为所有通道设置值,格式为 spring.cloud.stream.kafka.default.consumer.<property>=<value>
admin.configuration

自版本 2.1.1 起,此属性已被弃用,推荐使用 topic.properties,未来版本将移除对它的支持。

admin.replicas-assignment

自版本 2.1.1 起,此属性已被弃用,推荐使用 topic.replicas-assignment,未来版本将移除对它的支持。

admin.replication-factor

自版本 2.1.1 起,此属性已被弃用,推荐使用 topic.replication-factor,未来版本将移除对它的支持。

autoRebalanceEnabled

当为 true 时,Topic 分区会在消费者组成员之间自动重新平衡。当为 false 时,每个消费者根据 spring.cloud.stream.instanceCountspring.cloud.stream.instanceIndex 分配一组固定的分区。这要求在每个启动的实例上适当地设置 spring.cloud.stream.instanceCountspring.cloud.stream.instanceIndex 属性。在这种情况下,spring.cloud.stream.instanceCount 属性的值通常必须大于 1。

默认值:true

ackEachRecord

autoCommitOffsettrue 时,此设置决定是否在处理每个记录后提交偏移量。默认情况下,在处理完 consumer.poll() 返回的批次中的所有记录后提交偏移量。Poll 返回的记录数量可以通过 max.poll.records Kafka 属性控制,该属性通过消费者 configuration 属性设置。将其设置为 true 可能会导致性能下降,但这样做可以减少发生故障时重新投递记录的可能性。此外,请参阅 Binder 的 requiredAcks 属性,它也影响提交偏移量的性能。此属性自 3.1 版本起已被弃用,推荐使用 ackMode。如果未设置 ackMode 且未启用批量模式,将使用 RECORD 确认模式。

默认值:false

autoCommitOffset

从版本 3.1 开始,此属性已被弃用。有关替代方案的详细信息,请参见 ackMode。消息处理完成后是否自动提交偏移量。如果设置为 false,则入站消息中将存在一个 key 为 kafka_acknowledgment、类型为 org.springframework.kafka.support.Acknowledgment 的 header。应用程序可以使用此 header 来确认消息。有关详细信息,请参见示例部分。当此属性设置为 false 时,Kafka Binder 将确认模式设置为 org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.MANUAL,并且应用程序负责确认记录。另请参阅 ackEachRecord

默认值:true

ackMode

指定容器确认模式。这基于 Spring Kafka 中定义的 AckMode 枚举。如果 ackEachRecord 属性设置为 true 且消费者未处于批量模式,则将使用 RECORD 确认模式,否则,使用通过此属性提供的确认模式。

autoCommitOnError

在 Poll 式消费者中,如果设置为 true,则在发生错误时始终自动提交。如果未设置(默认值)或为 false,则在 Poll 式消费者中不会自动提交。请注意,此属性仅适用于 Poll 式消费者。

默认值:未设置。

resetOffsets

是否将消费者上的偏移量重置为 startOffset 提供的值。如果提供了 KafkaBindingRebalanceListener,则必须设置为 false;参见Rebalance 监听器。有关此属性的更多信息,请参见重置偏移量

默认值:false

startOffset

新组的起始偏移量。允许的值:earliestlatest。如果通过 spring.cloud.stream.bindings.<channelName>.group 为消费者“绑定”显式设置了消费者组,则 'startOffset' 设置为 earliest。否则,对于匿名消费者组,它设置为 latest。有关此属性的更多信息,请参见重置偏移量

默认值:null(等同于 earliest)。

enableDlq

当设置为 true 时,为消费者启用 DLQ 行为。默认情况下,导致错误的消息会被转发到名为 error.<destination>.<group> 的主题。可以通过设置 dlqName 属性或定义类型为 DlqDestinationResolver@Bean 来配置 DLQ 主题名称。当错误数量相对较少且重放整个原始主题可能过于繁琐时,这提供了一种替代 Kafka 更常见的重放场景的选项。有关更多信息,请参见Kafka DLQ 处理。从 2.0 版本开始,发送到 DLQ 主题的消息会增加以下 header:x-original-topicx-exception-messagex-exception-stacktrace,类型为 byte[]。默认情况下,失败的记录会发送到 DLQ 主题中与原始记录相同的分区号。有关如何更改此行为的信息,请参见DLQ 分区选择destinationIsPatterntrue 时不允许设置此属性。

默认值:false

dlqPartitions

enableDlq 为 true 且未设置此属性时,将创建一个与主主题拥有相同分区数的死信主题。通常,死信记录会发送到死信主题中与原始记录相同的分区。此行为可以更改;参见DLQ 分区选择。如果此属性设置为 1 且没有 DqlPartitionFunction bean,所有死信记录将写入分区 0。如果此属性大于 1,则必须提供一个 DlqPartitionFunction bean。请注意,实际分区数受 Binder 的 minPartitionCount 属性影响。

默认值:无

configuration

Map,包含通用 Kafka 消费者属性的 Key/Value 对。除了 Kafka 消费者属性外,此处还可以传递其他配置属性。例如,应用程序需要的一些属性,如 spring.cloud.stream.kafka.bindings.input.consumer.configuration.foo=bar。此处不能设置 bootstrap.servers 属性;如果需要连接到多个集群,请使用多 Binder 支持。

默认值:空 Map。

dlqName

接收错误消息的 DLQ 主题名称。

默认值:null(如果未指定,导致错误的消息将转发到名为 error.<destination>.<group> 的主题)。

dlqProducerProperties

使用此属性,可以设置特定于 DLQ 的生产者属性。所有通过 Kafka 生产者属性可用的属性都可以通过此属性设置。当消费者上启用原生解码(即 useNativeDecoding: true)时,应用程序必须为 DLQ 提供相应的 key/value 序列化器。这必须以 dlqProducerProperties.configuration.key.serializerdlqProducerProperties.configuration.value.serializer 的形式提供。

默认值:默认 Kafka 生产者属性。

standardHeaders

指示入站通道适配器填充哪些标准 header。允许的值:noneidtimestampboth。在使用原生反序列化且第一个接收消息的组件需要一个 ID(例如配置为使用 JDBC 消息存储的聚合器)时很有用。

默认值:无

converterBeanName

实现 RecordMessageConverter 的 Bean 名称。用于入站通道适配器中,替换默认的 MessagingMessageConverter

默认值:null

idleEventInterval

事件之间的间隔,单位为毫秒,这些事件指示最近没有收到消息。使用 ApplicationListener<ListenerContainerIdleEvent> 来接收这些事件。有关使用示例,请参见暂停-恢复

默认值:30000

destinationIsPattern

当为 true 时,目标被视为由 Broker 用于匹配 Topic 名称的正则表达式 Pattern。当为 true 时,Topic 不会被 Provision,并且不允许使用 enableDlq,因为 Binder 在 Provision 阶段不知道 Topic 名称。请注意,检测匹配模式的新 Topic 所花费的时间由消费者属性 metadata.max.age.ms 控制,该属性(在撰写本文时)默认为 300,000 毫秒(5 分钟)。可以使用上述 configuration 属性进行配置。

默认值:false

topic.properties

Map,用于 Provision 新 Topic 时使用的 Kafka Topic 属性——例如,spring.cloud.stream.kafka.bindings.input.consumer.topic.properties.message.format.version=0.9.0.0

默认值:无。

topic.replicas-assignment

Map<Integer, List<Integer>>,用于副本分配,其中 key 是分区,value 是分配。用于 Provision 新 Topic 时。参见 kafka-clients jar 中 NewTopic 的 Javadoc。

默认值:无。

topic.replication-factor

Provision Topic 时使用的副本因子。覆盖 Binder 范围的设置。如果存在 replicas-assignments,则忽略。

默认值:无(使用 Binder 范围的默认值 -1)。

pollTimeout

Poll 式消费者中用于 Polling 的超时时间。

默认值:5 秒。

transactionManager

KafkaAwareTransactionManager 的 Bean 名称,用于覆盖此绑定的 Binder 事务管理器。通常在需要使用 ChainedKafkaTransactionManaager 将另一个事务与 Kafka 事务同步时需要。为了实现记录的精确一次消费和生产,消费者和生产者绑定都必须配置相同的事务管理器。

默认值:无。

txCommitRecovered

使用事务性 Binder 时,默认情况下,恢复的记录(例如,重试耗尽并将记录发送到死信主题时)的偏移量将通过新事务提交。将此属性设置为 false 可禁止提交恢复记录的偏移量。

默认值:true。

commonErrorHandlerBeanName

每个消费者绑定使用的 CommonErrorHandler Bean 名称。如果存在,此用户提供的 CommonErrorHandler 将优先于 Binder 定义的任何其他错误处理器。如果应用程序不想使用 ListenerContainerCustomizer 并随后检查目标/组组合来设置错误处理器,这是一种方便的方式来表达错误处理器。

默认值:无。

Kafka 生产者属性

以下属性仅适用于 Kafka 生产者,并且必须以 spring.cloud.stream.kafka.bindings.<channelName>.producer. 为前缀。

为了避免重复,Spring Cloud Stream 支持为所有通道设置值,格式为 spring.cloud.stream.kafka.default.producer.<property>=<value>
admin.configuration

自版本 2.1.1 起,此属性已被弃用,推荐使用 topic.properties,未来版本将移除对它的支持。

admin.replicas-assignment

自版本 2.1.1 起,此属性已被弃用,推荐使用 topic.replicas-assignment,未来版本将移除对它的支持。

admin.replication-factor

自版本 2.1.1 起,此属性已被弃用,推荐使用 topic.replication-factor,未来版本将移除对它的支持。

bufferSize

Kafka 生产者在发送前尝试批量处理的数据量的上限,单位为字节。

默认值:16384

sync

生产者是否是同步的。

默认值:false

sendTimeoutExpression

一个 SpEL 表达式,针对出站消息进行评估,用于评估启用同步发布时等待 ack 的时间——例如,headers['mySendTimeout']。超时值以毫秒为单位。在 3.0 版本之前,除非使用原生编码,否则无法使用 payload,因为评估此表达式时,payload 已经采用 byte[] 的形式。现在,表达式在转换 payload 之前进行评估。

默认值:无。

batchTimeout

生产者在发送消息之前等待多长时间,以便在同一批次中累积更多消息。(通常,生产者根本不等待,只是简单地发送在上次发送进行时累积的所有消息。)非零值可能会增加吞吐量,但会牺牲延迟。

默认值:0

messageKeyExpression

一个 SpEL 表达式,针对出站消息进行评估,用于填充生产的 Kafka 消息的 key——例如,headers['myKey']。在 3.0 版本之前,除非使用原生编码,否则无法使用 payload,因为评估此表达式时,payload 已经采用 byte[] 的形式。现在,表达式在转换 payload 之前进行评估。对于常规处理器(Function<String, String>Function<Message<?>, Message<?>),如果生产的 key 需要与来自 Topic 的入站 key 相同,可以如下设置此属性:spring.cloud.stream.kafka.bindings.<output-binding-name>.producer.messageKeyExpression: headers['kafka_receivedMessageKey'] 对于反应式函数,需要注意一个重要的警告。在这种情况下,由应用程序负责手动将 header 从入站消息复制到出站消息。您可以设置 header,例如 myKey 并像上面建议的那样使用 headers['myKey'],或者为了方便,只需设置 KafkaHeaders.MESSAGE_KEY header,就完全不需要设置此属性了。

默认值:无。

headerPatterns

逗号分隔的简单模式列表,用于匹配 Spring messaging header 并将其映射到 ProducerRecord 中的 Kafka Header。模式可以以通配符(星号)开头或结尾。模式可以通过前缀 ! 来否定。匹配在第一个匹配(正向或负向)后停止。例如,!ask,as* 将通过 ash 但不会通过 askidtimestamp 永远不会被映射。

默认值:*(所有头信息 - 除了 idtimestamp

configuration

包含通用 Kafka 生产者属性的键/值对 Map。bootstrap.servers 属性不能在此处设置;如果需要连接到多个集群,请使用多绑定器支持。

默认值:空 Map。

topic.properties

一个 Map,包含在配置新主题时使用的 Kafka 主题属性,例如:spring.cloud.stream.kafka.bindings.output.producer.topic.properties.message.format.version=0.9.0.0

topic.replicas-assignment

Map<Integer, List<Integer>>,用于副本分配,其中 key 是分区,value 是分配。用于 Provision 新 Topic 时。参见 kafka-clients jar 中 NewTopic 的 Javadoc。

默认值:无。

topic.replication-factor

Provision Topic 时使用的副本因子。覆盖 Binder 范围的设置。如果存在 replicas-assignments,则忽略。

默认值:无(使用 Binder 范围的默认值 -1)。

useTopicHeader

设置为 true 以使用出站消息中 KafkaHeaders.TOPIC 消息头的值覆盖默认绑定目标(主题名称)。如果头信息不存在,则使用默认绑定目标。

默认值:false

recordMetadataChannel

一个 MessageChannel 的 bean 名称,用于发送成功的发送结果;该 bean 必须存在于应用程序上下文中。发送到该通道的消息是已发送的消息(如果经过转换),并附带一个额外的头信息 KafkaHeaders.RECORD_METADATA。该头信息包含一个由 Kafka 客户端提供的 RecordMetadata 对象;它包括记录写入主题的分区和偏移量。

ResultMetadata meta = sendResultMsg.getHeaders().get(KafkaHeaders.RECORD_METADATA, RecordMetadata.class)

失败的发送会进入生产者错误通道(如果已配置);请参阅 Kafka 错误通道

默认值:null。

Kafka 绑定器使用生产者的 partitionCount 设置作为提示,以创建具有给定分区数的主题(结合 minPartitionCount,两者中的最大值将被使用)。在为绑定器配置 minPartitionCount 并为应用程序配置 partitionCount 时请谨慎,因为会使用较大的值。如果已存在一个分区数较少的主题且 autoAddPartitions 被禁用(默认设置),则绑定器启动失败。如果已存在一个分区数较少的主题且 autoAddPartitions 被启用,则会添加新分区。如果已存在一个分区数大于 (minPartitionCountpartitionCount) 中较大值的主题,则使用现有分区数。
compression

设置 compression.type 生产者属性。支持的值包括 nonegzipsnappylz4zstd。如果您将 kafka-clients jar 版本覆盖到 2.1.0(或更高版本),如 Spring for Apache Kafka 文档 中所述,并希望使用 zstd 压缩,请使用 spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.compression.type=zstd

默认值:无。

transactionManager

KafkaAwareTransactionManager 的 Bean 名称,用于覆盖此绑定的 Binder 事务管理器。通常在需要使用 ChainedKafkaTransactionManaager 将另一个事务与 Kafka 事务同步时需要。为了实现记录的精确一次消费和生产,消费者和生产者绑定都必须配置相同的事务管理器。

默认值:无。

closeTimeout

关闭生产者时等待的超时时间,单位为秒。

默认值:30

allowNonTransactional

通常,与事务性绑定器关联的所有输出绑定将在新事务中发布(如果当前没有进行中的事务)。此属性允许您覆盖该行为。如果设置为 true,则发布到此输出绑定的记录将不会在事务中运行,除非当前已有进行中的事务。

默认值:false