监听器容器属性

表 1. ContainerProperties 属性
属性 默认值 描述

ackCount

1

ackModeCOUNTCOUNT_TIME 时,在提交待处理偏移量之前的记录数。

adviceChain

null

包裹消息监听器的 Advice 对象链(例如 MethodInterceptor 环绕通知),按顺序调用。

ackMode

BATCH

控制偏移量的提交频率 - 参见 提交偏移量

ackTime

5000

ackModeTIMECOUNT_TIME 时,待处理偏移量在经过该毫秒数后提交。

assignmentCommitOption

LATEST_ONLY _NO_TX

是否在分配时提交初始位置;默认情况下,仅当 ConsumerConfig.AUTO_OFFSET_RESET_CONFIGlatest 且即使存在事务管理器也不会在事务中运行时,才会提交初始偏移量。有关可用选项的更多信息,请参阅 ContainerProperties.AssignmentCommitOption 的 JavaDocs。

asyncAcks

false

启用乱序提交(参见 手动提交偏移量);消费者被暂停,提交被延迟直到填补空白。

authExceptionRetryInterval

null

当不为 null 时,表示 Kafka 客户端抛出 AuthenticationExceptionAuthorizationException 时,两次 Poll 之间休眠的 Duration。当为 null 时,此类异常被视为致命错误,容器将停止。

batchRecoverAfterRollback

false

设置为 true 以启用批量恢复,参见 回滚后处理器

clientId

(空字符串)

client.id 消费者属性的前缀。覆盖消费者工厂的 client.id 属性;在并发容器中,每个消费者实例会添加 -n 后缀。

checkDeserExWhenKeyNull

false

设置为 true 以在接收到 null 键时始终检查 DeserializationException 头。当消费者代码无法确定 ErrorHandlingDeserializer 已被配置时(例如在使用委托反序列化器时),此属性非常有用。

checkDeserExWhenValueNull

false

设置为 true 以在接收到 null 值时始终检查 DeserializationException 头。当消费者代码无法确定 ErrorHandlingDeserializer 已被配置时(例如在使用委托反序列化器时),此属性非常有用。

commitCallback

null

当存在且 syncCommitsfalse 时,提交完成后调用的回调。

commitLogLevel

DEBUG

与提交偏移量相关的日志的日志级别。

consumerRebalanceListener

null

一个再平衡监听器;参见 再平衡监听器

commitRetries

3

使用 syncCommits 设置为 true 时,设置 RetriableCommitFailedException 的重试次数。默认 3 次(总共尝试 4 次)。

consumerStartTimeout

30s

在记录错误日志之前等待消费者启动的时间;例如,如果您使用线程数不足的任务执行器,可能会发生这种情况。

deliveryAttemptHeader

false

参见 投递尝试次数头

eosMode

V2

精确一次语义模式;参见 精确一次语义

fixTxOffsets

false

当消费事务性生产者产生的记录,且消费者位于分区末尾时,由于用于指示事务提交/回滚的伪记录,以及可能存在的已回滚记录,Lag 可能被错误地报告为大于零。这不影响消费者的功能,但一些用户对“Lag”非零表示担忧。将此属性设置为 true,容器将纠正此类误报的偏移量。检查在下一次 Poll 之前执行,以避免给提交处理增加显著复杂性。在撰写本文时,仅当消费者配置了 isolation.level=read_committedmax.poll.records 大于 1 时,Lag 才会得到纠正。更多信息请参阅 KAFKA-10683

groupId

null

覆盖消费者 group.id 属性;由 @KafkaListeneridgroupId 属性自动设置。

idleBeforeDataMultiplier

5.0

在收到任何记录之前应用的 idleEventInterval 乘数。收到记录后,不再应用该乘数。自 2.8 版本起可用。

idleBetweenPolls

0

用于通过在两次 Poll 之间休眠线程来减慢投递速度。处理一批记录的时间加上此值必须小于 max.poll.interval.ms 消费者属性。

idleEventInterval

null

设置后,启用 ListenerContainerIdleEvents 的发布,参见 应用事件检测空闲和无响应的消费者。另请参阅 idleBeforeDataMultiplier

idlePartitionEventInterval

null

设置后,启用 ListenerContainerIdlePartitionEvents 的发布,参见 应用事件检测空闲和无响应的消费者

kafkaConsumerProperties

用于覆盖消费者工厂上配置的任意消费者属性。

kafkaAwareTransactionManager

null

参见 事务

listenerTaskExecutor

SimpleAsyncTaskExecutor

一个用于运行消费者线程的任务执行器。默认执行器创建的线程命名为 <name>-C-n;对于 KafkaMessageListenerContainer,名称是 Bean 名称;对于 ConcurrentMessageListenerContainer,名称是 Bean 名称加上 -m 后缀,其中 m 对每个子容器递增。参见 容器线程命名

logContainerConfig

false

设置为 true 以在 INFO 级别记录所有容器属性。

messageListener

null

消息监听器。

micrometerEnabled

true

是否为消费者线程维护 Micrometer 计时器。

micrometerTags

要添加到 Micrometer 度量的静态标签映射。

micrometerTagsProvider

null

一个根据消费者记录提供动态标签的函数。

missingTopicsFatal

false

当为 true 时,如果配置的 topic(s) 不存在于 broker 上,则阻止容器启动。

monitorInterval

30s

