Spring Cloud Stream 模式注册表

简介

当组织拥有基于消息的发布/订阅架构,并且多个生产者和消费者微服务相互通信时,这些微服务通常需要就基于模式的契约达成一致。当这种模式需要演进以适应新的业务需求时,现有组件仍然需要继续工作。Spring Cloud Stream 提供对独立模式注册表服务器的支持,通过该服务器,上述模式可以被注册并由应用程序使用。Spring Cloud Stream 模式注册表支持还为基于 Avro 的模式注册表客户端提供支持,这些客户端本质上提供消息转换器,与模式注册表通信以在消息转换期间协调模式。Spring Cloud Stream 提供的模式演进支持适用于上述独立模式注册表以及 Confluent 提供的专门用于 Apache Kafka 的模式注册表。

Spring Cloud Stream 模式注册表概述

Spring Cloud Stream 模式注册表提供模式演进支持,以便数据可以随着时间演进,并且仍然适用于旧的或新的生产者和消费者,反之亦然。大多数序列化模型,尤其是那些旨在跨不同平台和语言实现可移植性的模型,都依赖于描述数据如何以二进制有效负载序列化的模式。为了序列化数据然后解释它,发送方和接收方都必须能够访问描述二进制格式的模式。在某些情况下,模式可以从序列化时的有效负载类型或反序列化时的目标类型推断出来。然而,许多应用程序受益于能够访问描述二进制数据格式的显式模式。模式注册表允许您以文本格式(通常是 JSON)存储模式信息,并使该信息可供需要它以二进制格式接收和发送数据的各种应用程序访问。模式可以作为由以下部分组成的元组进行引用:

  • 作为模式逻辑名称的主题

  • 模式版本

  • 模式格式,描述数据的二进制格式

Spring Cloud Stream 模式注册表提供以下组件:

  • 独立模式注册表服务器

    By default, it is using an H2 database, but server can be used with PostgreSQL or MySQL by providing appropriate datasource configuration.
  • 能够通过与模式注册表通信进行消息编组的模式注册表客户端。

    Currently, the client can communicate to the standalone schema registry or the Confluent Schema Registry.

模式注册表客户端

与模式注册表服务器交互的客户端抽象是 SchemaRegistryClient 接口,其结构如下:

public interface SchemaRegistryClient {

    SchemaRegistrationResponse register(String subject, String format, String schema);

    String fetch(SchemaReference schemaReference);

    String fetch(Integer id);

}

Spring Cloud Stream 提供开箱即用的实现,用于与其自己的模式服务器交互以及与 Confluent 模式注册表交互。

Spring Cloud Stream 模式注册表的客户端可以通过使用 @EnableSchemaRegistryClient 进行配置,如下所示:

@SpringBootApplication
@EnableSchemaRegistryClient
public class ConsumerApplication {

}
默认转换器经过优化,不仅缓存来自远程服务器的模式,还缓存 parse()toString() 方法,这两个方法非常耗费资源。因此,它使用一个不缓存响应的 DefaultSchemaRegistryClient。如果您打算更改默认行为,可以直接在代码中使用客户端并将其覆盖为所需的结果。为此,您必须在应用程序属性中添加属性 spring.cloud.stream.schemaRegistryClient.cached=true

模式注册表客户端属性

模式注册表客户端支持以下属性

spring.cloud.stream.schemaRegistryClient.endpoint

模式服务器的位置。设置此项时,请使用完整的 URL,包括协议 (httphttps)、端口和上下文路径。

默认值

localhost:8990/

spring.cloud.stream.schemaRegistryClient.cached

客户端是否应缓存模式服务器响应。通常设置为 false,因为缓存发生在消息转换器中。使用模式注册表客户端的客户端应将此设置为 true

默认值

Avro 模式注册表客户端消息转换器

对于在应用程序上下文中注册了 SchemaRegistryClient bean 的应用程序,Spring Cloud Stream 会自动配置一个 Apache Avro 消息转换器用于模式管理。这简化了模式演进,因为接收消息的应用程序可以轻松访问写入者模式,该模式可以与其自己的读取者模式进行协调。

