消息监听器容器

提供了两种 MessageListenerContainer 实现

  • KafkaMessageListenerContainer

  • ConcurrentMessageListenerContainer

KafkaMessageListenerContainer 在单个线程上接收所有主题或分区的所有消息。ConcurrentMessageListenerContainer 委托给一个或多个 KafkaMessageListenerContainer 实例以提供多线程消费。

从版本 2.2.7 开始,您可以向监听器容器添加 RecordInterceptor;它将在调用监听器之前被调用,允许检查或修改记录。如果拦截器返回 null,则不会调用监听器。从版本 2.7 开始,它增加了在监听器退出(正常退出或抛出异常)后调用的额外方法。此外,从版本 2.7 开始,新增了 BatchInterceptor,为批量监听器提供类似的功能。另外,ConsumerAwareRecordInterceptor(和 BatchInterceptor)提供对 Consumer<?, ?> 的访问。例如,这可以用于在拦截器中访问消费者指标。

您不应该在这些拦截器中执行任何影响消费者位置和/或已提交偏移量的方法;容器需要管理这些信息。
如果拦截器修改了记录(通过创建一个新记录),主题、分区和偏移量必须保持不变,以避免意外的副作用,例如记录丢失。

CompositeRecordInterceptorCompositeBatchInterceptor 可用于调用多个拦截器。

默认情况下,从版本 2.8 开始,使用事务时,拦截器在事务开始之前被调用。您可以将监听器容器的 interceptBeforeTx 属性设置为 false,以便在事务开始后调用拦截器。从版本 2.9 开始,这适用于任何事务管理器,而不仅仅是 KafkaAwareTransactionManager。例如,这使得拦截器能够参与由容器启动的 JDBC 事务。

从版本 2.3.8、2.4.6 开始,当并发度大于 1 时,ConcurrentMessageListenerContainer 现在支持静态成员资格(Static Membership)group.instance.id 的后缀为 -n,其中 n1 开始。这与增加的 session.timeout.ms 一起使用,可以减少重平衡事件,例如在应用实例重新启动时。

使用 KafkaMessageListenerContainer

提供了以下构造函数

public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                    ContainerProperties containerProperties)

它接收一个 ConsumerFactory 以及主题和分区的信息,以及其他配置,这些都在 ContainerProperties 对象中。ContainerProperties 提供了以下构造函数

public ContainerProperties(TopicPartitionOffset... topicPartitions)

public ContainerProperties(String... topics)

public ContainerProperties(Pattern topicPattern)

第一个构造函数接受一个 TopicPartitionOffset 参数数组,以明确指示容器使用哪些分区(使用 consumer 的 assign() 方法)以及可选的初始偏移量。正值默认是绝对偏移量。负值默认是相对于分区内当前最后一个偏移量的相对值。提供了 TopicPartitionOffset 的一个构造函数,它接受一个额外的 boolean 参数。如果此参数为 true,则初始偏移量(正或负)是相对于此消费者当前位置的相对值。偏移量在容器启动时应用。第二个接受一个主题数组,Kafka 根据 group.id 属性分配分区——在组内分布分区。第三个使用正则表达式 Pattern 选择主题。

要将 MessageListener 分配给容器,您可以在创建容器时使用 ContainerProps.setMessageListener 方法。以下示例展示了如何操作

ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {
    ...
});
DefaultKafkaConsumerFactory<Integer, String> cf =
                        new DefaultKafkaConsumerFactory<>(consumerProps());
KafkaMessageListenerContainer<Integer, String> container =
                        new KafkaMessageListenerContainer<>(cf, containerProps);
return container;

请注意,创建 DefaultKafkaConsumerFactory 时,使用如上所示只接受属性的构造函数意味着键和值的 Deserializer 类是从配置中获取的。或者,可以将 Deserializer 实例传递给 DefaultKafkaConsumerFactory 的构造函数,用于键和/或值,在这种情况下,所有 Consumers 将共享相同的实例。另一种选择是提供 Supplier<Deserializer> (从版本 2.3 开始),用于为每个 Consumer 获取单独的 Deserializer 实例

DefaultKafkaConsumerFactory<Integer, CustomValue> cf =
                        new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new CustomValueDeserializer());
KafkaMessageListenerContainer<Integer, String> container =
                        new KafkaMessageListenerContainer<>(cf, containerProps);
return container;

有关 ContainerProperties 的各种属性的更多信息,请参阅其Javadoc

从版本 2.1.1 开始,引入了一个名为 logContainerConfig 的新属性。当此属性为 trueINFO 级别日志启用时,每个监听器容器会写入一条日志消息,总结其配置属性。

