Apache Pulsar 支持

Apache Pulsar 通过提供 Spring for Apache Pulsar 项目的自动配置来支持。

org.springframework.pulsar:spring-pulsar 在类路径上时,Spring Boot 将自动配置并注册 Spring for Apache Pulsar 组件。

有一个 spring-boot-starter-pulsar 启动器,用于方便地收集要使用的依赖项。

连接到 Pulsar

当您使用 Pulsar 启动器时,Spring Boot 将自动配置并注册一个 PulsarClient Bean。

默认情况下,应用程序会尝试连接到 pulsar://:6650 处的本地 Pulsar 实例。这可以通过将 spring.pulsar.client.service-url 属性设置为不同的值来调整。

该值必须是有效的 Pulsar 协议 URL

您可以通过指定任何 spring.pulsar.client.* 前缀的应用程序属性来配置客户端。

如果您需要对配置进行更多控制,请考虑注册一个或多个 PulsarClientBuilderCustomizer Bean。

认证

要连接到需要认证的 Pulsar 集群,您需要通过设置 pluginClassName 和插件所需的任何参数来指定要使用的认证插件。您可以将参数设置为参数名称到参数值的映射。以下示例展示了如何配置 AuthenticationOAuth2 插件。

  • 属性

  • YAML

spring.pulsar.client.authentication.plugin-class-name=org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
spring.pulsar.client.authentication.param.issuerUrl=https://auth.server.cloud/
spring.pulsar.client.authentication.param.privateKey=file:///Users/some-key.json
spring.pulsar.client.authentication.param.audience=urn:sn:acme:dev:my-instance
spring:
  pulsar:
    client:
      authentication:
        plugin-class-name: org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
        param:
          issuerUrl: https://auth.server.cloud/
          privateKey: file:///Users/some-key.json
          audience: urn:sn:acme:dev:my-instance

您需要确保 spring.pulsar.client.authentication.param.* 下定义的名称与您的认证插件预期(通常是驼峰式命名)的名称完全匹配。Spring Boot 不会尝试对这些条目进行任何形式的宽松绑定。

例如,如果您想为 AuthenticationOAuth2 认证插件配置发行者 URL,则必须使用 spring.pulsar.client.authentication.param.issuerUrl。如果您使用其他形式,例如 issuerurlissuer-url,则该设置将不会应用于插件。

这种缺乏宽松绑定也使得使用环境变量进行身份验证参数变得麻烦,因为在转换过程中会丢失大小写敏感性。如果您使用环境变量作为参数,那么您需要遵循 Spring for Apache Pulsar 参考文档中的这些步骤才能正常工作。

SSL

默认情况下,Pulsar 客户端以纯文本方式与 Pulsar 服务通信。您可以按照 Spring for Apache Pulsar 参考文档中的这些步骤启用 TLS 加密。

有关客户端和身份验证的完整详细信息,请参阅 Spring for Apache Pulsar 参考文档

连接到 Pulsar 管理

Spring for Apache Pulsar 的 PulsarAdministration 客户端也会自动配置。

默认情况下,应用程序会尝试连接到 https://:8080 处的本地 Pulsar 实例。这可以通过将 spring.pulsar.admin.service-url 属性设置为 (http|https)://<host>:<port> 形式的不同值来调整。

如果您需要对配置进行更多控制,请考虑注册一个或多个 PulsarAdminBuilderCustomizer Bean。

认证

当访问需要认证的 Pulsar 集群时,管理客户端需要与普通 Pulsar 客户端相同的安全配置。您可以使用前面提到的认证配置,将 spring.pulsar.client.authentication 替换为 spring.pulsar.admin.authentication

要在启动时创建主题,请添加类型为 PulsarTopic 的 Bean。如果主题已存在,则忽略该 Bean。

发送消息

Spring 的 PulsarTemplate 是自动配置的,您可以使用它发送消息,如以下示例所示

  • Java

  • Kotlin

import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	private final PulsarTemplate<String> pulsarTemplate;

	public MyBean(PulsarTemplate<String> pulsarTemplate) {
		this.pulsarTemplate = pulsarTemplate;
	}

	public void someMethod() {
		this.pulsarTemplate.send("someTopic", "Hello");
	}

}
import org.apache.pulsar.client.api.PulsarClientException
import org.springframework.pulsar.core.PulsarTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val pulsarTemplate: PulsarTemplate<String>) {

	@Throws(PulsarClientException::class)
	fun someMethod() {
		pulsarTemplate.send("someTopic", "Hello")
	}

}

PulsarTemplate 依赖于 PulsarProducerFactory 来创建底层 Pulsar 生产者。Spring Boot 自动配置也提供了这个生产者工厂,它默认会缓存所创建的生产者。您可以通过指定任何以 spring.pulsar.producer.*spring.pulsar.producer.cache.* 为前缀的应用程序属性来配置生产者工厂和缓存设置。

