发送消息

本节介绍如何发送消息。

使用 KafkaTemplate

本节介绍如何使用 KafkaTemplate 发送消息。

概览

KafkaTemplate 封装了一个生产者,并提供了方便的方法将数据发送到 Kafka topic。以下列表展示了 KafkaTemplate 中的相关方法

CompletableFuture<SendResult<K, V>> sendDefault(V data);

CompletableFuture<SendResult<K, V>> sendDefault(K key, V data);

CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);

CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, V data);

CompletableFuture<SendResult<K, V>> send(String topic, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);

CompletableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);

CompletableFuture<SendResult<K, V>> send(Message<?> message);

Map<MetricName, ? extends Metric> metrics();

List<PartitionInfo> partitionsFor(String topic);

<T> T execute(ProducerCallback<K, V, T> callback);

<T> T executeInTransaction(OperationsCallback<K, V, T> callback);

// Flush the producer.
void flush();

interface ProducerCallback<K, V, T> {

    T doInKafka(Producer<K, V> producer);

}

interface OperationsCallback<K, V, T> {

    T doInOperations(KafkaOperations<K, V> operations);

}

详情请参阅 Javadoc

sendDefault API 要求为模板提供了默认 topic。

该 API 接受 timestamp 参数,并将此时间戳存储在记录中。用户提供的时间戳的存储方式取决于 Kafka topic 上配置的时间戳类型。如果 topic 配置为使用 CREATE_TIME,则记录用户指定的时间戳(如果未指定则生成)。如果 topic 配置为使用 LOG_APPEND_TIME,则忽略用户指定的时间戳,代理会添加本地代理时间。

metricspartitionsFor 方法委托给底层 Producer 上的同名方法。execute 方法提供对底层 Producer 的直接访问。

要使用模板,您可以配置一个生产者工厂并在模板的构造函数中提供它。以下示例展示了如何这样做

@Bean
public ProducerFactory<Integer, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // See https://kafka.apache.org/documentation/#producerconfigs for more properties
    return props;
}

@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
    return new KafkaTemplate<Integer, String>(producerFactory());
}

从 2.5 版本开始,您现在可以覆盖工厂的 ProducerConfig 属性,以便使用同一工厂创建具有不同生产者配置的模板。

@Bean
public KafkaTemplate<String, String> stringTemplate(ProducerFactory<String, String> pf) {
    return new KafkaTemplate<>(pf);
}

@Bean
public KafkaTemplate<String, byte[]> bytesTemplate(ProducerFactory<String, byte[]> pf) {
    return new KafkaTemplate<>(pf,
            Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class));
}

请注意,类型为 ProducerFactory<?, ?> 的 bean(例如 Spring Boot 自动配置的 bean)可以引用不同的窄化泛型类型。

您还可以使用标准 <bean/> 定义来配置模板。

然后,要使用模板,您可以调用它的一个方法。

当您使用带有 Message<?> 参数的方法时,topic、分区、key 和时间戳信息会包含在消息头部中,具体包括以下项

  • KafkaHeaders.TOPIC

  • KafkaHeaders.PARTITION

  • KafkaHeaders.KEY

  • KafkaHeaders.TIMESTAMP

消息 payload 是数据。

(可选)您可以为 KafkaTemplate 配置一个 ProducerListener,以便获得发送结果(成功或失败)的异步回调,而不是等待 Future 完成。以下列表显示了 ProducerListener 接口的定义

public interface ProducerListener<K, V> {

    default void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata) {
	}

    default void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata, Exception exception) {
	}

}

默认情况下,模板配置了一个 LoggingProducerListener,它会记录错误,并在发送成功时不做任何事情。

为了方便起见,提供了默认方法实现,以防您只想实现其中的一个方法。

请注意,发送方法返回一个 CompletableFuture<SendResult>。您可以向监听器注册回调,以异步接收发送结果。以下示例展示了如何这样做

