3.0.13.RELEASE

参考指南

本指南描述了 Apache Kafka 对 Spring Cloud Stream 绑定器的实现。它包含有关其设计、使用和配置选项的信息,以及有关 Stream Cloud Stream 概念如何映射到 Apache Kafka 特定结构的信息。此外,本指南还解释了 Spring Cloud Stream 的 Kafka Streams 绑定功能。

1. Apache Kafka Binder

1.1. 使用

要使用 Apache Kafka 绑定器,您需要将 spring-cloud-stream-binder-kafka 作为依赖项添加到您的 Spring Cloud Stream 应用程序中,如下面的 Maven 示例所示

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

或者,您也可以使用 Spring Cloud Stream Kafka 启动器,如下面的 Maven 示例所示

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

1.2. 概述

下图显示了 Apache Kafka 绑定器如何工作的简化图

kafka binder
图 1. Kafka 绑定器

Apache Kafka 绑定器实现将每个目标映射到 Apache Kafka 主题。消费者组直接映射到相同的 Apache Kafka 概念。分区也直接映射到 Apache Kafka 分区。

绑定器当前使用 Apache Kafka kafka-clients 版本 2.3.1。此客户端可以与旧的代理进行通信(请参阅 Kafka 文档),但某些功能可能不可用。例如,在 0.11.x.x 之前的版本中,不支持原生标头。此外,0.11.x.x 不支持 autoAddPartitions 属性。

1.3. 配置选项

本节包含 Apache Kafka 绑定器使用的配置选项。

有关绑定器通用的配置选项和属性,请参阅核心文档中的绑定属性

1.3.1. Kafka Binder 属性

spring.cloud.stream.kafka.binder.brokers

Kafka 绑定器连接到的代理列表。

默认值:localhost

spring.cloud.stream.kafka.binder.defaultBrokerPort

brokers 允许指定带有或不带有端口信息的宿主 (例如,host1,host2:port2)。当代理列表中未配置端口时,此选项设置默认端口。

默认值:9092

spring.cloud.stream.kafka.binder.configuration

传递给绑定器创建的所有客户端 (生产者和消费者) 的客户端属性 (键/值映射)。由于这些属性被生产者和消费者使用,因此应将其使用限制为通用属性,例如安全设置。通过此配置提供的未知 Kafka 生产者或消费者属性将被过滤掉,不允许传播。此处的属性优先于启动时设置的任何属性。

默认值:空映射。

spring.cloud.stream.kafka.binder.consumerProperties

任意 Kafka 客户端消费者属性的键/值映射。除了支持已知的 Kafka 消费者属性外,此选项还允许使用未知的消费者属性。此处的属性优先于启动时设置的任何属性以及上面的 configuration 属性。

默认值:空映射。

spring.cloud.stream.kafka.binder.headers

绑定器传输的自定义标头列表。仅在与使用 kafka-clients 版本 < 0.11.0.0 的旧应用程序 (⇐ 1.3.x) 通信时需要。较新的版本原生支持标头。

默认值:空。

spring.cloud.stream.kafka.binder.healthTimeout

获取分区信息等待的时间 (以秒为单位)。如果此计时器到期,则健康报告为关闭。

默认值:10。

spring.cloud.stream.kafka.binder.requiredAcks

代理上所需的确认数。请参阅 Kafka 文档以了解生产者 acks 属性。

默认值:1

spring.cloud.stream.kafka.binder.minPartitionCount

仅当 autoCreateTopicsautoAddPartitions 设置为 true 时有效。绑定器在生产或消费数据的主题上配置的全局最小分区数。它可以被生产者的 partitionCount 设置或生产者的 instanceCount * concurrency 设置的值覆盖(如果两者中任何一个更大)。

默认值:1

spring.cloud.stream.kafka.binder.producerProperties

任意 Kafka 客户端生产者属性的键值对映射。除了支持已知的 Kafka 生产者属性外,这里也允许使用未知的生产者属性。此处的属性会覆盖在启动时和上面的 configuration 属性中设置的任何属性。

默认值:空映射。

spring.cloud.stream.kafka.binder.replicationFactor

如果 autoCreateTopics 处于活动状态,则为自动创建主题的副本因子。可以在每个绑定上覆盖。

如果您使用的是 2.4 之前的 Kafka 代理版本,则此值应至少设置为 1。从 3.0.8 版本开始,绑定器使用 -1 作为默认值,这表示将使用代理的 'default.replication.factor' 属性来确定副本数量。请咨询您的 Kafka 代理管理员,以查看是否有任何策略要求最小副本因子,如果是这种情况,通常,default.replication.factor 将与该值匹配,并且应使用 -1,除非您需要大于最小值的副本因子。

默认值:-1

spring.cloud.stream.kafka.binder.autoCreateTopics

如果设置为 true,绑定器会自动创建新主题。如果设置为 false,绑定器依赖于主题已配置。在后一种情况下,如果主题不存在,绑定器将无法启动。

此设置独立于代理的 auto.create.topics.enable 设置,不会影响它。如果服务器设置为自动创建主题,它们可能会作为元数据检索请求的一部分创建,并使用默认的代理设置。

默认值:true

spring.cloud.stream.kafka.binder.autoAddPartitions

如果设置为 true,绑定器会在需要时创建新分区。如果设置为 false,绑定器依赖于主题的分区大小已配置。如果目标主题的分区数量小于预期值,绑定器将无法启动。

默认值:false

spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix

在绑定器中启用事务。请参阅 Kafka 文档中的 transaction.idspring-kafka 文档中的 事务。启用事务后,将忽略单个 producer 属性,所有生产者都将使用 spring.cloud.stream.kafka.binder.transaction.producer.* 属性。

默认值 null(无事务)

spring.cloud.stream.kafka.binder.transaction.producer.*

事务绑定器中生产者的全局生产者属性。请参阅spring.cloud.stream.kafka.binder.transaction.transactionIdPrefixKafka 生产者属性以及所有绑定器支持的通用生产者属性。

默认值:请参阅各个生产者属性。

spring.cloud.stream.kafka.binder.headerMapperBeanName

用于将spring-messaging头映射到 Kafka 头和从 Kafka 头映射到spring-messaging头的KafkaHeaderMapper的 Bean 名称。例如,如果您希望自定义使用 JSON 反序列化头部的BinderHeaderMapper Bean 中的受信任包,请使用此属性。如果此自定义BinderHeaderMapper Bean 未使用此属性提供给绑定器,则绑定器将查找名为kafkaBinderHeaderMapper且类型为BinderHeaderMapper的头映射器 Bean,然后再回退到绑定器创建的默认BinderHeaderMapper

默认值:无。

spring.cloud.stream.kafka.binder.considerDownWhenAnyPartitionHasNoLeader

当发现主题上的任何分区(无论接收数据的消费者是什么)都没有领导者时,将绑定器健康状态设置为down的标志。

默认值:false

spring.cloud.stream.kafka.binder.certificateStoreDirectory

当信任库或密钥库证书位置以类路径 URL(classpath:…​)形式给出时,绑定器会将资源从 JAR 文件中的类路径位置复制到文件系统上的位置。该文件将被移动到为此属性指定的值指定的位置,该位置必须是文件系统上可由运行应用程序的进程写入的现有目录。如果未设置此值且证书文件是类路径资源,则它将被移动到系统临时目录,该目录由System.getProperty("java.io.tmpdir")返回。如果此值存在,但目录在文件系统上找不到或不可写,则也是如此。

默认值:无。

1.3.2. Kafka 消费者属性

为了避免重复,Spring Cloud Stream 支持以spring.cloud.stream.kafka.default.consumer.<property>=<value>的格式为所有通道设置值。

以下属性仅适用于 Kafka 消费者,必须以spring.cloud.stream.kafka.bindings.<channelName>.consumer.为前缀。

admin.configuration

从 2.1.1 版本开始,此属性已弃用,取而代之的是topic.properties,并且对它的支持将在未来版本中删除。

admin.replicas-assignment

从 2.1.1 版本开始,此属性已弃用,建议使用 topic.replicas-assignment,将在未来版本中移除对它的支持。

admin.replication-factor

从 2.1.1 版本开始,此属性已弃用,建议使用 topic.replication-factor,将在未来版本中移除对它的支持。

autoRebalanceEnabled

当设置为 true 时,主题分区将在消费者组的成员之间自动重新平衡。当设置为 false 时,每个消费者将根据 spring.cloud.stream.instanceCountspring.cloud.stream.instanceIndex 分配一组固定的分区。这要求在每个启动的实例上都正确设置 spring.cloud.stream.instanceCountspring.cloud.stream.instanceIndex 属性。在这种情况下,spring.cloud.stream.instanceCount 属性的值通常必须大于 1。

默认值:true

ackEachRecord

autoCommitOffset 设置为 true 时,此设置决定是否在处理完每个记录后提交偏移量。默认情况下,偏移量在处理完 consumer.poll() 返回的记录批次中的所有记录后提交。通过 consumerconfiguration 属性设置的 Kafka 属性 max.poll.records 可以控制每次轮询返回的记录数量。将此设置为 true 可能会导致性能下降,但这样做可以降低发生故障时重新传递记录的可能性。此外,请参阅绑定器 requiredAcks 属性,它也会影响提交偏移量的性能。

默认值:false

autoCommitOffset

是否在处理完消息后自动提交偏移量。如果设置为 false,则入站消息中将包含一个键为 kafka_acknowledgment、类型为 org.springframework.kafka.support.Acknowledgment 的头。应用程序可以使用此头来确认消息。有关详细信息,请参阅示例部分。当此属性设置为 false 时,Kafka 绑定器将确认模式设置为 org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.MANUAL,应用程序负责确认记录。另请参阅 ackEachRecord

默认值:true

autoCommitOnError

仅在 autoCommitOffset 设置为 true 时有效。如果设置为 false,则会抑制导致错误的消息的自动提交,并且仅对成功的消息进行提交。它允许流在发生持久性故障的情况下从最后成功处理的消息自动重播。如果设置为 true,则始终自动提交(如果启用了自动提交)。如果未设置(默认值),则其值与 enableDlq 相同,如果错误消息被发送到 DLQ,则自动提交错误消息,否则不提交。

默认值:未设置。

resetOffsets

是否将消费者的偏移量重置为 startOffset 提供的值。如果提供了 KafkaRebalanceListener,则必须为 false;请参阅 使用 KafkaRebalanceListener

默认值:false

startOffset

新组的起始偏移量。允许的值:earliestlatest。如果消费者的“绑定”明确设置了消费者组(通过 spring.cloud.stream.bindings.<channelName>.group),则 'startOffset' 设置为 earliest。否则,对于 anonymous 消费者组,它将设置为 latest。另请参阅 resetOffsets(在本列表中更早)。

默认值:null(等效于 earliest)。

enableDlq

设置为 true 时,将为消费者启用 DLQ 行为。默认情况下,导致错误的消息将转发到名为 error.<destination>.<group> 的主题。DLQ 主题名称可以通过设置 dlqName 属性或定义类型为 DlqDestinationResolver@Bean 来配置。这为错误数量相对较少且重播整个原始主题过于繁琐的情况提供了另一种选择,而不是更常见的 Kafka 重播方案。有关更多信息,请参阅 死信主题处理 处理。从 2.0 版本开始,发送到 DLQ 主题的消息将使用以下标头进行增强:x-original-topicx-exception-messagex-exception-stacktrace 作为 byte[]。默认情况下,失败的记录将发送到 DLQ 主题中与原始记录相同的分区号。有关如何更改此行为,请参阅 死信主题分区选择destinationIsPatterntrue 时不允许。

默认值:false

dlqPartitions

enableDlq 为 true 且未设置此属性时,将创建一个与主主题数量相同的分区数的死信主题。通常,死信记录将发送到死信主题中与原始记录相同的分区。此行为可以更改;请参阅 死信主题分区选择。如果此属性设置为 1 且没有 DqlPartitionFunction bean,则所有死信记录都将写入分区 0。如果此属性大于 1,则必须提供 DlqPartitionFunction bean。请注意,实际分区计数受绑定程序的 minPartitionCount 属性影响。

默认值:none

configuration

包含通用 Kafka 消费者属性的键值对的映射。除了 Kafka 消费者属性之外,还可以在此处传递其他配置属性。例如,应用程序需要的一些属性,例如 spring.cloud.stream.kafka.bindings.input.consumer.configuration.foo=bar

默认值:空映射。

dlqName

接收错误消息的 DLQ 主题名称。

默认值:null(如果未指定,导致错误的消息将转发到名为 error.<destination>.<group> 的主题)。

dlqProducerProperties

使用此属性可以设置特定于 DLQ 的生产者属性。所有可通过 Kafka 生产者属性设置的属性都可以通过此属性设置。当在消费者上启用本机解码(即,useNativeDecoding: true)时,应用程序必须为 DLQ 提供相应的键/值序列化器。这必须以 dlqProducerProperties.configuration.key.serializerdlqProducerProperties.configuration.value.serializer 的形式提供。

默认值:默认 Kafka 生产者属性。

standardHeaders

指示入站通道适配器填充哪些标准标头。允许的值:noneidtimestampboth。如果使用本机反序列化,并且第一个接收消息的组件需要一个 id(例如,配置为使用 JDBC 消息存储的聚合器),则此属性很有用。

默认值:none

converterBeanName

实现 RecordMessageConverter 的 Bean 的名称。在入站通道适配器中使用,以替换默认的 MessagingMessageConverter

默认值:null

idleEventInterval

指示最近未收到任何消息的事件之间的间隔(以毫秒为单位)。使用 ApplicationListener<ListenerContainerIdleEvent> 来接收这些事件。有关使用示例,请参见 示例:暂停和恢复消费者

默认值:30000

destinationIsPattern

如果为 true,则目标将被视为正则表达式 Pattern,用于通过代理匹配主题名称。如果为 true,则不会预配主题,并且不允许 enableDlq,因为绑定器在预配阶段不知道主题名称。请注意,检测与模式匹配的新主题所需的时间由消费者属性 metadata.max.age.ms 控制,该属性(在撰写本文时)默认为 300,000 毫秒(5 分钟)。可以使用上面的 configuration 属性进行配置。

默认值:false

topic.properties

预配新主题时使用的 Kafka 主题属性的 Map,例如 spring.cloud.stream.kafka.bindings.input.consumer.topic.properties.message.format.version=0.9.0.0

