配置选项

本节包含 Apache Kafka Binder 使用的配置选项。

有关与 Binder 相关的通用配置选项和属性,请参阅核心文档中的绑定属性

Kafka Binder 属性

spring.cloud.stream.kafka.binder.brokers

Kafka Binder 连接到的代理列表。

默认值:localhost

spring.cloud.stream.kafka.binder.defaultBrokerPort

brokers 允许指定带有或不带端口信息的宿主(例如,host1,host2:port2)。当代理列表中未配置端口时,这将设置默认端口。

默认值:9092

spring.cloud.stream.kafka.binder.configuration

传递给 Binder 创建的所有客户端(生产者和消费者)的客户端属性(生产者和消费者)的键值映射。由于这些属性由生产者和消费者使用,因此使用应限于通用属性,例如安全设置。通过此配置提供的未知 Kafka 生产者或消费者属性将被过滤掉,并且不允许传播。此处的属性优先于引导程序中设置的任何属性。

默认值:空映射。

spring.cloud.stream.kafka.binder.consumerProperties

任意 Kafka 客户端消费者属性的键值映射。除了支持已知的 Kafka 消费者属性外,此处还允许未知的消费者属性。此处的属性优先于引导程序中设置的任何属性以及上面的 configuration 属性。

默认值:空映射。

spring.cloud.stream.kafka.binder.headers

Binder 传输的自定义标头列表。仅当与使用 kafka-clients 版本 < 0.11.0.0 的旧版应用程序(⇐ 1.3.x)通信时才需要。较新版本原生支持标头。

默认值:空。

spring.cloud.stream.kafka.binder.healthTimeout

以秒为单位等待获取分区信息的时间。如果此计时器到期,则健康报告为关闭。

默认值:10。

spring.cloud.stream.kafka.binder.requiredAcks

代理上所需的确认数。请参阅 Kafka 文档以了解生产者 acks 属性。

默认值:1

spring.cloud.stream.kafka.binder.minPartitionCount

仅当设置了 autoCreateTopicsautoAddPartitions 时有效。Binder 在其生产或消费数据的主题上配置的全局最小分区数。它可以被生产者的 partitionCount 设置或生产者的 instanceCount * concurrency 设置的值(如果两者都更大)覆盖。

默认值:1

spring.cloud.stream.kafka.binder.producerProperties

任意 Kafka 客户端生产者属性的键值映射。除了支持已知的 Kafka 生产者属性外,此处还允许未知的生产者属性。此处的属性优先于引导程序中设置的任何属性以及上面的 configuration 属性。

默认值:空映射。

spring.cloud.stream.kafka.binder.replicationFactor

如果 autoCreateTopics 处于活动状态,则为自动创建的主题的复制因子。可以在每个绑定上覆盖。

如果您使用的是 2.4 之前的 Kafka 代理版本,则此值应至少设置为1。从 3.0.8 版本开始,绑定器使用-1作为默认值,这表示将使用代理的'default.replication.factor'属性来确定副本的数量。请咨询您的 Kafka 代理管理员,以了解是否存在要求最小副本数的策略,如果存在,则通常default.replication.factor将与该值匹配,并且应使用-1,除非您需要大于最小值的副本数。

默认值:-1

spring.cloud.stream.kafka.binder.autoCreateTopics

如果设置为true,则绑定器会自动创建新主题。如果设置为false,则绑定器依赖于主题已配置。在后一种情况下,如果主题不存在,则绑定器将无法启动。

此设置独立于代理的auto.create.topics.enable设置,并且不会影响它。如果服务器设置为自动创建主题,则它们可能会作为元数据检索请求的一部分创建,并使用默认的代理设置。

默认值:true

spring.cloud.stream.kafka.binder.autoAddPartitions

如果设置为true,则绑定器会在需要时创建新分区。如果设置为false,则绑定器依赖于主题的分区大小已配置。如果目标主题的分区数小于预期值,则绑定器将无法启动。

默认值:false

spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix

在绑定器中启用事务。请参阅 Kafka 文档中的transaction.id以及spring-kafka文档中的事务。启用事务后,将忽略各个producer属性,并且所有生产者都将使用spring.cloud.stream.kafka.binder.transaction.producer.*属性。

默认值null(无事务)

spring.cloud.stream.kafka.binder.transaction.producer.*

事务绑定器中生产者的全局生产者属性。请参阅spring.cloud.stream.kafka.binder.transaction.transactionIdPrefixKafka 生产者属性以及所有绑定器支持的通用生产者属性。

默认值:请参阅各个生产者属性。