CompletableFuture<SendResult<Integer, String>> future = template.send("myTopic", "something");
future.whenComplete((result, ex) -> {
    ...
});

SendResult 有两个属性:ProducerRecordRecordMetadata。有关这些对象的信息,请参阅 Kafka API 文档。

Throwable 可以被转换为 KafkaProducerException;它的 producerRecord 属性包含失败的记录。

如果您希望阻塞发送线程以等待结果,可以调用 future 的 get() 方法;建议使用带有超时参数的方法。如果您设置了 linger.ms,您可能希望在等待之前调用 flush(),或者为了方便,模板有一个带有 autoFlush 参数的构造函数,这会使得模板在每次发送时都调用 flush()。只有在您设置了 linger.ms 生产者属性并且想立即发送部分批次时,才需要进行刷新。

示例

本节展示了向 Kafka 发送消息的示例

示例 1. 非阻塞 (异步)
public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);

    CompletableFuture<SendResult<String, String>> future = template.send(record);
    future.whenComplete((result, ex) -> {
        if (ex == null) {
            handleSuccess(data);
        }
        else {
            handleFailure(data, record, ex);
        }
    });
}
阻塞 (同步)
public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);

    try {
        template.send(record).get(10, TimeUnit.SECONDS);
        handleSuccess(data);
    }
    catch (ExecutionException e) {
        handleFailure(data, record, e.getCause());
    }
    catch (TimeoutException | InterruptedException e) {
        handleFailure(data, record, e);
    }
}

请注意,ExecutionException 的原因是一个带有 producerRecord 属性的 KafkaProducerException

使用 RoutingKafkaTemplate

从 2.5 版本开始,您可以使用 RoutingKafkaTemplate 在运行时根据目标 topic 名称选择生产者。

由于这些操作的 topic 未知,路由模板**不支持**事务、executeflushmetrics 操作。

该模板需要一个从 java.util.regex.PatternProducerFactory<Object, Object> 实例的映射。此映射应有序(例如 LinkedHashMap),因为它会按顺序遍历;您应该在开头添加更具体的模式。

以下是一个简单的 Spring Boot 应用程序,提供了一个示例,说明如何使用同一个模板向不同的 topic 发送消息,每个 topic 使用不同的值序列化器。

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context,
            ProducerFactory<Object, Object> pf) {

        // Clone the PF with a different Serializer, register with Spring for shutdown
        Map<String, Object> configs = new HashMap<>(pf.getConfigurationProperties());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(configs);
        context.registerBean("bytesPF", DefaultKafkaProducerFactory.class, () -> bytesPF);

        Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
        map.put(Pattern.compile("two"), bytesPF);
        map.put(Pattern.compile(".+"), pf); // Default PF with StringSerializer
        return new RoutingKafkaTemplate(map);
    }

    @Bean
    public ApplicationRunner runner(RoutingKafkaTemplate routingTemplate) {
        return args -> {
            routingTemplate.send("one", "thing1");
            routingTemplate.send("two", "thing2".getBytes());
        };
    }

}

此示例对应的 @KafkaListener 位于注解属性中。

有关实现类似结果的另一种技术,同时具有向同一 topic 发送不同类型的附加功能,请参阅委托序列化器和反序列化器

使用 DefaultKafkaProducerFactory

使用 KafkaTemplate 中所示,ProducerFactory 用于创建生产者。

在使用事务时,默认情况下,DefaultKafkaProducerFactory 会创建一个由所有客户端使用的单例生产者,如 KafkaProducer JavaDocs 中推荐的那样。但是,如果您在模板上调用 flush(),这可能会导致使用同一生产者的其他线程出现延迟。从 2.3 版本开始,DefaultKafkaProducerFactory 有一个新属性 producerPerThread。当设置为 true 时,工厂会为每个线程创建一个(并缓存)单独的生产者,以避免此问题。

