定制 Kafka 绑定器健康指示器

覆盖默认 Kafka Binder 健康指示器

当 Spring Boot Actuator 在类路径上时,Kafka binder 会激活一个默认的健康指示器。该健康指示器会检查 binder 的健康状况以及与 Kafka broker 的任何通信问题。如果应用程序想要禁用此默认健康检查实现并包含自定义实现,则可以为 KafkaBinderHealth 接口提供一个实现。KafkaBinderHealth 是一个从 HealthIndicator 扩展而来的标记接口。在自定义实现中,它必须提供 health() 方法的实现。自定义实现必须作为 bean 存在于应用程序配置中。当 binder 发现自定义实现时,它将使用自定义实现而不是默认实现。以下是应用程序中此类自定义实现 bean 的示例。

@Bean
public KafkaBinderHealth kafkaBinderHealthIndicator() {
    return new KafkaBinderHealth() {
        @Override
        public Health health() {
            // custom implementation details.
        }
    };
}

自定义 Kafka Binder 健康指示器示例

以下是编写自定义 Kafka binder HealthIndicator 的伪代码。在此示例中,我们尝试通过首先检查集群连接性,然后检查与主题相关的问题来覆盖 binder 提供的 Kafka HealthIndicator。

首先,我们需要创建 KafkaBinderHealth 接口的自定义实现。

public class KafkaBinderHealthImplementation implements KafkaBinderHealth {
    @Value("${spring.cloud.bus.destination}")
    private String topic;
    private final AdminClient client;

    public KafkaBinderHealthImplementation(final KafkaAdmin admin) {
		// More about configuring Kafka
		// https://docs.springjava.cn/spring-kafka/reference/html/#configuring-topics
        this.client = AdminClient.create(admin.getConfigurationProperties());
    }

    @Override
    public Health health() {
        if (!checkBrokersConnection()) {
            logger.error("Error when connect brokers");
			return Health.down().withDetail("BrokersConnectionError", "Error message").build();
        }
		if (!checkTopicConnection()) {
			logger.error("Error when trying to connect with specific topic");
			return Health.down().withDetail("TopicError", "Error message with topic name").build();
		}
        return Health.up().build();
    }

    public boolean checkBrokersConnection() {
        // Your implementation
    }

    public boolean checkTopicConnection() {
		// Your implementation
    }
}

然后我们需要为自定义实现创建一个 bean。

@Configuration
public class KafkaBinderHealthIndicatorConfiguration {
	@Bean
	public KafkaBinderHealth kafkaBinderHealthIndicator(final KafkaAdmin admin) {
		return new KafkaBinderHealthImplementation(admin);
	}
}
© . This site is unofficial and not affiliated with VMware.