Apache Pulsar 支持

Apache Pulsar 通过提供对 Spring for Apache Pulsar 项目的自动配置来支持。

当 classpath 中包含 org.springframework.pulsar:spring-pulsar 时,Spring Boot 将自动配置并注册经典的(命令式)Spring for Apache Pulsar 组件。当 classpath 中包含 org.springframework.pulsar:spring-pulsar-reactive 时,它也会对响应式组件执行相同的操作。

Spring Boot 提供了 spring-boot-starter-pulsarspring-boot-starter-pulsar-reactive starter,分别用于方便地收集命令式和响应式使用的依赖项。

连接到 Pulsar

当你使用 Pulsar starter 时,Spring Boot 将自动配置并注册一个 PulsarClient Bean。

默认情况下,应用会尝试连接到本地的 Pulsar 实例,地址为 pulsar://localhost:6650。可以通过将 spring.pulsar.client.service-url 属性设置为其他值来调整此设置。

该值必须是一个有效的 Pulsar Protocol URL

你可以通过指定任何以 spring.pulsar.client.* 为前缀的应用属性来配置客户端。

如果你需要对配置有更多的控制,可以考虑注册一个或多个 PulsarClientBuilderCustomizer Bean。

认证

要连接到需要认证的 Pulsar 集群,你需要通过设置 pluginClassName 以及插件所需的任何参数来指定要使用的认证插件。你可以将参数设置为参数名称到参数值的映射。以下示例展示了如何配置 AuthenticationOAuth2 插件。

  • 属性

  • YAML

spring.pulsar.client.authentication.plugin-class-name=org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
spring.pulsar.client.authentication.param.issuerUrl=https://auth.server.cloud/
spring.pulsar.client.authentication.param.privateKey=file:///Users/some-key.json
spring.pulsar.client.authentication.param.audience=urn:sn:acme:dev:my-instance
spring:
  pulsar:
    client:
      authentication:
        plugin-class-name: org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
        param:
          issuerUrl: https://auth.server.cloud/
          privateKey: file:///Users/some-key.json
          audience: urn:sn:acme:dev:my-instance

你需要确保在 spring.pulsar.client.authentication.param.* 下定义的名称与你的认证插件预期的名称(通常是驼峰命名法)完全匹配。Spring Boot 不会尝试对这些条目进行任何宽松绑定。

例如,如果你想为 AuthenticationOAuth2 认证插件配置发行者 URL,你必须使用 spring.pulsar.client.authentication.param.issuerUrl。如果你使用其他形式,例如 issuerurlissuer-url,该设置将不会应用于插件。

这种缺乏宽松绑定也使得使用环境变量配置认证参数变得麻烦,因为在转换过程中会丢失大小写敏感性。如果你使用环境变量来设置参数,你需要遵循 Spring for Apache Pulsar 参考文档中列出的 这些步骤 才能使其正常工作。

SSL

默认情况下,Pulsar 客户端以明文方式与 Pulsar 服务通信。你可以遵循 Spring for Apache Pulsar 参考文档中列出的 这些步骤 来启用 TLS 加密。

有关客户端和认证的完整详细信息,请参阅 Spring for Apache Pulsar 参考文档

响应式连接到 Pulsar

当响应式自动配置被激活时,Spring Boot 将自动配置并注册一个 ReactivePulsarClient Bean。

ReactivePulsarClient 适配了前面描述的 PulsarClient 实例。因此,请遵循上一节来配置 ReactivePulsarClient 使用的 PulsarClient

连接到 Pulsar 管理端

Spring for Apache Pulsar 的 PulsarAdministration 客户端也会被自动配置。

默认情况下,应用会尝试连接到本地的 Pulsar 实例,地址为 http://localhost:8080。可以通过将 spring.pulsar.admin.service-url 属性设置为形如 (http|https)://<host>:<port> 的其他值来调整此设置。

如果你需要对配置有更多的控制,可以考虑注册一个或多个 PulsarAdminBuilderCustomizer Bean。

认证

当访问需要认证的 Pulsar 集群时,管理客户端需要与常规 Pulsar 客户端相同的安全配置。你可以使用上述 认证配置,只需将 spring.pulsar.client.authentication 替换为 spring.pulsar.admin.authentication