spring.cloud.stream.kafka.binder.headerMapperBeanName

用于将spring-messaging标头映射到 Kafka 标头和从 Kafka 标头映射到spring-messaging标头的KafkaHeaderMapper的 bean 名称。例如,如果您希望自定义使用 JSON 反序列化标头的BinderHeaderMapper bean 中的可信软件包,请使用此方法。如果此自定义BinderHeaderMapper bean 未通过此属性提供给绑定器,则绑定器将在回退到绑定器创建的默认BinderHeaderMapper之前,查找名称为kafkaBinderHeaderMapper且类型为BinderHeaderMapper的标头映射器 bean。

默认值:无。

spring.cloud.stream.kafka.binder.considerDownWhenAnyPartitionHasNoLeader

标志,用于在发现主题上的任何分区(无论接收来自该分区的数据的消费者是谁)都没有领导者时,将绑定器运行状况设置为down

默认值:true

spring.cloud.stream.kafka.binder.certificateStoreDirectory

当信任库或密钥库证书位置作为非本地文件系统资源给出时(org.springframework.core.io.Resource 支持的资源,例如 CLASSPATH、HTTP 等),绑定器会将资源从路径(可转换为 org.springframework.core.io.Resource)复制到文件系统上的某个位置。这对于代理级证书(ssl.truststore.locationssl.keystore.location)和用于模式注册表的证书(schema.registry.ssl.truststore.locationschema.registry.ssl.keystore.location)都是正确的。请记住,信任库和密钥库位置路径必须在spring.cloud.stream.kafka.binder.configuration…​下提供。例如,spring.cloud.stream.kafka.binder.configuration.ssl.truststore.locationspring.cloud.stream.kafka.binder.configuration.schema.registry.ssl.truststore.location等。该文件将复制到此属性的值指定的位置,该位置必须是文件系统上运行应用程序的进程可写入的现有目录。如果未设置此值并且证书文件是非本地文件系统资源,则它将复制到系统临时目录中,该目录由System.getProperty("java.io.tmpdir")返回。如果存在此值,但文件系统上找不到该目录或该目录不可写,则也是如此。

默认值:无。

spring.cloud.stream.kafka.binder.metrics.defaultOffsetLagMetricsEnabled

设置为 true 时,每次访问指标时都会计算每个消费者主题的偏移量滞后指标。设置为 false 时,仅使用定期计算的偏移量滞后。

默认值:true

spring.cloud.stream.kafka.binder.metrics.offsetLagMetricsInterval

计算每个消费者主题的偏移量滞后的间隔。每当metrics.defaultOffsetLagMetricsEnabled被禁用或其计算花费的时间过长时,都会使用此值。

默认值:60 秒

spring.cloud.stream.kafka.binder.enableObservation

在此绑定器中的所有绑定上启用 Micrometer 观察注册表。

默认值:false

spring.cloud.stream.kafka.binder.healthIndicatorConsumerGroup

KafkaHealthIndicator 元数据消费者group.id。此消费者由HealthIndicator用于查询有关正在使用的主题的元数据。

默认值:无。

Kafka 消费者属性

以下属性仅适用于 Kafka 消费者,并且必须以spring.cloud.stream.kafka.bindings.<channelName>.consumer.为前缀。

为了避免重复,Spring Cloud Stream 支持以spring.cloud.stream.kafka.default.consumer.<property>=<value>的格式为所有通道设置值。
admin.configuration

从 2.1.1 版本开始,此属性已弃用,取而代之的是topic.properties,并且将在未来版本中删除对它的支持。

admin.replicas-assignment

从 2.1.1 版本开始,此属性已弃用,取而代之的是topic.replicas-assignment,并且将在未来版本中删除对它的支持。

admin.replication-factor

从 2.1.1 版本开始,此属性已弃用,取而代之的是topic.replication-factor,并且将在未来版本中删除对它的支持。

autoRebalanceEnabled

true时,主题分区会在消费者组的成员之间自动重新平衡。当false时,每个消费者都会根据spring.cloud.stream.instanceCountspring.cloud.stream.instanceIndex分配一组固定的分区。这要求在启动的每个实例上都适当地设置spring.cloud.stream.instanceCountspring.cloud.stream.instanceIndex属性。在这种情况下,spring.cloud.stream.instanceCount属性的值通常必须大于 1。

默认值:true

ackEachRecord