默认值:无。

topic.replicas-assignment

副本分配的 Map<Integer, List<Integer>>,其中键是分区,值是分配。在预配新主题时使用。请参见 kafka-clients jar 中的 NewTopic Javadocs。

默认值:无。

topic.replication-factor

在配置主题时使用的副本因子。覆盖绑定范围的设置。如果存在replicas-assignments,则忽略。

默认值:无(使用绑定范围的默认值 -1)。

pollTimeout

可轮询消费者中轮询使用的超时时间。

默认值:5 秒。

transactionManager

用于覆盖绑定事务管理器的KafkaAwareTransactionManager 的 Bean 名称。通常,如果您想使用ChainedKafkaTransactionManaager 将另一个事务与 Kafka 事务同步,则需要此操作。为了实现记录的精确一次消费和生产,消费者和生产者绑定必须都配置了相同的事务管理器。

默认值:无。

txCommitRecovered

使用事务性绑定时,恢复记录的偏移量(例如,当重试耗尽且记录被发送到死信主题时)将通过默认情况下新的事务提交。将此属性设置为false 将抑制提交恢复记录的偏移量。

默认值:true。

1.3.3. 重置偏移量

当应用程序启动时,每个分配的分区的初始位置取决于两个属性startOffsetresetOffsets。如果resetOffsetsfalse,则应用正常的 Kafka 消费者 auto.offset.reset 语义。即,如果绑定消费者的组没有为分区提交偏移量,则位置为earliestlatest。默认情况下,具有显式group 的绑定使用earliest,而匿名绑定(没有group)使用latest。这些默认值可以通过设置startOffset 绑定属性来覆盖。第一次使用特定group 启动绑定时,将没有提交的偏移量。另一个没有提交偏移量的情况是偏移量已过期。对于现代代理(从 2.1 开始)和默认代理属性,偏移量在最后一个成员离开组后 7 天过期。有关更多信息,请参阅 offsets.retention.minutes 代理属性。

resetOffsetstrue 时,绑定会应用与代理上没有提交偏移量时类似的语义,就好像此绑定从未从主题中消费过一样;即,任何当前提交的偏移量都会被忽略。

以下是可能使用此功能的两种用例。

  1. 从包含键值对的压缩主题中消费。将resetOffsets 设置为true,并将startOffset 设置为earliest;绑定将在所有新分配的分区上执行seekToBeginning

  2. 从包含事件的主题中消费,您只对在此绑定运行期间发生的事件感兴趣。将resetOffsets 设置为true,并将startOffset 设置为latest;绑定将在所有新分配的分区上执行seekToEnd

如果在初始分配后发生重新平衡,则仅对在初始分配期间未分配的任何新分配的分区执行查找。

要更细致地控制主题偏移量,请参阅 使用 KafkaRebalanceListener;当提供监听器时,resetOffsets 不应设置为 true,否则会导致错误。 >>>>>>> 7bc90c10…​ GH-1084: 添加 txCommitRecovered 属性

1.3.4. 消费批次

从 3.0 版本开始,当 spring.cloud.stream.binding.<name>.consumer.batch-mode 设置为 true 时,从 Kafka Consumer 轮询接收到的所有记录将作为 List<?> 提供给监听器方法。否则,该方法将一次调用一个记录。批次的大小由 Kafka 消费者属性 max.poll.recordsfetch.min.bytesfetch.max.wait.ms 控制;有关更多信息,请参阅 Kafka 文档。

请注意,批处理模式不支持 @StreamListener - 它只适用于较新的函数式编程模型。

使用批处理模式时,不支持绑定器内的重试,因此 maxAttempts 将被覆盖为 1。您可以配置一个 SeekToCurrentBatchErrorHandler(使用 ListenerContainerCustomizer)来实现与绑定器中重试类似的功能。您也可以使用手动 AckMode 并调用 Ackowledgment.nack(index, sleep) 来提交部分批次的偏移量,并让剩余的记录重新传递。有关这些技术的更多信息,请参阅 Spring for Apache Kafka 文档

1.3.5. Kafka 生产者属性

为了避免重复,Spring Cloud Stream 支持为所有通道设置值,格式为 spring.cloud.stream.kafka.default.producer.<property>=<value>

以下属性仅适用于 Kafka 生产者,并且必须以 spring.cloud.stream.kafka.bindings.<channelName>.producer. 为前缀。

admin.configuration

从 2.1.1 版本开始,此属性已弃用,取而代之的是topic.properties,并且对它的支持将在未来版本中删除。

admin.replicas-assignment

从 2.1.1 版本开始,此属性已弃用,建议使用 topic.replicas-assignment,将在未来版本中移除对它的支持。

admin.replication-factor

从 2.1.1 版本开始,此属性已弃用,建议使用 topic.replication-factor,将在未来版本中移除对它的支持。

bufferSize

Kafka 生产者在发送之前尝试批处理的数据量上限(以字节为单位)。

默认值:16384

sync

生产者是否同步。

默认值:false

sendTimeoutExpression

一个针对发送消息进行评估的 SpEL 表达式,用于评估启用同步发布时等待确认的时间,例如 headers['mySendTimeout']。超时值的单位是毫秒。在 3.0 之前的版本中,除非使用原生编码,否则无法使用有效负载,因为在评估此表达式时,有效负载已经以 byte[] 的形式存在。现在,表达式在有效负载转换之前进行评估。

默认值:none

batchTimeout

生产者等待更多消息累积到同一批次中以发送消息的时间。(通常,生产者不会等待,而是简单地发送在上次发送过程中累积的所有消息。)非零值可能会以延迟为代价提高吞吐量。

默认值:0

messageKeyExpression

一个针对发送消息进行评估的 SpEL 表达式,用于填充生产的 Kafka 消息的键,例如 headers['myKey']。在 3.0 之前的版本中,除非使用原生编码,否则无法使用有效负载,因为在评估此表达式时,有效负载已经以 byte[] 的形式存在。现在,表达式在有效负载转换之前进行评估。对于常规处理器(Function<String, String>Function<Message<?>, Message<?>),如果生产的键需要与来自主题的传入键相同,则可以按如下方式设置此属性。spring.cloud.stream.kafka.bindings.<output-binding-name>.producer.messageKeyExpression: headers['kafka_receivedMessageKey'] 对于反应式函数,需要注意一个重要的注意事项。在这种情况下,应用程序需要手动将传入消息的标头复制到传出消息。您可以设置标头,例如 myKey,并使用 headers['myKey'],如上所述,或者为了方便起见,只需设置 KafkaHeaders.MESSAGE_KEY 标头,您就不需要设置此属性。

默认值:none

headerPatterns

一个用逗号分隔的简单模式列表,用于匹配要映射到 ProducerRecord 中的 Kafka Headers 的 Spring 消息标头。模式可以以通配符(星号)开头或结尾。模式可以通过在前面加上 ! 来否定。匹配在第一次匹配(正或负)后停止。例如,!ask,as* 将传递 ash 但不传递 askidtimestamp 永远不会被映射。

默认值:*(所有标头 - 除了 idtimestamp

configuration

包含通用 Kafka 生产者属性的键值对映射。

默认值:空映射。

topic.properties

在配置新主题时使用的 Kafka 主题属性的 Map,例如 spring.cloud.stream.kafka.bindings.output.producer.topic.properties.message.format.version=0.9.0.0

topic.replicas-assignment

副本分配的 Map<Integer, List<Integer>>,其中键是分区,值是分配。在预配新主题时使用。请参见 kafka-clients jar 中的 NewTopic Javadocs。

默认值:无。

topic.replication-factor

在配置主题时使用的副本因子。覆盖绑定范围的设置。如果存在replicas-assignments,则忽略。

默认值:无(使用绑定范围的默认值 -1)。

useTopicHeader

设置为true 以使用出站消息中 KafkaHeaders.TOPIC 消息头的值覆盖默认绑定目标(主题名称)。如果头不存在,则使用默认绑定目标。默认值:false

recordMetadataChannel

成功发送结果应发送到的 MessageChannel 的 bean 名称;该 bean 必须存在于应用程序上下文中。发送到该通道的消息是已发送的消息(如果有转换,则经过转换),并带有额外的头 KafkaHeaders.RECORD_METADATA。该头包含 Kafka 客户端提供的 RecordMetadata 对象;它包含记录在主题中写入的分区和偏移量。

ResultMetadata meta = sendResultMsg.getHeaders().get(KafkaHeaders.RECORD_METADATA, RecordMetadata.class)

发送失败将进入生产者错误通道(如果已配置);请参阅 错误通道。默认值:null

+

Kafka 绑定器使用生产者的 partitionCount 设置作为提示,以使用给定的分区计数创建主题(与 minPartitionCount 结合使用,两者中较大的值为使用值)。在为绑定器配置 minPartitionCount 和为应用程序配置 partitionCount 时要谨慎,因为使用较大的值。如果主题已存在且分区计数较小,并且 autoAddPartitions 已禁用(默认值),则绑定器无法启动。如果主题已存在且分区计数较小,并且 autoAddPartitions 已启用,则会添加新分区。如果主题已存在且分区数量大于 (minPartitionCountpartitionCount) 中较大的值,则使用现有分区计数。
compression

设置 compression.type 生产者属性。支持的值为 nonegzipsnappylz4。如果您将 kafka-clients jar 覆盖到 2.1.0(或更高版本),如 Spring for Apache Kafka 文档 中所述,并且希望使用 zstd 压缩,请使用 spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.compression.type=zstd

默认值:none

closeTimeout

关闭生产者时等待的超时时间(以秒为单位)。

默认值:30

1.3.6. 使用示例

在本节中,我们将展示在特定场景中使用上述属性的方法。

示例:将 autoCommitOffset 设置为 false 并依赖手动确认

此示例说明如何在消费者应用程序中手动确认偏移量。

此示例要求将 spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset 设置为 false。请使用您的示例的相应输入通道名称。

@SpringBootApplication
@EnableBinding(Sink.class)
public class ManuallyAcknowdledgingConsumer {

 public static void main(String[] args) {
     SpringApplication.run(ManuallyAcknowdledgingConsumer.class, args);
 }

 @StreamListener(Sink.INPUT)
 public void process(Message<?> message) {
     Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
     if (acknowledgment != null) {
         System.out.println("Acknowledgment provided");
         acknowledgment.acknowledge();
     }
 }
}
示例:安全配置

Apache Kafka 0.9 支持客户端和代理之间的安全连接。要利用此功能,请遵循 Apache Kafka 文档 以及 Kafka 0.9 Confluent 文档中的安全指南 中的指南。使用 spring.cloud.stream.kafka.binder.configuration 选项为绑定器创建的所有客户端设置安全属性。

例如,要将 security.protocol 设置为 SASL_SSL,请设置以下属性

spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_SSL

所有其他安全属性都可以以类似的方式设置。

使用 Kerberos 时,请遵循 参考文档 中的说明来创建和引用 JAAS 配置。

Spring Cloud Stream 支持通过使用 JAAS 配置文件和 Spring Boot 属性将 JAAS 配置信息传递给应用程序。

使用 JAAS 配置文件

可以使用系统属性为 Spring Cloud Stream 应用程序设置 JAAS 和(可选)krb5 文件位置。以下示例展示了如何使用 JAAS 配置文件启动使用 SASL 和 Kerberos 的 Spring Cloud Stream 应用程序

 java -Djava.security.auth.login.config=/path.to/kafka_client_jaas.conf -jar log.jar \
   --spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
   --spring.cloud.stream.bindings.input.destination=stream.ticktock \
   --spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT
使用 Spring Boot 属性

作为使用 JAAS 配置文件的替代方案,Spring Cloud Stream 提供了一种机制,可以通过使用 Spring Boot 属性为 Spring Cloud Stream 应用程序设置 JAAS 配置。

以下属性可用于配置 Kafka 客户端的登录上下文

spring.cloud.stream.kafka.binder.jaas.loginModule

登录模块名称。在正常情况下无需设置。

默认值:com.sun.security.auth.module.Krb5LoginModule

spring.cloud.stream.kafka.binder.jaas.controlFlag

登录模块的控制标志。

默认值:required

spring.cloud.stream.kafka.binder.jaas.options

包含登录模块选项的键值对映射。

默认值:空映射。

以下示例展示了如何使用 Spring Boot 配置属性启动使用 SASL 和 Kerberos 的 Spring Cloud Stream 应用程序

 java --spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
   --spring.cloud.stream.bindings.input.destination=stream.ticktock \
   --spring.cloud.stream.kafka.binder.autoCreateTopics=false \
   --spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT \
   --spring.cloud.stream.kafka.binder.jaas.options.useKeyTab=true \
   --spring.cloud.stream.kafka.binder.jaas.options.storeKey=true \
   --spring.cloud.stream.kafka.binder.jaas.options.keyTab=/etc/security/keytabs/kafka_client.keytab \
   --spring.cloud.stream.kafka.binder.jaas.options.principal=kafka-client-1@EXAMPLE.COM

前面的示例表示以下 JAAS 文件的等效项

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/etc/security/keytabs/kafka_client.keytab"
    principal="[email protected]";
};

如果所需主题已存在于代理上或将由管理员创建,则可以关闭自动创建,并且只需要发送客户端 JAAS 属性。

不要在同一个应用程序中混合使用 JAAS 配置文件和 Spring Boot 属性。如果 -Djava.security.auth.login.config 系统属性已存在,则 Spring Cloud Stream 会忽略 Spring Boot 属性。
在使用 autoCreateTopicsautoAddPartitions 与 Kerberos 时要小心。通常,应用程序可能使用在 Kafka 和 Zookeeper 中没有管理权限的主体。因此,依赖 Spring Cloud Stream 创建/修改主题可能会失败。在安全环境中,我们强烈建议使用 Kafka 工具以管理方式创建主题和管理 ACL。
示例:暂停和恢复消费者

如果您希望暂停消费但不想导致分区重新平衡,您可以暂停和恢复消费者。这可以通过将Consumer作为参数添加到您的@StreamListener来实现。要恢复,您需要一个用于ListenerContainerIdleEvent实例的ApplicationListener。事件发布的频率由idleEventInterval属性控制。由于消费者不是线程安全的,您必须在调用线程上调用这些方法。