对于出站消息,如果绑定内容类型设置为 application/*+avro,则 MessageConverter 将被激活,如以下示例所示:

spring.cloud.stream.stream.bindings.<output-binding-name>.contentType=application/*+avro

在出站转换期间,消息转换器尝试推断每个出站消息的模式(基于其类型),并使用 SchemaRegistryClient 将其注册到主题(基于有效负载类型)。如果已经找到相同的模式,则检索对其的引用。如果未找到,则注册模式,并提供新的版本号。消息将使用以下方案的 contentType 标头发送:application/[prefix].[subject].v[version]+avro,其中 prefix 可配置,subject 从有效负载类型推断。

例如,类型为 User 的消息可以作为二进制有效负载发送,其内容类型为 application/vnd.user.v2+avro,其中 user 是主题,2 是版本号。

接收消息时,转换器从传入消息的标头推断模式引用,并尝试检索它。该模式用作反序列化过程中的写入者模式。

Avro 模式注册表消息转换器属性

如果通过设置 spring.cloud.stream.stream.bindings.<output-binding-name>.contentType=application/*+avro 启用了基于 Avro 的模式注册表客户端,则可以通过设置以下属性来自定义注册行为。

spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled

如果要让转换器使用反射从 POJO 推断模式,请启用此项。

默认值:false

spring.cloud.stream.schema.avro.readerSchema

Avro 通过查看写入者模式(原始有效负载)和读取者模式(您的应用程序有效负载)来比较模式版本。有关更多信息,请参阅 Avro 文档。如果设置,此项将覆盖模式服务器上的任何查找,并使用本地模式作为读取者模式。默认值:null

spring.cloud.stream.schema.avro.schemaLocations

将此属性中列出的任何 .avsc 文件注册到模式服务器。

默认值:empty

spring.cloud.stream.schema.avro.prefix

在 Content-Type 标头中使用的前缀。

默认值:vnd

spring.cloud.stream.schema.avro.subjectNamingStrategy

确定用于在模式注册表中注册 Avro 模式的主题名称。有两种实现可用:org.springframework.cloud.stream.schema.avro.DefaultSubjectNamingStrategy,其中主题是模式名称;以及 org.springframework.cloud.stream.schema.avro.QualifiedSubjectNamingStrategy,它使用 Avro 模式命名空间和名称返回完全限定的主题。可以通过实现 org.springframework.cloud.stream.schema.avro.SubjectNamingStrategy 创建自定义策略。

默认值:org.springframework.cloud.stream.schema.avro.DefaultSubjectNamingStrategy

spring.cloud.stream.schema.avro.ignoreSchemaRegistryServer

忽略任何模式注册表通信。对于测试目的很有用,这样在运行单元测试时,它不会不必要地尝试连接到模式注册表服务器。

默认值:false

Apache Avro 消息转换器

Spring Cloud Stream 通过其 spring-cloud-stream-schema-registry-client 模块提供对基于模式的消息转换器的支持。目前,对于基于模式的消息转换器,开箱即用仅支持 Apache Avro 序列化格式,未来版本将添加更多格式。

spring-cloud-stream-schema-registry-client 模块包含两种类型的消息转换器,可用于 Apache Avro 序列化:

  • 使用序列化或反序列化对象的类信息或在启动时已知位置的模式的转换器。

  • 使用模式注册表的转换器。它们在运行时查找模式并随着域对象的演进而动态注册新模式。

支持模式的转换器

AvroSchemaMessageConverter 支持通过使用预定义模式或使用类中可用的模式信息(通过反射或包含在 SpecificRecord 中)序列化和反序列化消息。如果您提供自定义转换器,则不会创建默认的 AvroSchemaMessageConverter bean。以下示例显示了自定义转换器:

要使用自定义转换器,您可以将其添加到应用程序上下文,并可选择指定一个或多个 MimeType 以将其关联。默认的 MimeTypeapplication/avro

如果转换的目标类型是 GenericRecord,则必须设置模式。

以下示例展示了如何在接收器应用程序中配置转换器,通过注册没有预定义模式的 Apache Avro MessageConverter。在此示例中,请注意 mime 类型值为 avro/bytes,而不是默认的 application/avro

@SpringBootApplication
public static class SinkApplication {

  //...

  @Bean
  public MessageConverter userMessageConverter() {
      return new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
  }
}

相反,以下应用程序注册了一个带有预定义模式(在类路径中找到)的转换器

@SpringBootApplication
public static class SinkApplication {

  //...

  @Bean
  public MessageConverter userMessageConverter() {
      AvroSchemaMessageConverter converter = new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
      converter.setSchemaLocation(new ClassPathResource("schemas/User.avro"));
      return converter;
  }
}

模式注册表服务器

Spring Cloud Stream 提供了一个模式注册表服务器实现。要使用它,您可以下载最新的 spring-cloud-stream-schema-registry-server 版本并将其作为独立应用程序运行。

wget https://repo1.maven.org/maven2/org/springframework/cloud/spring-cloud-stream-schema-registry-server/4.0.3/spring-cloud-stream-schema-registry-server-4.0.3.jar
java -jar ./spring-cloud-stream-schema-registry-server-4.0.3.jar

您可以将模式注册表嵌入到现有的 Spring Boot Web 应用程序中。为此,请将 spring-cloud-stream-schema-registry-core 工件添加到您的项目,并使用 @EnableSchemaRegistryServer 注解,该注解会将模式注册表服务器 REST 控制器添加到您的应用程序中。以下示例显示了一个启用模式注册表的 Spring Boot 应用程序:

@SpringBootApplication
@EnableSchemaRegistryServer
public class SchemaRegistryServerApplication {
public static void main(String[] args) {
SpringApplication.run(SchemaRegistryServerApplication.class, args);
}
}

spring.cloud.stream.schema.server.path 属性可用于控制模式服务器的根路径(尤其是在嵌入到其他应用程序中时)。spring.cloud.stream.schema.server.allowSchemaDeletion 布尔属性启用模式删除。默认情况下,此功能是禁用的。

模式注册表服务器使用关系数据库来存储模式。默认情况下,它使用嵌入式数据库。您可以使用 Spring Boot SQL 数据库和 JDBC 配置选项来自定义模式存储。

模式注册表服务器 API

模式注册表服务器 API 包含以下操作

注册新模式

要注册新模式,请向 / 端点发送 POST 请求。

/ 接受包含以下字段的 JSON 有效负载

  • subject:模式主题

  • format:模式格式

  • definition:模式定义

其响应是 JSON 格式的模式对象,包含以下字段

  • id:模式 ID

  • subject:模式主题

  • format:模式格式

  • version:模式版本

  • definition:模式定义

按主题、格式和版本检索现有模式

要通过主题、格式和版本检索现有模式,请向 {subject}/{format}/{version} 端点发送 GET 请求。

其响应是 JSON 格式的模式对象,包含以下字段

  • id:模式 ID

  • subject:模式主题

  • format:模式格式

  • version:模式版本

  • definition:模式定义

按主题和格式检索现有模式

要通过主题和格式检索现有模式,请向 /subject/format 端点发送 GET 请求。

其响应是模式列表,每个模式对象都是 JSON 格式,包含以下字段

  • id:模式 ID

  • subject:模式主题

  • format:模式格式

  • version:模式版本

  • definition:模式定义

按 ID 检索现有模式

要按 ID 检索模式,请向 /schemas/{id} 端点发送 GET 请求。

其响应是 JSON 格式的模式对象,包含以下字段

  • id:模式 ID

  • subject:模式主题

  • format:模式格式

  • version:模式版本

  • definition:模式定义

按主题、格式和版本删除模式

要删除由其主题、格式和版本标识的模式,请向 {subject}/{format}/{version} 端点发送 DELETE 请求。

按 ID 删除模式

要按 ID 删除模式,请向 /schemas/{id} 端点发送 DELETE 请求。

按主题删除模式

DELETE /{subject}

按主题删除现有模式。

此说明仅适用于 Spring Cloud Stream 1.1.0.RELEASE 的用户。Spring Cloud Stream 1.1.0.RELEASE 使用表名 schema 来存储 Schema 对象。Schema 在许多数据库实现中都是关键字。为了避免将来发生任何冲突,从 1.1.1.RELEASE 开始,我们选择使用 SCHEMA_REPOSITORY 作为存储表的名称。任何升级的 Spring Cloud Stream 1.1.0.RELEASE 用户都应该在升级之前将其现有模式迁移到新表。

使用 Confluent 的模式注册表

默认配置创建了一个 DefaultSchemaRegistryClient bean。如果您想使用 Confluent 模式注册表,则需要创建一个 ConfluentSchemaRegistryClient 类型的 bean,该 bean 会取代框架默认配置的 bean。以下示例展示了如何创建这样的 bean:

@Bean
public SchemaRegistryClient schemaRegistryClient(@Value("${spring.cloud.stream.schemaRegistryClient.endpoint}") String endpoint){
  ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
  client.setEndpoint(endpoint);
  return client;
}
ConfluentSchemaRegistryClient 针对 Confluent 平台版本 4.0.0 进行了测试。

模式注册与解析

为了更好地理解 Spring Cloud Stream 如何注册和解析新模式以及它如何利用 Avro 模式比较功能,我们提供了两个独立的子章节:

模式注册过程(序列化)

注册过程的第一部分是从通过通道发送的有效负载中提取模式。Avro 类型(例如 SpecificRecordGenericRecord)已经包含模式,可以立即从实例中检索。对于 POJO,如果 spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled 属性设置为 true(默认值),则会推断模式。

一旦获取到模式,转换器就会从远程服务器加载其元数据(版本)。首先,它查询本地缓存。如果未找到结果,它会将数据提交到服务器,服务器会回复版本信息。转换器总是缓存结果,以避免为每个需要序列化的新消息查询模式服务器的开销。

有了模式版本信息,转换器将消息的 contentType 标头设置为携带版本信息——例如:application/vnd.user.v1+avro

模式解析过程(反序列化)

当读取包含版本信息的消息时(即,具有在 模式注册过程(序列化) 中描述的方案的 contentType 标头),转换器会查询模式服务器以获取消息的写入者模式。一旦找到传入消息的正确模式,它就会检索读取者模式,并通过使用 Avro 的模式解析支持,将其读入读取者定义(设置默认值和任何缺失的属性)。

您应该理解写入者模式(编写消息的应用程序)和读取者模式(接收应用程序)之间的区别。我们建议花点时间阅读 Avro 术语并理解该过程。Spring Cloud Stream 总是获取写入者模式以确定如何读取消息。如果您希望 Avro 的模式演进支持生效,则需要确保为您的应用程序正确设置了 readerSchema
© . This site is unofficial and not affiliated with VMware.