StreamsBuilderFactoryBean 配置器
通常需要自定义创建 KafkaStreams
对象的 StreamsBuilderFactoryBean
。基于 Spring Kafka 提供的底层支持,绑定器允许您自定义 StreamsBuilderFactoryBean
。您可以使用 StreamsBuilderFactoryBeanConfigurer
来自定义 StreamsBuilderFactoryBean
本身。然后,一旦您通过此配置器访问 StreamsBuilderFactoryBean
,就可以使用 KafkaStreamsCustomzier
自定义相应的 KafkaStreams
。这两个自定义器都是 Spring for Apache Kafka 项目的一部分。
以下是如何使用 StreamsBuilderFactoryBeanConfigurer
的示例。
@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
return sfb -> sfb.setStateListener((newState, oldState) -> {
//Do some action here!
});
}
以上内容展示了您可以用来自定义 StreamsBuilderFactoryBean
的一些操作。您基本上可以调用 StreamsBuilderFactoryBean
中的任何可用变异操作来对其进行自定义。此自定义器将在工厂 Bean 启动之前由绑定器调用。
一旦您获得了对 StreamsBuilderFactoryBean
的访问权限,您还可以自定义底层的 KafkaStreams
对象。以下是如何操作的蓝图。
@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
return factoryBean -> {
factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
@Override
public void customize(KafkaStreams kafkaStreams) {
kafkaStreams.setUncaughtExceptionHandler((t, e) -> {
});
}
});
};
}
KafkaStreamsCustomizer
将在 StreamsBuilderFactoryBeabn
启动底层 KafkaStreams
之前被调用。
整个应用程序中只能有一个 StreamsBuilderFactoryBeanConfigurer
。那么我们如何处理多个 Kafka Streams 处理器,因为它们中的每一个都由单独的 StreamsBuilderFactoryBean
对象支持?在这种情况下,如果这些处理器的自定义需要不同,那么应用程序需要根据应用程序 ID 应用一些过滤器。
例如,
@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
return factoryBean -> {
if (factoryBean.getStreamsConfiguration().getProperty(StreamsConfig.APPLICATION_ID_CONFIG)
.equals("processor1-application-id")) {
factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
@Override
public void customize(KafkaStreams kafkaStreams) {
kafkaStreams.setUncaughtExceptionHandler((t, e) -> {
});
}
});
}
};
使用 StreamsBuilderFactoryBeanConfigurer 注册全局状态存储
如上所述,绑定器没有提供将全局状态存储注册为功能的第一类方法。为此,您需要通过 StreamsBuilderFactoryBeanConfigurer
使用自定义器。以下是操作方法。
@Bean
public StreamsBuilderFactoryBeanConfigurer customizer() {
return streamsBuilderFactoryBean -> {
try {
streamsBuilderFactoryBean.setInfrastructureCustomizer(new KafkaStreamsInfrastructureCustomizer() {
@Override
public void configureBuilder(StreamsBuilder builder) {
builder.addGlobalStore(
...
);
}
});
}
catch (Exception e) {
}
};
}
对 StreamsBuilder
的任何自定义都必须通过 KafkaStreamsInfrastructureCustomizer
完成,如上所示。如果调用 StreamsBuilderFactoryBean#getObject()
来获取对 StreamsBuilder
对象的访问权限,这可能无法正常工作,因为该 bean 可能处于初始化状态,因此会遇到一些循环依赖问题。
如果您有多个处理器,您希望通过使用应用程序 ID 过滤掉其他 StreamsBuilderFactoryBean
对象,将全局状态存储附加到正确的 StreamsBuilder
上,如上所述。
使用 StreamsBuilderFactoryBeanConfigurer 注册生产异常处理程序
在错误处理部分,我们指出绑定器没有提供处理生产异常的第一类方法。虽然情况如此,您仍然可以使用 StreamsBuilderFacotryBean
自定义器来注册生产异常处理程序。见下文。
@Bean
public StreamsBuilderFactoryBeanConfigurer configurer() {
return fb -> {
fb.getStreamsConfiguration().put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
CustomProductionExceptionHandler.class);
};
}
再次,如果您有多个处理器,您可能希望将其适当地设置到正确的 StreamsBuilderFactoryBean
上。您也可以使用配置属性添加此类生产异常处理程序(有关详细信息,请参见下文),但如果您选择使用编程方法,这是一种选择。