producerPerThreadtrue 时,当不再需要生产者时,用户代码**必须**调用工厂上的 closeThreadBoundProducer()。这会物理关闭生产者并将其从 ThreadLocal 中移除。调用 reset()destroy() 不会清理这些生产者。

创建 DefaultKafkaProducerFactory 时,可以通过调用只接受属性 Map 的构造函数(参见使用 KafkaTemplate 中的示例)从配置中获取 key 和/或 value 的 Serializer 类,或者可以将 Serializer 实例传递给 DefaultKafkaProducerFactory 构造函数(在这种情况下,所有 Producer 共享相同的实例)。或者,您可以提供 Supplier<Serializer>(从 2.3 版本开始),用于为每个 Producer 获取单独的 Serializer 实例

@Bean
public ProducerFactory<Integer, CustomValue> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs(), null, () -> new CustomValueSerializer());
}

@Bean
public KafkaTemplate<Integer, CustomValue> kafkaTemplate() {
    return new KafkaTemplate<Integer, CustomValue>(producerFactory());
}

从 2.5.10 版本开始,您现在可以在创建工厂后更新生产者属性。例如,如果您必须在凭证更改后更新 SSL key/trust 存储位置,这可能很有用。更改不会影响现有的生产者实例;调用 reset() 关闭任何现有生产者,以便使用新属性创建新生产者。

您不能将事务性生产者工厂更改为非事务性,反之亦然。

现在提供了两个新方法

void updateConfigs(Map<String, Object> updates);

void removeConfig(String configKey);

从 2.8 版本开始,如果您将序列化器作为对象提供(在构造函数中或通过 setter),工厂将调用 configure() 方法,使用配置属性配置它们。

使用 ReplyingKafkaTemplate

2.1.3 版本引入了 KafkaTemplate 的一个子类,用于提供请求/响应语义。该类名为 ReplyingKafkaTemplate,并有两个额外的方法;以下显示了方法签名

RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);

RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record,
    Duration replyTimeout);

结果是一个 CompletableFuture,它会异步填充结果(或超时异常)。结果还有一个 sendFuture 属性,这是调用 KafkaTemplate.send() 的结果。您可以使用此 future 来确定发送操作的结果。

如果使用第一个方法,或者 replyTimeout 参数为 null,则使用模板的 defaultReplyTimeout 属性(默认为 5 秒)。

从 2.8.8 版本开始,模板新增了一个方法 waitForAssignment。如果回复容器配置了 auto.offset.reset=latest,这非常有用,可以避免在容器初始化之前发送请求和收到回复。

在使用手动分区分配(无组管理)时,等待的持续时间必须大于容器的 pollTimeout 属性,因为通知只有在第一次 poll 完成后才会发送。

以下 Spring Boot 应用程序展示了如何使用该功能的一个示例

@SpringBootApplication
public class KRequestingApplication {

    public static void main(String[] args) {
        SpringApplication.run(KRequestingApplication.class, args).close();
    }

    @Bean
    public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
        return args -> {
            if (!template.waitForAssignment(Duration.ofSeconds(10))) {
                throw new IllegalStateException("Reply container did not initialize");
            }
            ProducerRecord<String, String> record = new ProducerRecord<>("kRequests", "foo");
            RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
            SendResult<String, String> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
            System.out.println("Sent ok: " + sendResult.getRecordMetadata());
            ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
            System.out.println("Return value: " + consumerRecord.value());
        };
    }

    @Bean
    public ReplyingKafkaTemplate<String, String, String> replyingTemplate(
            ProducerFactory<String, String> pf,
            ConcurrentMessageListenerContainer<String, String> repliesContainer) {

        return new ReplyingKafkaTemplate<>(pf, repliesContainer);
    }

    @Bean
    public ConcurrentMessageListenerContainer<String, String> repliesContainer(
            ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {

        ConcurrentMessageListenerContainer<String, String> repliesContainer =
                containerFactory.createContainer("kReplies");
        repliesContainer.getContainerProperties().setGroupId("repliesGroup");
        repliesContainer.setAutoStartup(false);
        return repliesContainer;
    }

    @Bean
    public NewTopic kRequests() {
        return TopicBuilder.name("kRequests")
            .partitions(10)
            .replicas(2)
            .build();
    }

    @Bean
    public NewTopic kReplies() {
        return TopicBuilder.name("kReplies")
            .partitions(10)
            .replicas(2)
            .build();
    }

}

