监听器容器属性

表 1. ContainerProperties 属性
属性 默认值 描述

ackCount

1

ackModeCOUNTCOUNT_TIME 时,提交挂起的偏移量之前的记录数。

adviceChain

null

围绕消息监听器的一系列 Advice 对象(例如 MethodInterceptor 围绕建议),按顺序调用。

ackMode

BATCH

控制多久提交一次偏移量 - 请参阅 提交偏移量

ackTime

5000

ackModeTIMECOUNT_TIME 时,提交挂起的偏移量之前经过的时间(毫秒)。

assignmentCommitOption

LATEST_ONLY _NO_TX

是否在分配时提交初始位置;默认情况下,仅当 ConsumerConfig.AUTO_OFFSET_RESET_CONFIGlatest 时才会提交初始偏移量,即使存在事务管理器,它也不会在事务中运行。有关可用选项的更多信息,请参阅 ContainerProperties.AssignmentCommitOption 的 JavaDoc。

asyncAcks

false

启用乱序提交(请参阅 手动提交偏移量);消费者会暂停,并且提交会被推迟,直到填补空白。

authExceptionRetryInterval

null

如果非空,则为 Kafka 客户端抛出 AuthenticationExceptionAuthorizationException 时在轮询之间休眠的 Duration。如果为 null,则此类异常被视为致命异常,容器将停止。

batchRecoverAfterRollback

false

设置为 true 以启用批处理恢复,请参阅 回滚后处理器

clientId

(空字符串)

client.id 消费者属性的前缀。覆盖消费者工厂 client.id 属性;在并发容器中,-n 会作为每个消费者实例的后缀添加。

checkDeserExWhenKeyNull

false

设置为 true 以始终在收到 null key 时检查 DeserializationException 标头。当消费者代码无法确定是否已配置 ErrorHandlingDeserializer 时很有用,例如在使用委托反序列化器时。

checkDeserExWhenValueNull

false

设置为 true 以始终在收到 null value 时检查 DeserializationException 标头。当消费者代码无法确定是否已配置 ErrorHandlingDeserializer 时很有用,例如在使用委托反序列化器时。

commitCallback

null

如果存在且 syncCommitsfalse,则在提交完成后调用的回调。

commitLogLevel

DEBUG

与提交偏移量相关的日志的日志记录级别。

consumerRebalanceListener

null

重新平衡监听器;请参阅 重新平衡监听器

commitRetries

3

设置使用 syncCommits 设置为 true 时 RetriableCommitFailedException 的重试次数。默认为 3(共 4 次尝试)。

consumerStartTimeout

30s

在记录错误之前等待消费者启动的时间;例如,如果使用线程不足的任务执行器,则可能会发生这种情况。

deliveryAttemptHeader

false

请参阅 传递尝试标头

eosMode

V2

精确一次语义模式;请参阅 精确一次语义

fixTxOffsets

false

在使用事务性生产者生成的记录时,如果消费者位于分区的末尾,则由于用于指示事务提交/回滚的伪记录以及可能存在的回滚记录,滞后可能会错误地报告为大于零。这不会在功能上影响消费者,但一些用户表示担心“滞后”不为零。将此属性设置为 true,容器将更正此类错误报告的偏移量。检查将在下一次轮询之前执行,以避免在提交处理中添加明显的复杂性。在撰写本文时,仅当消费者配置为 isolation.level=read_committedmax.poll.records 大于 1 时,才会更正滞后。有关更多信息,请参阅 KAFKA-10683

groupId

null

覆盖消费者 group.id 属性;由 @KafkaListeneridgroupId 属性自动设置。

idleBeforeDataMultiplier

5.0

在接收到任何记录之前应用于idleEventInterval的乘数。接收到记录后,将不再应用乘数。自 2.8 版起可用。

idleBetweenPolls

0

用于通过在轮询之间使线程休眠来减慢传递速度。处理一批记录的时间加上此值必须小于max.poll.interval.ms消费者属性。

idleEventInterval

null

设置后,启用ListenerContainerIdleEvent的发布,请参阅应用程序事件检测空闲和无响应的消费者。另请参阅idleBeforeDataMultiplier

idlePartitionEventInterval

null

设置后,启用ListenerContainerIdlePartitionEvent的发布,请参阅应用程序事件检测空闲和无响应的消费者

kafkaConsumerProperties

用于覆盖在消费者工厂上配置的任何任意消费者属性。

kafkaAwareTransactionManager

null

请参阅事务

listenerTaskExecutor

SimpleAsyncTaskExecutor

用于运行消费者线程的任务执行器。默认执行器创建名为<name>-C-n的线程;对于KafkaMessageListenerContainer,名称是bean名称;对于ConcurrentMessageListenerContainer,名称是bean名称后跟-n,其中n为每个子容器递增。

logContainerConfig

false

设置为true以在INFO级别记录所有容器属性。

messageListener

null

消息侦听器。

micrometerEnabled

true

是否为消费者线程维护Micrometer计时器。

micrometerTags

要添加到Micrometer指标的静态标签映射。

micrometerTagsProvider

null

一个函数,根据消费者记录提供动态标签。

missingTopicsFatal

false

如果配置的主题在代理上不存在,则设置为true可防止容器启动。

monitorInterval

30s

检查消费者线程的NonResponsiveConsumerEvents状态的频率。请参阅noPollThresholdpollTimeout

noPollThreshold

3.0