以下简单的应用程序展示了如何暂停和恢复

@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {

	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}

	@StreamListener(Sink.INPUT)
	public void in(String in, @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
		System.out.println(in);
		consumer.pause(Collections.singleton(new TopicPartition("myTopic", 0)));
	}

	@Bean
	public ApplicationListener<ListenerContainerIdleEvent> idleListener() {
		return event -> {
			System.out.println(event);
			if (event.getConsumer().paused().size() > 0) {
				event.getConsumer().resume(event.getConsumer().paused());
			}
		};
	}

}

1.4. 事务性 Binder

通过将spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix设置为非空值(例如tx-)来启用事务。在处理器应用程序中使用时,消费者会启动事务;在消费者线程上发送的任何记录都将参与同一事务。当监听器正常退出时,监听器容器将向事务发送偏移量并提交它。所有使用spring.cloud.stream.kafka.binder.transaction.producer.*属性配置的生产者绑定都使用一个通用的生产者工厂;单个绑定 Kafka 生产者属性将被忽略。

由于重试将在原始事务中运行,该事务可能会回滚,并且任何发布的记录也将回滚,因此不支持使用事务进行正常的绑定重试(和死信)。当启用重试(通用属性maxAttempts大于零)时,重试属性用于配置一个DefaultAfterRollbackProcessor以在容器级别启用重试。类似地,此功能不是在事务中发布死信记录,而是移动到监听器容器,同样通过DefaultAfterRollbackProcessor实现,该处理器在主事务回滚后运行。

如果您希望在源应用程序中使用事务,或者从某个任意线程进行仅生产者事务(例如@Scheduled方法),您必须获取对事务性生产者工厂的引用,并使用它定义一个KafkaTransactionManager bean。

@Bean
public PlatformTransactionManager transactionManager(BinderFactory binders) {
    ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
            MessageChannel.class)).getTransactionalProducerFactory();
    return new KafkaTransactionManager<>(pf);
}

请注意,我们使用BinderFactory获取对绑定的引用;当只配置了一个绑定时,在第一个参数中使用null。如果配置了多个绑定,请使用绑定名称来获取引用。一旦我们获得了对绑定的引用,我们就可以获得对ProducerFactory的引用并创建一个事务管理器。

然后,您可以使用正常的 Spring 事务支持,例如TransactionTemplate@Transactional,例如

public static class Sender {

    @Transactional
    public void doInTransaction(MessageChannel output, List<String> stuffToSend) {
        stuffToSend.forEach(stuff -> output.send(new GenericMessage<>(stuff)));
    }

}

如果您希望将仅生产者的事务与来自其他事务管理器的那些事务同步,请使用ChainedTransactionManager

1.5. 错误通道

从 1.3 版本开始,绑定器无条件地将异常发送到每个消费者目标的错误通道,并且还可以配置为将异步生产者发送失败发送到错误通道。有关更多信息,请参阅有关错误处理的这一部分

发送失败的ErrorMessage的有效负载是KafkaSendFailureException,它具有以下属性

  • failedMessage:无法发送的 Spring Messaging Message<?>

  • record:从failedMessage创建的原始ProducerRecord

没有自动处理生产者异常(例如发送到死信队列)。您可以使用自己的 Spring Integration 流来使用这些异常。

1.6. Kafka 指标

Kafka 绑定器模块公开了以下指标

spring.cloud.stream.binder.kafka.offset:此指标指示给定绑定器的主题中给定消费者组尚未消费的消息数量。提供的指标基于 Micrometer 库。如果 Micrometer 位于类路径上,并且应用程序没有提供其他此类 bean,则绑定器会创建KafkaBinderMetrics bean。该指标包含消费者组信息、主题和主题上最新偏移量的已提交偏移量的实际滞后。此指标对于向 PaaS 平台提供自动缩放反馈特别有用。

您可以排除KafkaBinderMetrics,以防止创建必要的基础设施(如消费者),然后通过在应用程序中提供以下组件来报告指标。

@Component
class NoOpBindingMeters {
	NoOpBindingMeters(MeterRegistry registry) {
		registry.config().meterFilter(
				MeterFilter.denyNameStartsWith(KafkaBinderMetrics.OFFSET_LAG_METRIC_NAME));
	}
}

有关如何选择性地抑制指标的更多详细信息,请参阅此处

1.7. 墓碑记录(空记录值)

使用压缩主题时,具有null值的记录(也称为墓碑记录)表示删除键。要在@StreamListener方法中接收此类消息,参数必须标记为非必需,以接收null值参数。

@StreamListener(Sink.INPUT)
public void in(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) byte[] key,
               @Payload(required = false) Customer customer) {
    // customer is null if a tombstone record
    ...
}

1.8. 使用 KafkaRebalanceListener

应用程序可能希望在最初分配分区时将主题/分区搜索到任意偏移量,或对消费者执行其他操作。从 2.1 版本开始,如果您在应用程序上下文中提供单个KafkaRebalanceListener bean,它将被连接到所有 Kafka 消费者绑定。

public interface KafkaBindingRebalanceListener {

	/**
	 * Invoked by the container before any pending offsets are committed.
	 * @param bindingName the name of the binding.
	 * @param consumer the consumer.
	 * @param partitions the partitions.
	 */
	default void onPartitionsRevokedBeforeCommit(String bindingName, Consumer<?, ?> consumer,
			Collection<TopicPartition> partitions) {

	}

	/**
	 * Invoked by the container after any pending offsets are committed.
	 * @param bindingName the name of the binding.
	 * @param consumer the consumer.
	 * @param partitions the partitions.
	 */
	default void onPartitionsRevokedAfterCommit(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {

	}

	/**
	 * Invoked when partitions are initially assigned or after a rebalance.
	 * Applications might only want to perform seek operations on an initial assignment.
	 * @param bindingName the name of the binding.
	 * @param consumer the consumer.
	 * @param partitions the partitions.
	 * @param initial true if this is the initial assignment.
	 */
	default void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions,
			boolean initial) {

	}

}

当您提供重新平衡侦听器时,您不能将resetOffsets消费者属性设置为true

1.9. 自定义消费者和生产者配置

如果您希望对用于在 Kafka 中创建ConsumerFactoryProducerFactory的消费者和生产者配置进行高级自定义,您可以实现以下自定义程序。

  • 消费者配置定制器

  • 生产者配置定制器

这两个接口都提供了一种方法来配置用于消费者和生产者属性的配置映射。例如,如果您想访问在应用程序级别定义的 Bean,您可以在 configure 方法的实现中注入它。当绑定器发现这些定制器作为 Bean 可用时,它将在创建消费者和生产者工厂之前调用 configure 方法。

1.10. 自定义 AdminClient 配置

与上面消费者和生产者配置定制一样,应用程序还可以通过提供 AdminClientConfigCustomizer 来定制管理客户端的配置。AdminClientConfigCustomizer 的 configure 方法提供了对管理客户端属性的访问,您可以使用它来定义进一步的定制。绑定器的 Kafka 主题供应器对通过此定制器提供的属性给予最高优先级。以下是如何提供此定制器 Bean 的示例。

@Bean
public AdminClientConfigCustomizer adminClientConfigCustomizer() {
    return props -> {
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
    };
}

1.11. 死信主题处理

1.11.1. 死信主题分区选择

默认情况下,记录使用与原始记录相同的分区发布到死信主题。这意味着死信主题必须至少与原始记录具有相同数量的分区。

要更改此行为,请将 DlqPartitionFunction 实现作为 @Bean 添加到应用程序上下文。只能存在一个这样的 Bean。该函数将提供消费者组、失败的 ConsumerRecord 和异常。例如,如果您始终想要路由到分区 0,您可以使用

@Bean
public DlqPartitionFunction partitionFunction() {
    return (group, record, ex) -> 0;
}
如果您将消费者绑定的 dlqPartitions 属性设置为 1(并且绑定器的 minPartitionCount 等于 1),则无需提供 DlqPartitionFunction;框架将始终使用分区 0。如果您将消费者绑定的 dlqPartitions 属性设置为大于 1 的值(或绑定器的 minPartitionCount 大于 1),则 **必须** 提供 DlqPartitionFunction Bean,即使分区计数与原始主题的相同。

还可以为 DLQ 主题定义自定义名称。为此,请将 DlqDestinationResolver 的实现作为 @Bean 创建到应用程序上下文。当绑定器检测到这样的 Bean 时,它将优先使用它,否则它将使用 dlqName 属性。如果两者都找不到,它将默认为 error.<destination>.<group>。以下是如何将 DlqDestinationResolver 作为 @Bean 的示例。

@Bean
public DlqDestinationResolver dlqDestinationResolver() {
    return (rec, ex) -> {
        if (rec.topic().equals("word1")) {
            return "topic1-dlq";
        }
        else {
            return "topic2-dlq";
        }
    };
}

在为 DlqDestinationResolver 提供实现时,需要注意的一点是,绑定器中的供应器不会自动为应用程序创建主题。这是因为绑定器无法推断实现可能发送到的所有 DLQ 主题的名称。因此,如果您使用此策略提供 DLQ 名称,则应用程序有责任确保这些主题事先创建。

1.11.2. 处理死信主题中的记录

由于框架无法预测用户希望如何处理死信消息,因此它不提供任何标准机制来处理它们。如果死信的原因是暂时的,您可能希望将消息路由回原始主题。但是,如果问题是永久性问题,则会导致无限循环。本主题中的示例 Spring Boot 应用程序展示了如何将这些消息路由回原始主题,但它会在三次尝试后将它们移动到“停车场”主题。该应用程序是另一个 spring-cloud-stream 应用程序,它从死信主题读取。当 5 秒内没有收到消息时,它将终止。

这些示例假设原始目标是 so8400out,消费者组是 so8400

有几种策略需要考虑

  • 考虑仅在主应用程序未运行时运行重路由。否则,针对瞬态错误的重试会很快用完。

  • 或者,使用两阶段方法:使用此应用程序路由到第三个主题,然后使用另一个应用程序从那里路由回主主题。

以下代码清单显示了示例应用程序

application.properties
spring.cloud.stream.bindings.input.group=so8400replay
spring.cloud.stream.bindings.input.destination=error.so8400out.so8400

spring.cloud.stream.bindings.output.destination=so8400out

spring.cloud.stream.bindings.parkingLot.destination=so8400in.parkingLot

spring.cloud.stream.kafka.binder.configuration.auto.offset.reset=earliest

spring.cloud.stream.kafka.binder.headers=x-retries
Application
@SpringBootApplication
@EnableBinding(TwoOutputProcessor.class)
public class ReRouteDlqKApplication implements CommandLineRunner {

    private static final String X_RETRIES_HEADER = "x-retries";

    public static void main(String[] args) {
        SpringApplication.run(ReRouteDlqKApplication.class, args).close();
    }

    private final AtomicInteger processed = new AtomicInteger();

    @Autowired
    private MessageChannel parkingLot;

    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public Message<?> reRoute(Message<?> failed) {
        processed.incrementAndGet();
        Integer retries = failed.getHeaders().get(X_RETRIES_HEADER, Integer.class);
        if (retries == null) {
            System.out.println("First retry for " + failed);
            return MessageBuilder.fromMessage(failed)
                    .setHeader(X_RETRIES_HEADER, new Integer(1))
                    .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                            failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                    .build();
        }
        else if (retries.intValue() < 3) {
            System.out.println("Another retry for " + failed);
            return MessageBuilder.fromMessage(failed)
                    .setHeader(X_RETRIES_HEADER, new Integer(retries.intValue() + 1))
                    .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                            failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                    .build();
        }
        else {
            System.out.println("Retries exhausted for " + failed);
            parkingLot.send(MessageBuilder.fromMessage(failed)
                    .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                            failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                    .build());
        }
        return null;
    }

    @Override
    public void run(String... args) throws Exception {
        while (true) {
            int count = this.processed.get();
            Thread.sleep(5000);
            if (count == this.processed.get()) {
                System.out.println("Idle, terminating");
                return;
            }
        }
    }

    public interface TwoOutputProcessor extends Processor {

        @Output("parkingLot")
        MessageChannel parkingLot();

    }

}

1.12. 使用 Kafka Binder 进行分区

Apache Kafka 本地支持主题分区。

有时将数据发送到特定分区是有利的,例如,当您希望严格排序消息处理时(针对特定客户的所有消息都应发送到同一个分区)。

以下示例展示了如何配置生产者和消费者端

@SpringBootApplication
@EnableBinding(Source.class)
public class KafkaPartitionProducerApplication {

    private static final Random RANDOM = new Random(System.currentTimeMillis());

    private static final String[] data = new String[] {
            "foo1", "bar1", "qux1",
            "foo2", "bar2", "qux2",
            "foo3", "bar3", "qux3",
            "foo4", "bar4", "qux4",
            };

    public static void main(String[] args) {
        new SpringApplicationBuilder(KafkaPartitionProducerApplication.class)
            .web(false)
            .run(args);
    }

    @InboundChannelAdapter(channel = Source.OUTPUT, poller = @Poller(fixedRate = "5000"))
    public Message<?> generate() {
        String value = data[RANDOM.nextInt(data.length)];
        System.out.println("Sending: " + value);
        return MessageBuilder.withPayload(value)
                .setHeader("partitionKey", value)
                .build();
    }

}
application.yml
spring:
  cloud:
    stream:
      bindings:
        output:
          destination: partitioned.topic
          producer:
            partition-key-expression: headers['partitionKey']
            partition-count: 12
主题必须配置为具有足够的 partition 来实现所有消费者组的所需并发性。上述配置支持最多 12 个消费者实例(如果它们的 concurrency 为 2,则为 6 个;如果它们的并发性为 3,则为 4 个,依此类推)。通常最好“过度配置”分区,以允许将来增加消费者或并发性。
前面的配置使用默认分区 (key.hashCode() % partitionCount)。这可能会或可能不会提供一个适当平衡的算法,具体取决于键值。您可以使用 partitionSelectorExpressionpartitionSelectorClass 属性覆盖此默认值。

由于分区由 Kafka 本地处理,因此消费者端不需要任何特殊配置。Kafka 在实例之间分配分区。

以下 Spring Boot 应用程序侦听 Kafka 流,并将每个消息发送到的分区 ID(到控制台)打印出来

