消费批次

从版本 3.0 开始,当 spring.cloud.stream.bindings.<name>.consumer.batch-mode 设置为 true 时,通过轮询 Kafka Consumer 接收到的所有记录将以 List<?> 的形式呈现给监听器方法。否则,该方法将一次处理一条记录。批次的大小由 Kafka 消费者属性 max.poll.recordsfetch.min.bytesfetch.max.wait.ms 控制;有关更多信息,请参阅 Kafka 文档。

接收批次时,允许使用以下类型签名

List<Person>
Message<List<Person>>

List<Person> 的第一个选项中,监听器将不会获取任何消息头部。如果使用第二个类型签名(Message<List<Person>>),则可以访问头部;但是,所有头部仍然是 Collection 的形式。让我们看下面的例子。

假设 Message 包含一个包含十个 Person 对象的列表。MessageMessageHeaders 包含一个头部映射,其中键是头部名称,值是列表。此列表包含该头部的值,顺序与载荷列表相同。因此,应用程序需要根据载荷列表的迭代,从 MessageHeaders 映射中正确访问头部。

请注意,在批处理模式下消费时,不允许使用 List<Message<Person>> 形式的类型签名。

从版本 4.0.2 开始,Binder 在批处理模式下消费时支持 DLQ 功能。请记住,当在批处理模式下消费绑定上使用 DLQ 时,从前一次轮询接收到的所有记录都将发送到 DLQ 主题。

在使用批处理模式时,Binder 内部不支持重试,因此 maxAttempts 将被覆盖为 1。您可以通过配置 DefaultErrorHandler(使用 ListenerContainerCustomizer)来实现类似于 Binder 中重试的功能。您还可以使用手动 AckMode 并调用 Ackowledgment.nack(index, sleep) 来提交部分批次的 offset 并重新投递剩余的记录。有关这些技术的更多信息,请参阅 Spring for Apache Kafka 文档
在批处理模式下接收 KafkaNull 对象时,接收到的列表中将包含对应于 KafkaNull 对象的 null 元素。这对于 List<Person>Message<List<Person>> 风格的类型签名都适用。

在批处理模式下消费时的可观测性

在批量消费记录时,不直接支持可观测性追踪传播功能。这是因为 Kafka Binder 使用的 Spring for Apache Kafka 库不支持对批量监听器进行追踪;它仅支持记录监听器。在批量监听器中,接收到的记录可能来自多个主题/分区,并且来自多个生产者,添加追踪信息是可选的。由于批次中的记录之间可能没有任何关联,框架无法对它们进行追踪做出任何假设,例如为它们提供一个单一的追踪 ID 等。如果您使用 Message<List<String>> 的类型签名,则可以获取一个名为 kafka_batchConvertedHeaders 的头部,它包含一个与您的载荷条目数量相同的列表。此列表包含一个包含追踪头部的 Map。但是,应用程序需要正确地迭代此列表并启动一个观测。