重试关键业务逻辑
在某些场景下,你可能需要重试应用程序中关键的业务逻辑部分。这可能是对关系型数据库的外部调用,或是从 Kafka Streams 处理器调用 REST 端点。这些调用可能因各种原因失败,例如网络问题或远程服务不可用。通常情况下,如果再次尝试,这些失败可能会自行解决。默认情况下,Kafka Streams binder 会为所有输入绑定创建 RetryTemplate
bean。
如果函数具有以下签名:
@Bean
public java.util.function.Consumer<KStream<Object, String>> process()
使用默认绑定名称时,RetryTemplate
将注册为 process-in-0-RetryTemplate
。这遵循绑定名称 (process-in-0
) 后面跟字面量 -RetryTemplate
的约定。对于多个输入绑定,每个绑定都会有一个单独的 RetryTemplate
bean 可用。如果应用程序中存在自定义的 RetryTemplate
bean,并通过 spring.cloud.stream.bindings.<binding-name>.consumer.retryTemplateName
提供,则该自定义 bean 将优先于任何输入绑定级别的重试模板配置属性。
一旦绑定中的 RetryTemplate
被注入到应用程序中,就可以用来重试应用程序中的任何关键部分。示例如下:
@Bean
public java.util.function.Consumer<KStream<Object, String>> process(@Lazy @Qualifier("process-in-0-RetryTemplate") RetryTemplate retryTemplate) {
return input -> input
.process(() -> new Processor<Object, String>() {
@Override
public void init(ProcessorContext processorContext) {
}
@Override
public void process(Object o, String s) {
retryTemplate.execute(context -> {
//Critical business logic goes here.
});
}
@Override
public void close() {
}
});
}
或者,你可以使用自定义的 RetryTemplate
,如下所示:
@EnableAutoConfiguration
public static class CustomRetryTemplateApp {
@Bean
@StreamRetryTemplate
RetryTemplate fooRetryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
RetryPolicy retryPolicy = new SimpleRetryPolicy(4);
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(1);
retryTemplate.setBackOffPolicy(backOffPolicy);
retryTemplate.setRetryPolicy(retryPolicy);
return retryTemplate;
}
@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
return input -> input
.process(() -> new Processor<Object, String>() {
@Override
public void init(ProcessorContext processorContext) {
}
@Override
public void process(Object o, String s) {
fooRetryTemplate().execute(context -> {
//Critical business logic goes here.
});
}
@Override
public void close() {
}
});
}
}
注意,当重试次数耗尽时,默认情况下会抛出最后一个异常,导致处理器终止。如果你希望处理异常并继续处理,可以将一个 RecoveryCallback
添加到 execute
方法中。示例如下:
retryTemplate.execute(context -> {
//Critical business logic goes here.
}, context -> {
//Recovery logic goes here.
return null;
));
有关 Spring Retry 项目的 RetryTemplate
、重试策略 (retry policies)、退避策略 (backoff policies) 等更多信息,请参阅该项目。