监听器容器属性

表 1. ContainerProperties 属性
财产 默认值 描述

ackCount

1

ackModeCOUNTCOUNT_TIME 时,在提交待定偏移量之前的记录数量。

adviceChain

null

一个 Advice 对象链(例如围绕消息侦听器的 MethodInterceptor 建议),按顺序调用。

ackMode

BATCH

控制偏移量提交的频率 - 请参阅提交偏移量

ackTime

5000

ackModeTIMECOUNT_TIME 时,待定偏移量提交之前的时间(毫秒)。

assignmentCommitOption

LATEST_ONLY _NO_TX

是否在分配时提交初始位置;默认情况下,只有当 ConsumerConfig.AUTO_OFFSET_RESET_CONFIGlatest 时才会提交初始偏移量,并且即使存在事务管理器,也不会在事务中运行。有关可用选项的更多信息,请参阅 ContainerProperties.AssignmentCommitOption 的 JavaDocs。

asyncAcks

启用乱序提交(请参阅手动提交偏移量);消费者暂停,直到填充间隙后才提交。

authExceptionRetryInterval

null

当不为 null 时,Kafka 客户端抛出 AuthenticationExceptionAuthorizationException 异常时,在两次轮询之间休眠的 Duration。当为 null 时,此类异常被认为是致命的,容器将停止。

batchRecoverAfterRollback

设置为 true 以启用批处理恢复,请参阅回滚后处理器

clientId

(空字符串)

client.id 消费者属性的前缀。覆盖消费者工厂的 client.id 属性;在并发容器中,-n 作为后缀添加到每个消费者实例。

checkDeserExWhenKeyNull

当收到 null key 时,设置为 true 以始终检查 DeserializationException 标头。当消费者代码无法确定已配置 ErrorHandlingDeserializer 时(例如使用委托反序列化器时)很有用。

checkDeserExWhenValueNull

当收到 null value 时,设置为 true 以始终检查 DeserializationException 标头。当消费者代码无法确定已配置 ErrorHandlingDeserializer 时(例如使用委托反序列化器时)很有用。

commitCallback

null

当存在且 syncCommitsfalse 时,提交完成后调用的回调。

commitLogLevel

DEBUG

与提交偏移量相关的日志的日志级别。

consumerRebalanceListener

null

一个再平衡侦听器;请参阅再平衡侦听器

commitRetries

3

当使用 syncCommits 设置为 true 时,设置 RetriableCommitFailedException 的重试次数。默认 3 次(总共 4 次尝试)。

consumerStartTimeout

30s

在记录错误之前等待消费者启动的时间;这可能会发生,例如,如果使用线程不足的任务执行器。

deliveryAttemptHeader

请参阅投递尝试标头

eosMode

V2

精确一次语义模式;请参阅精确一次语义

fixTxOffsets

当消费由事务性生产者生成且消费者位于分区的末尾的记录时,由于用于指示事务提交/回滚的伪记录以及可能存在的已回滚记录,滞后可能错误地报告为大于零。这不会在功能上影响消费者,但有些用户担心“滞后”不是零。将此属性设置为 true,容器将纠正此类错误报告的偏移量。在下次轮询之前执行检查,以避免增加提交处理的显著复杂性。在撰写本文时,仅当消费者配置了 isolation.level=read_committedmax.poll.records 大于 1 时,滞后才会得到纠正。有关更多信息,请参阅KAFKA-10683

groupId

null

覆盖消费者 group.id 属性;由 @KafkaListeneridgroupId 属性自动设置。

idleBeforeDataMultiplier

5.0

应用于在收到任何记录之前应用的 idleEventInterval 的乘数。收到记录后,不再应用乘数。自版本 2.8 起可用。

idleBetweenPolls

0

用于通过在轮询之间休眠线程来减慢交付速度。处理一批记录的时间加上此值必须小于 max.poll.interval.ms 消费者属性。

idleEventInterval

null

设置后,启用 ListenerContainerIdleEvent 的发布,请参阅应用程序事件检测空闲和无响应的消费者。另请参阅 idleBeforeDataMultiplier

idlePartitionEventInterval

null

设置后,启用 ListenerContainerIdlePartitionEvent 的发布,请参阅应用程序事件检测空闲和无响应的消费者

kafkaConsumerProperties

用于覆盖在消费者工厂上配置的任何任意消费者属性。

kafkaAwareTransactionManager

null

请参阅事务

listenerTaskExecutor

SimpleAsyncTaskExecutor

一个任务执行器,用于运行消费者线程。默认执行器创建名为 <name>-C-n 的线程;对于 KafkaMessageListenerContainer,名称是 bean 名称;对于 ConcurrentMessageListenerContainer,名称是 bean 名称后加上 -m,其中 m 为每个子容器递增。请参阅容器线程命名

logContainerConfig

设置为 true 以在 INFO 级别记录所有容器属性。

messageListener

null

消息侦听器。

micrometerEnabled

true

是否为消费者线程维护 Micrometer 计时器。

micrometerTags

要添加到 Micrometer 指标的静态标签映射。

micrometerTagsProvider

null

一个根据消费者记录提供动态标签的函数。

missingTopicsFatal

当为 true 时,如果配置的主题在 broker 上不存在,则阻止容器启动。

monitorInterval

30s

检查消费者线程状态以获取 NonResponsiveConsumerEvent 的频率。请参阅 noPollThresholdpollTimeout

