Apache Pulsar 支持
Apache Pulsar 通过提供 Spring for Apache Pulsar 项目的自动配置来支持。
当 org.springframework.pulsar:spring-pulsar 在类路径上时,Spring Boot 将自动配置并注册 Spring for Apache Pulsar 组件。
有一个 spring-boot-starter-pulsar 启动器,用于方便地收集要使用的依赖项。
连接到 Pulsar
当您使用 Pulsar 启动器时,Spring Boot 将自动配置并注册一个 PulsarClient Bean。
默认情况下,应用程序会尝试连接到 pulsar://:6650 处的本地 Pulsar 实例。这可以通过将 spring.pulsar.client.service-url 属性设置为不同的值来调整。
| 该值必须是有效的 Pulsar 协议 URL |
您可以通过指定任何 spring.pulsar.client.* 前缀的应用程序属性来配置客户端。
如果您需要对配置进行更多控制,请考虑注册一个或多个 PulsarClientBuilderCustomizer Bean。
认证
要连接到需要认证的 Pulsar 集群,您需要通过设置 pluginClassName 和插件所需的任何参数来指定要使用的认证插件。您可以将参数设置为参数名称到参数值的映射。以下示例展示了如何配置 AuthenticationOAuth2 插件。
-
属性
-
YAML
spring.pulsar.client.authentication.plugin-class-name=org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
spring.pulsar.client.authentication.param.issuerUrl=https://auth.server.cloud/
spring.pulsar.client.authentication.param.privateKey=file:///Users/some-key.json
spring.pulsar.client.authentication.param.audience=urn:sn:acme:dev:my-instance
spring:
pulsar:
client:
authentication:
plugin-class-name: org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
param:
issuerUrl: https://auth.server.cloud/
privateKey: file:///Users/some-key.json
audience: urn:sn:acme:dev:my-instance
|
您需要确保 例如,如果您想为 这种缺乏宽松绑定也使得使用环境变量进行身份验证参数变得麻烦,因为在转换过程中会丢失大小写敏感性。如果您使用环境变量作为参数,那么您需要遵循 Spring for Apache Pulsar 参考文档中的这些步骤才能正常工作。 |
连接到 Pulsar 管理
Spring for Apache Pulsar 的 PulsarAdministration 客户端也会自动配置。
默认情况下,应用程序会尝试连接到 https://:8080 处的本地 Pulsar 实例。这可以通过将 spring.pulsar.admin.service-url 属性设置为 (http|https)://<host>:<port> 形式的不同值来调整。
如果您需要对配置进行更多控制,请考虑注册一个或多个 PulsarAdminBuilderCustomizer Bean。
认证
当访问需要认证的 Pulsar 集群时,管理客户端需要与普通 Pulsar 客户端相同的安全配置。您可以使用前面提到的认证配置,将 spring.pulsar.client.authentication 替换为 spring.pulsar.admin.authentication。
要在启动时创建主题,请添加类型为 PulsarTopic 的 Bean。如果主题已存在,则忽略该 Bean。 |
发送消息
Spring 的 PulsarTemplate 是自动配置的,您可以使用它发送消息,如以下示例所示
-
Java
-
Kotlin
import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
private final PulsarTemplate<String> pulsarTemplate;
public MyBean(PulsarTemplate<String> pulsarTemplate) {
this.pulsarTemplate = pulsarTemplate;
}
public void someMethod() {
this.pulsarTemplate.send("someTopic", "Hello");
}
}
import org.apache.pulsar.client.api.PulsarClientException
import org.springframework.pulsar.core.PulsarTemplate
import org.springframework.stereotype.Component
@Component
class MyBean(private val pulsarTemplate: PulsarTemplate<String>) {
@Throws(PulsarClientException::class)
fun someMethod() {
pulsarTemplate.send("someTopic", "Hello")
}
}
PulsarTemplate 依赖于 PulsarProducerFactory 来创建底层 Pulsar 生产者。Spring Boot 自动配置也提供了这个生产者工厂,它默认会缓存所创建的生产者。您可以通过指定任何以 spring.pulsar.producer.* 和 spring.pulsar.producer.cache.* 为前缀的应用程序属性来配置生产者工厂和缓存设置。
如果您需要对生产者工厂配置进行更多控制,请考虑注册一个或多个 ProducerBuilderCustomizer Bean。这些定制器应用于所有创建的生产者。您也可以在发送消息时传入一个 ProducerBuilderCustomizer,以仅影响当前生产者。
如果您需要对发送的消息进行更多控制,可以在发送消息时传入 TypedMessageBuilderCustomizer。
接收消息
当存在 Apache Pulsar 基础设施时,任何 Bean 都可以使用 @PulsarListener 注解来创建监听器端点。以下组件在 someTopic 主题上创建了一个监听器端点
-
Java
-
Kotlin
import org.springframework.pulsar.annotation.PulsarListener;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
@PulsarListener(topics = "someTopic")
public void processMessage(String content) {
// ...
}
}
import org.springframework.pulsar.annotation.PulsarListener
import org.springframework.stereotype.Component
@Component
class MyBean {
@PulsarListener(topics = ["someTopic"])
fun processMessage(content: String?) {
// ...
}
}
Spring Boot 自动配置提供了 PulsarListener 所需的所有组件,例如 PulsarListenerContainerFactory 以及它用于构建底层 Pulsar 消费者的消费者工厂。您可以通过指定任何以 spring.pulsar.listener.* 和 spring.pulsar.consumer.* 为前缀的应用程序属性来配置这些组件。
如果您需要对消费者工厂的配置进行更多控制,请考虑注册一个或多个 ConsumerBuilderCustomizer Bean。这些定制器适用于工厂创建的所有消费者,因此也适用于所有 @PulsarListener 实例。您还可以通过设置 @PulsarListener 注解的 consumerCustomizer 属性来定制单个监听器。
如果您需要对实际容器工厂配置进行更多控制,请考虑注册一个或多个 PulsarContainerFactoryCustomizer<ConcurrentPulsarListenerContainerFactory<?>> Bean。
读取消息
Pulsar 读取器接口允许应用程序手动管理游标。当您使用读取器连接到主题时,您需要指定读取器连接到主题时开始读取消息的位置。
当存在 Apache Pulsar 基础设施时,任何 Bean 都可以使用 @PulsarReader 注解来使用读取器消费消息。以下组件创建了一个读取器端点,该端点从 someTopic 主题的开头开始读取消息
-
Java
-
Kotlin
import org.springframework.pulsar.annotation.PulsarReader;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
@PulsarReader(topics = "someTopic", startMessageId = "earliest")
public void processMessage(String content) {
// ...
}
}
import org.springframework.pulsar.annotation.PulsarReader
import org.springframework.stereotype.Component
@Component
class MyBean {
@PulsarReader(topics = ["someTopic"], startMessageId = "earliest")
fun processMessage(content: String?) {
// ...
}
}
@PulsarReader 依赖于 PulsarReaderFactory 来创建底层的 Pulsar 读取器。Spring Boot 自动配置提供了此读取器工厂,可以通过设置任何以 spring.pulsar.reader.* 为前缀的应用程序属性进行定制。
如果您需要对读取器工厂的配置进行更多控制,请考虑注册一个或多个 ReaderBuilderCustomizer Bean。这些定制器适用于工厂创建的所有读取器,因此也适用于所有 @PulsarReader 实例。您还可以通过设置 @PulsarReader 注解的 readerCustomizer 属性来定制单个监听器。
如果您需要对实际容器工厂配置进行更多控制,请考虑注册一个或多个 PulsarContainerFactoryCustomizer<DefaultPulsarReaderContainerFactory<?>> Bean。
| 有关上述任何组件的更多详细信息以及发现其他可用功能,请参阅 Spring for Apache Pulsar 参考文档。 |
事务支持
Spring for Apache Pulsar 在使用 PulsarTemplate 和 @PulsarListener 时支持事务。
将 spring.pulsar.transaction.enabled 属性设置为 true 将
-
配置
PulsarTransactionManagerBean -
启用
PulsarTemplate的事务支持 -
为
@PulsarListener方法启用事务支持
@PulsarListener 的 transactional 属性可用于微调何时与监听器一起使用事务。
要更精细地控制 Spring for Apache Pulsar 事务特性,您应该定义自己的 PulsarTemplate 和/或 ConcurrentPulsarListenerContainerFactory Bean。如果默认自动配置的 PulsarTransactionManager 不适用,您还可以定义 PulsarAwareTransactionManager Bean。
额外的 Pulsar 属性
自动配置支持的属性在附录的集成属性部分中显示。请注意,这些属性(带连字符或驼峰式)在很大程度上直接映射到 Apache Pulsar 配置属性。有关详细信息,请参阅 Apache Pulsar 文档。
Pulsar 支持的属性中只有一部分可以通过 PulsarProperties 类直接使用。如果您希望使用未直接支持的额外属性来调整自动配置的组件,可以使用上述每个组件支持的定制器。