3.0.13.RELEASE
参考指南
本指南描述了 Apache Kafka 对 Spring Cloud Stream 绑定器的实现。它包含有关其设计、使用和配置选项的信息,以及有关 Stream Cloud Stream 概念如何映射到 Apache Kafka 特定结构的信息。此外,本指南还解释了 Spring Cloud Stream 的 Kafka Streams 绑定功能。
1. Apache Kafka Binder
1.1. 使用
要使用 Apache Kafka 绑定器,您需要将 spring-cloud-stream-binder-kafka
作为依赖项添加到您的 Spring Cloud Stream 应用程序中,如下面的 Maven 示例所示
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
或者,您也可以使用 Spring Cloud Stream Kafka 启动器,如下面的 Maven 示例所示
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
1.2. 概述
下图显示了 Apache Kafka 绑定器如何工作的简化图
Apache Kafka 绑定器实现将每个目标映射到 Apache Kafka 主题。消费者组直接映射到相同的 Apache Kafka 概念。分区也直接映射到 Apache Kafka 分区。
绑定器当前使用 Apache Kafka kafka-clients
版本 2.3.1
。此客户端可以与旧的代理进行通信(请参阅 Kafka 文档),但某些功能可能不可用。例如,在 0.11.x.x 之前的版本中,不支持原生标头。此外,0.11.x.x 不支持 autoAddPartitions
属性。
1.3. 配置选项
本节包含 Apache Kafka 绑定器使用的配置选项。
有关绑定器通用的配置选项和属性,请参阅核心文档中的绑定属性。
1.3.1. Kafka Binder 属性
- spring.cloud.stream.kafka.binder.brokers
-
Kafka 绑定器连接到的代理列表。
默认值:
localhost
。 - spring.cloud.stream.kafka.binder.defaultBrokerPort
-
brokers
允许指定带有或不带有端口信息的宿主 (例如,host1,host2:port2
)。当代理列表中未配置端口时,此选项设置默认端口。默认值:
9092
。 - spring.cloud.stream.kafka.binder.configuration
-
传递给绑定器创建的所有客户端 (生产者和消费者) 的客户端属性 (键/值映射)。由于这些属性被生产者和消费者使用,因此应将其使用限制为通用属性,例如安全设置。通过此配置提供的未知 Kafka 生产者或消费者属性将被过滤掉,不允许传播。此处的属性优先于启动时设置的任何属性。
默认值:空映射。
- spring.cloud.stream.kafka.binder.consumerProperties
-
任意 Kafka 客户端消费者属性的键/值映射。除了支持已知的 Kafka 消费者属性外,此选项还允许使用未知的消费者属性。此处的属性优先于启动时设置的任何属性以及上面的
configuration
属性。默认值:空映射。
- spring.cloud.stream.kafka.binder.headers
-
绑定器传输的自定义标头列表。仅在与使用
kafka-clients
版本 < 0.11.0.0 的旧应用程序 (⇐ 1.3.x) 通信时需要。较新的版本原生支持标头。默认值:空。
- spring.cloud.stream.kafka.binder.healthTimeout
-
获取分区信息等待的时间 (以秒为单位)。如果此计时器到期,则健康报告为关闭。
默认值:10。
- spring.cloud.stream.kafka.binder.requiredAcks
-
代理上所需的确认数。请参阅 Kafka 文档以了解生产者
acks
属性。默认值:
1
。 - spring.cloud.stream.kafka.binder.minPartitionCount
-
仅当
autoCreateTopics
或autoAddPartitions
设置为 true 时有效。绑定器在生产或消费数据的主题上配置的全局最小分区数。它可以被生产者的partitionCount
设置或生产者的instanceCount * concurrency
设置的值覆盖(如果两者中任何一个更大)。默认值:
1
。 - spring.cloud.stream.kafka.binder.producerProperties
-
任意 Kafka 客户端生产者属性的键值对映射。除了支持已知的 Kafka 生产者属性外,这里也允许使用未知的生产者属性。此处的属性会覆盖在启动时和上面的
configuration
属性中设置的任何属性。默认值:空映射。
- spring.cloud.stream.kafka.binder.replicationFactor
-
如果
autoCreateTopics
处于活动状态,则为自动创建主题的副本因子。可以在每个绑定上覆盖。如果您使用的是 2.4 之前的 Kafka 代理版本,则此值应至少设置为 1
。从 3.0.8 版本开始,绑定器使用-1
作为默认值,这表示将使用代理的 'default.replication.factor' 属性来确定副本数量。请咨询您的 Kafka 代理管理员,以查看是否有任何策略要求最小副本因子,如果是这种情况,通常,default.replication.factor
将与该值匹配,并且应使用-1
,除非您需要大于最小值的副本因子。默认值:
-1
。 - spring.cloud.stream.kafka.binder.autoCreateTopics
-
如果设置为
true
,绑定器会自动创建新主题。如果设置为false
,绑定器依赖于主题已配置。在后一种情况下,如果主题不存在,绑定器将无法启动。此设置独立于代理的 auto.create.topics.enable
设置,不会影响它。如果服务器设置为自动创建主题,它们可能会作为元数据检索请求的一部分创建,并使用默认的代理设置。默认值:
true
。 - spring.cloud.stream.kafka.binder.autoAddPartitions
-
如果设置为
true
,绑定器会在需要时创建新分区。如果设置为false
,绑定器依赖于主题的分区大小已配置。如果目标主题的分区数量小于预期值,绑定器将无法启动。默认值:
false
。 - spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix
-
在绑定器中启用事务。请参阅 Kafka 文档中的
transaction.id
和spring-kafka
文档中的 事务。启用事务后,将忽略单个producer
属性,所有生产者都将使用spring.cloud.stream.kafka.binder.transaction.producer.*
属性。默认值
null
(无事务) - spring.cloud.stream.kafka.binder.transaction.producer.*
-
事务绑定器中生产者的全局生产者属性。请参阅
spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix
和Kafka 生产者属性以及所有绑定器支持的通用生产者属性。默认值:请参阅各个生产者属性。
- spring.cloud.stream.kafka.binder.headerMapperBeanName
-
用于将
spring-messaging
头映射到 Kafka 头和从 Kafka 头映射到spring-messaging
头的KafkaHeaderMapper
的 Bean 名称。例如,如果您希望自定义使用 JSON 反序列化头部的BinderHeaderMapper
Bean 中的受信任包,请使用此属性。如果此自定义BinderHeaderMapper
Bean 未使用此属性提供给绑定器,则绑定器将查找名为kafkaBinderHeaderMapper
且类型为BinderHeaderMapper
的头映射器 Bean,然后再回退到绑定器创建的默认BinderHeaderMapper
。默认值:无。
- spring.cloud.stream.kafka.binder.considerDownWhenAnyPartitionHasNoLeader
-
当发现主题上的任何分区(无论接收数据的消费者是什么)都没有领导者时,将绑定器健康状态设置为
down
的标志。默认值:
false
。 - spring.cloud.stream.kafka.binder.certificateStoreDirectory
-
当信任库或密钥库证书位置以类路径 URL(
classpath:…
)形式给出时,绑定器会将资源从 JAR 文件中的类路径位置复制到文件系统上的位置。该文件将被移动到为此属性指定的值指定的位置,该位置必须是文件系统上可由运行应用程序的进程写入的现有目录。如果未设置此值且证书文件是类路径资源,则它将被移动到系统临时目录,该目录由System.getProperty("java.io.tmpdir")
返回。如果此值存在,但目录在文件系统上找不到或不可写,则也是如此。默认值:无。
1.3.2. Kafka 消费者属性
为了避免重复,Spring Cloud Stream 支持以spring.cloud.stream.kafka.default.consumer.<property>=<value> 的格式为所有通道设置值。
|
以下属性仅适用于 Kafka 消费者,必须以spring.cloud.stream.kafka.bindings.<channelName>.consumer.
为前缀。
- admin.configuration
-
从 2.1.1 版本开始,此属性已弃用,取而代之的是
topic.properties
,并且对它的支持将在未来版本中删除。 - admin.replicas-assignment
-
从 2.1.1 版本开始,此属性已弃用,建议使用
topic.replicas-assignment
,将在未来版本中移除对它的支持。 - admin.replication-factor
-
从 2.1.1 版本开始,此属性已弃用,建议使用
topic.replication-factor
,将在未来版本中移除对它的支持。 - autoRebalanceEnabled
-
当设置为
true
时,主题分区将在消费者组的成员之间自动重新平衡。当设置为false
时,每个消费者将根据spring.cloud.stream.instanceCount
和spring.cloud.stream.instanceIndex
分配一组固定的分区。这要求在每个启动的实例上都正确设置spring.cloud.stream.instanceCount
和spring.cloud.stream.instanceIndex
属性。在这种情况下,spring.cloud.stream.instanceCount
属性的值通常必须大于 1。默认值:
true
。 - ackEachRecord
-
当
autoCommitOffset
设置为true
时,此设置决定是否在处理完每个记录后提交偏移量。默认情况下,偏移量在处理完consumer.poll()
返回的记录批次中的所有记录后提交。通过consumer
的configuration
属性设置的 Kafka 属性max.poll.records
可以控制每次轮询返回的记录数量。将此设置为true
可能会导致性能下降,但这样做可以降低发生故障时重新传递记录的可能性。此外,请参阅绑定器requiredAcks
属性,它也会影响提交偏移量的性能。默认值:
false
。 - autoCommitOffset
-
是否在处理完消息后自动提交偏移量。如果设置为
false
,则入站消息中将包含一个键为kafka_acknowledgment
、类型为org.springframework.kafka.support.Acknowledgment
的头。应用程序可以使用此头来确认消息。有关详细信息,请参阅示例部分。当此属性设置为false
时,Kafka 绑定器将确认模式设置为org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.MANUAL
,应用程序负责确认记录。另请参阅ackEachRecord
。默认值:
true
。 - autoCommitOnError
-
仅在
autoCommitOffset
设置为true
时有效。如果设置为false
,则会抑制导致错误的消息的自动提交,并且仅对成功的消息进行提交。它允许流在发生持久性故障的情况下从最后成功处理的消息自动重播。如果设置为true
,则始终自动提交(如果启用了自动提交)。如果未设置(默认值),则其值与enableDlq
相同,如果错误消息被发送到 DLQ,则自动提交错误消息,否则不提交。默认值:未设置。
- resetOffsets
-
是否将消费者的偏移量重置为 startOffset 提供的值。如果提供了
KafkaRebalanceListener
,则必须为 false;请参阅 使用 KafkaRebalanceListener。默认值:
false
。 - startOffset
-
新组的起始偏移量。允许的值:
earliest
和latest
。如果消费者的“绑定”明确设置了消费者组(通过spring.cloud.stream.bindings.<channelName>.group
),则 'startOffset' 设置为earliest
。否则,对于anonymous
消费者组,它将设置为latest
。另请参阅resetOffsets
(在本列表中更早)。默认值:null(等效于
earliest
)。 - enableDlq
-
设置为 true 时,将为消费者启用 DLQ 行为。默认情况下,导致错误的消息将转发到名为
error.<destination>.<group>
的主题。DLQ 主题名称可以通过设置dlqName
属性或定义类型为DlqDestinationResolver
的@Bean
来配置。这为错误数量相对较少且重播整个原始主题过于繁琐的情况提供了另一种选择,而不是更常见的 Kafka 重播方案。有关更多信息,请参阅 死信主题处理 处理。从 2.0 版本开始,发送到 DLQ 主题的消息将使用以下标头进行增强:x-original-topic
、x-exception-message
和x-exception-stacktrace
作为byte[]
。默认情况下,失败的记录将发送到 DLQ 主题中与原始记录相同的分区号。有关如何更改此行为,请参阅 死信主题分区选择。当destinationIsPattern
为true
时不允许。默认值:
false
。 - dlqPartitions
-
当
enableDlq
为 true 且未设置此属性时,将创建一个与主主题数量相同的分区数的死信主题。通常,死信记录将发送到死信主题中与原始记录相同的分区。此行为可以更改;请参阅 死信主题分区选择。如果此属性设置为1
且没有DqlPartitionFunction
bean,则所有死信记录都将写入分区0
。如果此属性大于1
,则必须提供DlqPartitionFunction
bean。请注意,实际分区计数受绑定程序的minPartitionCount
属性影响。默认值:
none
- configuration
-
包含通用 Kafka 消费者属性的键值对的映射。除了 Kafka 消费者属性之外,还可以在此处传递其他配置属性。例如,应用程序需要的一些属性,例如
spring.cloud.stream.kafka.bindings.input.consumer.configuration.foo=bar
。默认值:空映射。
- dlqName
-
接收错误消息的 DLQ 主题名称。
默认值:null(如果未指定,导致错误的消息将转发到名为
error.<destination>.<group>
的主题)。 - dlqProducerProperties
-
使用此属性可以设置特定于 DLQ 的生产者属性。所有可通过 Kafka 生产者属性设置的属性都可以通过此属性设置。当在消费者上启用本机解码(即,useNativeDecoding: true)时,应用程序必须为 DLQ 提供相应的键/值序列化器。这必须以
dlqProducerProperties.configuration.key.serializer
和dlqProducerProperties.configuration.value.serializer
的形式提供。默认值:默认 Kafka 生产者属性。
- standardHeaders
-
指示入站通道适配器填充哪些标准标头。允许的值:
none
、id
、timestamp
或both
。如果使用本机反序列化,并且第一个接收消息的组件需要一个id
(例如,配置为使用 JDBC 消息存储的聚合器),则此属性很有用。默认值:
none
- converterBeanName
-
实现
RecordMessageConverter
的 Bean 的名称。在入站通道适配器中使用,以替换默认的MessagingMessageConverter
。默认值:
null
- idleEventInterval
-
指示最近未收到任何消息的事件之间的间隔(以毫秒为单位)。使用
ApplicationListener<ListenerContainerIdleEvent>
来接收这些事件。有关使用示例,请参见 示例:暂停和恢复消费者。默认值:
30000
- destinationIsPattern
-
如果为 true,则目标将被视为正则表达式
Pattern
,用于通过代理匹配主题名称。如果为 true,则不会预配主题,并且不允许enableDlq
,因为绑定器在预配阶段不知道主题名称。请注意,检测与模式匹配的新主题所需的时间由消费者属性metadata.max.age.ms
控制,该属性(在撰写本文时)默认为 300,000 毫秒(5 分钟)。可以使用上面的configuration
属性进行配置。默认值:
false
- topic.properties
-
预配新主题时使用的 Kafka 主题属性的
Map
,例如spring.cloud.stream.kafka.bindings.input.consumer.topic.properties.message.format.version=0.9.0.0
默认值:无。
- topic.replicas-assignment
-
副本分配的 Map<Integer, List<Integer>>,其中键是分区,值是分配。在预配新主题时使用。请参见
kafka-clients
jar 中的NewTopic
Javadocs。默认值:无。
- topic.replication-factor
-
在配置主题时使用的副本因子。覆盖绑定范围的设置。如果存在
replicas-assignments
,则忽略。默认值:无(使用绑定范围的默认值 -1)。
- pollTimeout
-
可轮询消费者中轮询使用的超时时间。
默认值:5 秒。
- transactionManager
-
用于覆盖绑定事务管理器的
KafkaAwareTransactionManager
的 Bean 名称。通常,如果您想使用ChainedKafkaTransactionManaager
将另一个事务与 Kafka 事务同步,则需要此操作。为了实现记录的精确一次消费和生产,消费者和生产者绑定必须都配置了相同的事务管理器。默认值:无。
- txCommitRecovered
-
使用事务性绑定时,恢复记录的偏移量(例如,当重试耗尽且记录被发送到死信主题时)将通过默认情况下新的事务提交。将此属性设置为
false
将抑制提交恢复记录的偏移量。默认值:true。
1.3.3. 重置偏移量
当应用程序启动时,每个分配的分区的初始位置取决于两个属性startOffset
和 resetOffsets
。如果resetOffsets
为false
,则应用正常的 Kafka 消费者 auto.offset.reset
语义。即,如果绑定消费者的组没有为分区提交偏移量,则位置为earliest
或 latest
。默认情况下,具有显式group
的绑定使用earliest
,而匿名绑定(没有group
)使用latest
。这些默认值可以通过设置startOffset
绑定属性来覆盖。第一次使用特定group
启动绑定时,将没有提交的偏移量。另一个没有提交偏移量的情况是偏移量已过期。对于现代代理(从 2.1 开始)和默认代理属性,偏移量在最后一个成员离开组后 7 天过期。有关更多信息,请参阅 offsets.retention.minutes
代理属性。
当resetOffsets
为true
时,绑定会应用与代理上没有提交偏移量时类似的语义,就好像此绑定从未从主题中消费过一样;即,任何当前提交的偏移量都会被忽略。
以下是可能使用此功能的两种用例。
-
从包含键值对的压缩主题中消费。将
resetOffsets
设置为true
,并将startOffset
设置为earliest
;绑定将在所有新分配的分区上执行seekToBeginning
。 -
从包含事件的主题中消费,您只对在此绑定运行期间发生的事件感兴趣。将
resetOffsets
设置为true
,并将startOffset
设置为latest
;绑定将在所有新分配的分区上执行seekToEnd
。
如果在初始分配后发生重新平衡,则仅对在初始分配期间未分配的任何新分配的分区执行查找。 |
要更细致地控制主题偏移量,请参阅 使用 KafkaRebalanceListener;当提供监听器时,resetOffsets
不应设置为 true
,否则会导致错误。 >>>>>>> 7bc90c10… GH-1084: 添加 txCommitRecovered 属性
1.3.4. 消费批次
从 3.0 版本开始,当 spring.cloud.stream.binding.<name>.consumer.batch-mode
设置为 true
时,从 Kafka Consumer
轮询接收到的所有记录将作为 List<?>
提供给监听器方法。否则,该方法将一次调用一个记录。批次的大小由 Kafka 消费者属性 max.poll.records
、fetch.min.bytes
、fetch.max.wait.ms
控制;有关更多信息,请参阅 Kafka 文档。
请注意,批处理模式不支持 @StreamListener
- 它只适用于较新的函数式编程模型。
使用批处理模式时,不支持绑定器内的重试,因此 maxAttempts 将被覆盖为 1。您可以配置一个 SeekToCurrentBatchErrorHandler (使用 ListenerContainerCustomizer )来实现与绑定器中重试类似的功能。您也可以使用手动 AckMode 并调用 Ackowledgment.nack(index, sleep) 来提交部分批次的偏移量,并让剩余的记录重新传递。有关这些技术的更多信息,请参阅 Spring for Apache Kafka 文档。
|
1.3.5. Kafka 生产者属性
为了避免重复,Spring Cloud Stream 支持为所有通道设置值,格式为 spring.cloud.stream.kafka.default.producer.<property>=<value> 。
|
以下属性仅适用于 Kafka 生产者,并且必须以 spring.cloud.stream.kafka.bindings.<channelName>.producer.
为前缀。
- admin.configuration
-
从 2.1.1 版本开始,此属性已弃用,取而代之的是
topic.properties
,并且对它的支持将在未来版本中删除。 - admin.replicas-assignment
-
从 2.1.1 版本开始,此属性已弃用,建议使用
topic.replicas-assignment
,将在未来版本中移除对它的支持。 - admin.replication-factor
-
从 2.1.1 版本开始,此属性已弃用,建议使用
topic.replication-factor
,将在未来版本中移除对它的支持。 - bufferSize
-
Kafka 生产者在发送之前尝试批处理的数据量上限(以字节为单位)。
默认值:
16384
。 - sync
-
生产者是否同步。
默认值:
false
。 - sendTimeoutExpression
-
一个针对发送消息进行评估的 SpEL 表达式,用于评估启用同步发布时等待确认的时间,例如
headers['mySendTimeout']
。超时值的单位是毫秒。在 3.0 之前的版本中,除非使用原生编码,否则无法使用有效负载,因为在评估此表达式时,有效负载已经以byte[]
的形式存在。现在,表达式在有效负载转换之前进行评估。默认值:
none
。 - batchTimeout
-
生产者等待更多消息累积到同一批次中以发送消息的时间。(通常,生产者不会等待,而是简单地发送在上次发送过程中累积的所有消息。)非零值可能会以延迟为代价提高吞吐量。
默认值:
0
。 - messageKeyExpression
-
一个针对发送消息进行评估的 SpEL 表达式,用于填充生产的 Kafka 消息的键,例如
headers['myKey']
。在 3.0 之前的版本中,除非使用原生编码,否则无法使用有效负载,因为在评估此表达式时,有效负载已经以byte[]
的形式存在。现在,表达式在有效负载转换之前进行评估。对于常规处理器(Function<String, String>
或Function<Message<?>, Message<?>
),如果生产的键需要与来自主题的传入键相同,则可以按如下方式设置此属性。spring.cloud.stream.kafka.bindings.<output-binding-name>.producer.messageKeyExpression: headers['kafka_receivedMessageKey']
对于反应式函数,需要注意一个重要的注意事项。在这种情况下,应用程序需要手动将传入消息的标头复制到传出消息。您可以设置标头,例如myKey
,并使用headers['myKey']
,如上所述,或者为了方便起见,只需设置KafkaHeaders.MESSAGE_KEY
标头,您就不需要设置此属性。默认值:
none
。 - headerPatterns
-
一个用逗号分隔的简单模式列表,用于匹配要映射到
ProducerRecord
中的 KafkaHeaders
的 Spring 消息标头。模式可以以通配符(星号)开头或结尾。模式可以通过在前面加上!
来否定。匹配在第一次匹配(正或负)后停止。例如,!ask,as*
将传递ash
但不传递ask
。id
和timestamp
永远不会被映射。默认值:
*
(所有标头 - 除了id
和timestamp
) - configuration
-
包含通用 Kafka 生产者属性的键值对映射。
默认值:空映射。
- topic.properties
-
在配置新主题时使用的 Kafka 主题属性的
Map
,例如spring.cloud.stream.kafka.bindings.output.producer.topic.properties.message.format.version=0.9.0.0
- topic.replicas-assignment
-
副本分配的 Map<Integer, List<Integer>>,其中键是分区,值是分配。在预配新主题时使用。请参见
kafka-clients
jar 中的NewTopic
Javadocs。默认值:无。
- topic.replication-factor
-
在配置主题时使用的副本因子。覆盖绑定范围的设置。如果存在
replicas-assignments
,则忽略。默认值:无(使用绑定范围的默认值 -1)。
- useTopicHeader
-
设置为
true
以使用出站消息中KafkaHeaders.TOPIC
消息头的值覆盖默认绑定目标(主题名称)。如果头不存在,则使用默认绑定目标。默认值:false
。 - recordMetadataChannel
-
成功发送结果应发送到的
MessageChannel
的 bean 名称;该 bean 必须存在于应用程序上下文中。发送到该通道的消息是已发送的消息(如果有转换,则经过转换),并带有额外的头KafkaHeaders.RECORD_METADATA
。该头包含 Kafka 客户端提供的RecordMetadata
对象;它包含记录在主题中写入的分区和偏移量。
ResultMetadata meta = sendResultMsg.getHeaders().get(KafkaHeaders.RECORD_METADATA, RecordMetadata.class)
发送失败将进入生产者错误通道(如果已配置);请参阅 错误通道。默认值:null
+
Kafka 绑定器使用生产者的 partitionCount 设置作为提示,以使用给定的分区计数创建主题(与 minPartitionCount 结合使用,两者中较大的值为使用值)。在为绑定器配置 minPartitionCount 和为应用程序配置 partitionCount 时要谨慎,因为使用较大的值。如果主题已存在且分区计数较小,并且 autoAddPartitions 已禁用(默认值),则绑定器无法启动。如果主题已存在且分区计数较小,并且 autoAddPartitions 已启用,则会添加新分区。如果主题已存在且分区数量大于 (minPartitionCount 或 partitionCount ) 中较大的值,则使用现有分区计数。
|
- compression
-
设置
compression.type
生产者属性。支持的值为none
、gzip
、snappy
和lz4
。如果您将kafka-clients
jar 覆盖到 2.1.0(或更高版本),如 Spring for Apache Kafka 文档 中所述,并且希望使用zstd
压缩,请使用spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.compression.type=zstd
。默认值:
none
。 - closeTimeout
-
关闭生产者时等待的超时时间(以秒为单位)。
默认值:
30
1.3.6. 使用示例
在本节中,我们将展示在特定场景中使用上述属性的方法。
示例:将 autoCommitOffset
设置为 false
并依赖手动确认
此示例说明如何在消费者应用程序中手动确认偏移量。
此示例要求将 spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset
设置为 false
。请使用您的示例的相应输入通道名称。
@SpringBootApplication
@EnableBinding(Sink.class)
public class ManuallyAcknowdledgingConsumer {
public static void main(String[] args) {
SpringApplication.run(ManuallyAcknowdledgingConsumer.class, args);
}
@StreamListener(Sink.INPUT)
public void process(Message<?> message) {
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
if (acknowledgment != null) {
System.out.println("Acknowledgment provided");
acknowledgment.acknowledge();
}
}
}
示例:安全配置
Apache Kafka 0.9 支持客户端和代理之间的安全连接。要利用此功能,请遵循 Apache Kafka 文档 以及 Kafka 0.9 Confluent 文档中的安全指南 中的指南。使用 spring.cloud.stream.kafka.binder.configuration
选项为绑定器创建的所有客户端设置安全属性。
例如,要将 security.protocol
设置为 SASL_SSL
,请设置以下属性
spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_SSL
所有其他安全属性都可以以类似的方式设置。
使用 Kerberos 时,请遵循 参考文档 中的说明来创建和引用 JAAS 配置。
Spring Cloud Stream 支持通过使用 JAAS 配置文件和 Spring Boot 属性将 JAAS 配置信息传递给应用程序。
使用 JAAS 配置文件
可以使用系统属性为 Spring Cloud Stream 应用程序设置 JAAS 和(可选)krb5 文件位置。以下示例展示了如何使用 JAAS 配置文件启动使用 SASL 和 Kerberos 的 Spring Cloud Stream 应用程序
java -Djava.security.auth.login.config=/path.to/kafka_client_jaas.conf -jar log.jar \
--spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
--spring.cloud.stream.bindings.input.destination=stream.ticktock \
--spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT
使用 Spring Boot 属性
作为使用 JAAS 配置文件的替代方案,Spring Cloud Stream 提供了一种机制,可以通过使用 Spring Boot 属性为 Spring Cloud Stream 应用程序设置 JAAS 配置。
以下属性可用于配置 Kafka 客户端的登录上下文
- spring.cloud.stream.kafka.binder.jaas.loginModule
-
登录模块名称。在正常情况下无需设置。
默认值:
com.sun.security.auth.module.Krb5LoginModule
。 - spring.cloud.stream.kafka.binder.jaas.controlFlag
-
登录模块的控制标志。
默认值:
required
。 - spring.cloud.stream.kafka.binder.jaas.options
-
包含登录模块选项的键值对映射。
默认值:空映射。
以下示例展示了如何使用 Spring Boot 配置属性启动使用 SASL 和 Kerberos 的 Spring Cloud Stream 应用程序
java --spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
--spring.cloud.stream.bindings.input.destination=stream.ticktock \
--spring.cloud.stream.kafka.binder.autoCreateTopics=false \
--spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT \
--spring.cloud.stream.kafka.binder.jaas.options.useKeyTab=true \
--spring.cloud.stream.kafka.binder.jaas.options.storeKey=true \
--spring.cloud.stream.kafka.binder.jaas.options.keyTab=/etc/security/keytabs/kafka_client.keytab \
--spring.cloud.stream.kafka.binder.jaas.options.principal=kafka-client-1@EXAMPLE.COM
前面的示例表示以下 JAAS 文件的等效项
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/etc/security/keytabs/kafka_client.keytab"
principal="[email protected]";
};
如果所需主题已存在于代理上或将由管理员创建,则可以关闭自动创建,并且只需要发送客户端 JAAS 属性。
不要在同一个应用程序中混合使用 JAAS 配置文件和 Spring Boot 属性。如果 -Djava.security.auth.login.config 系统属性已存在,则 Spring Cloud Stream 会忽略 Spring Boot 属性。
|
在使用 autoCreateTopics 和 autoAddPartitions 与 Kerberos 时要小心。通常,应用程序可能使用在 Kafka 和 Zookeeper 中没有管理权限的主体。因此,依赖 Spring Cloud Stream 创建/修改主题可能会失败。在安全环境中,我们强烈建议使用 Kafka 工具以管理方式创建主题和管理 ACL。
|
示例:暂停和恢复消费者
如果您希望暂停消费但不想导致分区重新平衡,您可以暂停和恢复消费者。这可以通过将Consumer
作为参数添加到您的@StreamListener
来实现。要恢复,您需要一个用于ListenerContainerIdleEvent
实例的ApplicationListener
。事件发布的频率由idleEventInterval
属性控制。由于消费者不是线程安全的,您必须在调用线程上调用这些方法。
以下简单的应用程序展示了如何暂停和恢复
@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@StreamListener(Sink.INPUT)
public void in(String in, @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
System.out.println(in);
consumer.pause(Collections.singleton(new TopicPartition("myTopic", 0)));
}
@Bean
public ApplicationListener<ListenerContainerIdleEvent> idleListener() {
return event -> {
System.out.println(event);
if (event.getConsumer().paused().size() > 0) {
event.getConsumer().resume(event.getConsumer().paused());
}
};
}
}
1.4. 事务性 Binder
通过将spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix
设置为非空值(例如tx-
)来启用事务。在处理器应用程序中使用时,消费者会启动事务;在消费者线程上发送的任何记录都将参与同一事务。当监听器正常退出时,监听器容器将向事务发送偏移量并提交它。所有使用spring.cloud.stream.kafka.binder.transaction.producer.*
属性配置的生产者绑定都使用一个通用的生产者工厂;单个绑定 Kafka 生产者属性将被忽略。
由于重试将在原始事务中运行,该事务可能会回滚,并且任何发布的记录也将回滚,因此不支持使用事务进行正常的绑定重试(和死信)。当启用重试(通用属性maxAttempts 大于零)时,重试属性用于配置一个DefaultAfterRollbackProcessor 以在容器级别启用重试。类似地,此功能不是在事务中发布死信记录,而是移动到监听器容器,同样通过DefaultAfterRollbackProcessor 实现,该处理器在主事务回滚后运行。
|
如果您希望在源应用程序中使用事务,或者从某个任意线程进行仅生产者事务(例如@Scheduled
方法),您必须获取对事务性生产者工厂的引用,并使用它定义一个KafkaTransactionManager
bean。
@Bean
public PlatformTransactionManager transactionManager(BinderFactory binders) {
ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
MessageChannel.class)).getTransactionalProducerFactory();
return new KafkaTransactionManager<>(pf);
}
请注意,我们使用BinderFactory
获取对绑定的引用;当只配置了一个绑定时,在第一个参数中使用null
。如果配置了多个绑定,请使用绑定名称来获取引用。一旦我们获得了对绑定的引用,我们就可以获得对ProducerFactory
的引用并创建一个事务管理器。
然后,您可以使用正常的 Spring 事务支持,例如TransactionTemplate
或@Transactional
,例如
public static class Sender {
@Transactional
public void doInTransaction(MessageChannel output, List<String> stuffToSend) {
stuffToSend.forEach(stuff -> output.send(new GenericMessage<>(stuff)));
}
}
如果您希望将仅生产者的事务与来自其他事务管理器的那些事务同步,请使用ChainedTransactionManager
。
1.5. 错误通道
从 1.3 版本开始,绑定器无条件地将异常发送到每个消费者目标的错误通道,并且还可以配置为将异步生产者发送失败发送到错误通道。有关更多信息,请参阅有关错误处理的这一部分。
发送失败的ErrorMessage
的有效负载是KafkaSendFailureException
,它具有以下属性
-
failedMessage
:无法发送的 Spring MessagingMessage<?>
。 -
record
:从failedMessage
创建的原始ProducerRecord
没有自动处理生产者异常(例如发送到死信队列)。您可以使用自己的 Spring Integration 流来使用这些异常。
1.6. Kafka 指标
Kafka 绑定器模块公开了以下指标
spring.cloud.stream.binder.kafka.offset
:此指标指示给定绑定器的主题中给定消费者组尚未消费的消息数量。提供的指标基于 Micrometer 库。如果 Micrometer 位于类路径上,并且应用程序没有提供其他此类 bean,则绑定器会创建KafkaBinderMetrics
bean。该指标包含消费者组信息、主题和主题上最新偏移量的已提交偏移量的实际滞后。此指标对于向 PaaS 平台提供自动缩放反馈特别有用。
您可以排除KafkaBinderMetrics
,以防止创建必要的基础设施(如消费者),然后通过在应用程序中提供以下组件来报告指标。
@Component
class NoOpBindingMeters {
NoOpBindingMeters(MeterRegistry registry) {
registry.config().meterFilter(
MeterFilter.denyNameStartsWith(KafkaBinderMetrics.OFFSET_LAG_METRIC_NAME));
}
}
有关如何选择性地抑制指标的更多详细信息,请参阅此处。
1.7. 墓碑记录(空记录值)
使用压缩主题时,具有null
值的记录(也称为墓碑记录)表示删除键。要在@StreamListener
方法中接收此类消息,参数必须标记为非必需,以接收null
值参数。
@StreamListener(Sink.INPUT)
public void in(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) byte[] key,
@Payload(required = false) Customer customer) {
// customer is null if a tombstone record
...
}
1.8. 使用 KafkaRebalanceListener
应用程序可能希望在最初分配分区时将主题/分区搜索到任意偏移量,或对消费者执行其他操作。从 2.1 版本开始,如果您在应用程序上下文中提供单个KafkaRebalanceListener
bean,它将被连接到所有 Kafka 消费者绑定。
public interface KafkaBindingRebalanceListener {
/**
* Invoked by the container before any pending offsets are committed.
* @param bindingName the name of the binding.
* @param consumer the consumer.
* @param partitions the partitions.
*/
default void onPartitionsRevokedBeforeCommit(String bindingName, Consumer<?, ?> consumer,
Collection<TopicPartition> partitions) {
}
/**
* Invoked by the container after any pending offsets are committed.
* @param bindingName the name of the binding.
* @param consumer the consumer.
* @param partitions the partitions.
*/
default void onPartitionsRevokedAfterCommit(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
}
/**
* Invoked when partitions are initially assigned or after a rebalance.
* Applications might only want to perform seek operations on an initial assignment.
* @param bindingName the name of the binding.
* @param consumer the consumer.
* @param partitions the partitions.
* @param initial true if this is the initial assignment.
*/
default void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions,
boolean initial) {
}
}
当您提供重新平衡侦听器时,您不能将resetOffsets
消费者属性设置为true
。
1.9. 自定义消费者和生产者配置
如果您希望对用于在 Kafka 中创建ConsumerFactory
和ProducerFactory
的消费者和生产者配置进行高级自定义,您可以实现以下自定义程序。
-
消费者配置定制器
-
生产者配置定制器
这两个接口都提供了一种方法来配置用于消费者和生产者属性的配置映射。例如,如果您想访问在应用程序级别定义的 Bean,您可以在 configure
方法的实现中注入它。当绑定器发现这些定制器作为 Bean 可用时,它将在创建消费者和生产者工厂之前调用 configure
方法。
1.10. 自定义 AdminClient 配置
与上面消费者和生产者配置定制一样,应用程序还可以通过提供 AdminClientConfigCustomizer
来定制管理客户端的配置。AdminClientConfigCustomizer 的 configure 方法提供了对管理客户端属性的访问,您可以使用它来定义进一步的定制。绑定器的 Kafka 主题供应器对通过此定制器提供的属性给予最高优先级。以下是如何提供此定制器 Bean 的示例。
@Bean
public AdminClientConfigCustomizer adminClientConfigCustomizer() {
return props -> {
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
};
}
1.11. 死信主题处理
1.11.1. 死信主题分区选择
默认情况下,记录使用与原始记录相同的分区发布到死信主题。这意味着死信主题必须至少与原始记录具有相同数量的分区。
要更改此行为,请将 DlqPartitionFunction
实现作为 @Bean
添加到应用程序上下文。只能存在一个这样的 Bean。该函数将提供消费者组、失败的 ConsumerRecord
和异常。例如,如果您始终想要路由到分区 0,您可以使用
@Bean
public DlqPartitionFunction partitionFunction() {
return (group, record, ex) -> 0;
}
如果您将消费者绑定的 dlqPartitions 属性设置为 1(并且绑定器的 minPartitionCount 等于 1 ),则无需提供 DlqPartitionFunction ;框架将始终使用分区 0。如果您将消费者绑定的 dlqPartitions 属性设置为大于 1 的值(或绑定器的 minPartitionCount 大于 1 ),则 **必须** 提供 DlqPartitionFunction Bean,即使分区计数与原始主题的相同。
|
还可以为 DLQ 主题定义自定义名称。为此,请将 DlqDestinationResolver
的实现作为 @Bean
创建到应用程序上下文。当绑定器检测到这样的 Bean 时,它将优先使用它,否则它将使用 dlqName
属性。如果两者都找不到,它将默认为 error.<destination>.<group>
。以下是如何将 DlqDestinationResolver
作为 @Bean
的示例。
@Bean
public DlqDestinationResolver dlqDestinationResolver() {
return (rec, ex) -> {
if (rec.topic().equals("word1")) {
return "topic1-dlq";
}
else {
return "topic2-dlq";
}
};
}
在为 DlqDestinationResolver
提供实现时,需要注意的一点是,绑定器中的供应器不会自动为应用程序创建主题。这是因为绑定器无法推断实现可能发送到的所有 DLQ 主题的名称。因此,如果您使用此策略提供 DLQ 名称,则应用程序有责任确保这些主题事先创建。
1.11.2. 处理死信主题中的记录
由于框架无法预测用户希望如何处理死信消息,因此它不提供任何标准机制来处理它们。如果死信的原因是暂时的,您可能希望将消息路由回原始主题。但是,如果问题是永久性问题,则会导致无限循环。本主题中的示例 Spring Boot 应用程序展示了如何将这些消息路由回原始主题,但它会在三次尝试后将它们移动到“停车场”主题。该应用程序是另一个 spring-cloud-stream 应用程序,它从死信主题读取。当 5 秒内没有收到消息时,它将终止。
这些示例假设原始目标是 so8400out
,消费者组是 so8400
。
有几种策略需要考虑
-
考虑仅在主应用程序未运行时运行重路由。否则,针对瞬态错误的重试会很快用完。
-
或者,使用两阶段方法:使用此应用程序路由到第三个主题,然后使用另一个应用程序从那里路由回主主题。
以下代码清单显示了示例应用程序
spring.cloud.stream.bindings.input.group=so8400replay
spring.cloud.stream.bindings.input.destination=error.so8400out.so8400
spring.cloud.stream.bindings.output.destination=so8400out
spring.cloud.stream.bindings.parkingLot.destination=so8400in.parkingLot
spring.cloud.stream.kafka.binder.configuration.auto.offset.reset=earliest
spring.cloud.stream.kafka.binder.headers=x-retries
@SpringBootApplication
@EnableBinding(TwoOutputProcessor.class)
public class ReRouteDlqKApplication implements CommandLineRunner {
private static final String X_RETRIES_HEADER = "x-retries";
public static void main(String[] args) {
SpringApplication.run(ReRouteDlqKApplication.class, args).close();
}
private final AtomicInteger processed = new AtomicInteger();
@Autowired
private MessageChannel parkingLot;
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public Message<?> reRoute(Message<?> failed) {
processed.incrementAndGet();
Integer retries = failed.getHeaders().get(X_RETRIES_HEADER, Integer.class);
if (retries == null) {
System.out.println("First retry for " + failed);
return MessageBuilder.fromMessage(failed)
.setHeader(X_RETRIES_HEADER, new Integer(1))
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
.build();
}
else if (retries.intValue() < 3) {
System.out.println("Another retry for " + failed);
return MessageBuilder.fromMessage(failed)
.setHeader(X_RETRIES_HEADER, new Integer(retries.intValue() + 1))
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
.build();
}
else {
System.out.println("Retries exhausted for " + failed);
parkingLot.send(MessageBuilder.fromMessage(failed)
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
.build());
}
return null;
}
@Override
public void run(String... args) throws Exception {
while (true) {
int count = this.processed.get();
Thread.sleep(5000);
if (count == this.processed.get()) {
System.out.println("Idle, terminating");
return;
}
}
}
public interface TwoOutputProcessor extends Processor {
@Output("parkingLot")
MessageChannel parkingLot();
}
}
1.12. 使用 Kafka Binder 进行分区
Apache Kafka 本地支持主题分区。
有时将数据发送到特定分区是有利的,例如,当您希望严格排序消息处理时(针对特定客户的所有消息都应发送到同一个分区)。
以下示例展示了如何配置生产者和消费者端
@SpringBootApplication
@EnableBinding(Source.class)
public class KafkaPartitionProducerApplication {
private static final Random RANDOM = new Random(System.currentTimeMillis());
private static final String[] data = new String[] {
"foo1", "bar1", "qux1",
"foo2", "bar2", "qux2",
"foo3", "bar3", "qux3",
"foo4", "bar4", "qux4",
};
public static void main(String[] args) {
new SpringApplicationBuilder(KafkaPartitionProducerApplication.class)
.web(false)
.run(args);
}
@InboundChannelAdapter(channel = Source.OUTPUT, poller = @Poller(fixedRate = "5000"))
public Message<?> generate() {
String value = data[RANDOM.nextInt(data.length)];
System.out.println("Sending: " + value);
return MessageBuilder.withPayload(value)
.setHeader("partitionKey", value)
.build();
}
}
spring:
cloud:
stream:
bindings:
output:
destination: partitioned.topic
producer:
partition-key-expression: headers['partitionKey']
partition-count: 12
主题必须配置为具有足够的 partition 来实现所有消费者组的所需并发性。上述配置支持最多 12 个消费者实例(如果它们的 concurrency 为 2,则为 6 个;如果它们的并发性为 3,则为 4 个,依此类推)。通常最好“过度配置”分区,以允许将来增加消费者或并发性。
|
前面的配置使用默认分区 (key.hashCode() % partitionCount )。这可能会或可能不会提供一个适当平衡的算法,具体取决于键值。您可以使用 partitionSelectorExpression 或 partitionSelectorClass 属性覆盖此默认值。
|
由于分区由 Kafka 本地处理,因此消费者端不需要任何特殊配置。Kafka 在实例之间分配分区。
以下 Spring Boot 应用程序侦听 Kafka 流,并将每个消息发送到的分区 ID(到控制台)打印出来
@SpringBootApplication
@EnableBinding(Sink.class)
public class KafkaPartitionConsumerApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(KafkaPartitionConsumerApplication.class)
.web(false)
.run(args);
}
@StreamListener(Sink.INPUT)
public void listen(@Payload String in, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
System.out.println(in + " received from partition " + partition);
}
}
spring:
cloud:
stream:
bindings:
input:
destination: partitioned.topic
group: myGroup
您可以根据需要添加实例。Kafka 会重新平衡分区分配。如果实例数量(或instance count * concurrency
)超过分区数量,则某些消费者将处于空闲状态。
2. Kafka Streams Binder
2.1. 使用
要使用 Kafka Streams 绑定器,您只需将其添加到您的 Spring Cloud Stream 应用程序中,使用以下 Maven 坐标
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
使用 Spring Initializr 快速启动 Kafka Streams 绑定器的全新项目,然后选择“Cloud Streams”和“Spring for Kafka Streams”,如下所示
2.2. 概述
Spring Cloud Stream 包含一个专门为 Apache Kafka Streams 绑定设计的绑定器实现。通过这种原生集成,Spring Cloud Stream “处理器”应用程序可以直接在核心业务逻辑中使用 Apache Kafka Streams API。
Kafka Streams 绑定器实现建立在 Spring for Apache Kafka 项目提供的基础之上。
Kafka Streams 绑定器为 Kafka Streams 中的三种主要类型提供绑定功能 - KStream
、KTable
和 GlobalKTable
。
Kafka Streams 应用程序通常遵循一种模型,其中记录从入站主题读取,应用业务逻辑,然后将转换后的记录写入出站主题。或者,也可以定义没有出站目标的处理器应用程序。
在以下部分中,我们将详细了解 Spring Cloud Stream 与 Kafka Streams 的集成。
2.3. 编程模型
使用 Kafka Streams 绑定器提供的编程模型时,既可以使用高级 Streams DSL,也可以混合使用高级和低级 Processor-API。当混合使用高级和低级 API 时,这通常是通过在 KStream
上调用 transform
或 process
API 方法来实现的。
2.3.1. 函数式风格
从 Spring Cloud Stream 3.0.0
开始,Kafka Streams 绑定器允许应用程序使用 Java 8 中可用的函数式编程风格进行设计和开发。这意味着应用程序可以简洁地表示为类型 java.util.function.Function
或 java.util.function.Consumer
的 lambda 表达式。
让我们来看一个非常基本的例子。
@SpringBootApplication
public class SimpleConsumerApplication {
@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
return input ->
input.foreach((key, value) -> {
System.out.println("Key: " + key + " Value: " + value);
});
}
}
虽然简单,但这是一个完整的独立 Spring Boot 应用程序,它利用 Kafka Streams 进行流处理。这是一个没有出站绑定的消费者应用程序,只有一个入站绑定。应用程序消费数据,并简单地将 KStream
键和值的信息记录到标准输出。应用程序包含 SpringBootApplication
注解和一个标记为 Bean
的方法。bean 方法的类型为 java.util.function.Consumer
,它用 KStream
参数化。然后在实现中,我们返回一个本质上是 lambda 表达式的 Consumer 对象。在 lambda 表达式中,提供了处理数据的代码。
在这个应用程序中,有一个单一的输入绑定,类型为 KStream
。绑定器为应用程序创建此绑定,名称为 process-in-0
,即函数 bean 名称后跟一个连字符 (-
),然后是文字 in
,再跟一个连字符,然后是参数的序数位置。您可以使用此绑定名称来设置其他属性,例如目标。例如,spring.cloud.stream.bindings.process-in-0.destination=my-topic
。
如果绑定中未设置目标属性,则会创建一个与绑定名称相同的主题(如果应用程序具有足够的权限),或者预期该主题已存在。 |
构建为 uber-jar(例如,kstream-consumer-app.jar
)后,您可以像下面这样运行上面的示例。
java -jar kstream-consumer-app.jar --spring.cloud.stream.bindings.process-in-0.destination=my-topic
以下是一个完整的处理器示例,它包含输入和输出绑定。这是经典的词频统计示例,其中应用程序从主题接收数据,然后在滚动时间窗口中计算每个词的出现次数。
@SpringBootApplication
public class WordCountProcessorApplication {
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> process() {
return input -> input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.map((key, value) -> new KeyValue<>(value, value))
.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
.windowedBy(TimeWindows.of(5000))
.count(Materialized.as("word-counts-state-store"))
.toStream()
.map((key, value) -> new KeyValue<>(key.key(), new WordCount(key.key(), value,
new Date(key.window().start()), new Date(key.window().end()))));
}
public static void main(String[] args) {
SpringApplication.run(WordCountProcessorApplication.class, args);
}
}
同样,这是一个完整的 Spring Boot 应用程序。与第一个应用程序的不同之处在于,bean 方法的类型为 java.util.function.Function
。Function
的第一个参数化类型用于输入 KStream
,第二个参数化类型用于输出。在方法体中,提供了一个 Function
类型的 lambda 表达式,作为实现,给出了实际的业务逻辑。与之前讨论的基于 Consumer 的应用程序类似,这里的输入绑定默认命名为 process-in-0
。对于输出,绑定名称也会自动设置为 process-out-0
。
构建为 uber-jar(例如,wordcount-processor.jar
)后,您可以像下面这样运行上面的示例。
java -jar wordcount-processor.jar --spring.cloud.stream.bindings.process-in-0.destination=words --spring.cloud.stream.bindings.process-out-0.destination=counts
此应用程序将从 Kafka 主题 words
中消费消息,并将计算结果发布到输出主题 counts
。
Spring Cloud Stream 将确保来自传入和传出主题的消息自动绑定为 KStream 对象。作为开发人员,您可以专注于代码的业务方面,即编写处理器中所需的逻辑。框架会自动处理 Kafka Streams 基础设施所需的 Kafka Streams 特定配置。
我们上面看到的两个示例都有一个 KStream
输入绑定。在这两种情况下,绑定都从单个主题接收记录。如果您想将多个主题多路复用到单个 KStream
绑定中,可以在下面提供以逗号分隔的 Kafka 主题作为目标。
spring.cloud.stream.bindings.process-in-0.destination=topic-1,topic-2,topic-3
此外,如果您想根据正则表达式匹配主题,还可以提供主题模式作为目标。
spring.cloud.stream.bindings.process-in-0.destination=input.*
多个输入绑定
许多非平凡的 Kafka Streams 应用程序通常通过多个绑定从多个主题消费数据。例如,一个主题作为 Kstream
消费,另一个作为 KTable
或 GlobalKTable
消费。应用程序可能希望以表格类型接收数据的原因有很多。考虑一个用例,其中基础主题通过来自数据库的更改数据捕获 (CDC) 机制填充,或者应用程序只关心最新更新以进行下游处理。如果应用程序指定数据需要绑定为 KTable
或 GlobalKTable
,那么 Kafka Streams 绑定器将正确地将目标绑定到 KTable
或 GlobalKTable
,并使它们可供应用程序操作。我们将研究 Kafka Streams 绑定器中如何处理多个输入绑定的几种不同场景。
Kafka Streams Binder 中的 BiFunction
以下是一个有两个输入和一个输出的示例。在这种情况下,应用程序可以利用java.util.function.BiFunction
。
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
return (userClicksStream, userRegionsTable) -> (userClicksStream
.leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
"UNKNOWN" : region, clicks),
Joined.with(Serdes.String(), Serdes.Long(), null))
.map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(),
regionWithClicks.getClicks()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
.reduce(Long::sum)
.toStream());
}
这里,基本主题与之前的示例相同,但这里有两个输入。Java 的BiFunction
支持用于将输入绑定到所需的目的地。绑定器为输入生成的默认绑定名称分别为process-in-0
和process-in-1
。默认输出绑定为process-out-0
。在本例中,BiFunction
的第一个参数绑定为第一个输入的KStream
,第二个参数绑定为第二个输入的KTable
。
Kafka Streams Binder 中的 BiConsumer
如果有两个输入,但没有输出,那么我们可以使用java.util.function.BiConsumer
,如下所示。
@Bean
public BiConsumer<KStream<String, Long>, KTable<String, String>> process() {
return (userClicksStream, userRegionsTable) -> {}
}
超过两个输入
如果有多于两个输入怎么办?在某些情况下,您可能需要超过两个输入。在这种情况下,绑定器允许您链接部分函数。在函数式编程术语中,这种技术通常被称为柯里化。随着 Java 8 中添加的函数式编程支持,Java 现在允许您编写柯里化函数。Spring Cloud Stream Kafka Streams 绑定器可以利用此功能来启用多个输入绑定。
让我们看一个例子。
@Bean
public Function<KStream<Long, Order>,
Function<GlobalKTable<Long, Customer>,
Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> enrichOrder() {
return orders -> (
customers -> (
products -> (
orders.join(customers,
(orderId, order) -> order.getCustomerId(),
(order, customer) -> new CustomerOrder(customer, order))
.join(products,
(orderId, customerOrder) -> customerOrder
.productId(),
(customerOrder, product) -> {
EnrichedOrder enrichedOrder = new EnrichedOrder();
enrichedOrder.setProduct(product);
enrichedOrder.setCustomer(customerOrder.customer);
enrichedOrder.setOrder(customerOrder.order);
return enrichedOrder;
})
)
)
);
}
让我们看一下上面介绍的绑定模型的细节。在这个模型中,我们在入站有 3 个部分应用的函数。让我们将它们称为f(x)
、f(y)
和f(z)
。如果我们从真正的数学函数的角度扩展这些函数,它将看起来像这样:f(x) → (fy) → f(z) → KStream<Long, EnrichedOrder>
。x
变量代表KStream<Long, Order>
,y
变量代表GlobalKTable<Long, Customer>
,z
变量代表GlobalKTable<Long, Product>
。第一个函数f(x)
具有应用程序的第一个输入绑定(KStream<Long, Order>
),其输出是函数f(y)
。函数f(y)
具有应用程序的第二个输入绑定(GlobalKTable<Long, Customer>
),其输出是另一个函数f(z)
。函数f(z)
的输入是应用程序的第三个输入(GlobalKTable<Long, Product>
),其输出是KStream<Long, EnrichedOrder>
,它是应用程序的最终输出绑定。来自三个部分函数的输入,分别是KStream
、GlobalKTable
、GlobalKTable
,在方法体中可用于实现作为 lambda 表达式一部分的业务逻辑。
输入绑定分别命名为enrichOrder-in-0
、enrichOrder-in-1
和enrichOrder-in-2
。输出绑定命名为enrichOrder-out-0
。
使用柯里化函数,您可以拥有任意数量的输入。但是,请记住,超过少量输入和部分应用的函数(如上面在 Java 中所示)可能会导致代码难以阅读。因此,如果您的 Kafka Streams 应用程序需要超过合理数量的输入绑定,并且您想使用这种函数模型,那么您可能需要重新考虑您的设计并适当地分解应用程序。
多个输出绑定
Kafka Streams 允许将出站数据写入多个主题。此功能在 Kafka Streams 中被称为分支。当使用多个输出绑定时,需要提供一个 KStream 数组 (KStream[]
) 作为出站返回类型。
以下是一个示例
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>[]> process() {
Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
Predicate<Object, WordCount> isFrench = (k, v) -> v.word.equals("french");
Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");
return input -> input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, value) -> value)
.windowedBy(TimeWindows.of(5000))
.count(Materialized.as("WordCounts-branch"))
.toStream()
.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value,
new Date(key.window().start()), new Date(key.window().end()))))
.branch(isEnglish, isFrench, isSpanish);
}
编程模型保持不变,但出站参数化类型为 KStream[]
。默认输出绑定名称分别为 process-out-0
、process-out-1
、process-out-2
。绑定器生成三个输出绑定的原因是它检测到返回的 KStream
数组的长度。
Kafka Streams 函数式编程风格总结
总之,下表显示了可以在函数式范式中使用的各种选项。
输入数量 | 输出数量 | 要使用的组件 |
---|---|---|
1 |
0 |
java.util.function.Consumer |
2 |
0 |
java.util.function.BiConsumer |
1 |
1..n |
java.util.function.Function |
2 |
1..n |
java.util.function.BiFunction |
>= 3 |
0..n |
使用柯里化函数 |
-
在表中有多个输出的情况下,类型简单地变为
KStream[]
。
2.3.2. 命令式编程模型。
虽然上面概述的函数式编程模型是首选方法,但如果您愿意,您仍然可以使用经典的 StreamListener
方法。
以下是一些示例。
以下是使用 StreamListener
的单词计数示例的等效代码。
@SpringBootApplication
@EnableBinding(KafkaStreamsProcessor.class)
public class WordCountProcessorApplication {
@StreamListener("input")
@SendTo("output")
public KStream<?, WordCount> process(KStream<?, String> input) {
return input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, value) -> value)
.windowedBy(TimeWindows.of(5000))
.count(Materialized.as("WordCounts-multi"))
.toStream()
.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
}
public static void main(String[] args) {
SpringApplication.run(WordCountProcessorApplication.class, args);
}
如您所见,这有点冗长,因为您需要提供 EnableBinding
和其他额外的注释(如 StreamListener
和 SendTo
)才能使其成为一个完整的应用程序。EnableBinding
是您指定包含绑定的绑定接口的地方。在本例中,我们使用的是库存 KafkaStreamsProcessor
绑定接口,它具有以下契约。
public interface KafkaStreamsProcessor {
@Input("input")
KStream<?, ?> input();
@Output("output")
KStream<?, ?> output();
}
绑定器将为输入 KStream
和输出 KStream
创建绑定,因为您使用的是包含这些声明的绑定接口。
除了函数式风格提供的编程模型的明显差异之外,这里需要特别提到的一点是,绑定名称是您在绑定接口中指定的。例如,在上面的应用程序中,由于我们使用的是 KafkaStreamsProcessor
,因此绑定名称为 input
和 output
。绑定属性需要使用这些名称。例如 spring.cloud.stream.bindings.input.destination
、spring.cloud.stream.bindings.output.destination
等。请记住,这与函数式风格有根本的不同,因为在函数式风格中,绑定器会为应用程序生成绑定名称。这是因为应用程序在使用 EnableBinding
的函数模型中没有提供任何绑定接口。
以下是一个具有两个输入的接收器的另一个示例。
@EnableBinding(KStreamKTableBinding.class)
.....
.....
@StreamListener
public void process(@Input("inputStream") KStream<String, PlayEvent> playEvents,
@Input("inputTable") KTable<Long, Song> songTable) {
....
....
}
interface KStreamKTableBinding {
@Input("inputStream")
KStream<?, ?> inputStream();
@Input("inputTable")
KTable<?, ?> inputTable();
}
以下是与上面看到的基于 BiFunction
的处理器相同的 StreamListener
等效代码。
@EnableBinding(KStreamKTableBinding.class)
....
....
@StreamListener
@SendTo("output")
public KStream<String, Long> process(@Input("input") KStream<String, Long> userClicksStream,
@Input("inputTable") KTable<String, String> userRegionsTable) {
....
....
}
interface KStreamKTableBinding extends KafkaStreamsProcessor {
@Input("inputX")
KTable<?, ?> inputTable();
}
最后,以下是具有三个输入和柯里化函数的应用程序的 StreamListener
等效代码。
@EnableBinding(CustomGlobalKTableProcessor.class)
...
...
@StreamListener
@SendTo("output")
public KStream<Long, EnrichedOrder> process(
@Input("input-1") KStream<Long, Order> ordersStream,
@Input("input-2") GlobalKTable<Long, Customer> customers,
@Input("input-3") GlobalKTable<Long, Product> products) {
KStream<Long, CustomerOrder> customerOrdersStream = ordersStream.join(
customers, (orderId, order) -> order.getCustomerId(),
(order, customer) -> new CustomerOrder(customer, order));
return customerOrdersStream.join(products,
(orderId, customerOrder) -> customerOrder.productId(),
(customerOrder, product) -> {
EnrichedOrder enrichedOrder = new EnrichedOrder();
enrichedOrder.setProduct(product);
enrichedOrder.setCustomer(customerOrder.customer);
enrichedOrder.setOrder(customerOrder.order);
return enrichedOrder;
});
}
interface CustomGlobalKTableProcessor {
@Input("input-1")
KStream<?, ?> input1();
@Input("input-2")
GlobalKTable<?, ?> input2();
@Input("input-3")
GlobalKTable<?, ?> input3();
@Output("output")
KStream<?, ?> output();
}
您可能会注意到上面两个示例更加冗长,因为除了提供EnableBinding
之外,您还需要编写自己的自定义绑定接口。使用函数模型,您可以避免所有这些仪式细节。
在我们继续研究 Kafka Streams 绑定器提供的通用编程模型之前,以下是多个输出绑定的StreamListener
版本。
EnableBinding(KStreamProcessorWithBranches.class)
public static class WordCountProcessorApplication {
@Autowired
private TimeWindows timeWindows;
@StreamListener("input")
@SendTo({"output1","output2","output3"})
public KStream<?, WordCount>[] process(KStream<Object, String> input) {
Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
Predicate<Object, WordCount> isFrench = (k, v) -> v.word.equals("french");
Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");
return input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, value) -> value)
.windowedBy(timeWindows)
.count(Materialized.as("WordCounts-1"))
.toStream()
.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))))
.branch(isEnglish, isFrench, isSpanish);
}
interface KStreamProcessorWithBranches {
@Input("input")
KStream<?, ?> input();
@Output("output1")
KStream<?, ?> output1();
@Output("output2")
KStream<?, ?> output2();
@Output("output3")
KStream<?, ?> output3();
}
}
回顾一下,我们已经回顾了使用 Kafka Streams 绑定器时的各种编程模型选择。
绑定器为输入上的KStream
、KTable
和GlobalKTable
提供绑定功能。KTable
和GlobalKTable
绑定仅在输入上可用。绑定器支持KStream
的输入和输出绑定。
Kafka Streams 绑定器编程模型的优点是,绑定器为您提供了使用完全函数式编程模型或使用基于StreamListener
的命令式方法的灵活性。
2.4. 编程模型的辅助工具
2.4.1. 单个应用程序中的多个 Kafka Streams 处理器
绑定器允许在单个 Spring Cloud Stream 应用程序中拥有多个 Kafka Streams 处理器。您可以拥有以下应用程序。
@Bean
public java.util.function.Function<KStream<Object, String>, KStream<Object, String>> process() {
...
}
@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
...
}
@Bean
public java.util.function.BiFunction<KStream<Object, String>, KTable<Integer, String>, KStream<Object, String>> yetAnotherProcess() {
...
}
在这种情况下,绑定器将创建 3 个具有不同应用程序 ID 的独立 Kafka Streams 对象(下面将详细介绍)。但是,如果您在应用程序中有多个处理器,则必须告诉 Spring Cloud Stream 哪些函数需要激活。以下是如何激活函数。
spring.cloud.stream.function.definition: process;anotherProcess;yetAnotherProcess
如果您希望某些函数不要立即激活,可以从该列表中删除它们。
当您只有一个 Kafka Streams 处理器和其他类型的Function
bean 在同一个应用程序中时,这也适用,该应用程序通过不同的绑定器处理(例如,基于常规 Kafka 消息通道绑定器的函数 bean)。
2.4.2. Kafka Streams 应用程序 ID
应用程序 ID 是您需要为 Kafka Streams 应用程序提供的必填属性。Spring Cloud Stream Kafka Streams 绑定器允许您通过多种方式配置此应用程序 ID。
如果您在应用程序中只有一个处理器或StreamListener
,那么您可以使用以下属性在绑定器级别设置它。
spring.cloud.stream.kafka.streams.binder.applicationId
.
为了方便起见,如果您只有一个处理器,您也可以使用spring.application.name
作为属性来委托应用程序 ID。
如果您在应用程序中有多个 Kafka Streams 处理器,那么您需要为每个处理器设置应用程序 ID。在函数模型的情况下,您可以将其作为属性附加到每个函数。
例如,假设您有以下函数。
@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
...
}
和
@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
...
}
然后,您可以使用以下绑定器级别属性为每个应用程序设置应用程序 ID。
spring.cloud.stream.kafka.streams.binder.functions.process.applicationId
和
spring.cloud.stream.kafka.streams.binder.functions.anotherProcess.applicationId
在 StreamListener
的情况下,您需要在处理器的第一个输入绑定上设置此属性。
例如,假设您有两个基于 StreamListener
的处理器。
@StreamListener
@SendTo("output")
public KStream<String, String> process(@Input("input") <KStream<Object, String>> input) {
...
}
@StreamListener
@SendTo("anotherOutput")
public KStream<String, String> anotherProcess(@Input("anotherInput") <KStream<Object, String>> input) {
...
}
然后,您必须使用以下绑定属性为此设置应用程序 ID。
spring.cloud.stream.kafka.streams.bindings.input.consumer.applicationId
和
spring.cloud.stream.kafka.streams.bindings.anotherInput.consumer.applicationId
对于基于函数的模型,这种在绑定级别设置应用程序 ID 的方法也适用。但是,如果您使用的是函数模型,则在绑定器级别按函数设置应用程序 ID 会更容易,如上所述。
对于生产部署,强烈建议通过配置显式指定应用程序 ID。如果您要自动扩展应用程序,这一点尤其重要,在这种情况下,您需要确保每个实例都使用相同的应用程序 ID 部署。
如果应用程序未提供应用程序 ID,则绑定器将为您自动生成一个静态应用程序 ID。这在开发场景中很方便,因为它避免了显式提供应用程序 ID 的需要。以这种方式生成的应用程序 ID 在应用程序重启时将保持静态。在函数模型中,生成的应用程序 ID 将是函数 bean 名称后跟文字 applicationID
,例如,如果 process
是函数 bean 名称,则为 process-applicationID
。在 StreamListener
的情况下,生成的应用程序 ID 将使用包含类名后跟方法名后跟文字 applicationId
,而不是使用函数 bean 名称。
设置应用程序 ID 总结
-
默认情况下,绑定器将为每个函数或
StreamListener
方法自动生成应用程序 ID。 -
如果您只有一个处理器,则可以使用
spring.kafka.streams.applicationId
、spring.application.name
或spring.cloud.stream.kafka.streams.binder.applicationId
。 -
如果您有多个处理器,则可以使用属性
spring.cloud.stream.kafka.streams.binder.functions.<function-name>.applicationId
为每个函数设置应用程序 ID。在StreamListener
的情况下,可以使用spring.cloud.stream.kafka.streams.bindings.input.applicationId
来完成此操作,假设输入绑定名称为input
。
2.4.3. 使用函数式风格覆盖绑定器生成的默认绑定名称
默认情况下,绑定器在使用函数式风格时,会使用上面讨论的策略来生成绑定名称,即 <function-bean-name>-<in>|<out>-[0..n],例如 process-in-0、process-out-0 等。如果您想覆盖这些绑定名称,可以通过指定以下属性来实现。
spring.cloud.stream.function.bindings.<default binding name>
。默认绑定名称是绑定器生成的原始绑定名称。
例如,假设您有以下函数。
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}
绑定器将生成名称为 process-in-0
、process-in-1
和 process-out-0
的绑定。现在,如果您想将它们更改为完全不同的名称,也许是更具领域特色的绑定名称,那么您可以按照以下步骤进行操作。
spring.cloud.stream.function.bindings.process-in-0=users
spring.cloud.stream.function.bindings.process-in-0=regions
和
spring.cloud.stream.function.bindings.process-out-0=clicks
之后,您必须在这些新的绑定名称上设置所有绑定级别的属性。
请记住,在上面描述的函数式编程模型中,在大多数情况下,遵循默认绑定名称是有意义的。您可能仍然想要进行这种覆盖的唯一原因是,当您有大量配置属性时,您希望将绑定映射到更具领域友好的名称。
2.4.4. 设置引导服务器配置
在运行 Kafka Streams 应用程序时,您必须提供 Kafka 代理服务器信息。如果您没有提供此信息,绑定器会期望您在默认的 localhost:9092
上运行代理。如果不是这种情况,则需要覆盖它。有几种方法可以做到这一点。
-
使用启动属性 -
spring.kafka.bootstrapServers
-
绑定器级别属性 -
spring.cloud.stream.kafka.streams.binder.brokers
当涉及到绑定器级别属性时,无论您使用的是通过常规 Kafka 绑定器提供的代理属性 - spring.cloud.stream.kafka.binder.brokers
还是 Kafka Streams 绑定器,都没有关系。Kafka Streams 绑定器将首先检查是否设置了 Kafka Streams 绑定器特定的代理属性 (spring.cloud.stream.kafka.streams.binder.brokers
),如果未找到,则会查找 spring.cloud.stream.kafka.binder.brokers
。
2.5. 记录序列化和反序列化
Kafka Streams 绑定器允许您以两种方式序列化和反序列化记录。一种是 Kafka 提供的原生序列化和反序列化功能,另一种是 Spring Cloud Stream 框架的消息转换功能。让我们看看一些细节。
2.5.1. 入站反序列化
键始终使用原生 Serdes 反序列化。
对于值,默认情况下,Kafka 会在入站进行本机反序列化。请注意,这与之前版本的 Kafka Streams 绑定器中的默认行为有重大变化,在之前版本中,反序列化是由框架完成的。
Kafka Streams 绑定器将尝试通过查看 `java.util.function.Function|Consumer` 或 `StreamListener` 的类型签名来推断匹配的 `Serde` 类型。以下是它匹配 Serdes 的顺序。
-
如果应用程序提供 `Serde` 类型的 bean,并且返回类型使用传入键或值的实际类型进行参数化,那么它将使用该 `Serde` 进行入站反序列化。例如,如果您在应用程序中具有以下内容,则绑定器会检测到 `KStream` 的传入值类型与使用 `Serde` bean 参数化的类型匹配。它将使用它进行入站反序列化。
@Bean
public Serde<Foo() customSerde{
...
}
@Bean
public Function<KStream<String, Foo>, KStream<String, Foo>> process() {
}
-
接下来,它会查看类型,并查看它们是否为 Kafka Streams 公开的类型之一。如果是,则使用它们。以下是绑定器将尝试从 Kafka Streams 匹配的 Serde 类型。
Integer, Long, Short, Double, Float, byte[], UUID and String.
-
如果 Kafka Streams 提供的任何 Serdes 都不匹配类型,那么它将使用 Spring Kafka 提供的 JsonSerde。在这种情况下,绑定器假设类型是 JSON 友好的。如果您有多个值对象作为输入,这将很有用,因为绑定器将在内部将它们推断为正确的 Java 类型。但在回退到 `JsonSerde` 之前,绑定器会检查 Kafka Streams 配置中设置的默认 `Serde`,以查看它是否是一个可以与传入 KStream 的类型匹配的 `Serde`。
如果上述策略均无效,则应用程序必须通过配置提供 `Serde`。这可以通过两种方式配置 - 绑定或默认。
首先,绑定器将查看是否在绑定级别提供了 `Serde`。例如,如果您有以下处理器,
@Bean
public BiFunction<KStream<CustomKey, AvroIn1>, KTable<CustomKey, AvroIn2>, KStream<CustomKey, AvroOutput>> process() {...}
那么,您可以使用以下方法提供绑定级别 `Serde`
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
如果您按输入绑定提供上述 `Serde`,那么它将具有更高的优先级,并且绑定器将避免任何 `Serde` 推断。 |
如果您希望在入站反序列化时使用默认的键/值 Serdes,您可以在绑定器级别进行设置。
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde
如果您不想使用 Kafka 提供的原生解码,您可以依靠 Spring Cloud Stream 提供的消息转换功能。由于原生解码是默认设置,为了让 Spring Cloud Stream 反序列化入站值对象,您需要显式禁用原生解码。
例如,如果您有与上面相同的 BiFunction 处理器,那么 spring.cloud.stream.bindings.process-in-0.consumer.nativeDecoding: false
您需要为所有输入单独禁用原生解码。否则,对于未禁用的输入,原生解码仍然会应用。
默认情况下,Spring Cloud Stream 将使用 application/json
作为内容类型,并使用相应的 json 消息转换器。您可以使用以下属性和相应的 MessageConverter
bean 来使用自定义消息转换器。
spring.cloud.stream.bindings.process-in-0.contentType
2.5.2. 出站序列化
出站序列化基本上遵循与入站反序列化相同的规则。与入站反序列化一样,Spring Cloud Stream 的先前版本的一个主要变化是,出站的序列化由 Kafka 本地处理。在 3.0 版本之前的绑定器中,这是由框架本身完成的。
出站的键始终由 Kafka 使用匹配的 Serde
进行序列化,该 Serde
由绑定器推断。如果它无法推断键的类型,则需要使用配置指定。
值 Serdes 使用与入站反序列化相同的规则进行推断。首先,它匹配以查看出站类型是否来自应用程序中提供的 bean。如果没有,它会检查它是否与 Kafka 公开的 Serde
匹配,例如 - Integer
、Long
、Short
、Double
、Float
、byte[]
、UUID
和 String
。如果这不起作用,它将回退到 Spring Kafka 项目提供的 JsonSerde
,但首先查看默认的 Serde
配置以查看是否有匹配项。请记住,所有这些都对应用程序透明。如果这些都不起作用,则用户必须通过配置提供要使用的 Serde
。
假设您使用的是与上面相同的 BiFunction
处理器。然后,您可以配置出站键/值 Serdes,如下所示。
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
如果 Serde 推断失败,并且没有提供绑定级别 Serdes,则绑定器将回退到 JsonSerde
,但会查看默认 Serdes 以进行匹配。
默认 Serdes 的配置方式与上面描述的反序列化中的配置方式相同。
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde
如果您的应用程序使用分支功能并具有多个输出绑定,则必须为每个绑定配置这些绑定。同样,如果绑定程序能够推断出Serde
类型,则无需进行此配置。
如果您不想使用 Kafka 提供的原生编码,而是想使用框架提供的消息转换,则需要显式禁用原生编码,因为原生编码是默认设置。例如,如果您具有与上述相同的 BiFunction 处理器,则spring.cloud.stream.bindings.process-out-0.producer.nativeEncoding: false
。在分支的情况下,您需要为所有输出单独禁用原生编码。否则,原生编码仍将应用于您未禁用的那些输出。
当 Spring Cloud Stream 完成转换时,默认情况下,它将使用application/json
作为内容类型,并使用相应的 json 消息转换器。您可以使用以下属性和相应的MessageConverter
bean 来使用自定义消息转换器。
spring.cloud.stream.bindings.process-out-0.contentType
当禁用原生编码/解码时,绑定程序不会像原生 Serdes 那样进行任何推断。应用程序需要显式提供所有配置选项。因此,通常建议使用默认的序列化/反序列化选项,并在编写 Spring Cloud Stream Kafka Streams 应用程序时坚持使用 Kafka Streams 提供的原生序列化/反序列化。您必须使用框架提供的消息转换功能的一种情况是,当您的上游生产者使用特定的序列化策略时。在这种情况下,您希望使用匹配的反序列化策略,因为原生机制可能会失败。当依赖于默认的Serde
机制时,应用程序必须确保绑定程序具有正确映射入站和出站的正确方法,并使用适当的Serde
,否则可能会失败。
值得一提的是,上面概述的数据序列化/反序列化方法仅适用于处理器的边缘,即入站和出站。您的业务逻辑可能仍然需要调用显式需要Serde
对象的 Kafka Streams API。这些仍然是应用程序的责任,必须由开发人员相应地处理。
2.6. 错误处理
Apache Kafka Streams 提供了原生处理反序列化错误异常的能力。有关此支持的详细信息,请参阅 此处。默认情况下,Apache Kafka Streams 提供两种反序列化异常处理程序 - LogAndContinueExceptionHandler
和 LogAndFailExceptionHandler
。顾名思义,前者将记录错误并继续处理下一条记录,而后者将记录错误并失败。LogAndFailExceptionHandler
是默认的反序列化异常处理程序。
2.6.1. 处理绑定器中的反序列化异常
Kafka Streams 绑定器允许使用以下属性指定上述反序列化异常处理程序。
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndContinue
或
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndFail
除了上述两种反序列化异常处理程序之外,绑定器还提供第三种处理程序,用于将错误记录(毒丸)发送到 DLQ(死信队列)主题。以下是启用此 DLQ 异常处理程序的方法。
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: sendToDlq
设置上述属性后,所有反序列化错误的记录将自动发送到 DLQ 主题。
您可以如下设置发布 DLQ 消息的主题名称。
您可以为 DlqDestinationResolver
提供实现,它是一个函数式接口。DlqDestinationResolver
以 ConsumerRecord
和异常作为输入,然后允许指定主题名称作为输出。通过访问 Kafka ConsumerRecord
,可以在 BiFunction
的实现中内省标头记录。
以下是如何为 DlqDestinationResolver
提供实现的示例。
@Bean
public DlqDestinationResolver dlqDestinationResolver() {
return (rec, ex) -> {
if (rec.topic().equals("word1")) {
return "topic1-dlq";
}
else {
return "topic2-dlq";
}
};
}
在为 DlqDestinationResolver
提供实现时,需要注意的一点是,绑定器中的供应器不会自动为应用程序创建主题。这是因为绑定器无法推断实现可能发送到的所有 DLQ 主题的名称。因此,如果您使用此策略提供 DLQ 名称,则应用程序有责任确保这些主题事先创建。
如果 DlqDestinationResolver
作为 bean 存在于应用程序中,则它具有更高的优先级。如果您不想遵循这种方法,而是使用配置提供静态 DLQ 名称,则可以设置以下属性。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.dlqName: custom-dlq (Change the binding name accordingly)
如果设置了此属性,则错误记录将发送到主题 custom-dlq
。如果应用程序没有使用上述任何策略,则它将创建一个名为 error.<input-topic-name>.<application-id>
的 DLQ 主题。例如,如果您的绑定的目标主题是 inputTopic
,应用程序 ID 是 process-applicationId
,则默认 DLQ 主题是 error.inputTopic.process-applicationId
。如果您的意图是启用 DLQ,始终建议为每个输入绑定显式创建一个 DLQ 主题。
2.6.2. 每个输入消费者绑定的 DLQ
属性 spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler
适用于整个应用程序。这意味着,如果同一个应用程序中有多个函数或 StreamListener
方法,则此属性将应用于所有这些方法。但是,如果您在一个处理器中有多个处理器或多个输入绑定,则可以使用绑定器为每个输入消费者绑定提供的更细粒度的 DLQ 控制。
如果您有以下处理器,
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}
并且您只想在第一个输入绑定上启用 DLQ,并在第二个绑定上启用 logAndSkip,则可以在消费者上执行以下操作。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.deserializationExceptionHandler: sendToDlq
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.deserializationExceptionHandler: logAndSkip
以这种方式设置反序列化异常处理程序比在绑定器级别设置具有更高的优先级。
2.6.3. DLQ 分区
默认情况下,记录使用与原始记录相同的分区发布到死信主题。这意味着死信主题必须至少与原始记录具有相同数量的分区。
要更改此行为,请将 DlqPartitionFunction
实现作为 @Bean
添加到应用程序上下文。只能存在一个这样的 bean。该函数将提供消费者组(在大多数情况下与应用程序 ID 相同)、失败的 ConsumerRecord
和异常。例如,如果您始终希望路由到分区 0,则可以使用
@Bean
public DlqPartitionFunction partitionFunction() {
return (group, record, ex) -> 0;
}
如果您将消费者绑定的 dlqPartitions 属性设置为 1(并且绑定器的 minPartitionCount 等于 1 ),则无需提供 DlqPartitionFunction ;框架将始终使用分区 0。如果您将消费者绑定的 dlqPartitions 属性设置为大于 1 的值(或绑定器的 minPartitionCount 大于 1 ),则 **必须** 提供 DlqPartitionFunction Bean,即使分区计数与原始主题的相同。
|
在使用 Kafka Streams 绑定器中的异常处理功能时,请牢记以下几点。
-
属性
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler
适用于整个应用程序。这意味着,如果同一个应用程序中有多个函数或StreamListener
方法,则此属性将应用于所有这些方法。 -
反序列化异常处理与本机反序列化和框架提供的消息转换一致。
2.6.4. 处理绑定器中的生产异常
与上面描述的反序列化异常处理程序支持不同,绑定器没有为处理生产异常提供此类一流机制。但是,您仍然可以使用 StreamsBuilderFactoryBean
自定义程序配置生产异常处理程序,您可以在下面后续部分中找到有关此自定义程序的更多详细信息。
2.7. 重试关键业务逻辑
在某些情况下,您可能希望重试对应用程序至关重要的业务逻辑部分。可能存在对关系数据库的外部调用,或者从 Kafka Streams 处理器调用 REST 端点。这些调用可能由于各种原因而失败,例如网络问题或远程服务不可用。更常见的是,如果您可以再次尝试这些失败,它们可能会自行解决。默认情况下,Kafka Streams 绑定器为所有输入绑定创建 RetryTemplate
bean。
如果函数具有以下签名,
@Bean
public java.util.function.Consumer<KStream<Object, String>> process()
并且使用默认绑定名称,则 RetryTemplate
将注册为 process-in-0-RetryTemplate
。这遵循绑定名称 (process-in-0
) 后跟文字 -RetryTemplate
的约定。在多个输入绑定情况下,每个绑定将提供一个单独的 RetryTemplate
bean。如果应用程序中存在自定义 RetryTemplate
bean,并且通过 spring.cloud.stream.bindings.<binding-name>.consumer.retryTemplateName
提供,则该 bean 优先于任何输入绑定级别重试模板配置属性。
将绑定中的 RetryTemplate
注入应用程序后,可以使用它来重试应用程序的任何关键部分。以下是一个示例
@Bean
public java.util.function.Consumer<KStream<Object, String>> process(@Lazy @Qualifier("process-in-0-RetryTemplate") RetryTemplate retryTemplate) {
return input -> input
.process(() -> new Processor<Object, String>() {
@Override
public void init(ProcessorContext processorContext) {
}
@Override
public void process(Object o, String s) {
retryTemplate.execute(context -> {
//Critical business logic goes here.
});
}
@Override
public void close() {
}
});
}
或者,您可以使用以下自定义 RetryTemplate
。
@EnableAutoConfiguration
public static class CustomRetryTemplateApp {
@Bean
@StreamRetryTemplate
RetryTemplate fooRetryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
RetryPolicy retryPolicy = new SimpleRetryPolicy(4);
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(1);
retryTemplate.setBackOffPolicy(backOffPolicy);
retryTemplate.setRetryPolicy(retryPolicy);
return retryTemplate;
}
@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
return input -> input
.process(() -> new Processor<Object, String>() {
@Override
public void init(ProcessorContext processorContext) {
}
@Override
public void process(Object o, String s) {
fooRetryTemplate().execute(context -> {
//Critical business logic goes here.
});
}
@Override
public void close() {
}
});
}
}
请注意,当重试次数用尽时,默认情况下,将抛出最后一个异常,导致处理器终止。如果您希望处理异常并继续处理,可以在 execute
方法中添加一个 RecoveryCallback:以下是一个示例。
retryTemplate.execute(context -> {
//Critical business logic goes here.
}, context -> {
//Recovery logic goes here.
return null;
));
有关 RetryTemplate、重试策略、退避策略等的更多信息,请参阅 Spring Retry 项目。
2.8. 状态存储
当使用高级 DSL 并进行适当的调用以触发状态存储时,Kafka Streams 会自动创建状态存储。
如果您想将传入的 KTable
绑定作为命名状态存储进行物化,则可以使用以下策略。
假设您有以下函数。
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}
然后通过设置以下属性,传入的 KTable
数据将被物化到命名状态存储中。
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.materializedAs: incoming-store
您可以在应用程序中将自定义状态存储定义为 bean,这些 bean 将被绑定器检测到并添加到 Kafka Streams 构建器中。特别是当使用处理器 API 时,您需要手动注册状态存储。为此,您可以在应用程序中创建一个 StateStore 作为 bean。以下是如何定义此类 bean 的示例。
@Bean
public StoreBuilder myStore() {
return Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("my-store"), Serdes.Long(),
Serdes.Long());
}
@Bean
public StoreBuilder otherStore() {
return Stores.windowStoreBuilder(
Stores.persistentWindowStore("other-store",
1L, 3, 3L, false), Serdes.Long(),
Serdes.Long());
}
然后,应用程序可以直接访问这些状态存储。
在引导过程中,绑定器将处理上述 bean 并将其传递给 Streams 构建器对象。
访问状态存储
Processor<Object, Product>() {
WindowStore<Object, String> state;
@Override
public void init(ProcessorContext processorContext) {
state = (WindowStore)processorContext.getStateStore("mystate");
}
...
}
这在注册全局状态存储时将不起作用。要注册全局状态存储,请参阅下面关于自定义 StreamsBuilderFactoryBean
的部分。
2.9. 交互式查询
Kafka Streams 绑定器 API 公开了一个名为 InteractiveQueryService
的类,用于交互式查询状态存储。您可以在应用程序中将其作为 Spring bean 访问。从应用程序访问此 bean 的一种简单方法是 autowire
该 bean。
@Autowired
private InteractiveQueryService interactiveQueryService;
获得对该 bean 的访问权限后,您就可以查询您感兴趣的特定状态存储。见下文。
ReadOnlyKeyValueStore<Object, Object> keyValueStore =
interactiveQueryService.getQueryableStoreType("my-store", QueryableStoreTypes.keyValueStore());
在启动过程中,上述检索存储的方法调用可能会失败。例如,它可能仍在初始化状态存储的中间。在这种情况下,重试此操作将很有用。Kafka Streams 绑定器提供了一个简单的重试机制来适应这种情况。
以下是您可以用来控制此重试的两个属性。
-
spring.cloud.stream.kafka.streams.binder.stateStoreRetry.maxAttempts - 默认值为
1
。 -
spring.cloud.stream.kafka.streams.binder.stateStoreRetry.backOffInterval - 默认值为
1000
毫秒。
如果有多个 Kafka Streams 应用程序实例正在运行,那么在您能够交互式地查询它们之前,您需要确定哪个应用程序实例托管了您要查询的特定键。InteractiveQueryService
API 提供了用于识别主机信息的方法。
为了使此功能正常工作,您必须按如下所示配置属性 application.server
spring.cloud.stream.kafka.streams.binder.configuration.application.server: <server>:<port>
以下是一些代码片段
org.apache.kafka.streams.state.HostInfo hostInfo = interactiveQueryService.getHostInfo("store-name",
key, keySerializer);
if (interactiveQueryService.getCurrentHostInfo().equals(hostInfo)) {
//query from the store that is locally available
}
else {
//query from the remote host
}
2.9.1. 通过 InteractiveQueryService 可用的其他 API 方法
使用以下 API 方法检索与给定存储和键的组合关联的 KeyQueryMetadata
对象。
public <K> KeyQueryMetadata getKeyQueryMetadata(String store, K key, Serializer<K> serializer)
使用以下 API 方法检索与给定存储和键的组合关联的 KakfaStreams
对象。
public <K> KafkaStreams getKafkaStreams(String store, K key, Serializer<K> serializer)
2.10. 健康指标
健康指标需要依赖项 spring-boot-starter-actuator
。对于 Maven,请使用
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
Spring Cloud Stream Kafka Streams Binder 提供了一个健康指标来检查底层流线程的状态。Spring Cloud Stream 定义了一个属性 management.health.binders.enabled
来启用健康指标。请参阅 Spring Cloud Stream 文档。
健康指标为每个流线程的元数据提供以下详细信息
-
线程名称
-
线程状态:
CREATED
、RUNNING
、PARTITIONS_REVOKED
、PARTITIONS_ASSIGNED
、PENDING_SHUTDOWN
或DEAD
-
活动任务:任务 ID 和分区
-
待机任务:任务 ID 和分区
默认情况下,只有全局状态可见(UP
或 DOWN
)。要显示详细信息,属性 management.endpoint.health.show-details
必须设置为 ALWAYS
或 WHEN_AUTHORIZED
。有关健康信息的更多详细信息,请参阅 Spring Boot Actuator 文档。
如果所有注册的 Kafka 线程都处于 RUNNING 状态,则健康指标的状态为 UP 。
|
由于 Kafka Streams 绑定器中存在三个独立的绑定器(KStream
、KTable
和 GlobalKTable
),因此它们都将报告健康状态。启用 show-details
时,报告的一些信息可能是冗余的。
当同一个应用程序中存在多个 Kafka Streams 处理器时,将为所有处理器报告健康检查,并将按 Kafka Streams 的应用程序 ID 进行分类。
2.11. 访问 Kafka Streams 指标
Spring Cloud Stream Kafka Streams 绑定器提供 Kafka Streams 指标,这些指标可以通过 Micrometer 的 MeterRegistry
导出。
对于 Spring Boot 2.2.x 版本,指标支持由绑定器通过自定义 Micrometer 指标实现提供。对于 Spring Boot 2.3.x 版本,Kafka Streams 指标支持通过 Micrometer 原生提供。
当通过 Boot 执行器端点访问指标时,请确保将 metrics
添加到属性 management.endpoints.web.exposure.include
中。然后,您可以访问 /acutator/metrics
获取所有可用指标的列表,这些指标可以通过相同的 URI (/actuator/metrics/<metric-name>
) 单独访问。
2.12. 混合使用高级 DSL 和低级 Processor API
Kafka Streams 提供两种 API 变体。它有一个更高级别的 DSL 类似 API,您可以在其中链接各种操作,这些操作可能对许多函数式程序员来说很熟悉。Kafka Streams 还提供对低级处理器 API 的访问。处理器 API 虽然功能强大,并且能够在更低级别控制事物,但本质上是命令式的。Spring Cloud Stream 的 Kafka Streams 绑定器允许您使用高级别 DSL 或混合使用 DSL 和处理器 API。混合使用这两种变体为您提供了许多选项来控制应用程序中的各种用例。应用程序可以使用 transform
或 process
方法 API 调用来访问处理器 API。
以下是如何在使用 process
API 的 Spring Cloud Stream 应用程序中组合使用 DSL 和处理器 API 的示例。
@Bean
public Consumer<KStream<Object, String>> process() {
return input ->
input.process(() -> new Processor<Object, String>() {
@Override
@SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public void process(Object key, String value) {
//business logic
}
@Override
public void close() {
});
}
以下是如何使用 transform
API 的示例。
@Bean
public Consumer<KStream<Object, String>> process() {
return (input, a) ->
input.transform(() -> new Transformer<Object, String, KeyValue<Object, String>>() {
@Override
public void init(ProcessorContext context) {
}
@Override
public void close() {
}
@Override
public KeyValue<Object, String> transform(Object key, String value) {
// business logic - return transformed KStream;
}
});
}
process
API 方法调用是一个终止操作,而 transform
API 是非终止操作,它为您提供了一个可能已转换的 KStream
,您可以使用它继续使用 DSL 或处理器 API 进行进一步处理。
2.13. 出站的分区支持
Kafka Streams 处理器通常将处理后的输出发送到传出的 Kafka 主题。如果传出的主题已分区,并且处理器需要将传出的数据发送到特定分区,则应用程序需要提供一个类型为 StreamPartitioner
的 bean。有关更多详细信息,请参阅 StreamPartitioner。让我们看一些例子。
这是我们已经多次见过的同一个处理器,
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> process() {
...
}
这是输出绑定目标
spring.cloud.stream.bindings.process-out-0.destination: outputTopic
如果主题 outputTopic
有 4 个分区,如果您没有提供分区策略,Kafka Streams 将使用默认分区策略,这可能不是您想要的,具体取决于特定的用例。假设您想将与 spring
匹配的任何键发送到分区 0,将 cloud
发送到分区 1,将 stream
发送到分区 2,并将所有其他内容发送到分区 3。您需要在应用程序中执行以下操作。
@Bean
public StreamPartitioner<String, WordCount> streamPartitioner() {
return (t, k, v, n) -> {
if (k.equals("spring")) {
return 0;
}
else if (k.equals("cloud")) {
return 1;
}
else if (k.equals("stream")) {
return 2;
}
else {
return 3;
}
};
}
这是一个基本的实现,但是您可以访问记录的键值对、主题名称和分区总数。因此,如果需要,您可以实现复杂的划分策略。
您还需要将此 Bean 名称与应用程序配置一起提供。
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.streamPartitionerBeanName: streamPartitioner
应用程序中的每个输出主题都需要像这样单独配置。
2.14. StreamsBuilderFactoryBean 自定义器
通常需要自定义创建 KafkaStreams
对象的 StreamsBuilderFactoryBean
。根据 Spring Kafka 提供的基础支持,绑定器允许您自定义 StreamsBuilderFactoryBean
。您可以使用 StreamsBuilderFactoryBeanCustomizer
自定义 StreamsBuilderFactoryBean
本身。然后,一旦您通过此自定义器访问 StreamsBuilderFactoryBean
,就可以使用 KafkaStreamsCustomzier
自定义相应的 KafkaStreams
。这两个自定义器都是 Spring for Apache Kafka 项目的一部分。
以下是如何使用 StreamsBuilderFactoryBeanCustomizer
的示例。
@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
return sfb -> sfb.setStateListener((newState, oldState) -> {
//Do some action here!
});
}
以上只是为了说明您可以对 StreamsBuilderFactoryBean
进行的自定义操作。您基本上可以调用 StreamsBuilderFactoryBean
中的任何可用变异操作来对其进行自定义。此自定义器将在工厂 Bean 启动之前由绑定器调用。
一旦您访问了 StreamsBuilderFactoryBean
,您还可以自定义底层的 KafkaStreams
对象。以下是如何操作的蓝图。
@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
return factoryBean -> {
factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
@Override
public void customize(KafkaStreams kafkaStreams) {
kafkaStreams.setUncaughtExceptionHandler((t, e) -> {
});
}
});
};
}
KafkaStreamsCustomizer
将在底层 KafkaStreams
启动之前由 StreamsBuilderFactoryBeabn
调用。
整个应用程序中只能有一个 StreamsBuilderFactoryBeanCustomizer
。那么我们如何处理多个 Kafka Streams 处理器,因为它们中的每一个都由单独的 StreamsBuilderFactoryBean
对象支持?在这种情况下,如果需要对这些处理器进行不同的自定义,那么应用程序需要根据应用程序 ID 应用一些过滤器。
例如,
@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
return factoryBean -> {
if (factoryBean.getStreamsConfiguration().getProperty(StreamsConfig.APPLICATION_ID_CONFIG)
.equals("processor1-application-id")) {
factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
@Override
public void customize(KafkaStreams kafkaStreams) {
kafkaStreams.setUncaughtExceptionHandler((t, e) -> {
});
}
});
}
};
2.14.1. 使用自定义器注册全局状态存储
如上所述,绑定器没有提供注册全局状态存储作为功能的第一类方法。为此,您需要使用自定义器。以下是如何操作。
@Bean
public StreamsBuilderFactoryBeanCustomizer customizer() {
return fb -> {
try {
final StreamsBuilder streamsBuilder = fb.getObject();
streamsBuilder.addGlobalStore(...);
}
catch (Exception e) {
}
};
}
再次,如果您有多个处理器,您需要通过使用上面概述的应用程序 ID 过滤掉其他 StreamsBuilderFactoryBean
对象,将全局状态存储附加到正确的 StreamsBuilder
上。
2.14.2. 使用自定义器注册生产异常处理程序
在错误处理部分,我们指出绑定器没有提供处理生产异常的一流方法。虽然情况如此,您仍然可以使用 StreamsBuilderFacotryBean
自定义器来注册生产异常处理程序。见下文。
@Bean
public StreamsBuilderFactoryBeanCustomizer customizer() {
return fb -> {
fb.getStreamsConfiguration().put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
CustomProductionExceptionHandler.class);
};
}
再次,如果您有多个处理器,您可能希望将其适当地设置在正确的 StreamsBuilderFactoryBean
上。您也可以使用配置属性添加此类生产异常处理程序(有关详细信息,请参见下文),但这是一种选择,如果您选择使用编程方法。
2.15. 时间戳提取器
Kafka Streams 允许您根据各种时间戳概念来控制对消费者记录的处理。默认情况下,Kafka Streams 提取嵌入在消费者记录中的时间戳元数据。您可以通过为每个输入绑定提供不同的 TimestampExtractor
实现来更改此默认行为。以下是如何执行此操作的一些详细信息。
@Bean
public Function<KStream<Long, Order>,
Function<KTable<Long, Customer>,
Function<GlobalKTable<Long, Product>, KStream<Long, Order>>>> process() {
return orderStream ->
customers ->
products -> orderStream;
}
@Bean
public TimestampExtractor timestampExtractor() {
return new WallclockTimestampExtractor();
}
然后,您为每个消费者绑定设置上述 TimestampExtractor
bean 名称。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.timestampExtractorBeanName=timestampExtractor
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.timestampExtractorBeanName=timestampExtractor
spring.cloud.stream.kafka.streams.bindings.process-in-2.consumer.timestampExtractorBeanName=timestampExtractor"
如果您跳过输入消费者绑定以设置自定义时间戳提取器,则该消费者将使用默认设置。
2.16. 基于 Kafka Streams 的绑定器和常规 Kafka 绑定器的多绑定器
您可以拥有一个应用程序,其中既有基于常规 Kafka 绑定器的函数/消费者/供应商,也有基于 Kafka Streams 的处理器。但是,您不能将它们混合在一个函数或消费者中。
以下是一个示例,其中您在同一个应用程序中同时拥有基于绑定器的组件。
@Bean
public Function<String, String> process() {
return s -> s;
}
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> kstreamProcess() {
return input -> input;
}
这是来自配置的相关部分
spring.cloud.stream.function.definition=process;kstreamProcess
spring.cloud.stream.bindings.process-in-0.destination=foo
spring.cloud.stream.bindings.process-out-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-in-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-out-0.destination=foobar
如果您有与上述相同的应用程序,但处理两个不同的 Kafka 集群,例如,常规的 process
同时作用于 Kafka 集群 1 和集群 2(从集群 1 接收数据并发送到集群 2),而 Kafka Streams 处理器作用于 Kafka 集群 2,那么事情就会变得更加复杂。然后您必须使用 Spring Cloud Stream 提供的 多绑定器 功能。
以下是该场景中您的配置可能发生的变化。
# multi binder configuration
spring.cloud.stream.binders.kafka1.type: kafka
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-1} #Replace kafkaCluster-1 with the approprate IP of the cluster
spring.cloud.stream.binders.kafka2.type: kafka
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2} #Replace kafkaCluster-2 with the approprate IP of the cluster
spring.cloud.stream.binders.kafka3.type: kstream
spring.cloud.stream.binders.kafka3.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2} #Replace kafkaCluster-2 with the approprate IP of the cluster
spring.cloud.stream.function.definition=process;kstreamProcess
# From cluster 1 to cluster 2 with regular process function
spring.cloud.stream.bindings.process-in-0.destination=foo
spring.cloud.stream.bindings.process-in-0.binder=kafka1 # source from cluster 1
spring.cloud.stream.bindings.process-out-0.destination=bar
spring.cloud.stream.bindings.process-out-0.binder=kafka2 # send to cluster 2
# Kafka Streams processor on cluster 2
spring.cloud.stream.bindings.kstreamProcess-in-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-in-0.binder=kafka3
spring.cloud.stream.bindings.kstreamProcess-out-0.destination=foobar
spring.cloud.stream.bindings.kstreamProcess-out-0.binder=kafka3
请注意上述配置。我们有两种类型的绑定器,但总共有 3 个绑定器,第一个是基于集群 1 的常规 Kafka 绑定器(kafka1
),然后是基于集群 2 的另一个 Kafka 绑定器(kafka2
),最后是 kstream
绑定器(kafka3
)。应用程序中的第一个处理器从 kafka1
接收数据并发布到 kafka2
,其中两个绑定器都基于常规 Kafka 绑定器,但集群不同。第二个处理器是一个 Kafka Streams 处理器,它从 kafka3
消费数据,kafka3
与 kafka2
在同一个集群中,但绑定器类型不同。
由于 Kafka Streams 绑定器系列中有三种不同的绑定器类型 - kstream
、ktable
和 globalktable
- 如果您的应用程序有多个基于这些绑定器中的任何一个的绑定,则需要明确提供绑定器类型。
例如,如果您有以下处理器,
@Bean
public Function<KStream<Long, Order>,
Function<KTable<Long, Customer>,
Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> enrichOrder() {
...
}
那么,这必须在多绑定器场景中配置如下。请注意,这仅在您拥有真正的多绑定器场景时才需要,在这种场景中,多个处理器处理单个应用程序中的多个集群。在这种情况下,需要明确地为绑定器提供绑定,以区分其他处理器的绑定器类型和集群。
spring.cloud.stream.binders.kafka1.type: kstream
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.binders.kafka2.type: ktable
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.binders.kafka3.type: globalktable
spring.cloud.stream.binders.kafka3.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.bindings.enrichOrder-in-0.binder=kafka1 #kstream
spring.cloud.stream.bindings.enrichOrder-in-1.binder=kafka2 #ktablr
spring.cloud.stream.bindings.enrichOrder-in-2.binder=kafka3 #globalktable
spring.cloud.stream.bindings.enrichOrder-out-0.binder=kafka1 #kstream
# rest of the configuration is omitted.
2.17. 状态清理
默认情况下,Kafkastreams.cleanup()
方法在绑定停止时被调用。请参阅 Spring Kafka 文档。要修改此行为,只需在应用程序上下文中添加一个 CleanupConfig
@Bean
(配置为在启动、停止或两者都不执行清理)即可;该 bean 将被检测到并连接到工厂 bean。
2.18. Kafka Streams 拓扑可视化
Kafka Streams 绑定器提供以下执行器端点,用于检索拓扑描述,您可以使用这些描述通过外部工具可视化拓扑。
/actuator/kafkastreamstopology
/actuator/kafkastreamstopology/<application-id of the processor>
您需要从 Spring Boot 包含执行器和 Web 依赖项才能访问这些端点。此外,您还需要将 kafkastreamstopology
添加到 management.endpoints.web.exposure.include
属性中。默认情况下,kafkastreamstopology
端点处于禁用状态。
2.19. 配置选项
本节包含 Kafka Streams 绑定器使用的配置选项。
有关与绑定器相关的常见配置选项和属性,请参阅 核心文档。
2.19.1. Kafka Streams 绑定器属性
以下属性在绑定器级别可用,必须以 spring.cloud.stream.kafka.streams.binder.
为前缀。
- configuration
-
包含与 Apache Kafka Streams API 相关的属性的键值对映射。此属性必须以
spring.cloud.stream.kafka.streams.binder.
为前缀。以下是使用此属性的一些示例。
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000
有关可能进入流配置的所有属性的更多信息,请参阅 Apache Kafka Streams 文档中的 StreamsConfig
JavaDocs。您可以从 StreamsConfig
设置的所有配置都可以通过此设置。使用此属性时,它适用于整个应用程序,因为这是一个绑定器级别属性。如果您的应用程序中有多个处理器,它们都将获取这些属性。对于 application.id
之类的属性,这将变得很麻烦,因此您必须仔细检查如何使用此绑定器级别 configuration
属性映射来自 StreamsConfig
的属性。
- functions.<function-bean-name>.applicationId
-
仅适用于函数式处理器。这可用于为应用程序中的每个函数设置应用程序 ID。在多个函数的情况下,这是一种方便的方式来设置应用程序 ID。
- functions.<function-bean-name>.configuration
-
仅适用于函数式处理器。包含与 Apache Kafka Streams API 相关的属性的键值对映射。这类似于上面描述的绑定器级别
configuration
属性,但此级别的configuration
属性仅限于命名函数。当您有多个处理器并且想要根据特定函数限制对配置的访问权限时,您可能需要使用此方法。所有StreamsConfig
属性都可以在此处使用。 - brokers
-
代理 URL
默认值:
localhost
- zkNodes
-
Zookeeper URL
默认值:
localhost
- deserializationExceptionHandler
-
反序列化错误处理程序类型。此处理程序在绑定器级别应用,因此应用于应用程序中的所有输入绑定。在消费者绑定级别,可以通过更细粒度的方式控制它。可能的值为 -
logAndContinue
、logAndFail
或sendToDlq
默认值:
logAndFail
- applicationId
-
在绑定器级别全局设置 Kafka Streams 应用程序的 application.id 的便捷方法。如果应用程序包含多个函数或
StreamListener
方法,则应分别设置 application id。请参阅上面的详细说明,其中讨论了设置 application id 的方法。默认值:应用程序将生成一个静态 application ID。有关更多详细信息,请参阅 application ID 部分。
- stateStoreRetry.maxAttempts
-
尝试连接到状态存储的最大次数。
默认值:1
- stateStoreRetry.backoffPeriod
-
重试时尝试连接到状态存储的回退时间段。
默认值:1000 毫秒
- consumerProperties
-
绑定器级别的任意消费者属性。
- producerProperties
-
绑定器级别的任意生产者属性。
2.19.2. Kafka Streams 生产者属性
以下属性仅适用于 Kafka Streams 生产者,并且必须以 spring.cloud.stream.kafka.streams.bindings.<绑定名称>.producer.
为前缀。为了方便起见,如果有多个输出绑定并且它们都需要一个公共值,则可以使用前缀 spring.cloud.stream.kafka.streams.default.producer.
进行配置。
- keySerde
-
要使用的键序列化器/反序列化器
默认值:请参阅上面关于消息序列化/反序列化的讨论
- valueSerde
-
要使用的值序列化器/反序列化器
默认值:请参阅上面关于消息序列化/反序列化的讨论
- useNativeEncoding
-
启用/禁用本机编码的标志
默认值:
true
。
streamPartitionerBeanName: 要在消费者处使用的自定义出站分区程序 bean 名称。应用程序可以提供自定义 StreamPartitioner
作为 Spring bean,并且可以将此 bean 的名称提供给生产者以代替默认分区程序。
+ 默认值:请参阅上面关于出站分区支持的讨论。
- producedAs
-
处理器输出到的接收器组件的自定义名称。
默认值:
none
(由 Kafka Streams 生成)
2.19.3. Kafka Streams 消费者属性
以下属性适用于 Kafka Streams 消费者,必须以 spring.cloud.stream.kafka.streams.bindings.<绑定名称>.consumer.
为前缀。为了方便起见,如果有多个输入绑定并且它们都需要一个公共值,则可以使用前缀 spring.cloud.stream.kafka.streams.default.consumer.
进行配置。
- applicationId
-
为每个输入绑定设置 application.id。这仅适用于基于
StreamListener
的处理器,对于基于函数的处理器,请参阅上面概述的其他方法。默认值:见上文。
- keySerde
-
要使用的键序列化器/反序列化器
默认值:请参阅上面关于消息序列化/反序列化的讨论
- valueSerde
-
要使用的值序列化器/反序列化器
默认值:请参阅上面关于消息序列化/反序列化的讨论
- materializedAs
-
使用传入的 KTable 类型时要物化的状态存储。
默认值:
none
。 - useNativeDecoding
-
启用/禁用原生解码的标志。
默认值:
true
。 - dlqName
-
DLQ 主题名称。
默认值:见上文关于错误处理和 DLQ 的讨论。
- startOffset
-
如果没有已提交的偏移量可供消费,则要从中开始的偏移量。这主要用于消费者第一次从主题中消费时。Kafka Streams 使用
earliest
作为默认策略,绑定程序使用相同的默认策略。可以使用此属性将其覆盖为latest
。默认值:
earliest
。
注意:在消费者上使用 resetOffsets
对 Kafka Streams 绑定程序没有影响。与基于消息通道的绑定程序不同,Kafka Streams 绑定程序不会按需寻求开始或结束。
- deserializationExceptionHandler
-
反序列化错误处理程序类型。此处理程序应用于每个消费者绑定,而不是之前描述的绑定程序级别属性。可能的值为 -
logAndContinue
、logAndFail
或sendToDlq
默认值:
logAndFail
- timestampExtractorBeanName
-
要在消费者处使用的特定时间戳提取器 bean 名称。应用程序可以提供
TimestampExtractor
作为 Spring bean,并且可以将此 bean 的名称提供给消费者以使用,而不是默认的 bean。默认值:见上文关于时间戳提取器的讨论。
- eventTypes
-
此绑定的支持事件类型的逗号分隔列表。
默认值:
none
- eventTypeHeaderKey
-
通过此绑定传入的每个记录上的事件类型标头键。
默认值:
event_type
- consumedAs
-
处理器从中消费的源组件的自定义名称。
默认值:
none
(由 Kafka Streams 生成)
2.19.4. 关于并发性的特别说明
在 Kafka Streams 中,您可以使用 num.stream.threads
属性控制处理器可以创建的线程数。您可以使用上面在绑定程序、函数、生产者或消费者级别描述的各种 configuration
选项来执行此操作。您还可以使用核心 Spring Cloud Stream 为此目的提供的 concurrency
属性。使用此属性时,您需要在消费者上使用它。当您在函数或 StreamListener
中有多个输入绑定时,请在第一个输入绑定上设置此属性。例如,当设置 spring.cloud.stream.bindings.process-in-0.consumer.concurrency
时,它将被绑定程序转换为 num.stream.threads
。如果您有多个处理器,并且一个处理器定义了绑定级别并发性,而其他处理器没有,那么那些没有绑定级别并发性的处理器将默认回退到通过 spring.cloud.stream.kafka.streams.binder.configuration.num.stream.threads
指定的绑定程序范围属性。如果此绑定程序配置不可用,则应用程序将使用 Kafka Streams 设置的默认值。
附录
附录 A: 构建
A.1. 基本编译和测试
要构建源代码,您需要安装 JDK 1.7。
构建使用 Maven 包装器,因此您无需安装特定版本的 Maven。要启用测试,您应该在构建之前运行 Kafka 服务器 0.9 或更高版本。有关运行服务器的更多信息,请参见下文。
主要的构建命令是
$ ./mvnw clean install
您也可以添加 '-DskipTests',以避免运行测试。
您也可以自己安装 Maven(>=3.3.3)并在以下示例中使用 mvn 命令代替 ./mvnw 。如果您这样做,您可能还需要添加 -P spring ,如果您的本地 Maven 设置不包含 spring 预发布工件的存储库声明。
|
请注意,您可能需要通过设置 MAVEN_OPTS 环境变量(值为 -Xmx512m -XX:MaxPermSize=128m )来增加可用于 Maven 的内存量。我们尝试在 .mvn 配置中涵盖这一点,因此如果您发现您必须这样做才能使构建成功,请提交工单以将设置添加到源代码控制。
|
需要中间件的项目通常包含一个 docker-compose.yml
,因此请考虑使用 Docker Compose 在 Docker 容器中运行中间件服务器。
A.2. 文档
有一个“完整”配置文件将生成文档。
A.3. 使用代码
如果您没有 IDE 偏好,我们建议您在使用代码时使用 Spring Tools Suite 或 Eclipse。我们使用 m2eclipe eclipse 插件来支持 maven。其他 IDE 和工具也应该可以正常工作。
A.3.1. 使用 m2eclipse 导入到 Eclipse
我们建议在使用 eclipse 时使用 m2eclipe eclipse 插件。如果您还没有安装 m2eclipse,它可以在“eclipse 市场”中找到。
不幸的是,m2e 还不支持 Maven 3.3,因此在将项目导入 Eclipse 后,您还需要告诉 m2eclipse 使用项目的 .settings.xml
文件。如果您不这样做,您可能会看到与项目中的 POM 相关的许多不同的错误。打开您的 Eclipse 首选项,展开 Maven 首选项,然后选择用户设置。在用户设置字段中,单击浏览并导航到您导入的 Spring Cloud 项目,选择该项目中的 .settings.xml
文件。单击应用,然后单击确定以保存首选项更改。
或者,您可以将存储库设置从 .settings.xml 复制到您自己的 ~/.m2/settings.xml 中。
|
A.3.2. 不使用 m2eclipse 导入到 Eclipse
如果您不想使用 m2eclipse,您可以使用以下命令生成 eclipse 项目元数据
$ ./mvnw eclipse:eclipse
生成的 Eclipse 项目可以通过从“文件”菜单中选择“导入现有项目”来导入。
[[contributing] == 贡献
Spring Cloud 在非限制性的 Apache 2.0 许可下发布,并遵循非常标准的 Github 开发流程,使用 Github 跟踪器来处理问题并将拉取请求合并到主分支。如果您想贡献即使是微不足道的东西,请不要犹豫,但请遵循以下指南。
A.4. 签署贡献者许可协议
在我们接受非微不足道的补丁或拉取请求之前,我们将需要您签署 贡献者协议。签署贡献者协议不会授予任何人对主存储库的提交权限,但它确实意味着我们可以接受您的贡献,并且如果您这样做,您将获得作者署名。可能会要求活跃的贡献者加入核心团队,并被赋予合并拉取请求的能力。
A.5. 代码规范和管理
这些都不是拉取请求的必要条件,但它们都会有所帮助。它们也可以在原始拉取请求之后但在合并之前添加。
-
使用 Spring Framework 代码格式约定。如果您使用 Eclipse,您可以使用 Spring Cloud Build 项目中的
eclipse-code-formatter.xml
文件导入格式化程序设置。如果使用 IntelliJ,您可以使用 Eclipse Code Formatter Plugin 导入相同的文件。 -
确保所有新的
.java
文件都具有一个简单的 Javadoc 类注释,至少包含一个@author
标签来标识您,最好至少包含一段关于该类用途的文字。 -
将 ASF 许可证头注释添加到所有新的
.java
文件中(从项目中的现有文件中复制) -
将自己添加为
@author
到您大幅修改(超过美观更改)的 .java 文件中。 -
添加一些 Javadoc,如果您更改了命名空间,则添加一些 XSD 文档元素。
-
一些单元测试也会有很大帮助——总有人要做的。
-
如果没有人使用您的分支,请将其重新定位到当前主分支(或主项目中的其他目标分支)。
-
在编写提交消息时,请遵循 这些约定,如果您正在修复现有问题,请在提交消息的末尾添加
Fixes gh-XXXX
(其中 XXXX 是问题编号)。