使用 `@SendTo` 转发监听器结果
从 2.0 版本开始,如果您同时使用 `@SendTo` 注解标记 `@KafkaListener`,并且方法调用返回结果,则该结果将转发到 `@SendTo` 指定的主题。
`@SendTo` 值可以有几种形式
-
`@SendTo("someTopic")` 路由到字面值主题。
-
`@SendTo("#{someExpression}")` 路由到在应用上下文初始化期间评估表达式确定的主题。
-
`@SendTo("!{someExpression}")` 路由到在运行时评估表达式确定的主题。评估的 `#root` 对象有三个属性
-
`request`:入站的 `ConsumerRecord`(对于批量监听器,则是 `ConsumerRecords` 对象)。
-
`source`:从 `request` 转换而来的 `org.springframework.messaging.Message>`。
-
`result`:方法返回的结果。
-
-
`@SendTo`(无属性):这被视为 `!{source.headers['kafka_replyTopic']}`(从 2.1.3 版本开始)。
从 2.1.11 和 2.2.1 版本开始,属性占位符会在 `@SendTo` 值中被解析。
表达式评估的结果必须是表示主题名称的 `String`。以下示例展示了使用 `@SendTo` 的各种方式
@KafkaListener(topics = "annotated21")
@SendTo("!{request.value()}") // runtime SpEL
public String replyingListener(String in) {
...
}
@KafkaListener(topics = "${some.property:annotated22}")
@SendTo("#{myBean.replyTopic}") // config time SpEL
public Collection<String> replyingBatchListener(List<String> in) {
...
}
@KafkaListener(topics = "annotated23", errorHandler = "replyErrorHandler")
@SendTo("annotated23reply") // static reply topic definition
public String replyingListenerWithErrorHandler(String in) {
...
}
...
@KafkaListener(topics = "annotated25")
@SendTo("annotated25reply1")
public class MultiListenerSendTo {
@KafkaHandler
public String foo(String in) {
...
}
@KafkaHandler
@SendTo("!{'annotated25reply2'}")
public String bar(@Payload(required = false) KafkaNull nul,
@Header(KafkaHeaders.RECEIVED_KEY) int key) {
...
}
}
为了支持 `@SendTo`,必须为监听器容器工厂提供一个 `KafkaTemplate`(在其 `replyTemplate` 属性中),用于发送回复。这应该是一个 `KafkaTemplate`,而不是用于客户端请求/回复处理的 `ReplyingKafkaTemplate`。使用 Spring Boot 时,它会自动将模板配置到工厂中;配置自己的工厂时,必须按照以下示例所示进行设置。 |
从 2.2 版本开始,您可以向监听器容器工厂添加一个 `ReplyHeadersConfigurer`。会咨询它以确定要在回复消息中设置哪些头。以下示例展示了如何添加 `ReplyHeadersConfigurer`
@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(cf());
factory.setReplyTemplate(template());
factory.setReplyHeadersConfigurer((k, v) -> k.equals("cat"));
return factory;
}
如果需要,您还可以添加更多头。以下示例展示了如何操作
@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(cf());
factory.setReplyTemplate(template());
factory.setReplyHeadersConfigurer(new ReplyHeadersConfigurer() {
@Override
public boolean shouldCopy(String headerName, Object headerValue) {
return false;
}
@Override
public Map<String, Object> additionalHeaders() {
return Collections.singletonMap("qux", "fiz");
}
});
return factory;
}
使用 `@SendTo` 时,您必须在 `ConcurrentKafkaListenerContainerFactory` 的 `replyTemplate` 属性中配置一个 `KafkaTemplate` 来执行发送。Spring Boot 会自动注入其自动配置的模板(或者如果存在单个实例,则注入该实例)。
除非您使用 请求/回复语义,否则只使用简单的 `send(topic, value)` 方法,因此您可能希望创建一个子类来生成分区或键。以下示例展示了如何操作 |
@Bean
public KafkaTemplate<String, String> myReplyingTemplate() {
return new KafkaTemplate<String, String>(producerFactory()) {
@Override
public CompletableFuture<SendResult<String, String>> send(String topic, String data) {
return super.send(topic, partitionForData(data), keyForData(data), data);
}
...
};
}
如果监听器方法返回 `Message>` 或 `Collection
|
使用 请求/回复 语义时,目标分区可以由发送者请求。
即使没有返回结果,您也可以使用 `@SendTo` 注解标记 `@KafkaListener` 方法。这允许配置一个 `errorHandler`,它可以将关于失败消息投递的信息转发到某个主题。以下示例展示了如何操作
有关更多信息,请参阅 异常处理。 |
如果监听器方法返回 `Iterable`,默认情况下会为每个元素发送一条记录作为值。从 2.3.5 版本开始,将 `@KafkaListener` 的 `splitIterables` 属性设置为 `false`,整个结果将作为单个 `ProducerRecord` 的值发送。这需要在回复模板的生产者配置中配置合适的序列化器。但是,如果回复是 `Iterable |