请注意,我们可以使用 Boot 的自动配置容器工厂来创建回复容器。

如果回复使用了非平凡的反序列化器,请考虑使用一个委托给您配置的反序列化器的ErrorHandlingDeserializer。这样配置后,RequestReplyFuture 将以异常方式完成,您可以捕获 ExecutionException,其 cause 属性中包含 DeserializationException

从 2.6.7 版本开始,除了检测 DeserializationExceptions 外,如果提供了 replyErrorChecker 函数,模板还会调用它。如果它返回一个异常,future 将以异常方式完成。

这是一个示例

template.setReplyErrorChecker(record -> {
    Header error = record.headers().lastHeader("serverSentAnError");
    if (error != null) {
        return new MyException(new String(error.value()));
    }
    else {
        return null;
    }
});

...

RequestReplyFuture<Integer, String, String> future = template.sendAndReceive(record);
try {
    future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
    ConsumerRecord<Integer, String> consumerRecord = future.get(10, TimeUnit.SECONDS);
    ...
}
catch (InterruptedException e) {
    ...
}
catch (ExecutionException e) {
    if (e.getCause() instanceof MyException) {
        ...
    }
}
catch (TimeoutException e) {
    ...
}

模板会设置一个头部(默认名为 KafkaHeaders.CORRELATION_ID),服务器端必须将其回显回来。

在这种情况下,以下 @KafkaListener 应用程序会响应

@SpringBootApplication
public class KReplyingApplication {

    public static void main(String[] args) {
        SpringApplication.run(KReplyingApplication.class, args);
    }

    @KafkaListener(id="server", topics = "kRequests")
    @SendTo // use default replyTo expression
    public String listen(String in) {
        System.out.println("Server received: " + in);
        return in.toUpperCase();
    }

    @Bean
    public NewTopic kRequests() {
        return TopicBuilder.name("kRequests")
            .partitions(10)
            .replicas(2)
            .build();
    }

    @Bean // not required if Jackson is on the classpath
    public MessagingMessageConverter simpleMapperConverter() {
        MessagingMessageConverter messagingMessageConverter = new MessagingMessageConverter();
        messagingMessageConverter.setHeaderMapper(new SimpleKafkaHeaderMapper());
        return messagingMessageConverter;
    }

}

@KafkaListener 基础设施会回显关联 ID 并确定回复 topic。

有关发送回复的更多信息,请参阅使用 @SendTo 转发监听器结果。模板使用默认头部 KafKaHeaders.REPLY_TOPIC 来指示回复应发送到的 topic。

从 2.2 版本开始,模板会尝试从配置的回复容器中检测回复 topic 或分区。如果容器配置为监听单个 topic 或单个 TopicPartitionOffset,则使用它来设置回复头部。如果容器的配置不同,用户必须自行设置回复头部。在这种情况下,初始化期间会写入一条 INFO 日志消息。以下示例使用 KafkaHeaders.REPLY_TOPIC

record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "kReplies".getBytes()));

当您配置单个回复 TopicPartitionOffset 时,只要每个实例监听不同的分区,就可以为多个模板使用相同的回复 topic。当配置单个回复 topic 时,每个实例必须使用不同的 group.id。在这种情况下,所有实例都会收到每个回复,但只有发送请求的实例会找到关联 ID。这可能对自动伸缩有用,但这会带来额外的网络流量开销以及丢弃每个不需要的回复的少量成本。当您使用此设置时,我们建议您将模板的 sharedReplyTopic 设置为 true,这将把意外回复的日志级别从默认的 ERROR 降低到 DEBUG。