乘以pollTimeOut以确定是否发布NonResponsiveConsumerEvent。请参阅monitorInterval

observationConvention

null

设置后,根据消费者记录中的信息,将动态标签添加到计时器和跟踪中。

observationEnabled

false

设置为true以通过Micrometer启用观察。

offsetAndMetadataProvider

null

OffsetAndMetadata的提供者;默认情况下,提供者会创建具有空元数据的偏移量和元数据。提供者提供了一种自定义元数据的方法。

onlyLogRecordMetadata

false

设置为false以记录完整的消费者记录(在错误、调试日志等中),而不是仅记录topic-partition@offset

pauseImmediate

false

当容器暂停时,在处理当前记录后停止处理,而不是在处理上一次轮询的所有记录后停止;剩余的记录将保留在内存中,并在容器恢复时传递给侦听器。

pollTimeout

5000

以毫秒为单位传递给Consumer.poll()的超时。

pollTimeoutWhilePaused

100

当容器处于暂停状态时,传递给Consumer.poll()(以毫秒为单位)的超时。

restartAfterAuthExceptions

false

如果容器由于授权/身份验证异常而停止,则设置为True以重新启动容器。

scheduler

ThreadPoolTaskScheduler

用于运行消费者监视器任务的调度程序。

shutdownTimeout

10000

以毫秒为单位的最大时间,在该时间内阻止stop()方法,直到所有消费者停止并在发布容器停止事件之前。

stopContainerWhenFenced

false

如果抛出ProducerFencedException,则停止侦听器容器。有关更多信息,请参阅回滚后处理器

stopImmediate

false

当容器停止时,在处理当前记录后停止处理,而不是在处理上一次轮询的所有记录后停止。

subBatchPerPartition

请参阅描述。

使用批处理侦听器时,如果为true,则使用轮询结果(每个分区一个)拆分为子批处理来调用侦听器。默认值为false

syncCommitTimeout

null

syncCommitstrue时使用的超时。未设置时,容器将尝试确定default.api.timeout.ms消费者属性并使用它;否则将使用60秒。

syncCommits

true

是否对偏移量使用同步或异步提交;请参阅commitCallback

topics topicPattern topicPartitions

n/a

配置的主题、主题模式或显式分配的主题/分区。互斥;必须提供至少一个;由ContainerProperties构造函数强制执行。

transactionManager

null

自 3.2 版起已弃用,请参阅[kafkaAwareTransactionManager]其他事务管理器

表 2. AbstractListenerContainer 属性
属性 默认值 描述

afterRollbackProcessor

DefaultAfterRollbackProcessor

在回滚事务后调用的AfterRollbackProcessor

applicationEventPublisher

应用程序上下文

事件发布者。

batchErrorHandler

请参阅描述。

已弃用 - 请参阅commonErrorHandler

batchInterceptor

null

设置BatchInterceptor以在调用批处理侦听器之前调用;不适用于记录侦听器。另请参阅interceptBeforeTx

beanName

bean名称

容器的bean名称;对于子容器,后跟-n

commonErrorHandler

请参阅描述。

当提供transactionManager时为DefaultErrorHandlernull,当使用DefaultAfterRollbackProcessor时。请参阅容器错误处理程序

containerProperties

ContainerProperties

容器属性实例。

groupId

请参阅描述。

如果存在,则为containerProperties.groupId,否则为消费者工厂中的group.id属性。

interceptBeforeTx

true

确定是否在事务开始之前或之后调用recordInterceptor

listenerId

请参阅描述。

用户配置的容器的bean名称或@KafkaListenerid属性。

listenerInfo

null

要在KafkaHeaders.LISTENER_INFO标头中填充的值。使用@KafkaListener时,此值从info属性中获取。此标头可用于各种位置,例如RecordInterceptorRecordFilterStrategy和侦听器代码本身。

pauseRequested

(只读)

如果已请求消费者暂停,则为True。

recordInterceptor

null

设置RecordInterceptor以在调用记录侦听器之前调用;不适用于批处理侦听器。另请参阅interceptBeforeTx

topicCheckTimeout

30s

missingTopicsFatal容器属性为true时,等待describeTopics操作完成的最长时间(以秒为单位)。

表 3. KafkaMessageListenerContainer 属性
属性 默认值 描述

assignedPartitions

(只读)

当前分配给此容器(显式或隐式)的分区。

assignedPartitionsByClientId

(只读)

当前分配给此容器(显式或隐式)的分区。

clientIdSuffix

null

并发容器使用它为每个子容器的消费者提供唯一的client.id

containerPaused

n/a

如果已请求暂停并且消费者已实际暂停,则为True。

表 4. ConcurrentMessageListenerContainer 属性
属性 默认值 描述

alwaysClientIdSuffix

true

设置为false以抑制向client.id消费者属性添加后缀,当concurrency仅为1时。

assignedPartitions

(只读)

当前分配给此容器的子KafkaMessageListenerContainer(显式或隐式)的分区的聚合。

assignedPartitionsByClientId

(只读)

当前分配给此容器的子KafkaMessageListenerContainer(显式或隐式)的分区,以子容器的消费者的client.id属性为键。

concurrency

1

要管理的子KafkaMessageListenerContainer的数量。

containerPaused

n/a

如果已请求暂停并且所有子容器的消费者已实际暂停,则为True。

containers

n/a

对所有子KafkaMessageListenerContainer的引用。