发送消息

本节介绍如何发送消息。

使用 KafkaTemplate

本节介绍如何使用 KafkaTemplate 发送消息。

概述

KafkaTemplate 封装了一个生产者,并提供了便捷方法将数据发送到 Kafka 主题。以下列表显示了 KafkaTemplate 中的相关方法

CompletableFuture<SendResult<K, V>> sendDefault(V data);

CompletableFuture<SendResult<K, V>> sendDefault(K key, V data);

CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);

CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, V data);

CompletableFuture<SendResult<K, V>> send(String topic, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);

CompletableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);

CompletableFuture<SendResult<K, V>> send(Message<?> message);

Map<MetricName, ? extends Metric> metrics();

List<PartitionInfo> partitionsFor(String topic);

<T> T execute(ProducerCallback<K, V, T> callback);

<T> T executeInTransaction(OperationsCallback<K, V, T> callback);

// Flush the producer.
void flush();

interface ProducerCallback<K, V, T> {

    T doInKafka(Producer<K, V> producer);

}

interface OperationsCallback<K, V, T> {

    T doInOperations(KafkaOperations<K, V> operations);

}

有关更多详细信息,请参阅 Javadoc

sendDefault API 要求已向模板提供了默认主题。

该 API 将 timestamp 作为参数,并将此时间戳存储在记录中。用户提供的时间戳如何存储取决于 Kafka 主题上配置的时间戳类型。如果主题配置为使用 CREATE_TIME,则记录(如果未指定则生成)用户指定的时间戳。如果主题配置为使用 LOG_APPEND_TIME,则忽略用户指定的时间戳,代理将添加本地代理时间。

metricspartitionsFor 方法委托给底层 Producer 上的相同方法。execute 方法提供了对底层 Producer 的直接访问。

从 Spring 组件发送消息时,如果依赖于通过 NewTopic bean 自动创建主题,请避免使用 @PostConstruct 方法。@PostConstruct 在应用程序上下文完全就绪之前运行,这可能会导致干净的代理上出现 UnknownTopicOrPartitionException

相反,请考虑以下替代方案

  • 使用 ApplicationListener<ContextRefreshedEvent> 以确保上下文在发送之前完全刷新。

  • 实现 SmartLifecycle 以在 KafkaAdmin bean 完成其初始化后启动。

  • 外部预创建主题。

要使用模板,您可以配置一个生产者工厂,并在模板的构造函数中提供它。以下示例展示了如何实现

@Bean
public ProducerFactory producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public Map producerConfigs() {
    Map props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // See https://kafka.apache.org/41/documentation/#producerconfigs for more properties
    return props;
}

@Bean
public KafkaTemplate kafkaTemplate() {
    return new KafkaTemplate(producerFactory());
}

从 2.5 版本开始,您现在可以覆盖工厂的 ProducerConfig 属性,以使用与同一工厂不同的生产者配置创建模板。

@Bean
public KafkaTemplate<String, String> stringTemplate(ProducerFactory<String, String> pf) {
    return new KafkaTemplate<>(pf);
}

@Bean
public KafkaTemplate<String, byte[]> bytesTemplate(ProducerFactory<String, byte[]> pf) {
    return new KafkaTemplate<>(pf,
            Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class));
}

请注意,类型为 ProducerFactory<?, ?> 的 bean(例如 Spring Boot 自动配置的 bean)可以引用为具有不同窄化泛型类型。

您还可以使用标准 <bean/> 定义配置模板。

然后,要使用模板,您可以调用其方法之一。

当您使用带有 Message<?> 参数的方法时,主题、分区、键和时间戳信息在消息头中提供,其中包含以下项

  • KafkaHeaders.TOPIC

  • KafkaHeaders.PARTITION

  • KafkaHeaders.KEY

  • KafkaHeaders.TIMESTAMP

消息有效负载是数据。

