使用 Reactive Kafka Binder 的基本示例
在本节中,我们将展示一些使用 reactive binder 编写响应式 Kafka 应用程序的基本代码片段及其相关细节。
@Bean
public Function<Flux<String>, Flux<String>> uppercase() {
return s -> s.map(String::toUpperCase);
}
您可以将上述 `uppercase` 函数与基于消息通道的 Kafka binder (`spring-cloud-stream-binder-kafka`) 以及本文讨论的 reactive Kafka binder (`spring-cloud-stream-binder-kafka-reactive`) 一起使用。当与常规 Kafka binder 一起使用此函数时,尽管您在应用程序(即在 `uppercase` 函数中)使用了响应式类型,但您仅在函数执行范围内获得了响应式流。在函数执行上下文之外,由于底层 binder 不是基于响应式堆栈构建的,因此没有响应式优势。因此,尽管这可能看起来像带来了完整的端到端响应式堆栈,但此应用程序仅是部分响应式的。
现在假设您正在使用适用于 Kafka 的适当的 reactive binder - `spring-cloud-stream-binder-kafka-reactive` 来构建上述函数的应用程序。此 binder 实现将从链顶端的消费到链底端的发布,提供完整的响应式优势。这是因为底层 binder 构建在 Reactor Kafka 的核心 API 之上。在消费者端,它使用了 KafkaReceiver,这是一个 Kafka 消费者的响应式实现。类似地,在生产者端,它使用了 KafkaSender API,这是一个 Kafka 生产者的响应式实现。由于 reactive Kafka binder 的基础是建立在适当的响应式 Kafka API 之上的,因此应用程序在使用此 reactive Kafka binder 时可以获得使用响应式技术的全部好处。诸如自动背压等响应式能力均内置于应用程序中。
从版本 4.0.2 开始,您可以通过提供一个或多个 `ReceiverOptionsCustomizer` 或 `SenderOptionsCustomizer` Bean 来定制 `ReceiverOptions` 和 `SenderOptions`。它们是 `BiFunction`,接收绑定名称和初始选项,并返回定制后的选项。这些接口扩展了 `Ordered`,因此当存在多个定制器时,它们将按照所需的顺序应用。
binder 默认不提交偏移量。从版本 4.0.2 开始,`KafkaHeaders.ACKNOWLEDGMENT` 头中包含一个 `ReceiverOffset` 对象,您可以通过调用其 `acknowledge()` 或 `commit()` 方法来提交偏移量。 |
@Bean
public Consumer<Flux<Message<String>> consume() {
return msg -> {
process(msg.getPayload());
msg.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, ReceiverOffset.class).acknowledge();
}
}
有关更多信息,请参阅 `reactor-kafka` 文档和 javadocs。
此外,从版本 4.0.3 开始,可以将 Kafka 消费者属性 `reactiveAtmostOnce` 设置为 `true`,binder 将在处理每个 poll 返回的记录之前自动提交偏移量。同样,从版本 4.0.3 开始,您可以将消费者属性 `reactiveAutoCommit` 设置为 `true`,binder 将在处理每个 poll 返回的记录之后自动提交偏移量。在这些情况下,不存在确认头。
4.0.2 也提供了 `reactiveAutoCommit`,但实现不正确,其行为类似于 `reactiveAtMostOnce`。 |
以下是如何使用 `reactiveAutoCommit` 的示例。
@Bean
Consumer<Flux<Flux<ConsumerRecord<?, String>>>> input() {
return flux -> flux
.doOnNext(inner -> inner
.doOnNext(val -> {
log.info(val.value());
})
.subscribe())
.subscribe();
}
请注意,当使用自动提交时,`reactor-kafka` 返回 `Flux<Flux<ConsumerRecord<?, ?>>>`。鉴于 Spring 无法访问内部 flux 的内容,应用程序必须处理原生的 `ConsumerRecord`;内容没有应用消息转换或转换服务。这需要使用原生解码(通过在配置中指定相应类型的 `Deserializer`)来返回所需类型的记录键/值。