autoCommitOffsettrue时,此设置决定是否在处理每个记录后提交偏移量。默认情况下,在consumer.poll()返回的记录批次中的所有记录都已处理后,会提交偏移量。通过consumerconfiguration属性设置的 Kafka 属性max.poll.records可以控制轮询返回的记录数。将此设置为true可能会导致性能下降,但这样做可以降低发生故障时重新传递记录的可能性。此外,请参阅绑定器的requiredAcks属性,它也会影响提交偏移量的性能。此属性自 3.1 起已弃用,建议使用ackMode。如果未设置ackMode且未启用批处理模式,则将使用RECORD ackMode。

默认值:false

autoCommitOffset

从 3.1 版本开始,此属性已弃用。有关替代方案的更多详细信息,请参阅ackMode。是否在处理消息后自动提交偏移量。如果设置为false,则入站消息中存在键为kafka_acknowledgment类型为org.springframework.kafka.support.Acknowledgment的标头。应用程序可以使用此标头确认消息。有关详细信息,请参阅示例部分。当此属性设置为false时,Kafka 绑定器会将 ack 模式设置为org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.MANUAL,并且应用程序负责确认记录。另请参阅ackEachRecord

默认值:true

ackMode

指定容器 ack 模式。这基于 Spring Kafka 中定义的 AckMode 枚举。如果ackEachRecord属性设置为true且消费者未处于批处理模式,则将使用RECORD的 ack 模式,否则,将使用此属性提供的 ack 模式。

autoCommitOnError

在可轮询的消费者中,如果设置为true,则始终在出错时自动提交。如果未设置(默认值)或为 false,则不会在可轮询的消费者中自动提交。请注意,此属性仅适用于可轮询的消费者。

默认值:未设置。

resetOffsets

是否将消费者的偏移量重置为startOffset提供的值。如果提供了KafkaBindingRebalanceListener,则必须为 false;请参阅重新平衡侦听器有关此属性的更多信息,请参阅重置偏移量

默认值:false

startOffset

新组的起始偏移量。允许的值:earliestlatest。如果为消费者“绑定”显式设置了消费者组(通过spring.cloud.stream.bindings.<channelName>.group),则'startOffset'将设置为earliest。否则,对于anonymous消费者组,它将设置为latest。有关此属性的更多信息,请参阅重置偏移量

默认值:null(相当于earliest)。

enableDlq

设置为 true 时,它会为消费者启用 DLQ 行为。默认情况下,导致错误的消息将转发到名为error.<destination>.<group>的主题。可以通过设置dlqName属性或定义类型为DlqDestinationResolver@Bean来配置 DLQ 主题名称。这为错误数量相对较少且重放整个原始主题可能过于繁琐的情况提供了替代方案,而不是更常见的 Kafka 重放方案。有关更多信息,请参阅kafka dlq 处理。从 2.0 版本开始,发送到 DLQ 主题的消息将使用以下标头增强:x-original-topicx-exception-messagex-exception-stacktrace作为byte[]。默认情况下,失败的记录将发送到与原始记录相同的 DLQ 主题分区号。有关如何更改此行为,请参阅dlq 分区选择destinationIsPatterntrue时,不允许使用。

默认值:false

dlqPartitions

enableDlq为 true 且未设置此属性时,将创建一个与主主题(s)分区数相同的死信主题。通常,死信记录将发送到与原始记录相同的死信主题分区。此行为可以更改;请参阅dlq 分区选择。如果此属性设置为1并且没有DqlPartitionFunction bean,则所有死信记录都将写入分区0。如果此属性大于1,则**必须**提供DlqPartitionFunction bean。请注意,实际的分区计数受绑定器的minPartitionCount属性的影响。

默认值:none

configuration

包含通用 Kafka 消费者属性的键/值对映射。除了 Kafka 消费者属性外,还可以在此处传递其他配置属性。例如,应用程序需要的一些属性,例如spring.cloud.stream.kafka.bindings.input.consumer.configuration.foo=bar。无法在此处设置bootstrap.servers属性;如果您需要连接到多个集群,请使用多绑定器支持。

默认值:空映射。

dlqName

接收错误消息的 DLQ 主题的名称。

默认值:null(如果未指定,则导致错误的消息将转发到名为error.<destination>.<group>的主题)。

dlqProducerProperties

使用此属性,可以设置特定于 DLQ 的生产者属性。可以通过此属性设置 kafka 生产者属性中提供的所有属性。当在消费者上启用原生解码(即,useNativeDecoding:true)时,应用程序必须为 DLQ 提供相应的键/值序列化器。这必须以dlqProducerProperties.configuration.key.serializerdlqProducerProperties.configuration.value.serializer的形式提供。

默认值:默认 Kafka 生产者属性。

standardHeaders