要在启动时创建一个 topic,请添加一个类型为 PulsarTopic 的 Bean。如果该 topic 已存在,则忽略此 Bean。

发送消息

Spring 的 PulsarTemplate 会被自动配置,你可以使用它来发送消息,如下例所示

  • Java

  • Kotlin

import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	private final PulsarTemplate<String> pulsarTemplate;

	public MyBean(PulsarTemplate<String> pulsarTemplate) {
		this.pulsarTemplate = pulsarTemplate;
	}

	public void someMethod() {
		this.pulsarTemplate.send("someTopic", "Hello");
	}

}
import org.apache.pulsar.client.api.PulsarClientException
import org.springframework.pulsar.core.PulsarTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val pulsarTemplate: PulsarTemplate<String>) {

	@Throws(PulsarClientException::class)
	fun someMethod() {
		pulsarTemplate.send("someTopic", "Hello")
	}

}

PulsarTemplate 依赖于 PulsarProducerFactory 来创建底层的 Pulsar producer。Spring Boot 自动配置也提供了这个 producer factory,默认情况下,它会缓存创建的 producer。你可以通过指定任何以 spring.pulsar.producer.*spring.pulsar.producer.cache.* 为前缀的应用属性来配置 producer factory 和缓存设置。

如果你需要对 producer factory 配置有更多的控制,可以考虑注册一个或多个 ProducerBuilderCustomizer Bean。这些 customizer 会应用于所有创建的 producer。你也可以在发送消息时传入一个 ProducerBuilderCustomizer,以便仅影响当前的 producer。

如果你需要对要发送的消息有更多的控制,可以在发送消息时传入一个 TypedMessageBuilderCustomizer

响应式发送消息

当响应式自动配置被激活时,Spring 的 ReactivePulsarTemplate 会被自动配置,你可以使用它来发送消息,如下例所示

  • Java

  • Kotlin

import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	private final ReactivePulsarTemplate<String> pulsarTemplate;

	public MyBean(ReactivePulsarTemplate<String> pulsarTemplate) {
		this.pulsarTemplate = pulsarTemplate;
	}

	public void someMethod() {
		this.pulsarTemplate.send("someTopic", "Hello").subscribe();
	}

}
import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val pulsarTemplate: ReactivePulsarTemplate<String>) {

	fun someMethod() {
		pulsarTemplate.send("someTopic", "Hello").subscribe()
	}

}

ReactivePulsarTemplate 依赖于 ReactivePulsarSenderFactory 来实际创建底层的 sender。Spring Boot 自动配置也提供了这个 sender factory,默认情况下,它会缓存创建的 producer。你可以通过指定任何以 spring.pulsar.producer.*spring.pulsar.producer.cache.* 为前缀的应用属性来配置 sender factory 和缓存设置。

如果你需要对 sender factory 配置有更多的控制,可以考虑注册一个或多个 ReactiveMessageSenderBuilderCustomizer Bean。这些 customizer 会应用于所有创建的 sender。你也可以在发送消息时传入一个 ReactiveMessageSenderBuilderCustomizer,以便仅影响当前的 sender。

如果你需要对要发送的消息有更多的控制,可以在发送消息时传入一个 MessageSpecBuilderCustomizer

接收消息

当 Apache Pulsar 基础设施存在时,任何 Bean 都可以使用 @PulsarListener 注解来创建一个 listener endpoint。以下组件在 someTopic topic 上创建一个 listener endpoint

  • Java

  • Kotlin

import org.springframework.pulsar.annotation.PulsarListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@PulsarListener(topics = "someTopic")
	public void processMessage(String content) {
		// ...
	}

}
import org.springframework.pulsar.annotation.PulsarListener
import org.springframework.stereotype.Component

@Component
class MyBean {

	@PulsarListener(topics = ["someTopic"])
	fun processMessage(content: String?) {
		// ...
	}

}

Spring Boot 自动配置提供了 @PulsarListener 所需的所有组件,例如 PulsarListenerContainerFactory 以及它用来构建底层 Pulsar consumer 的 consumer factory。你可以通过指定任何以 spring.pulsar.listener.*spring.pulsar.consumer.* 为前缀的应用属性来配置这些组件。