@SpringBootApplication
@EnableBinding(Sink.class)
public class KafkaPartitionConsumerApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(KafkaPartitionConsumerApplication.class)
            .web(false)
            .run(args);
    }

    @StreamListener(Sink.INPUT)
    public void listen(@Payload String in, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        System.out.println(in + " received from partition " + partition);
    }

}
application.yml
spring:
  cloud:
    stream:
      bindings:
        input:
          destination: partitioned.topic
          group: myGroup

您可以根据需要添加实例。Kafka 会重新平衡分区分配。如果实例数量(或instance count * concurrency)超过分区数量,则某些消费者将处于空闲状态。

2. Kafka Streams Binder

2.1. 使用

要使用 Kafka Streams 绑定器,您只需将其添加到您的 Spring Cloud Stream 应用程序中,使用以下 Maven 坐标

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>

使用 Spring Initializr 快速启动 Kafka Streams 绑定器的全新项目,然后选择“Cloud Streams”和“Spring for Kafka Streams”,如下所示

spring initializr kafka streams

2.2. 概述

Spring Cloud Stream 包含一个专门为 Apache Kafka Streams 绑定设计的绑定器实现。通过这种原生集成,Spring Cloud Stream “处理器”应用程序可以直接在核心业务逻辑中使用 Apache Kafka Streams API。

Kafka Streams 绑定器实现建立在 Spring for Apache Kafka 项目提供的基础之上。

Kafka Streams 绑定器为 Kafka Streams 中的三种主要类型提供绑定功能 - KStreamKTableGlobalKTable

Kafka Streams 应用程序通常遵循一种模型,其中记录从入站主题读取,应用业务逻辑,然后将转换后的记录写入出站主题。或者,也可以定义没有出站目标的处理器应用程序。

在以下部分中,我们将详细了解 Spring Cloud Stream 与 Kafka Streams 的集成。

2.3. 编程模型

使用 Kafka Streams 绑定器提供的编程模型时,既可以使用高级 Streams DSL,也可以混合使用高级和低级 Processor-API。当混合使用高级和低级 API 时,这通常是通过在 KStream 上调用 transformprocess API 方法来实现的。

2.3.1. 函数式风格

从 Spring Cloud Stream 3.0.0 开始,Kafka Streams 绑定器允许应用程序使用 Java 8 中可用的函数式编程风格进行设计和开发。这意味着应用程序可以简洁地表示为类型 java.util.function.Functionjava.util.function.Consumer 的 lambda 表达式。

让我们来看一个非常基本的例子。

@SpringBootApplication
public class SimpleConsumerApplication {

    @Bean
    public java.util.function.Consumer<KStream<Object, String>> process() {

        return input ->
                input.foreach((key, value) -> {
                    System.out.println("Key: " + key + " Value: " + value);
                });
    }
}

虽然简单,但这是一个完整的独立 Spring Boot 应用程序,它利用 Kafka Streams 进行流处理。这是一个没有出站绑定的消费者应用程序,只有一个入站绑定。应用程序消费数据,并简单地将 KStream 键和值的信息记录到标准输出。应用程序包含 SpringBootApplication 注解和一个标记为 Bean 的方法。bean 方法的类型为 java.util.function.Consumer,它用 KStream 参数化。然后在实现中,我们返回一个本质上是 lambda 表达式的 Consumer 对象。在 lambda 表达式中,提供了处理数据的代码。

在这个应用程序中,有一个单一的输入绑定,类型为 KStream。绑定器为应用程序创建此绑定,名称为 process-in-0,即函数 bean 名称后跟一个连字符 (-),然后是文字 in,再跟一个连字符,然后是参数的序数位置。您可以使用此绑定名称来设置其他属性,例如目标。例如,spring.cloud.stream.bindings.process-in-0.destination=my-topic

如果绑定中未设置目标属性,则会创建一个与绑定名称相同的主题(如果应用程序具有足够的权限),或者预期该主题已存在。

构建为 uber-jar(例如,kstream-consumer-app.jar)后,您可以像下面这样运行上面的示例。

java -jar kstream-consumer-app.jar --spring.cloud.stream.bindings.process-in-0.destination=my-topic

以下是一个完整的处理器示例,它包含输入和输出绑定。这是经典的词频统计示例,其中应用程序从主题接收数据,然后在滚动时间窗口中计算每个词的出现次数。

@SpringBootApplication
public class WordCountProcessorApplication {

  @Bean
  public Function<KStream<Object, String>, KStream<?, WordCount>> process() {

    return input -> input
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .map((key, value) -> new KeyValue<>(value, value))
                .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
                .windowedBy(TimeWindows.of(5000))
                .count(Materialized.as("word-counts-state-store"))
                .toStream()
                .map((key, value) -> new KeyValue<>(key.key(), new WordCount(key.key(), value,
                        new Date(key.window().start()), new Date(key.window().end()))));
  }

	public static void main(String[] args) {
		SpringApplication.run(WordCountProcessorApplication.class, args);
	}
}

同样,这是一个完整的 Spring Boot 应用程序。与第一个应用程序的不同之处在于,bean 方法的类型为 java.util.function.FunctionFunction 的第一个参数化类型用于输入 KStream,第二个参数化类型用于输出。在方法体中,提供了一个 Function 类型的 lambda 表达式,作为实现,给出了实际的业务逻辑。与之前讨论的基于 Consumer 的应用程序类似,这里的输入绑定默认命名为 process-in-0。对于输出,绑定名称也会自动设置为 process-out-0

构建为 uber-jar(例如,wordcount-processor.jar)后,您可以像下面这样运行上面的示例。

java -jar wordcount-processor.jar --spring.cloud.stream.bindings.process-in-0.destination=words --spring.cloud.stream.bindings.process-out-0.destination=counts

此应用程序将从 Kafka 主题 words 中消费消息,并将计算结果发布到输出主题 counts

Spring Cloud Stream 将确保来自传入和传出主题的消息自动绑定为 KStream 对象。作为开发人员,您可以专注于代码的业务方面,即编写处理器中所需的逻辑。框架会自动处理 Kafka Streams 基础设施所需的 Kafka Streams 特定配置。

我们上面看到的两个示例都有一个 KStream 输入绑定。在这两种情况下,绑定都从单个主题接收记录。如果您想将多个主题多路复用到单个 KStream 绑定中,可以在下面提供以逗号分隔的 Kafka 主题作为目标。

spring.cloud.stream.bindings.process-in-0.destination=topic-1,topic-2,topic-3

此外,如果您想根据正则表达式匹配主题,还可以提供主题模式作为目标。

spring.cloud.stream.bindings.process-in-0.destination=input.*

多个输入绑定

许多非平凡的 Kafka Streams 应用程序通常通过多个绑定从多个主题消费数据。例如,一个主题作为 Kstream 消费,另一个作为 KTableGlobalKTable 消费。应用程序可能希望以表格类型接收数据的原因有很多。考虑一个用例,其中基础主题通过来自数据库的更改数据捕获 (CDC) 机制填充,或者应用程序只关心最新更新以进行下游处理。如果应用程序指定数据需要绑定为 KTableGlobalKTable,那么 Kafka Streams 绑定器将正确地将目标绑定到 KTableGlobalKTable,并使它们可供应用程序操作。我们将研究 Kafka Streams 绑定器中如何处理多个输入绑定的几种不同场景。

Kafka Streams Binder 中的 BiFunction

以下是一个有两个输入和一个输出的示例。在这种情况下,应用程序可以利用java.util.function.BiFunction

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
    return (userClicksStream, userRegionsTable) -> (userClicksStream
            .leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
                            "UNKNOWN" : region, clicks),
                    Joined.with(Serdes.String(), Serdes.Long(), null))
            .map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(),
                    regionWithClicks.getClicks()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
            .reduce(Long::sum)
            .toStream());
}

这里,基本主题与之前的示例相同,但这里有两个输入。Java 的BiFunction 支持用于将输入绑定到所需的目的地。绑定器为输入生成的默认绑定名称分别为process-in-0process-in-1。默认输出绑定为process-out-0。在本例中,BiFunction 的第一个参数绑定为第一个输入的KStream,第二个参数绑定为第二个输入的KTable

Kafka Streams Binder 中的 BiConsumer

如果有两个输入,但没有输出,那么我们可以使用java.util.function.BiConsumer,如下所示。

@Bean
public BiConsumer<KStream<String, Long>, KTable<String, String>> process() {
    return (userClicksStream, userRegionsTable) -> {}
}
超过两个输入

如果有多于两个输入怎么办?在某些情况下,您可能需要超过两个输入。在这种情况下,绑定器允许您链接部分函数。在函数式编程术语中,这种技术通常被称为柯里化。随着 Java 8 中添加的函数式编程支持,Java 现在允许您编写柯里化函数。Spring Cloud Stream Kafka Streams 绑定器可以利用此功能来启用多个输入绑定。

让我们看一个例子。

@Bean
public Function<KStream<Long, Order>,
        Function<GlobalKTable<Long, Customer>,
                Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> enrichOrder() {

    return orders -> (
              customers -> (
                    products -> (
                        orders.join(customers,
                            (orderId, order) -> order.getCustomerId(),
                                (order, customer) -> new CustomerOrder(customer, order))
                                .join(products,
                                        (orderId, customerOrder) -> customerOrder
                                                .productId(),
                                        (customerOrder, product) -> {
                                            EnrichedOrder enrichedOrder = new EnrichedOrder();
                                            enrichedOrder.setProduct(product);
                                            enrichedOrder.setCustomer(customerOrder.customer);
                                            enrichedOrder.setOrder(customerOrder.order);
                                            return enrichedOrder;
                                        })
                        )
                )
    );
}

让我们看一下上面介绍的绑定模型的细节。在这个模型中,我们在入站有 3 个部分应用的函数。让我们将它们称为f(x)f(y)f(z)。如果我们从真正的数学函数的角度扩展这些函数,它将看起来像这样:f(x) → (fy) → f(z) → KStream<Long, EnrichedOrder>x 变量代表KStream<Long, Order>y 变量代表GlobalKTable<Long, Customer>z 变量代表GlobalKTable<Long, Product>。第一个函数f(x) 具有应用程序的第一个输入绑定(KStream<Long, Order>),其输出是函数f(y)。函数f(y) 具有应用程序的第二个输入绑定(GlobalKTable<Long, Customer>),其输出是另一个函数f(z)。函数f(z) 的输入是应用程序的第三个输入(GlobalKTable<Long, Product>),其输出是KStream<Long, EnrichedOrder>,它是应用程序的最终输出绑定。来自三个部分函数的输入,分别是KStreamGlobalKTableGlobalKTable,在方法体中可用于实现作为 lambda 表达式一部分的业务逻辑。

输入绑定分别命名为enrichOrder-in-0enrichOrder-in-1enrichOrder-in-2。输出绑定命名为enrichOrder-out-0

使用柯里化函数,您可以拥有任意数量的输入。但是,请记住,超过少量输入和部分应用的函数(如上面在 Java 中所示)可能会导致代码难以阅读。因此,如果您的 Kafka Streams 应用程序需要超过合理数量的输入绑定,并且您想使用这种函数模型,那么您可能需要重新考虑您的设计并适当地分解应用程序。

多个输出绑定

Kafka Streams 允许将出站数据写入多个主题。此功能在 Kafka Streams 中被称为分支。当使用多个输出绑定时,需要提供一个 KStream 数组 (KStream[]) 作为出站返回类型。

以下是一个示例

@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>[]> process() {

    Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
    Predicate<Object, WordCount> isFrench = (k, v) -> v.word.equals("french");
    Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");

    return input -> input
            .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
            .groupBy((key, value) -> value)
            .windowedBy(TimeWindows.of(5000))
            .count(Materialized.as("WordCounts-branch"))
            .toStream()
            .map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value,
                    new Date(key.window().start()), new Date(key.window().end()))))
            .branch(isEnglish, isFrench, isSpanish);
}

编程模型保持不变,但出站参数化类型为 KStream[]。默认输出绑定名称分别为 process-out-0process-out-1process-out-2。绑定器生成三个输出绑定的原因是它检测到返回的 KStream 数组的长度。

Kafka Streams 函数式编程风格总结

总之,下表显示了可以在函数式范式中使用的各种选项。

输入数量 输出数量 要使用的组件

1

0

java.util.function.Consumer

2

0

java.util.function.BiConsumer

1

1..n

java.util.function.Function

2

1..n

java.util.function.BiFunction

>= 3

0..n

使用柯里化函数

  • 在表中有多个输出的情况下,类型简单地变为 KStream[]

2.3.2. 命令式编程模型。

虽然上面概述的函数式编程模型是首选方法,但如果您愿意,您仍然可以使用经典的 StreamListener 方法。

以下是一些示例。

以下是使用 StreamListener 的单词计数示例的等效代码。

@SpringBootApplication
@EnableBinding(KafkaStreamsProcessor.class)
public class WordCountProcessorApplication {

    @StreamListener("input")
    @SendTo("output")
    public KStream<?, WordCount> process(KStream<?, String> input) {
        return input
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .groupBy((key, value) -> value)
                .windowedBy(TimeWindows.of(5000))
                .count(Materialized.as("WordCounts-multi"))
                .toStream()
                .map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
    }

    public static void main(String[] args) {
        SpringApplication.run(WordCountProcessorApplication.class, args);
    }

如您所见,这有点冗长,因为您需要提供 EnableBinding 和其他额外的注释(如 StreamListenerSendTo)才能使其成为一个完整的应用程序。EnableBinding 是您指定包含绑定的绑定接口的地方。在本例中,我们使用的是库存 KafkaStreamsProcessor 绑定接口,它具有以下契约。

public interface KafkaStreamsProcessor {

	@Input("input")
	KStream<?, ?> input();

	@Output("output")
	KStream<?, ?> output();

}

绑定器将为输入 KStream 和输出 KStream 创建绑定,因为您使用的是包含这些声明的绑定接口。

除了函数式风格提供的编程模型的明显差异之外,这里需要特别提到的一点是,绑定名称是您在绑定接口中指定的。例如,在上面的应用程序中,由于我们使用的是 KafkaStreamsProcessor,因此绑定名称为 inputoutput。绑定属性需要使用这些名称。例如 spring.cloud.stream.bindings.input.destinationspring.cloud.stream.bindings.output.destination 等。请记住,这与函数式风格有根本的不同,因为在函数式风格中,绑定器会为应用程序生成绑定名称。这是因为应用程序在使用 EnableBinding 的函数模型中没有提供任何绑定接口。

以下是一个具有两个输入的接收器的另一个示例。

@EnableBinding(KStreamKTableBinding.class)
.....
.....
@StreamListener
public void process(@Input("inputStream") KStream<String, PlayEvent> playEvents,
                    @Input("inputTable") KTable<Long, Song> songTable) {
                    ....
                    ....
}

interface KStreamKTableBinding {

