消息监听器容器

提供了两个 MessageListenerContainer 实现

  • KafkaMessageListenerContainer

  • ConcurrentMessageListenerContainer

KafkaMessageListenerContainer 在单个线程上接收来自所有主题或分区的所有消息。ConcurrentMessageListenerContainer 委托给一个或多个 KafkaMessageListenerContainer 实例以提供多线程消费。

从版本 2.2.7 开始,你可以向监听器容器添加一个 RecordInterceptor;它将在调用监听器之前被调用,允许检查或修改记录。如果拦截器返回 null,则不调用监听器。从版本 2.7 开始,它具有在监听器退出(正常退出或抛出异常)后调用的附加方法。此外,从版本 2.7 开始,现在有一个 BatchInterceptor,为 批量监听器 提供类似的功能。此外,ConsumerAwareRecordInterceptor(和 BatchInterceptor)提供了对 Consumer<?, ?> 的访问。例如,这可以用于在拦截器中访问消费者指标。

你不应在这些拦截器中执行任何影响消费者位置和/或已提交偏移量的方法;容器需要管理此类信息。
如果拦截器修改了记录(通过创建新记录),则 topicpartitionoffset 必须保持不变,以避免意外的副作用,例如记录丢失。

CompositeRecordInterceptorCompositeBatchInterceptor 可用于调用多个拦截器。

从版本 4.0 开始,AbstractKafkaListenerContainerFactoryAbstractMessageListenerContainergetRecordInterceptor()getBatchInterceptor() 公开为公共方法。如果返回的拦截器是 CompositeRecordInterceptorCompositeBatchInterceptor 的实例,则即使在创建了扩展 AbstractMessageListenerContainer 的容器实例并已配置 RecordInterceptorBatchInterceptor 之后,也可以向其添加额外的 RecordInterceptorBatchInterceptor 实例。以下示例显示了如何实现:

public void configureRecordInterceptor(AbstractKafkaListenerContainerFactory<Integer, String> containerFactory) {
    CompositeRecordInterceptor compositeInterceptor;

    RecordInterceptor<Integer, String> previousInterceptor = containerFactory.getRecordInterceptor();
    if (previousInterceptor instanceof CompositeRecordInterceptor interceptor) {
        compositeInterceptor = interceptor;
    } else {
        compositeInterceptor = new CompositeRecordInterceptor<>();
        containerFactory.setRecordInterceptor(compositeInterceptor);
        if (previousInterceptor != null) {
            compositeInterceptor.addRecordInterceptor(previousInterceptor);
        }
    }

    RecordInterceptor<Integer, String> recordInterceptor1 = new RecordInterceptor() {...};
    RecordInterceptor<Integer, String> recordInterceptor2 = new RecordInterceptor() {...};

    compositeInterceptor.addRecordInterceptor(recordInterceptor1);
    compositeInterceptor.addRecordInterceptor(recordInterceptor2);
}

默认情况下,从版本 2.8 开始,当使用事务时,拦截器在事务开始之前被调用。你可以将监听器容器的 interceptBeforeTx 属性设置为 false,以便在事务开始后调用拦截器。从版本 2.9 开始,这将适用于任何事务管理器,而不仅仅是 KafkaAwareTransactionManager。这允许,例如,拦截器参与由容器启动的 JDBC 事务。

从版本 2.3.8、2.4.6 开始,当并发度大于 1 时,ConcurrentMessageListenerContainer 现在支持 静态成员资格group.instance.id 会以 -n 作为后缀,其中 n1 开始。这与增加的 session.timeout.ms 一起,可用于减少重新平衡事件,例如,当应用程序实例重新启动时。

使用 KafkaMessageListenerContainer

提供以下构造函数

public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                    ContainerProperties containerProperties)

它在一个 ContainerProperties 对象中接收 ConsumerFactory 以及关于主题和分区的信息,以及其他配置。ContainerProperties 具有以下构造函数

