Apache Pulsar 支持
Apache Pulsar 通过提供对 Spring for Apache Pulsar 项目的自动配置来支持。
当 classpath 中包含 org.springframework.pulsar:spring-pulsar 时,Spring Boot 将自动配置并注册经典的(命令式)Spring for Apache Pulsar 组件。当 classpath 中包含 org.springframework.pulsar:spring-pulsar-reactive 时,它也会对响应式组件执行相同的操作。
Spring Boot 提供了 spring-boot-starter-pulsar 和 spring-boot-starter-pulsar-reactive starter,分别用于方便地收集命令式和响应式使用的依赖项。
连接到 Pulsar
当你使用 Pulsar starter 时,Spring Boot 将自动配置并注册一个 PulsarClient Bean。
默认情况下,应用会尝试连接到本地的 Pulsar 实例,地址为 pulsar://:6650。可以通过将 spring.pulsar.client.service-url 属性设置为其他值来调整此设置。
| 该值必须是一个有效的 Pulsar Protocol URL |
你可以通过指定任何以 spring.pulsar.client.* 为前缀的应用属性来配置客户端。
如果你需要对配置有更多的控制,可以考虑注册一个或多个 PulsarClientBuilderCustomizer Bean。
认证
要连接到需要认证的 Pulsar 集群,你需要通过设置 pluginClassName 以及插件所需的任何参数来指定要使用的认证插件。你可以将参数设置为参数名称到参数值的映射。以下示例展示了如何配置 AuthenticationOAuth2 插件。
-
属性
-
YAML
spring.pulsar.client.authentication.plugin-class-name=org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
spring.pulsar.client.authentication.param.issuerUrl=https://auth.server.cloud/
spring.pulsar.client.authentication.param.privateKey=file:///Users/some-key.json
spring.pulsar.client.authentication.param.audience=urn:sn:acme:dev:my-instance
spring:
pulsar:
client:
authentication:
plugin-class-name: org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
param:
issuerUrl: https://auth.server.cloud/
privateKey: file:///Users/some-key.json
audience: urn:sn:acme:dev:my-instance
|
你需要确保在 例如,如果你想为 这种缺乏宽松绑定也使得使用环境变量配置认证参数变得麻烦,因为在转换过程中会丢失大小写敏感性。如果你使用环境变量来设置参数,你需要遵循 Spring for Apache Pulsar 参考文档中列出的 这些步骤 才能使其正常工作。 |
响应式连接到 Pulsar
当响应式自动配置被激活时,Spring Boot 将自动配置并注册一个 ReactivePulsarClient Bean。
ReactivePulsarClient 适配了前面描述的 PulsarClient 实例。因此,请遵循上一节来配置 ReactivePulsarClient 使用的 PulsarClient。
连接到 Pulsar 管理端
Spring for Apache Pulsar 的 PulsarAdministration 客户端也会被自动配置。
默认情况下,应用会尝试连接到本地的 Pulsar 实例,地址为 https://:8080。可以通过将 spring.pulsar.admin.service-url 属性设置为形如 (http|https)://<host>:<port> 的其他值来调整此设置。
如果你需要对配置有更多的控制,可以考虑注册一个或多个 PulsarAdminBuilderCustomizer Bean。
认证
当访问需要认证的 Pulsar 集群时,管理客户端需要与常规 Pulsar 客户端相同的安全配置。你可以使用上述 认证配置,只需将 spring.pulsar.client.authentication 替换为 spring.pulsar.admin.authentication。
要在启动时创建一个 topic,请添加一个类型为 PulsarTopic 的 Bean。如果该 topic 已存在,则忽略此 Bean。 |
发送消息
Spring 的 PulsarTemplate 会被自动配置,你可以使用它来发送消息,如下例所示
-
Java
-
Kotlin
import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
private final PulsarTemplate<String> pulsarTemplate;
public MyBean(PulsarTemplate<String> pulsarTemplate) {
this.pulsarTemplate = pulsarTemplate;
}
public void someMethod() {
this.pulsarTemplate.send("someTopic", "Hello");
}
}
import org.apache.pulsar.client.api.PulsarClientException
import org.springframework.pulsar.core.PulsarTemplate
import org.springframework.stereotype.Component
@Component
class MyBean(private val pulsarTemplate: PulsarTemplate<String>) {
@Throws(PulsarClientException::class)
fun someMethod() {
pulsarTemplate.send("someTopic", "Hello")
}
}
PulsarTemplate 依赖于 PulsarProducerFactory 来创建底层的 Pulsar producer。Spring Boot 自动配置也提供了这个 producer factory,默认情况下,它会缓存创建的 producer。你可以通过指定任何以 spring.pulsar.producer.* 和 spring.pulsar.producer.cache.* 为前缀的应用属性来配置 producer factory 和缓存设置。
如果你需要对 producer factory 配置有更多的控制,可以考虑注册一个或多个 ProducerBuilderCustomizer Bean。这些 customizer 会应用于所有创建的 producer。你也可以在发送消息时传入一个 ProducerBuilderCustomizer,以便仅影响当前的 producer。
如果你需要对要发送的消息有更多的控制,可以在发送消息时传入一个 TypedMessageBuilderCustomizer。
响应式发送消息
当响应式自动配置被激活时,Spring 的 ReactivePulsarTemplate 会被自动配置,你可以使用它来发送消息,如下例所示
-
Java
-
Kotlin
import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
private final ReactivePulsarTemplate<String> pulsarTemplate;
public MyBean(ReactivePulsarTemplate<String> pulsarTemplate) {
this.pulsarTemplate = pulsarTemplate;
}
public void someMethod() {
this.pulsarTemplate.send("someTopic", "Hello").subscribe();
}
}
import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate
import org.springframework.stereotype.Component
@Component
class MyBean(private val pulsarTemplate: ReactivePulsarTemplate<String>) {
fun someMethod() {
pulsarTemplate.send("someTopic", "Hello").subscribe()
}
}
ReactivePulsarTemplate 依赖于 ReactivePulsarSenderFactory 来实际创建底层的 sender。Spring Boot 自动配置也提供了这个 sender factory,默认情况下,它会缓存创建的 producer。你可以通过指定任何以 spring.pulsar.producer.* 和 spring.pulsar.producer.cache.* 为前缀的应用属性来配置 sender factory 和缓存设置。
如果你需要对 sender factory 配置有更多的控制,可以考虑注册一个或多个 ReactiveMessageSenderBuilderCustomizer Bean。这些 customizer 会应用于所有创建的 sender。你也可以在发送消息时传入一个 ReactiveMessageSenderBuilderCustomizer,以便仅影响当前的 sender。
如果你需要对要发送的消息有更多的控制,可以在发送消息时传入一个 MessageSpecBuilderCustomizer。
接收消息
当 Apache Pulsar 基础设施存在时,任何 Bean 都可以使用 @PulsarListener 注解来创建一个 listener endpoint。以下组件在 someTopic topic 上创建一个 listener endpoint
-
Java
-
Kotlin
import org.springframework.pulsar.annotation.PulsarListener;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
@PulsarListener(topics = "someTopic")
public void processMessage(String content) {
// ...
}
}
import org.springframework.pulsar.annotation.PulsarListener
import org.springframework.stereotype.Component
@Component
class MyBean {
@PulsarListener(topics = ["someTopic"])
fun processMessage(content: String?) {
// ...
}
}
Spring Boot 自动配置提供了 @PulsarListener 所需的所有组件,例如 PulsarListenerContainerFactory 以及它用来构建底层 Pulsar consumer 的 consumer factory。你可以通过指定任何以 spring.pulsar.listener.* 和 spring.pulsar.consumer.* 为前缀的应用属性来配置这些组件。
如果你需要对 consumer factory 的配置有更多的控制,可以考虑注册一个或多个 ConsumerBuilderCustomizer Bean。这些 customizer 会应用于 factory 创建的所有 consumer,因此也会应用于所有 @PulsarListener 实例。你也可以通过设置 @PulsarListener 注解的 consumerCustomizer 属性来定制单个 listener。
如果你需要对实际的 container factory 配置有更多的控制,可以考虑注册一个或多个 PulsarContainerFactoryCustomizer<ConcurrentPulsarListenerContainerFactory<?>> Bean。
响应式接收消息
当 Apache Pulsar 基础设施存在且响应式自动配置被激活时,任何 Bean 都可以使用 @ReactivePulsarListener 注解来创建一个响应式 listener endpoint。以下组件在 someTopic topic 上创建一个响应式 listener endpoint
-
Java
-
Kotlin
import reactor.core.publisher.Mono;
import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
@ReactivePulsarListener(topics = "someTopic")
public Mono<Void> processMessage(String content) {
// ...
return Mono.empty();
}
}
import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener
import org.springframework.stereotype.Component
import reactor.core.publisher.Mono
@Component
class MyBean {
@ReactivePulsarListener(topics = ["someTopic"])
fun processMessage(content: String?): Mono<Void> {
// ...
return Mono.empty()
}
}
Spring Boot 自动配置提供了 @ReactivePulsarListener 所需的所有组件,例如 ReactivePulsarListenerContainerFactory 以及它用来构建底层响应式 Pulsar consumer 的 consumer factory。你可以通过指定任何以 spring.pulsar.listener.* 和 spring.pulsar.consumer.* 为前缀的应用属性来配置这些组件。
如果你需要对 consumer factory 的配置有更多的控制,可以考虑注册一个或多个 ReactiveMessageConsumerBuilderCustomizer Bean。这些 customizer 会应用于 factory 创建的所有 consumer,因此也会应用于所有 @ReactivePulsarListener 实例。你也可以通过设置 @ReactivePulsarListener 注解的 consumerCustomizer 属性来定制单个 listener。
如果你需要对实际的 container factory 配置有更多的控制,可以考虑注册一个或多个 PulsarContainerFactoryCustomizer<DefaultReactivePulsarListenerContainerFactory<?>> Bean。
读取消息
Pulsar reader 接口使应用能够手动管理 cursor。当你使用 reader 连接到 topic 时,你需要指定 reader 连接到 topic 时从哪条消息开始读取。
当 Apache Pulsar 基础设施存在时,任何 Bean 都可以使用 @PulsarReader 注解来使用 reader 消费消息。以下组件创建一个 reader endpoint,它从 someTopic topic 的开头开始读取消息
-
Java
-
Kotlin
import org.springframework.pulsar.annotation.PulsarReader;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
@PulsarReader(topics = "someTopic", startMessageId = "earliest")
public void processMessage(String content) {
// ...
}
}
import org.springframework.pulsar.annotation.PulsarReader
import org.springframework.stereotype.Component
@Component
class MyBean {
@PulsarReader(topics = ["someTopic"], startMessageId = "earliest")
fun processMessage(content: String?) {
// ...
}
}
@PulsarReader 依赖于 PulsarReaderFactory 来创建底层的 Pulsar reader。Spring Boot 自动配置提供了这个 reader factory,可以通过设置任何以 spring.pulsar.reader.* 为前缀的应用属性来对其进行定制。
如果你需要对 reader factory 的配置有更多的控制,可以考虑注册一个或多个 ReaderBuilderCustomizer Bean。这些 customizer 会应用于 factory 创建的所有 reader,因此也会应用于所有 @PulsarReader 实例。你也可以通过设置 @PulsarReader 注解的 readerCustomizer 属性来定制单个 listener。
如果你需要对实际的 container factory 配置有更多的控制,可以考虑注册一个或多个 PulsarContainerFactoryCustomizer<DefaultPulsarReaderContainerFactory<?>> Bean。
响应式读取消息
当 Apache Pulsar 基础设施存在且响应式自动配置被激活时,Spring 提供了 ReactivePulsarReaderFactory,你可以使用它来创建一个 reader,以便以响应式方式读取消息。以下组件使用提供的 factory 创建一个 reader,并从 someTopic topic 中读取 5 分钟前的单条消息
-
Java
-
Kotlin
import java.time.Instant;
import java.util.List;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.reactive.client.api.StartAtSpec;
import reactor.core.publisher.Mono;
import org.springframework.pulsar.reactive.core.ReactiveMessageReaderBuilderCustomizer;
import org.springframework.pulsar.reactive.core.ReactivePulsarReaderFactory;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
private final ReactivePulsarReaderFactory<String> pulsarReaderFactory;
public MyBean(ReactivePulsarReaderFactory<String> pulsarReaderFactory) {
this.pulsarReaderFactory = pulsarReaderFactory;
}
public void someMethod() {
ReactiveMessageReaderBuilderCustomizer<String> readerBuilderCustomizer = (readerBuilder) -> readerBuilder
.topic("someTopic")
.startAtSpec(StartAtSpec.ofInstant(Instant.now().minusSeconds(5)));
Mono<Message<String>> message = this.pulsarReaderFactory
.createReader(Schema.STRING, List.of(readerBuilderCustomizer))
.readOne();
// ...
}
}
import org.apache.pulsar.client.api.Schema
import org.apache.pulsar.reactive.client.api.ReactiveMessageReaderBuilder
import org.apache.pulsar.reactive.client.api.StartAtSpec
import org.springframework.pulsar.reactive.core.ReactiveMessageReaderBuilderCustomizer
import org.springframework.pulsar.reactive.core.ReactivePulsarReaderFactory
import org.springframework.stereotype.Component
import java.time.Instant
@Component
class MyBean(private val pulsarReaderFactory: ReactivePulsarReaderFactory<String>) {
fun someMethod() {
val readerBuilderCustomizer = ReactiveMessageReaderBuilderCustomizer {
readerBuilder: ReactiveMessageReaderBuilder<String> ->
readerBuilder
.topic("someTopic")
.startAtSpec(StartAtSpec.ofInstant(Instant.now().minusSeconds(5)))
}
val message = pulsarReaderFactory
.createReader(Schema.STRING, listOf(readerBuilderCustomizer))
.readOne()
// ...
}
}
Spring Boot 自动配置提供了这个 reader factory,可以通过设置任何以 spring.pulsar.reader.* 为前缀的应用属性来对其进行定制。
如果你需要对 reader factory 配置有更多的控制,可以在使用 factory 创建 reader 时传入一个或多个 ReactiveMessageReaderBuilderCustomizer 实例。
如果你需要对 reader factory 配置有更多的控制,可以考虑注册一个或多个 ReactiveMessageReaderBuilderCustomizer Bean。这些 customizer 会应用于所有创建的 reader。你也可以在创建 reader 时传入一个或多个 ReactiveMessageReaderBuilderCustomizer,以便仅将定制应用于创建的 reader。
| 有关上述任何组件的更多详细信息以及发现其他可用特性,请参阅 Spring for Apache Pulsar 参考文档。 |
事务支持
Spring for Apache Pulsar 在使用 PulsarTemplate 和 @PulsarListener 时支持事务。
| 目前使用响应式变体时不支持事务。 |
将 spring.pulsar.transaction.enabled 属性设置为 true 将会
-
配置一个
PulsarTransactionManagerBean -
为
PulsarTemplate启用事务支持 -
为
@PulsarListener方法启用事务支持
@PulsarListener 的 transactional 属性可用于微调何时应将事务与 listener 一起使用。
为了更好地控制 Spring for Apache Pulsar 的事务特性,你应该定义自己的 PulsarTemplate 和/或 ConcurrentPulsarListenerContainerFactory Bean。如果默认自动配置的 PulsarTransactionManager 不合适,你也可以定义一个 PulsarAwareTransactionManager Bean。
额外 Pulsar 属性
自动配置支持的属性显示在附录的 集成属性 部分。请注意,在大多数情况下,这些属性(连字符命名或驼峰命名)直接映射到 Apache Pulsar 的配置属性。有关详细信息,请参阅 Apache Pulsar 文档。
Pulsar 支持的属性中,只有一部分可以直接通过 PulsarProperties 类获取。如果你希望使用未直接支持的附加属性来调整自动配置的组件,你可以使用上述每个组件支持的定制器。