消费批次
从 3.0 版本开始,当 spring.cloud.stream.bindings.<name>.consumer.batch-mode
设置为 true
时,所有由 Kafka Consumer
轮询接收的记录将作为 List<?>
提供给监听器方法。否则,该方法将每次调用一个记录。批次的大小由 Kafka 消费者属性 max.poll.records
、fetch.min.bytes
、fetch.max.wait.ms
控制;有关更多信息,请参阅 Kafka 文档。
接收批次时,允许以下类型签名
List<Person>
Message<List<Person>>
在 List<Person>
的第一个选项中,监听器不会收到任何消息头。如果使用第二个类型签名 (Message<List<Person>>
),则可以访问消息头;但是,所有消息头仍然以 Collection
的形式存在。让我们看下面的例子。
假设 Message
包含一个包含十个 Person
对象的列表。Message
的 MessageHeaders
包含一个消息头映射,键为消息头名称,值为一个列表。此列表包含该消息头的消息头值,顺序与有效负载列表相同。因此,应用程序需要根据有效负载列表的迭代,从 MessageHeaders
映射中正确访问消息头。
请注意,在批量模式下消费时,不允许使用 List<Message<Person>>
形式的类型签名。
从 4.0.2
版本开始,绑定器在批量模式下消费时支持 DLQ 功能。请记住,当在批量模式下的消费者绑定上使用 DLQ 时,从上次轮询接收到的所有记录都将被传递到 DLQ 主题。
在使用批量模式时,绑定器不支持重试,因此 maxAttempts 将被覆盖为 1。您可以配置一个 DefaultErrorHandler (使用 ListenerContainerCustomizer )来实现与绑定器中重试类似的功能。您也可以使用手动 AckMode 并调用 Ackowledgment.nack(index, sleep) 来提交部分批次的偏移量,并让剩余的记录重新传递。有关这些技术的更多信息,请参阅 Spring for Apache Kafka 文档。
|
在批量模式下接收 KafkaNull 对象时,接收到的列表将包含与 KafkaNull 对象相对应的空元素。这对于 List<Person> 和 Message<List<Person>> 样式的类型签名都是正确的。
|
在批量模式下消费时的可观察性
在批次消费记录时,不支持直接使用观察跟踪传播功能。这是因为 Kafka 绑定器使用的 Spring for Apache Kafka 库不支持批次监听器的跟踪;它只支持记录监听器。在批次监听器中,接收到的记录可能来自多个主题/分区,也可能来自多个生产者,其中添加跟踪信息是可选的。由于批次中的记录之间可能没有关联,框架无法对跟踪它们做出任何假设,例如将它们提供为单个跟踪 ID 等。如果使用 Message<List<String>>
的类型签名,则可以获取一个名为 kafka_batchConvertedHeaders
的标头,其中包含一个与有效负载条目数量相同的列表。此列表包含一个 Map
,其中包含跟踪标头。但是,应用程序需要自行迭代此列表并启动观察。