或者,您可以为 KafkaTemplate 配置一个 ProducerListener,以获取发送结果(成功或失败)的异步回调,而不是等待 Future 完成。以下列表显示了 ProducerListener 接口的定义

public interface ProducerListener<K, V> {

    default void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata) {
	}

    default void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata, Exception exception) {
	}

}

默认情况下,模板配置了一个 LoggingProducerListener,它记录错误并在发送成功时不做任何操作。

为了方便起见,如果您只想实现其中一个方法,则提供了默认方法实现。

请注意,发送方法返回一个 CompletableFuture<SendResult>。您可以向侦听器注册回调以异步接收发送结果。以下示例展示了如何实现

CompletableFuture<SendResult<Integer, String>> future = template.send("myTopic", "something");
future.whenComplete((result, ex) -> {
    ...
});

SendResult 有两个属性:ProducerRecordRecordMetadata。有关这些对象的信息,请参阅 Kafka API 文档。

Throwable 可以强制转换为 KafkaProducerException;其 producerRecord 属性包含失败的记录。

如果您希望阻塞发送线程以等待结果,可以调用 future 的 get() 方法;建议使用带有超时的版本。如果您已设置 linger.ms,您可能希望在等待之前调用 flush(),或者为了方便起见,模板有一个带有 autoFlush 参数的构造函数,该参数使模板在每次发送时调用 flush()。只有在设置了 linger.ms 生产者属性并希望立即发送部分批次时才需要刷新。

示例

本节展示了向 Kafka 发送消息的示例

示例 1. 非阻塞 (异步)
public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);

    CompletableFuture<SendResult<String, String>> future = template.send(record);
    future.whenComplete((result, ex) -> {
        if (ex == null) {
            handleSuccess(data);
        }
        else {
            handleFailure(data, record, ex);
        }
    });
}
示例 2. 阻塞 (同步)
public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);

    try {
        template.send(record).get(10, TimeUnit.SECONDS);
        handleSuccess(data);
    }
    catch (ExecutionException e) {
        handleFailure(data, record, e.getCause());
    }
    catch (TimeoutException | InterruptedException e) {
        handleFailure(data, record, e);
    }
}

请注意,ExecutionException 的原因是带有 producerRecord 属性的 KafkaProducerException

使用 RoutingKafkaTemplate

从 2.5 版本开始,您可以根据目标 topic 名称,在运行时使用 RoutingKafkaTemplate 选择生产者。

路由模板不支持事务、executeflushmetrics 操作,因为这些操作的主题是未知的。

模板需要一个从 java.util.regex.PatternProducerFactory<Object, Object> 实例的映射。此映射应该是有序的(例如 LinkedHashMap),因为它按顺序遍历;您应该在开头添加更具体的模式。

以下简单的 Spring Boot 应用程序提供了一个示例,说明如何使用同一模板发送到不同的主题,每个主题都使用不同的值序列化器。

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context,
            ProducerFactory<Object, Object> pf) {

        // Clone the PF with a different Serializer, register with Spring for shutdown
        Map<String, Object> configs = new HashMap<>(pf.getConfigurationProperties());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(configs);
        context.registerBean("bytesPF", DefaultKafkaProducerFactory.class, () -> bytesPF);

        Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
        map.put(Pattern.compile("two"), bytesPF);
        map.put(Pattern.compile(".+"), pf); // Default PF with StringSerializer
        return new RoutingKafkaTemplate(map);
    }

    @Bean
    public ApplicationRunner runner(RoutingKafkaTemplate routingTemplate) {
        return args -> {
            routingTemplate.send("one", "thing1");
            routingTemplate.send("two", "thing2".getBytes());
        };
    }

}

此示例对应的 @KafkaListener 显示在 注解属性 中。

要实现类似结果的另一种技术,但具有向同一主题发送不同类型的额外功能,请参阅 委派序列化器和反序列化器

使用 DefaultKafkaProducerFactory

使用 KafkaTemplate 中所述,ProducerFactory 用于创建生产者。

