Spring Cloud Stream Binder for Apache Pulsar

Spring for Apache Pulsar为Spring Cloud Stream提供了一个绑定器,我们可以用它来使用发布-订阅范式构建事件驱动的微服务。在本节中,我们将详细介绍这个绑定器的基本细节。

用法

我们需要在您的应用程序中包含以下依赖项,以使用Spring Cloud Stream的Apache Pulsar绑定器。

  • Maven

  • Gradle

<dependencies>
    <dependency>
        <groupId>org.springframework.pulsar</groupId>
        <artifactId>spring-pulsar-spring-cloud-stream-binder</artifactId>
    </dependency>
</dependencies>
dependencies {
    implementation 'org.springframework.pulsar:spring-pulsar-spring-cloud-stream-binder'
}

概述

Spring Cloud Stream for Apache Pulsar的绑定器允许应用程序专注于业务逻辑,而不是处理管理和维护Pulsar的底层细节。绑定器会为应用程序开发人员处理所有这些细节。Spring Cloud Stream带来了基于Spring Cloud Function的强大编程模型,允许应用程序开发人员使用函数式风格编写复杂的事件驱动应用程序。应用程序可以从与中间件无关的方式开始,然后通过Spring Boot配置属性将Pulsar主题映射为Spring Cloud Stream中的目标。Spring Cloud Stream构建在Spring Boot之上,当使用Spring Cloud Stream编写事件驱动的微服务时,您本质上是在编写一个Boot应用程序。下面是一个简单的Spring Cloud Stream应用程序。

@SpringBootApplication
public class SpringPulsarBinderSampleApp {

	private final Logger logger = LoggerFactory.getLogger(this.getClass());

	public static void main(String[] args) {
		SpringApplication.run(SpringPulsarBinderSampleApp.class, args);
	}

	@Bean
	public Supplier<Time> timeSupplier() {
		return () -> new Time(String.valueOf(System.currentTimeMillis()));
	}

	@Bean
	public Function<Time, EnhancedTime> timeProcessor() {
		return (time) -> {
			EnhancedTime enhancedTime = new EnhancedTime(time, "5150");
			this.logger.info("PROCESSOR: {} --> {}", time, enhancedTime);
			return enhancedTime;
		};
	}

	@Bean
	public Consumer<EnhancedTime> timeLogger() {
		return (time) -> this.logger.info("SINK:      {}", time);
	}

	record Time(String time) {
	}

	record EnhancedTime(Time time, String extra) {
	}

}

上述示例应用程序是一个成熟的Spring Boot应用程序,值得做一些解释。然而,初次查看,您会发现这只是普通的Java以及一些Spring和Spring Boot注解。我们这里有三个Bean方法:一个java.util.function.Supplier,一个java.util.function.Function,以及一个java.util.function.Consumer。Supplier生成当前时间的毫秒数,Function接收这个时间并添加一些随机数据进行增强,然后Consumer记录增强后的时间。

为了简洁起见,我们省略了所有导入,但整个应用程序中没有任何Spring Cloud Stream特有的内容。它如何成为一个与Apache Pulsar交互的Spring Cloud Stream应用程序呢?您必须在应用程序中包含上述绑定器依赖。一旦添加了该依赖,您必须提供以下配置属性。

spring:
  cloud:
    function:
      definition: timeSupplier;timeProcessor;timeLogger;
    stream:
      bindings:
        timeProcessor-in-0:
          destination: timeSupplier-out-0
        timeProcessor-out-0:
          destination: timeProcessor-out-0
        timeLogger-in-0:
          destination: timeProcessor-out-0

有了这些,上面的Spring Boot应用程序就变成了一个基于Spring Cloud Stream的端到端事件驱动应用程序。因为我们类路径中包含了Pulsar绑定器,所以应用程序会与Apache Pulsar交互。如果应用程序中只有一个函数,那么我们不需要告诉Spring Cloud Stream激活该函数执行,因为它默认会这样做。如果应用程序中有多个这样的函数,就像我们的示例一样,我们需要指示Spring Cloud Stream我们想要激活哪些函数。在我们的例子中,我们需要激活所有这些函数,我们通过spring.cloud.function.definition属性来实现。默认情况下,Bean名称成为Spring Cloud Stream绑定名称的一部分。绑定是Spring Cloud Stream中一个基本的抽象概念,框架通过它与中间件目标通信。Spring Cloud Stream所做的几乎所有事情都发生在具体的绑定之上。一个供应器(supplier)只有一个输出绑定;函数(function)有输入和输出绑定;消费者(consumer)只有一个输入绑定。以我们的供应器bean——timeSupplier为例。这个供应器的默认绑定名称将是timeSupplier-out-0。类似地,timeProcessor函数的默认绑定名称在入站(inbound)方面是timeProcessor-in-0,在出站(outbound)方面是timeProcessor-out-0。有关如何更改默认绑定名称的详细信息,请参阅Spring Cloud Stream参考文档。在大多数情况下,使用默认绑定名称就足够了。我们如上所示,在绑定名称上设置了目标。如果没有提供目标,则绑定名称成为目标的值,就像timeSupplier-out-0的情况一样。

