Spring Cloud Stream Binder for Apache Pulsar

Spring for Apache Pulsar 为 Spring Cloud Stream 提供了一个 Binder,我们可以用它来构建基于发布/订阅模式的事件驱动微服务。在本节中,我们将介绍此 Binder 的基本细节。

用法

我们需要在您的应用程序中包含以下依赖项才能使用 Apache Pulsar Binder for Spring Cloud Stream。

  • Maven

  • Gradle

<dependencies>
    <dependency>
        <groupId>org.springframework.pulsar</groupId>
        <artifactId>spring-pulsar-spring-cloud-stream-binder</artifactId>
    </dependency>
</dependencies>
dependencies {
    implementation 'org.springframework.pulsar:spring-pulsar-spring-cloud-stream-binder'
}

概览

Spring Cloud Stream Binder for Apache Pulsar 允许应用程序专注于业务逻辑,而不是处理管理和维护 Pulsar 的底层细节。该 Binder 为应用程序开发人员处理所有这些细节。Spring Cloud Stream 基于 Spring Cloud Function 带来了强大的编程模型,允许应用程序开发人员使用函数式风格编写复杂的事件驱动应用程序。应用程序可以从与中间件无关的方式开始,然后通过 Spring Boot 配置属性将 Pulsar 主题映射为 Spring Cloud Stream 中的目标。Spring Cloud Stream 构建在 Spring Boot 之上,使用 Spring Cloud Stream 编写事件驱动微服务时,您本质上是在编写一个 Boot 应用程序。这是一个简单的 Spring Cloud Stream 应用程序。

@SpringBootApplication
public class SpringPulsarBinderSampleApp {

	private final Logger logger = LoggerFactory.getLogger(this.getClass());

	public static void main(String[] args) {
		SpringApplication.run(SpringPulsarBinderSampleApp.class, args);
	}

	@Bean
	public Supplier<Time> timeSupplier() {
		return () -> new Time(String.valueOf(System.currentTimeMillis()));
	}

	@Bean
	public Function<Time, EnhancedTime> timeProcessor() {
		return (time) -> {
			EnhancedTime enhancedTime = new EnhancedTime(time, "5150");
			this.logger.info("PROCESSOR: {} --> {}", time, enhancedTime);
			return enhancedTime;
		};
	}

	@Bean
	public Consumer<EnhancedTime> timeLogger() {
		return (time) -> this.logger.info("SINK:      {}", time);
	}

	record Time(String time) {
	}

	record EnhancedTime(Time time, String extra) {
	}

}

上面的示例应用程序是一个完整的 Spring Boot 应用程序,值得解释一下。然而,初看起来,您会发现这只是普通的 Java 代码和一些 Spring 及 Spring Boot 注解。我们这里有三个 Bean 方法 - 一个 java.util.function.Supplier,一个 java.util.function.Function,最后是一个 java.util.function.Consumer。Supplier 产生当前时间的毫秒值,函数接收此时间并添加一些随机数据对其进行增强,然后 consumer 记录增强后的时间。

为简洁起见,我们省略了所有 import,但整个应用程序中没有任何 Spring Cloud Stream 特定的内容。它是如何成为一个与 Apache Pulsar 交互的 Spring Cloud Stream 应用程序的呢?您必须在应用程序中包含上述 Binder 依赖项。一旦添加了该依赖项,您必须提供以下配置属性。

spring:
  cloud:
    function:
      definition: timeSupplier;timeProcessor;timeLogger;
    stream:
      bindings:
        timeProcessor-in-0:
          destination: timeSupplier-out-0
        timeProcessor-out-0:
          destination: timeProcessor-out-0
        timeLogger-in-0:
          destination: timeProcessor-out-0

有了这些,上面的 Spring Boot 应用程序就变成了一个基于 Spring Cloud Stream 的端到端事件驱动应用程序。由于类路径中存在 Pulsar Binder,应用程序将与 Apache Pulsar 交互。如果应用程序中只有一个函数,则无需告知 Spring Cloud Stream 激活该函数进行执行,因为它默认就会这样做。如果应用程序中有多个这样的函数,如我们的示例所示,我们需要指示 Spring Cloud Stream 激活哪些函数。在我们的例子中,我们需要所有函数都激活,我们通过 spring.cloud.function.definition 属性来实现。Bean 名称默认成为 Spring Cloud Stream 绑定名称的一部分。绑定是 Spring Cloud Stream 中的一个基本抽象概念,框架通过它与中间件目标进行通信。Spring Cloud Stream 所做的一切几乎都发生在一个具体的绑定上。Supplier 只有一个输出绑定;函数有输入和输出绑定;Consumer 只有输入绑定。让我们以我们的 supplier Bean - timeSupplier 为例。此 supplier 的默认绑定名称将是 timeSupplier-out-0。类似地,timeProcessor 函数的默认绑定名称在入站是 timeProcessor-in-0,在出站是 timeProcessor-out-0。有关如何更改默认绑定名称的详细信息,请参阅 Spring Cloud Stream 参考文档。在大多数情况下,使用默认绑定名称就足够了。我们如上所示在绑定名称上设置目标。如果未提供目标,则绑定名称将成为目标的取值,如 timeSupplier-out-0 的情况。

