使用响应式 Kafka 绑定器的基本示例

在本节中,我们将展示一些使用响应式绑定器编写响应式 Kafka 应用程序的基本代码片段及其详细信息。

@Bean
public Function<Flux<String>, Flux<String>> uppercase() {
    return s -> s.map(String::toUpperCase);
}

您可以将上述 uppercase 函数与基于消息通道的 Kafka 绑定器(spring-cloud-stream-binder-kafka)以及本节讨论的响应式 Kafka 绑定器(spring-cloud-stream-binder-kafka-reactive)一起使用。当将此函数与常规 Kafka 绑定器一起使用时,尽管您在应用程序中(即在 uppercase 函数中)使用了响应式类型,但您只在函数执行期间获得响应式流。在函数执行上下文之外,没有响应式的好处,因为底层绑定器不是基于响应式堆栈的。因此,尽管这可能看起来像带来了完整的端到端响应式堆栈,但此应用程序只是部分响应式的。

现在假设您正在将适用于 Kafka 的正确响应式绑定器(spring-cloud-stream-binder-kafka-reactive)与上述函数应用程序一起使用。此绑定器实现将从链的顶端消费到末端发布,提供完整的响应式优势。这是因为底层绑定器基于 Reactor Kafka 的核心 API 构建。在消费者端,它使用 KafkaReceiver,这是一个 Kafka 消费者的响应式实现。同样,在生产者端,它使用 KafkaSender API,这是一个 Kafka 生产者的响应式实现。由于响应式 Kafka 绑定器的基础是建立在适当的响应式 Kafka API 之上的,因此应用程序可以充分利用响应式技术的优势。使用此响应式 Kafka 绑定器时,应用程序内置了诸如自动背压等响应式功能。

从版本 4.0.2 开始,您可以通过分别提供一个或多个 ReceiverOptionsCustomizerSenderOptionsCustomizer bean 来定制 ReceiverOptionsSenderOptions。它们是 BiFunction,接收绑定名称和初始选项,并返回定制的选项。这些接口扩展了 Ordered,因此当存在多个定制器时,它们将按所需的顺序应用。

绑定器默认不提交偏移量。从版本 4.0.2 开始,KafkaHeaders.ACKNOWLEDGMENT 头包含一个 ReceiverOffset 对象,您可以通过调用其 acknowledge()commit() 方法来提交偏移量。
@Bean
public Consumer<Flux<Message<String>>> consume() {
    return msg -> {
        process(msg.getPayload());
        msg.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, ReceiverOffset.class).acknowledge();
    }
}

有关更多信息,请参阅 reactor-kafka 文档和 javadoc。

此外,从版本 4.0.3 开始,Kafka 消费者属性 reactiveAtmostOnce 可以设置为 true,绑定器将在处理每个轮询返回的记录之前自动提交偏移量。此外,从版本 4.0.3 开始,您可以将消费者属性 reactiveAutoCommit 设置为 true,绑定器将在处理每个轮询返回的记录之后自动提交偏移量。在这些情况下,不存在确认头。

4.0.2 版本也提供了 reactiveAutoCommit,但实现不正确,它的行为类似于 reactiveAtMostOnce

以下是如何使用 reactiveAutoCommit 的示例。

@Bean
Consumer<Flux<Flux<ConsumerRecord<?, String>>>> input() {
	return flux -> flux
			.doOnNext(inner -> inner
				.doOnNext(val -> {
					log.info(val.value());
				})
				.subscribe())
			.subscribe();
}

请注意,当使用自动提交时,reactor-kafka 返回一个 Flux<Flux<ConsumerRecord<?, ?>>>。鉴于 Spring 无法访问内部 Flux 的内容,应用程序必须处理原生的 ConsumerRecord;对内容不应用消息转换或转换服务。这需要使用原生解码(通过在配置中指定适当类型的 Deserializer)来返回所需类型的记录键/值。

© . This site is unofficial and not affiliated with VMware.