public ContainerProperties(TopicPartitionOffset... topicPartitions)

public ContainerProperties(String... topics)

public ContainerProperties(Pattern topicPattern)

第一个构造函数接受一个 TopicPartitionOffset 参数数组,以明确指示容器使用哪些分区(使用消费者 assign() 方法)以及可选的初始偏移量。正值默认是绝对偏移量。负值默认是相对于分区内当前最后一个偏移量。提供了一个接受附加 boolean 参数的 TopicPartitionOffset 构造函数。如果此参数为 true,则初始偏移量(正或负)相对于此消费者的当前位置。偏移量在容器启动时应用。第二个构造函数接受一个主题数组,Kafka 根据 group.id 属性分配分区 — 在组中分发分区。第三个构造函数使用正则表达式 Pattern 选择主题。

要将 MessageListener 分配给容器,可以在创建容器时使用 ContainerProps.setMessageListener 方法。以下示例展示了如何实现:

ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {
    ...
});
DefaultKafkaConsumerFactory<Integer, String> cf =
                        new DefaultKafkaConsumerFactory<>(consumerProps());
KafkaMessageListenerContainer<Integer, String> container =
                        new KafkaMessageListenerContainer<>(cf, containerProps);
return container;

请注意,当创建 DefaultKafkaConsumerFactory 时,使用仅接受属性的构造函数(如上所示)意味着键和值 Deserializer 类是从配置中获取的。或者,可以将 Deserializer 实例传递给 DefaultKafkaConsumerFactory 构造函数用于键和/或值,在这种情况下,所有消费者共享相同的实例。另一个选项是提供 Supplier<Deserializer>(从版本 2.3 开始),它将用于为每个 Consumer 获取单独的 Deserializer 实例。

DefaultKafkaConsumerFactory<Integer, CustomValue> cf =
                        new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new CustomValueDeserializer());
KafkaMessageListenerContainer<Integer, String> container =
                        new KafkaMessageListenerContainer<>(cf, containerProps);
return container;

有关您可以设置的各种属性的更多信息,请参阅 ContainerPropertiesJavadoc

从版本 2.1.1 开始,提供了一个名为 logContainerConfig 的新属性。当 true 且启用 INFO 日志时,每个监听器容器都会写入一条日志消息,总结其配置属性。

默认情况下,主题偏移量提交的日志记录在 DEBUG 日志级别执行。从版本 2.1.2 开始,ContainerProperties 中的一个名为 commitLogLevel 的属性允许你指定这些消息的日志级别。例如,要将日志级别更改为 INFO,你可以使用 containerProperties.setCommitLogLevel(LogIfLevelEnabled.Level.INFO);

从版本 2.2 开始,添加了一个名为 missingTopicsFatal 的新容器属性(默认值:从 2.3.4 开始为 false)。如果任何配置的主题不存在于代理上,此属性将阻止容器启动。如果容器配置为侦听主题模式(正则表达式),则此属性不适用。以前,容器线程在 consumer.poll() 方法内循环,等待主题出现,同时记录大量消息。除了日志之外,没有迹象表明存在问题。

从版本 2.8 开始,引入了一个新的容器属性 authExceptionRetryInterval。这使得容器在从 KafkaConsumer 获取任何 AuthenticationExceptionAuthorizationException 后重试获取消息。这可能发生在,例如,配置的用户被拒绝访问读取某个主题或凭据不正确时。定义 authExceptionRetryInterval 允许容器在授予正确权限后恢复。

默认情况下,未配置间隔 - 身份验证和授权错误被视为致命错误,导致容器停止。

从版本 2.8 开始,当创建消费者工厂时,如果你提供反序列化器作为对象(在构造函数中或通过设置器),工厂将调用 configure() 方法来用配置属性配置它们。

使用 ConcurrentMessageListenerContainer

唯一的构造函数与 KafkaListenerContainer 构造函数类似。以下列表显示了构造函数的签名

public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                            ContainerProperties containerProperties)

