在 Spring 中管理的生产者拦截器
从版本 3.0.0 开始,对于生产者拦截器,你可以让 Spring 直接将其作为 Bean 管理,而不是在 Apache Kafka 生产者配置中提供拦截器的类名。如果采用这种方法,则需要将此生产者拦截器设置到 KafkaTemplate
上。下面是使用与上面相同的 MyProducerInterceptor
的示例,但更改为不使用内部配置属性。
public class MyProducerInterceptor implements ProducerInterceptor<String, String> {
private final SomeBean bean;
public MyProducerInterceptor(SomeBean bean) {
this.bean = bean;
}
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
this.bean.someMethod("producer interceptor");
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}
@Override
public void close() {
}
}
@Bean
public MyProducerInterceptor myProducerInterceptor(SomeBean someBean) {
return new MyProducerInterceptor(someBean);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf, MyProducerInterceptor myProducerInterceptor) {
KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(pf);
kafkaTemplate.setProducerInterceptor(myProducerInterceptor);
}
在记录发送之前,会调用生产者拦截器的 onSend
方法。一旦服务器发送了数据发布的确认,就会调用 onAcknowledgement
方法。onAcknowledgement
在生产者调用任何用户回调之前被调用。
如果你有多个这样的生产者拦截器通过 Spring 管理,并且需要应用到 KafkaTemplate
上,则需要使用 CompositeProducerInterceptor
代替。CompositeProducerInterceptor
允许按顺序添加单个生产者拦截器。底层 ProducerInterceptor
实现中的方法会按照添加到 CompositeProducerInterceptor
的顺序被调用。