@KafkaListener
注解
@KafkaListener
注解用于将 bean 方法指定为监听器容器的监听器。该 bean 会被包装在 MessagingMessageListenerAdapter
中,并配置各种特性,例如在必要时用于转换数据以匹配方法参数的转换器。
您可以使用 SpEL (#{…}
) 或属性占位符 (${…}
) 来配置注解上的大多数属性。有关更多信息,请参阅 Javadoc。
Record 监听器
@KafkaListener
注解为简单的 POJO 监听器提供了一种机制。以下示例展示了如何使用它
public class Listener {
@KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
public void listen(String data) {
...
}
}
该机制要求您的一个 @Configuration
类上使用 @EnableKafka
注解,并且需要一个监听器容器工厂,该工厂用于配置底层 ConcurrentMessageListenerContainer
。默认情况下,需要一个名为 kafkaListenerContainerFactory
的 bean。以下示例展示了如何使用 ConcurrentMessageListenerContainer
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
...
return props;
}
}
请注意,要设置容器属性,必须使用工厂上的 getContainerProperties()
方法。它被用作注入到容器中的实际属性的模板。
从 2.1.1 版本开始,您现在可以为由该注解创建的消费者设置 client.id
属性。clientIdPrefix
会被加上后缀 -n
,其中 n
是一个整数,表示使用并发时的容器编号。
从 2.2 版本开始,您现在可以使用注解本身的属性覆盖容器工厂的 concurrency
和 autoStartup
属性。这些属性可以是简单值、属性占位符或 SpEL 表达式。以下示例展示了如何实现
@KafkaListener(id = "myListener", topics = "myTopic",
autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}")
public void listen(String data) {
...
}
显式分区分配
您还可以为 POJO 监听器配置显式的主题和分区(以及可选的初始偏移量)。以下示例展示了如何实现
@KafkaListener(id = "thing2", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
@TopicPartition(topic = "topic2", partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
您可以在 partitions
或 partitionOffsets
属性中指定每个分区,但不能同时使用。
与大多数注解属性一样,您可以使用 SpEL 表达式;有关如何生成大量分区的示例,请参阅 手动分配所有分区。
从 2.5.5 版本开始,您可以将初始偏移量应用于所有分配的分区
@KafkaListener(id = "thing3", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" },
partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
*
通配符表示 partitions
属性中的所有分区。在每个 @TopicPartition
中只能有一个带有通配符的 @PartitionOffset
。
此外,当监听器实现 ConsumerSeekAware
时,即使使用手动分配,现在也会调用 onPartitionsAssigned
。例如,这允许在该时间点执行任何任意的 seek 操作。
从 2.6.4 版本开始,您可以指定一个逗号分隔的分区列表,或者分区范围
@KafkaListener(id = "pp", autoStartup = "false",
topicPartitions = @TopicPartition(topic = "topic1",
partitions = "0-5, 7, 10-15"))
public void process(String in) {
...
}
该范围是包含性的;上面的示例将分配分区 0, 1, 2, 3, 4, 5, 7, 10, 11, 12, 13, 14, 15
。
在指定初始偏移量时可以使用相同的技术
@KafkaListener(id = "thing3", topicPartitions =
{ @TopicPartition(topic = "topic1",
partitionOffsets = @PartitionOffset(partition = "0-5", initialOffset = "0"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
初始偏移量将应用于所有 6 个分区。
自 3.2 版本以来,@PartitionOffset
支持 SeekPosition.END
、SeekPosition.BEGINNING
、SeekPosition.TIMESTAMP
,seekPosition
匹配 SeekPosition
枚举名称
@KafkaListener(id = "seekPositionTime", topicPartitions = {
@TopicPartition(topic = TOPIC_SEEK_POSITION, partitionOffsets = {
@PartitionOffset(partition = "0", initialOffset = "723916800000", seekPosition = "TIMESTAMP"),
@PartitionOffset(partition = "1", initialOffset = "0", seekPosition = "BEGINNING"),
@PartitionOffset(partition = "2", initialOffset = "0", seekPosition = "END")
})
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
如果 seekPosition
设置为 END
或 BEGINNING
,将忽略 initialOffset
和 relativeToCurrent
。如果 seekPosition
设置为 TIMESTAMP
,initialOffset
表示时间戳。
手动确认
使用手动 AckMode
时,您还可以为监听器提供 Acknowledgment
。要激活手动 AckMode
,需要在 ContainerProperties
中将确认模式设置为适当的手动模式。以下示例还展示了如何使用不同的容器工厂。此自定义容器工厂必须通过调用 getContainerProperties()
然后调用其上的 setAckMode
将 AckMode
设置为手动类型。否则,Acknowledgment
对象将为 null。
@KafkaListener(id = "cat", topics = "myTopic",
containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
...
ack.acknowledge();
}
消费者记录元数据
最后,关于记录的元数据可以从消息头中获取。您可以使用以下头名称来检索消息的头
-
KafkaHeaders.OFFSET
-
KafkaHeaders.RECEIVED_KEY
-
KafkaHeaders.RECEIVED_TOPIC
-
KafkaHeaders.RECEIVED_PARTITION
-
KafkaHeaders.RECEIVED_TIMESTAMP
-
KafkaHeaders.TIMESTAMP_TYPE
从 2.5 版本开始,如果入站记录的 key 为 null
,则 RECEIVED_KEY
将不会出现;之前该 header 会填充一个 null
值。此更改是为了使框架与 spring-messaging
的约定保持一致,即不存在值为 null
的 headers。
以下示例展示了如何使用这些 headers
@KafkaListener(id = "qux", topicPattern = "myTopic1")
public void listen(@Payload String foo,
@Header(name = KafkaHeaders.RECEIVED_KEY, required = false) Integer key,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
) {
...
}
参数注解(@Payload 、@Header )必须指定在监听器方法的具体实现上;如果定义在接口上,则不会被检测到。 |
从 2.5 版本开始,除了使用离散的 headers 外,您还可以在 ConsumerRecordMetadata
参数中接收记录元数据。
@KafkaListener(...)
public void listen(String str, ConsumerRecordMetadata meta) {
...
}
这包含 ConsumerRecord
中除 key 和 value 外的所有数据。
批量监听器
从 1.1 版本开始,您可以配置 @KafkaListener
方法来接收从消费者 poll 中获取的整个批次的消费者记录。
批量监听器不支持非阻塞重试。 |
要配置监听器容器工厂以创建批量监听器,您可以设置 batchListener
属性。以下示例展示了如何实现
@Bean
public KafkaListenerContainerFactory<?> batchFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
return factory;
}
从 2.8 版本开始,您可以使用 @KafkaListener 注解上的 batch 属性覆盖工厂的 batchListener 属性。结合 容器错误处理器 的变化,这允许同一个工厂用于 record 和 batch 监听器。 |
从 2.9.6 版本开始,容器工厂为 recordMessageConverter 和 batchMessageConverter 属性提供了单独的 setter 方法。以前,只有一个 messageConverter 属性,适用于 record 和 batch 监听器。 |
以下示例展示了如何接收负载列表
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list) {
...
}
主题、分区、偏移量等信息在与负载并行的 headers 中可用。以下示例展示了如何使用这些 headers
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list,
@Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
@Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
@Header(KafkaHeaders.OFFSET) List<Long> offsets) {
...
}
或者,您可以接收 List
类型的 Message>
对象,其中每个消息包含其偏移量和其他详细信息,但这必须是方法上定义的唯一参数(除了可选的 Acknowledgment
(在使用手动提交时)和/或 Consumer, ?>
参数)。以下示例展示了如何实现
@KafkaListener(id = "listMsg", topics = "myTopic", containerFactory = "batchFactory")
public void listen1(List<Message<?>> list) {
...
}
@KafkaListener(id = "listMsgAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen2(List<Message<?>> list, Acknowledgment ack) {
...
}
@KafkaListener(id = "listMsgAckConsumer", topics = "myTopic", containerFactory = "batchFactory")
public void listen3(List<Message<?>> list, Acknowledgment ack, Consumer<?, ?> consumer) {
...
}
在这种情况下,不会对负载执行转换。
如果 BatchMessagingMessageConverter
配置了 RecordMessageConverter
,您还可以为 Message
参数添加泛型类型,并且负载会被转换。有关更多信息,请参阅 批量监听器的负载转换。
您还可以接收 ConsumerRecord, ?>
对象的列表,但这必须是方法上定义的唯一参数(除了可选的 Acknowledgment
(在使用手动提交时)和 Consumer, ?>
参数)。以下示例展示了如何实现
@KafkaListener(id = "listCRs", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list) {
...
}
@KafkaListener(id = "listCRsAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list, Acknowledgment ack) {
...
}
从 2.2 版本开始,监听器可以接收由 poll()
方法返回的完整 ConsumerRecords, ?>
对象,使监听器能够访问更多方法,例如 partitions()
(返回列表中的 TopicPartition
实例)和 records(TopicPartition)
(获取选择性记录)。同样,这必须是方法上的唯一参数(除了可选的 Acknowledgment
(在使用手动提交时)或 Consumer, ?>
参数)。以下示例展示了如何实现
@KafkaListener(id = "pollResults", topics = "myTopic", containerFactory = "batchFactory")
public void pollResults(ConsumerRecords<?, ?> records) {
...
}
如果容器工厂配置了 RecordFilterStrategy ,对于 ConsumerRecords, ?> 监听器,它会被忽略,并发出 WARN 日志消息。只有在使用 List> 形式的监听器时,才能通过批量监听器过滤记录。默认情况下,记录是逐个过滤的;从 2.8 版本开始,您可以覆盖 filterBatch 以一次调用过滤整个批次。 |
注解属性
从 2.0 版本开始,id
属性(如果存在)将用作 Kafka 消费者的 group.id
属性,覆盖消费者工厂中配置的同名属性(如果存在)。您也可以显式设置 groupId
或将 idIsGroup
设置为 false,以恢复之前使用消费者工厂 group.id
的行为。
您可以在大多数注解属性中使用属性占位符或 SpEL 表达式,如下例所示
@KafkaListener(topics = "${some.property}")
@KafkaListener(topics = "#{someBean.someProperty}",
groupId = "#{someBean.someProperty}.group")
从 2.1.2 版本开始,SpEL 表达式支持一个特殊 token:__listener
。这是一个伪 bean 名称,表示此注解所在的当前 bean 实例。
考虑以下示例
@Bean
public Listener listener1() {
return new Listener("topic1");
}
@Bean
public Listener listener2() {
return new Listener("topic2");
}
给定前一个示例中的 bean,我们可以使用以下方式
public class Listener {
private final String topic;
public Listener(String topic) {
this.topic = topic;
}
@KafkaListener(topics = "#{__listener.topic}",
groupId = "#{__listener.topic}.group")
public void listen(...) {
...
}
public String getTopic() {
return this.topic;
}
}
万一(尽管不太可能)您有一个实际名为 __listener
的 bean,您可以使用 beanRef
属性更改表达式 token。以下示例展示了如何实现
@KafkaListener(beanRef = "__x", topics = "#{__x.topic}", groupId = "#{__x.topic}.group")
从 2.2.4 版本开始,您可以直接在注解上指定 Kafka 消费者属性,这些属性将覆盖消费者工厂中配置的同名属性。您不能通过这种方式指定 group.id
和 client.id
属性;它们将被忽略;请使用 groupId
和 clientIdPrefix
注解属性来指定它们。
这些属性指定为单独的字符串,采用标准的 Java Properties
文件格式:foo:bar
、foo=bar
或 foo bar
,如下例所示
@KafkaListener(topics = "myTopic", groupId = "group", properties = {
"max.poll.interval.ms:60000",
ConsumerConfig.MAX_POLL_RECORDS_CONFIG + "=100"
})
以下是在 使用 RoutingKafkaTemplate
中的相应监听器示例。
@KafkaListener(id = "one", topics = "one")
public void listen1(String in) {
System.out.println("1: " + in);
}
@KafkaListener(id = "two", topics = "two",
properties = "value.deserializer:org.apache.kafka.common.serialization.ByteArrayDeserializer")
public void listen2(byte[] in) {
System.out.println("2: " + new String(in));
}