StreamsBuilderFactoryBean 配置器

通常需要定制创建 KafkaStreams 对象的 StreamsBuilderFactoryBean。基于 Spring Kafka 提供的底层支持,该 binder 允许您定制 StreamsBuilderFactoryBean。您可以使用 StreamsBuilderFactoryBeanConfigurer 来自定义 StreamsBuilderFactoryBean 本身。然后,一旦通过此配置器访问到 StreamsBuilderFactoryBean,您就可以使用 KafkaStreamsCustomzier 定制相应的 KafkaStreams。这两个定制器都是 Spring for Apache Kafka 项目的一部分。

以下是使用 StreamsBuilderFactoryBeanConfigurer 的示例。

@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
    return sfb -> sfb.setStateListener((newState, oldState) -> {
         //Do some action here!
    });
}

上述示例说明了您可以对 StreamsBuilderFactoryBean 进行的定制。您基本上可以调用 StreamsBuilderFactoryBean 的任何可用变动操作来进行定制。该定制器将在工厂 Bean 启动之前由 binder 调用。

一旦您访问到 StreamsBuilderFactoryBean,您还可以定制底层的 KafkaStreams 对象。以下是实现这一点的蓝图。

@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
    return factoryBean -> {
        factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
            @Override
            public void customize(KafkaStreams kafkaStreams) {
                kafkaStreams.setUncaughtExceptionHandler((t, e) -> {

                });
            }
        });
    };
}

KafkaStreamsCustomizer 将在底层的 KafkaStreams 启动之前由 StreamsBuilderFactoryBeabn 调用。

整个应用中只能有一个 StreamsBuilderFactoryBeanConfigurer。那么如何考虑多个 Kafka Streams 处理器呢?因为它们每一个都由独立的 StreamsBuilderFactoryBean 对象支持。在这种情况下,如果这些处理器需要不同的定制,那么应用需要基于应用 ID 应用一些过滤器。

例如,

@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
    return factoryBean -> {
        if (factoryBean.getStreamsConfiguration().getProperty(StreamsConfig.APPLICATION_ID_CONFIG)
                .equals("processor1-application-id")) {
            factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
                @Override
                public void customize(KafkaStreams kafkaStreams) {
                    kafkaStreams.setUncaughtExceptionHandler((t, e) -> {

                    });
                }
            });
        }
    };

使用 StreamsBuilderFactoryBeanConfigurer 注册全局状态存储

如上所述,binder 没有直接提供一种注册全局状态存储的功能。为此,您需要通过 StreamsBuilderFactoryBeanConfigurer 使用定制器。以下是如何实现。

@Bean
public StreamsBuilderFactoryBeanConfigurer customizer() {
    return streamsBuilderFactoryBean -> {
        try {
            streamsBuilderFactoryBean.setInfrastructureCustomizer(new KafkaStreamsInfrastructureCustomizer() {
                  @Override
                  public void configureBuilder(StreamsBuilder builder) {
                      builder.addGlobalStore(
                              ...
                      );
                  }
              });
        }
        catch (Exception e) {

        }
    };
}

StreamsBuilder 的任何定制都必须通过 KafkaStreamsInfrastructureCustomizer 完成,如上所示。如果调用 StreamsBuilderFactoryBean#getObject() 来获取 StreamsBuilder 对象,这可能无法工作,因为该 Bean 可能处于初始化阶段,从而导致一些循环依赖问题。

如果您有多个处理器,您需要通过使用应用 ID 过滤掉其他 StreamsBuilderFactoryBean 对象,将全局状态存储附加到正确的 StreamsBuilder,如上所述。

使用 StreamsBuilderFactoryBeanConfigurer 注册生产异常处理器

在错误处理部分,我们指出 binder 没有直接提供处理生产异常的方式。尽管如此,您仍然可以使用 StreamsBuilderFactoryBean 定制器来注册生产异常处理器。请参阅下文。

@Bean
public StreamsBuilderFactoryBeanConfigurer configurer() {
    return fb -> {
        fb.getStreamsConfiguration().put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
                            CustomProductionExceptionHandler.class);
    };
}

同样,如果您有多个处理器,您可能希望针对正确的 StreamsBuilderFactoryBean 适当设置它。您也可以使用配置属性添加此类生产异常处理器(有关更多信息,请参阅下文),但如果您选择采用编程方法,这是一个选项。