使用响应式 Kafka 绑定器的基本示例
在本节中,我们将展示一些使用响应式绑定器编写响应式 Kafka 应用程序的基本代码片段及其详细信息。
@Bean
public Function<Flux<String>, Flux<String>> uppercase() {
return s -> s.map(String::toUpperCase);
}
您可以将上述 uppercase 函数与基于消息通道的 Kafka 绑定器(spring-cloud-stream-binder-kafka)以及本节讨论的响应式 Kafka 绑定器(spring-cloud-stream-binder-kafka-reactive)一起使用。当将此函数与常规 Kafka 绑定器一起使用时,尽管您在应用程序中(即在 uppercase 函数中)使用了响应式类型,但您只在函数执行期间获得响应式流。在函数执行上下文之外,没有响应式的好处,因为底层绑定器不是基于响应式堆栈的。因此,尽管这可能看起来像带来了完整的端到端响应式堆栈,但此应用程序只是部分响应式的。
现在假设您正在将适用于 Kafka 的正确响应式绑定器(spring-cloud-stream-binder-kafka-reactive)与上述函数应用程序一起使用。此绑定器实现将从链的顶端消费到末端发布,提供完整的响应式优势。这是因为底层绑定器基于 Reactor Kafka 的核心 API 构建。在消费者端,它使用 KafkaReceiver,这是一个 Kafka 消费者的响应式实现。同样,在生产者端,它使用 KafkaSender API,这是一个 Kafka 生产者的响应式实现。由于响应式 Kafka 绑定器的基础是建立在适当的响应式 Kafka API 之上的,因此应用程序可以充分利用响应式技术的优势。使用此响应式 Kafka 绑定器时,应用程序内置了诸如自动背压等响应式功能。
从版本 4.0.2 开始,您可以通过分别提供一个或多个 ReceiverOptionsCustomizer 或 SenderOptionsCustomizer bean 来定制 ReceiverOptions 和 SenderOptions。它们是 BiFunction,接收绑定名称和初始选项,并返回定制的选项。这些接口扩展了 Ordered,因此当存在多个定制器时,它们将按所需的顺序应用。
绑定器默认不提交偏移量。从版本 4.0.2 开始,KafkaHeaders.ACKNOWLEDGMENT 头包含一个 ReceiverOffset 对象,您可以通过调用其 acknowledge() 或 commit() 方法来提交偏移量。 |
@Bean
public Consumer<Flux<Message<String>>> consume() {
return msg -> {
process(msg.getPayload());
msg.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, ReceiverOffset.class).acknowledge();
}
}
有关更多信息,请参阅 reactor-kafka 文档和 javadoc。
此外,从版本 4.0.3 开始,Kafka 消费者属性 reactiveAtmostOnce 可以设置为 true,绑定器将在处理每个轮询返回的记录之前自动提交偏移量。此外,从版本 4.0.3 开始,您可以将消费者属性 reactiveAutoCommit 设置为 true,绑定器将在处理每个轮询返回的记录之后自动提交偏移量。在这些情况下,不存在确认头。
4.0.2 版本也提供了 reactiveAutoCommit,但实现不正确,它的行为类似于 reactiveAtMostOnce。 |
以下是如何使用 reactiveAutoCommit 的示例。
@Bean
Consumer<Flux<Flux<ConsumerRecord<?, String>>>> input() {
return flux -> flux
.doOnNext(inner -> inner
.doOnNext(val -> {
log.info(val.value());
})
.subscribe())
.subscribe();
}
请注意,当使用自动提交时,reactor-kafka 返回一个 Flux<Flux<ConsumerRecord<?, ?>>>。鉴于 Spring 无法访问内部 Flux 的内容,应用程序必须处理原生的 ConsumerRecord;对内容不应用消息转换或转换服务。这需要使用原生解码(通过在配置中指定适当类型的 Deserializer)来返回所需类型的记录键/值。