如果你需要对 consumer factory 的配置有更多的控制,可以考虑注册一个或多个 ConsumerBuilderCustomizer Bean。这些 customizer 会应用于 factory 创建的所有 consumer,因此也会应用于所有 @PulsarListener 实例。你也可以通过设置 @PulsarListener 注解的 consumerCustomizer 属性来定制单个 listener。

如果你需要对实际的 container factory 配置有更多的控制,可以考虑注册一个或多个 PulsarContainerFactoryCustomizer<ConcurrentPulsarListenerContainerFactory<?>> Bean。

响应式接收消息

当 Apache Pulsar 基础设施存在且响应式自动配置被激活时,任何 Bean 都可以使用 @ReactivePulsarListener 注解来创建一个响应式 listener endpoint。以下组件在 someTopic topic 上创建一个响应式 listener endpoint

  • Java

  • Kotlin

import reactor.core.publisher.Mono;

import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@ReactivePulsarListener(topics = "someTopic")
	public Mono<Void> processMessage(String content) {
		// ...
		return Mono.empty();
	}

}
import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener
import org.springframework.stereotype.Component
import reactor.core.publisher.Mono

@Component
class MyBean {

	@ReactivePulsarListener(topics = ["someTopic"])
	fun processMessage(content: String?): Mono<Void> {
		// ...
		return Mono.empty()
	}

}

Spring Boot 自动配置提供了 @ReactivePulsarListener 所需的所有组件,例如 ReactivePulsarListenerContainerFactory 以及它用来构建底层响应式 Pulsar consumer 的 consumer factory。你可以通过指定任何以 spring.pulsar.listener.*spring.pulsar.consumer.* 为前缀的应用属性来配置这些组件。

如果你需要对 consumer factory 的配置有更多的控制,可以考虑注册一个或多个 ReactiveMessageConsumerBuilderCustomizer Bean。这些 customizer 会应用于 factory 创建的所有 consumer,因此也会应用于所有 @ReactivePulsarListener 实例。你也可以通过设置 @ReactivePulsarListener 注解的 consumerCustomizer 属性来定制单个 listener。

如果你需要对实际的 container factory 配置有更多的控制,可以考虑注册一个或多个 PulsarContainerFactoryCustomizer<DefaultReactivePulsarListenerContainerFactory<?>> Bean。

读取消息

Pulsar reader 接口使应用能够手动管理 cursor。当你使用 reader 连接到 topic 时,你需要指定 reader 连接到 topic 时从哪条消息开始读取。

当 Apache Pulsar 基础设施存在时,任何 Bean 都可以使用 @PulsarReader 注解来使用 reader 消费消息。以下组件创建一个 reader endpoint,它从 someTopic topic 的开头开始读取消息

  • Java

  • Kotlin

import org.springframework.pulsar.annotation.PulsarReader;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@PulsarReader(topics = "someTopic", startMessageId = "earliest")
	public void processMessage(String content) {
		// ...
	}

}
import org.springframework.pulsar.annotation.PulsarReader
import org.springframework.stereotype.Component

@Component
class MyBean {

	@PulsarReader(topics = ["someTopic"], startMessageId = "earliest")
	fun processMessage(content: String?) {
		// ...
	}

}

@PulsarReader 依赖于 PulsarReaderFactory 来创建底层的 Pulsar reader。Spring Boot 自动配置提供了这个 reader factory,可以通过设置任何以 spring.pulsar.reader.* 为前缀的应用属性来对其进行定制。

如果你需要对 reader factory 的配置有更多的控制,可以考虑注册一个或多个 ReaderBuilderCustomizer Bean。这些 customizer 会应用于 factory 创建的所有 reader,因此也会应用于所有 @PulsarReader 实例。你也可以通过设置 @PulsarReader 注解的 readerCustomizer 属性来定制单个 listener。

如果你需要对实际的 container factory 配置有更多的控制,可以考虑注册一个或多个 PulsarContainerFactoryCustomizer<DefaultPulsarReaderContainerFactory<?>> Bean。

响应式读取消息

