Spring Cloud Stream Schema Registry
引言
当组织拥有基于消息传递的发布/订阅架构,并且多个生产者和消费者微服务相互通信时,这些微服务通常需要就一个基于模式的契约达成一致。当需要演进此类模式以适应新的业务需求时,现有组件仍需继续正常工作。Spring Cloud Stream 支持使用独立的 schema registry 服务器,通过该服务器可以注册模式并供应用程序使用。Spring Cloud Stream 的 schema registry 支持还提供了基于 Avro 的 schema registry 客户端支持,这些客户端实质上提供了消息转换器,通过与 schema registry 通信来在消息转换过程中协调模式。Spring Cloud Stream 提供的模式演进支持既可与前面提到的独立 schema registry 配合使用,也可与 Confluent 提供的专门用于 Apache Kafka 的 schema registry 配合使用。
Spring Cloud Stream Schema Registry 概览
Spring Cloud Stream Schema Registry 支持模式演进,以便数据可以随时间演变,同时仍与较旧或较新的生产者和消费者兼容,反之亦然。大多数序列化模型,特别是那些旨在跨不同平台和语言实现可移植性的模型,都依赖于描述数据如何在二进制负载中序列化的模式。为了序列化数据然后对其进行解释,发送方和接收方都必须能够访问描述二进制格式的模式。在某些情况下,模式可以在序列化时从负载类型推断出来,或者在反序列化时从目标类型推断出来。然而,许多应用程序得益于访问描述二进制数据格式的显式模式。Schema registry 允许您以文本格式(通常是 JSON)存储模式信息,并使该信息可供需要接收和发送二进制格式数据的各种应用程序访问。模式可以作为一个包含以下元素的元组进行引用:
-
一个主题 (subject),它是模式的逻辑名称
-
模式版本
-
模式格式,它描述了数据的二进制格式
Spring Cloud Stream Schema Registry 提供以下组件:
-
独立 Schema Registry 服务器
By default, it is using an H2 database, but server can be used with PostgreSQL or MySQL by providing appropriate datasource configuration.
-
能够通过与 Schema Registry 通信进行消息编组的 Schema Registry 客户端。
Currently, the client can communicate to the standalone schema registry or the Confluent Schema Registry.
Schema Registry 客户端
用于与 schema registry 服务器交互的客户端抽象是 `SchemaRegistryClient` 接口,其结构如下:
public interface SchemaRegistryClient {
SchemaRegistrationResponse register(String subject, String format, String schema);
String fetch(SchemaReference schemaReference);
String fetch(Integer id);
}
Spring Cloud Stream 提供了与自身 schema 服务器和与 Confluent Schema Registry 交互的开箱即用实现。
可以通过使用 `@EnableSchemaRegistryClient` 来配置 Spring Cloud Stream schema registry 的客户端,如下所示:
@SpringBootApplication
@EnableSchemaRegistryClient
public class ConsumerApplication {
}
默认转换器经过优化,不仅缓存远程服务器的模式,还缓存 `parse()` 和 `toString()` 方法,这些方法开销很大。因此,它使用一个不缓存响应的 `DefaultSchemaRegistryClient`。如果您打算改变默认行为,可以直接在代码中使用该客户端,并将其覆盖以达到预期结果。为此,您必须将属性 `spring.cloud.stream.schemaRegistryClient.cached=true` 添加到您的应用程序属性中。 |
Schema Registry 客户端属性
Schema Registry 客户端支持以下属性:
spring.cloud.stream.schemaRegistryClient.endpoint
-
schema 服务器的位置。设置此项时,请使用完整的 URL,包括协议 (`http` 或 `https`)、端口和上下文路径。
- 默认值
spring.cloud.stream.schemaRegistryClient.cached
-
客户端是否缓存 schema 服务器的响应。通常设置为 `false`,因为缓存发生在消息转换器中。使用 schema registry 客户端的客户端应将此设置为 `true`。
- 默认值
-
false
Avro Schema Registry 客户端消息转换器
对于已在应用上下文中注册了 `SchemaRegistryClient` bean 的应用程序,Spring Cloud Stream 会为模式管理自动配置一个 Apache Avro 消息转换器。这简化了模式演进,因为接收消息的应用程序可以轻松访问写模式 (writer schema),并与它们自己的读模式 (reader schema) 进行协调。
对于出站消息,如果绑定的内容类型设置为 `application/*+avro`,则 `MessageConverter` 会被激活,如下例所示:
spring.cloud.stream.stream.bindings.<output-binding-name>.contentType=application/*+avro
在出站转换期间,消息转换器会尝试推断每条出站消息的模式(基于其类型),并使用 `SchemaRegistryClient` 将其注册到主题(基于负载类型)。如果已找到相同的模式,则会检索其引用。否则,模式会被注册,并提供新的版本号。消息会以 `contentType` 头发送,使用以下方案:`application/[prefix].[subject].v[version]+avro`,其中 `prefix` 可配置,而 `subject` 是从负载类型推导出来的。
例如,类型为 `User` 的消息可能会以二进制负载发送,其内容类型为 `application/vnd.user.v2+avro`,其中 `user` 是主题,`2` 是版本号。
接收消息时,转换器从入站消息的头部推断模式引用,并尝试检索它。该模式用作反序列化过程中的写模式 (writer schema)。
Avro Schema Registry 消息转换器属性
如果您已通过设置 `spring.cloud.stream.stream.bindings.<output-binding-name>.contentType=application/*+avro` 启用了基于 Avro 的 schema registry 客户端,则可以通过设置以下属性来自定义注册行为。
- spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled
-
启用此项,让转换器使用反射从 POJO 推断 Schema。
默认值:
false
- spring.cloud.stream.schema.avro.readerSchema
-
Avro 通过比较写模式(writer schema,原始负载)和读模式(reader schema,您的应用负载)来比较模式版本。有关更多信息,请参见Avro 文档。如果设置了此项,它将覆盖模式服务器上的任何查找,并使用本地模式作为读模式。默认值:
null
- spring.cloud.stream.schema.avro.schemaLocations
-
将此属性中列出的所有 `.avsc` 文件注册到 Schema 服务器。
默认值:
empty
- spring.cloud.stream.schema.avro.prefix
-
用于 Content-Type 头部的R前缀。
默认值:
vnd
- spring.cloud.stream.schema.avro.subjectNamingStrategy
-
确定在 schema registry 中注册 Avro 模式时使用的主题 (subject) 名称。提供了两种实现:
org.springframework.cloud.stream.schema.avro.DefaultSubjectNamingStrategy
,其中主题是模式名称;以及org.springframework.cloud.stream.schema.avro.QualifiedSubjectNamingStrategy
,它使用 Avro 模式命名空间和名称返回完全限定的主题。可以通过实现org.springframework.cloud.stream.schema.avro.SubjectNamingStrategy
创建自定义策略。默认值:
org.springframework.cloud.stream.schema.avro.DefaultSubjectNamingStrategy
- spring.cloud.stream.schema.avro.ignoreSchemaRegistryServer
-
忽略任何 schema registry 通信。对于测试很有用,这样在运行单元测试时,它不会不必要地尝试连接到 Schema Registry 服务器。
默认值:
false
Apache Avro 消息转换器
Spring Cloud Stream 通过其 `spring-cloud-stream-schema-registry-client` 模块提供对基于模式的消息转换器的支持。目前,开箱即用的基于模式消息转换器唯一支持的序列化格式是 Apache Avro,未来版本将添加更多格式。
spring-cloud-stream-schema-registry-client
模块包含两种可用于 Apache Avro 序列化的消息转换器:
-
使用序列化或反序列化对象的类信息,或者在启动时已知位置的模式的转换器。
-
使用 schema registry 的转换器。它们在运行时定位模式,并在领域对象演变时动态注册新模式。
带模式支持的转换器
AvroSchemaMessageConverter
支持通过使用预定义模式或使用类中可用的模式信息(无论是反射还是包含在 `SpecificRecord` 中)来序列化和反序列化消息。如果您提供了自定义转换器,则不会创建默认的 AvroSchemaMessageConverter bean。以下示例展示了一个自定义转换器:
要使用自定义转换器,只需将其添加到应用上下文中,可以选择指定一个或多个与之关联的 `MimeTypes`。默认的 `MimeType` 是 `application/avro`。
如果转换的目标类型是 `GenericRecord`,则必须设置模式。
以下示例展示了如何在 sink 应用程序中配置不带预定义模式的 Apache Avro `MessageConverter`。在此示例中,请注意 mime 类型的值是 `avro/bytes`,而不是默认的 `application/avro`。
@SpringBootApplication
public static class SinkApplication {
//...
@Bean
public MessageConverter userMessageConverter() {
return new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
}
}
反之,以下应用程序注册了一个带有预定义模式(在 classpath 中找到)的转换器:
@SpringBootApplication
public static class SinkApplication {
//...
@Bean
public MessageConverter userMessageConverter() {
AvroSchemaMessageConverter converter = new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
converter.setSchemaLocation(new ClassPathResource("schemas/User.avro"));
return converter;
}
}
Schema Registry 服务器
Spring Cloud Stream 提供了一个 schema registry 服务器实现。要使用它,您可以下载最新的 `spring-cloud-stream-schema-registry-server` release 并将其作为独立应用程序运行:
wget https://repo1.maven.org/maven2/org/springframework/cloud/spring-cloud-stream-schema-registry-server/4.0.3/spring-cloud-stream-schema-registry-server-4.0.3.jar
java -jar ./spring-cloud-stream-schema-registry-server-4.0.3.jar
您可以将 schema registry 嵌入到您现有的 Spring Boot web 应用程序中。为此,将 `spring-cloud-stream-schema-registry-core` artifact 添加到您的项目中,并使用 `@EnableSchemaRegistryServer` 注解,这将向您的应用程序添加 schema registry 服务器 REST 控制器。以下示例展示了一个启用 schema registry 的 Spring Boot 应用程序:
|
spring.cloud.stream.schema.server.path
属性可用于控制 schema 服务器的根路径(特别是在嵌入到其他应用程序中时)。spring.cloud.stream.schema.server.allowSchemaDeletion
布尔属性启用模式删除功能。默认情况下,此功能是禁用的。
schema registry 服务器使用关系数据库来存储模式。默认情况下,它使用嵌入式数据库。您可以使用Spring Boot SQL 数据库和 JDBC 配置选项来自定义模式存储。
Schema Registry 服务器 API
Schema Registry 服务器 API 包含以下操作:
-
POST /
— 参见 注册新模式 -
GET /{subject}/{format}/{version}
— 参见 通过主题、格式和版本检索现有模式 -
GET /{subject}/{format}
— 参见 通过主题和格式检索现有模式 -
GET /schemas/{id}
— 参见 通过 ID 检索现有模式 -
DELETE /{subject}/{format}/{version}
— 参见 通过主题、格式和版本删除模式 -
DELETE /schemas/{id}
— 参见 通过 ID 删除模式 -
DELETE /{subject}
— 参见 通过主题删除模式
注册新模式
要注册新模式,请向 `/` 端点发送一个 `POST` 请求。
`/` 接受包含以下字段的 JSON 负载:
-
subject
: 模式主题 -
format
: 模式格式 -
definition
: 模式定义
其响应是一个 JSON 格式的模式对象,包含以下字段:
-
id
: 模式 ID -
subject
: 模式主题 -
format
: 模式格式 -
version
: 模式版本 -
definition
: 模式定义
通过主题、格式和版本检索现有模式
要通过主题、格式和版本检索现有模式,请向 `{subject}/{format}/{version}` 端点发送一个 `GET` 请求。
其响应是一个 JSON 格式的模式对象,包含以下字段:
-
id
: 模式 ID -
subject
: 模式主题 -
format
: 模式格式 -
version
: 模式版本 -
definition
: 模式定义
通过主题和格式检索现有模式
要通过主题和格式检索现有模式,请向 `/subject/format` 端点发送一个 `GET` 请求。
其响应是一个模式列表,每个模式对象都是 JSON 格式,包含以下字段:
-
id
: 模式 ID -
subject
: 模式主题 -
format
: 模式格式 -
version
: 模式版本 -
definition
: 模式定义
通过 ID 检索现有模式
要通过其 ID 检索模式,请向 `/schemas/{id}` 端点发送一个 `GET` 请求。
其响应是一个 JSON 格式的模式对象,包含以下字段:
-
id
: 模式 ID -
subject
: 模式主题 -
format
: 模式格式 -
version
: 模式版本 -
definition
: 模式定义
通过主题删除模式
DELETE /{subject}
通过其主题删除现有模式。
本说明仅适用于 Spring Cloud Stream 1.1.0.RELEASE 的用户。Spring Cloud Stream 1.1.0.RELEASE 使用表名 schema 来存储 Schema 对象。Schema 在许多数据库实现中是一个关键字。为了避免将来出现任何冲突,从 1.1.1.RELEASE 版本开始,我们选择了 SCHEMA_REPOSITORY 作为存储表的名称。任何计划升级的 Spring Cloud Stream 1.1.0.RELEASE 用户都应在升级之前将其现有模式迁移到新表中。 |
使用 Confluent 的 Schema Registry
默认配置会创建一个 `DefaultSchemaRegistryClient` bean。如果您想使用 Confluent schema registry,则需要创建一个 `ConfluentSchemaRegistryClient` 类型的 bean,它将取代框架默认配置的 bean。以下示例展示了如何创建此类 bean:
@Bean
public SchemaRegistryClient schemaRegistryClient(@Value("${spring.cloud.stream.schemaRegistryClient.endpoint}") String endpoint){
ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
client.setEndpoint(endpoint);
return client;
}
ConfluentSchemaRegistryClient 已针对 Confluent 平台版本 4.0.0 进行了测试。 |
模式注册过程(序列化)
注册过程的第一部分是从正在通过通道发送的负载中提取模式。`SpecificRecord` 或 `GenericRecord` 等 Avro 类型已包含模式,可以立即从实例中检索。对于 POJO,如果将 `spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled` 属性设置为 `true`(默认值),则会推断模式。
获取模式后,转换器会从远程服务器加载其元数据(版本)。首先,它查询本地缓存。如果未找到结果,则将数据提交到服务器,服务器回复版本信息。转换器总是缓存结果,以避免为每个需要序列化的新消息查询 Schema 服务器的开销。
有了模式版本信息,转换器将消息的 `contentType` 头设置为携带版本信息,例如:`application/vnd.user.v1+avro`。
模式解析过程(反序列化)
读取包含版本信息的消息时(即,带有 模式注册过程(序列化) 中描述的方案的 `contentType` 头),转换器会查询 Schema 服务器以获取入站消息的写模式 (writer schema)。一旦找到入站消息的正确模式,它会检索读模式 (reader schema),并使用 Avro 的模式解析支持将其读入读定义(设置默认值和任何缺失的属性)。
您应该理解写模式(写入消息的应用程序)和读模式(接收应用程序)之间的区别。我们建议花点时间阅读Avro 术语并理解这个过程。Spring Cloud Stream 始终获取写模式以确定如何读取消息。如果您希望 Avro 的模式演进支持正常工作,则需要确保为您的应用程序正确设置了 `readerSchema`。 |