当不使用 事务 时,默认情况下,DefaultKafkaProducerFactory 会创建一个由所有客户端使用的单例生产者,如 KafkaProducer JavaDocs 中所建议。但是,如果您在模板上调用 flush(),这可能会导致使用同一生产者的其他线程出现延迟。从 2.3 版本开始,DefaultKafkaProducerFactory 有一个新属性 producerPerThread。当设置为 true 时,工厂将为每个线程创建(并缓存)一个单独的生产者,以避免此问题。

producerPerThreadtrue 时,当不再需要生产者时,用户代码必须在工厂上调用 closeThreadBoundProducer()。这将物理关闭生产者并将其从 ThreadLocal 中删除。调用 reset()destroy() 不会清理这些生产者。

创建 DefaultKafkaProducerFactory 时,可以通过调用只接受属性 Map 的构造函数(请参阅 使用 KafkaTemplate 中的示例)从配置中获取键和/或值 Serializer 类,或者可以将 Serializer 实例传递给 DefaultKafkaProducerFactory 构造函数(在这种情况下,所有 Producer 共享相同的实例)。或者,您可以提供 Supplier<Serializer>(从 2.3 版本开始),它们将用于为每个 Producer 获取单独的 Serializer 实例

@Bean
public ProducerFactory<Integer, CustomValue> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs(), null, () -> new CustomValueSerializer());
}

@Bean
public KafkaTemplate<Integer, CustomValue> kafkaTemplate() {
    return new KafkaTemplate<Integer, CustomValue>(producerFactory());
}

从 2.5.10 版本开始,您现在可以在工厂创建后更新生产者属性。例如,如果您必须在凭据更改后更新 SSL 密钥/信任存储位置,这可能很有用。这些更改不会影响现有生产者实例;调用 reset() 关闭任何现有生产者,以便使用新属性创建新生产者。

您不能将事务性生产者工厂更改为非事务性,反之亦然。

现在提供了两个新方法

void updateConfigs(Map<String, Object> updates);

void removeConfig(String configKey);

从 2.8 版本开始,如果您将序列化器作为对象提供(在构造函数中或通过 setter),工厂将调用 configure() 方法以使用配置属性配置它们。

使用 ReplyingKafkaTemplate

2.1.3 版本引入了 KafkaTemplate 的子类以提供请求/回复语义。该类名为 ReplyingKafkaTemplate,并有两个额外的方法;以下显示了方法签名

RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);

RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record,
    Duration replyTimeout);

结果是一个 CompletableFuture,它被异步填充结果(或异常,如果超时)。结果还有一个 sendFuture 属性,它是调用 KafkaTemplate.send() 的结果。您可以使用此 future 来确定发送操作的结果。

如果使用第一个方法,或者 replyTimeout 参数为 null,则使用模板的 defaultReplyTimeout 属性(默认为 5 秒)。

从 2.8.8 版本开始,模板有一个新方法 waitForAssignment。如果回复容器配置了 auto.offset.reset=latest,这很有用,可以避免在容器初始化之前发送请求和回复。

当使用手动分区分配(无组管理)时,等待时间必须大于容器的 pollTimeout 属性,因为直到第一次轮询完成之后才会发送通知。

以下 Spring Boot 应用程序展示了如何使用此功能的示例

@SpringBootApplication
public class KRequestingApplication {

    public static void main(String[] args) {
        SpringApplication.run(KRequestingApplication.class, args).close();
    }

