测试应用

spring-kafka-test jar 包含一些实用的工具,可帮助您测试应用程序。

嵌入式 Kafka Broker

提供了两种实现

  • EmbeddedKafkaZKBroker - 遗留实现,启动一个嵌入式 Zookeeper 实例(使用 EmbeddedKafka 时这仍然是默认设置)。

  • EmbeddedKafkaKraftBroker - 在组合控制器和 Broker 模式下使用 Kraft 而非 Zookeeper(自 3.1 版本起)。

以下部分讨论了几种配置 Broker 的技术。

KafkaTestUtils

org.springframework.kafka.test.utils.KafkaTestUtils 提供了一些静态辅助方法,用于消费记录、检索各种记录偏移量等。有关完整详细信息,请参阅其 Javadocs

JUnit

org.springframework.kafka.test.utils.KafkaTestUtils 还提供了一些静态方法来设置 Producer 和 Consumer 属性。以下列出了这些方法签名:

/**
 * Set up test properties for an {@code <Integer, String>} consumer.
 * @param group the group id.
 * @param autoCommit the auto commit.
 * @param embeddedKafka a {@link EmbeddedKafkaBroker} instance.
 * @return the properties.
 */
public static Map<String, Object> consumerProps(String group, String autoCommit,
                                       EmbeddedKafkaBroker embeddedKafka) { ... }

/**
 * Set up test properties for an {@code <Integer, String>} producer.
 * @param embeddedKafka a {@link EmbeddedKafkaBroker} instance.
 * @return the properties.
 */
public static Map<String, Object> producerProps(EmbeddedKafkaBroker embeddedKafka) { ... }

从 2.5 版本开始,consumerProps 方法将 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG 设置为 earliest。这是因为在大多数情况下,您希望 Consumer 能够消费测试用例中发送的任何消息。ConsumerConfig 的默认值是 latest,这意味着测试在 Consumer 启动之前发送的消息将不会被 Consumer 接收。要恢复之前的行为,请在调用方法后将该属性设置为 latest

使用嵌入式 Broker 时,通常最佳实践是为每个测试使用不同的 Topic,以防止相互干扰。如果由于某些原因无法做到这一点,请注意 consumeFromEmbeddedTopics 方法的默认行为是在分配分区后将 Consumer 定位到分区起始位置。由于该方法无法访问 Consumer 属性,因此您必须使用接受 seekToEnd 布尔参数的重载方法来定位到分区末尾而非起始位置。

提供了 EmbeddedKafkaZKBroker 的 JUnit 4 @Rule 包装器,用于创建嵌入式 Kafka 和嵌入式 Zookeeper 服务器。(有关在 JUnit 5 中使用 @EmbeddedKafka 的信息,请参阅@EmbeddedKafka 注解)。以下列出了这些方法签名:

/**
 * Create embedded Kafka brokers.
 * @param count the number of brokers.
 * @param controlledShutdown passed into TestUtils.createBrokerConfig.
 * @param topics the topics to create (2 partitions per).
 */
public EmbeddedKafkaRule(int count, boolean controlledShutdown, String... topics) { ... }

/**
 *
 * Create embedded Kafka brokers.
 * @param count the number of brokers.
 * @param controlledShutdown passed into TestUtils.createBrokerConfig.
 * @param partitions partitions per topic.
 * @param topics the topics to create.
 */
public EmbeddedKafkaRule(int count, boolean controlledShutdown, int partitions, String... topics) { ... }
JUnit4 不支持 EmbeddedKafkaKraftBroker

EmbeddedKafkaBroker 类有一个实用方法,可以消费它创建的所有 Topic 中的消息。以下示例展示了如何使用它:

Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testT", "false", embeddedKafka);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
Consumer<Integer, String> consumer = cf.createConsumer();
embeddedKafka.consumeFromAllEmbeddedTopics(consumer);

KafkaTestUtils 有一些实用方法可以从 Consumer 中获取结果。以下列出了这些方法签名:

