StreamsBuilderFactoryBean 配置器
通常需要定制创建 KafkaStreams
对象的 StreamsBuilderFactoryBean
。基于 Spring Kafka 提供的底层支持,该 binder 允许您定制 StreamsBuilderFactoryBean
。您可以使用 StreamsBuilderFactoryBeanConfigurer
来自定义 StreamsBuilderFactoryBean
本身。然后,一旦通过此配置器访问到 StreamsBuilderFactoryBean
,您就可以使用 KafkaStreamsCustomzier
定制相应的 KafkaStreams
。这两个定制器都是 Spring for Apache Kafka 项目的一部分。
以下是使用 StreamsBuilderFactoryBeanConfigurer
的示例。
@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
return sfb -> sfb.setStateListener((newState, oldState) -> {
//Do some action here!
});
}
上述示例说明了您可以对 StreamsBuilderFactoryBean
进行的定制。您基本上可以调用 StreamsBuilderFactoryBean
的任何可用变动操作来进行定制。该定制器将在工厂 Bean 启动之前由 binder 调用。
一旦您访问到 StreamsBuilderFactoryBean
,您还可以定制底层的 KafkaStreams
对象。以下是实现这一点的蓝图。
@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
return factoryBean -> {
factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
@Override
public void customize(KafkaStreams kafkaStreams) {
kafkaStreams.setUncaughtExceptionHandler((t, e) -> {
});
}
});
};
}
KafkaStreamsCustomizer
将在底层的 KafkaStreams
启动之前由 StreamsBuilderFactoryBeabn
调用。
整个应用中只能有一个 StreamsBuilderFactoryBeanConfigurer
。那么如何考虑多个 Kafka Streams 处理器呢?因为它们每一个都由独立的 StreamsBuilderFactoryBean
对象支持。在这种情况下,如果这些处理器需要不同的定制,那么应用需要基于应用 ID 应用一些过滤器。
例如,
@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
return factoryBean -> {
if (factoryBean.getStreamsConfiguration().getProperty(StreamsConfig.APPLICATION_ID_CONFIG)
.equals("processor1-application-id")) {
factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
@Override
public void customize(KafkaStreams kafkaStreams) {
kafkaStreams.setUncaughtExceptionHandler((t, e) -> {
});
}
});
}
};
使用 StreamsBuilderFactoryBeanConfigurer 注册全局状态存储
如上所述,binder 没有直接提供一种注册全局状态存储的功能。为此,您需要通过 StreamsBuilderFactoryBeanConfigurer
使用定制器。以下是如何实现。
@Bean
public StreamsBuilderFactoryBeanConfigurer customizer() {
return streamsBuilderFactoryBean -> {
try {
streamsBuilderFactoryBean.setInfrastructureCustomizer(new KafkaStreamsInfrastructureCustomizer() {
@Override
public void configureBuilder(StreamsBuilder builder) {
builder.addGlobalStore(
...
);
}
});
}
catch (Exception e) {
}
};
}
对 StreamsBuilder
的任何定制都必须通过 KafkaStreamsInfrastructureCustomizer
完成,如上所示。如果调用 StreamsBuilderFactoryBean#getObject()
来获取 StreamsBuilder
对象,这可能无法工作,因为该 Bean 可能处于初始化阶段,从而导致一些循环依赖问题。
如果您有多个处理器,您需要通过使用应用 ID 过滤掉其他 StreamsBuilderFactoryBean
对象,将全局状态存储附加到正确的 StreamsBuilder
,如上所述。
使用 StreamsBuilderFactoryBeanConfigurer 注册生产异常处理器
在错误处理部分,我们指出 binder 没有直接提供处理生产异常的方式。尽管如此,您仍然可以使用 StreamsBuilderFactoryBean
定制器来注册生产异常处理器。请参阅下文。
@Bean
public StreamsBuilderFactoryBeanConfigurer configurer() {
return fb -> {
fb.getStreamsConfiguration().put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
CustomProductionExceptionHandler.class);
};
}
同样,如果您有多个处理器,您可能希望针对正确的 StreamsBuilderFactoryBean
适当设置它。您也可以使用配置属性添加此类生产异常处理器(有关更多信息,请参阅下文),但如果您选择采用编程方法,这是一个选项。