默认情况下,主题偏移量提交的日志记录级别是 DEBUG。从版本 2.1.2 开始,ContainerProperties 中新增了一个名为 commitLogLevel 的属性,允许您指定这些消息的日志级别。例如,要将日志级别更改为 INFO,您可以使用 containerProperties.setCommitLogLevel(LogIfLevelEnabled.Level.INFO);

从版本 2.2 开始,新增了一个名为 missingTopicsFatal 的容器属性(默认值:从 2.3.4 开始为 false)。如果配置的任何主题在 broker 上不存在,这将阻止容器启动。如果容器配置为监听主题模式(正则表达式),则此属性不适用。之前,容器线程会在 consumer.poll() 方法内循环等待主题出现,同时记录大量消息。除了日志之外,没有任何迹象表明存在问题。

从版本 2.8 开始,引入了一个新的容器属性 authExceptionRetryInterval。当从 KafkaConsumer 获取到任何 AuthenticationExceptionAuthorizationException 时,这会导致容器重试获取消息。例如,当配置的用户被拒绝读取某个主题或凭据不正确时,可能会发生这种情况。定义 authExceptionRetryInterval 允许容器在授予适当权限后恢复。

默认情况下,未配置间隔 - 认证和授权错误被视为致命错误,会导致容器停止。

从版本 2.8 开始,创建消费者工厂时,如果您以对象形式(在构造函数中或通过 setter)提供反序列化器,工厂将调用 configure() 方法,并使用配置属性对其进行配置。

使用 ConcurrentMessageListenerContainer

其唯一的构造函数与 KafkaListenerContainer 的构造函数类似。以下清单显示了构造函数的签名

public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                            ContainerProperties containerProperties)

它还有一个 concurrency 属性。例如,container.setConcurrency(3) 会创建三个 KafkaMessageListenerContainer 实例。

如果容器属性配置为主题(或主题模式),Kafka 会使用其组管理功能在消费者之间分配分区。

监听多个主题时,默认的分区分配可能与您的预期不同。例如,如果您有三个主题,每个主题有五个分区,并且您想使用 concurrency=15,您会看到只有五个活动消费者,每个消费者被分配到每个主题的一个分区,而其他 10 个消费者处于空闲状态。这是因为默认的 Kafka ConsumerPartitionAssignorRangeAssignor(参见其 Javadoc)。对于这种情况,您可能需要考虑使用 RoundRobinAssignor,它会将分区分布到所有消费者上。然后,每个消费者被分配到一个主题或分区。要更改 ConsumerPartitionAssignor,您可以在提供给 DefaultKafkaConsumerFactory 的属性中设置 partition.assignment.strategy 消费者属性(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG)。

使用 Spring Boot 时,您可以按如下方式设置策略

spring.kafka.consumer.properties.partition.assignment.strategy=\
org.apache.kafka.clients.consumer.RoundRobinAssignor

当容器属性配置了 TopicPartitionOffsets 时,ConcurrentMessageListenerContainer 会将 TopicPartitionOffset 实例分布到委托的 KafkaMessageListenerContainer 实例上。

例如,如果提供了六个 TopicPartitionOffset 实例,且 concurrency3;每个容器将获得两个分区。对于五个 TopicPartitionOffset 实例,两个容器将获得两个分区,第三个获得一个。如果 concurrency 大于 TopicPartitions 的数量,则 concurrency 会向下调整,使得每个容器获得一个分区。

client.id 属性(如果设置)会附加 -n,其中 n 对应于并发的消费者实例。当启用 JMX 时,这对于为 MBeans 提供唯一的名称是必需的。

从版本 1.3 开始,MessageListenerContainer 提供了访问底层 KafkaConsumer 指标的功能。对于 ConcurrentMessageListenerContainermetrics() 方法返回所有目标 KafkaMessageListenerContainer 实例的指标。这些指标按底层 KafkaConsumer 提供的 client-id 分组到 Map<MetricName, ? extends Metric> 中。

从版本 2.3 开始,ContainerProperties 提供了一个 idleBetweenPolls 选项,允许监听器容器中的主循环在 KafkaConsumer.poll() 调用之间休眠。实际的休眠间隔是提供的选项以及 max.poll.interval.ms 消费者配置与当前记录批处理时间差的最小值。

提交偏移量

提供了几种提交偏移量的选项。如果 enable.auto.commit 消费者属性为 true,Kafka 会根据其配置自动提交偏移量。如果为 false,容器支持多种 AckMode 设置(在以下列表中描述)。默认的 AckModeBATCH。从版本 2.3 开始,除非在配置中明确设置,否则框架会将 enable.auto.commit 设置为 false。以前,如果未设置此属性,则使用 Kafka 的默认值(true)。