当运行上述应用程序时,您应该会看到供应器每秒执行一次,然后由函数消费并增强由日志消费者消费的时间。

基于绑定器的应用程序中的消息转换

在上面的示例应用程序中,我们没有提供消息转换的任何Schema信息。这是因为,默认情况下,Spring Cloud Stream使用通过Spring Messaging项目在Spring Framework中建立的消息支持来使用其消息转换机制。除非另有说明,Spring Cloud Stream在入站和出站绑定上都使用application/json作为消息转换的content-type。在出站端,数据被序列化为byte[],然后Pulsar绑定器使用Schema.BYTES通过网络将其发送到Pulsar主题。类似地,在入站端,数据从Pulsar主题中以byte[]形式消费,然后使用适当的消息转换器转换为目标类型。

在Pulsar中使用Pulsar Schema进行原生转换

尽管默认是使用框架提供的消息转换,但Spring Cloud Stream 允许每个绑定器决定消息应该如何转换。如果应用程序选择此路径,Spring Cloud Stream 将避免使用任何 Spring 提供的消息转换功能,并直接传递它接收或生成的数据。Spring Cloud Stream 中的此功能在生产者端称为原生编码,在消费者端称为原生解码。这意味着编码和解码是在目标中间件(在我们的例子中是 Apache Pulsar)上原生发生的。对于上述应用程序,我们可以使用以下配置来绕过框架转换并使用原生编码和解码。

spring:
  cloud:
    stream:
      bindings:
        timeSupplier-out-0:
          producer:
            use-native-encoding: true
        timeProcessor-in-0:
          destination: timeSupplier-out-0
          consumer:
            use-native-decoding: true
        timeProcessor-out-0:
          destination: timeProcessor-out-0
          producer:
            use-native-encoding: true
        timeLogger-in-0:
          destination: timeProcessor-out-0
          consumer:
            use-native-decoding: true
      pulsar:
        bindings:
          timeSupplier-out-0:
            producer:
              schema-type: JSON
              message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.Time
          timeProcessor-in-0:
            consumer:
              schema-type: JSON
              message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.Time
          timeProcessor-out-0:
            producer:
              schema-type: AVRO
              message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.EnhancedTime
          timeLogger-in-0:
            consumer:
              schema-type: AVRO
              message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.EnhancedTime

在生产者端启用原生编码的属性是核心Spring Cloud Stream的绑定级别属性。您可以在生产者绑定上设置它——spring.cloud.stream.bindings.<binding-name>.producer.use-native-encoding并将其设置为true。同样,对于消费者绑定,使用spring.cloud.stream.bindings.<binding-name>.consumer.user-native-decoding并将其设置为true。如果我们决定使用原生编码和解码,对于Pulsar,我们需要设置相应的schema和底层消息类型信息。此信息作为扩展绑定属性提供。如您在上述配置中所示,属性是用于schema信息的spring.cloud.stream.pulsar.bindings.<binding-name>.producer|consumer.schema-type和用于实际目标类型的spring.cloud.stream.pulsar.bindings.<binding-name>.producer|consumer.message-type。如果消息中同时包含键和值,您可以使用message-key-typemessage-value-type来指定它们的目标类型。

当省略 `schema-type` 属性时,将查阅任何已配置的自定义 Schema 映射。

消息头转换

每个消息通常都包含需要在 Pulsar 和 Spring Messaging 之间通过 Spring Cloud Stream 输入和输出绑定进行传输的头部信息。为了支持这种传输,框架会处理必要的消息头部转换。

自定义头映射器

Pulsar 绑定器配置了默认的头部映射器,可以通过提供您自己的 PulsarHeaderMapper bean 来覆盖。

