消息监听器容器
提供了两种 MessageListenerContainer
实现
-
KafkaMessageListenerContainer
-
ConcurrentMessageListenerContainer
KafkaMessageListenerContainer
在单个线程上接收所有主题或分区的所有消息。ConcurrentMessageListenerContainer
委托给一个或多个 KafkaMessageListenerContainer
实例以提供多线程消费。
从版本 2.2.7 开始,您可以向监听器容器添加 RecordInterceptor
;它将在调用监听器之前被调用,允许检查或修改记录。如果拦截器返回 null,则不会调用监听器。从版本 2.7 开始,它增加了在监听器退出(正常退出或抛出异常)后调用的额外方法。此外,从版本 2.7 开始,新增了 BatchInterceptor
,为批量监听器提供类似的功能。另外,ConsumerAwareRecordInterceptor
(和 BatchInterceptor
)提供对 Consumer<?, ?>
的访问。例如,这可以用于在拦截器中访问消费者指标。
您不应该在这些拦截器中执行任何影响消费者位置和/或已提交偏移量的方法;容器需要管理这些信息。 |
如果拦截器修改了记录(通过创建一个新记录),主题、分区和偏移量必须保持不变,以避免意外的副作用,例如记录丢失。 |
CompositeRecordInterceptor
和 CompositeBatchInterceptor
可用于调用多个拦截器。
默认情况下,从版本 2.8 开始,使用事务时,拦截器在事务开始之前被调用。您可以将监听器容器的 interceptBeforeTx
属性设置为 false
,以便在事务开始后调用拦截器。从版本 2.9 开始,这适用于任何事务管理器,而不仅仅是 KafkaAwareTransactionManager
。例如,这使得拦截器能够参与由容器启动的 JDBC 事务。
从版本 2.3.8、2.4.6 开始,当并发度大于 1 时,ConcurrentMessageListenerContainer
现在支持静态成员资格(Static Membership)。group.instance.id
的后缀为 -n
,其中 n
从 1
开始。这与增加的 session.timeout.ms
一起使用,可以减少重平衡事件,例如在应用实例重新启动时。
使用 KafkaMessageListenerContainer
提供了以下构造函数
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties)
它接收一个 ConsumerFactory
以及主题和分区的信息,以及其他配置,这些都在 ContainerProperties
对象中。ContainerProperties
提供了以下构造函数
public ContainerProperties(TopicPartitionOffset... topicPartitions)
public ContainerProperties(String... topics)
public ContainerProperties(Pattern topicPattern)
第一个构造函数接受一个 TopicPartitionOffset
参数数组,以明确指示容器使用哪些分区(使用 consumer 的 assign()
方法)以及可选的初始偏移量。正值默认是绝对偏移量。负值默认是相对于分区内当前最后一个偏移量的相对值。提供了 TopicPartitionOffset
的一个构造函数,它接受一个额外的 boolean
参数。如果此参数为 true
,则初始偏移量(正或负)是相对于此消费者当前位置的相对值。偏移量在容器启动时应用。第二个接受一个主题数组,Kafka 根据 group.id
属性分配分区——在组内分布分区。第三个使用正则表达式 Pattern
选择主题。
要将 MessageListener
分配给容器,您可以在创建容器时使用 ContainerProps.setMessageListener
方法。以下示例展示了如何操作
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {
...
});
DefaultKafkaConsumerFactory<Integer, String> cf =
new DefaultKafkaConsumerFactory<>(consumerProps());
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
请注意,创建 DefaultKafkaConsumerFactory
时,使用如上所示只接受属性的构造函数意味着键和值的 Deserializer
类是从配置中获取的。或者,可以将 Deserializer
实例传递给 DefaultKafkaConsumerFactory
的构造函数,用于键和/或值,在这种情况下,所有 Consumers 将共享相同的实例。另一种选择是提供 Supplier<Deserializer>
(从版本 2.3 开始),用于为每个 Consumer
获取单独的 Deserializer
实例
DefaultKafkaConsumerFactory<Integer, CustomValue> cf =
new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new CustomValueDeserializer());
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
有关 ContainerProperties
的各种属性的更多信息,请参阅其Javadoc。
从版本 2.1.1 开始,引入了一个名为 logContainerConfig
的新属性。当此属性为 true
且 INFO
级别日志启用时,每个监听器容器会写入一条日志消息,总结其配置属性。
默认情况下,主题偏移量提交的日志记录级别是 DEBUG
。从版本 2.1.2 开始,ContainerProperties
中新增了一个名为 commitLogLevel
的属性,允许您指定这些消息的日志级别。例如,要将日志级别更改为 INFO
,您可以使用 containerProperties.setCommitLogLevel(LogIfLevelEnabled.Level.INFO);
。
从版本 2.2 开始,新增了一个名为 missingTopicsFatal
的容器属性(默认值:从 2.3.4 开始为 false
)。如果配置的任何主题在 broker 上不存在,这将阻止容器启动。如果容器配置为监听主题模式(正则表达式),则此属性不适用。之前,容器线程会在 consumer.poll()
方法内循环等待主题出现,同时记录大量消息。除了日志之外,没有任何迹象表明存在问题。
从版本 2.8 开始,引入了一个新的容器属性 authExceptionRetryInterval
。当从 KafkaConsumer
获取到任何 AuthenticationException
或 AuthorizationException
时,这会导致容器重试获取消息。例如,当配置的用户被拒绝读取某个主题或凭据不正确时,可能会发生这种情况。定义 authExceptionRetryInterval
允许容器在授予适当权限后恢复。
默认情况下,未配置间隔 - 认证和授权错误被视为致命错误,会导致容器停止。 |
从版本 2.8 开始,创建消费者工厂时,如果您以对象形式(在构造函数中或通过 setter)提供反序列化器,工厂将调用 configure()
方法,并使用配置属性对其进行配置。
使用 ConcurrentMessageListenerContainer
其唯一的构造函数与 KafkaListenerContainer
的构造函数类似。以下清单显示了构造函数的签名
public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties)
它还有一个 concurrency
属性。例如,container.setConcurrency(3)
会创建三个 KafkaMessageListenerContainer
实例。
如果容器属性配置为主题(或主题模式),Kafka 会使用其组管理功能在消费者之间分配分区。
监听多个主题时,默认的分区分配可能与您的预期不同。例如,如果您有三个主题,每个主题有五个分区,并且您想使用 使用 Spring Boot 时,您可以按如下方式设置策略
|
当容器属性配置了 TopicPartitionOffset
s 时,ConcurrentMessageListenerContainer
会将 TopicPartitionOffset
实例分布到委托的 KafkaMessageListenerContainer
实例上。
例如,如果提供了六个 TopicPartitionOffset
实例,且 concurrency
为 3
;每个容器将获得两个分区。对于五个 TopicPartitionOffset
实例,两个容器将获得两个分区,第三个获得一个。如果 concurrency
大于 TopicPartitions
的数量,则 concurrency
会向下调整,使得每个容器获得一个分区。
client.id 属性(如果设置)会附加 -n ,其中 n 对应于并发的消费者实例。当启用 JMX 时,这对于为 MBeans 提供唯一的名称是必需的。 |
从版本 1.3 开始,MessageListenerContainer
提供了访问底层 KafkaConsumer
指标的功能。对于 ConcurrentMessageListenerContainer
,metrics()
方法返回所有目标 KafkaMessageListenerContainer
实例的指标。这些指标按底层 KafkaConsumer
提供的 client-id
分组到 Map<MetricName, ? extends Metric>
中。
从版本 2.3 开始,ContainerProperties
提供了一个 idleBetweenPolls
选项,允许监听器容器中的主循环在 KafkaConsumer.poll()
调用之间休眠。实际的休眠间隔是提供的选项以及 max.poll.interval.ms
消费者配置与当前记录批处理时间差的最小值。
提交偏移量
提供了几种提交偏移量的选项。如果 enable.auto.commit
消费者属性为 true
,Kafka 会根据其配置自动提交偏移量。如果为 false
,容器支持多种 AckMode
设置(在以下列表中描述)。默认的 AckMode
是 BATCH
。从版本 2.3 开始,除非在配置中明确设置,否则框架会将 enable.auto.commit
设置为 false。以前,如果未设置此属性,则使用 Kafka 的默认值(true
)。
消费者 poll()
方法返回一个或多个 ConsumerRecords
。MessageListener
会为每条记录调用一次。以下列表描述了容器在每种 AckMode
下采取的操作(不使用事务时)
-
RECORD
:在监听器处理完记录返回后提交偏移量。 -
BATCH
:在处理完poll()
返回的所有记录后提交偏移量。 -
TIME
:在处理完poll()
返回的所有记录后提交偏移量,前提是自上次提交以来超过了ackTime
。 -
COUNT
:在处理完poll()
返回的所有记录后提交偏移量,前提是自上次提交以来收到了ackCount
条记录。 -
COUNT_TIME
:类似于TIME
和COUNT
,但只要任一条件为true
,就会执行提交。 -
MANUAL
:消息监听器负责acknowledge()
Acknowledgment
。 -
MANUAL_IMMEDIATE
:当监听器调用Acknowledgment.acknowledge()
方法时立即提交偏移量。
使用事务时,偏移量会发送到事务中,其语义等同于 RECORD
或 BATCH
,具体取决于监听器类型(记录或批量)。
MANUAL 和 MANUAL_IMMEDIATE 要求监听器是 AcknowledgingMessageListener 或 BatchAcknowledgingMessageListener 。参见消息监听器。 |
根据 syncCommits
容器属性,使用消费者上的 commitSync()
或 commitAsync()
方法。syncCommits
默认值为 true
;另请参阅 setSyncCommitTimeout
。参见 setCommitCallback
以获取异步提交的结果;默认的回调是 LoggingCommitCallback
,它记录错误(并在 debug 级别记录成功)。
由于监听器容器有自己的提交偏移量机制,它倾向于将 Kafka ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
设置为 false
。从版本 2.3 开始,除非在消费者工厂或容器的消费者属性覆盖中明确设置,否则它会无条件地将其设置为 false。
Acknowledgment
提供了以下方法
public interface Acknowledgment {
void acknowledge();
}
此方法允许监听器控制何时提交偏移量。
从版本 2.3 开始,Acknowledgment
接口增加了两个额外的方法 nack(long sleep)
和 nack(int index, long sleep)
。第一个用于记录监听器,第二个用于批量监听器。为您的监听器类型调用错误的方法将抛出 IllegalStateException
。
如果您想提交部分批次,使用 nack() ,在使用事务时,将 AckMode 设置为 MANUAL ;调用 nack() 会将成功处理的记录的偏移量发送到事务。 |
nack() 只能在调用您的监听器的消费者线程上调用。 |
使用乱序提交(Out of Order Commits)时不允许使用 nack() 。 |
对于记录监听器,调用 nack()
时,任何待提交的偏移量都会被提交,上次 poll()
返回的剩余记录会被丢弃,并对其分区执行 seek 操作,以便在下一次 poll()
时重新投递失败和未处理的记录。通过设置 sleep
参数,可以在重新投递之前暂停消费者。此功能类似于当容器配置了 DefaultErrorHandler
时抛出异常。
nack() 会暂停整个监听器及其所有已分配的分区,暂停时长为指定的 sleep 持续时间。 |
使用批量监听器时,您可以指定批次内发生失败的记录索引。调用 nack()
时,会提交索引之前记录的偏移量,并对失败和丢弃的记录所在的分区执行 seek 操作,以便在下一次 poll()
时重新投递它们。
有关更多信息,请参阅容器错误处理器。
消费者在休眠期间会暂停,以便我们继续轮询 broker 以保持消费者活跃。实际的休眠时间及其精度取决于容器的 pollTimeout ,默认为 5 秒。最小休眠时间等于 pollTimeout ,并且所有休眠时间都将是它的倍数。对于较短的休眠时间或为了提高精度,可以考虑减少容器的 pollTimeout 。 |
从版本 3.0.10 开始,批量监听器可以使用 Acknowledgment
参数上的 acknowledge(index)
方法来提交批次中部分记录的偏移量。调用此方法时,索引处记录(以及之前所有记录)的偏移量将被提交。在执行部分批次提交后调用 acknowledge()
将提交批次剩余记录的偏移量。以下限制适用
-
需要
AckMode.MANUAL_IMMEDIATE
-
方法必须在监听器线程上调用
-
监听器必须消费
List
而不是原始的ConsumerRecords
-
索引必须在列表元素的范围内
-
索引必须大于之前调用中使用的索引
这些限制将被强制执行,根据违规情况,方法将抛出 IllegalArgumentException
或 IllegalStateException
。