    @Input("inputStream")
    KStream<?, ?> inputStream();

    @Input("inputTable")
    KTable<?, ?> inputTable();
}

以下是与上面看到的基于 BiFunction 的处理器相同的 StreamListener 等效代码。

@EnableBinding(KStreamKTableBinding.class)
....
....

@StreamListener
@SendTo("output")
public KStream<String, Long> process(@Input("input") KStream<String, Long> userClicksStream,
                                     @Input("inputTable") KTable<String, String> userRegionsTable) {
....
....
}

interface KStreamKTableBinding extends KafkaStreamsProcessor {

    @Input("inputX")
    KTable<?, ?> inputTable();
}

最后,以下是具有三个输入和柯里化函数的应用程序的 StreamListener 等效代码。

@EnableBinding(CustomGlobalKTableProcessor.class)
...
...
    @StreamListener
    @SendTo("output")
    public KStream<Long, EnrichedOrder> process(
            @Input("input-1") KStream<Long, Order> ordersStream,
            @Input("input-2") GlobalKTable<Long, Customer> customers,
            @Input("input-3") GlobalKTable<Long, Product> products) {

        KStream<Long, CustomerOrder> customerOrdersStream = ordersStream.join(
                customers, (orderId, order) -> order.getCustomerId(),
                (order, customer) -> new CustomerOrder(customer, order));

        return customerOrdersStream.join(products,
                (orderId, customerOrder) -> customerOrder.productId(),
                (customerOrder, product) -> {
                    EnrichedOrder enrichedOrder = new EnrichedOrder();
                    enrichedOrder.setProduct(product);
                    enrichedOrder.setCustomer(customerOrder.customer);
                    enrichedOrder.setOrder(customerOrder.order);
                    return enrichedOrder;
                });
        }

    interface CustomGlobalKTableProcessor {

            @Input("input-1")
            KStream<?, ?> input1();

            @Input("input-2")
            GlobalKTable<?, ?> input2();

            @Input("input-3")
            GlobalKTable<?, ?> input3();

            @Output("output")
            KStream<?, ?> output();
    }

您可能会注意到上面两个示例更加冗长,因为除了提供EnableBinding之外,您还需要编写自己的自定义绑定接口。使用函数模型,您可以避免所有这些仪式细节。

在我们继续研究 Kafka Streams 绑定器提供的通用编程模型之前,以下是多个输出绑定的StreamListener版本。

EnableBinding(KStreamProcessorWithBranches.class)
public static class WordCountProcessorApplication {

    @Autowired
    private TimeWindows timeWindows;

    @StreamListener("input")
    @SendTo({"output1","output2","output3"})
    public KStream<?, WordCount>[] process(KStream<Object, String> input) {

			Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
			Predicate<Object, WordCount> isFrench =  (k, v) -> v.word.equals("french");
			Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");

			return input
					.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
					.groupBy((key, value) -> value)
					.windowedBy(timeWindows)
					.count(Materialized.as("WordCounts-1"))
					.toStream()
					.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))))
					.branch(isEnglish, isFrench, isSpanish);
    }

    interface KStreamProcessorWithBranches {

    		@Input("input")
    		KStream<?, ?> input();

    		@Output("output1")
    		KStream<?, ?> output1();

    		@Output("output2")
    		KStream<?, ?> output2();

    		@Output("output3")
    		KStream<?, ?> output3();
    	}
}

回顾一下,我们已经回顾了使用 Kafka Streams 绑定器时的各种编程模型选择。

绑定器为输入上的KStreamKTableGlobalKTable提供绑定功能。KTableGlobalKTable绑定仅在输入上可用。绑定器支持KStream的输入和输出绑定。

Kafka Streams 绑定器编程模型的优点是,绑定器为您提供了使用完全函数式编程模型或使用基于StreamListener的命令式方法的灵活性。

2.4. 编程模型的辅助工具

2.4.1. 单个应用程序中的多个 Kafka Streams 处理器

绑定器允许在单个 Spring Cloud Stream 应用程序中拥有多个 Kafka Streams 处理器。您可以拥有以下应用程序。

@Bean
public java.util.function.Function<KStream<Object, String>, KStream<Object, String>> process() {
   ...
}

@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
  ...
}

@Bean
public java.util.function.BiFunction<KStream<Object, String>, KTable<Integer, String>, KStream<Object, String>> yetAnotherProcess() {
   ...
}

在这种情况下,绑定器将创建 3 个具有不同应用程序 ID 的独立 Kafka Streams 对象(下面将详细介绍)。但是,如果您在应用程序中有多个处理器,则必须告诉 Spring Cloud Stream 哪些函数需要激活。以下是如何激活函数。

spring.cloud.stream.function.definition: process;anotherProcess;yetAnotherProcess

如果您希望某些函数不要立即激活,可以从该列表中删除它们。

当您只有一个 Kafka Streams 处理器和其他类型的Function bean 在同一个应用程序中时,这也适用,该应用程序通过不同的绑定器处理(例如,基于常规 Kafka 消息通道绑定器的函数 bean)。

2.4.2. Kafka Streams 应用程序 ID

应用程序 ID 是您需要为 Kafka Streams 应用程序提供的必填属性。Spring Cloud Stream Kafka Streams 绑定器允许您通过多种方式配置此应用程序 ID。

如果您在应用程序中只有一个处理器或StreamListener,那么您可以使用以下属性在绑定器级别设置它。

spring.cloud.stream.kafka.streams.binder.applicationId.

为了方便起见,如果您只有一个处理器,您也可以使用spring.application.name作为属性来委托应用程序 ID。

如果您在应用程序中有多个 Kafka Streams 处理器,那么您需要为每个处理器设置应用程序 ID。在函数模型的情况下,您可以将其作为属性附加到每个函数。

例如,假设您有以下函数。

@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
   ...
}

@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
  ...
}

然后,您可以使用以下绑定器级别属性为每个应用程序设置应用程序 ID。

spring.cloud.stream.kafka.streams.binder.functions.process.applicationId

spring.cloud.stream.kafka.streams.binder.functions.anotherProcess.applicationId

StreamListener 的情况下,您需要在处理器的第一个输入绑定上设置此属性。

例如,假设您有两个基于 StreamListener 的处理器。

@StreamListener
@SendTo("output")
public KStream<String, String> process(@Input("input") <KStream<Object, String>> input) {
   ...
}

@StreamListener
@SendTo("anotherOutput")
public KStream<String, String> anotherProcess(@Input("anotherInput") <KStream<Object, String>> input) {
   ...
}

然后,您必须使用以下绑定属性为此设置应用程序 ID。

spring.cloud.stream.kafka.streams.bindings.input.consumer.applicationId

spring.cloud.stream.kafka.streams.bindings.anotherInput.consumer.applicationId

对于基于函数的模型,这种在绑定级别设置应用程序 ID 的方法也适用。但是,如果您使用的是函数模型,则在绑定器级别按函数设置应用程序 ID 会更容易,如上所述。

对于生产部署,强烈建议通过配置显式指定应用程序 ID。如果您要自动扩展应用程序,这一点尤其重要,在这种情况下,您需要确保每个实例都使用相同的应用程序 ID 部署。

如果应用程序未提供应用程序 ID,则绑定器将为您自动生成一个静态应用程序 ID。这在开发场景中很方便,因为它避免了显式提供应用程序 ID 的需要。以这种方式生成的应用程序 ID 在应用程序重启时将保持静态。在函数模型中,生成的应用程序 ID 将是函数 bean 名称后跟文字 applicationID,例如,如果 process 是函数 bean 名称,则为 process-applicationID。在 StreamListener 的情况下,生成的应用程序 ID 将使用包含类名后跟方法名后跟文字 applicationId,而不是使用函数 bean 名称。

设置应用程序 ID 总结
  • 默认情况下,绑定器将为每个函数或 StreamListener 方法自动生成应用程序 ID。

  • 如果您只有一个处理器,则可以使用 spring.kafka.streams.applicationIdspring.application.namespring.cloud.stream.kafka.streams.binder.applicationId

  • 如果您有多个处理器,则可以使用属性 spring.cloud.stream.kafka.streams.binder.functions.<function-name>.applicationId 为每个函数设置应用程序 ID。在 StreamListener 的情况下,可以使用 spring.cloud.stream.kafka.streams.bindings.input.applicationId 来完成此操作,假设输入绑定名称为 input

2.4.3. 使用函数式风格覆盖绑定器生成的默认绑定名称

默认情况下,绑定器在使用函数式风格时,会使用上面讨论的策略来生成绑定名称,即 <function-bean-name>-<in>|<out>-[0..n],例如 process-in-0、process-out-0 等。如果您想覆盖这些绑定名称,可以通过指定以下属性来实现。

spring.cloud.stream.function.bindings.<default binding name>。默认绑定名称是绑定器生成的原始绑定名称。

例如,假设您有以下函数。

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}

绑定器将生成名称为 process-in-0process-in-1process-out-0 的绑定。现在,如果您想将它们更改为完全不同的名称,也许是更具领域特色的绑定名称,那么您可以按照以下步骤进行操作。

spring.cloud.stream.function.bindings.process-in-0=users

spring.cloud.stream.function.bindings.process-in-0=regions

spring.cloud.stream.function.bindings.process-out-0=clicks

之后,您必须在这些新的绑定名称上设置所有绑定级别的属性。

请记住,在上面描述的函数式编程模型中,在大多数情况下,遵循默认绑定名称是有意义的。您可能仍然想要进行这种覆盖的唯一原因是,当您有大量配置属性时,您希望将绑定映射到更具领域友好的名称。

2.4.4. 设置引导服务器配置

在运行 Kafka Streams 应用程序时,您必须提供 Kafka 代理服务器信息。如果您没有提供此信息,绑定器会期望您在默认的 localhost:9092 上运行代理。如果不是这种情况,则需要覆盖它。有几种方法可以做到这一点。

  • 使用启动属性 - spring.kafka.bootstrapServers

  • 绑定器级别属性 - spring.cloud.stream.kafka.streams.binder.brokers

当涉及到绑定器级别属性时,无论您使用的是通过常规 Kafka 绑定器提供的代理属性 - spring.cloud.stream.kafka.binder.brokers 还是 Kafka Streams 绑定器,都没有关系。Kafka Streams 绑定器将首先检查是否设置了 Kafka Streams 绑定器特定的代理属性 (spring.cloud.stream.kafka.streams.binder.brokers),如果未找到,则会查找 spring.cloud.stream.kafka.binder.brokers

2.5. 记录序列化和反序列化

Kafka Streams 绑定器允许您以两种方式序列化和反序列化记录。一种是 Kafka 提供的原生序列化和反序列化功能,另一种是 Spring Cloud Stream 框架的消息转换功能。让我们看看一些细节。

2.5.1. 入站反序列化

键始终使用原生 Serdes 反序列化。

对于值,默认情况下,Kafka 会在入站进行本机反序列化。请注意,这与之前版本的 Kafka Streams 绑定器中的默认行为有重大变化,在之前版本中,反序列化是由框架完成的。

Kafka Streams 绑定器将尝试通过查看 `java.util.function.Function|Consumer` 或 `StreamListener` 的类型签名来推断匹配的 `Serde` 类型。以下是它匹配 Serdes 的顺序。

  • 如果应用程序提供 `Serde` 类型的 bean,并且返回类型使用传入键或值的实际类型进行参数化,那么它将使用该 `Serde` 进行入站反序列化。例如,如果您在应用程序中具有以下内容,则绑定器会检测到 `KStream` 的传入值类型与使用 `Serde` bean 参数化的类型匹配。它将使用它进行入站反序列化。

@Bean
public Serde<Foo() customSerde{
 ...
}

@Bean
public Function<KStream<String, Foo>, KStream<String, Foo>> process() {
}
  • 接下来,它会查看类型,并查看它们是否为 Kafka Streams 公开的类型之一。如果是,则使用它们。以下是绑定器将尝试从 Kafka Streams 匹配的 Serde 类型。

    Integer, Long, Short, Double, Float, byte[], UUID and String.
  • 如果 Kafka Streams 提供的任何 Serdes 都不匹配类型,那么它将使用 Spring Kafka 提供的 JsonSerde。在这种情况下,绑定器假设类型是 JSON 友好的。如果您有多个值对象作为输入,这将很有用,因为绑定器将在内部将它们推断为正确的 Java 类型。但在回退到 `JsonSerde` 之前,绑定器会检查 Kafka Streams 配置中设置的默认 `Serde`,以查看它是否是一个可以与传入 KStream 的类型匹配的 `Serde`。

如果上述策略均无效,则应用程序必须通过配置提供 `Serde`。这可以通过两种方式配置 - 绑定或默认。

首先,绑定器将查看是否在绑定级别提供了 `Serde`。例如,如果您有以下处理器,

@Bean
public BiFunction<KStream<CustomKey, AvroIn1>, KTable<CustomKey, AvroIn2>, KStream<CustomKey, AvroOutput>> process() {...}

那么,您可以使用以下方法提供绑定级别 `Serde`

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
如果您按输入绑定提供上述 `Serde`,那么它将具有更高的优先级,并且绑定器将避免任何 `Serde` 推断。

如果您希望在入站反序列化时使用默认的键/值 Serdes,您可以在绑定器级别进行设置。

spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde

如果您不想使用 Kafka 提供的原生解码,您可以依靠 Spring Cloud Stream 提供的消息转换功能。由于原生解码是默认设置,为了让 Spring Cloud Stream 反序列化入站值对象,您需要显式禁用原生解码。

例如,如果您有与上面相同的 BiFunction 处理器,那么 spring.cloud.stream.bindings.process-in-0.consumer.nativeDecoding: false 您需要为所有输入单独禁用原生解码。否则,对于未禁用的输入,原生解码仍然会应用。