如果您需要对生产者工厂配置进行更多控制,请考虑注册一个或多个 ProducerBuilderCustomizer Bean。这些定制器应用于所有创建的生产者。您也可以在发送消息时传入一个 ProducerBuilderCustomizer,以仅影响当前生产者。

如果您需要对发送的消息进行更多控制,可以在发送消息时传入 TypedMessageBuilderCustomizer

接收消息

当存在 Apache Pulsar 基础设施时,任何 Bean 都可以使用 @PulsarListener 注解来创建监听器端点。以下组件在 someTopic 主题上创建了一个监听器端点

  • Java

  • Kotlin

import org.springframework.pulsar.annotation.PulsarListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@PulsarListener(topics = "someTopic")
	public void processMessage(String content) {
		// ...
	}

}
import org.springframework.pulsar.annotation.PulsarListener
import org.springframework.stereotype.Component

@Component
class MyBean {

	@PulsarListener(topics = ["someTopic"])
	fun processMessage(content: String?) {
		// ...
	}

}

Spring Boot 自动配置提供了 PulsarListener 所需的所有组件,例如 PulsarListenerContainerFactory 以及它用于构建底层 Pulsar 消费者的消费者工厂。您可以通过指定任何以 spring.pulsar.listener.*spring.pulsar.consumer.* 为前缀的应用程序属性来配置这些组件。

如果您需要对消费者工厂的配置进行更多控制,请考虑注册一个或多个 ConsumerBuilderCustomizer Bean。这些定制器适用于工厂创建的所有消费者,因此也适用于所有 @PulsarListener 实例。您还可以通过设置 @PulsarListener 注解的 consumerCustomizer 属性来定制单个监听器。

如果您需要对实际容器工厂配置进行更多控制,请考虑注册一个或多个 PulsarContainerFactoryCustomizer<ConcurrentPulsarListenerContainerFactory<?>> Bean。

读取消息

Pulsar 读取器接口允许应用程序手动管理游标。当您使用读取器连接到主题时,您需要指定读取器连接到主题时开始读取消息的位置。

当存在 Apache Pulsar 基础设施时,任何 Bean 都可以使用 @PulsarReader 注解来使用读取器消费消息。以下组件创建了一个读取器端点,该端点从 someTopic 主题的开头开始读取消息

  • Java

  • Kotlin

import org.springframework.pulsar.annotation.PulsarReader;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@PulsarReader(topics = "someTopic", startMessageId = "earliest")
	public void processMessage(String content) {
		// ...
	}

}
import org.springframework.pulsar.annotation.PulsarReader
import org.springframework.stereotype.Component

@Component
class MyBean {

	@PulsarReader(topics = ["someTopic"], startMessageId = "earliest")
	fun processMessage(content: String?) {
		// ...
	}

}

@PulsarReader 依赖于 PulsarReaderFactory 来创建底层的 Pulsar 读取器。Spring Boot 自动配置提供了此读取器工厂,可以通过设置任何以 spring.pulsar.reader.* 为前缀的应用程序属性进行定制。

如果您需要对读取器工厂的配置进行更多控制,请考虑注册一个或多个 ReaderBuilderCustomizer Bean。这些定制器适用于工厂创建的所有读取器,因此也适用于所有 @PulsarReader 实例。您还可以通过设置 @PulsarReader 注解的 readerCustomizer 属性来定制单个监听器。

如果您需要对实际容器工厂配置进行更多控制,请考虑注册一个或多个 PulsarContainerFactoryCustomizer<DefaultPulsarReaderContainerFactory<?>> Bean。

有关上述任何组件的更多详细信息以及发现其他可用功能,请参阅 Spring for Apache Pulsar 参考文档

事务支持

Spring for Apache Pulsar 在使用 PulsarTemplate@PulsarListener 时支持事务。

spring.pulsar.transaction.enabled 属性设置为 true

@PulsarListenertransactional 属性可用于微调何时与监听器一起使用事务。

要更精细地控制 Spring for Apache Pulsar 事务特性,您应该定义自己的 PulsarTemplate 和/或 ConcurrentPulsarListenerContainerFactory Bean。如果默认自动配置的 PulsarTransactionManager 不适用,您还可以定义 PulsarAwareTransactionManager Bean。

额外的 Pulsar 属性

自动配置支持的属性在附录的集成属性部分中显示。请注意,这些属性(带连字符或驼峰式)在很大程度上直接映射到 Apache Pulsar 配置属性。有关详细信息,请参阅 Apache Pulsar 文档。

Pulsar 支持的属性中只有一部分可以通过 PulsarProperties 类直接使用。如果您希望使用未直接支持的额外属性来调整自动配置的组件,可以使用上述每个组件支持的定制器。

© . This site is unofficial and not affiliated with VMware.