它还有一个 concurrency 属性。例如,container.setConcurrency(3) 会创建三个 KafkaMessageListenerContainer 实例。

如果容器属性配置为主题(或主题模式),Kafka 会使用其组管理功能将分区分配给消费者。

当监听多个主题时,默认分区分配可能与你预期不符。例如,如果你有三个主题,每个主题有五个分区,并且你想使用 concurrency=15,你只会看到五个活跃的消费者,每个消费者分配每个主题的一个分区,而其他 10 个消费者处于空闲状态。这是因为默认的 Kafka ConsumerPartitionAssignorRangeAssignor(请参阅其 Javadoc)。对于这种情况,你可能需要考虑使用 RoundRobinAssignor,它将分区分配给所有消费者。然后,每个消费者被分配一个主题或分区。要更改 ConsumerPartitionAssignor,你可以在提供给 DefaultKafkaConsumerFactory 的属性中设置 partition.assignment.strategy 消费者属性(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG)。

使用 Spring Boot 时,您可以按如下方式设置策略

spring.kafka.consumer.properties.partition.assignment.strategy=\
org.apache.kafka.clients.consumer.RoundRobinAssignor

当容器属性配置了 TopicPartitionOffset 时,ConcurrentMessageListenerContainer 会将 TopicPartitionOffset 实例分发给委托的 KafkaMessageListenerContainer 实例。

例如,如果提供了六个 TopicPartitionOffset 实例,并且 concurrency3;每个容器将获得两个分区。对于五个 TopicPartitionOffset 实例,两个容器获得两个分区,第三个获得一个。如果 concurrency 大于 TopicPartitions 的数量,则 concurrency 将向下调整,使得每个容器获得一个分区。

client.id 属性(如果设置)会附加 -n,其中 n 是与并发性对应的消费者实例。当启用 JMX 时,这是为 MBean 提供唯一名称所必需的。

从版本 1.3 开始,MessageListenerContainer 提供了对底层 KafkaConsumer 指标的访问。对于 ConcurrentMessageListenerContainermetrics() 方法返回所有目标 KafkaMessageListenerContainer 实例的指标。这些指标按底层 KafkaConsumer 提供的 client-id 分组到 Map<MetricName, ? extends Metric> 中。

从版本 2.3 开始,ContainerProperties 提供了 idleBetweenPolls 选项,允许监听器容器中的主循环在 KafkaConsumer.poll() 调用之间休眠。实际的休眠间隔是根据提供的选项以及 max.poll.interval.ms 消费者配置与当前记录批处理时间之间的差异的最小值来选择的。

提交偏移量

提供了几个提交偏移量的选项。如果 enable.auto.commit 消费者属性为 true,Kafka 会根据其配置自动提交偏移量。如果为 false,容器支持几种 AckMode 设置(在下一列表中描述)。默认的 AckModeBATCH。从版本 2.3 开始,除非在配置中明确设置,否则框架会将 enable.auto.commit 设置为 false。此前,如果未设置此属性,则使用 Kafka 默认值(true)。

消费者 poll() 方法返回一个或多个 ConsumerRecordsMessageListener 为每个记录调用。以下列表描述了容器对每个 AckMode(不使用事务时)采取的操作

  • RECORD:处理记录后,当监听器返回时提交偏移量。

  • BATCH:当 poll() 返回的所有记录都已处理完毕时,提交偏移量。

  • TIME:当 poll() 返回的所有记录都已处理完毕,并且自上次提交以来已超过 ackTime 时,提交偏移量。

  • COUNT:当 poll() 返回的所有记录都已处理完毕,并且自上次提交以来已接收到 ackCount 条记录时,提交偏移量。

  • COUNT_TIME:类似于 TIMECOUNT,但如果任一条件为 true,则执行提交。

  • MANUAL:消息监听器负责 acknowledge() Acknowledgment。之后,应用与 BATCH 相同的语义。

  • MANUAL_IMMEDIATE:当监听器调用 Acknowledgment.acknowledge() 方法时立即提交偏移量。

