测试应用程序
spring-kafka-test jar 包含一些有用的工具来帮助测试您的应用程序。
嵌入式 Kafka Broker
由于 Kafka 4.0 已完全过渡到 KRaft 模式,现在只有 EmbeddedKafkaKraftBroker 实现可用。
-
EmbeddedKafkaKraftBroker- 在组合的控制器和代理模式下使用Kraft。
有几种配置代理的技术,如下所述。
KafkaTestUtils
org.springframework.kafka.test.utils.KafkaTestUtils 提供了许多静态辅助方法来消费记录、检索各种记录偏移量等。请参阅其 Javadocs 获取完整详情。
JUnit
org.springframework.kafka.test.utils.KafkaTestUtils 提供了一些静态方法来设置生产者和消费者属性。以下清单显示了这些方法签名
/**
* Set up test properties for an {@code <Integer, String>} consumer.
* @param group the group id.
* @param autoCommit the auto commit.
* @param embeddedKafka a {@link EmbeddedKafkaBroker} instance.
* @return the properties.
*/
public static Map<String, Object> consumerProps(String group, String autoCommit,
EmbeddedKafkaBroker embeddedKafka) { ... }
/**
* Set up test properties for an {@code <Integer, String>} producer.
* @param embeddedKafka a {@link EmbeddedKafkaBroker} instance.
* @return the properties.
*/
public static Map<String, Object> producerProps(EmbeddedKafkaBroker embeddedKafka) { ... }
|
从 2.5 版本开始, 使用嵌入式代理时,通常最佳实践是为每个测试使用不同的主题,以防止串扰。如果由于某种原因无法做到这一点,请注意 |
|
Spring for Apache Kafka 不再支持 JUnit 4。建议迁移到 JUnit Jupiter。 |
EmbeddedKafkaBroker 类有一个实用方法,可以让您消费它创建的所有主题。以下示例显示了如何使用它
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testT", "false", embeddedKafka);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
Consumer<Integer, String> consumer = cf.createConsumer();
embeddedKafka.consumeFromAllEmbeddedTopics(consumer);
KafkaTestUtils 有一些实用方法可以从消费者那里获取结果。以下清单显示了这些方法签名
/**
* Poll the consumer, expecting a single record for the specified topic.
* @param consumer the consumer.
* @param topic the topic.
* @return the record.
* @throws org.junit.ComparisonFailure if exactly one record is not received.
*/
public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consumer, String topic) { ... }
/**
* Poll the consumer for records.
* @param consumer the consumer.
* @return the records.
*/
public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer) { ... }
以下示例显示了如何使用 KafkaTestUtils
...
template.sendDefault(0, 2, "bar");
ConsumerRecord<Integer, String> received = KafkaTestUtils.getSingleRecord(consumer, "topic");
...
当 EmbeddedKafkaBroker 启动嵌入式 Kafka 代理时,一个名为 spring.embedded.kafka.brokers 的系统属性被设置为 Kafka 代理的地址。为该属性提供了方便的常量(EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS)。
除了默认的 spring.embedded.kafka.brokers 系统属性之外,Kafka 代理的地址还可以暴露给任何任意且方便的属性。为此,可以在启动嵌入式 Kafka 之前设置 spring.embedded.kafka.brokers.property (EmbeddedKafkaBroker.BROKER_LIST_PROPERTY) 系统属性。例如,对于 Spring Boot,spring.kafka.bootstrap-servers 配置属性预计会为 Kafka 客户端的自动配置而设置。因此,在随机端口上运行嵌入式 Kafka 的测试之前,我们可以将 spring.embedded.kafka.brokers.property=spring.kafka.bootstrap-servers 设置为系统属性 - EmbeddedKafkaBroker 将使用它来暴露其代理地址。现在这是此属性的默认值(从 3.0.10 版本开始)。
通过 EmbeddedKafkaBroker.brokerProperties(Map<String, String>),您可以为 Kafka 服务器提供额外的属性。有关可能的代理属性的更多信息,请参阅 Kafka 配置。
配置主题
以下示例配置创建了名为 cat 和 hat 的主题,有五个分区;名为 thing1 的主题,有 10 个分区;以及名为 thing2 的主题,有 15 个分区。
@SpringJUnitConfig
@EmbeddedKafka(
partitions = 5,
topics = {"cat", "hat"}
)
public class MyTests {
@Autowired
private EmbeddedKafkaBroker broker;
@Test
public void test() {
broker.addTopics(new NewTopic("thing1", 10, (short) 1), new NewTopic("thing2", 15, (short) 1));
...
}
}
默认情况下,当出现问题(例如添加已存在的主题)时,addTopics 将抛出异常。2.6 版本添加了该方法的新版本,它返回一个 Map<String, Exception>;键是主题名称,值为 null 表示成功,或 Exception 表示失败。
为多个测试类使用相同的 Broker
您可以使用类似以下内容的方法为多个测试类使用相同的 broker
public final class EmbeddedKafkaHolder {
private static EmbeddedKafkaBroker embeddedKafka = new EmbeddedKafkaZKBroker(1, false)
.brokerListProperty("spring.kafka.bootstrap-servers");
private static volatile boolean started;
public static EmbeddedKafkaBroker getEmbeddedKafka() {
if (!started) {
synchronized (EmbeddedKafkaBroker.class) {
try {
embeddedKafka.afterPropertiesSet();
}
catch (Exception e) {
throw new KafkaException("Embedded broker failed to start", e);
}
started = true;
}
}
return embeddedKafka;
}
}
这假设一个 Spring Boot 环境,并且嵌入式 broker 替换了 bootstrap servers 属性。
然后,在每个测试类中,您可以使用类似以下内容的方法
static {
EmbeddedKafkaHolder.getEmbeddedKafka().addTopics("topic1", "topic2");
}
private static final EmbeddedKafkaBroker broker = EmbeddedKafkaHolder.getEmbeddedKafka();
如果您没有使用 Spring Boot,您可以使用 broker.getBrokersAsString() 获取引导服务器。
前面的示例没有提供在所有测试完成后关闭代理的机制。如果在 Gradle daemon 中运行测试,这可能会成为一个问题。在这种情况下不应使用此技术,或者在测试完成后应使用某种方法调用 EmbeddedKafkaBroker 上的 destroy()。 |
从 3.0 版本开始,框架为 JUnit Platform 暴露了一个 GlobalEmbeddedKafkaTestExecutionListener;它默认禁用。这需要 JUnit Platform 1.8 或更高版本。此监听器的目的是在整个测试计划中启动一个全局 EmbeddedKafkaBroker,并在计划结束时停止它。要启用此监听器,从而为项目中的所有测试提供一个单一的全局嵌入式 Kafka 集群,必须通过系统属性或 JUnit Platform 配置将 spring.kafka.global.embedded.enabled 属性设置为 true。此外,还可以提供以下属性
-
spring.kafka.embedded.count- 要管理的 Kafka 代理数量; -
spring.kafka.embedded.ports- 每个要启动的 Kafka 代理的端口(逗号分隔值),如果首选随机端口则为0;值的数量必须等于上述count; -
spring.kafka.embedded.topics- 在启动的 Kafka 集群中要创建的主题(逗号分隔值); -
spring.kafka.embedded.partitions- 为创建的主题分配的分区数量; -
spring.kafka.embedded.broker.properties.location- 额外 Kafka 代理配置属性文件的位置;此属性的值必须遵循 Spring 资源抽象模式。
本质上,这些属性模仿了 @EmbeddedKafka 的一些属性。
有关配置属性以及如何提供它们的更多信息,请参阅 JUnit Jupiter 用户指南。例如,可以将 spring.embedded.kafka.brokers.property=my.bootstrap-servers 条目添加到测试类路径中的 junit-platform.properties 文件中。从 3.0.10 版本开始,broker 默认将此设置为 spring.kafka.bootstrap-servers,用于使用 Spring Boot 应用程序进行测试。
| 建议不要在单个测试套件中结合使用全局嵌入式 Kafka 和每个测试类。两者共享相同的系统属性,因此很可能导致意外行为。 |
spring-kafka-test 对 junit-jupiter-api 和 junit-platform-launcher(后者用于支持全局嵌入式代理)具有传递依赖。如果您希望使用嵌入式代理且不使用 JUnit,则可能需要排除这些依赖。 |
@EmbeddedKafka 注解
我们通常建议您使用单个代理实例,以避免在测试之间启动和停止代理(并为每个测试使用不同的主题)。从 2.0 版本开始,如果您使用 Spring 的测试应用程序上下文缓存,您还可以声明一个 EmbeddedKafkaBroker bean,这样单个代理就可以跨多个测试类使用。为了方便起见,我们提供了一个名为 @EmbeddedKafka 的测试类级别注解来注册 EmbeddedKafkaBroker bean。以下示例显示了如何使用它
@SpringJUnitConfig
@DirtiesContext
@EmbeddedKafka(partitions = 1,
topics = {
KafkaStreamsTests.STREAMING_TOPIC1,
KafkaStreamsTests.STREAMING_TOPIC2 })
public class KafkaStreamsTests {
@Autowired
private EmbeddedKafkaBroker embeddedKafka;
@Test
void someTest() {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup", "true", this.embeddedKafka);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
ConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
Consumer<Integer, String> consumer = cf.createConsumer();
this.embeddedKafka.consumeFromAnEmbeddedTopic(consumer, KafkaStreamsTests.STREAMING_TOPIC2);
ConsumerRecords<Integer, String> replies = KafkaTestUtils.getRecords(consumer);
assertThat(replies.count()).isGreaterThanOrEqualTo(1);
}
@Configuration
@EnableKafkaStreams
public static class TestKafkaStreamsConfiguration {
@Value("${" + EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS + "}")
private String brokerAddresses;
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddresses);
return new KafkaStreamsConfiguration(props);
}
}
}
从 2.2.4 版本开始,您还可以使用 @EmbeddedKafka 注解来指定 Kafka 端口属性。
从 4.0 版本开始,所有与 ZooKeeper 相关的属性都已从 @EmbeddedKafka 注解中删除,因为 Kafka 4.0 仅使用 KRaft。 |
以下示例设置了 @EmbeddedKafka 的 topics、brokerProperties 和 brokerPropertiesLocation 属性,支持属性占位符解析
@TestPropertySource(locations = "classpath:/test.properties")
@EmbeddedKafka(topics = { "any-topic", "${kafka.topics.another-topic}" },
brokerProperties = { "log.dir=${kafka.broker.logs-dir}",
"listeners=PLAINTEXT://:${kafka.broker.port}",
"auto.create.topics.enable=${kafka.broker.topics-enable:true}" },
brokerPropertiesLocation = "classpath:/broker.properties")
在前面的示例中,属性占位符 ${kafka.topics.another-topic}、${kafka.broker.logs-dir} 和 ${kafka.broker.port} 从 Spring Environment 解析。此外,代理属性从 brokerPropertiesLocation 指定的 broker.properties 类路径资源加载。属性占位符会为 brokerPropertiesLocation URL 和资源中找到的任何属性占位符解析。由 brokerProperties 定义的属性会覆盖 brokerPropertiesLocation 中找到的属性。
您可以将 @EmbeddedKafka 注解与 JUnit Jupiter 一起使用。
@EmbeddedKafka 注解与 JUnit Jupiter
从 2.3 版本开始,有两种方法可以将 @EmbeddedKafka 注解与 JUnit Jupiter 结合使用。当与 @SpringJunitConfig 注解一起使用时,嵌入式代理被添加到测试应用程序上下文中。您可以将代理自动注入到您的测试中,在类级别或方法级别,以获取代理地址列表。
当不使用 spring 测试上下文时,EmbdeddedKafkaCondition 会创建一个代理;该条件包含一个参数解析器,因此您可以在测试方法中访问该代理。
@EmbeddedKafka
public class EmbeddedKafkaConditionTests {
@Test
public void test(EmbeddedKafkaBroker broker) {
String brokerList = broker.getBrokersAsString();
...
}
}
除非带有 @EmbeddedKafka 注解的类也带有 ExtendWith(SpringExtension.class) 注解(或元注解),否则将创建一个独立代理(在 Spring 的 TestContext 之外)。@SpringJunitConfig 和 @SpringBootTest 都是元注解的,当这些注解中的任何一个也存在时,将使用基于上下文的代理。
| 当存在 Spring 测试应用程序上下文时,主题和代理属性可以包含属性占位符,只要属性在某处定义,这些占位符就会被解析。如果 Spring 上下文不可用,这些占位符将不会被解析。 |
@SpringBootTest 注解中的嵌入式 Broker
Spring Initializr 现在会自动将 spring-kafka-test 依赖项以测试范围添加到项目配置中。
|
如果您的应用程序在
|
在 Spring Boot 应用程序测试中使用嵌入式代理有几种方法。
它们包括
@EmbeddedKafka 与 @SpringJunitConfig
当将 @EmbeddedKafka 与 @SpringJUnitConfig 结合使用时,建议在测试类上使用 @DirtiesContext。这是为了防止在测试套件中运行多个测试后,JVM 关闭期间发生潜在的竞态条件。例如,如果不使用 @DirtiesContext,EmbeddedKafkaBroker 可能会提前关闭,而应用程序上下文仍然需要它的资源。由于每个 EmbeddedKafka 测试运行都会创建自己的临时目录,当发生这种竞态条件时,它会产生错误日志消息,指示它尝试删除或清理的文件不再可用。添加 @DirtiesContext 将确保在每个测试后清理应用程序上下文并且不被缓存,从而使其不易受到此类潜在资源竞态条件的影响。
@EmbeddedKafka 注解或 EmbeddedKafkaBroker Bean
以下示例显示了如何使用 @EmbeddedKafka 注解创建嵌入式代理
@SpringJUnitConfig
@EmbeddedKafka(topics = "someTopic",
bootstrapServersProperty = "spring.kafka.bootstrap-servers") // this is now the default
public class MyApplicationTests {
@Autowired
private KafkaTemplate<String, String> template;
@Test
void test() {
...
}
}
从 3.0.10 版本开始,bootstrapServersProperty 默认自动设置为 spring.kafka.bootstrap-servers。 |
Hamcrest 匹配器
org.springframework.kafka.test.hamcrest.KafkaMatchers 提供以下匹配器
/**
* @param key the key
* @param <K> the type.
* @return a Matcher that matches the key in a consumer record.
*/
public static <K> Matcher<ConsumerRecord<K, ?>> hasKey(K key) { ... }
/**
* @param value the value.
* @param <V> the type.
* @return a Matcher that matches the value in a consumer record.
*/
public static <V> Matcher<ConsumerRecord<?, V>> hasValue(V value) { ... }
/**
* @param partition the partition.
* @return a Matcher that matches the partition in a consumer record.
*/
public static Matcher<ConsumerRecord<?, ?>> hasPartition(int partition) { ... }
/**
* Matcher testing the timestamp of a {@link ConsumerRecord} assuming the topic has been set with
* {@link org.apache.kafka.common.record.TimestampType#CREATE_TIME CreateTime}.
*
* @param ts timestamp of the consumer record.
* @return a Matcher that matches the timestamp in a consumer record.
*/
public static Matcher<ConsumerRecord<?, ?>> hasTimestamp(long ts) {
return hasTimestamp(TimestampType.CREATE_TIME, ts);
}
/**
* Matcher testing the timestamp of a {@link ConsumerRecord}
* @param type timestamp type of the record
* @param ts timestamp of the consumer record.
* @return a Matcher that matches the timestamp in a consumer record.
*/
public static Matcher<ConsumerRecord<?, ?>> hasTimestamp(TimestampType type, long ts) {
return new ConsumerRecordTimestampMatcher(type, ts);
}
AssertJ 条件
您可以使用以下 AssertJ 条件
/**
* @param key the key
* @param <K> the type.
* @return a Condition that matches the key in a consumer record.
*/
public static <K> Condition<ConsumerRecord<K, ?>> key(K key) { ... }
/**
* @param value the value.
* @param <V> the type.
* @return a Condition that matches the value in a consumer record.
*/
public static <V> Condition<ConsumerRecord<?, V>> value(V value) { ... }
/**
* @param key the key.
* @param value the value.
* @param <K> the key type.
* @param <V> the value type.
* @return a Condition that matches the key in a consumer record.
* @since 2.2.12
*/
public static <K, V> Condition<ConsumerRecord<K, V>> keyValue(K key, V value) { ... }
/**
* @param partition the partition.
* @return a Condition that matches the partition in a consumer record.
*/
public static Condition<ConsumerRecord<?, ?>> partition(int partition) { ... }
/**
* @param value the timestamp.
* @return a Condition that matches the timestamp value in a consumer record.
*/
public static Condition<ConsumerRecord<?, ?>> timestamp(long value) {
return new ConsumerRecordTimestampCondition(TimestampType.CREATE_TIME, value);
}
/**
* @param type the type of timestamp
* @param value the timestamp.
* @return a Condition that matches the timestamp value in a consumer record.
*/
public static Condition<ConsumerRecord<?, ?>> timestamp(TimestampType type, long value) {
return new ConsumerRecordTimestampCondition(type, value);
}
示例
以下示例汇集了本章涵盖的大部分主题
@EmbeddedKafka(topics = KafkaTemplateTests.TEMPLATE_TOPIC)
public class KafkaTemplateTests {
public static final String TEMPLATE_TOPIC = "templateTopic";
public static EmbeddedKafkaBroker embeddedKafka;
@BeforeAll
public static void setUp() {
embeddedKafka = EmbeddedKafkaCondition.getBroker();
}
@Test
public void testTemplate() throws Exception {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testT", "false",
embeddedKafka);
DefaultKafkaConsumerFactory<Integer, String> cf =
new DefaultKafkaConsumerFactory<>(consumerProps);
ContainerProperties containerProperties = new ContainerProperties(TEMPLATE_TOPIC);
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProperties);
final BlockingQueue<ConsumerRecord<Integer, String>> records = new LinkedBlockingQueue<>();
container.setupMessageListener(new MessageListener<Integer, String>() {
@Override
public void onMessage(ConsumerRecord<Integer, String> record) {
System.out.println(record);
records.add(record);
}
});
container.setBeanName("templateTests");
container.start();
ContainerTestUtils.waitForAssignment(container,
embeddedKafka.getPartitionsPerTopic());
Map<String, Object> producerProps =
KafkaTestUtils.producerProps(embeddedKafka);
ProducerFactory<Integer, String> pf =
new DefaultKafkaProducerFactory<>(producerProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
template.setDefaultTopic(TEMPLATE_TOPIC);
template.sendDefault("foo");
assertThat(records.poll(10, TimeUnit.SECONDS), hasValue("foo"));
template.sendDefault(0, 2, "bar");
ConsumerRecord<Integer, String> received = records.poll(10, TimeUnit.SECONDS);
assertThat(received, hasKey(2));
assertThat(received, hasPartition(0));
assertThat(received, hasValue("bar"));
template.send(TEMPLATE_TOPIC, 0, 2, "baz");
received = records.poll(10, TimeUnit.SECONDS);
assertThat(received, hasKey(2));
assertThat(received, hasPartition(0));
assertThat(received, hasValue("baz"));
}
}
前面的示例使用了 Hamcrest 匹配器。使用 AssertJ,最后一部分看起来像以下代码
assertThat(records.poll(10, TimeUnit.SECONDS)).has(value("foo"));
template.sendDefault(0, 2, "bar");
ConsumerRecord<Integer, String> received = records.poll(10, TimeUnit.SECONDS);
// using individual assertions
assertThat(received).has(key(2));
assertThat(received).has(value("bar"));
assertThat(received).has(partition(0));
template.send(TEMPLATE_TOPIC, 0, 2, "baz");
received = records.poll(10, TimeUnit.SECONDS);
// using allOf()
assertThat(received).has(allOf(keyValue(2, "baz"), partition(0)));
Mock 消费者和生产者
kafka-clients 库提供了 MockConsumer 和 MockProducer 类用于测试目的。
如果您希望在某些测试中将这些类分别与监听器容器或 KafkaTemplate 一起使用,从 3.0.7 版本开始,框架现在提供了 MockConsumerFactory 和 MockProducerFactory 实现。
这些工厂可以在监听器容器和模板中使用,而不是默认的工厂,后者需要一个运行中的(或嵌入式)代理。
以下是一个返回单个消费者的简单实现示例
@Bean
ConsumerFactory<String, String> consumerFactory() {
MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
TopicPartition topicPartition0 = new TopicPartition("topic", 0);
List<TopicPartition> topicPartitions = Collections.singletonList(topicPartition0);
Map<TopicPartition, Long> beginningOffsets = topicPartitions.stream().collect(Collectors
.toMap(Function.identity(), tp -> 0L));
consumer.updateBeginningOffsets(beginningOffsets);
consumer.schedulePollTask(() -> {
consumer.addRecord(
new ConsumerRecord<>("topic", 0, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "test1",
new RecordHeaders(), Optional.empty()));
consumer.addRecord(
new ConsumerRecord<>("topic", 0, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "test2",
new RecordHeaders(), Optional.empty()));
});
return new MockConsumerFactory(() -> consumer);
}
如果您希望进行并发测试,工厂构造函数中的 Supplier lambda 需要每次创建一个新实例。
对于 MockProducerFactory,有两个构造函数;一个用于创建简单工厂,另一个用于创建支持事务的工厂。
以下是示例
@Bean
ProducerFactory<String, String> nonTransFactory() {
return new MockProducerFactory<>(() ->
new MockProducer<>(true, new StringSerializer(), new StringSerializer()));
}
@Bean
ProducerFactory<String, String> transFactory() {
MockProducer<String, String> mockProducer =
new MockProducer<>(true, new StringSerializer(), new StringSerializer());
mockProducer.initTransactions();
return new MockProducerFactory<String, String>((tx, id) -> mockProducer, "defaultTxId");
}
请注意,在第二种情况下,lambda 是一个 BiFunction<Boolean, String>,其中第一个参数为 true,如果调用方需要事务性生产者;可选的第二个参数包含事务 ID。这可以是默认值(如构造函数中提供的),也可以由 KafkaTransactionManager(或用于本地事务的 KafkaTemplate)覆盖,如果已配置。提供事务 ID 以防您希望基于此值使用不同的 MockProducer。
如果您在多线程环境中使用生产者,BiFunction 应该返回多个生产者(可能使用 ThreadLocal 绑定到线程)。
事务性 MockProducer 必须通过调用 initTransaction() 进行事务初始化。 |
使用 MockProducer 时,如果您不想在每次发送后关闭生产者,那么您可以提供一个自定义的 MockProducer 实现,它覆盖了 close 方法,并且不调用父类的 close 方法。这对于测试来说很方便,可以在不关闭生产者的情况下验证对同一个生产者的多次发布。
这是一个例子
@Bean
MockProducer<String, String> mockProducer() {
return new MockProducer<>(false, new StringSerializer(), new StringSerializer()) {
@Override
public void close() {
}
};
}
@Bean
ProducerFactory<String, String> mockProducerFactory(MockProducer<String, String> mockProducer) {
return new MockProducerFactory<>(() -> mockProducer);
}