消费者 poll() 方法返回一个或多个 ConsumerRecordsMessageListener 会为每条记录调用一次。以下列表描述了容器在每种 AckMode 下采取的操作(不使用事务时)

  • RECORD:在监听器处理完记录返回后提交偏移量。

  • BATCH:在处理完 poll() 返回的所有记录后提交偏移量。

  • TIME:在处理完 poll() 返回的所有记录后提交偏移量,前提是自上次提交以来超过了 ackTime

  • COUNT:在处理完 poll() 返回的所有记录后提交偏移量,前提是自上次提交以来收到了 ackCount 条记录。

  • COUNT_TIME:类似于 TIMECOUNT,但只要任一条件为 true,就会执行提交。

  • MANUAL:消息监听器负责 acknowledge() Acknowledgment

  • MANUAL_IMMEDIATE:当监听器调用 Acknowledgment.acknowledge() 方法时立即提交偏移量。

使用事务时,偏移量会发送到事务中,其语义等同于 RECORDBATCH,具体取决于监听器类型(记录或批量)。

MANUALMANUAL_IMMEDIATE 要求监听器是 AcknowledgingMessageListenerBatchAcknowledgingMessageListener。参见消息监听器

根据 syncCommits 容器属性,使用消费者上的 commitSync()commitAsync() 方法。syncCommits 默认值为 true;另请参阅 setSyncCommitTimeout。参见 setCommitCallback 以获取异步提交的结果;默认的回调是 LoggingCommitCallback,它记录错误(并在 debug 级别记录成功)。

由于监听器容器有自己的提交偏移量机制,它倾向于将 Kafka ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG 设置为 false。从版本 2.3 开始,除非在消费者工厂或容器的消费者属性覆盖中明确设置,否则它会无条件地将其设置为 false。

Acknowledgment 提供了以下方法

public interface Acknowledgment {

    void acknowledge();

}

此方法允许监听器控制何时提交偏移量。

从版本 2.3 开始,Acknowledgment 接口增加了两个额外的方法 nack(long sleep)nack(int index, long sleep)。第一个用于记录监听器,第二个用于批量监听器。为您的监听器类型调用错误的方法将抛出 IllegalStateException

如果您想提交部分批次,使用 nack(),在使用事务时,将 AckMode 设置为 MANUAL;调用 nack() 会将成功处理的记录的偏移量发送到事务。
nack() 只能在调用您的监听器的消费者线程上调用。
使用乱序提交(Out of Order Commits)时不允许使用 nack()

对于记录监听器,调用 nack() 时,任何待提交的偏移量都会被提交,上次 poll() 返回的剩余记录会被丢弃,并对其分区执行 seek 操作,以便在下一次 poll() 时重新投递失败和未处理的记录。通过设置 sleep 参数,可以在重新投递之前暂停消费者。此功能类似于当容器配置了 DefaultErrorHandler 时抛出异常。

nack() 会暂停整个监听器及其所有已分配的分区,暂停时长为指定的 sleep 持续时间。

使用批量监听器时,您可以指定批次内发生失败的记录索引。调用 nack() 时,会提交索引之前记录的偏移量,并对失败和丢弃的记录所在的分区执行 seek 操作,以便在下一次 poll() 时重新投递它们。

有关更多信息,请参阅容器错误处理器

消费者在休眠期间会暂停,以便我们继续轮询 broker 以保持消费者活跃。实际的休眠时间及其精度取决于容器的 pollTimeout,默认为 5 秒。最小休眠时间等于 pollTimeout,并且所有休眠时间都将是它的倍数。对于较短的休眠时间或为了提高精度,可以考虑减少容器的 pollTimeout

从版本 3.0.10 开始,批量监听器可以使用 Acknowledgment 参数上的 acknowledge(index) 方法来提交批次中部分记录的偏移量。调用此方法时,索引处记录(以及之前所有记录)的偏移量将被提交。在执行部分批次提交后调用 acknowledge() 将提交批次剩余记录的偏移量。以下限制适用

  • 需要 AckMode.MANUAL_IMMEDIATE

  • 方法必须在监听器线程上调用

  • 监听器必须消费 List 而不是原始的 ConsumerRecords

  • 索引必须在列表元素的范围内

  • 索引必须大于之前调用中使用的索引

这些限制将被强制执行,根据违规情况,方法将抛出 IllegalArgumentExceptionIllegalStateException

监听器容器自动启动

监听器容器实现了 SmartLifecycle,并且 autoStartup 默认值为 true。容器在较晚的阶段启动(Integer.MAX-VALUE - 100)。其他实现了 SmartLifecycle 并处理来自监听器数据的组件应在更早的阶段启动。- 100 为后续阶段留出空间,以便在容器启动后自动启动组件。