/**
 * Poll the consumer, expecting a single record for the specified topic.
 * @param consumer the consumer.
 * @param topic the topic.
 * @return the record.
 * @throws org.junit.ComparisonFailure if exactly one record is not received.
 */
public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consumer, String topic) { ... }

/**
 * Poll the consumer for records.
 * @param consumer the consumer.
 * @return the records.
 */
public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer) { ... }

以下示例展示了如何使用 KafkaTestUtils

...
template.sendDefault(0, 2, "bar");
ConsumerRecord<Integer, String> received = KafkaTestUtils.getSingleRecord(consumer, "topic");
...

EmbeddedKafkaBroker 启动嵌入式 Kafka 和嵌入式 Zookeeper 服务器时,系统属性 spring.embedded.kafka.brokers 将被设置为 Kafka Broker 的地址,系统属性 spring.embedded.zookeeper.connect 将被设置为 Zookeeper 的地址。为此属性提供了方便的常量(EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERSEmbeddedKafkaBroker.SPRING_EMBEDDED_ZOOKEEPER_CONNECT)。

除了默认的 spring.embedded.kafka.brokers 系统属性外,Kafka Broker 的地址可以暴露给任何任意且方便的属性。为此,可以在启动嵌入式 Kafka 之前设置一个 spring.embedded.kafka.brokers.propertyEmbeddedKafkaBroker.BROKER_LIST_PROPERTY)系统属性。例如,对于 Spring Boot,期望设置 spring.kafka.bootstrap-servers 配置属性以自动配置 Kafka 客户端。因此,在使用随机端口运行嵌入式 Kafka 进行测试之前,我们可以将 spring.embedded.kafka.brokers.property=spring.kafka.bootstrap-servers 设置为一个系统属性 - EmbeddedKafkaBroker 将使用它来暴露其 Broker 地址。现在这是此属性的默认值(自 3.0.10 版本起)。

通过 EmbeddedKafkaBroker.brokerProperties(Map<String, String>),您可以为 Kafka 服务器提供额外的属性。有关可能的 Broker 属性的更多信息,请参阅 Kafka 配置

配置 Topic

以下示例配置创建了名为 cathat 的 Topic(各包含五个分区)、一个名为 thing1 的 Topic(包含 10 个分区)以及一个名为 thing2 的 Topic(包含 15 个分区)。

public class MyTests {

    @ClassRule
    private static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, false, 5, "cat", "hat");

    @Test
    public void test() {
        embeddedKafkaRule.getEmbeddedKafka()
              .addTopics(new NewTopic("thing1", 10, (short) 1), new NewTopic("thing2", 15, (short) 1));
        ...
    }

}

默认情况下,当出现问题时(例如添加已存在的 Topic),addTopics 会抛出异常。2.6 版本添加了该方法的新版本,返回一个 Map<String, Exception>;Key 是 Topic 名称,成功时值为 null,失败时值为 Exception

在多个测试类中使用相同的 Broker

您可以使用类似于以下方式在多个测试类中使用同一个 Broker:

public final class EmbeddedKafkaHolder {

    private static EmbeddedKafkaBroker embeddedKafka = new EmbeddedKafkaZKBroker(1, false)
            .brokerListProperty("spring.kafka.bootstrap-servers");

    private static boolean started;

    public static EmbeddedKafkaBroker getEmbeddedKafka() {
        if (!started) {
            try {
                embeddedKafka.afterPropertiesSet();
            }
            catch (Exception e) {
                throw new KafkaException("Embedded broker failed to start", e);
            }
            started = true;
        }
        return embeddedKafka;
    }

    private EmbeddedKafkaHolder() {
        super();
    }

}

这假设了一个 Spring Boot 环境,并且嵌入式 Broker 替换了 bootstrap servers 属性。

然后,在每个测试类中,您可以使用类似于以下方式:

static {
    EmbeddedKafkaHolder.getEmbeddedKafka().addTopics("topic1", "topic2");
}