指示入站通道适配器填充哪些标准标头。允许的值:noneidtimestampboth。如果使用原生反序列化并且接收消息的第一个组件需要id(例如,配置为使用 JDBC 消息存储的聚合器),则此属性很有用。

默认值:none

converterBeanName

实现RecordMessageConverter的 Bean 的名称。在入站通道适配器中使用,以替换默认的MessagingMessageConverter

默认值:null

idleEventInterval

指示最近未收到任何消息的事件之间的间隔(以毫秒为单位)。使用ApplicationListener<ListenerContainerIdleEvent>接收这些事件。有关用法示例,请参见暂停-恢复

默认值:30000

destinationIsPattern

如果为 true,则目标将被视为用于匹配代理主题名称的正则表达式Pattern。如果为 true,则不会预配主题,并且不允许使用enableDlq,因为绑定器在预配阶段不知道主题名称。请注意,检测与模式匹配的新主题所需的时间由使用者属性metadata.max.age.ms控制,该属性(在撰写本文时)默认为 300,000 毫秒(5 分钟)。这可以使用上面的configuration属性进行配置。

默认值:false

topic.properties

预配新主题时使用的 Kafka 主题属性的Map,例如spring.cloud.stream.kafka.bindings.input.consumer.topic.properties.message.format.version=0.9.0.0

默认值:无。

topic.replicas-assignment

副本分配的 Map<Integer, List<Integer>>,其中键是分区,值是分配。在预配新主题时使用。请参见kafka-clients jar 中的NewTopic Javadocs。

默认值:无。

topic.replication-factor

预配主题时要使用的复制因子。覆盖绑定器范围的设置。如果存在replicas-assignments,则忽略。

默认值:无(使用绑定器范围的默认值 -1)。

pollTimeout

在可轮询使用者中用于轮询的超时。

默认值:5 秒。

transactionManager

用于覆盖此绑定的绑定器事务管理器的KafkaAwareTransactionManager的 Bean 名称。如果您想使用ChainedKafkaTransactionManaager将另一个事务与 Kafka 事务同步,通常需要此属性。为了实现记录的精确一次消费和生产,消费者和生产者绑定都必须配置为使用相同的事务管理器。

默认值:无。

txCommitRecovered

使用事务性绑定器时,恢复记录的偏移量(例如,当重试耗尽并且记录发送到死信主题时)默认情况下将通过新事务提交。将此属性设置为false将禁止提交恢复记录的偏移量。

默认值:true。

commonErrorHandlerBeanName

每个使用者绑定要使用的CommonErrorHandler Bean 名称。如果存在,此用户提供的CommonErrorHandler优先于绑定器定义的任何其他错误处理程序。如果应用程序不想使用ListenerContainerCustomizer然后检查目标/组组合以设置错误处理程序,这是一种表达错误处理程序的便捷方法。

默认值:无。

Kafka 生产者属性

以下属性仅适用于 Kafka 生产者,并且必须以spring.cloud.stream.kafka.bindings.<channelName>.producer.为前缀。

为了避免重复,Spring Cloud Stream 支持以spring.cloud.stream.kafka.default.producer.<property>=<value>的格式为所有通道设置值。
admin.configuration

从 2.1.1 版本开始,此属性已弃用,取而代之的是topic.properties,并且将在未来版本中删除对它的支持。

admin.replicas-assignment

从 2.1.1 版本开始,此属性已弃用,取而代之的是topic.replicas-assignment,并且将在未来版本中删除对它的支持。

admin.replication-factor

从 2.1.1 版本开始,此属性已弃用,取而代之的是topic.replication-factor,并且将在未来版本中删除对它的支持。

bufferSize

Kafka 生产者在发送之前尝试批处理的数据量(以字节为单位)的上限。

默认值:16384

sync

生产者是否同步。

默认值:false

sendTimeoutExpression

针对传出消息计算的 SpEL 表达式,用于评估在启用同步发布时等待确认的时间,例如headers['mySendTimeout']。超时的值为毫秒。在 3.0 之前的版本中,除非正在使用原生编码,否则无法使用有效负载,因为在计算此表达式时,有效负载已经以byte[]的形式存在。现在,在转换有效负载之前计算表达式。

默认值:none

batchTimeout

生产者在发送消息之前等待更多消息累积到同一批次中的时间。 (通常,生产者根本不等待,而只是发送在上次发送过程中累积的所有消息。)非零值可能会以延迟为代价提高吞吐量。

默认值:0

messageKeyExpression