默认情况下,Spring Cloud Stream 将使用 application/json 作为内容类型,并使用相应的 json 消息转换器。您可以使用以下属性和相应的 MessageConverter bean 来使用自定义消息转换器。

spring.cloud.stream.bindings.process-in-0.contentType

2.5.2. 出站序列化

出站序列化基本上遵循与入站反序列化相同的规则。与入站反序列化一样,Spring Cloud Stream 的先前版本的一个主要变化是,出站的序列化由 Kafka 本地处理。在 3.0 版本之前的绑定器中,这是由框架本身完成的。

出站的键始终由 Kafka 使用匹配的 Serde 进行序列化,该 Serde 由绑定器推断。如果它无法推断键的类型,则需要使用配置指定。

值 Serdes 使用与入站反序列化相同的规则进行推断。首先,它匹配以查看出站类型是否来自应用程序中提供的 bean。如果没有,它会检查它是否与 Kafka 公开的 Serde 匹配,例如 - IntegerLongShortDoubleFloatbyte[]UUIDString。如果这不起作用,它将回退到 Spring Kafka 项目提供的 JsonSerde,但首先查看默认的 Serde 配置以查看是否有匹配项。请记住,所有这些都对应用程序透明。如果这些都不起作用,则用户必须通过配置提供要使用的 Serde

假设您使用的是与上面相同的 BiFunction 处理器。然后,您可以配置出站键/值 Serdes,如下所示。

spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

如果 Serde 推断失败,并且没有提供绑定级别 Serdes,则绑定器将回退到 JsonSerde,但会查看默认 Serdes 以进行匹配。

默认 Serdes 的配置方式与上面描述的反序列化中的配置方式相同。

spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde

如果您的应用程序使用分支功能并具有多个输出绑定,则必须为每个绑定配置这些绑定。同样,如果绑定程序能够推断出Serde类型,则无需进行此配置。

如果您不想使用 Kafka 提供的原生编码,而是想使用框架提供的消息转换,则需要显式禁用原生编码,因为原生编码是默认设置。例如,如果您具有与上述相同的 BiFunction 处理器,则spring.cloud.stream.bindings.process-out-0.producer.nativeEncoding: false。在分支的情况下,您需要为所有输出单独禁用原生编码。否则,原生编码仍将应用于您未禁用的那些输出。

当 Spring Cloud Stream 完成转换时,默认情况下,它将使用application/json作为内容类型,并使用相应的 json 消息转换器。您可以使用以下属性和相应的MessageConverter bean 来使用自定义消息转换器。

spring.cloud.stream.bindings.process-out-0.contentType

当禁用原生编码/解码时,绑定程序不会像原生 Serdes 那样进行任何推断。应用程序需要显式提供所有配置选项。因此,通常建议使用默认的序列化/反序列化选项,并在编写 Spring Cloud Stream Kafka Streams 应用程序时坚持使用 Kafka Streams 提供的原生序列化/反序列化。您必须使用框架提供的消息转换功能的一种情况是,当您的上游生产者使用特定的序列化策略时。在这种情况下,您希望使用匹配的反序列化策略,因为原生机制可能会失败。当依赖于默认的Serde机制时,应用程序必须确保绑定程序具有正确映射入站和出站的正确方法,并使用适当的Serde,否则可能会失败。

值得一提的是,上面概述的数据序列化/反序列化方法仅适用于处理器的边缘,即入站和出站。您的业务逻辑可能仍然需要调用显式需要Serde对象的 Kafka Streams API。这些仍然是应用程序的责任,必须由开发人员相应地处理。

2.6. 错误处理

Apache Kafka Streams 提供了原生处理反序列化错误异常的能力。有关此支持的详细信息,请参阅 此处。默认情况下,Apache Kafka Streams 提供两种反序列化异常处理程序 - LogAndContinueExceptionHandlerLogAndFailExceptionHandler。顾名思义,前者将记录错误并继续处理下一条记录,而后者将记录错误并失败。LogAndFailExceptionHandler 是默认的反序列化异常处理程序。

2.6.1. 处理绑定器中的反序列化异常

Kafka Streams 绑定器允许使用以下属性指定上述反序列化异常处理程序。

spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndContinue

spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndFail

除了上述两种反序列化异常处理程序之外,绑定器还提供第三种处理程序,用于将错误记录(毒丸)发送到 DLQ(死信队列)主题。以下是启用此 DLQ 异常处理程序的方法。

spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: sendToDlq

设置上述属性后,所有反序列化错误的记录将自动发送到 DLQ 主题。

您可以如下设置发布 DLQ 消息的主题名称。

您可以为 DlqDestinationResolver 提供实现,它是一个函数式接口。DlqDestinationResolverConsumerRecord 和异常作为输入,然后允许指定主题名称作为输出。通过访问 Kafka ConsumerRecord,可以在 BiFunction 的实现中内省标头记录。

以下是如何为 DlqDestinationResolver 提供实现的示例。

@Bean
public DlqDestinationResolver dlqDestinationResolver() {
    return (rec, ex) -> {
        if (rec.topic().equals("word1")) {
            return "topic1-dlq";
        }
        else {
            return "topic2-dlq";
        }
    };
}

在为 DlqDestinationResolver 提供实现时,需要注意的一点是,绑定器中的供应器不会自动为应用程序创建主题。这是因为绑定器无法推断实现可能发送到的所有 DLQ 主题的名称。因此,如果您使用此策略提供 DLQ 名称,则应用程序有责任确保这些主题事先创建。

如果 DlqDestinationResolver 作为 bean 存在于应用程序中,则它具有更高的优先级。如果您不想遵循这种方法,而是使用配置提供静态 DLQ 名称,则可以设置以下属性。

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.dlqName: custom-dlq (Change the binding name accordingly)

如果设置了此属性,则错误记录将发送到主题 custom-dlq。如果应用程序没有使用上述任何策略,则它将创建一个名为 error.<input-topic-name>.<application-id> 的 DLQ 主题。例如,如果您的绑定的目标主题是 inputTopic,应用程序 ID 是 process-applicationId,则默认 DLQ 主题是 error.inputTopic.process-applicationId。如果您的意图是启用 DLQ,始终建议为每个输入绑定显式创建一个 DLQ 主题。

2.6.2. 每个输入消费者绑定的 DLQ

属性 spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler 适用于整个应用程序。这意味着,如果同一个应用程序中有多个函数或 StreamListener 方法,则此属性将应用于所有这些方法。但是,如果您在一个处理器中有多个处理器或多个输入绑定,则可以使用绑定器为每个输入消费者绑定提供的更细粒度的 DLQ 控制。

如果您有以下处理器,

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}

并且您只想在第一个输入绑定上启用 DLQ,并在第二个绑定上启用 logAndSkip,则可以在消费者上执行以下操作。

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.deserializationExceptionHandler: sendToDlq spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.deserializationExceptionHandler: logAndSkip

以这种方式设置反序列化异常处理程序比在绑定器级别设置具有更高的优先级。

2.6.3. DLQ 分区

默认情况下,记录使用与原始记录相同的分区发布到死信主题。这意味着死信主题必须至少与原始记录具有相同数量的分区。

要更改此行为,请将 DlqPartitionFunction 实现作为 @Bean 添加到应用程序上下文。只能存在一个这样的 bean。该函数将提供消费者组(在大多数情况下与应用程序 ID 相同)、失败的 ConsumerRecord 和异常。例如,如果您始终希望路由到分区 0,则可以使用

@Bean
public DlqPartitionFunction partitionFunction() {
    return (group, record, ex) -> 0;
}
如果您将消费者绑定的 dlqPartitions 属性设置为 1(并且绑定器的 minPartitionCount 等于 1),则无需提供 DlqPartitionFunction;框架将始终使用分区 0。如果您将消费者绑定的 dlqPartitions 属性设置为大于 1 的值(或绑定器的 minPartitionCount 大于 1),则 **必须** 提供 DlqPartitionFunction Bean,即使分区计数与原始主题的相同。

在使用 Kafka Streams 绑定器中的异常处理功能时,请牢记以下几点。

  • 属性 spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler 适用于整个应用程序。这意味着,如果同一个应用程序中有多个函数或 StreamListener 方法,则此属性将应用于所有这些方法。

  • 反序列化异常处理与本机反序列化和框架提供的消息转换一致。

2.6.4. 处理绑定器中的生产异常

与上面描述的反序列化异常处理程序支持不同,绑定器没有为处理生产异常提供此类一流机制。但是,您仍然可以使用 StreamsBuilderFactoryBean 自定义程序配置生产异常处理程序,您可以在下面后续部分中找到有关此自定义程序的更多详细信息。

2.7. 重试关键业务逻辑

在某些情况下,您可能希望重试对应用程序至关重要的业务逻辑部分。可能存在对关系数据库的外部调用,或者从 Kafka Streams 处理器调用 REST 端点。这些调用可能由于各种原因而失败,例如网络问题或远程服务不可用。更常见的是,如果您可以再次尝试这些失败,它们可能会自行解决。默认情况下,Kafka Streams 绑定器为所有输入绑定创建 RetryTemplate bean。

如果函数具有以下签名,

@Bean
public java.util.function.Consumer<KStream<Object, String>> process()

并且使用默认绑定名称,则 RetryTemplate 将注册为 process-in-0-RetryTemplate。这遵循绑定名称 (process-in-0) 后跟文字 -RetryTemplate 的约定。在多个输入绑定情况下,每个绑定将提供一个单独的 RetryTemplate bean。如果应用程序中存在自定义 RetryTemplate bean,并且通过 spring.cloud.stream.bindings.<binding-name>.consumer.retryTemplateName 提供,则该 bean 优先于任何输入绑定级别重试模板配置属性。

将绑定中的 RetryTemplate 注入应用程序后,可以使用它来重试应用程序的任何关键部分。以下是一个示例

@Bean
public java.util.function.Consumer<KStream<Object, String>> process(@Lazy @Qualifier("process-in-0-RetryTemplate") RetryTemplate retryTemplate) {

    return input -> input
            .process(() -> new Processor<Object, String>() {
                @Override
                public void init(ProcessorContext processorContext) {
                }

                @Override
                public void process(Object o, String s) {
                    retryTemplate.execute(context -> {
                       //Critical business logic goes here.
                    });
                }

                @Override
                public void close() {
                }
            });
}

或者,您可以使用以下自定义 RetryTemplate

@EnableAutoConfiguration
public static class CustomRetryTemplateApp {

    @Bean
    @StreamRetryTemplate
    RetryTemplate fooRetryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();

        RetryPolicy retryPolicy = new SimpleRetryPolicy(4);
        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(1);

        retryTemplate.setBackOffPolicy(backOffPolicy);
        retryTemplate.setRetryPolicy(retryPolicy);

        return retryTemplate;
    }

    @Bean
    public java.util.function.Consumer<KStream<Object, String>> process() {

        return input -> input
                .process(() -> new Processor<Object, String>() {
                    @Override
                    public void init(ProcessorContext processorContext) {
                    }

                    @Override
                    public void process(Object o, String s) {
                        fooRetryTemplate().execute(context -> {
                           //Critical business logic goes here.
                        });

                    }

                    @Override
                    public void close() {
                    }
                });
    }
}

请注意,当重试次数用尽时,默认情况下,将抛出最后一个异常,导致处理器终止。如果您希望处理异常并继续处理,可以在 execute 方法中添加一个 RecoveryCallback:以下是一个示例。

retryTemplate.execute(context -> {
    //Critical business logic goes here.
    }, context -> {
       //Recovery logic goes here.
       return null;
    ));

有关 RetryTemplate、重试策略、退避策略等的更多信息,请参阅 Spring Retry 项目。

2.8. 状态存储

当使用高级 DSL 并进行适当的调用以触发状态存储时,Kafka Streams 会自动创建状态存储。

如果您想将传入的 KTable 绑定作为命名状态存储进行物化,则可以使用以下策略。

假设您有以下函数。

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
   ...
}

然后通过设置以下属性,传入的 KTable 数据将被物化到命名状态存储中。

spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.materializedAs: incoming-store

您可以在应用程序中将自定义状态存储定义为 bean,这些 bean 将被绑定器检测到并添加到 Kafka Streams 构建器中。特别是当使用处理器 API 时,您需要手动注册状态存储。为此,您可以在应用程序中创建一个 StateStore 作为 bean。以下是如何定义此类 bean 的示例。

@Bean
public StoreBuilder myStore() {
    return Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("my-store"), Serdes.Long(),
            Serdes.Long());
}

@Bean
public StoreBuilder otherStore() {
    return Stores.windowStoreBuilder(
            Stores.persistentWindowStore("other-store",
                    1L, 3, 3L, false), Serdes.Long(),
            Serdes.Long());
}

然后,应用程序可以直接访问这些状态存储。

在引导过程中,绑定器将处理上述 bean 并将其传递给 Streams 构建器对象。

访问状态存储

Processor<Object, Product>() {

    WindowStore<Object, String> state;

    @Override
    public void init(ProcessorContext processorContext) {
        state = (WindowStore)processorContext.getStateStore("mystate");
    }
    ...
}

这在注册全局状态存储时将不起作用。要注册全局状态存储,请参阅下面关于自定义 StreamsBuilderFactoryBean 的部分。

2.9. 交互式查询

Kafka Streams 绑定器 API 公开了一个名为 InteractiveQueryService 的类,用于交互式查询状态存储。您可以在应用程序中将其作为 Spring bean 访问。从应用程序访问此 bean 的一种简单方法是 autowire 该 bean。

@Autowired
private InteractiveQueryService interactiveQueryService;

获得对该 bean 的访问权限后,您就可以查询您感兴趣的特定状态存储。见下文。

ReadOnlyKeyValueStore<Object, Object> keyValueStore =
						interactiveQueryService.getQueryableStoreType("my-store", QueryableStoreTypes.keyValueStore());

在启动过程中,上述检索存储的方法调用可能会失败。例如,它可能仍在初始化状态存储的中间。在这种情况下,重试此操作将很有用。Kafka Streams 绑定器提供了一个简单的重试机制来适应这种情况。

以下是您可以用来控制此重试的两个属性。

  • spring.cloud.stream.kafka.streams.binder.stateStoreRetry.maxAttempts - 默认值为 1

  • spring.cloud.stream.kafka.streams.binder.stateStoreRetry.backOffInterval - 默认值为 1000 毫秒。