private static final EmbeddedKafkaBroker broker = EmbeddedKafkaHolder.getEmbeddedKafka();

如果您不使用 Spring Boot,可以使用 broker.getBrokersAsString() 获取 bootstrap servers。

前面的示例没有提供在所有测试完成后关闭 Broker 的机制。例如,如果您在 Gradle daemon 中运行测试,这可能会成为一个问题。在这种情况下,您不应使用此技术,或者您应该使用某种方式在测试完成后调用 EmbeddedKafkaBroker 上的 destroy() 方法。

从 3.0 版本开始,框架为 JUnit Platform 暴露了一个 GlobalEmbeddedKafkaTestExecutionListener;默认情况下它是禁用的。这需要 JUnit Platform 1.8 或更高版本。此监听器的目的是为整个测试计划启动一个全局 EmbeddedKafkaBroker 并在计划结束时将其停止。要启用此监听器,从而在项目中的所有测试中拥有一个全局嵌入式 Kafka 集群,必须通过系统属性或 JUnit Platform 配置将 spring.kafka.global.embedded.enabled 属性设置为 true。此外,还可以提供以下属性:

  • spring.kafka.embedded.count - 要管理的 Kafka Broker 数量;

  • spring.kafka.embedded.ports - 要启动的每个 Kafka Broker 的端口(逗号分隔值),如果首选随机端口则为 0;值的数量必须等于上面提到的 count

  • spring.kafka.embedded.topics - 在启动的 Kafka 集群中要创建的 Topic(逗号分隔值);

  • spring.kafka.embedded.partitions - 为创建的 Topic 配置的分区数量;

  • spring.kafka.embedded.broker.properties.location - 包含额外 Kafka Broker 配置属性的文件位置;此属性的值必须遵循 Spring 资源抽象模式;

  • spring.kafka.embedded.kraft - 默认为 false,当设置为 true 时,使用 EmbeddedKafkaKraftBroker 而非 EmbeddedKafkaZKBroker

这些属性本质上模仿了 @EmbeddedKafka 的一些属性。

有关配置属性以及如何提供它们的更多信息,请参阅 JUnit 5 用户指南。例如,可以将 spring.embedded.kafka.brokers.property=my.bootstrap-servers 条目添加到测试 classpath 中的 junit-platform.properties 文件中。从 3.0.10 版本开始,Broker 默认情况下会自动将其设置为 spring.kafka.bootstrap-servers,用于 Spring Boot 应用程序的测试。

建议不要在同一个测试套件中同时使用全局嵌入式 Kafka 和每个测试类的嵌入式 Kafka。它们共享相同的系统属性,因此很可能导致意外行为。
spring-kafka-test 传递性依赖于 junit-jupiter-apijunit-platform-launcher(后者用于支持全局嵌入式 Broker)。如果您希望使用嵌入式 Broker 但不使用 JUnit,则可能需要排除这些依赖。

@EmbeddedKafka 注解

我们通常建议您将规则用作 @ClassRule,以避免在测试之间启动和停止 Broker(并为每个测试使用不同的 Topic)。从 2.0 版本开始,如果您使用 Spring 的测试应用上下文缓存,您也可以声明一个 EmbeddedKafkaBroker bean,这样就可以在多个测试类中共享一个 Broker。为了方便起见,我们提供了一个测试类级别的注解 @EmbeddedKafka 来注册 EmbeddedKafkaBroker bean。以下示例展示了如何使用它:

@RunWith(SpringRunner.class)
@DirtiesContext
@EmbeddedKafka(partitions = 1,
         topics = {
                 KafkaStreamsTests.STREAMING_TOPIC1,
                 KafkaStreamsTests.STREAMING_TOPIC2 })
public class KafkaStreamsTests {

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @Test
    public void someTest() {
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup", "true", this.embeddedKafka);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        ConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
        Consumer<Integer, String> consumer = cf.createConsumer();
        this.embeddedKafka.consumeFromAnEmbeddedTopic(consumer, KafkaStreamsTests.STREAMING_TOPIC2);
        ConsumerRecords<Integer, String> replies = KafkaTestUtils.getRecords(consumer);
        assertThat(replies.count()).isGreaterThanOrEqualTo(1);
    }