检查消费者线程状态以获取 NonResponsiveConsumerEvents 的频率。参见 noPollThresholdpollTimeout

noPollThreshold

3.0

乘以 pollTimeOut 以确定是否发布 NonResponsiveConsumerEvent。参见 monitorInterval

observationConvention

null

设置后,根据消费者记录中的信息,为计时器和跟踪添加动态标签。

observationEnabled

false

设置为 true 以启用通过 Micrometer 进行观察。

offsetAndMetadataProvider

null

一个 OffsetAndMetadata 的提供者;默认情况下,提供者创建一个带有空元数据的偏移量和元数据。提供者提供了一种自定义元数据的方式。

onlyLogRecordMetadata

false

设置为 false 以记录完整的消费者记录(在错误、调试日志等中),而不仅仅是 topic-partition@offset

pauseImmediate

false

当容器暂停时,在处理当前记录后停止处理,而不是在处理完上次 Poll 的所有记录后停止;剩余的记录会保留在内存中,并在容器恢复时传递给监听器。

pollTimeout

5000

传递给 Consumer.poll() 的超时时间,单位为毫秒。

pollTimeoutWhilePaused

100

当容器处于暂停状态时,传递给 Consumer.poll() 的超时时间(单位为毫秒)。

restartAfterAuthExceptions

false

如果容器因授权/认证异常而停止,则设置为 True 以重启容器。

scheduler

ThreadPoolTaskScheduler

一个用于运行消费者监控任务的调度器。

shutdownTimeout

10000

阻塞 stop() 方法直到所有消费者停止并在发布容器停止事件之前的最大时间(单位为毫秒)。

stopContainerWhenFenced

false

如果抛出 ProducerFencedException,则停止监听器容器。更多信息请参见 回滚后处理器

stopImmediate

false

当容器停止时,在处理当前记录后停止处理,而不是在处理完上次 Poll 的所有记录后停止。

subBatchPerPartition

见描述。

使用批量监听器时,如果此属性为 true,则将 Poll 结果分割成子批量(每个分区一个)后调用监听器。默认值为 false

syncCommitTimeout

null

syncCommitstrue 时使用的超时时间。未设置时,容器将尝试确定 default.api.timeout.ms 消费者属性并使用该值;否则将使用 60 秒。

syncCommits

true

是否对偏移量使用同步或异步提交;参见 commitCallback

topics topicPattern topicPartitions

不适用

配置的 topic(s)、topic 模式或显式分配的 topics/partitions。互斥;必须至少提供其中之一;由 ContainerProperties 构造函数强制执行。

transactionManager

null

自 3.2 版本起已弃用,参见 [kafkaAwareTransactionManager], 其他事务管理器

表 2. AbstractMessageListenerContainer 属性
属性 默认值 描述

afterRollbackProcessor

DefaultAfterRollbackProcessor

一个在事务回滚后调用的 AfterRollbackProcessor

applicationEventPublisher

应用上下文

事件发布器。

batchErrorHandler

见描述。

已弃用 - 参见 commonErrorHandler

batchInterceptor

null

设置一个 BatchInterceptor 以在调用批量监听器之前调用;不适用于记录监听器。另请参见 interceptBeforeTx

beanName

bean 名称

容器的 Bean 名称;子容器会添加 -n 后缀。

commonErrorHandler

见描述。

当提供了 transactionManager 且使用了 DefaultAfterRollbackProcessor 时,默认是 DefaultErrorHandlernull。参见 容器错误处理器

containerProperties

ContainerProperties

容器属性实例。

groupId

见描述。

如果存在,则是 containerProperties.groupId,否则是消费者工厂的 group.id 属性。

interceptBeforeTx

true

确定是在事务开始之前还是之后调用 recordInterceptor

listenerId

见描述。

用户配置的容器的 Bean 名称或 @KafkaListenerid 属性。

listenerInfo

null

要填充到 KafkaHeaders.LISTENER_INFO 头中的值。对于 @KafkaListener,此值来自 info 属性。此头可以在各种地方使用,例如在 RecordInterceptorRecordFilterStrategy 以及监听器代码本身中。

pauseRequested

(只读)

如果已请求暂停消费者,则为 True。

recordInterceptor

null

设置一个 RecordInterceptor 以在调用记录监听器之前调用;不适用于批量监听器。另请参见 interceptBeforeTx

topicCheckTimeout

30s

missingTopicsFatal 容器属性为 true 时,等待 describeTopics 操作完成的时间,单位为秒。

表 3. KafkaMessageListenerContainer 属性
属性 默认值 描述

assignedPartitions

(只读)

当前分配给此容器的分区(显式或非显式)。

clientIdSuffix

null

由并发容器使用,为每个子容器的消费者提供唯一的 client.id

containerPaused

不适用

如果已请求暂停并且消费者已实际暂停,则为 True。

表 4. ConcurrentMessageListenerContainer 属性
属性 默认值 描述

alwaysClientIdSuffix

true

concurrency 仅为 1 时,设置为 false 以禁止向 client.id 消费者属性添加后缀。

assignedPartitions

(只读)

当前分配给此容器的子 KafkaMessageListenerContainers 的分区集合(显式或非显式)。

concurrency

1

要管理的子 KafkaMessageListenerContainers 的数量。

containerPaused

不适用

如果已请求暂停并且所有子容器的消费者已实际暂停,则为 True。

containers

不适用

对所有子 KafkaMessageListenerContainers 的引用。