当运行上述应用程序时,您应该会看到 supplier 每秒执行一次,然后由函数消费并增强时间,再由 logger consumer 消费。

基于 Binder 的应用程序中的消息转换

在上面的示例应用程序中,我们没有为消息转换提供任何 schema 信息。这是因为,Spring Cloud Stream 默认使用其自身的消息转换机制,该机制利用 Spring Framework 通过 Spring Messaging 项目建立的消息支持。除非另有指定,Spring Cloud Stream 在入站和出站绑定上都使用 application/json 作为消息转换的 content-type。在出站,数据被序列化为 byte[],然后 Pulsar Binder 使用 Schema.BYTES 将其通过线路发送到 Pulsar 主题。类似地,在入站,数据从 Pulsar 主题作为 byte[] 消费,然后使用适当的消息转换器转换为目标类型。

使用 Pulsar Schema 在 Pulsar 中进行原生转换

尽管默认使用框架提供的消息转换,但 Spring Cloud Stream 允许每个 Binder 决定如何转换消息。如果应用程序选择此路径,Spring Cloud Stream 将避免使用任何 Spring 提供的消息转换功能,并直接传递接收或生成的数据。Spring Cloud Stream 中的此功能被称为生产者端的原生编码和消费者端的原生解码。这意味着编码和解码在目标中间件上原生发生,在我们的例子中是在 Apache Pulsar 上。对于上述应用程序,我们可以使用以下配置绕过框架转换并使用原生编码和解码。

spring:
  cloud:
    stream:
      bindings:
        timeSupplier-out-0:
          producer:
            use-native-encoding: true
        timeProcessor-in-0:
          destination: timeSupplier-out-0
          consumer:
            use-native-decoding: true
        timeProcessor-out-0:
          destination: timeProcessor-out-0
          producer:
            use-native-encoding: true
        timeLogger-in-0:
          destination: timeProcessor-out-0
          consumer:
            use-native-decoding: true
      pulsar:
        bindings:
          timeSupplier-out-0:
            producer:
              schema-type: JSON
              message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.Time
          timeProcessor-in-0:
            consumer:
              schema-type: JSON
              message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.Time
          timeProcessor-out-0:
            producer:
              schema-type: AVRO
              message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.EnhancedTime
          timeLogger-in-0:
            consumer:
              schema-type: AVRO
              message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.EnhancedTime

在生产者端启用原生编码的属性是核心 Spring Cloud Stream 中的绑定级别属性。您可以在生产者绑定上设置它 - spring.cloud.stream.bindings.<binding-name>.producer.use-native-encoding 并将其设置为 true。类似地,对于消费者绑定,使用 - spring.cloud.stream.bindings.<binding-name>.consumer.user-native-decoding 并将其设置为 true。如果决定使用原生编码和解码,对于 Pulsar,我们需要设置相应的 schema 和底层消息类型信息。此信息作为扩展绑定属性提供。如您在上述配置中所见,属性是 - 用于 schema 信息的 spring.cloud.stream.pulsar.bindings.<binding-name>.producer|consumer.schema-type 和用于实际目标类型的 spring.cloud.stream.pulsar.bindings.<binding-name>.producer|consumer.message-type。如果消息同时包含 key 和 value,您可以使用 message-key-typemessage-value-type 来指定它们的目标类型。

省略 schema-type 属性时,将查阅任何已配置的自定义 schema 映射。

消息头转换

每条消息通常都有头信息,需要在消息在 Pulsar 和 Spring Messaging 之间通过 Spring Cloud Stream 输入和输出绑定传输时携带。为了支持这种传输,框架处理必要的消息头转换。

自定义头映射器

Pulsar Binder 配置了默认的头映射器,可以通过提供自己的 PulsarHeaderMapper bean 来覆盖。