    @Configuration
    @EnableKafkaStreams
    public static class TestKafkaStreamsConfiguration {

        @Value("${" + EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS + "}")
        private String brokerAddresses;

        @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
        public KafkaStreamsConfiguration kStreamsConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddresses);
            return new KafkaStreamsConfiguration(props);
        }

    }

}

从 2.2.4 版本开始,您还可以使用 @EmbeddedKafka 注解来指定 Kafka 端口属性。

从 3.2 版本开始,将 kraft 属性设置为 true 可使用 EmbeddedKafkaKraftBroker 而非 EmbeddedKafkaZKBroker

以下示例设置了 @EmbeddedKafka 支持属性占位符解析的 topicsbrokerPropertiesbrokerPropertiesLocation 属性:

@TestPropertySource(locations = "classpath:/test.properties")
@EmbeddedKafka(topics = { "any-topic", "${kafka.topics.another-topic}" },
        brokerProperties = { "log.dir=${kafka.broker.logs-dir}",
                            "listeners=PLAINTEXT://localhost:${kafka.broker.port}",
                            "auto.create.topics.enable=${kafka.broker.topics-enable:true}" },
        brokerPropertiesLocation = "classpath:/broker.properties")

在前面的示例中,属性占位符 `${kafka.topics.another-topic}`、`${kafka.broker.logs-dir}` 和 `${kafka.broker.port}` 是从 Spring Environment 中解析的。此外,Broker 属性从由 brokerPropertiesLocation 指定的 broker.properties classpath 资源中加载。属性占位符会为 brokerPropertiesLocation URL 以及资源中找到的任何属性占位符进行解析。brokerProperties 定义的属性会覆盖 brokerPropertiesLocation 中找到的属性。

您可以在 JUnit 4 或 JUnit 5 中使用 @EmbeddedKafka 注解。

在 JUnit5 中使用 @EmbeddedKafka 注解

从 2.3 版本开始,有两种方法可以在 JUnit5 中使用 @EmbeddedKafka 注解。当与 @SpringJunitConfig 注解一起使用时,嵌入式 Broker 会添加到测试应用上下文中。您可以在类或方法级别自动注入 Broker 到您的测试中,以获取 Broker 地址列表。

使用 Spring 测试上下文时,EmbdeddedKafkaCondition 会创建一个 Broker;此条件包含一个参数解析器,因此您可以在测试方法中访问 Broker。

@EmbeddedKafka
public class EmbeddedKafkaConditionTests {

    @Test
    public void test(EmbeddedKafkaBroker broker) {
        String brokerList = broker.getBrokersAsString();
        ...
    }

}

将创建一个独立的 Broker(在 Spring 的 TestContext 之外),除非用 @EmbeddedKafka 注解的类也被 ExtendWith(SpringExtension.class) 注解(或元注解)。@SpringJunitConfig@SpringBootTest 就是这样元注解的,当它们中的任何一个也存在时,将使用基于上下文的 Broker。

当存在 Spring 测试应用上下文时,Topic 和 Broker 属性可以包含属性占位符,只要属性在某个地方定义了,它们就会被解析。如果不存在 Spring 上下文,这些占位符将不会被解析。

@SpringBootTest 注解中使用嵌入式 Broker

Spring Initializr 现在会自动将 spring-kafka-test 依赖添加到项目配置的测试范围中。

如果您的应用程序在 spring-cloud-stream 中使用了 Kafka Binder,并且希望在测试中使用嵌入式 Broker,则必须移除 spring-cloud-stream-test-support 依赖,因为它会用一个测试 Binder 替换实际的 Binder。如果您希望某些测试使用测试 Binder,而另一些使用嵌入式 Broker,则使用实际 Binder 的测试需要通过在测试类中排除 Binder 自动配置来禁用测试 Binder。以下示例展示了如何操作:

@RunWith(SpringRunner.class)
@SpringBootTest(properties = "spring.autoconfigure.exclude="
    + "org.springframework.cloud.stream.test.binder.TestSupportBinderAutoConfiguration")
public class MyApplicationTests {
    ...
}

在 Spring Boot 应用测试中有几种使用嵌入式 Broker 的方法。

包括:

JUnit4 类规则

以下示例展示了如何使用 JUnit4 类规则创建嵌入式 Broker:

@RunWith(SpringRunner.class)
@SpringBootTest
public class MyApplicationTests {

    @ClassRule
    public static EmbeddedKafkaRule broker = new EmbeddedKafkaRule(1, false, "someTopic")
            .brokerListProperty("spring.kafka.bootstrap-servers");

    @Autowired
    private KafkaTemplate<String, String> template;

    @Test
    public void test() {
        ...
    }

}

请注意,由于这是一个 Spring Boot 应用程序,我们覆盖了 Broker 列表属性以设置 Spring Boot 的属性。

@EmbeddedKafka@SpringJunitConfig

在使用 @EmbeddedKafka@SpringJUnitConfig 时,建议在测试类上使用 @DirtiesContext。这是为了防止在测试套件中运行多个测试后,JVM 关闭期间可能出现的竞态条件。例如,如果不使用 @DirtiesContextEmbeddedKafkaBroker 可能会提前关闭,而应用上下文仍然需要其中的资源。由于每次 @EmbeddedKafka 测试运行都会创建自己的临时目录,当发生这种竞态条件时,会产生错误日志消息,指示其尝试删除或清理的文件不再可用。添加 @DirtiesContext 将确保在每次测试后清理应用上下文并且不进行缓存,从而降低此类潜在资源竞态条件的风险。

@EmbeddedKafka 注解或 EmbeddedKafkaBroker Bean

以下示例展示了如何使用 @EmbeddedKafka 注解创建嵌入式 Broker:

@RunWith(SpringRunner.class)
@EmbeddedKafka(topics = "someTopic",
        bootstrapServersProperty = "spring.kafka.bootstrap-servers") // this is now the default
public class MyApplicationTests {

    @Autowired
    private KafkaTemplate<String, String> template;

    @Test
    public void test() {
        ...
    }

}
默认情况下,bootstrapServersProperty 会自动设置为 spring.kafka.bootstrap-servers,从 3.0.10 版本开始。

Hamcrest 匹配器

org.springframework.kafka.test.hamcrest.KafkaMatchers 提供了以下匹配器:

/**
 * @param key the key
 * @param <K> the type.
 * @return a Matcher that matches the key in a consumer record.
 */
public static <K> Matcher<ConsumerRecord<K, ?>> hasKey(K key) { ... }

/**
 * @param value the value.
 * @param <V> the type.
 * @return a Matcher that matches the value in a consumer record.
 */
public static <V> Matcher<ConsumerRecord<?, V>> hasValue(V value) { ... }

/**
 * @param partition the partition.
 * @return a Matcher that matches the partition in a consumer record.
 */
public static Matcher<ConsumerRecord<?, ?>> hasPartition(int partition) { ... }

/**
 * Matcher testing the timestamp of a {@link ConsumerRecord} assuming the topic has been set with
 * {@link org.apache.kafka.common.record.TimestampType#CREATE_TIME CreateTime}.
 *
 * @param ts timestamp of the consumer record.
 * @return a Matcher that matches the timestamp in a consumer record.
 */
public static Matcher<ConsumerRecord<?, ?>> hasTimestamp(long ts) {
  return hasTimestamp(TimestampType.CREATE_TIME, ts);
}

/**
 * Matcher testing the timestamp of a {@link ConsumerRecord}
 * @param type timestamp type of the record
 * @param ts timestamp of the consumer record.
 * @return a Matcher that matches the timestamp in a consumer record.
 */
