消息监听器容器
提供了两个 MessageListenerContainer 实现
-
KafkaMessageListenerContainer -
ConcurrentMessageListenerContainer
KafkaMessageListenerContainer 在单个线程上接收来自所有主题或分区的所有消息。ConcurrentMessageListenerContainer 委托给一个或多个 KafkaMessageListenerContainer 实例以提供多线程消费。
从版本 2.2.7 开始,你可以向监听器容器添加一个 RecordInterceptor;它将在调用监听器之前被调用,允许检查或修改记录。如果拦截器返回 null,则不调用监听器。从版本 2.7 开始,它具有在监听器退出(正常退出或抛出异常)后调用的附加方法。此外,从版本 2.7 开始,现在有一个 BatchInterceptor,为 批量监听器 提供类似的功能。此外,ConsumerAwareRecordInterceptor(和 BatchInterceptor)提供了对 Consumer<?, ?> 的访问。例如,这可以用于在拦截器中访问消费者指标。
| 你不应在这些拦截器中执行任何影响消费者位置和/或已提交偏移量的方法;容器需要管理此类信息。 |
如果拦截器修改了记录(通过创建新记录),则 topic、partition 和 offset 必须保持不变,以避免意外的副作用,例如记录丢失。 |
CompositeRecordInterceptor 和 CompositeBatchInterceptor 可用于调用多个拦截器。
从版本 4.0 开始,AbstractKafkaListenerContainerFactory 和 AbstractMessageListenerContainer 将 getRecordInterceptor() 和 getBatchInterceptor() 公开为公共方法。如果返回的拦截器是 CompositeRecordInterceptor 或 CompositeBatchInterceptor 的实例,则即使在创建了扩展 AbstractMessageListenerContainer 的容器实例并已配置 RecordInterceptor 或 BatchInterceptor 之后,也可以向其添加额外的 RecordInterceptor 或 BatchInterceptor 实例。以下示例显示了如何实现:
public void configureRecordInterceptor(AbstractKafkaListenerContainerFactory<Integer, String> containerFactory) {
CompositeRecordInterceptor compositeInterceptor;
RecordInterceptor<Integer, String> previousInterceptor = containerFactory.getRecordInterceptor();
if (previousInterceptor instanceof CompositeRecordInterceptor interceptor) {
compositeInterceptor = interceptor;
} else {
compositeInterceptor = new CompositeRecordInterceptor<>();
containerFactory.setRecordInterceptor(compositeInterceptor);
if (previousInterceptor != null) {
compositeInterceptor.addRecordInterceptor(previousInterceptor);
}
}
RecordInterceptor<Integer, String> recordInterceptor1 = new RecordInterceptor() {...};
RecordInterceptor<Integer, String> recordInterceptor2 = new RecordInterceptor() {...};
compositeInterceptor.addRecordInterceptor(recordInterceptor1);
compositeInterceptor.addRecordInterceptor(recordInterceptor2);
}
默认情况下,从版本 2.8 开始,当使用事务时,拦截器在事务开始之前被调用。你可以将监听器容器的 interceptBeforeTx 属性设置为 false,以便在事务开始后调用拦截器。从版本 2.9 开始,这将适用于任何事务管理器,而不仅仅是 KafkaAwareTransactionManager。这允许,例如,拦截器参与由容器启动的 JDBC 事务。
从版本 2.3.8、2.4.6 开始,当并发度大于 1 时,ConcurrentMessageListenerContainer 现在支持 静态成员资格。group.instance.id 会以 -n 作为后缀,其中 n 从 1 开始。这与增加的 session.timeout.ms 一起,可用于减少重新平衡事件,例如,当应用程序实例重新启动时。
使用 KafkaMessageListenerContainer
提供以下构造函数
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties)
它在一个 ContainerProperties 对象中接收 ConsumerFactory 以及关于主题和分区的信息,以及其他配置。ContainerProperties 具有以下构造函数
public ContainerProperties(TopicPartitionOffset... topicPartitions)
public ContainerProperties(String... topics)
public ContainerProperties(Pattern topicPattern)
第一个构造函数接受一个 TopicPartitionOffset 参数数组,以明确指示容器使用哪些分区(使用消费者 assign() 方法)以及可选的初始偏移量。正值默认是绝对偏移量。负值默认是相对于分区内当前最后一个偏移量。提供了一个接受附加 boolean 参数的 TopicPartitionOffset 构造函数。如果此参数为 true,则初始偏移量(正或负)相对于此消费者的当前位置。偏移量在容器启动时应用。第二个构造函数接受一个主题数组,Kafka 根据 group.id 属性分配分区 — 在组中分发分区。第三个构造函数使用正则表达式 Pattern 选择主题。
要将 MessageListener 分配给容器,可以在创建容器时使用 ContainerProps.setMessageListener 方法。以下示例展示了如何实现:
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {
...
});
DefaultKafkaConsumerFactory<Integer, String> cf =
new DefaultKafkaConsumerFactory<>(consumerProps());
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
请注意,当创建 DefaultKafkaConsumerFactory 时,使用仅接受属性的构造函数(如上所示)意味着键和值 Deserializer 类是从配置中获取的。或者,可以将 Deserializer 实例传递给 DefaultKafkaConsumerFactory 构造函数用于键和/或值,在这种情况下,所有消费者共享相同的实例。另一个选项是提供 Supplier<Deserializer>(从版本 2.3 开始),它将用于为每个 Consumer 获取单独的 Deserializer 实例。
DefaultKafkaConsumerFactory<Integer, CustomValue> cf =
new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new CustomValueDeserializer());
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
有关您可以设置的各种属性的更多信息,请参阅 ContainerProperties 的 Javadoc。
从版本 2.1.1 开始,提供了一个名为 logContainerConfig 的新属性。当 true 且启用 INFO 日志时,每个监听器容器都会写入一条日志消息,总结其配置属性。
默认情况下,主题偏移量提交的日志记录在 DEBUG 日志级别执行。从版本 2.1.2 开始,ContainerProperties 中的一个名为 commitLogLevel 的属性允许你指定这些消息的日志级别。例如,要将日志级别更改为 INFO,你可以使用 containerProperties.setCommitLogLevel(LogIfLevelEnabled.Level.INFO);。
从版本 2.2 开始,添加了一个名为 missingTopicsFatal 的新容器属性(默认值:从 2.3.4 开始为 false)。如果任何配置的主题不存在于代理上,此属性将阻止容器启动。如果容器配置为侦听主题模式(正则表达式),则此属性不适用。以前,容器线程在 consumer.poll() 方法内循环,等待主题出现,同时记录大量消息。除了日志之外,没有迹象表明存在问题。
从版本 2.8 开始,引入了一个新的容器属性 authExceptionRetryInterval。这使得容器在从 KafkaConsumer 获取任何 AuthenticationException 或 AuthorizationException 后重试获取消息。这可能发生在,例如,配置的用户被拒绝访问读取某个主题或凭据不正确时。定义 authExceptionRetryInterval 允许容器在授予正确权限后恢复。
| 默认情况下,未配置间隔 - 身份验证和授权错误被视为致命错误,导致容器停止。 |
从版本 2.8 开始,当创建消费者工厂时,如果你提供反序列化器作为对象(在构造函数中或通过设置器),工厂将调用 configure() 方法来用配置属性配置它们。
使用 ConcurrentMessageListenerContainer
唯一的构造函数与 KafkaListenerContainer 构造函数类似。以下列表显示了构造函数的签名
public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties)
它还有一个 concurrency 属性。例如,container.setConcurrency(3) 会创建三个 KafkaMessageListenerContainer 实例。
如果容器属性配置为主题(或主题模式),Kafka 会使用其组管理功能将分区分配给消费者。
|
当监听多个主题时,默认分区分配可能与你预期不符。例如,如果你有三个主题,每个主题有五个分区,并且你想使用 使用 Spring Boot 时,您可以按如下方式设置策略
|
当容器属性配置了 TopicPartitionOffset 时,ConcurrentMessageListenerContainer 会将 TopicPartitionOffset 实例分发给委托的 KafkaMessageListenerContainer 实例。
例如,如果提供了六个 TopicPartitionOffset 实例,并且 concurrency 为 3;每个容器将获得两个分区。对于五个 TopicPartitionOffset 实例,两个容器获得两个分区,第三个获得一个。如果 concurrency 大于 TopicPartitions 的数量,则 concurrency 将向下调整,使得每个容器获得一个分区。
client.id 属性(如果设置)会附加 -n,其中 n 是与并发性对应的消费者实例。当启用 JMX 时,这是为 MBean 提供唯一名称所必需的。 |
从版本 1.3 开始,MessageListenerContainer 提供了对底层 KafkaConsumer 指标的访问。对于 ConcurrentMessageListenerContainer,metrics() 方法返回所有目标 KafkaMessageListenerContainer 实例的指标。这些指标按底层 KafkaConsumer 提供的 client-id 分组到 Map<MetricName, ? extends Metric> 中。
从版本 2.3 开始,ContainerProperties 提供了 idleBetweenPolls 选项,允许监听器容器中的主循环在 KafkaConsumer.poll() 调用之间休眠。实际的休眠间隔是根据提供的选项以及 max.poll.interval.ms 消费者配置与当前记录批处理时间之间的差异的最小值来选择的。
提交偏移量
提供了几个提交偏移量的选项。如果 enable.auto.commit 消费者属性为 true,Kafka 会根据其配置自动提交偏移量。如果为 false,容器支持几种 AckMode 设置(在下一列表中描述)。默认的 AckMode 是 BATCH。从版本 2.3 开始,除非在配置中明确设置,否则框架会将 enable.auto.commit 设置为 false。此前,如果未设置此属性,则使用 Kafka 默认值(true)。
消费者 poll() 方法返回一个或多个 ConsumerRecords。MessageListener 为每个记录调用。以下列表描述了容器对每个 AckMode(不使用事务时)采取的操作
-
RECORD:处理记录后,当监听器返回时提交偏移量。 -
BATCH:当poll()返回的所有记录都已处理完毕时,提交偏移量。 -
TIME:当poll()返回的所有记录都已处理完毕,并且自上次提交以来已超过ackTime时,提交偏移量。 -
COUNT:当poll()返回的所有记录都已处理完毕,并且自上次提交以来已接收到ackCount条记录时,提交偏移量。 -
COUNT_TIME:类似于TIME和COUNT,但如果任一条件为true,则执行提交。 -
MANUAL:消息监听器负责acknowledge()Acknowledgment。之后,应用与BATCH相同的语义。 -
MANUAL_IMMEDIATE:当监听器调用Acknowledgment.acknowledge()方法时立即提交偏移量。
使用事务时,偏移量会发送到事务,其语义等同于 RECORD 或 BATCH,具体取决于监听器类型(记录或批量)。
MANUAL 和 MANUAL_IMMEDIATE 要求监听器是 AcknowledgingMessageListener 或 BatchAcknowledgingMessageListener。请参阅 消息监听器。 |
根据 syncCommits 容器属性,将使用消费者上的 commitSync() 或 commitAsync() 方法。syncCommits 默认为 true;另请参阅 setSyncCommitTimeout。参阅 setCommitCallback 以获取异步提交的结果;默认的回调是 LoggingCommitCallback,它记录错误(并在调试级别记录成功)。
由于监听器容器有自己的提交偏移量机制,因此它更倾向于将 Kafka ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG 设置为 false。从版本 2.3 开始,除非在消费者工厂或容器的消费者属性覆盖中明确设置,否则它会无条件地将其设置为 false。
Acknowledgment 具有以下方法
public interface Acknowledgment {
void acknowledge();
}
此方法允许监听器控制何时提交偏移量。
从版本 2.3 开始,Acknowledgment 接口新增了两个方法 nack(long sleep) 和 nack(int index, long sleep)。第一个用于记录监听器,第二个用于批量监听器。为你的监听器类型调用错误的方法将抛出 IllegalStateException。
如果要提交部分批次,请使用 nack()。当使用事务时,将 AckMode 设置为 MANUAL;调用 nack() 会将成功处理的记录的偏移量发送到事务。 |
nack() 只能在调用你的监听器的消费者线程上调用。 |
使用 乱序提交 时不允许使用 nack()。 |
对于记录监听器,当调用 nack() 时,任何待处理的偏移量都会被提交,上次轮询中剩余的记录会被丢弃,并且会对它们的分区执行查找操作,以便失败的记录和未处理的记录在下一次 poll() 时重新投递。可以通过设置 sleep 参数来在重新投递之前暂停消费者。这与当容器配置了 DefaultErrorHandler 时抛出异常的功能类似。
nack() 会暂停整个监听器,包括所有分配的分区,持续指定的休眠时间。 |
当使用批量监听器时,您可以指定批处理中发生故障的索引。当调用 nack() 时,索引之前的记录的偏移量将被提交,并对失败和已丢弃记录的分区执行查找,以便它们将在下一次 poll() 时重新投递。
有关更多信息,请参阅容器错误处理器。
消费者在休眠期间会暂停,以便我们继续轮询代理以保持消费者活跃。实际的休眠时间及其分辨率取决于容器的 pollTimeout,默认为 5 秒。最小休眠时间等于 pollTimeout,所有休眠时间都将是它的倍数。对于较小的休眠时间或为了提高其准确性,请考虑减少容器的 pollTimeout。 |
从版本 3.0.10 开始,批处理监听器可以使用 Acknowledgment 参数上的 acknowledge(index) 提交批处理的部分偏移量。当调用此方法时,索引处的记录(以及所有先前的记录)的偏移量将被提交。在执行部分批处理提交后调用 acknowledge() 将提交批处理剩余部分的偏移量。以下限制适用
-
需要
AckMode.MANUAL_IMMEDIATE -
该方法必须在监听器线程上调用
-
监听器必须消费一个
List而不是原始的ConsumerRecords -
索引必须在列表元素的范围内
-
索引必须大于之前调用中使用的索引
这些限制是强制执行的,该方法将根据违规情况抛出 IllegalArgumentException 或 IllegalStateException。