基于事件类型的 Kafka Streams 应用路由

常规基于消息通道的 Binder 中可用的路由函数在 Kafka Streams Binder 中不受支持。然而,Kafka Streams Binder 仍然通过入站记录上的事件类型记录头提供路由功能。

要启用基于事件类型的路由,应用程序必须提供以下属性。

spring.cloud.stream.kafka.streams.bindings.<绑定名称>.consumer.eventTypes.

这可以是逗号分隔的值。

例如,假设我们有这个函数

@Bean
public Function<KStream<Integer, Foo>, KStream<Integer, Foo>> process() {
    return input -> input;
}

我们还假设只有当传入记录的事件类型为 foobar 时,才执行此函数中的业务逻辑。这可以通过在绑定上使用 eventTypes 属性来表达,如下所示。

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypes=foo,bar

现在,当应用程序运行时,binder 会检查每个传入记录的 event_type 头部,看其值是否设置为 foobar。如果两者都没有找到,则会跳过函数执行。

默认情况下,binder 期望记录头部键为 event_type,但这可以按绑定进行更改。例如,如果想将此绑定的头部键从默认值更改为 my_event,可以按如下方式更改。

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypeHeaderKey=my_event.

在使用 Kafka Streams binder 的事件路由功能时,它会使用 byte array Serde 来反序列化所有传入记录。只有当记录头部与事件类型匹配时,它才会使用实际的 Serde 来进行适当的反序列化,无论是使用配置的 Serde 还是推断的 Serde。如果您在绑定上设置了反序列化异常处理器,这会引入问题,因为预期的反序列化只在调用栈更深处发生,从而导致意外错误。为了解决这个问题,您可以在绑定上设置以下属性,以强制 binder 使用配置或推断的 Serde,而不是 byte array Serde

spring.cloud.stream.kafka.streams.bindings.<process-in-0>.consumer.useConfiguredSerdeWhenRoutingEvents

通过这种方式,应用程序在使用事件路由功能时可以立即检测到反序列化问题,并可以采取适当的处理决策。