如果有多个 Kafka Streams 应用程序实例正在运行,那么在您能够交互式地查询它们之前,您需要确定哪个应用程序实例托管了您要查询的特定键。InteractiveQueryService API 提供了用于识别主机信息的方法。

为了使此功能正常工作,您必须按如下所示配置属性 application.server

spring.cloud.stream.kafka.streams.binder.configuration.application.server: <server>:<port>

以下是一些代码片段

org.apache.kafka.streams.state.HostInfo hostInfo = interactiveQueryService.getHostInfo("store-name",
						key, keySerializer);

if (interactiveQueryService.getCurrentHostInfo().equals(hostInfo)) {

    //query from the store that is locally available
}
else {
    //query from the remote host
}

2.9.1. 通过 InteractiveQueryService 可用的其他 API 方法

使用以下 API 方法检索与给定存储和键的组合关联的 KeyQueryMetadata 对象。

public <K> KeyQueryMetadata getKeyQueryMetadata(String store, K key, Serializer<K> serializer)

使用以下 API 方法检索与给定存储和键的组合关联的 KakfaStreams 对象。

public <K> KafkaStreams getKafkaStreams(String store, K key, Serializer<K> serializer)

2.10. 健康指标

健康指标需要依赖项 spring-boot-starter-actuator。对于 Maven,请使用

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

Spring Cloud Stream Kafka Streams Binder 提供了一个健康指标来检查底层流线程的状态。Spring Cloud Stream 定义了一个属性 management.health.binders.enabled 来启用健康指标。请参阅 Spring Cloud Stream 文档

健康指标为每个流线程的元数据提供以下详细信息

  • 线程名称

  • 线程状态:CREATEDRUNNINGPARTITIONS_REVOKEDPARTITIONS_ASSIGNEDPENDING_SHUTDOWNDEAD

  • 活动任务:任务 ID 和分区

  • 待机任务:任务 ID 和分区

默认情况下,只有全局状态可见(UPDOWN)。要显示详细信息,属性 management.endpoint.health.show-details 必须设置为 ALWAYSWHEN_AUTHORIZED。有关健康信息的更多详细信息,请参阅 Spring Boot Actuator 文档

如果所有注册的 Kafka 线程都处于 RUNNING 状态,则健康指标的状态为 UP

由于 Kafka Streams 绑定器中存在三个独立的绑定器(KStreamKTableGlobalKTable),因此它们都将报告健康状态。启用 show-details 时,报告的一些信息可能是冗余的。

当同一个应用程序中存在多个 Kafka Streams 处理器时,将为所有处理器报告健康检查,并将按 Kafka Streams 的应用程序 ID 进行分类。

2.11. 访问 Kafka Streams 指标

Spring Cloud Stream Kafka Streams 绑定器提供 Kafka Streams 指标,这些指标可以通过 Micrometer 的 MeterRegistry 导出。

对于 Spring Boot 2.2.x 版本,指标支持由绑定器通过自定义 Micrometer 指标实现提供。对于 Spring Boot 2.3.x 版本,Kafka Streams 指标支持通过 Micrometer 原生提供。

当通过 Boot 执行器端点访问指标时,请确保将 metrics 添加到属性 management.endpoints.web.exposure.include 中。然后,您可以访问 /acutator/metrics 获取所有可用指标的列表,这些指标可以通过相同的 URI (/actuator/metrics/<metric-name>) 单独访问。

2.12. 混合使用高级 DSL 和低级 Processor API

Kafka Streams 提供两种 API 变体。它有一个更高级别的 DSL 类似 API,您可以在其中链接各种操作,这些操作可能对许多函数式程序员来说很熟悉。Kafka Streams 还提供对低级处理器 API 的访问。处理器 API 虽然功能强大,并且能够在更低级别控制事物,但本质上是命令式的。Spring Cloud Stream 的 Kafka Streams 绑定器允许您使用高级别 DSL 或混合使用 DSL 和处理器 API。混合使用这两种变体为您提供了许多选项来控制应用程序中的各种用例。应用程序可以使用 transformprocess 方法 API 调用来访问处理器 API。

以下是如何在使用 process API 的 Spring Cloud Stream 应用程序中组合使用 DSL 和处理器 API 的示例。

@Bean
public Consumer<KStream<Object, String>> process() {
    return input ->
        input.process(() -> new Processor<Object, String>() {
            @Override
            @SuppressWarnings("unchecked")
            public void init(ProcessorContext context) {
               this.context = context;
            }

            @Override
            public void process(Object key, String value) {
                //business logic
            }

            @Override
            public void close() {

        });
}

以下是如何使用 transform API 的示例。

@Bean
public Consumer<KStream<Object, String>> process() {
    return (input, a) ->
        input.transform(() -> new Transformer<Object, String, KeyValue<Object, String>>() {
            @Override
            public void init(ProcessorContext context) {

            }

            @Override
            public void close() {

            }

            @Override
            public KeyValue<Object, String> transform(Object key, String value) {
                // business logic - return transformed KStream;
            }
        });
}

process API 方法调用是一个终止操作,而 transform API 是非终止操作,它为您提供了一个可能已转换的 KStream,您可以使用它继续使用 DSL 或处理器 API 进行进一步处理。

2.13. 出站的分区支持

Kafka Streams 处理器通常将处理后的输出发送到传出的 Kafka 主题。如果传出的主题已分区,并且处理器需要将传出的数据发送到特定分区,则应用程序需要提供一个类型为 StreamPartitioner 的 bean。有关更多详细信息,请参阅 StreamPartitioner。让我们看一些例子。

这是我们已经多次见过的同一个处理器,

@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> process() {

    ...
}

这是输出绑定目标

spring.cloud.stream.bindings.process-out-0.destination: outputTopic

如果主题 outputTopic 有 4 个分区,如果您没有提供分区策略,Kafka Streams 将使用默认分区策略,这可能不是您想要的,具体取决于特定的用例。假设您想将与 spring 匹配的任何键发送到分区 0,将 cloud 发送到分区 1,将 stream 发送到分区 2,并将所有其他内容发送到分区 3。您需要在应用程序中执行以下操作。

@Bean
public StreamPartitioner<String, WordCount> streamPartitioner() {
    return (t, k, v, n) -> {
        if (k.equals("spring")) {
            return 0;
        }
        else if (k.equals("cloud")) {
            return 1;
        }
        else if (k.equals("stream")) {
            return 2;
        }
        else {
            return 3;
        }
    };
}

这是一个基本的实现,但是您可以访问记录的键值对、主题名称和分区总数。因此,如果需要,您可以实现复杂的划分策略。

您还需要将此 Bean 名称与应用程序配置一起提供。

spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.streamPartitionerBeanName: streamPartitioner

应用程序中的每个输出主题都需要像这样单独配置。

2.14. StreamsBuilderFactoryBean 自定义器

通常需要自定义创建 KafkaStreams 对象的 StreamsBuilderFactoryBean。根据 Spring Kafka 提供的基础支持,绑定器允许您自定义 StreamsBuilderFactoryBean。您可以使用 StreamsBuilderFactoryBeanCustomizer 自定义 StreamsBuilderFactoryBean 本身。然后,一旦您通过此自定义器访问 StreamsBuilderFactoryBean,就可以使用 KafkaStreamsCustomzier 自定义相应的 KafkaStreams。这两个自定义器都是 Spring for Apache Kafka 项目的一部分。

以下是如何使用 StreamsBuilderFactoryBeanCustomizer 的示例。

@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
    return sfb -> sfb.setStateListener((newState, oldState) -> {
         //Do some action here!
    });
}

以上只是为了说明您可以对 StreamsBuilderFactoryBean 进行的自定义操作。您基本上可以调用 StreamsBuilderFactoryBean 中的任何可用变异操作来对其进行自定义。此自定义器将在工厂 Bean 启动之前由绑定器调用。

一旦您访问了 StreamsBuilderFactoryBean,您还可以自定义底层的 KafkaStreams 对象。以下是如何操作的蓝图。

@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
    return factoryBean -> {
        factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
            @Override
            public void customize(KafkaStreams kafkaStreams) {
                kafkaStreams.setUncaughtExceptionHandler((t, e) -> {

                });
            }
        });
    };
}

KafkaStreamsCustomizer 将在底层 KafkaStreams 启动之前由 StreamsBuilderFactoryBeabn 调用。

整个应用程序中只能有一个 StreamsBuilderFactoryBeanCustomizer。那么我们如何处理多个 Kafka Streams 处理器,因为它们中的每一个都由单独的 StreamsBuilderFactoryBean 对象支持?在这种情况下,如果需要对这些处理器进行不同的自定义,那么应用程序需要根据应用程序 ID 应用一些过滤器。

例如,

@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {

    return factoryBean -> {
        if (factoryBean.getStreamsConfiguration().getProperty(StreamsConfig.APPLICATION_ID_CONFIG)
                .equals("processor1-application-id")) {
            factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
                @Override
                public void customize(KafkaStreams kafkaStreams) {
                    kafkaStreams.setUncaughtExceptionHandler((t, e) -> {

                    });
                }
            });
        }
    };

2.14.1. 使用自定义器注册全局状态存储

如上所述,绑定器没有提供注册全局状态存储作为功能的第一类方法。为此,您需要使用自定义器。以下是如何操作。

@Bean
public StreamsBuilderFactoryBeanCustomizer customizer() {
    return fb -> {
        try {
            final StreamsBuilder streamsBuilder = fb.getObject();
            streamsBuilder.addGlobalStore(...);
        }
        catch (Exception e) {

        }
    };
}

再次,如果您有多个处理器,您需要通过使用上面概述的应用程序 ID 过滤掉其他 StreamsBuilderFactoryBean 对象,将全局状态存储附加到正确的 StreamsBuilder 上。

2.14.2. 使用自定义器注册生产异常处理程序

在错误处理部分,我们指出绑定器没有提供处理生产异常的一流方法。虽然情况如此,您仍然可以使用 StreamsBuilderFacotryBean 自定义器来注册生产异常处理程序。见下文。

@Bean
public StreamsBuilderFactoryBeanCustomizer customizer() {
    return fb -> {
        fb.getStreamsConfiguration().put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
                            CustomProductionExceptionHandler.class);
    };
}

再次,如果您有多个处理器,您可能希望将其适当地设置在正确的 StreamsBuilderFactoryBean 上。您也可以使用配置属性添加此类生产异常处理程序(有关详细信息,请参见下文),但这是一种选择,如果您选择使用编程方法。

2.15. 时间戳提取器

Kafka Streams 允许您根据各种时间戳概念来控制对消费者记录的处理。默认情况下,Kafka Streams 提取嵌入在消费者记录中的时间戳元数据。您可以通过为每个输入绑定提供不同的 TimestampExtractor 实现来更改此默认行为。以下是如何执行此操作的一些详细信息。

@Bean
public Function<KStream<Long, Order>,
        Function<KTable<Long, Customer>,
                Function<GlobalKTable<Long, Product>, KStream<Long, Order>>>> process() {
    return orderStream ->
            customers ->
                products -> orderStream;
}

@Bean
public TimestampExtractor timestampExtractor() {
    return new WallclockTimestampExtractor();
}

然后,您为每个消费者绑定设置上述 TimestampExtractor bean 名称。

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.timestampExtractorBeanName=timestampExtractor
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.timestampExtractorBeanName=timestampExtractor
spring.cloud.stream.kafka.streams.bindings.process-in-2.consumer.timestampExtractorBeanName=timestampExtractor"

如果您跳过输入消费者绑定以设置自定义时间戳提取器,则该消费者将使用默认设置。

2.16. 基于 Kafka Streams 的绑定器和常规 Kafka 绑定器的多绑定器

您可以拥有一个应用程序,其中既有基于常规 Kafka 绑定器的函数/消费者/供应商,也有基于 Kafka Streams 的处理器。但是,您不能将它们混合在一个函数或消费者中。

以下是一个示例,其中您在同一个应用程序中同时拥有基于绑定器的组件。

@Bean
public Function<String, String> process() {
    return s -> s;
}

@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> kstreamProcess() {

    return input -> input;
}

这是来自配置的相关部分

spring.cloud.stream.function.definition=process;kstreamProcess
spring.cloud.stream.bindings.process-in-0.destination=foo
spring.cloud.stream.bindings.process-out-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-in-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-out-0.destination=foobar

如果您有与上述相同的应用程序,但处理两个不同的 Kafka 集群,例如,常规的 process 同时作用于 Kafka 集群 1 和集群 2(从集群 1 接收数据并发送到集群 2),而 Kafka Streams 处理器作用于 Kafka 集群 2,那么事情就会变得更加复杂。然后您必须使用 Spring Cloud Stream 提供的 多绑定器 功能。

以下是该场景中您的配置可能发生的变化。

# multi binder configuration
spring.cloud.stream.binders.kafka1.type: kafka
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-1} #Replace kafkaCluster-1 with the approprate IP of the cluster
spring.cloud.stream.binders.kafka2.type: kafka
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2} #Replace kafkaCluster-2 with the approprate IP of the cluster
spring.cloud.stream.binders.kafka3.type: kstream
spring.cloud.stream.binders.kafka3.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2} #Replace kafkaCluster-2 with the approprate IP of the cluster

spring.cloud.stream.function.definition=process;kstreamProcess

# From cluster 1 to cluster 2 with regular process function
spring.cloud.stream.bindings.process-in-0.destination=foo
spring.cloud.stream.bindings.process-in-0.binder=kafka1 # source from cluster 1
spring.cloud.stream.bindings.process-out-0.destination=bar
spring.cloud.stream.bindings.process-out-0.binder=kafka2 # send to cluster 2

# Kafka Streams processor on cluster 2
spring.cloud.stream.bindings.kstreamProcess-in-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-in-0.binder=kafka3
spring.cloud.stream.bindings.kstreamProcess-out-0.destination=foobar
spring.cloud.stream.bindings.kstreamProcess-out-0.binder=kafka3

请注意上述配置。我们有两种类型的绑定器,但总共有 3 个绑定器,第一个是基于集群 1 的常规 Kafka 绑定器(kafka1),然后是基于集群 2 的另一个 Kafka 绑定器(kafka2),最后是 kstream 绑定器(kafka3)。应用程序中的第一个处理器从 kafka1 接收数据并发布到 kafka2,其中两个绑定器都基于常规 Kafka 绑定器,但集群不同。第二个处理器是一个 Kafka Streams 处理器,它从 kafka3 消费数据,kafka3kafka2 在同一个集群中,但绑定器类型不同。