当 Apache Pulsar 基础设施存在且响应式自动配置被激活时,Spring 提供了 ReactivePulsarReaderFactory,你可以使用它来创建一个 reader,以便以响应式方式读取消息。以下组件使用提供的 factory 创建一个 reader,并从 someTopic topic 中读取 5 分钟前的单条消息

  • Java

  • Kotlin

import java.time.Instant;
import java.util.List;

import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.reactive.client.api.StartAtSpec;
import reactor.core.publisher.Mono;

import org.springframework.pulsar.reactive.core.ReactiveMessageReaderBuilderCustomizer;
import org.springframework.pulsar.reactive.core.ReactivePulsarReaderFactory;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	private final ReactivePulsarReaderFactory<String> pulsarReaderFactory;

	public MyBean(ReactivePulsarReaderFactory<String> pulsarReaderFactory) {
		this.pulsarReaderFactory = pulsarReaderFactory;
	}

	public void someMethod() {
		ReactiveMessageReaderBuilderCustomizer<String> readerBuilderCustomizer = (readerBuilder) -> readerBuilder
			.topic("someTopic")
			.startAtSpec(StartAtSpec.ofInstant(Instant.now().minusSeconds(5)));
		Mono<Message<String>> message = this.pulsarReaderFactory
			.createReader(Schema.STRING, List.of(readerBuilderCustomizer))
			.readOne();
		// ...
	}

}
import org.apache.pulsar.client.api.Schema
import org.apache.pulsar.reactive.client.api.ReactiveMessageReaderBuilder
import org.apache.pulsar.reactive.client.api.StartAtSpec
import org.springframework.pulsar.reactive.core.ReactiveMessageReaderBuilderCustomizer
import org.springframework.pulsar.reactive.core.ReactivePulsarReaderFactory
import org.springframework.stereotype.Component
import java.time.Instant

@Component
class MyBean(private val pulsarReaderFactory: ReactivePulsarReaderFactory<String>) {

	fun someMethod() {
		val readerBuilderCustomizer = ReactiveMessageReaderBuilderCustomizer {
			readerBuilder: ReactiveMessageReaderBuilder<String> ->
				readerBuilder
					.topic("someTopic")
					.startAtSpec(StartAtSpec.ofInstant(Instant.now().minusSeconds(5)))
		}
		val message = pulsarReaderFactory
				.createReader(Schema.STRING, listOf(readerBuilderCustomizer))
				.readOne()
		// ...
	}

}

Spring Boot 自动配置提供了这个 reader factory,可以通过设置任何以 spring.pulsar.reader.* 为前缀的应用属性来对其进行定制。

如果你需要对 reader factory 配置有更多的控制,可以在使用 factory 创建 reader 时传入一个或多个 ReactiveMessageReaderBuilderCustomizer 实例。

如果你需要对 reader factory 配置有更多的控制,可以考虑注册一个或多个 ReactiveMessageReaderBuilderCustomizer Bean。这些 customizer 会应用于所有创建的 reader。你也可以在创建 reader 时传入一个或多个 ReactiveMessageReaderBuilderCustomizer,以便仅将定制应用于创建的 reader。

有关上述任何组件的更多详细信息以及发现其他可用特性,请参阅 Spring for Apache Pulsar 参考文档

事务支持

Spring for Apache Pulsar 在使用 PulsarTemplate@PulsarListener 时支持事务。

目前使用响应式变体时不支持事务。

spring.pulsar.transaction.enabled 属性设置为 true 将会

@PulsarListenertransactional 属性可用于微调何时应将事务与 listener 一起使用。

为了更好地控制 Spring for Apache Pulsar 的事务特性,你应该定义自己的 PulsarTemplate 和/或 ConcurrentPulsarListenerContainerFactory Bean。如果默认自动配置的 PulsarTransactionManager 不合适,你也可以定义一个 PulsarAwareTransactionManager Bean。

额外 Pulsar 属性

自动配置支持的属性显示在附录的 集成属性 部分。请注意,在大多数情况下,这些属性(连字符命名或驼峰命名)直接映射到 Apache Pulsar 的配置属性。有关详细信息,请参阅 Apache Pulsar 文档。

Pulsar 支持的属性中,只有一部分可以直接通过 PulsarProperties 类获取。如果你希望使用未直接支持的附加属性来调整自动配置的组件,你可以使用上述每个组件支持的定制器。