    @Bean
    public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
        return args -> {
            if (!template.waitForAssignment(Duration.ofSeconds(10))) {
                throw new IllegalStateException("Reply container did not initialize");
            }
            ProducerRecord<String, String> record = new ProducerRecord<>("kRequests", "foo");
            RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
            SendResult<String, String> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
            System.out.println("Sent ok: " + sendResult.getRecordMetadata());
            ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
            System.out.println("Return value: " + consumerRecord.value());
        };
    }

    @Bean
    public ReplyingKafkaTemplate<String, String, String> replyingTemplate(
            ProducerFactory<String, String> pf,
            ConcurrentMessageListenerContainer<String, String> repliesContainer) {

        return new ReplyingKafkaTemplate<>(pf, repliesContainer);
    }

    @Bean
    public ConcurrentMessageListenerContainer<String, String> repliesContainer(
            ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {

        ConcurrentMessageListenerContainer<String, String> repliesContainer =
                containerFactory.createContainer("kReplies");
        repliesContainer.getContainerProperties().setGroupId("repliesGroup");
        repliesContainer.setAutoStartup(false);
        return repliesContainer;
    }

    @Bean
    public NewTopic kRequests() {
        return TopicBuilder.name("kRequests")
            .partitions(10)
            .replicas(2)
            .build();
    }

    @Bean
    public NewTopic kReplies() {
        return TopicBuilder.name("kReplies")
            .partitions(10)
            .replicas(2)
            .build();
    }

}

请注意,我们可以使用 Boot 的自动配置容器工厂来创建回复容器。

如果正在为回复使用非平凡的反序列化器,请考虑使用委托给您配置的反序列化器的 ErrorHandlingDeserializer。当如此配置时,RequestReplyFuture 将异常完成,您可以捕获 ExecutionException,其 cause 属性中包含 DeserializationException

从 2.6.7 版本开始,除了检测 DeserializationException 之外,如果提供了 replyErrorChecker 函数,模板将调用它。如果它返回异常,future 将异常完成。

这是一个例子

template.setReplyErrorChecker(record -> {
    Header error = record.headers().lastHeader("serverSentAnError");
    if (error != null) {
        return new MyException(new String(error.value()));
    }
    else {
        return null;
    }
});

...

RequestReplyFuture<Integer, String, String> future = template.sendAndReceive(record);
try {
    future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
    ConsumerRecord<Integer, String> consumerRecord = future.get(10, TimeUnit.SECONDS);
    ...
}
catch (InterruptedException e) {
    ...
}
catch (ExecutionException e) {
    if (e.getCause() instanceof MyException) {
        ...
    }
}
catch (TimeoutException e) {
    ...
}

模板设置一个标头(默认为 KafkaHeaders.CORRELATION_ID),服务器端必须将其回显。

在这种情况下,以下 @KafkaListener 应用程序会响应

@SpringBootApplication
public class KReplyingApplication {

    public static void main(String[] args) {
        SpringApplication.run(KReplyingApplication.class, args);
    }

    @KafkaListener(id="server", topics = "kRequests")
    @SendTo // use default replyTo expression
    public String listen(String in) {
        System.out.println("Server received: " + in);
        return in.toUpperCase();
    }

    @Bean
    public NewTopic kRequests() {
        return TopicBuilder.name("kRequests")
            .partitions(10)
            .replicas(2)
            .build();
    }

    @Bean // not required if Jackson is on the classpath
    public MessagingMessageConverter simpleMapperConverter() {
        MessagingMessageConverter messagingMessageConverter = new MessagingMessageConverter();
        messagingMessageConverter.setHeaderMapper(new SimpleKafkaHeaderMapper());
        return messagingMessageConverter;
    }

}

@KafkaListener 基础设施回显关联 ID 并确定回复主题。

有关发送回复的更多信息,请参阅 使用 @SendTo 转发侦听器结果。模板使用默认标头 KafKaHeaders.REPLY_TOPIC 来指示回复要发送到的主题。

从 2.2 版本开始,模板尝试从配置的回复容器中检测回复主题或分区。如果容器配置为侦听单个主题或单个 TopicPartitionOffset,则将其用于设置回复标头。如果容器配置为其他方式,则用户必须设置回复标头。在这种情况下,在初始化期间会写入一条 INFO 日志消息。以下示例使用 KafkaHeaders.REPLY_TOPIC

record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "kReplies".getBytes()));