由于 Kafka Streams 绑定器系列中有三种不同的绑定器类型 - kstreamktableglobalktable - 如果您的应用程序有多个基于这些绑定器中的任何一个的绑定,则需要明确提供绑定器类型。

例如,如果您有以下处理器,

@Bean
public Function<KStream<Long, Order>,
        Function<KTable<Long, Customer>,
                Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> enrichOrder() {

    ...
}

那么,这必须在多绑定器场景中配置如下。请注意,这仅在您拥有真正的多绑定器场景时才需要,在这种场景中,多个处理器处理单个应用程序中的多个集群。在这种情况下,需要明确地为绑定器提供绑定,以区分其他处理器的绑定器类型和集群。

spring.cloud.stream.binders.kafka1.type: kstream
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.binders.kafka2.type: ktable
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.binders.kafka3.type: globalktable
spring.cloud.stream.binders.kafka3.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}

spring.cloud.stream.bindings.enrichOrder-in-0.binder=kafka1  #kstream
spring.cloud.stream.bindings.enrichOrder-in-1.binder=kafka2  #ktablr
spring.cloud.stream.bindings.enrichOrder-in-2.binder=kafka3  #globalktable
spring.cloud.stream.bindings.enrichOrder-out-0.binder=kafka1 #kstream

# rest of the configuration is omitted.

2.17. 状态清理

默认情况下,Kafkastreams.cleanup() 方法在绑定停止时被调用。请参阅 Spring Kafka 文档。要修改此行为,只需在应用程序上下文中添加一个 CleanupConfig @Bean(配置为在启动、停止或两者都不执行清理)即可;该 bean 将被检测到并连接到工厂 bean。

2.18. Kafka Streams 拓扑可视化

Kafka Streams 绑定器提供以下执行器端点,用于检索拓扑描述,您可以使用这些描述通过外部工具可视化拓扑。

/actuator/kafkastreamstopology

/actuator/kafkastreamstopology/<application-id of the processor>

您需要从 Spring Boot 包含执行器和 Web 依赖项才能访问这些端点。此外,您还需要将 kafkastreamstopology 添加到 management.endpoints.web.exposure.include 属性中。默认情况下,kafkastreamstopology 端点处于禁用状态。

2.19. 配置选项

本节包含 Kafka Streams 绑定器使用的配置选项。

有关与绑定器相关的常见配置选项和属性,请参阅 核心文档

2.19.1. Kafka Streams 绑定器属性

以下属性在绑定器级别可用,必须以 spring.cloud.stream.kafka.streams.binder. 为前缀。

configuration

包含与 Apache Kafka Streams API 相关的属性的键值对映射。此属性必须以 spring.cloud.stream.kafka.streams.binder. 为前缀。以下是使用此属性的一些示例。

spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000

有关可能进入流配置的所有属性的更多信息,请参阅 Apache Kafka Streams 文档中的 StreamsConfig JavaDocs。您可以从 StreamsConfig 设置的所有配置都可以通过此设置。使用此属性时,它适用于整个应用程序,因为这是一个绑定器级别属性。如果您的应用程序中有多个处理器,它们都将获取这些属性。对于 application.id 之类的属性,这将变得很麻烦,因此您必须仔细检查如何使用此绑定器级别 configuration 属性映射来自 StreamsConfig 的属性。

functions.<function-bean-name>.applicationId

仅适用于函数式处理器。这可用于为应用程序中的每个函数设置应用程序 ID。在多个函数的情况下,这是一种方便的方式来设置应用程序 ID。

functions.<function-bean-name>.configuration

仅适用于函数式处理器。包含与 Apache Kafka Streams API 相关的属性的键值对映射。这类似于上面描述的绑定器级别 configuration 属性,但此级别的 configuration 属性仅限于命名函数。当您有多个处理器并且想要根据特定函数限制对配置的访问权限时,您可能需要使用此方法。所有 StreamsConfig 属性都可以在此处使用。

brokers

代理 URL

默认值:localhost

zkNodes

Zookeeper URL

默认值:localhost

deserializationExceptionHandler

反序列化错误处理程序类型。此处理程序在绑定器级别应用,因此应用于应用程序中的所有输入绑定。在消费者绑定级别,可以通过更细粒度的方式控制它。可能的值为 - logAndContinuelogAndFailsendToDlq

默认值:logAndFail

applicationId

在绑定器级别全局设置 Kafka Streams 应用程序的 application.id 的便捷方法。如果应用程序包含多个函数或 StreamListener 方法,则应分别设置 application id。请参阅上面的详细说明,其中讨论了设置 application id 的方法。

默认值:应用程序将生成一个静态 application ID。有关更多详细信息,请参阅 application ID 部分。

stateStoreRetry.maxAttempts

尝试连接到状态存储的最大次数。

默认值:1

stateStoreRetry.backoffPeriod

重试时尝试连接到状态存储的回退时间段。

默认值:1000 毫秒

consumerProperties

绑定器级别的任意消费者属性。

producerProperties

绑定器级别的任意生产者属性。

2.19.2. Kafka Streams 生产者属性

以下属性适用于 Kafka Streams 生产者,并且必须以 spring.cloud.stream.kafka.streams.bindings.<绑定名称>.producer. 为前缀。为了方便起见,如果有多个输出绑定并且它们都需要一个公共值,则可以使用前缀 spring.cloud.stream.kafka.streams.default.producer. 进行配置。

keySerde

要使用的键序列化器/反序列化器

默认值:请参阅上面关于消息序列化/反序列化的讨论

valueSerde

要使用的值序列化器/反序列化器

默认值:请参阅上面关于消息序列化/反序列化的讨论

useNativeEncoding

启用/禁用本机编码的标志

默认值:true

streamPartitionerBeanName: 要在消费者处使用的自定义出站分区程序 bean 名称。应用程序可以提供自定义 StreamPartitioner 作为 Spring bean,并且可以将此 bean 的名称提供给生产者以代替默认分区程序。

+ 默认值:请参阅上面关于出站分区支持的讨论。

producedAs

处理器输出到的接收器组件的自定义名称。

默认值:none(由 Kafka Streams 生成)

2.19.3. Kafka Streams 消费者属性

以下属性适用于 Kafka Streams 消费者,必须以 spring.cloud.stream.kafka.streams.bindings.<绑定名称>.consumer. 为前缀。为了方便起见,如果有多个输入绑定并且它们都需要一个公共值,则可以使用前缀 spring.cloud.stream.kafka.streams.default.consumer. 进行配置。

applicationId

为每个输入绑定设置 application.id。这仅适用于基于 StreamListener 的处理器,对于基于函数的处理器,请参阅上面概述的其他方法。

默认值:见上文。

keySerde

要使用的键序列化器/反序列化器

默认值:请参阅上面关于消息序列化/反序列化的讨论

valueSerde

要使用的值序列化器/反序列化器

默认值:请参阅上面关于消息序列化/反序列化的讨论

materializedAs

使用传入的 KTable 类型时要物化的状态存储。

默认值:none

useNativeDecoding

启用/禁用原生解码的标志。

默认值:true

dlqName

DLQ 主题名称。

默认值:见上文关于错误处理和 DLQ 的讨论。

startOffset

如果没有已提交的偏移量可供消费,则要从中开始的偏移量。这主要用于消费者第一次从主题中消费时。Kafka Streams 使用 earliest 作为默认策略,绑定程序使用相同的默认策略。可以使用此属性将其覆盖为 latest

默认值:earliest

注意:在消费者上使用 resetOffsets 对 Kafka Streams 绑定程序没有影响。与基于消息通道的绑定程序不同,Kafka Streams 绑定程序不会按需寻求开始或结束。

deserializationExceptionHandler

反序列化错误处理程序类型。此处理程序应用于每个消费者绑定,而不是之前描述的绑定程序级别属性。可能的值为 - logAndContinuelogAndFailsendToDlq

默认值:logAndFail

timestampExtractorBeanName

要在消费者处使用的特定时间戳提取器 bean 名称。应用程序可以提供 TimestampExtractor 作为 Spring bean,并且可以将此 bean 的名称提供给消费者以使用,而不是默认的 bean。

默认值:见上文关于时间戳提取器的讨论。

eventTypes

此绑定的支持事件类型的逗号分隔列表。

默认值:none

eventTypeHeaderKey

通过此绑定传入的每个记录上的事件类型标头键。

默认值:event_type

consumedAs

处理器从中消费的源组件的自定义名称。

默认值:none(由 Kafka Streams 生成)

2.19.4. 关于并发性的特别说明

在 Kafka Streams 中,您可以使用 num.stream.threads 属性控制处理器可以创建的线程数。您可以使用上面在绑定程序、函数、生产者或消费者级别描述的各种 configuration 选项来执行此操作。您还可以使用核心 Spring Cloud Stream 为此目的提供的 concurrency 属性。使用此属性时,您需要在消费者上使用它。当您在函数或 StreamListener 中有多个输入绑定时,请在第一个输入绑定上设置此属性。例如,当设置 spring.cloud.stream.bindings.process-in-0.consumer.concurrency 时,它将被绑定程序转换为 num.stream.threads。如果您有多个处理器,并且一个处理器定义了绑定级别并发性,而其他处理器没有,那么那些没有绑定级别并发性的处理器将默认回退到通过 spring.cloud.stream.kafka.streams.binder.configuration.num.stream.threads 指定的绑定程序范围属性。如果此绑定程序配置不可用,则应用程序将使用 Kafka Streams 设置的默认值。

附录

附录 A: 构建

A.1. 基本编译和测试

要构建源代码,您需要安装 JDK 1.7。

构建使用 Maven 包装器,因此您无需安装特定版本的 Maven。要启用测试,您应该在构建之前运行 Kafka 服务器 0.9 或更高版本。有关运行服务器的更多信息,请参见下文。

主要的构建命令是

$ ./mvnw clean install

您也可以添加 '-DskipTests',以避免运行测试。

您也可以自己安装 Maven(>=3.3.3)并在以下示例中使用 mvn 命令代替 ./mvnw。如果您这样做,您可能还需要添加 -P spring,如果您的本地 Maven 设置不包含 spring 预发布工件的存储库声明。
请注意,您可能需要通过设置 MAVEN_OPTS 环境变量(值为 -Xmx512m -XX:MaxPermSize=128m)来增加可用于 Maven 的内存量。我们尝试在 .mvn 配置中涵盖这一点,因此如果您发现您必须这样做才能使构建成功,请提交工单以将设置添加到源代码控制。

需要中间件的项目通常包含一个 docker-compose.yml,因此请考虑使用 Docker Compose 在 Docker 容器中运行中间件服务器。

A.2. 文档

有一个“完整”配置文件将生成文档。

A.3. 使用代码

如果您没有 IDE 偏好,我们建议您在使用代码时使用 Spring Tools SuiteEclipse。我们使用 m2eclipe eclipse 插件来支持 maven。其他 IDE 和工具也应该可以正常工作。

A.3.1. 使用 m2eclipse 导入到 Eclipse

我们建议在使用 eclipse 时使用 m2eclipe eclipse 插件。如果您还没有安装 m2eclipse,它可以在“eclipse 市场”中找到。

不幸的是,m2e 还不支持 Maven 3.3,因此在将项目导入 Eclipse 后,您还需要告诉 m2eclipse 使用项目的 .settings.xml 文件。如果您不这样做,您可能会看到与项目中的 POM 相关的许多不同的错误。打开您的 Eclipse 首选项,展开 Maven 首选项,然后选择用户设置。在用户设置字段中,单击浏览并导航到您导入的 Spring Cloud 项目,选择该项目中的 .settings.xml 文件。单击应用,然后单击确定以保存首选项更改。

或者,您可以将存储库设置从 .settings.xml 复制到您自己的 ~/.m2/settings.xml 中。

A.3.2. 不使用 m2eclipse 导入到 Eclipse

如果您不想使用 m2eclipse,您可以使用以下命令生成 eclipse 项目元数据

$ ./mvnw eclipse:eclipse

生成的 Eclipse 项目可以通过从“文件”菜单中选择“导入现有项目”来导入。

[[contributing] == 贡献

Spring Cloud 在非限制性的 Apache 2.0 许可下发布,并遵循非常标准的 Github 开发流程,使用 Github 跟踪器来处理问题并将拉取请求合并到主分支。如果您想贡献即使是微不足道的东西,请不要犹豫,但请遵循以下指南。

A.4. 签署贡献者许可协议

在我们接受非微不足道的补丁或拉取请求之前,我们将需要您签署 贡献者协议。签署贡献者协议不会授予任何人对主存储库的提交权限,但它确实意味着我们可以接受您的贡献,并且如果您这样做,您将获得作者署名。可能会要求活跃的贡献者加入核心团队,并被赋予合并拉取请求的能力。

A.5. 代码规范和管理

这些都不是拉取请求的必要条件,但它们都会有所帮助。它们也可以在原始拉取请求之后但在合并之前添加。

  • 使用 Spring Framework 代码格式约定。如果您使用 Eclipse,您可以使用 Spring Cloud Build 项目中的 eclipse-code-formatter.xml 文件导入格式化程序设置。如果使用 IntelliJ,您可以使用 Eclipse Code Formatter Plugin 导入相同的文件。

  • 确保所有新的 .java 文件都具有一个简单的 Javadoc 类注释,至少包含一个 @author 标签来标识您,最好至少包含一段关于该类用途的文字。

  • 将 ASF 许可证头注释添加到所有新的 .java 文件中(从项目中的现有文件中复制)

  • 将自己添加为 @author 到您大幅修改(超过美观更改)的 .java 文件中。

  • 添加一些 Javadoc,如果您更改了命名空间,则添加一些 XSD 文档元素。

  • 一些单元测试也会有很大帮助——总有人要做的。

  • 如果没有人使用您的分支,请将其重新定位到当前主分支(或主项目中的其他目标分支)。

  • 在编写提交消息时,请遵循 这些约定,如果您正在修复现有问题,请在提交消息的末尾添加 Fixes gh-XXXX(其中 XXXX 是问题编号)。