noPollThreshold

3.0

乘以 pollTimeOut 以确定是否发布 NonResponsiveConsumerEvent。请参阅 monitorInterval

observationConvention

null

设置后,根据消费者记录中的信息向计时器和跟踪添加动态标签。

observationEnabled

设置为 true 以通过 Micrometer 启用观察。

offsetAndMetadataProvider

null

一个 OffsetAndMetadata 提供者;默认情况下,提供者创建带有空元数据的偏移量和元数据。提供者提供了一种自定义元数据的方法。

onlyLogRecordMetadata

设置为 false 以记录完整的消费者记录(在错误、调试日志等中),而不仅仅是 topic-partition@offset

pauseImmediate

当容器暂停时,在当前记录处理后停止处理,而不是在处理完上次轮询的所有记录后停止;剩余的记录保留在内存中,当容器恢复时将传递给侦听器。

pollTimeout

5000

传递给 Consumer.poll() 的超时时间(毫秒)。

pollTimeoutWhilePaused

100

当容器处于暂停状态时,传递给 Consumer.poll() 的超时时间(毫秒)。

restartAfterAuthExceptions

如果容器因授权/认证异常而停止,则为 true 以重新启动容器。

scheduler

ThreadPoolTaskScheduler

一个调度器,用于运行消费者监视任务。

shutdownTimeout

10000

阻塞 stop() 方法的最长时间(毫秒),直到所有消费者停止并发布容器停止事件。

stopContainerWhenFenced

如果抛出 ProducerFencedException,则停止侦听器容器。有关更多信息,请参阅回滚后处理器

stopImmediate

当容器停止时,在当前记录处理后停止处理,而不是在处理完上次轮询的所有记录后停止。

subBatchPerPartition

请参阅描述。

使用批处理侦听器时,如果此值为 true,则侦听器将使用轮询结果(按分区拆分为子批处理)进行调用。默认值为 false

syncCommitTimeout

null

syncCommitstrue 时使用的超时时间。未设置时,容器将尝试确定 default.api.timeout.ms 消费者属性并使用它;否则将使用 60 秒。

syncCommits

true

是否对偏移量使用同步或异步提交;请参阅 commitCallback

topics topicPattern topicPartitions

不适用

配置的主题、主题模式或显式分配的主题/分区。互斥;必须至少提供一个;由 ContainerProperties 构造函数强制执行。

transactionManager

null

自 3.2 起已弃用,请参阅[kafkaAwareTransactionManager]其他事务管理器

表 2. AbstractMessageListenerContainer 属性
财产 默认值 描述

afterRollbackProcessor

DefaultAfterRollbackProcessor

在事务回滚后调用的 AfterRollbackProcessor

applicationEventPublisher

应用程序上下文

事件发布者。

batchErrorHandler

请参阅描述。

已弃用 - 请参阅 commonErrorHandler

batchInterceptor

null

设置一个 BatchInterceptor,在调用批处理侦听器之前调用;不适用于记录侦听器。另请参阅 interceptBeforeTx

beanName

bean 名称

容器的 bean 名称;子容器的后缀为 -n

commonErrorHandler

请参阅描述。

DefaultErrorHandlernull,当提供 transactionManager 时,使用 DefaultAfterRollbackProcessor。请参阅容器错误处理器

containerProperties

ContainerProperties

容器属性实例。

groupId

请参阅描述。

如果存在,则为 containerProperties.groupId,否则为消费者工厂的 group.id 属性。

interceptBeforeTx

true

确定 recordInterceptor 是在事务开始之前还是之后调用。

listenerId

请参阅描述。

用户配置容器的 bean 名称或 @KafkaListenerid 属性。

listenerInfo

null

一个用于填充 KafkaHeaders.LISTENER_INFO 标头的值。对于 @KafkaListener,此值从 info 属性获取。此标头可用于各种地方,例如 RecordInterceptorRecordFilterStrategy 和侦听器代码本身。

pauseRequested

(只读)

如果已请求消费者暂停,则为 True。

recordInterceptor

null

设置一个 RecordInterceptor,在调用记录侦听器之前调用;不适用于批处理侦听器。另请参阅 interceptBeforeTx

topicCheckTimeout

30s

missingTopicsFatal 容器属性为 true 时,等待 describeTopics 操作完成的时间(秒)。

表 3. KafkaMessageListenerContainer 属性
财产 默认值 描述

assignedPartitions

(只读)

当前分配给此容器的分区(显式或不显式)。

clientIdSuffix

null

由并发容器用于为每个子容器的消费者提供唯一的 client.id

containerPaused

不适用

如果已请求暂停且消费者已实际暂停,则为 True。

表 4. ConcurrentMessageListenerContainer 属性
财产 默认值 描述

alwaysClientIdSuffix

true

设置为 false 可阻止在 concurrency 仅为 1 时向 client.id 消费者属性添加后缀。

assignedPartitions

(只读)

此容器的子 KafkaMessageListenerContainer 当前分配的分区的总和(显式或不显式)。

concurrency

1

要管理的子 KafkaMessageListenerContainer 的数量。

containerPaused

不适用

如果已请求暂停且所有子容器的消费者已实际暂停,则为 True。

containers

不适用

对所有子 KafkaMessageListenerContainer 的引用。

© . This site is unofficial and not affiliated with VMware.