当您配置单个回复 TopicPartitionOffset 时,只要每个实例侦听不同的分区,就可以为多个模板使用相同的回复主题。当配置单个回复主题时,每个实例必须使用不同的 group.id。在这种情况下,所有实例都接收每个回复,但只有发送请求的实例才能找到关联 ID。这对于自动伸缩可能很有用,但会增加额外的网络流量开销以及丢弃每个不需要的回复的少量成本。当您使用此设置时,我们建议您将模板的 sharedReplyTopic 设置为 true,这将把意外回复的日志级别从默认的 ERROR 降低到 DEBUG。

以下是配置回复容器以使用相同共享回复主题的示例

@Bean
public ConcurrentMessageListenerContainer<String, String> replyContainer(
        ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {

    ConcurrentMessageListenerContainer<String, String> container = containerFactory.createContainer("topic2");
    container.getContainerProperties().setGroupId(UUID.randomUUID().toString()); // unique
    Properties props = new Properties();
    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // so the new group doesn't get old replies
    container.getContainerProperties().setKafkaConsumerProperties(props);
    return container;
}
如果您有多个客户端实例并且您没有按照前一段中的讨论进行配置,则每个实例都需要一个专用的回复主题。另一种方法是设置 KafkaHeaders.REPLY_PARTITION 并为每个实例使用专用分区。Header 包含一个四字节整数(大端序)。服务器必须使用此标头将回复路由到正确的分区(@KafkaListener 执行此操作)。在这种情况下,回复容器不能使用 Kafka 的组管理功能,并且必须配置为侦听固定分区(通过在其 ContainerProperties 构造函数中使用 TopicPartitionOffset)。
JsonKafkaHeaderMapper 需要 Jackson 在类路径上(用于 @KafkaListener)。如果不可用,消息转换器将没有标头映射器,因此您必须使用 SimpleKafkaHeaderMapper 配置 MessagingMessageConverter,如前所示。

默认情况下,使用 3 个标头

  • KafkaHeaders.CORRELATION_ID - 用于将回复与请求关联起来

  • KafkaHeaders.REPLY_TOPIC - 用于告诉服务器回复到哪里

  • KafkaHeaders.REPLY_PARTITION - (可选) 用于告诉服务器回复到哪个分区

这些标头名称由 @KafkaListener 基础设施用于路由回复。

从 2.3 版本开始,您可以自定义标头名称 - 模板有 3 个属性 correlationHeaderNamereplyTopicHeaderNamereplyPartitionHeaderName。如果您的服务器不是 Spring 应用程序(或不使用 @KafkaListener),这很有用。

相反,如果请求应用程序不是 Spring 应用程序并且将关联信息放在不同的标头中,则从 3.0 版本开始,您可以在侦听器容器工厂上配置自定义 correlationHeaderName,并且该标头将被回显。以前,侦听器必须回显自定义关联标头。

使用 Message<?> 进行请求/回复

2.7 版本向 ReplyingKafkaTemplate 添加了发送和接收 spring-messagingMessage<?> 抽象的方法

RequestReplyMessageFuture<K, V> sendAndReceive(Message<?> message);

<P> RequestReplyTypedMessageFuture<K, V, P> sendAndReceive(Message<?> message,
        ParameterizedTypeReference<P> returnType);

这些将使用模板的默认 replyTimeout,还有重载版本可以在方法调用中接受超时。

如果消费者的 Deserializer 或模板的 MessageConverter 可以在没有任何额外信息的情况下转换有效负载(通过配置或回复消息中的类型元数据),则使用第一个方法。

如果您需要为返回类型提供类型信息以协助消息转换器,则使用第二个方法。这还允许同一个模板接收不同类型,即使回复中没有类型元数据,例如当服务器端不是 Spring 应用程序时。以下是后者的一个示例

模板 Bean
  • Java

  • Kotlin

@Bean
ReplyingKafkaTemplate<String, String, String> template(
        ProducerFactory<String, String> pf,
        ConcurrentKafkaListenerContainerFactory<String, String> factory) {

    ConcurrentMessageListenerContainer<String, String> replyContainer =
            factory.createContainer("replies");
    replyContainer.getContainerProperties().setGroupId("request.replies");
    ReplyingKafkaTemplate<String, String, String> template =
            new ReplyingKafkaTemplate<>(pf, replyContainer);
    template.setMessageConverter(new ByteArrayJacksonJsonMessageConverter());
    template.setDefaultTopic("requests");
    return template;
}
@Bean
fun template(
    pf: ProducerFactory<String, String>,
    factory: ConcurrentKafkaListenerContainerFactory<String, String>
): ReplyingKafkaTemplate<String, String, String> {
    val replyContainer = factory.createContainer("replies")
    replyContainer.containerProperties.setGroupId("request.replies")
    val template = ReplyingKafkaTemplate<String, String, String>(pf, replyContainer)
    template.messageConverter = ByteArrayJacksonJsonMessageConverter()
    template.setDefaultTopic("requests")
    return template
}
使用模板
  • Java

  • Kotlin

RequestReplyTypedMessageFuture<String, String, Thing> future1 =
        template.sendAndReceive(MessageBuilder.withPayload("getAThing").build(),
                new ParameterizedTypeReference<Thing>() { });
log.info(future1.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
Thing thing = future1.get(10, TimeUnit.SECONDS).getPayload();
log.info(thing.toString());

RequestReplyTypedMessageFuture<String, String, List<Thing>> future2 =
        template.sendAndReceive(MessageBuilder.withPayload("getThings").build(),
                new ParameterizedTypeReference<List<Thing>>() { });
log.info(future2.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
List<Thing> things = future2.get(10, TimeUnit.SECONDS).getPayload();
things.forEach(thing1 -> log.info(thing1.toString()));
val future1: RequestReplyTypedMessageFuture<String, String, Thing> =
    template.sendAndReceive(MessageBuilder.withPayload("getAThing").build(),
        object : ParameterizedTypeReference<Thing>() {})
log.info(future1.sendFuture?.get(10, TimeUnit.SECONDS)?.recordMetadata.toString())
val thing = future1.get(10, TimeUnit.SECONDS).payload
log.info(thing.toString())

val future2: RequestReplyTypedMessageFuture<String, String, List<Thing>> =
    template.sendAndReceive(MessageBuilder.withPayload("getThings").build(),
        object : ParameterizedTypeReference<List<Thing>>() {})
log.info(future2.sendFuture?.get(10, TimeUnit.SECONDS)?.recordMetadata.toString())
val things = future2.get(10, TimeUnit.SECONDS).payload
things.forEach { thing1 -> log.info(thing1.toString()) }

回复类型 Message<?>

@KafkaListener 返回 Message<?> 时,在 2.5 版本之前,需要填充回复主题和关联 ID 标头。在此示例中,我们使用来自请求的回复主题标头

@KafkaListener(id = "requestor", topics = "request")
@SendTo
public Message<?> messageReturn(String in) {
    return MessageBuilder.withPayload(in.toUpperCase())
            .setHeader(KafkaHeaders.TOPIC, replyTo)
            .setHeader(KafkaHeaders.KEY, 42)
            .setHeader(KafkaHeaders.CORRELATION_ID, correlation)
            .build();
}

这也显示了如何为回复记录设置键。

从 2.5 版本开始,框架将检测这些标头是否缺失,并使用主题填充它们 - 无论是从 @SendTo 值确定的主题,还是传入的 KafkaHeaders.REPLY_TOPIC 标头(如果存在)。它还将回显传入的 KafkaHeaders.CORRELATION_IDKafkaHeaders.REPLY_PARTITION(如果存在)。

@KafkaListener(id = "requestor", topics = "request")
@SendTo  // default REPLY_TOPIC header
public Message<?> messageReturn(String in) {
    return MessageBuilder.withPayload(in.toUpperCase())
            .setHeader(KafkaHeaders.KEY, 42)
            .build();
}

回复中的原始记录键

从 3.3 版本开始,来自传入请求的 Kafka 记录键(如果存在)将保留在回复记录中。这仅适用于单记录请求/回复场景。当侦听器是批量或返回类型是集合时,由应用程序决定通过将回复记录包装在 Message 类型中来指定要使用的键。

聚合多个回复

使用 ReplyingKafkaTemplate 中的模板严格用于单个请求/回复场景。对于单个消息的多个接收者返回回复的情况,您可以使用 AggregatingReplyingKafkaTemplate。这是 分散-聚合企业集成模式 客户端的实现。

ReplyingKafkaTemplate 类似,AggregatingReplyingKafkaTemplate 构造函数接受一个生产者工厂和一个侦听器容器来接收回复;它有第三个参数 BiPredicate<List<ConsumerRecord<K, R>>, Boolean> releaseStrategy,每次收到回复时都会咨询它;当谓词返回 true 时,ConsumerRecord 的集合将用于完成 sendAndReceive 方法返回的 Future

还有一个额外的属性 returnPartialOnTimeout(默认为 false)。当此设置为 true 时,不会使用 KafkaReplyTimeoutException 完成 future,而是部分结果正常完成 future(只要至少收到一条回复记录)。

从 2.3.5 版本开始,谓词也会在超时后调用(如果 returnPartialOnTimeouttrue)。第一个参数是当前记录列表;第二个是如果此调用是由于超时,则为 true。谓词可以修改记录列表。

AggregatingReplyingKafkaTemplate<Integer, String, String> template =
        new AggregatingReplyingKafkaTemplate<>(producerFactory, container,
                        coll -> coll.size() == releaseSize);
...
RequestReplyFuture<Integer, String, Collection<ConsumerRecord<Integer, String>>> future =
        template.sendAndReceive(record);
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>> consumerRecord =
        future.get(30, TimeUnit.SECONDS);

请注意,返回类型是 ConsumerRecord,其值是 ConsumerRecord 的集合。这个“外部”ConsumerRecord 不是一个“真实”的记录,它是由模板合成的,作为请求收到的实际回复记录的持有者。当正常释放发生时(释放策略返回 true),主题设置为 aggregatedResults;如果 returnPartialOnTimeout 为 true 并且发生超时(并且至少收到一条回复记录),则主题设置为 partialResultsAfterTimeout。模板为这些“主题”名称提供了常量静态变量

/**
 * Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
 * results in its value after a normal release by the release strategy.
 */
public static final String AGGREGATED_RESULTS_TOPIC = "aggregatedResults";

/**
 * Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
 * results in its value after a timeout.
 */
public static final String PARTIAL_RESULTS_AFTER_TIMEOUT_TOPIC = "partialResultsAfterTimeout";

Collection 中真实的 ConsumerRecord 包含收到回复的实际主题。

回复的侦听器容器必须配置为 AckMode.MANUALAckMode.MANUAL_IMMEDIATE;消费者属性 enable.auto.commit 必须为 false(2.3 版本以来的默认值)。为了避免任何消息丢失的可能性,模板仅在没有未完成的请求时提交偏移量,即当最后一个未完成的请求被释放策略释放时。重新平衡后,可能会出现重复的回复传递;这些将被忽略任何正在进行的请求;当收到已释放回复的重复回复时,您可能会看到错误日志消息。
如果您将 ErrorHandlingDeserializer 与此聚合模板一起使用,框架将不会自动检测 DeserializationException。相反,记录(值为 null)将完整返回,并在标头中包含反序列化异常。建议应用程序调用实用方法 ReplyingKafkaTemplate.checkDeserialization() 方法来确定是否发生反序列化异常。有关更多信息,请参阅其 JavaDocs。replyErrorChecker 也不会为此聚合模板调用;您应该对回复的每个元素执行检查。
© . This site is unofficial and not affiliated with VMware.