使用事务时,偏移量会发送到事务,其语义等同于 RECORDBATCH,具体取决于监听器类型(记录或批量)。

MANUALMANUAL_IMMEDIATE 要求监听器是 AcknowledgingMessageListenerBatchAcknowledgingMessageListener。请参阅 消息监听器

根据 syncCommits 容器属性,将使用消费者上的 commitSync()commitAsync() 方法。syncCommits 默认为 true;另请参阅 setSyncCommitTimeout。参阅 setCommitCallback 以获取异步提交的结果;默认的回调是 LoggingCommitCallback,它记录错误(并在调试级别记录成功)。

由于监听器容器有自己的提交偏移量机制,因此它更倾向于将 Kafka ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG 设置为 false。从版本 2.3 开始,除非在消费者工厂或容器的消费者属性覆盖中明确设置,否则它会无条件地将其设置为 false。

Acknowledgment 具有以下方法

public interface Acknowledgment {

    void acknowledge();

}

此方法允许监听器控制何时提交偏移量。

从版本 2.3 开始,Acknowledgment 接口新增了两个方法 nack(long sleep)nack(int index, long sleep)。第一个用于记录监听器,第二个用于批量监听器。为你的监听器类型调用错误的方法将抛出 IllegalStateException

如果要提交部分批次,请使用 nack()。当使用事务时,将 AckMode 设置为 MANUAL;调用 nack() 会将成功处理的记录的偏移量发送到事务。
nack() 只能在调用你的监听器的消费者线程上调用。
使用 乱序提交 时不允许使用 nack()

对于记录监听器,当调用 nack() 时,任何待处理的偏移量都会被提交,上次轮询中剩余的记录会被丢弃,并且会对它们的分区执行查找操作,以便失败的记录和未处理的记录在下一次 poll() 时重新投递。可以通过设置 sleep 参数来在重新投递之前暂停消费者。这与当容器配置了 DefaultErrorHandler 时抛出异常的功能类似。

nack() 会暂停整个监听器,包括所有分配的分区,持续指定的休眠时间。

当使用批量监听器时,您可以指定批处理中发生故障的索引。当调用 nack() 时,索引之前的记录的偏移量将被提交,并对失败和已丢弃记录的分区执行查找,以便它们将在下一次 poll() 时重新投递。

有关更多信息,请参阅容器错误处理器

消费者在休眠期间会暂停,以便我们继续轮询代理以保持消费者活跃。实际的休眠时间及其分辨率取决于容器的 pollTimeout,默认为 5 秒。最小休眠时间等于 pollTimeout,所有休眠时间都将是它的倍数。对于较小的休眠时间或为了提高其准确性,请考虑减少容器的 pollTimeout

从版本 3.0.10 开始,批处理监听器可以使用 Acknowledgment 参数上的 acknowledge(index) 提交批处理的部分偏移量。当调用此方法时,索引处的记录(以及所有先前的记录)的偏移量将被提交。在执行部分批处理提交后调用 acknowledge() 将提交批处理剩余部分的偏移量。以下限制适用

  • 需要 AckMode.MANUAL_IMMEDIATE

  • 该方法必须在监听器线程上调用

  • 监听器必须消费一个 List 而不是原始的 ConsumerRecords

  • 索引必须在列表元素的范围内

  • 索引必须大于之前调用中使用的索引

这些限制是强制执行的,该方法将根据违规情况抛出 IllegalArgumentExceptionIllegalStateException

监听器容器自动启动

监听器容器实现 SmartLifecycle,默认情况下 autoStartuptrue。容器在较晚的阶段(Integer.MAX-VALUE - 100)启动。其他实现 SmartLifecycle 的组件,为了处理来自监听器的数据,应该在较早的阶段启动。- 100 为后续阶段留出了空间,以便在容器之后启用组件自动启动。

© . This site is unofficial and not affiliated with VMware.