在以下示例中,配置了一个 JSON 头映射器,它:

  • 映射所有入站头(除了键为“top”或“secret”的头)

  • 映射出站头(除了键为“id”、“timestamp”或“userId”的头)

  • 仅信任“com.acme”包中的对象进行出站反序列化

  • 使用简单的 toString() 编码/解码任何“com.acme.Money”头值

@Bean
public PulsarHeaderMapper customPulsarHeaderMapper() {
    return JsonPulsarHeaderMapper.builder()
            .inboundPatterns("!top", "!secret", "*")
            .outboundPatterns("!id", "!timestamp", "!userId", "*")
            .trustedPackages("com.acme")
            .toStringClasses("com.acme.Money")
            .build();
}

在 Binder 中使用 Pulsar 属性

该 Binder 使用 Spring for Apache Pulsar 框架的基本组件来构建其生产者和消费者绑定。由于基于 Binder 的应用程序是 Spring Boot 应用程序,Binder 默认使用 Spring Boot 对 Spring for Apache Pulsar 的自动配置。因此,核心框架级别提供的所有 Pulsar Spring Boot 属性也可以通过 Binder 使用。例如,您可以使用带有前缀 spring.pulsar.producer…​spring.pulsar.consumer…​ 等的属性。此外,您还可以设置这些 Pulsar 属性在 Binder 级别。例如,以下配置也将生效 - spring.cloud.stream.pulsar.binder.producer…​spring.cloud.stream.pulsar.binder.consumer…​

以上两种方法都可以,但使用这样的属性时,它会应用于整个应用程序。如果应用程序中有多个函数,它们都会获得相同的属性。您还可以将这些 Pulsar 属性设置在扩展绑定属性级别来解决此问题。扩展绑定属性应用于绑定本身。例如,如果您有一个输入绑定和一个输出绑定,并且两者都需要单独的 Pulsar 属性集,则必须在扩展绑定上设置它们。生产者绑定的模式是 spring.cloud.stream.pulsar.bindings.<output-binding-name>.producer…​。类似地,对于消费者绑定,模式是 spring.cloud.stream.pulsar.bindings.<input-binding-name>.consumer…​。通过这种方式,您可以在同一应用程序中为不同的绑定应用不同的 Pulsar 属性集。

扩展绑定属性的优先级最高。Binder 中属性应用的优先级顺序是:扩展绑定属性 → Binder 属性 → Spring Boot 属性(从高到低)。

以下是一些可供查阅的资源,以了解有关 Pulsar Binder 可用属性的更多信息。

Pulsar 生产者绑定配置。这些属性需要 spring.cloud.stream.bindings.<binding-name>.producer 前缀。Spring Boot 提供的所有 Pulsar 生产者属性 也可通过此配置类使用。

Pulsar 消费者绑定配置。这些属性需要 spring.cloud.stream.bindings.<binding-name>.consumer 前缀。Spring Boot 提供的所有 Pulsar 消费者属性 也可通过此配置类使用。

有关常见的 Pulsar Binder 特定配置属性,请参阅此处。这些属性需要 spring.cloud.stream.pulsar.binder 前缀。上述指定的生产者和消费者属性(包括 Spring Boot 的属性)可以在 Binder 级别使用 spring.cloud.stream.pulsar.binder.producerspring.cloud.stream.pulsar.binder.consumer 前缀使用。

Pulsar 主题供应器

Spring Cloud Stream Binder for Apache Pulsar 附带一个开箱即用的 Pulsar 主题供应器。运行应用程序时,如果必要的主题不存在,Pulsar 将为您创建主题。然而,这只是一个基本的非分区主题,如果您想要创建分区主题等高级功能,可以依赖 Binder 中的主题供应器。Pulsar 主题供应器使用框架中的 PulsarAdministration,后者使用 PulsarAdminBuilder。因此,除非您在默认服务器和端口上运行 Pulsar,否则需要设置 spring.pulsar.administration.service-url 属性。

创建主题时指定分区计数

创建主题时,可以通过两种方式设置分区计数。首先,您可以在 Binder 级别使用属性 spring.cloud.stream.pulsar.binder.partition-count 进行设置。正如我们上面看到的,这样做会使应用程序创建的所有主题继承此属性。假设您希望在绑定级别对设置分区进行精细控制。在这种情况下,您可以使用格式 spring.cloud.stream.pulsar.bindings.<binding-name>.producer|consumer.partition-count 按绑定设置 partition-count 属性。通过这种方式,同一应用程序中不同函数创建的各种主题将根据应用程序需求拥有不同的分区。