交互式查询

Kafka Streams 绑定器 API 公开了一个名为InteractiveQueryService的类,用于交互式查询状态存储。您可以在应用程序中将其作为 Spring bean 访问。从应用程序访问此 bean 的一种简单方法是autowire该 bean。

@Autowired
private InteractiveQueryService interactiveQueryService;

获得对该 bean 的访问权限后,您可以查询您感兴趣的特定状态存储。请参见下文。

ReadOnlyKeyValueStore<Object, Object> keyValueStore =
						interactiveQueryService.getQueryableStoreType("my-store", QueryableStoreTypes.keyValueStore());

在启动期间,上述检索存储的方法调用可能会失败。例如,它可能仍在初始化状态存储的过程中。在这种情况下,重试此操作将非常有用。Kafka Streams 绑定器提供了一个简单的重试机制来适应这种情况。

以下是您可以用来控制此重试的两个属性。

  • spring.cloud.stream.kafka.streams.binder.stateStoreRetry.maxAttempts - 默认值为1

  • spring.cloud.stream.kafka.streams.binder.stateStoreRetry.backOffInterval - 默认值为1000 毫秒。

如果正在运行多个 Kafka Streams 应用程序实例,则在您可以交互式查询它们之前,您需要确定哪个应用程序实例托管您正在查询的特定密钥。InteractiveQueryService API 提供了用于识别主机信息的方法。

为了使此方法有效,您必须如下配置属性application.server

spring.cloud.stream.kafka.streams.binder.configuration.application.server: <server>:<port>

以下是一些代码片段

org.apache.kafka.streams.state.HostInfo hostInfo = interactiveQueryService.getHostInfo("store-name",
						key, keySerializer);

if (interactiveQueryService.getCurrentHostInfo().equals(hostInfo)) {

    //query from the store that is locally available
}
else {
    //query from the remote host
}

有关这些主机查找方法的更多信息,请参阅方法的Javadoc。对于这些方法,如果底层 KafkaStreams 对象未准备好,则在启动期间也可能会引发异常。上述重试属性也适用于这些方法。

通过 InteractiveQueryService 可用的其他 API 方法

使用以下 API 方法检索与给定存储和密钥的组合关联的KeyQueryMetadata 对象。

public <K> KeyQueryMetadata getKeyQueryMetadata(String store, K key, Serializer<K> serializer)

使用以下 API 方法检索与给定存储和密钥的组合关联的KakfaStreams 对象。

public <K> KafkaStreams getKafkaStreams(String store, K key, Serializer<K> serializer)

自定义存储查询参数

有时您需要在通过InteractiveQueryService查询存储之前微调存储查询参数。为此,从绑定器的4.0.1版本开始,您可以为StoreQueryParametersCustomizer提供一个 bean,它是一个具有customize方法的功能接口,该方法采用StoreQueryParameter作为参数。以下是其方法签名。

StoreQueryParameters<T> customize(StoreQueryParameters<T> storeQueryParameters);

使用这种方法,应用程序可以进一步自定义StoreQueryParameters,例如启用陈旧存储。

当此 bean 出现在此应用程序中时,InteractiveQueryService将在查询状态存储之前调用其customize方法。

请记住,应用程序中必须存在一个唯一的StoreQueryParametersCustomizer bean。