针对传出消息计算的 SpEL 表达式,用于填充生成的 Kafka 消息的键,例如headers['myKey']。在 3.0 之前的版本中,除非正在使用原生编码,否则无法使用有效负载,因为在计算此表达式时,有效负载已经以byte[]的形式存在。现在,在转换有效负载之前计算表达式。对于常规处理器(Function<String, String>Function<Message<?>, Message<?>),如果生成的键需要与主题中的传入键相同,则可以按如下方式设置此属性。spring.cloud.stream.kafka.bindings.<output-binding-name>.producer.messageKeyExpression: headers['kafka_receivedMessageKey']对于反应式函数,需要牢记一个重要的注意事项。在这种情况下,应用程序需要手动将标头从传入消息复制到传出消息。您可以设置标头,例如myKey并使用headers['myKey']如上所述,或者为了方便起见,只需设置KafkaHeaders.MESSAGE_KEY标头,您就不需要设置此属性。

默认值:none

headerPatterns

用于匹配要映射到ProducerRecord中的 Kafka Headers的 Spring 消息传递标头的简单模式的逗号分隔列表。模式可以以通配符(星号)开头或结尾。可以通过在前面加上!来否定模式。匹配在第一次匹配(正向或负向)后停止。例如,!ask,as*将传递ash但不传递askidtimestamp永远不会映射。

默认值:*(所有标头 - 除idtimestamp之外)

configuration

包含通用 Kafka 生产者属性的键/值对的映射。此处无法设置bootstrap.servers属性;如果您需要连接到多个集群,请使用多绑定器支持。

默认值:空映射。

topic.properties

预配新主题时使用的 Kafka 主题属性的Map,例如spring.cloud.stream.kafka.bindings.output.producer.topic.properties.message.format.version=0.9.0.0

topic.replicas-assignment

副本分配的 Map<Integer, List<Integer>>,其中键是分区,值是分配。在预配新主题时使用。请参见kafka-clients jar 中的NewTopic Javadocs。

默认值:无。

topic.replication-factor

预配主题时要使用的复制因子。覆盖绑定器范围的设置。如果存在replicas-assignments,则忽略。

默认值:无(使用绑定器范围的默认值 -1)。

useTopicHeader

设置为true以使用传出消息中的KafkaHeaders.TOPIC消息标头的值覆盖默认绑定目标(主题名称)。如果标头不存在,则使用默认绑定目标。

默认值:false

recordMetadataChannel

应将成功发送结果发送到的MessageChannel的 Bean 名称;该 Bean 必须存在于应用程序上下文中。发送到通道的消息是发送的消息(如果存在,则在转换后),并带有一个额外的标头KafkaHeaders.RECORD_METADATA。该标头包含 Kafka 客户端提供的RecordMetadata对象;它包括记录在主题中写入的分区和偏移量。

ResultMetadata meta = sendResultMsg.getHeaders().get(KafkaHeaders.RECORD_METADATA, RecordMetadata.class)

失败的发送转到生产者错误通道(如果已配置);请参见Kafka 错误通道

默认值:null。

Kafka 绑定器使用生产者的partitionCount设置作为提示,以创建具有给定分区数的主题(与minPartitionCount结合使用,两者中较大的值为使用值)。在为绑定器配置minPartitionCount和为应用程序配置partitionCount时要谨慎,因为使用较大的值。如果主题已经存在且分区数较小且autoAddPartitions已禁用(默认值),则绑定器无法启动。如果主题已经存在且分区数较小且autoAddPartitions已启用,则会添加新的分区。如果主题已经存在且分区数大于(minPartitionCountpartitionCount)的最大值,则使用现有的分区数。
compression

设置compression.type生产者属性。支持的值为nonegzipsnappylz4zstd。如果您根据Spring for Apache Kafka 文档中所述将kafka-clients jar 覆盖到 2.1.0(或更高版本),并且希望使用zstd压缩,请使用spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.compression.type=zstd

默认值:none

transactionManager

用于覆盖此绑定的绑定器事务管理器的KafkaAwareTransactionManager的 Bean 名称。如果您想使用ChainedKafkaTransactionManaager将另一个事务与 Kafka 事务同步,通常需要此属性。为了实现记录的精确一次消费和生产,消费者和生产者绑定都必须配置为使用相同的事务管理器。

默认值:无。

closeTimeout

关闭生产者时等待的超时时间(以秒为单位)。

默认值:30

allowNonTransactional

通常,与事务性绑定器关联的所有输出绑定都将在新事务中发布,如果没有已经在处理中的事务。此属性允许您覆盖该行为。如果设置为 true,则发布到此输出绑定的记录将不会在事务中运行,除非已经在处理中的事务。

默认值:false