消息监听器容器
提供了两种 MessageListenerContainer 实现
-
KafkaMessageListenerContainer -
ConcurrentMessageListenerContainer
KafkaMessageListenerContainer 在单个线程上接收所有主题或分区的所有消息。ConcurrentMessageListenerContainer 委托给一个或多个 KafkaMessageListenerContainer 实例以提供多线程消费。
从版本 2.2.7 开始,您可以向监听器容器添加 RecordInterceptor;它将在调用监听器之前被调用,允许检查或修改记录。如果拦截器返回 null,则不会调用监听器。从版本 2.7 开始,它增加了在监听器退出(正常退出或抛出异常)后调用的额外方法。此外,从版本 2.7 开始,新增了 BatchInterceptor,为批量监听器提供类似的功能。另外,ConsumerAwareRecordInterceptor(和 BatchInterceptor)提供对 Consumer<?, ?> 的访问。例如,这可以用于在拦截器中访问消费者指标。
| 您不应该在这些拦截器中执行任何影响消费者位置和/或已提交偏移量的方法;容器需要管理这些信息。 |
| 如果拦截器修改了记录(通过创建一个新记录),主题、分区和偏移量必须保持不变,以避免意外的副作用,例如记录丢失。 |
CompositeRecordInterceptor 和 CompositeBatchInterceptor 可用于调用多个拦截器。
默认情况下,从版本 2.8 开始,使用事务时,拦截器在事务开始之前被调用。您可以将监听器容器的 interceptBeforeTx 属性设置为 false,以便在事务开始后调用拦截器。从版本 2.9 开始,这适用于任何事务管理器,而不仅仅是 KafkaAwareTransactionManager。例如,这使得拦截器能够参与由容器启动的 JDBC 事务。
从版本 2.3.8、2.4.6 开始,当并发度大于 1 时,ConcurrentMessageListenerContainer 现在支持静态成员资格(Static Membership)。group.instance.id 的后缀为 -n,其中 n 从 1 开始。这与增加的 session.timeout.ms 一起使用,可以减少重平衡事件,例如在应用实例重新启动时。
使用 KafkaMessageListenerContainer
提供了以下构造函数
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties)
它接收一个 ConsumerFactory 以及主题和分区的信息,以及其他配置,这些都在 ContainerProperties 对象中。ContainerProperties 提供了以下构造函数
public ContainerProperties(TopicPartitionOffset... topicPartitions)
public ContainerProperties(String... topics)
public ContainerProperties(Pattern topicPattern)
第一个构造函数接受一个 TopicPartitionOffset 参数数组,以明确指示容器使用哪些分区(使用 consumer 的 assign() 方法)以及可选的初始偏移量。正值默认是绝对偏移量。负值默认是相对于分区内当前最后一个偏移量的相对值。提供了 TopicPartitionOffset 的一个构造函数,它接受一个额外的 boolean 参数。如果此参数为 true,则初始偏移量(正或负)是相对于此消费者当前位置的相对值。偏移量在容器启动时应用。第二个接受一个主题数组,Kafka 根据 group.id 属性分配分区——在组内分布分区。第三个使用正则表达式 Pattern 选择主题。
要将 MessageListener 分配给容器,您可以在创建容器时使用 ContainerProps.setMessageListener 方法。以下示例展示了如何操作
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {
...
});
DefaultKafkaConsumerFactory<Integer, String> cf =
new DefaultKafkaConsumerFactory<>(consumerProps());
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
请注意,创建 DefaultKafkaConsumerFactory 时,使用如上所示只接受属性的构造函数意味着键和值的 Deserializer 类是从配置中获取的。或者,可以将 Deserializer 实例传递给 DefaultKafkaConsumerFactory 的构造函数,用于键和/或值,在这种情况下,所有 Consumers 将共享相同的实例。另一种选择是提供 Supplier<Deserializer> (从版本 2.3 开始),用于为每个 Consumer 获取单独的 Deserializer 实例
DefaultKafkaConsumerFactory<Integer, CustomValue> cf =
new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new CustomValueDeserializer());
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
有关 ContainerProperties 的各种属性的更多信息,请参阅其Javadoc。
从版本 2.1.1 开始,引入了一个名为 logContainerConfig 的新属性。当此属性为 true 且 INFO 级别日志启用时,每个监听器容器会写入一条日志消息,总结其配置属性。
默认情况下,主题偏移量提交的日志记录级别是 DEBUG。从版本 2.1.2 开始,ContainerProperties 中新增了一个名为 commitLogLevel 的属性,允许您指定这些消息的日志级别。例如,要将日志级别更改为 INFO,您可以使用 containerProperties.setCommitLogLevel(LogIfLevelEnabled.Level.INFO);。
从版本 2.2 开始,新增了一个名为 missingTopicsFatal 的容器属性(默认值:从 2.3.4 开始为 false)。如果配置的任何主题在 broker 上不存在,这将阻止容器启动。如果容器配置为监听主题模式(正则表达式),则此属性不适用。之前,容器线程会在 consumer.poll() 方法内循环等待主题出现,同时记录大量消息。除了日志之外,没有任何迹象表明存在问题。
从版本 2.8 开始,引入了一个新的容器属性 authExceptionRetryInterval。当从 KafkaConsumer 获取到任何 AuthenticationException 或 AuthorizationException 时,这会导致容器重试获取消息。例如,当配置的用户被拒绝读取某个主题或凭据不正确时,可能会发生这种情况。定义 authExceptionRetryInterval 允许容器在授予适当权限后恢复。
| 默认情况下,未配置间隔 - 认证和授权错误被视为致命错误,会导致容器停止。 |
从版本 2.8 开始,创建消费者工厂时,如果您以对象形式(在构造函数中或通过 setter)提供反序列化器,工厂将调用 configure() 方法,并使用配置属性对其进行配置。
使用 ConcurrentMessageListenerContainer
其唯一的构造函数与 KafkaListenerContainer 的构造函数类似。以下清单显示了构造函数的签名
public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties)
它还有一个 concurrency 属性。例如,container.setConcurrency(3) 会创建三个 KafkaMessageListenerContainer 实例。
如果容器属性配置为主题(或主题模式),Kafka 会使用其组管理功能在消费者之间分配分区。
|
监听多个主题时,默认的分区分配可能与您的预期不同。例如,如果您有三个主题,每个主题有五个分区,并且您想使用 使用 Spring Boot 时,您可以按如下方式设置策略
|
当容器属性配置了 TopicPartitionOffsets 时,ConcurrentMessageListenerContainer 会将 TopicPartitionOffset 实例分布到委托的 KafkaMessageListenerContainer 实例上。
例如,如果提供了六个 TopicPartitionOffset 实例,且 concurrency 为 3;每个容器将获得两个分区。对于五个 TopicPartitionOffset 实例,两个容器将获得两个分区,第三个获得一个。如果 concurrency 大于 TopicPartitions 的数量,则 concurrency 会向下调整,使得每个容器获得一个分区。
client.id 属性(如果设置)会附加 -n,其中 n 对应于并发的消费者实例。当启用 JMX 时,这对于为 MBeans 提供唯一的名称是必需的。 |
从版本 1.3 开始,MessageListenerContainer 提供了访问底层 KafkaConsumer 指标的功能。对于 ConcurrentMessageListenerContainer,metrics() 方法返回所有目标 KafkaMessageListenerContainer 实例的指标。这些指标按底层 KafkaConsumer 提供的 client-id 分组到 Map<MetricName, ? extends Metric> 中。
从版本 2.3 开始,ContainerProperties 提供了一个 idleBetweenPolls 选项,允许监听器容器中的主循环在 KafkaConsumer.poll() 调用之间休眠。实际的休眠间隔是提供的选项以及 max.poll.interval.ms 消费者配置与当前记录批处理时间差的最小值。
提交偏移量
提供了几种提交偏移量的选项。如果 enable.auto.commit 消费者属性为 true,Kafka 会根据其配置自动提交偏移量。如果为 false,容器支持多种 AckMode 设置(在以下列表中描述)。默认的 AckMode 是 BATCH。从版本 2.3 开始,除非在配置中明确设置,否则框架会将 enable.auto.commit 设置为 false。以前,如果未设置此属性,则使用 Kafka 的默认值(true)。
消费者 poll() 方法返回一个或多个 ConsumerRecords。MessageListener 会为每条记录调用一次。以下列表描述了容器在每种 AckMode 下采取的操作(不使用事务时)
-
RECORD:在监听器处理完记录返回后提交偏移量。 -
BATCH:在处理完poll()返回的所有记录后提交偏移量。 -
TIME:在处理完poll()返回的所有记录后提交偏移量,前提是自上次提交以来超过了ackTime。 -
COUNT:在处理完poll()返回的所有记录后提交偏移量,前提是自上次提交以来收到了ackCount条记录。 -
COUNT_TIME:类似于TIME和COUNT,但只要任一条件为true,就会执行提交。 -
MANUAL:消息监听器负责acknowledge()Acknowledgment。 -
MANUAL_IMMEDIATE:当监听器调用Acknowledgment.acknowledge()方法时立即提交偏移量。
使用事务时,偏移量会发送到事务中,其语义等同于 RECORD 或 BATCH,具体取决于监听器类型(记录或批量)。
MANUAL 和 MANUAL_IMMEDIATE 要求监听器是 AcknowledgingMessageListener 或 BatchAcknowledgingMessageListener。参见消息监听器。 |
根据 syncCommits 容器属性,使用消费者上的 commitSync() 或 commitAsync() 方法。syncCommits 默认值为 true;另请参阅 setSyncCommitTimeout。参见 setCommitCallback 以获取异步提交的结果;默认的回调是 LoggingCommitCallback,它记录错误(并在 debug 级别记录成功)。
由于监听器容器有自己的提交偏移量机制,它倾向于将 Kafka ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG 设置为 false。从版本 2.3 开始,除非在消费者工厂或容器的消费者属性覆盖中明确设置,否则它会无条件地将其设置为 false。
Acknowledgment 提供了以下方法
public interface Acknowledgment {
void acknowledge();
}
此方法允许监听器控制何时提交偏移量。
从版本 2.3 开始,Acknowledgment 接口增加了两个额外的方法 nack(long sleep) 和 nack(int index, long sleep)。第一个用于记录监听器,第二个用于批量监听器。为您的监听器类型调用错误的方法将抛出 IllegalStateException。
如果您想提交部分批次,使用 nack(),在使用事务时,将 AckMode 设置为 MANUAL;调用 nack() 会将成功处理的记录的偏移量发送到事务。 |
nack() 只能在调用您的监听器的消费者线程上调用。 |
使用乱序提交(Out of Order Commits)时不允许使用 nack()。 |
对于记录监听器,调用 nack() 时,任何待提交的偏移量都会被提交,上次 poll() 返回的剩余记录会被丢弃,并对其分区执行 seek 操作,以便在下一次 poll() 时重新投递失败和未处理的记录。通过设置 sleep 参数,可以在重新投递之前暂停消费者。此功能类似于当容器配置了 DefaultErrorHandler 时抛出异常。
nack() 会暂停整个监听器及其所有已分配的分区,暂停时长为指定的 sleep 持续时间。 |
使用批量监听器时,您可以指定批次内发生失败的记录索引。调用 nack() 时,会提交索引之前记录的偏移量,并对失败和丢弃的记录所在的分区执行 seek 操作,以便在下一次 poll() 时重新投递它们。
有关更多信息,请参阅容器错误处理器。
消费者在休眠期间会暂停,以便我们继续轮询 broker 以保持消费者活跃。实际的休眠时间及其精度取决于容器的 pollTimeout,默认为 5 秒。最小休眠时间等于 pollTimeout,并且所有休眠时间都将是它的倍数。对于较短的休眠时间或为了提高精度,可以考虑减少容器的 pollTimeout。 |
从版本 3.0.10 开始,批量监听器可以使用 Acknowledgment 参数上的 acknowledge(index) 方法来提交批次中部分记录的偏移量。调用此方法时,索引处记录(以及之前所有记录)的偏移量将被提交。在执行部分批次提交后调用 acknowledge() 将提交批次剩余记录的偏移量。以下限制适用
-
需要
AckMode.MANUAL_IMMEDIATE -
方法必须在监听器线程上调用
-
监听器必须消费
List而不是原始的ConsumerRecords -
索引必须在列表元素的范围内
-
索引必须大于之前调用中使用的索引
这些限制将被强制执行,根据违规情况,方法将抛出 IllegalArgumentException 或 IllegalStateException。