以下是配置回复容器使用相同共享回复 topic 的示例

@Bean
public ConcurrentMessageListenerContainer<String, String> replyContainer(
        ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {

    ConcurrentMessageListenerContainer<String, String> container = containerFactory.createContainer("topic2");
    container.getContainerProperties().setGroupId(UUID.randomUUID().toString()); // unique
    Properties props = new Properties();
    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // so the new group doesn't get old replies
    container.getContainerProperties().setKafkaConsumerProperties(props);
    return container;
}
如果您有多个客户端实例,并且没有按照前一段中讨论的方式配置它们,则每个实例需要一个专用的回复 topic。另一种方法是设置 KafkaHeaders.REPLY_PARTITION 并为每个实例使用专用的分区。Header 包含一个四字节的 int(大端)。服务器必须使用此头部将回复路由到正确的分区(@KafkaListener 会这样做)。但是,在这种情况下,回复容器不能使用 Kafka 的组管理功能,并且必须配置为监听一个固定分区(通过在其 ContainerProperties 构造函数中使用 TopicPartitionOffset)。
DefaultKafkaHeaderMapper 要求 Jackson 位于 classpath 中(供 @KafkaListener 使用)。如果不可用,消息转换器将没有头部映射器,因此您必须配置一个带有 SimpleKafkaHeaderMapperMessagingMessageConverter,如前面所示。

默认情况下,使用 3 个头部

  • KafkaHeaders.CORRELATION_ID - 用于将回复与请求关联

  • KafkaHeaders.REPLY_TOPIC - 用于告诉服务器回复到哪里

  • KafkaHeaders.REPLY_PARTITION - (可选)用于告诉服务器回复到哪个分区

这些头部名称由 @KafkaListener 基础设施用于路由回复。

从 2.3 版本开始,您可以自定义头部名称 - 模板有 3 个属性:correlationHeaderNamereplyTopicHeaderNamereplyPartitionHeaderName。如果您的服务器不是 Spring 应用程序(或不使用 @KafkaListener),这非常有用。

反之,如果请求应用程序不是 Spring 应用程序,并且将关联信息放在不同的头部中,从 3.0 版本开始,您可以在监听器容器工厂上配置自定义的 correlationHeaderName,并且该头部将被回显回来。之前,监听器必须回显自定义关联头部。

使用 Message<?> 进行请求/回复

2.7 版本在 ReplyingKafkaTemplate 中添加了方法,用于发送和接收 spring-messagingMessage<?> 抽象

RequestReplyMessageFuture<K, V> sendAndReceive(Message<?> message);

<P> RequestReplyTypedMessageFuture<K, V, P> sendAndReceive(Message<?> message,
        ParameterizedTypeReference<P> returnType);

这些方法将使用模板的默认 replyTimeout,也有重载版本可以在方法调用中指定超时时间。

如果 consumer 的 Deserializer 或模板的 MessageConverter 可以在没有任何额外信息的情况下转换 payload(无论是通过配置还是回复消息中的类型元数据),请使用第一个方法。

如果您需要为返回类型提供类型信息以辅助消息转换器,请使用第二个方法。这还允许同一个模板接收不同类型,即使回复中没有类型元数据,例如服务器端不是 Spring 应用程序的情况。以下是后者的一个示例

模板 Bean
  • Java

  • Kotlin

@Bean
ReplyingKafkaTemplate<String, String, String> template(
        ProducerFactory<String, String> pf,
        ConcurrentKafkaListenerContainerFactory<String, String> factory) {

    ConcurrentMessageListenerContainer<String, String> replyContainer =
            factory.createContainer("replies");
    replyContainer.getContainerProperties().setGroupId("request.replies");
    ReplyingKafkaTemplate<String, String, String> template =
            new ReplyingKafkaTemplate<>(pf, replyContainer);
    template.setMessageConverter(new ByteArrayJsonMessageConverter());
    template.setDefaultTopic("requests");
    return template;
}
@Bean
fun template(
    pf: ProducerFactory<String?, String>?,
    factory: ConcurrentKafkaListenerContainerFactory<String?, String?>
): ReplyingKafkaTemplate<String?, String, String?> {
    val replyContainer = factory.createContainer("replies")
    replyContainer.containerProperties.groupId = "request.replies"
    val template = ReplyingKafkaTemplate(pf, replyContainer)
    template.messageConverter = ByteArrayJsonMessageConverter()
    template.defaultTopic = "requests"
    return template
}
使用模板
  • Java

  • Kotlin

RequestReplyTypedMessageFuture<String, String, Thing> future1 =
        template.sendAndReceive(MessageBuilder.withPayload("getAThing").build(),
                new ParameterizedTypeReference<Thing>() { });
log.info(future1.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
Thing thing = future1.get(10, TimeUnit.SECONDS).getPayload();
log.info(thing.toString());

RequestReplyTypedMessageFuture<String, String, List<Thing>> future2 =
        template.sendAndReceive(MessageBuilder.withPayload("getThings").build(),
                new ParameterizedTypeReference<List<Thing>>() { });
log.info(future2.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
List<Thing> things = future2.get(10, TimeUnit.SECONDS).getPayload();
things.forEach(thing1 -> log.info(thing1.toString()));
val future1: RequestReplyTypedMessageFuture<String?, String?, Thing?>? =
    template.sendAndReceive(MessageBuilder.withPayload("getAThing").build(),
        object : ParameterizedTypeReference<Thing?>() {})
log.info(future1?.sendFuture?.get(10, TimeUnit.SECONDS)?.recordMetadata?.toString())
val thing = future1?.get(10, TimeUnit.SECONDS)?.payload
log.info(thing.toString())

val future2: RequestReplyTypedMessageFuture<String?, String?, List<Thing?>?>? =
    template.sendAndReceive(MessageBuilder.withPayload("getThings").build(),
        object : ParameterizedTypeReference<List<Thing?>?>() {})
log.info(future2?.sendFuture?.get(10, TimeUnit.SECONDS)?.recordMetadata.toString())
val things = future2?.get(10, TimeUnit.SECONDS)?.payload
things?.forEach(Consumer { thing1: Thing? -> log.info(thing1.toString()) })

回复类型 Message<?>

@KafkaListener 返回 Message<?> 时,在 2.5 版本之前,需要填充回复 topic 和关联 id 头部。在此示例中,我们使用请求中的回复 topic 头部

@KafkaListener(id = "requestor", topics = "request")
@SendTo
public Message<?> messageReturn(String in) {
    return MessageBuilder.withPayload(in.toUpperCase())
            .setHeader(KafkaHeaders.TOPIC, replyTo)
            .setHeader(KafkaHeaders.KEY, 42)
            .setHeader(KafkaHeaders.CORRELATION_ID, correlation)
            .build();
}

这也展示了如何在回复记录上设置 key。

从 2.5 版本开始,框架将检测这些头部是否缺失,并使用 topic 填充它们 - 要么是根据 @SendTo 值确定的 topic,要么是传入的 KafkaHeaders.REPLY_TOPIC 头部(如果存在)。如果存在传入的 KafkaHeaders.CORRELATION_IDKafkaHeaders.REPLY_PARTITION,它也会回显它们。

@KafkaListener(id = "requestor", topics = "request")
@SendTo  // default REPLY_TOPIC header
public Message<?> messageReturn(String in) {
    return MessageBuilder.withPayload(in.toUpperCase())
            .setHeader(KafkaHeaders.KEY, 42)
            .build();
}

回复中的原始记录 Key

从 3.3 版本开始,传入请求中的 Kafka 记录 key(如果存在)将在回复记录中保留。这仅适用于单记录请求/回复场景。当监听器是批处理或返回类型是集合时,应用程序需要通过将回复记录包装在 Message 类型中来指定要使用的 key。

聚合多个回复

使用 ReplyingKafkaTemplate 中介绍的模板严格用于单请求/回复场景。对于单个消息有多个接收者返回回复的情况,您可以使用 AggregatingReplyingKafkaTemplate。这是散播-聚集企业集成模式的客户端实现。

ReplyingKafkaTemplate 类似,AggregatingReplyingKafkaTemplate 的构造函数接受一个生产者工厂和一个监听器容器来接收回复;它有第三个参数 BiPredicate<List<ConsumerRecord<K, R>>, Boolean> releaseStrategy,每次收到回复时都会调用它;当谓词返回 true 时,ConsumerRecord 的集合用于完成 sendAndReceive 方法返回的 Future

还有一个附加属性 returnPartialOnTimeout(默认为 false)。当此属性设置为 true 时,未来不会以 KafkaReplyTimeoutException 完成,而是会以部分结果正常完成(只要收到了至少一个回复记录)。

从 2.3.5 版本开始,在超时后(如果 returnPartialOnTimeouttrue)也会调用谓词。第一个参数是当前的记录列表;第二个参数表示如果此调用是由于超时引起的,则为 true。谓词可以修改记录列表。

AggregatingReplyingKafkaTemplate<Integer, String, String> template =
        new AggregatingReplyingKafkaTemplate<>(producerFactory, container,
                        coll -> coll.size() == releaseSize);
...
RequestReplyFuture<Integer, String, Collection<ConsumerRecord<Integer, String>>> future =
        template.sendAndReceive(record);
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>> consumerRecord =
        future.get(30, TimeUnit.SECONDS);

请注意,返回类型是一个 ConsumerRecord,其值是 ConsumerRecord 的集合。"外部" ConsumerRecord 不是一个“真实”的记录,它是模板合成的,用于保存为请求收到的实际回复记录。当正常释放发生(释放策略返回 true)时,topic 设置为 aggregatedResults;如果 returnPartialOnTimeout 为 true 并且发生超时(并且收到了至少一个回复记录),则 topic 设置为 partialResultsAfterTimeout。模板为这些“topic”名称提供了静态常量变量

/**
 * Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
 * results in its value after a normal release by the release strategy.
 */
public static final String AGGREGATED_RESULTS_TOPIC = "aggregatedResults";

/**
 * Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
 * results in its value after a timeout.
 */
public static final String PARTIAL_RESULTS_AFTER_TIMEOUT_TOPIC = "partialResultsAfterTimeout";

Collection 中的真实 ConsumerRecord 包含收到回复的实际 topic。

回复的监听器容器**必须**配置为 AckMode.MANUALAckMode.MANUAL_IMMEDIATE;消费者属性 enable.auto.commit 必须为 false(从 2.3 版本开始的默认值)。为了避免任何丢失消息的可能性,模板只在没有未完成的请求时提交偏移量,即当最后一个未完成的请求被释放策略释放时。再平衡后,可能会出现重复的回复投递;对于任何正在进行的请求,这些重复投递将被忽略;当已释放的回复收到重复回复时,您可能会看到错误日志消息。
如果您在此聚合模板中使用ErrorHandlingDeserializer,框架将不会自动检测 DeserializationException。相反,记录(值为 null)将完整返回,反序列化异常将位于头部中。建议应用程序调用实用方法 ReplyingKafkaTemplate.checkDeserialization() 来确定是否发生了反序列化异常。更多信息请参阅其 JavaDocs。此聚合模板也不会调用 replyErrorChecker;您应该对回复的每个元素执行检查。