在以下示例中,配置了一个 JSON 头部映射器,它:

  • 映射所有入站头(除了键为“top”或“secret”的头)

  • 映射出站头(除了键为“id”、“timestamp”或“userId”的头)

  • 对于出站反序列化,仅信任“com.acme”包中的对象

  • 使用简单的 toString() 编码对任何“com.acme.Money”头值进行序列化/反序列化

@Bean
public PulsarHeaderMapper customPulsarHeaderMapper() {
    return JsonPulsarHeaderMapper.builder()
            .inboundPatterns("!top", "!secret", "*")
            .outboundPatterns("!id", "!timestamp", "!userId", "*")
            .trustedPackages("com.acme")
            .toStringClasses("com.acme.Money")
            .build();
}

在绑定器中使用 Pulsar 属性

绑定器使用 Spring for Apache Pulsar 框架中的基本组件来构建其生产者和消费者绑定。由于基于绑定器的应用程序是 Spring Boot 应用程序,因此绑定器默认使用 Spring for Apache Pulsar 的 Spring Boot 自动配置。因此,核心框架级别上所有可用的 Pulsar Spring Boot 属性也通过绑定器提供。例如,您可以使用以 spring.pulsar.producer…​spring.pulsar.consumer…​ 等为前缀的属性。此外,您还可以在绑定器级别设置这些 Pulsar 属性。例如,spring.cloud.stream.pulsar.binder.producer…​spring.cloud.stream.pulsar.binder.consumer…​ 也能正常工作。

以上两种方法都可行,但当使用这些属性时,它们会应用于整个应用程序。如果应用程序中有多个函数,它们都将获得相同的属性。您还可以通过扩展绑定属性级别来设置这些 Pulsar 属性以解决此问题。扩展绑定属性应用于绑定本身。例如,如果您有一个输入绑定和一个输出绑定,并且它们都要求一组独立的 Pulsar 属性,则必须在扩展绑定上设置它们。生产者绑定的模式是 spring.cloud.stream.pulsar.bindings.<output-binding-name>.producer…​。同样,对于消费者绑定,模式是 spring.cloud.stream.pulsar.bindings.<input-binding-name>.consumer…​。通过这种方式,您可以为同一应用程序中的不同绑定应用一组独立的 Pulsar 属性。

扩展绑定属性具有最高的优先级。绑定器中应用属性的优先级顺序是 扩展绑定属性 → 绑定器属性 → Spring Boot 属性(从高到低)。

以下是一些可以用来查找更多 Pulsar 绑定器可用属性的资源。

Pulsar 生产者绑定配置。这些属性需要 spring.cloud.stream.bindings.<binding-name>.producer 前缀。所有 Spring Boot 提供的 Pulsar 生产者属性 也通过此配置类提供。

Pulsar消费者绑定配置。这些属性需要 spring.cloud.stream.bindings.<binding-name>.consumer 前缀。所有 Spring Boot 提供的 Pulsar消费者属性 也通过此配置类提供。

有关常见的 Pulsar 绑定器特定配置属性,请参阅 此处。这些属性需要 spring.cloud.stream.pulsar.binder 前缀。上述指定的生产者和消费者属性(包括 Spring Boot 属性)可以在绑定器中使用 spring.cloud.stream.pulsar.binder.producerspring.cloud.stream.pulsar.binder.consumer 前缀。

Pulsar 主题配置器

Spring Cloud Stream for Apache Pulsar的绑定器提供了一个开箱即用的Pulsar主题配置器。当运行应用程序时,如果缺少必要的主题,Pulsar将为您创建主题。然而,这是一个基本的非分区主题,如果您想要高级功能,例如创建分区主题,您可以依赖绑定器中的主题配置器。Pulsar主题配置器使用框架中的PulsarAdministration,它又使用PulsarAdminBuilder。因此,您需要设置spring.pulsar.administration.service-url属性,除非您在默认服务器和端口上运行Pulsar。

创建主题时指定分区数量

创建主题时,您可以通过两种方式设置分区数量。首先,您可以在绑定器级别使用属性 spring.cloud.stream.pulsar.binder.partition-count 进行设置。如上所述,这样做将使应用程序创建的所有主题都继承此属性。如果您希望在绑定级别对设置分区进行精细控制,则可以使用 spring.cloud.stream.pulsar.bindings.<binding-name>.producer|consumer.partition-count 格式为每个绑定设置 partition-count 属性。这样,同一应用程序中不同函数创建的各种主题将根据应用程序要求具有不同的分区。

© . This site is unofficial and not affiliated with VMware.