public static Matcher<ConsumerRecord<?, ?>> hasTimestamp(TimestampType type, long ts) {
  return new ConsumerRecordTimestampMatcher(type, ts);
}

AssertJ 条件

您可以使用以下 AssertJ 条件:

/**
 * @param key the key
 * @param <K> the type.
 * @return a Condition that matches the key in a consumer record.
 */
public static <K> Condition<ConsumerRecord<K, ?>> key(K key) { ... }

/**
 * @param value the value.
 * @param <V> the type.
 * @return a Condition that matches the value in a consumer record.
 */
public static <V> Condition<ConsumerRecord<?, V>> value(V value) { ... }

/**
 * @param key the key.
 * @param value the value.
 * @param <K> the key type.
 * @param <V> the value type.
 * @return a Condition that matches the key in a consumer record.
 * @since 2.2.12
 */
public static <K, V> Condition<ConsumerRecord<K, V>> keyValue(K key, V value) { ... }

/**
 * @param partition the partition.
 * @return a Condition that matches the partition in a consumer record.
 */
public static Condition<ConsumerRecord<?, ?>> partition(int partition) { ... }

/**
 * @param value the timestamp.
 * @return a Condition that matches the timestamp value in a consumer record.
 */
public static Condition<ConsumerRecord<?, ?>> timestamp(long value) {
  return new ConsumerRecordTimestampCondition(TimestampType.CREATE_TIME, value);
}

/**
 * @param type the type of timestamp
 * @param value the timestamp.
 * @return a Condition that matches the timestamp value in a consumer record.
 */
public static Condition<ConsumerRecord<?, ?>> timestamp(TimestampType type, long value) {
  return new ConsumerRecordTimestampCondition(type, value);
}

示例

以下示例汇集了本章涵盖的大部分主题:

public class KafkaTemplateTests {

    private static final String TEMPLATE_TOPIC = "templateTopic";

    @ClassRule
    public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, TEMPLATE_TOPIC);

    @Test
    public void testTemplate() throws Exception {
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testT", "false",
            embeddedKafka.getEmbeddedKafka());
        DefaultKafkaConsumerFactory<Integer, String> cf =
                            new DefaultKafkaConsumerFactory<>(consumerProps);
        ContainerProperties containerProperties = new ContainerProperties(TEMPLATE_TOPIC);
        KafkaMessageListenerContainer<Integer, String> container =
                            new KafkaMessageListenerContainer<>(cf, containerProperties);
        final BlockingQueue<ConsumerRecord<Integer, String>> records = new LinkedBlockingQueue<>();
        container.setupMessageListener(new MessageListener<Integer, String>() {

            @Override
            public void onMessage(ConsumerRecord<Integer, String> record) {
                System.out.println(record);
                records.add(record);
            }

        });
        container.setBeanName("templateTests");
        container.start();
        ContainerTestUtils.waitForAssignment(container,
                            embeddedKafka.getEmbeddedKafka().getPartitionsPerTopic());
        Map<String, Object> producerProps =
                            KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka());
        ProducerFactory<Integer, String> pf =
                            new DefaultKafkaProducerFactory<>(producerProps);
        KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
        template.setDefaultTopic(TEMPLATE_TOPIC);
        template.sendDefault("foo");
        assertThat(records.poll(10, TimeUnit.SECONDS), hasValue("foo"));
        template.sendDefault(0, 2, "bar");
        ConsumerRecord<Integer, String> received = records.poll(10, TimeUnit.SECONDS);
        assertThat(received, hasKey(2));
        assertThat(received, hasPartition(0));
        assertThat(received, hasValue("bar"));
        template.send(TEMPLATE_TOPIC, 0, 2, "baz");
        received = records.poll(10, TimeUnit.SECONDS);
        assertThat(received, hasKey(2));
        assertThat(received, hasPartition(0));
        assertThat(received, hasValue("baz"));
    }

}

前面的示例使用了 Hamcrest 匹配器。使用 AssertJ,最后一部分代码如下所示:

assertThat(records.poll(10, TimeUnit.SECONDS)).has(value("foo"));
template.sendDefault(0, 2, "bar");
ConsumerRecord<Integer, String> received = records.poll(10, TimeUnit.SECONDS);
// using individual assertions
assertThat(received).has(key(2));
assertThat(received).has(value("bar"));
assertThat(received).has(partition(0));
template.send(TEMPLATE_TOPIC, 0, 2, "baz");
received = records.poll(10, TimeUnit.SECONDS);
// using allOf()
assertThat(received).has(allOf(keyValue(2, "baz"), partition(0)));

Mock Consumer 和 Producer

kafka-clients 库提供了 MockConsumerMockProducer 类用于测试目的。

如果您希望在某些使用监听器容器或 KafkaTemplate 的测试中使用这些类,从 3.0.7 版本开始,框架现在提供了 MockConsumerFactoryMockProducerFactory 实现。

这些工厂可以在监听器容器和模板中使用,而非需要运行(或嵌入式)Broker 的默认工厂。

这是一个返回单个 Consumer 的简单实现示例:

@Bean
ConsumerFactory<String, String> consumerFactory() {
    MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
    TopicPartition topicPartition0 = new TopicPartition("topic", 0);
    List<TopicPartition> topicPartitions = Collections.singletonList(topicPartition0);
    Map<TopicPartition, Long> beginningOffsets = topicPartitions.stream().collect(Collectors
            .toMap(Function.identity(), tp -> 0L));
    consumer.updateBeginningOffsets(beginningOffsets);
    consumer.schedulePollTask(() -> {
        consumer.addRecord(
                new ConsumerRecord<>("topic", 0, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "test1",
                        new RecordHeaders(), Optional.empty()));
        consumer.addRecord(
                new ConsumerRecord<>("topic", 0, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "test2",
                        new RecordHeaders(), Optional.empty()));
    });
    return new MockConsumerFactory(() -> consumer);
}

如果您希望进行并发测试,工厂构造函数中的 Supplier lambda 需要每次都创建一个新实例。

对于 MockProducerFactory,有两个构造函数;一个用于创建简单工厂,一个用于创建支持事务的工厂。

示例:

@Bean
ProducerFactory<String, String> nonTransFactory() {
    return new MockProducerFactory<>(() ->
            new MockProducer<>(true, new StringSerializer(), new StringSerializer()));
}

@Bean
ProducerFactory<String, String> transFactory() {
    MockProducer<String, String> mockProducer =
            new MockProducer<>(true, new StringSerializer(), new StringSerializer());
    mockProducer.initTransactions();
    return new MockProducerFactory<String, String>((tx, id) -> mockProducer, "defaultTxId");
}

注意在第二种情况下,lambda 是一个 BiFunction<Boolean, String>,其中第一个参数为 true 表示调用者需要一个事务性 Producer;可选的第二个参数包含事务 ID。这可以是默认值(由构造函数提供),或者在配置了 KafkaTransactionManager(或用于本地事务的 KafkaTemplate)的情况下,可以被覆盖。提供事务 ID 是为了您可能希望根据此值使用不同的 MockProducer

如果您在多线程环境中使用 Producer,BiFunction 应该返回多个 Producer(可能使用 ThreadLocal 进行线程绑定)。

事务性 MockProducer 必须通过调用 initTransaction() 进行事务初始化。

使用 MockProducer 时,如果您不想在每次发送后关闭 Producer,则可以提供一个自定义 MockProducer 实现,它覆盖 close 方法,但不调用父类的 close 方法。这对于测试很方便,可以在不关闭同一个 Producer 的情况下验证多次发布。

示例:

@Bean
MockProducer<String, String> mockProducer() {
    return new MockProducer<>(false, new StringSerializer(), new StringSerializer()) {
        @Override
        public void close() {

        }
    };
}

@Bean
ProducerFactory<String, String> mockProducerFactory(MockProducer<String, String> mockProducer) {
    return new MockProducerFactory<>(() -> mockProducer);
}