3.0.13.RELEASE
参考指南
本指南介绍了 Spring Cloud Stream 绑定器的 Apache Kafka 实现。它包含有关其设计、使用和配置选项的信息,以及有关 Stream Cloud Stream 概念如何映射到 Apache Kafka 特定构造的信息。此外,本指南还解释了 Spring Cloud Stream 的 Kafka Streams 绑定功能。
1. Apache Kafka 绑定器
1.1. 使用
要使用 Apache Kafka 绑定器,您需要将spring-cloud-stream-binder-kafka
作为依赖项添加到您的 Spring Cloud Stream 应用程序中,如下面的 Maven 示例所示
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
或者,您也可以使用 Spring Cloud Stream Kafka 启动器,如下面的 Maven 示例所示
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
1.2. 概述
下图显示了 Apache Kafka 绑定器如何工作的简化图
Apache Kafka 绑定器实现将每个目标映射到一个 Apache Kafka 主题。消费者组直接映射到相同的 Apache Kafka 概念。分区也直接映射到 Apache Kafka 分区。
绑定器目前使用 Apache Kafka kafka-clients
版本2.3.1
。此客户端可以与旧版本的代理通信(请参阅 Kafka 文档),但某些功能可能不可用。例如,在 0.11.x.x 之前的版本中,不支持原生标头。此外,0.11.x.x 不支持autoAddPartitions
属性。
1.3. 配置选项
本节包含 Apache Kafka 绑定器使用的配置选项。
有关与绑定器相关的通用配置选项和属性,请参阅核心文档中的绑定属性。
1.3.1. Kafka 绑定器属性
- spring.cloud.stream.kafka.binder.brokers
-
Kafka 绑定器连接到的代理列表。
默认值:
localhost
。 - spring.cloud.stream.kafka.binder.defaultBrokerPort
-
brokers
允许指定带有或不带有端口信息的宿主(例如,host1,host2:port2
)。当在代理列表中未配置端口时,这将设置默认端口。默认值:
9092
。 - spring.cloud.stream.kafka.binder.configuration
-
传递给绑定器创建的所有客户端(生产者和消费者)的客户端属性(生产者和消费者)的键/值映射。由于这些属性同时被生产者和消费者使用,因此使用应限于通用属性,例如安全设置。通过此配置提供的未知 Kafka 生产者或消费者属性将被过滤掉,并且不允许传播。此处的属性将覆盖在启动时设置的任何属性。
默认值:空映射。
- spring.cloud.stream.kafka.binder.consumerProperties
-
任意 Kafka 客户端消费者属性的键/值映射。除了支持已知的 Kafka 消费者属性外,此处还允许使用未知的消费者属性。此处的属性将覆盖在启动时以及在上面的
configuration
属性中设置的任何属性。默认值:空映射。
- spring.cloud.stream.kafka.binder.headers
-
绑定器传输的自定义标头列表。仅当与使用
kafka-clients
版本< 0.11.0.0的旧版应用程序(⇐ 1.3.x)通信时才需要。较新版本原生支持标头。默认值:空。
- spring.cloud.stream.kafka.binder.healthTimeout
-
等待获取分区信息的时间(以秒为单位)。如果此计时器超时,则健康状况报告为关闭。
默认值:10。
- spring.cloud.stream.kafka.binder.requiredAcks
-
代理上所需的确认数。请参阅 Kafka 文档中生产者
acks
属性。默认值:
1
。 - spring.cloud.stream.kafka.binder.minPartitionCount
-
仅当设置
autoCreateTopics
或autoAddPartitions
时才有效。绑定器在它生产或消费数据的主题上配置的分区的全局最小数量。它可以被生产者的partitionCount
设置或生产者的instanceCount * concurrency
设置的值覆盖(如果两者都更大)。默认值:
1
。 - spring.cloud.stream.kafka.binder.producerProperties
-
任意 Kafka 客户端生产者属性的键/值映射。除了支持已知的 Kafka 生产者属性外,此处还允许使用未知的生产者属性。此处的属性将覆盖在启动时以及在上面的
configuration
属性中设置的任何属性。默认值:空映射。
- spring.cloud.stream.kafka.binder.replicationFactor
-
如果
autoCreateTopics
处于活动状态,则自动创建主题的复制因子。可以在每个绑定上覆盖。如果您使用的是 Kafka 代理版本 2.4 之前的版本,则此值应至少设置为 1
。从版本 3.0.8 开始,绑定器使用-1
作为默认值,这表示将使用代理的'default.replication.factor'属性来确定副本的数量。请与您的 Kafka 代理管理员确认是否已实施要求最小复制因子的策略,如果是,则通常default.replication.factor
将与该值匹配,并且应使用-1
,除非您需要大于最小值的复制因子。默认值:
-1
。 - spring.cloud.stream.kafka.binder.autoCreateTopics
-
如果设置为
true
,则绑定器会自动创建新主题。如果设置为false
,则绑定器依赖于已配置的主题。在后一种情况下,如果主题不存在,则绑定器将无法启动。此设置独立于代理的 auto.create.topics.enable
设置,并且不会影响它。如果服务器设置为自动创建主题,则它们可能会作为元数据检索请求的一部分创建,并使用默认的代理设置。默认值:
true
。 - spring.cloud.stream.kafka.binder.autoAddPartitions
-
如果设置为
true
,则绑定器会在需要时创建新分区。如果设置为false
,则绑定器依赖于已配置的主题的分区大小。如果目标主题的分区数量小于预期值,则绑定器将无法启动。默认值:
false
。 - spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix
-
在绑定器中启用事务。请参阅 Kafka 文档中的
transaction.id
以及spring-kafka
文档中的事务。启用事务后,将忽略各个producer
属性,并且所有生产者都将使用spring.cloud.stream.kafka.binder.transaction.producer.*
属性。默认值
null
(无事务) - spring.cloud.stream.kafka.binder.transaction.producer.*
-
事务绑定器中生产者的全局生产者属性。请参阅
spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix
和Kafka 生产者属性以及所有绑定器支持的通用生产者属性。默认值:请参阅各个生产者属性。
- spring.cloud.stream.kafka.binder.headerMapperBeanName
-
用于将
spring-messaging
标头映射到 Kafka 标头和从 Kafka 标头映射到spring-messaging
标头的KafkaHeaderMapper
的 bean 名称。例如,如果您希望自定义使用 JSON 反序列化标头的BinderHeaderMapper
bean 中的可信包,则可以使用此方法。如果此自定义的BinderHeaderMapper
bean 未通过此属性提供给绑定器,则绑定器将在回退到绑定器创建的默认BinderHeaderMapper
之前,查找名为kafkaBinderHeaderMapper
且类型为BinderHeaderMapper
的标头映射器 bean。默认值:无。
- spring.cloud.stream.kafka.binder.considerDownWhenAnyPartitionHasNoLeader
-
标志,用于在主题上的任何分区(无论接收来自该分区的数据的消费者是谁)发现没有领导者时,将绑定器健康状况设置为
down
。默认值:
false
。 - spring.cloud.stream.kafka.binder.certificateStoreDirectory
-
当信任库或密钥库证书位置作为类路径 URL(
classpath:…
)给出时,绑定器会将资源从 JAR 文件中的类路径位置复制到文件系统上的某个位置。该文件将移动到此属性值指定的位置,该位置必须是文件系统上可由运行应用程序的进程写入的现有目录。如果未设置此值,并且证书文件是类路径资源,则它将移动到由System.getProperty("java.io.tmpdir")
返回的系统的临时目录。如果此值存在,但文件系统上找不到该目录或无法写入,则也是如此。默认值:无。
1.3.2. Kafka 消费者属性
为了避免重复,Spring Cloud Stream 支持以spring.cloud.stream.kafka.default.consumer.<property>=<value> 的格式设置所有通道的值。 |
以下属性仅适用于 Kafka 消费者,并且必须以spring.cloud.stream.kafka.bindings.<channelName>.consumer.
为前缀。
- admin.configuration
-
从 2.1.1 版本开始,此属性已弃用,建议使用
topic.properties
,并且将在未来版本中移除对它的支持。 - admin.replicas-assignment
-
从 2.1.1 版本开始,此属性已弃用,建议使用
topic.replicas-assignment
,并且将在未来版本中移除对它的支持。 - admin.replication-factor
-
从 2.1.1 版本开始,此属性已弃用,建议使用
topic.replication-factor
,并且将在未来版本中移除对它的支持。 - autoRebalanceEnabled
-
当设置为
true
时,主题分区将在消费者组的成员之间自动重新平衡。当设置为false
时,每个消费者将根据spring.cloud.stream.instanceCount
和spring.cloud.stream.instanceIndex
分配一组固定的分区。这要求在启动的每个实例上都正确设置spring.cloud.stream.instanceCount
和spring.cloud.stream.instanceIndex
属性。在这种情况下,spring.cloud.stream.instanceCount
属性的值通常必须大于 1。默认值:
true
。 - ackEachRecord
-
当
autoCommitOffset
为true
时,此设置决定是否在处理每个记录后提交偏移量。默认情况下,在consumer.poll()
返回的批次中的所有记录都已处理后,才会提交偏移量。通过consumer
的configuration
属性设置的 Kafka 属性max.poll.records
可以控制轮询返回的记录数量。将此设置为true
可能会导致性能下降,但这可以降低发生故障时重新传递记录的可能性。此外,请参阅绑定程序的requiredAcks
属性,它也会影响提交偏移量的性能。默认值:
false
。 - autoCommitOffset
-
是否在消息处理后自动提交偏移量。如果设置为
false
,则入站消息中存在一个键为kafka_acknowledgment
、类型为org.springframework.kafka.support.Acknowledgment
的标头。应用程序可以使用此标头来确认消息。有关详细信息,请参阅示例部分。当此属性设置为false
时,Kafka 绑定程序将确认模式设置为org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.MANUAL
,应用程序负责确认记录。另请参阅ackEachRecord
。默认值:
true
。 - autoCommitOnError
-
仅当
autoCommitOffset
设置为true
时有效。如果设置为false
,则会抑制导致错误的消息的自动提交,并且仅对成功的消息进行提交。在发生持久性故障的情况下,它允许流从最后成功处理的消息自动重播。如果设置为true
,则始终自动提交(如果启用了自动提交)。如果未设置(默认值),则其值与enableDlq
相同,如果错误消息发送到 DLQ,则自动提交它们,否则不提交它们。默认值:未设置。
- resetOffsets
-
是否将消费者的偏移量重置为
startOffset
提供的值。如果提供了KafkaRebalanceListener
,则必须为false
;请参阅 使用 KafkaRebalanceListener。默认值:
false
。 - startOffset
-
新组的起始偏移量。允许的值:
earliest
和latest
。如果为消费者“绑定”显式设置了消费者组(通过spring.cloud.stream.bindings.<channelName>.group
),则将 'startOffset' 设置为earliest
。否则,对于anonymous
消费者组,将其设置为latest
。另请参阅resetOffsets
(此列表中的前面部分)。默认值:null(等效于
earliest
)。 - enableDlq
-
当设置为
true
时,它将为消费者启用 DLQ 行为。默认情况下,导致错误的消息将转发到名为error.<destination>.<group>
的主题。可以通过设置dlqName
属性或定义类型为DlqDestinationResolver
的@Bean
来配置 DLQ 主题名称。这为更常见的 Kafka 重播场景提供了一个替代方案,在这种情况下,错误数量相对较少,并且重播整个原始主题可能过于麻烦。有关更多信息,请参阅 死信主题处理 处理。从 2.0 版本开始,发送到 DLQ 主题的消息将使用以下标头进行增强:x-original-topic
、x-exception-message
和x-exception-stacktrace
作为byte[]
。默认情况下,失败的记录将发送到与原始记录相同的 DLQ 主题分区号。有关如何更改此行为,请参阅 死信主题分区选择。当destinationIsPattern
为true
时不允许。默认值:
false
。 - dlqPartitions
-
当
enableDlq
为true
且未设置此属性时,将创建与主主题相同分区数的死信主题。通常,死信记录将发送到与原始记录相同的死信主题分区。此行为可以更改;请参阅 死信主题分区选择。如果此属性设置为1
且没有DqlPartitionFunction
bean,则所有死信记录都将写入分区0
。如果此属性大于1
,则**必须**提供DlqPartitionFunction
bean。请注意,实际的分区计数受绑定程序的minPartitionCount
属性影响。默认值:
none
- configuration
-
包含通用 Kafka 消费者属性的键值对映射。除了 Kafka 消费者属性外,还可以在此处传递其他配置属性。例如,应用程序需要的一些属性,例如
spring.cloud.stream.kafka.bindings.input.consumer.configuration.foo=bar
。默认值:空映射。
- dlqName
-
接收错误消息的 DLQ 主题的名称。
默认值:null(如果未指定,则导致错误的消息将转发到名为
error.<destination>.<group>
的主题)。 - dlqProducerProperties
-
使用此属性可以设置特定于 DLQ 的生产者属性。可以通过此属性设置 Kafka 生产者属性中提供的所有属性。当在消费者端启用本机解码时(即,
useNativeDecoding: true
),应用程序必须为 DLQ 提供相应的键/值序列化程序。这必须以dlqProducerProperties.configuration.key.serializer
和dlqProducerProperties.configuration.value.serializer
的形式提供。默认值:默认 Kafka 生产者属性。
- standardHeaders
-
指示入站通道适配器填充哪些标准标头。允许的值:
none
、id
、timestamp
或both
。如果使用本机反序列化并且接收消息的第一个组件需要id
(例如,配置为使用 JDBC 消息存储的聚合器),则很有用。默认值:
none
- converterBeanName
-
实现
RecordMessageConverter
的 bean 的名称。在入站通道适配器中用于替换默认的MessagingMessageConverter
。默认值:
null
- idleEventInterval
-
指示最近未收到任何消息的事件之间的间隔(以毫秒为单位)。使用
ApplicationListener<ListenerContainerIdleEvent>
来接收这些事件。有关用法示例,请参阅 示例:暂停和恢复消费者。默认值:
30000
- destinationIsPattern
-
当为
true
时,目标将被视为用于由代理匹配主题名称的正则表达式Pattern
。当为true
时,不会预配主题,并且不允许使用enableDlq
,因为绑定程序在预配阶段不知道主题名称。请注意,检测与模式匹配的新主题所需的时间由消费者属性metadata.max.age.ms
控制,该属性(在撰写本文时)默认为 300,000 毫秒(5 分钟)。这可以使用上面的configuration
属性进行配置。默认值:
false
- topic.properties
-
预配新主题时使用的 Kafka 主题属性的
Map
,例如spring.cloud.stream.kafka.bindings.input.consumer.topic.properties.message.format.version=0.9.0.0
默认值:无。
- topic.replicas-assignment
-
副本分配的
Map<Integer, List<Integer>>
,其中键是分区,值是分配。预配新主题时使用。请参阅kafka-clients
jar 中的NewTopic
Javadoc。默认值:无。
- topic.replication-factor
-
预配主题时使用的副本因子。覆盖绑定程序范围的设置。如果存在
replicas-assignments
,则忽略。默认值:无(使用绑定程序范围的默认值 -1)。
- pollTimeout
-
可轮询消费者中用于轮询的超时时间。
默认值:5 秒。
- transactionManager
-
用于覆盖此绑定的绑定程序的事务管理器的
KafkaAwareTransactionManager
的 bean 名称。如果要使用ChainedKafkaTransactionManaager
将另一个事务与 Kafka 事务同步,则通常需要它。为了实现记录的恰好一次消费和生产,必须使用相同的事务管理器配置消费者和生产者绑定。默认值:无。
- txCommitRecovered
-
使用事务性绑定程序时,恢复记录的偏移量(例如,当重试次数用尽且记录发送到死信主题时)默认情况下将通过新事务提交。将此属性设置为
false
会抑制提交恢复记录的偏移量。默认值:true。
1.3.3. 重置偏移量
当应用程序启动时,每个分配的分区中的初始位置取决于两个属性 startOffset
和 resetOffsets
。如果 resetOffsets
为 false
,则应用正常的 Kafka 消费者 auto.offset.reset
语义。即,如果绑定的消费者组没有该分区的已提交偏移量,则位置为 earliest
或 latest
。默认情况下,具有显式 group
的绑定使用 earliest
,而匿名绑定(没有 group
)使用 latest
。可以通过设置 startOffset
绑定属性来覆盖这些默认值。第一次使用特定 group
启动绑定时,将没有已提交的偏移量。另一个不存在已提交偏移量的情况是偏移量已过期。使用现代代理(从 2.1 开始)和默认代理属性,偏移量将在最后一个成员离开组后 7 天过期。有关更多信息,请参阅 offsets.retention.minutes
代理属性。
当 resetOffsets
为 true
时,绑定程序会应用与代理上不存在已提交偏移量时类似的语义,就像此绑定从未从主题中消费过一样;即,忽略任何当前已提交的偏移量。
以下是在可能使用此功能的两种用例。
-
从包含键值对的压缩主题中消费。将
resetOffsets
设置为true
,并将startOffset
设置为earliest
;绑定将在所有新分配的分区上执行seekToBeginning
。 -
从包含事件的主题中消费,您只对在此绑定运行期间发生的事件感兴趣。将
resetOffsets
设置为true
,并将startOffset
设置为latest
;绑定将在所有新分配的分区上执行seekToEnd
。
如果在初始分配后发生重新平衡,则仅对在初始分配期间未分配的任何新分配的分区执行查找。 |
有关对主题偏移量的更多控制,请参阅 使用 KafkaRebalanceListener;提供侦听器时,resetOffsets
不应设置为 true
,否则会导致错误。>>>>>>> 7bc90c10… GH-1084: 添加 txCommitRecovered 属性
1.3.4. 批量消费
从 3.0 版本开始,当 spring.cloud.stream.binding.<name>.consumer.batch-mode
设置为 true
时,通过轮询 Kafka Consumer
收到的所有记录都将作为 List<?>
提供给侦听器方法。否则,将使用一次一个记录来调用该方法。批次的大小由 Kafka 消费者属性 max.poll.records
、fetch.min.bytes
、fetch.max.wait.ms
控制;有关更多信息,请参阅 Kafka 文档。
请记住,批处理模式不支持 @StreamListener
- 它仅适用于较新的函数式编程模型。
在使用批处理模式时,不支持绑定器内的重试,因此maxAttempts 将被覆盖为1。您可以配置一个SeekToCurrentBatchErrorHandler (使用ListenerContainerCustomizer )来实现类似于绑定器中重试的功能。您还可以使用手动AckMode 并调用Ackowledgment.nack(index, sleep) 来提交部分批次的偏移量,并使剩余的记录重新传递。有关这些技术的更多信息,请参阅Spring for Apache Kafka 文档。 |
1.3.5. Kafka 生产者属性
为了避免重复,Spring Cloud Stream 支持以spring.cloud.stream.kafka.default.producer.<property>=<value> 的格式为所有通道设置值。 |
以下属性仅适用于 Kafka 生产者,并且必须以spring.cloud.stream.kafka.bindings.<channelName>.producer.
为前缀。
- admin.configuration
-
从 2.1.1 版本开始,此属性已弃用,建议使用
topic.properties
,并且将在未来版本中移除对它的支持。 - admin.replicas-assignment
-
从 2.1.1 版本开始,此属性已弃用,建议使用
topic.replicas-assignment
,并且将在未来版本中移除对它的支持。 - admin.replication-factor
-
从 2.1.1 版本开始,此属性已弃用,建议使用
topic.replication-factor
,并且将在未来版本中移除对它的支持。 - bufferSize
-
Kafka 生产者在发送之前尝试批量处理的数据量的上限(以字节为单位)。
默认值:
16384
。 - sync
-
生产者是否同步。
默认值:
false
。 - sendTimeoutExpression
-
针对发送的消息评估的 SpEL 表达式,用于评估在启用同步发布时等待确认的时间——例如,
headers['mySendTimeout']
。超时值以毫秒为单位。在 3.0 之前的版本中,除非使用原生编码,否则无法使用有效负载,因为在评估此表达式时,有效负载已经以byte[]
的形式存在。现在,在转换有效负载之前评估表达式。默认值:
none
。 - batchTimeout
-
生产者在发送消息之前等待更多消息累积到同一批次中的时间。(通常,生产者根本不等待,而只是简单地发送在上次发送过程中累积的所有消息。)非零值可能会提高吞吐量,但会以延迟为代价。
默认值:
0
。 - messageKeyExpression
-
针对发送的消息评估的 SpEL 表达式,用于填充生成的 Kafka 消息的键——例如,
headers['myKey']
。在 3.0 之前的版本中,除非使用原生编码,否则无法使用有效负载,因为在评估此表达式时,有效负载已经以byte[]
的形式存在。现在,在转换有效负载之前评估表达式。对于常规处理器(Function<String, String>
或Function<Message<?>, Message<?>
),如果生成的键需要与主题中的传入键相同,则可以如下设置此属性。spring.cloud.stream.kafka.bindings.<output-binding-name>.producer.messageKeyExpression: headers['kafka_receivedMessageKey']
对于反应式函数需要注意一个重要的注意事项。在这种情况下,应用程序需要手动将标头从传入的消息复制到传出消息。您可以设置标头,例如myKey
并使用headers['myKey']
,如上所述,或者为了方便起见,只需设置KafkaHeaders.MESSAGE_KEY
标头,您就不需要设置此属性。默认值:
none
。 - headerPatterns
-
一个逗号分隔的简单模式列表,用于匹配要映射到
ProducerRecord
中的 KafkaHeaders
的 Spring 消息标头。模式可以以通配符(星号)开头或结尾。可以通过在前面添加!
来否定模式。匹配在第一次匹配(正匹配或负匹配)后停止。例如,!ask,as*
将传递ash
,但不传递ask
。id
和timestamp
永远不会被映射。默认值:
*
(所有标头 - 除了id
和timestamp
) - configuration
-
包含通用 Kafka 生产者属性的键/值对的映射。
默认值:空映射。
- topic.properties
-
在配置新主题时使用的 Kafka 主题属性的
Map
——例如,spring.cloud.stream.kafka.bindings.output.producer.topic.properties.message.format.version=0.9.0.0
- topic.replicas-assignment
-
副本分配的
Map<Integer, List<Integer>>
,其中键是分区,值是分配。预配新主题时使用。请参阅kafka-clients
jar 中的NewTopic
Javadoc。默认值:无。
- topic.replication-factor
-
预配主题时使用的副本因子。覆盖绑定程序范围的设置。如果存在
replicas-assignments
,则忽略。默认值:无(使用绑定程序范围的默认值 -1)。
- useTopicHeader
-
设置为
true
以使用传出消息中的KafkaHeaders.TOPIC
消息标头的值覆盖默认绑定目标(主题名称)。如果标头不存在,则使用默认绑定目标。默认值:false
。 - recordMetadataChannel
-
应将成功发送结果发送到的
MessageChannel
的 bean 名称;该 bean 必须存在于应用程序上下文中。发送到该通道的消息是发送的消息(如有转换,则在转换之后),并带有一个额外的标头KafkaHeaders.RECORD_METADATA
。该标头包含 Kafka 客户端提供的RecordMetadata
对象;它包括记录写入主题的所在分区和偏移量。
ResultMetadata meta = sendResultMsg.getHeaders().get(KafkaHeaders.RECORD_METADATA, RecordMetadata.class)
失败的发送会进入生产者错误通道(如果已配置);请参阅错误通道。默认值:null
+
Kafka 绑定器使用生产者的partitionCount 设置作为提示,以创建具有给定分区数的主题(与minPartitionCount 结合使用,两者中的较大值为使用值)。在为绑定器配置minPartitionCount 和为应用程序配置partitionCount 时要小心,因为使用较大的值。如果主题已存在且分区数较小,并且autoAddPartitions 已禁用(默认值),则绑定器无法启动。如果主题已存在且分区数较小,并且autoAddPartitions 已启用,则会添加新的分区。如果主题已存在且分区数大于(minPartitionCount 或 partitionCount )的最大值,则使用现有的分区数。 |
- compression
-
设置
compression.type
生产者属性。支持的值为none
、gzip
、snappy
和lz4
。如果您按照Spring for Apache Kafka 文档中所述将kafka-clients
jar 覆盖到 2.1.0(或更高版本),并希望使用zstd
压缩,请使用spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.compression.type=zstd
。默认值:
none
。 - closeTimeout
-
关闭生产者时等待的超时时间(以秒为单位)。
默认值:
30
1.3.6. 使用示例
在本节中,我们将展示在特定场景中使用上述属性。
示例:将autoCommitOffset
设置为false
并依赖手动确认
此示例说明如何在消费者应用程序中手动确认偏移量。
此示例要求将spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset
设置为false
。在您的示例中使用相应的输入通道名称。
@SpringBootApplication
@EnableBinding(Sink.class)
public class ManuallyAcknowdledgingConsumer {
public static void main(String[] args) {
SpringApplication.run(ManuallyAcknowdledgingConsumer.class, args);
}
@StreamListener(Sink.INPUT)
public void process(Message<?> message) {
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
if (acknowledgment != null) {
System.out.println("Acknowledgment provided");
acknowledgment.acknowledge();
}
}
}
示例:安全配置
Apache Kafka 0.9 支持客户端和代理之间的安全连接。要利用此功能,请遵循Apache Kafka 文档以及 Kafka 0.9 的Confluent 文档中的安全指南中的指南。使用spring.cloud.stream.kafka.binder.configuration
选项为绑定器创建的所有客户端设置安全属性。
例如,要将security.protocol
设置为SASL_SSL
,请设置以下属性
spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_SSL
所有其他安全属性都可以以类似的方式设置。
使用 Kerberos 时,请遵循参考文档中有关创建和引用 JAAS 配置的说明。
Spring Cloud Stream 支持通过使用 JAAS 配置文件和使用 Spring Boot 属性将 JAAS 配置信息传递给应用程序。
使用 JAAS 配置文件
可以通过使用系统属性为 Spring Cloud Stream 应用程序设置 JAAS 和(可选)krb5 文件位置。以下示例显示了如何通过使用 JAAS 配置文件启动具有 SASL 和 Kerberos 的 Spring Cloud Stream 应用程序
java -Djava.security.auth.login.config=/path.to/kafka_client_jaas.conf -jar log.jar \
--spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
--spring.cloud.stream.bindings.input.destination=stream.ticktock \
--spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT
使用 Spring Boot 属性
作为拥有 JAAS 配置文件的替代方法,Spring Cloud Stream 提供了一种通过使用 Spring Boot 属性为 Spring Cloud Stream 应用程序设置 JAAS 配置的机制。
以下属性可用于配置 Kafka 客户端的登录上下文
- spring.cloud.stream.kafka.binder.jaas.loginModule
-
登录模块名称。在正常情况下不需要设置。
默认值:
com.sun.security.auth.module.Krb5LoginModule
。 - spring.cloud.stream.kafka.binder.jaas.controlFlag
-
登录模块的控制标志。
默认值:
required
。 - spring.cloud.stream.kafka.binder.jaas.options
-
包含登录模块选项的键/值对的映射。
默认值:空映射。
以下示例显示了如何通过使用 Spring Boot 配置属性启动具有 SASL 和 Kerberos 的 Spring Cloud Stream 应用程序
java --spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
--spring.cloud.stream.bindings.input.destination=stream.ticktock \
--spring.cloud.stream.kafka.binder.autoCreateTopics=false \
--spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT \
--spring.cloud.stream.kafka.binder.jaas.options.useKeyTab=true \
--spring.cloud.stream.kafka.binder.jaas.options.storeKey=true \
--spring.cloud.stream.kafka.binder.jaas.options.keyTab=/etc/security/keytabs/kafka_client.keytab \
--spring.cloud.stream.kafka.binder.jaas.options.principal=kafka-client-1@EXAMPLE.COM
上述示例表示以下 JAAS 文件的等效内容
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/etc/security/keytabs/kafka_client.keytab"
principal="[email protected]";
};
如果所需的主题已存在于代理上或将由管理员创建,则可以关闭自动创建,并且只需要发送客户端 JAAS 属性。
请勿在同一个应用程序中混合使用 JAAS 配置文件和 Spring Boot 属性。如果-Djava.security.auth.login.config 系统属性已存在,则 Spring Cloud Stream 会忽略 Spring Boot 属性。 |
在使用autoCreateTopics 和autoAddPartitions 与 Kerberos 时要小心。通常,应用程序可能会使用在 Kafka 和 Zookeeper 中没有管理权限的主体。因此,依赖 Spring Cloud Stream 来创建/修改主题可能会失败。在安全环境中,我们强烈建议使用 Kafka 工具以管理员身份创建主题和管理 ACL。 |
示例:暂停和恢复消费者
如果您希望暂停消费但不导致分区重新平衡,则可以暂停和恢复消费者。这可以通过将Consumer
作为参数添加到您的@StreamListener
中来实现。要恢复,您需要一个针对ListenerContainerIdleEvent
实例的ApplicationListener
。发布事件的频率由idleEventInterval
属性控制。由于消费者不是线程安全的,因此必须在调用线程上调用这些方法。
以下简单的应用程序显示了如何暂停和恢复
@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@StreamListener(Sink.INPUT)
public void in(String in, @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
System.out.println(in);
consumer.pause(Collections.singleton(new TopicPartition("myTopic", 0)));
}
@Bean
public ApplicationListener<ListenerContainerIdleEvent> idleListener() {
return event -> {
System.out.println(event);
if (event.getConsumer().paused().size() > 0) {
event.getConsumer().resume(event.getConsumer().paused());
}
};
}
}
1.4. 事务绑定器
通过将spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix
设置为非空值(例如tx-
)来启用事务。在处理器应用程序中使用时,消费者会启动事务;消费者线程上发送的任何记录都参与同一事务。当侦听器正常退出时,侦听器容器会将偏移量发送到事务并提交它。一个通用的生产者工厂用于所有使用spring.cloud.stream.kafka.binder.transaction.producer.*
属性配置的生产者绑定;单个绑定 Kafka 生产者属性会被忽略。
由于重试将在原始事务中运行,并且原始事务可能会回滚,任何已发布的记录也将回滚,因此事务不支持正常的绑定程序重试(以及死信)。当启用重试(公共属性maxAttempts 大于零)时,重试属性用于配置DefaultAfterRollbackProcessor 以在容器级别启用重试。类似地,此功能会移至侦听器容器,而不是在事务中发布死信记录,同样是通过在主事务回滚后运行的DefaultAfterRollbackProcessor 来实现的。 |
如果希望在源应用程序中使用事务,或从某些任意线程用于仅生产者事务(例如@Scheduled
方法),则必须获取对事务性生产者工厂的引用,并使用它定义一个KafkaTransactionManager
bean。
@Bean
public PlatformTransactionManager transactionManager(BinderFactory binders) {
ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
MessageChannel.class)).getTransactionalProducerFactory();
return new KafkaTransactionManager<>(pf);
}
请注意,我们使用BinderFactory
获取绑定程序的引用;当仅配置了一个绑定程序时,在第一个参数中使用null
。如果配置了多个绑定程序,则使用绑定程序名称获取引用。获得绑定程序的引用后,我们可以获取对ProducerFactory
的引用并创建事务管理器。
然后,您将使用正常的 Spring 事务支持,例如TransactionTemplate
或@Transactional
,例如
public static class Sender {
@Transactional
public void doInTransaction(MessageChannel output, List<String> stuffToSend) {
stuffToSend.forEach(stuff -> output.send(new GenericMessage<>(stuff)));
}
}
如果希望将仅生产者事务与来自某些其他事务管理器的那些事务同步,请使用ChainedTransactionManager
。
1.5. 错误通道
从 1.3 版开始,绑定程序无条件地将异常发送到每个消费者目标的错误通道,并且还可以配置为将异步生产者发送失败发送到错误通道。有关更多信息,请参阅有关错误处理的此部分。
发送失败的ErrorMessage
的有效负载是具有以下属性的KafkaSendFailureException
-
failedMessage
:发送失败的 Spring MessagingMessage<?>
。 -
record
:从failedMessage
创建的原始ProducerRecord
没有自动处理生产者异常(例如发送到死信队列)。您可以使用自己的 Spring Integration 流来使用这些异常。
1.6. Kafka 指标
Kafka 绑定程序模块公开了以下指标
spring.cloud.stream.binder.kafka.offset
:此指标指示给定绑定程序的主题中给定消费者组尚未消费的消息数量。提供的指标基于 Micrometer 库。如果 Micrometer 位于类路径上且应用程序未提供任何其他此类 bean,则绑定程序将创建KafkaBinderMetrics
bean。该指标包含消费者组信息、主题以及主题上最新偏移量的已提交偏移量的实际延迟。此指标对于向 PaaS 平台提供自动扩展反馈特别有用。
您可以排除KafkaBinderMetrics
以创建必要的基础设施(如消费者),然后通过在应用程序中提供以下组件来报告指标。
@Component
class NoOpBindingMeters {
NoOpBindingMeters(MeterRegistry registry) {
registry.config().meterFilter(
MeterFilter.denyNameStartsWith(KafkaBinderMetrics.OFFSET_LAG_METRIC_NAME));
}
}
有关如何选择性地抑制指标的更多详细信息,请参阅此处。
1.7. tombstone 记录(空记录值)
使用压缩主题时,具有null
值的记录(也称为墓碑记录)表示键的删除。要在@StreamListener
方法中接收此类消息,必须将参数标记为不需要接收null
值参数。
@StreamListener(Sink.INPUT)
public void in(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) byte[] key,
@Payload(required = false) Customer customer) {
// customer is null if a tombstone record
...
}
1.8. 使用 KafkaRebalanceListener
应用程序可能希望在最初分配分区时将主题/分区查找至任意偏移量,或对消费者执行其他操作。从 2.1 版开始,如果在应用程序上下文中提供单个KafkaRebalanceListener
bean,它将连接到所有 Kafka 消费者绑定。
public interface KafkaBindingRebalanceListener {
/**
* Invoked by the container before any pending offsets are committed.
* @param bindingName the name of the binding.
* @param consumer the consumer.
* @param partitions the partitions.
*/
default void onPartitionsRevokedBeforeCommit(String bindingName, Consumer<?, ?> consumer,
Collection<TopicPartition> partitions) {
}
/**
* Invoked by the container after any pending offsets are committed.
* @param bindingName the name of the binding.
* @param consumer the consumer.
* @param partitions the partitions.
*/
default void onPartitionsRevokedAfterCommit(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
}
/**
* Invoked when partitions are initially assigned or after a rebalance.
* Applications might only want to perform seek operations on an initial assignment.
* @param bindingName the name of the binding.
* @param consumer the consumer.
* @param partitions the partitions.
* @param initial true if this is the initial assignment.
*/
default void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions,
boolean initial) {
}
}
当您提供重新平衡侦听器时,不能将resetOffsets
消费者属性设置为true
。
1.9. 自定义消费者和生产者配置
如果希望对用于在 Kafka 中创建ConsumerFactory
和ProducerFactory
的消费者和生产者配置进行高级自定义,您可以实现以下自定义程序。
-
ConsusumerConfigCustomizer
-
ProducerConfigCustomizer
这两个接口都提供了一种配置用于消费者和生产者属性的配置映射的方法。例如,如果希望访问应用程序级别定义的 bean,则可以在configure
方法的实现中注入该 bean。当绑定程序发现这些自定义程序作为 bean 可用时,它将在创建消费者和生产者工厂之前立即调用configure
方法。
1.10. 自定义 AdminClient 配置
与上述消费者和生产者配置自定义一样,应用程序还可以通过提供AdminClientConfigCustomizer
来自定义管理客户端的配置。AdminClientConfigCustomizer 的 configure 方法提供了对管理客户端属性的访问,您可以使用这些属性定义进一步的自定义。绑定程序的 Kafka 主题供应程序对通过此自定义程序给出的属性具有最高优先级。以下是提供此自定义程序 bean 的示例。
@Bean
public AdminClientConfigCustomizer adminClientConfigCustomizer() {
return props -> {
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
};
}
1.11. 死信主题处理
1.11.1. 死信主题分区选择
默认情况下,记录使用与原始记录相同的分区发布到死信主题。这意味着死信主题必须至少具有与原始记录一样多的分区。
要更改此行为,请将DlqPartitionFunction
实现作为@Bean
添加到应用程序上下文。只能存在一个这样的 bean。该函数提供消费者组、失败的ConsumerRecord
和异常。例如,如果始终希望路由到分区 0,则可以使用
@Bean
public DlqPartitionFunction partitionFunction() {
return (group, record, ex) -> 0;
}
如果将消费者绑定的dlqPartitions 属性设置为 1(并且绑定程序的minPartitionCount 等于1 ),则无需提供DlqPartitionFunction ;框架将始终使用分区 0。如果将消费者绑定的dlqPartitions 属性设置为大于1 的值(或绑定程序的minPartitionCount 大于1 ),则**必须**提供DlqPartitionFunction bean,即使分区数与原始主题相同。 |
还可以为 DLQ 主题定义自定义名称。为此,请将DlqDestinationResolver
的实现作为@Bean
添加到应用程序上下文。当绑定程序检测到此类 bean 时,它将具有优先级,否则它将使用dlqName
属性。如果两者都未找到,它将默认为error.<destination>.<group>
。以下是DlqDestinationResolver
作为@Bean
的示例。
@Bean
public DlqDestinationResolver dlqDestinationResolver() {
return (rec, ex) -> {
if (rec.topic().equals("word1")) {
return "topic1-dlq";
}
else {
return "topic2-dlq";
}
};
}
在为DlqDestinationResolver
提供实现时,需要牢记的一件重要事情是,绑定程序中的供应程序不会自动为应用程序创建主题。这是因为绑定程序无法推断实现可能发送到的所有 DLQ 主题的名称。因此,如果使用此策略提供 DLQ 名称,则应用程序负责确保事先创建这些主题。
1.11.2. 处理死信主题中的记录
由于框架无法预测用户希望如何处理死信,因此它不提供任何处理死信的标准机制。如果死信的原因是暂时的,您可能希望将消息路由回原始主题。但是,如果问题是永久性问题,则可能导致无限循环。本主题中的示例 Spring Boot 应用程序是如何将这些消息路由回原始主题的示例,但它在三次尝试后将它们移动到“停车场”主题。该应用程序是另一个 spring-cloud-stream 应用程序,它从死信主题读取。当 5 秒内未收到任何消息时,它将终止。
这些示例假设原始目标是so8400out
,消费者组是so8400
。
有一些策略需要考虑
-
考虑仅在主应用程序未运行时运行重新路由。否则,瞬态错误的重试会很快用完。
-
或者,使用两阶段方法:使用此应用程序路由到第三个主题,然后使用另一个应用程序从那里路由回主主题。
以下代码列表显示了示例应用程序
spring.cloud.stream.bindings.input.group=so8400replay
spring.cloud.stream.bindings.input.destination=error.so8400out.so8400
spring.cloud.stream.bindings.output.destination=so8400out
spring.cloud.stream.bindings.parkingLot.destination=so8400in.parkingLot
spring.cloud.stream.kafka.binder.configuration.auto.offset.reset=earliest
spring.cloud.stream.kafka.binder.headers=x-retries
@SpringBootApplication
@EnableBinding(TwoOutputProcessor.class)
public class ReRouteDlqKApplication implements CommandLineRunner {
private static final String X_RETRIES_HEADER = "x-retries";
public static void main(String[] args) {
SpringApplication.run(ReRouteDlqKApplication.class, args).close();
}
private final AtomicInteger processed = new AtomicInteger();
@Autowired
private MessageChannel parkingLot;
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public Message<?> reRoute(Message<?> failed) {
processed.incrementAndGet();
Integer retries = failed.getHeaders().get(X_RETRIES_HEADER, Integer.class);
if (retries == null) {
System.out.println("First retry for " + failed);
return MessageBuilder.fromMessage(failed)
.setHeader(X_RETRIES_HEADER, new Integer(1))
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
.build();
}
else if (retries.intValue() < 3) {
System.out.println("Another retry for " + failed);
return MessageBuilder.fromMessage(failed)
.setHeader(X_RETRIES_HEADER, new Integer(retries.intValue() + 1))
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
.build();
}
else {
System.out.println("Retries exhausted for " + failed);
parkingLot.send(MessageBuilder.fromMessage(failed)
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
.build());
}
return null;
}
@Override
public void run(String... args) throws Exception {
while (true) {
int count = this.processed.get();
Thread.sleep(5000);
if (count == this.processed.get()) {
System.out.println("Idle, terminating");
return;
}
}
}
public interface TwoOutputProcessor extends Processor {
@Output("parkingLot")
MessageChannel parkingLot();
}
}
1.12. 使用 Kafka 绑定器进行分区
Apache Kafka 本身支持主题分区。
有时将数据发送到特定分区是有利的,例如,当您希望严格排序消息处理时(特定客户的所有消息都应转到同一分区)。
以下示例显示了如何配置生产者和消费者端
@SpringBootApplication
@EnableBinding(Source.class)
public class KafkaPartitionProducerApplication {
private static final Random RANDOM = new Random(System.currentTimeMillis());
private static final String[] data = new String[] {
"foo1", "bar1", "qux1",
"foo2", "bar2", "qux2",
"foo3", "bar3", "qux3",
"foo4", "bar4", "qux4",
};
public static void main(String[] args) {
new SpringApplicationBuilder(KafkaPartitionProducerApplication.class)
.web(false)
.run(args);
}
@InboundChannelAdapter(channel = Source.OUTPUT, poller = @Poller(fixedRate = "5000"))
public Message<?> generate() {
String value = data[RANDOM.nextInt(data.length)];
System.out.println("Sending: " + value);
return MessageBuilder.withPayload(value)
.setHeader("partitionKey", value)
.build();
}
}
spring:
cloud:
stream:
bindings:
output:
destination: partitioned.topic
producer:
partition-key-expression: headers['partitionKey']
partition-count: 12
必须为主题配置足够的分区以实现所有消费者组所需的并发性。上述配置支持最多 12 个消费者实例(如果它们的concurrency 为 2,则为 6 个;如果它们的并发性为 3,则为 4 个,依此类推)。通常最好“过度配置”分区,以允许将来增加消费者或并发性。 |
前面的配置使用默认分区(key.hashCode() % partitionCount )。根据键值,这可能提供或可能不提供合适的平衡算法。您可以使用partitionSelectorExpression 或partitionSelectorClass 属性覆盖此默认值。 |
由于 Kafka 本身处理分区,因此消费者端不需要任何特殊配置。Kafka 在实例之间分配分区。
以下 Spring Boot 应用程序侦听 Kafka 流并将(到控制台)打印每条消息所属的分区 ID
@SpringBootApplication
@EnableBinding(Sink.class)
public class KafkaPartitionConsumerApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(KafkaPartitionConsumerApplication.class)
.web(false)
.run(args);
}
@StreamListener(Sink.INPUT)
public void listen(@Payload String in, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
System.out.println(in + " received from partition " + partition);
}
}
spring:
cloud:
stream:
bindings:
input:
destination: partitioned.topic
group: myGroup
您可以根据需要添加实例。Kafka 重新平衡分区分配。如果实例数(或instance count * concurrency
)超过分区数,则某些消费者处于空闲状态。
2. Kafka Streams 绑定器
2.1. 使用
要使用 Kafka Streams 绑定程序,您只需要使用以下 Maven 坐标将其添加到 Spring Cloud Stream 应用程序中
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
为 Kafka Streams 绑定程序引导新项目的快速方法是使用Spring Initializr,然后选择“Cloud Streams”和“Spring for Kafka Streams”,如下所示
2.2. 概述
Spring Cloud Stream 包含一个专为Apache Kafka Streams绑定设计的绑定程序实现。通过这种原生集成,Spring Cloud Stream“处理器”应用程序可以直接在核心业务逻辑中使用Apache Kafka Streams API。
Kafka Streams 绑定程序实现构建在Spring for Apache Kafka项目提供的基础之上。
Kafka Streams 绑定程序为 Kafka Streams 中的三种主要类型提供了绑定功能 - KStream
、KTable
和GlobalKTable
。
Kafka Streams 应用程序通常遵循一个模型,其中记录从入站主题读取,应用业务逻辑,然后将转换后的记录写入出站主题。或者,也可以定义没有出站目标的处理器应用程序。
在以下部分中,我们将详细了解 Spring Cloud Stream 与 Kafka Streams 的集成。
2.3. 编程模型
使用 Kafka Streams 绑定程序提供的编程模型时,可以使用高级Streams DSL以及高级和低级Processor-API的混合作为选项。当混合使用高级和低级 API 时,通常通过在KStream
上调用transform
或process
API 方法来实现。
2.3.1. 函数式风格
从 Spring Cloud Stream 3.0.0
开始,Kafka Streams 绑定器允许应用程序使用 Java 8 中提供的函数式编程风格进行设计和开发。这意味着应用程序可以简洁地表示为 java.util.function.Function
或 java.util.function.Consumer
类型的 Lambda 表达式。
让我们来看一个非常基本的例子。
@SpringBootApplication
public class SimpleConsumerApplication {
@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
return input ->
input.foreach((key, value) -> {
System.out.println("Key: " + key + " Value: " + value);
});
}
}
虽然简单,但这是一个完整的独立 Spring Boot 应用程序,它利用 Kafka Streams 进行流处理。这是一个没有输出绑定的消费者应用程序,只有一个输入绑定。应用程序消费数据,并简单地将 KStream
的键和值的信息记录到标准输出。应用程序包含 SpringBootApplication
注解和一个标记为 Bean
的方法。该 Bean 方法的类型为 java.util.function.Consumer
,它以 KStream
为参数。然后在实现中,我们返回一个本质上是 Lambda 表达式的 Consumer 对象。在 Lambda 表达式内部,提供了处理数据的代码。
在这个应用程序中,有一个类型为 KStream
的单个输入绑定。绑定器为应用程序创建此绑定,名称为 process-in-0
,即函数 Bean 名称后跟一个破折号字符 (-
) 和文字 in
,再跟另一个破折号,然后是参数的序数位置。您可以使用此绑定名称设置其他属性,例如目标。例如,spring.cloud.stream.bindings.process-in-0.destination=my-topic
。
如果未在绑定上设置目标属性,则会创建一个与绑定名称相同的主题(如果应用程序具有足够的权限)或期望该主题已存在。 |
构建为 uber-jar(例如,kstream-consumer-app.jar
)后,您可以像下面这样运行上述示例。
java -jar kstream-consumer-app.jar --spring.cloud.stream.bindings.process-in-0.destination=my-topic
这是另一个示例,它是一个完整的处理器,具有输入和输出绑定。这是经典的单词计数示例,其中应用程序从主题接收数据,然后在滚动时间窗口中计算每个单词出现的次数。
@SpringBootApplication
public class WordCountProcessorApplication {
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> process() {
return input -> input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.map((key, value) -> new KeyValue<>(value, value))
.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
.windowedBy(TimeWindows.of(5000))
.count(Materialized.as("word-counts-state-store"))
.toStream()
.map((key, value) -> new KeyValue<>(key.key(), new WordCount(key.key(), value,
new Date(key.window().start()), new Date(key.window().end()))));
}
public static void main(String[] args) {
SpringApplication.run(WordCountProcessorApplication.class, args);
}
}
这里同样,这是一个完整的 Spring Boot 应用程序。这里与第一个应用程序的区别在于 Bean 方法的类型为 java.util.function.Function
。Function
的第一个参数化类型用于输入 KStream
,第二个用于输出。在方法体中,提供了一个类型为 Function
的 Lambda 表达式,并在实现中给出了实际的业务逻辑。与之前讨论的基于 Consumer 的应用程序类似,这里的输入绑定默认命名为 process-in-0
。对于输出,绑定名称也自动设置为 process-out-0
。
构建为 uber-jar(例如,wordcount-processor.jar
)后,您可以像下面这样运行上述示例。
java -jar wordcount-processor.jar --spring.cloud.stream.bindings.process-in-0.destination=words --spring.cloud.stream.bindings.process-out-0.destination=counts
此应用程序将从 Kafka 主题 words
中消费消息,并将计算结果发布到输出主题 counts
。
Spring Cloud Stream 将确保来自传入和传出主题的消息自动绑定为 KStream 对象。作为开发人员,您可以专注于代码的业务方面,即编写处理器所需的逻辑。框架会自动处理 Kafka Streams 基础设施所需的 Kafka Streams 特定配置。
我们上面看到的两个示例都有一个 KStream
输入绑定。在这两种情况下,绑定都从单个主题接收记录。如果要将多个主题多路复用到单个 KStream
绑定中,可以在下面提供以逗号分隔的 Kafka 主题作为目标。
spring.cloud.stream.bindings.process-in-0.destination=topic-1,topic-2,topic-3
此外,如果要根据正则表达式匹配主题,还可以提供主题模式作为目标。
spring.cloud.stream.bindings.process-in-0.destination=input.*
多个输入绑定
许多非平凡的 Kafka Streams 应用程序通常会通过多个绑定从多个主题消费数据。例如,一个主题作为 Kstream
消费,另一个作为 KTable
或 GlobalKTable
消费。应用程序可能希望将数据作为表类型接收的原因有很多。考虑一个用例,其中基础主题通过数据库中的更改数据捕获 (CDC) 机制填充,或者应用程序只关心最新更新以供下游处理。如果应用程序指定需要将数据绑定为 KTable
或 GlobalKTable
,则 Kafka Streams 绑定器将正确地将目标绑定到 KTable
或 GlobalKTable
,并使它们可供应用程序操作。我们将研究 Kafka Streams 绑定器中如何处理多个输入绑定的几种不同场景。
Kafka Streams 绑定器中的 BiFunction
这是一个有两个输入和一个输出的示例。在这种情况下,应用程序可以利用 java.util.function.BiFunction
。
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
return (userClicksStream, userRegionsTable) -> (userClicksStream
.leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
"UNKNOWN" : region, clicks),
Joined.with(Serdes.String(), Serdes.Long(), null))
.map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(),
regionWithClicks.getClicks()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
.reduce(Long::sum)
.toStream());
}
这里基本主题与前面的示例相同,但这里有两个输入。Java 的 BiFunction
支持用于将输入绑定到所需的目标。绑定器为输入生成的默认绑定名称分别为 process-in-0
和 process-in-1
。默认输出绑定为 process-out-0
。在此示例中,BiFunction
的第一个参数绑定为第一个输入的 KStream
,第二个参数绑定为第二个输入的 KTable
。
Kafka Streams 绑定器中的 BiConsumer
如果有两个输入但没有输出,在这种情况下,我们可以使用 java.util.function.BiConsumer
,如下所示。
@Bean
public BiConsumer<KStream<String, Long>, KTable<String, String>> process() {
return (userClicksStream, userRegionsTable) -> {}
}
超过两个输入
如果您有多于两个输入怎么办?在某些情况下,您可能需要多于两个输入。在这种情况下,绑定器允许您链接部分函数。在函数式编程术语中,此技术通常称为柯里化。随着 Java 8 中添加的函数式编程支持,Java 现在使您能够编写柯里化函数。Spring Cloud Stream Kafka Streams 绑定器可以利用此功能启用多个输入绑定。
让我们来看一个例子。
@Bean
public Function<KStream<Long, Order>,
Function<GlobalKTable<Long, Customer>,
Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> enrichOrder() {
return orders -> (
customers -> (
products -> (
orders.join(customers,
(orderId, order) -> order.getCustomerId(),
(order, customer) -> new CustomerOrder(customer, order))
.join(products,
(orderId, customerOrder) -> customerOrder
.productId(),
(customerOrder, product) -> {
EnrichedOrder enrichedOrder = new EnrichedOrder();
enrichedOrder.setProduct(product);
enrichedOrder.setCustomer(customerOrder.customer);
enrichedOrder.setOrder(customerOrder.order);
return enrichedOrder;
})
)
)
);
}
让我们看一下上面介绍的绑定模型的细节。在此模型中,我们在入站方向有 3 个部分应用的函数。让我们将它们称为 f(x)
、f(y)
和 f(z)
。如果我们从真正数学函数的角度扩展这些函数,它将看起来像这样:f(x) → (fy) → f(z) → KStream<Long, EnrichedOrder>
。x
变量代表 KStream<Long, Order>
,y
变量代表 GlobalKTable<Long, Customer>
,z
变量代表 GlobalKTable<Long, Product>
。第一个函数 f(x)
具有应用程序的第一个输入绑定 (KStream<Long, Order>
),其输出是函数 f(y)。函数 f(y)
具有应用程序的第二个输入绑定 (GlobalKTable<Long, Customer>
),其输出是另一个函数 f(z)
。函数 f(z)
的输入是应用程序的第三个输入 (GlobalKTable<Long, Product>
),其输出是 KStream<Long, EnrichedOrder>
,它是应用程序的最终输出绑定。来自三个部分函数的输入分别为 KStream
、GlobalKTable
、GlobalKTable
,您可以在方法体中使用它们来实现业务逻辑,作为 Lambda 表达式的一部分。
输入绑定分别命名为 enrichOrder-in-0
、enrichOrder-in-1
和 enrichOrder-in-2
。输出绑定命名为 enrichOrder-out-0
。
使用柯里化函数,您可以拥有任意数量的输入。但是,请记住,在 Java 中,超过少量输入及其部分应用函数可能会导致代码难以阅读。因此,如果您的 Kafka Streams 应用程序需要多于合理数量的输入绑定,并且您希望使用此函数模型,则可能需要重新考虑您的设计并适当地分解应用程序。
多个输出绑定
Kafka Streams 允许将输出数据写入多个主题。此功能在 Kafka Streams 中称为分支。使用多个输出绑定时,需要提供一个 KStream 数组 (KStream[]
) 作为输出返回类型。
这是一个例子
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>[]> process() {
Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
Predicate<Object, WordCount> isFrench = (k, v) -> v.word.equals("french");
Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");
return input -> input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, value) -> value)
.windowedBy(TimeWindows.of(5000))
.count(Materialized.as("WordCounts-branch"))
.toStream()
.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value,
new Date(key.window().start()), new Date(key.window().end()))))
.branch(isEnglish, isFrench, isSpanish);
}
编程模型保持不变,但是输出参数化类型为 KStream[]
。默认输出绑定名称分别为 process-out-0
、process-out-1
、process-out-2
。绑定器生成三个输出绑定的原因是它检测到返回的 KStream
数组的长度。
Kafka Streams 函数式编程风格总结
总之,下表显示了可以在函数式范式中使用的各种选项。
输入数量 | 输出数量 | 要使用的组件 |
---|---|---|
1 |
0 |
java.util.function.Consumer |
2 |
0 |
java.util.function.BiConsumer |
1 |
1..n |
java.util.function.Function |
2 |
1..n |
java.util.function.BiFunction |
>= 3 |
0..n |
使用柯里化函数 |
-
在此表中,如果有多个输出,类型将简单地变为
KStream[]
。
2.3.2. 命令式编程模型。
尽管上面概述的函数式编程模型是首选方法,但如果您愿意,仍然可以使用经典的基于 StreamListener
的方法。
以下是一些示例。
以下是使用 StreamListener
的单词计数示例的等效代码。
@SpringBootApplication
@EnableBinding(KafkaStreamsProcessor.class)
public class WordCountProcessorApplication {
@StreamListener("input")
@SendTo("output")
public KStream<?, WordCount> process(KStream<?, String> input) {
return input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, value) -> value)
.windowedBy(TimeWindows.of(5000))
.count(Materialized.as("WordCounts-multi"))
.toStream()
.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
}
public static void main(String[] args) {
SpringApplication.run(WordCountProcessorApplication.class, args);
}
如您所见,这有点冗长,因为您需要提供 EnableBinding
和其他额外注释(如 StreamListener
和 SendTo
)才能使其成为一个完整的应用程序。EnableBinding
用于指定包含绑定的绑定接口。在这种情况下,我们使用的是库存 KafkaStreamsProcessor
绑定接口,它具有以下契约。
public interface KafkaStreamsProcessor {
@Input("input")
KStream<?, ?> input();
@Output("output")
KStream<?, ?> output();
}
由于您使用的是包含这些声明的绑定接口,因此绑定器将为输入 KStream
和输出 KStream
创建绑定。
除了函数式编程模型中提供的明显差异外,这里需要特别说明的是,绑定名称是您在绑定接口中指定的。例如,在上面的应用程序中,由于我们使用的是KafkaStreamsProcessor
,因此绑定名称为input
和output
。绑定属性需要使用这些名称。例如spring.cloud.stream.bindings.input.destination
、spring.cloud.stream.bindings.output.destination
等。请记住,这与函数式编程模型有根本的不同,因为在函数式编程模型中,绑定器会为应用程序生成绑定名称。这是因为应用程序在使用EnableBinding
的函数模型中不提供任何绑定接口。
以下是一个具有两个输入的 Sink 的示例。
@EnableBinding(KStreamKTableBinding.class)
.....
.....
@StreamListener
public void process(@Input("inputStream") KStream<String, PlayEvent> playEvents,
@Input("inputTable") KTable<Long, Song> songTable) {
....
....
}
interface KStreamKTableBinding {
@Input("inputStream")
KStream<?, ?> inputStream();
@Input("inputTable")
KTable<?, ?> inputTable();
}
以下是我们上面看到的基于BiFunction
的处理器的相同StreamListener
等效项。
@EnableBinding(KStreamKTableBinding.class)
....
....
@StreamListener
@SendTo("output")
public KStream<String, Long> process(@Input("input") KStream<String, Long> userClicksStream,
@Input("inputTable") KTable<String, String> userRegionsTable) {
....
....
}
interface KStreamKTableBinding extends KafkaStreamsProcessor {
@Input("inputX")
KTable<?, ?> inputTable();
}
最后,以下是具有三个输入和柯里化函数的应用程序的StreamListener
等效项。
@EnableBinding(CustomGlobalKTableProcessor.class)
...
...
@StreamListener
@SendTo("output")
public KStream<Long, EnrichedOrder> process(
@Input("input-1") KStream<Long, Order> ordersStream,
@Input("input-2") GlobalKTable<Long, Customer> customers,
@Input("input-3") GlobalKTable<Long, Product> products) {
KStream<Long, CustomerOrder> customerOrdersStream = ordersStream.join(
customers, (orderId, order) -> order.getCustomerId(),
(order, customer) -> new CustomerOrder(customer, order));
return customerOrdersStream.join(products,
(orderId, customerOrder) -> customerOrder.productId(),
(customerOrder, product) -> {
EnrichedOrder enrichedOrder = new EnrichedOrder();
enrichedOrder.setProduct(product);
enrichedOrder.setCustomer(customerOrder.customer);
enrichedOrder.setOrder(customerOrder.order);
return enrichedOrder;
});
}
interface CustomGlobalKTableProcessor {
@Input("input-1")
KStream<?, ?> input1();
@Input("input-2")
GlobalKTable<?, ?> input2();
@Input("input-3")
GlobalKTable<?, ?> input3();
@Output("output")
KStream<?, ?> output();
}
您可能会注意到,以上两个示例更加冗长,因为除了提供EnableBinding
之外,您还需要编写自己的自定义绑定接口。使用函数模型,您可以避免所有这些仪式化的细节。
在我们结束对 Kafka Streams 绑定器提供的通用编程模型的讨论之前,以下是多个输出绑定的StreamListener
版本。
EnableBinding(KStreamProcessorWithBranches.class)
public static class WordCountProcessorApplication {
@Autowired
private TimeWindows timeWindows;
@StreamListener("input")
@SendTo({"output1","output2","output3"})
public KStream<?, WordCount>[] process(KStream<Object, String> input) {
Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
Predicate<Object, WordCount> isFrench = (k, v) -> v.word.equals("french");
Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");
return input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, value) -> value)
.windowedBy(timeWindows)
.count(Materialized.as("WordCounts-1"))
.toStream()
.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))))
.branch(isEnglish, isFrench, isSpanish);
}
interface KStreamProcessorWithBranches {
@Input("input")
KStream<?, ?> input();
@Output("output1")
KStream<?, ?> output1();
@Output("output2")
KStream<?, ?> output2();
@Output("output3")
KStream<?, ?> output3();
}
}
概括地说,我们回顾了使用 Kafka Streams 绑定器时的各种编程模型选择。
绑定器为输入上的KStream
、KTable
和GlobalKTable
提供绑定功能。KTable
和GlobalKTable
绑定仅在输入上可用。绑定器支持KStream
的输入和输出绑定。
Kafka Streams 绑定器编程模型的优点在于,绑定器为您提供了使用完全函数式编程模型或使用基于StreamListener
的命令式方法的灵活性。
2.4. 编程模型的辅助工具
2.4.1. 单个应用程序中的多个 Kafka Streams 处理器
绑定器允许在一个 Spring Cloud Stream 应用程序中拥有多个 Kafka Streams 处理器。您可以拥有如下所示的应用程序。
@Bean
public java.util.function.Function<KStream<Object, String>, KStream<Object, String>> process() {
...
}
@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
...
}
@Bean
public java.util.function.BiFunction<KStream<Object, String>, KTable<Integer, String>, KStream<Object, String>> yetAnotherProcess() {
...
}
在这种情况下,绑定器将创建 3 个具有不同应用程序 ID 的独立 Kafka Streams 对象(稍后将详细介绍)。但是,如果您在应用程序中有多个处理器,则必须告诉 Spring Cloud Stream 哪些函数需要激活。以下是如何激活函数。
spring.cloud.stream.function.definition: process;anotherProcess;yetAnotherProcess
如果您希望某些函数不要立即激活,可以将其从此列表中删除。
当您在一个应用程序中有一个 Kafka Streams 处理器和其他类型的Function
bean 时,这种情况也是如此,这些 bean 通过不同的绑定器处理(例如,基于常规 Kafka 消息通道绑定器的函数 bean)。
2.4.2. Kafka Streams 应用程序 ID
应用程序 ID 是您需要为 Kafka Streams 应用程序提供的必填属性。Spring Cloud Stream Kafka Streams 绑定器允许您通过多种方式配置此应用程序 ID。
如果您在应用程序中只有一个处理器或StreamListener
,则可以使用以下属性在绑定器级别设置它
spring.cloud.stream.kafka.streams.binder.applicationId
.
为方便起见,如果您只有一个处理器,也可以使用spring.application.name
作为属性来委托应用程序 ID。
如果您在应用程序中有多个 Kafka Streams 处理器,则需要为每个处理器设置应用程序 ID。在函数模型的情况下,您可以将其作为属性附加到每个函数。
例如,假设您有以下函数。
@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
...
}
和
@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
...
}
然后,您可以使用以下绑定器级别属性为每个函数设置应用程序 ID。
spring.cloud.stream.kafka.streams.binder.functions.process.applicationId
和
spring.cloud.stream.kafka.streams.binder.functions.anotherProcess.applicationId
在StreamListener
的情况下,您需要在处理器的第一个输入绑定上设置它。
例如,假设您有以下两个基于StreamListener
的处理器。
@StreamListener
@SendTo("output")
public KStream<String, String> process(@Input("input") <KStream<Object, String>> input) {
...
}
@StreamListener
@SendTo("anotherOutput")
public KStream<String, String> anotherProcess(@Input("anotherInput") <KStream<Object, String>> input) {
...
}
然后,您必须使用以下绑定属性为此设置应用程序 ID。
spring.cloud.stream.kafka.streams.bindings.input.consumer.applicationId
和
spring.cloud.stream.kafka.streams.bindings.anotherInput.consumer.applicationId
对于基于函数的模型,这种在绑定级别设置应用程序 ID 的方法也能奏效。但是,如果您使用的是函数模型,则在绑定器级别为每个函数设置应用程序 ID(如上所述)要容易得多。
对于生产部署,强烈建议通过配置显式指定应用程序 ID。如果您正在自动扩展应用程序,这尤其重要,在这种情况下,您需要确保使用相同的应用程序 ID 部署每个实例。
如果应用程序未提供应用程序 ID,则在这种情况下,绑定器将为您自动生成一个静态应用程序 ID。这在开发场景中很方便,因为它避免了显式提供应用程序 ID 的需要。以这种方式生成的应用程序 ID 在应用程序重启后将保持静态。在函数模型的情况下,生成的应用程序 ID 将是函数 bean 名称后跟文字applicationID
,例如,如果process
是函数 bean 名称,则为process-applicationID
。在StreamListener
的情况下,生成的应用程序 ID 将使用包含类名称后跟方法名称后跟文字applicationId
,而不是使用函数 bean 名称。
设置应用程序 ID 的总结
-
默认情况下,绑定器将为每个函数或
StreamListener
方法自动生成应用程序 ID。 -
如果您只有一个处理器,则可以使用
spring.kafka.streams.applicationId
、spring.application.name
或spring.cloud.stream.kafka.streams.binder.applicationId
。 -
如果您有多个处理器,则可以使用属性
spring.cloud.stream.kafka.streams.binder.functions.<function-name>.applicationId
为每个函数设置应用程序 ID。在StreamListener
的情况下,这可以通过使用spring.cloud.stream.kafka.streams.bindings.input.applicationId
来完成,假设输入绑定名称为input
。
2.4.3. 使用函数式风格覆盖绑定器生成的默认绑定名称
默认情况下,绑定器在使用函数式风格时使用上面讨论的策略生成绑定名称,即<function-bean-name>-<in>|<out>-[0..n],例如 process-in-0、process-out-0 等。如果您想覆盖这些绑定名称,可以通过指定以下属性来实现。
spring.cloud.stream.function.bindings.<default binding name>
。默认绑定名称是绑定器生成的原始绑定名称。
例如,假设您有此函数。
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}
绑定器将生成名称为process-in-0
、process-in-1
和process-out-0
的绑定。现在,如果您想将它们完全更改为其他内容,也许是更具领域意义的绑定名称,则可以按如下方式操作。
spring.cloud.stream.function.bindings.process-in-0=users
spring.cloud.stream.function.bindings.process-in-0=regions
和
spring.cloud.stream.function.bindings.process-out-0=clicks
之后,您必须在这些新的绑定名称上设置所有绑定级别属性。
请记住,使用上面描述的函数式编程模型,在大多数情况下遵循默认绑定名称是有意义的。您可能仍然想要执行此覆盖的唯一原因是,当您有大量配置属性并且想要将绑定映射到更具领域友好性的内容时。
2.4.4. 设置 bootstrap 服务器配置
运行 Kafka Streams 应用程序时,必须提供 Kafka 代理服务器信息。如果您不提供此信息,则绑定器期望您在默认的localhost:9092
上运行代理。如果不是这种情况,则需要覆盖它。有几种方法可以做到这一点。
-
使用引导属性 -
spring.kafka.bootstrapServers
-
绑定器级别属性 -
spring.cloud.stream.kafka.streams.binder.brokers
对于绑定器级别属性,无论您是否使用通过常规 Kafka 绑定器提供的代理属性 - spring.cloud.stream.kafka.binder.brokers
,都没有关系。Kafka Streams 绑定器将首先检查是否设置了 Kafka Streams 绑定器特定的代理属性(spring.cloud.stream.kafka.streams.binder.brokers
),如果未找到,则查找spring.cloud.stream.kafka.binder.brokers
。
2.5. 记录序列化和反序列化
Kafka Streams 绑定器允许您通过两种方式序列化和反序列化记录。一种是 Kafka 提供的原生序列化和反序列化功能,另一种是 Spring Cloud Stream 框架的消息转换功能。让我们看看一些细节。
2.5.1. 入站反序列化
键始终使用原生 Serdes 反序列化。
对于值,默认情况下,入站的反序列化由 Kafka 本地执行。请注意,这是与 Kafka Streams 绑定器先前版本默认行为的一个重大变化,在先前版本中,反序列化是由框架完成的。
Kafka Streams 绑定器将尝试通过查看java.util.function.Function|Consumer
或StreamListener
的类型签名来推断匹配的Serde
类型。以下是它匹配 Serdes 的顺序。
-
如果应用程序提供类型为
Serde
的 bean,并且如果返回类型使用传入键或值类型的实际类型进行参数化,则它将使用该Serde
进行入站反序列化。例如,如果您在应用程序中具有以下内容,则绑定器检测到KStream
的传入值类型与在Serde
bean 上进行参数化的类型匹配。它将用于入站反序列化。
@Bean
public Serde<Foo() customSerde{
...
}
@Bean
public Function<KStream<String, Foo>, KStream<String, Foo>> process() {
}
-
接下来,它查看类型并查看它们是否是 Kafka Streams 公开的类型之一。如果是,则使用它们。以下是绑定器将尝试从 Kafka Streams 中匹配的 Serde 类型。
Integer, Long, Short, Double, Float, byte[], UUID and String.
-
如果 Kafka Streams 提供的任何 Serdes 都不匹配类型,则它将使用 Spring Kafka 提供的 JsonSerde。在这种情况下,绑定器假设类型是 JSON 友好的。如果您有多个值对象作为输入,这很有用,因为绑定器将在内部将它们推断为正确的 Java 类型。但在回退到
JsonSerde
之前,绑定器会检查 Kafka Streams 配置中设置的默认Serde
,以查看它是否是一个可以与传入 KStream 的类型匹配的Serde
。
如果上述策略均无效,则应用程序必须通过配置提供Serde
。这可以通过两种方式配置 - 绑定或默认。
首先,绑定器将查看是否在绑定级别提供了Serde
。例如,如果您有以下处理器,
@Bean
public BiFunction<KStream<CustomKey, AvroIn1>, KTable<CustomKey, AvroIn2>, KStream<CustomKey, AvroOutput>> process() {...}
然后,您可以使用以下方法提供绑定级别的Serde
。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
如果您为每个输入绑定按上述方式提供Serde ,那么它将具有更高的优先级,并且绑定器将避免任何Serde 推断。 |
如果希望对入站反序列化使用默认的键/值 Serdes,可以在绑定器级别执行此操作。
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde
如果您不希望使用 Kafka 提供的原生解码,则可以依靠 Spring Cloud Stream 提供的消息转换功能。由于原生解码是默认设置,因此为了让 Spring Cloud Stream 反序列化入站值对象,您需要显式禁用原生解码。
例如,如果您具有与上述相同的 BiFunction 处理器,则spring.cloud.stream.bindings.process-in-0.consumer.nativeDecoding: false
您需要为所有输入分别禁用原生解码。否则,对于您未禁用的那些输入,仍将应用原生解码。
默认情况下,Spring Cloud Stream 将使用application/json
作为内容类型并使用相应的 JSON 消息转换器。您可以使用以下属性和相应的MessageConverter
bean 使用自定义消息转换器。
spring.cloud.stream.bindings.process-in-0.contentType
2.5.2. 出站序列化
出站序列化与上述入站反序列化的规则基本相同。与入站反序列化一样,Spring Cloud Stream 的先前版本的一个主要变化是,出站的序列化由 Kafka 本地处理。在绑定器的 3.0 版本之前,这是由框架本身完成的。
出站上的键始终由 Kafka 使用绑定器推断出的匹配Serde
进行序列化。如果它无法推断键的类型,则需要使用配置进行指定。
值 Serdes 使用与入站反序列化相同的规则推断。首先,它匹配以查看出站类型是否来自应用程序中提供的 bean。如果没有,则检查它是否与 Kafka 公开的Serde
匹配,例如 - Integer
、Long
、Short
、Double
、Float
、byte[]
、UUID
和String
。如果这不起作用,则它将回退到 Spring Kafka 项目提供的JsonSerde
,但首先查看默认Serde
配置以查看是否存在匹配项。请记住,所有这些都对应用程序透明地进行。如果这些都不起作用,则用户必须通过配置提供要使用的Serde
。
假设您正在使用与上述相同的BiFunction
处理器。然后,您可以按如下方式配置出站键/值 Serdes。
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
如果 Serde 推断失败,并且未提供绑定级别的 Serdes,则绑定器将回退到JsonSerde
,但会查找默认 Serdes 以进行匹配。
默认 serdes 的配置方式与上述反序列化中描述的相同。
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde
如果您的应用程序使用分支功能并具有多个输出绑定,则必须为每个绑定配置这些绑定。再次,如果绑定器能够推断Serde
类型,则无需执行此配置。
如果您不希望使用 Kafka 提供的原生编码,但希望使用框架提供的消息转换,则需要显式禁用原生编码,因为原生编码是默认设置。例如,如果您具有与上述相同的 BiFunction 处理器,则spring.cloud.stream.bindings.process-out-0.producer.nativeEncoding: false
在分支的情况下,您需要为所有输出分别禁用原生编码。否则,对于您未禁用的那些输出,仍将应用原生编码。
当 Spring Cloud Stream 完成转换时,默认情况下,它将使用application/json
作为内容类型并使用相应的 JSON 消息转换器。您可以使用以下属性和相应的MessageConverter
bean 使用自定义消息转换器。
spring.cloud.stream.bindings.process-out-0.contentType
禁用原生编码/解码后,绑定器将不会像原生 Serdes 那样进行任何推断。应用程序需要显式提供所有配置选项。因此,通常建议使用反序列化的默认选项,并在编写 Spring Cloud Stream Kafka Streams 应用程序时坚持使用 Kafka Streams 提供的原生反序列化/序列化。您必须使用框架提供的消息转换功能的一种情况是,上游生产者使用特定的序列化策略。在这种情况下,您希望使用匹配的反序列化策略,因为原生机制可能会失败。当依赖默认Serde
机制时,应用程序必须确保绑定器能够正确地将入站和出站与正确的Serde
映射,否则可能会失败。
值得一提的是,上面概述的数据反序列化/序列化方法仅适用于处理器的边缘,即入站和出站。您的业务逻辑可能仍然需要调用 Kafka Streams API,这些 API 明确需要Serde
对象。这些仍然是应用程序的责任,必须由开发人员相应地处理。
2.6. 错误处理
Apache Kafka Streams 提供了本地处理反序列化错误异常的功能。有关此支持的详细信息,请参阅此处。开箱即用,Apache Kafka Streams 提供两种反序列化异常处理程序 - LogAndContinueExceptionHandler
和LogAndFailExceptionHandler
。顾名思义,前者将记录错误并继续处理下一条记录,后者将记录错误并失败。LogAndFailExceptionHandler
是默认的反序列化异常处理程序。
2.6.1. 处理绑定器中的反序列化异常
Kafka Streams 绑定器允许使用以下属性指定上述反序列化异常处理程序。
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndContinue
或
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndFail
除了上述两个反序列化异常处理程序之外,绑定器还提供第三个处理程序,用于将错误记录(毒丸)发送到 DLQ(死信队列)主题。以下是如何启用此 DLQ 异常处理程序。
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: sendToDlq
设置上述属性后,反序列化错误中的所有记录都会自动发送到 DLQ 主题。
您可以设置发布 DLQ 消息的主题名称,如下所示。
您可以提供DlqDestinationResolver
的实现,它是一个函数式接口。DlqDestinationResolver
将ConsumerRecord
和异常作为输入,然后允许指定主题名称作为输出。通过访问 Kafka ConsumerRecord
,可以在BiFunction
的实现中内省标头记录。
以下是如何提供DlqDestinationResolver
的实现示例。
@Bean
public DlqDestinationResolver dlqDestinationResolver() {
return (rec, ex) -> {
if (rec.topic().equals("word1")) {
return "topic1-dlq";
}
else {
return "topic2-dlq";
}
};
}
在为DlqDestinationResolver
提供实现时,需要牢记的一件重要事情是,绑定程序中的供应程序不会自动为应用程序创建主题。这是因为绑定程序无法推断实现可能发送到的所有 DLQ 主题的名称。因此,如果使用此策略提供 DLQ 名称,则应用程序负责确保事先创建这些主题。
如果应用程序中存在DlqDestinationResolver
作为 bean,则它将具有更高的优先级。如果您不想遵循此方法,而是希望使用配置提供静态 DLQ 名称,则可以设置以下属性。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.dlqName: custom-dlq (Change the binding name accordingly)
如果设置了此属性,则错误记录将发送到主题custom-dlq
。如果应用程序未使用上述任何策略,则它将创建一个名为error.<input-topic-name>.<application-id>
的 DLQ 主题。例如,如果绑定的目标主题是inputTopic
,应用程序 ID 是process-applicationId
,则默认 DLQ 主题是error.inputTopic.process-applicationId
。如果您的目的是启用 DLQ,则始终建议为每个输入绑定显式创建 DLQ 主题。
2.6.2. 每个输入消费者绑定的 DLQ
属性spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler
适用于整个应用程序。这意味着,如果同一应用程序中有多个函数或StreamListener
方法,则此属性将应用于所有这些方法。但是,如果您在一个处理器中有多个处理器或多个输入绑定,则可以使用绑定器为每个输入使用者绑定提供的更细粒度的 DLQ 控制。
如果您有以下处理器,
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}
并且您只想在第一个输入绑定上启用 DLQ,并在第二个绑定上启用 logAndSkip,则可以在使用者上执行以下操作。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.deserializationExceptionHandler: sendToDlq
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.deserializationExceptionHandler: logAndSkip
以这种方式设置反序列化异常处理程序的优先级高于在绑定器级别设置。
2.6.3. DLQ 分区
默认情况下,记录使用与原始记录相同的分区发布到死信主题。这意味着死信主题必须至少具有与原始记录一样多的分区。
要更改此行为,请将DlqPartitionFunction
实现作为@Bean
添加到应用程序上下文。只能存在一个这样的 bean。该函数将提供使用者组(在大多数情况下与应用程序 ID 相同)、失败的ConsumerRecord
和异常。例如,如果您始终希望路由到分区 0,则可以使用
@Bean
public DlqPartitionFunction partitionFunction() {
return (group, record, ex) -> 0;
}
如果将消费者绑定的dlqPartitions 属性设置为 1(并且绑定程序的minPartitionCount 等于1 ),则无需提供DlqPartitionFunction ;框架将始终使用分区 0。如果将消费者绑定的dlqPartitions 属性设置为大于1 的值(或绑定程序的minPartitionCount 大于1 ),则**必须**提供DlqPartitionFunction bean,即使分区数与原始主题相同。 |
在 Kafka Streams 绑定器中使用异常处理功能时,需要注意以下几点。
-
属性
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler
适用于整个应用程序。这意味着,如果同一应用程序中有多个函数或StreamListener
方法,则此属性将应用于所有这些方法。 -
反序列化的异常处理与原生反序列化和框架提供的消息转换一致。
2.6.4. 处理绑定器中的生产异常
与上述反序列化异常处理程序的支持不同,绑定器没有为处理生产异常提供此类的一流机制。但是,您仍然可以使用StreamsBuilderFactoryBean
自定义程序配置生产异常处理程序,您可以在下面后续部分中找到有关此内容的更多详细信息。
2.7. 重试关键业务逻辑
在某些情况下,您可能希望重试对应用程序至关重要的业务逻辑的部分内容。可能存在对关系数据库的外部调用或从 Kafka Streams 处理器调用 REST 端点。这些调用可能因各种原因而失败,例如网络问题或远程服务不可用。更常见的是,如果您可以再次尝试这些操作,这些故障可能会自行解决。默认情况下,Kafka Streams 绑定器为所有输入绑定创建RetryTemplate
bean。
如果函数具有以下签名,
@Bean
public java.util.function.Consumer<KStream<Object, String>> process()
并且使用默认绑定名称,RetryTemplate
将注册为 process-in-0-RetryTemplate
。这遵循绑定名称 (process-in-0
) 后跟字面量 -RetryTemplate
的约定。在多个输入绑定的情况下,每个绑定都会有一个单独的 RetryTemplate
bean 可用。如果应用程序中存在自定义 RetryTemplate
bean,并通过 spring.cloud.stream.bindings.<binding-name>.consumer.retryTemplateName
提供,则该 bean 优先于任何输入绑定级别的重试模板配置属性。
一旦将绑定中的 RetryTemplate
注入到应用程序中,就可以使用它来重试应用程序的任何关键部分。以下是一个示例
@Bean
public java.util.function.Consumer<KStream<Object, String>> process(@Lazy @Qualifier("process-in-0-RetryTemplate") RetryTemplate retryTemplate) {
return input -> input
.process(() -> new Processor<Object, String>() {
@Override
public void init(ProcessorContext processorContext) {
}
@Override
public void process(Object o, String s) {
retryTemplate.execute(context -> {
//Critical business logic goes here.
});
}
@Override
public void close() {
}
});
}
或者您可以使用如下所示的自定义 RetryTemplate
。
@EnableAutoConfiguration
public static class CustomRetryTemplateApp {
@Bean
@StreamRetryTemplate
RetryTemplate fooRetryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
RetryPolicy retryPolicy = new SimpleRetryPolicy(4);
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(1);
retryTemplate.setBackOffPolicy(backOffPolicy);
retryTemplate.setRetryPolicy(retryPolicy);
return retryTemplate;
}
@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
return input -> input
.process(() -> new Processor<Object, String>() {
@Override
public void init(ProcessorContext processorContext) {
}
@Override
public void process(Object o, String s) {
fooRetryTemplate().execute(context -> {
//Critical business logic goes here.
});
}
@Override
public void close() {
}
});
}
}
请注意,当重试次数耗尽时,默认情况下,将抛出最后一个异常,导致处理器终止。如果您希望处理异常并继续处理,则可以在 execute
方法中添加一个 RecoveryCallback:以下是一个示例。
retryTemplate.execute(context -> {
//Critical business logic goes here.
}, context -> {
//Recovery logic goes here.
return null;
));
有关 RetryTemplate、重试策略、回退策略等的更多信息,请参阅 Spring Retry 项目。
2.8. 状态存储
当使用高级 DSL 并进行触发状态存储的适当调用时,Kafka Streams 会自动创建状态存储。
如果要将传入的 KTable
绑定实现为命名状态存储,则可以使用以下策略。
假设您有以下函数。
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}
然后通过设置以下属性,传入的 KTable
数据将实现到命名状态存储中。
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.materializedAs: incoming-store
您可以在应用程序中将自定义状态存储定义为 bean,绑定器会检测到这些 bean 并将其添加到 Kafka Streams 构建器中。特别是在使用处理器 API 时,您需要手动注册状态存储。为此,您可以在应用程序中创建 StateStore 作为 bean。以下是如何定义此类 bean 的示例。
@Bean
public StoreBuilder myStore() {
return Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("my-store"), Serdes.Long(),
Serdes.Long());
}
@Bean
public StoreBuilder otherStore() {
return Stores.windowStoreBuilder(
Stores.persistentWindowStore("other-store",
1L, 3, 3L, false), Serdes.Long(),
Serdes.Long());
}
然后,应用程序可以直接访问这些状态存储。
在引导过程中,绑定器将处理上述 bean 并将其传递给 Streams 构建器对象。
访问状态存储
Processor<Object, Product>() {
WindowStore<Object, String> state;
@Override
public void init(ProcessorContext processorContext) {
state = (WindowStore)processorContext.getStateStore("mystate");
}
...
}
在注册全局状态存储时,此方法无效。要注册全局状态存储,请参阅下面关于自定义 StreamsBuilderFactoryBean
的部分。
2.9. 交互式查询
Kafka Streams 绑定器 API 公开了一个名为 InteractiveQueryService
的类,用于交互式查询状态存储。您可以在应用程序中将其作为 Spring bean 访问。从应用程序访问此 bean 的一种简单方法是 自动装配
bean。
@Autowired
private InteractiveQueryService interactiveQueryService;
获得对该 bean 的访问权限后,就可以查询您感兴趣的特定状态存储。请参见下文。
ReadOnlyKeyValueStore<Object, Object> keyValueStore =
interactiveQueryService.getQueryableStoreType("my-store", QueryableStoreTypes.keyValueStore());
在启动期间,检索存储的上述方法调用可能会失败。例如,它可能仍在初始化状态存储的中间。在这种情况下,重试此操作将很有用。Kafka Streams 绑定器提供了一个简单的重试机制来适应这种情况。
以下是您可以用来控制此重试的两个属性。
-
spring.cloud.stream.kafka.streams.binder.stateStoreRetry.maxAttempts - 默认值为
1
。 -
spring.cloud.stream.kafka.streams.binder.stateStoreRetry.backOffInterval - 默认值为
1000
毫秒。
如果正在运行多个 Kafka Streams 应用程序实例,则在交互式查询它们之前,需要确定哪个应用程序实例托管了您正在查询的特定键。InteractiveQueryService
API 提供了用于识别主机信息的方法。
要使此功能正常工作,必须按如下所示配置属性 application.server
spring.cloud.stream.kafka.streams.binder.configuration.application.server: <server>:<port>
以下是一些代码片段
org.apache.kafka.streams.state.HostInfo hostInfo = interactiveQueryService.getHostInfo("store-name",
key, keySerializer);
if (interactiveQueryService.getCurrentHostInfo().equals(hostInfo)) {
//query from the store that is locally available
}
else {
//query from the remote host
}
2.9.1. 通过 InteractiveQueryService 可用的其他 API 方法
使用以下 API 方法检索与给定存储和键的组合关联的 KeyQueryMetadata
对象。
public <K> KeyQueryMetadata getKeyQueryMetadata(String store, K key, Serializer<K> serializer)
使用以下 API 方法检索与给定存储和键的组合关联的 KakfaStreams
对象。
public <K> KafkaStreams getKafkaStreams(String store, K key, Serializer<K> serializer)
2.10. 健康指标
健康指标需要依赖项 spring-boot-starter-actuator
。对于 Maven,请使用
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
Spring Cloud Stream Kafka Streams 绑定器提供了一个健康指标来检查底层流线程的状态。Spring Cloud Stream 定义了一个属性 management.health.binders.enabled
来启用健康指标。请参阅 Spring Cloud Stream 文档。
健康指标为每个流线程的元数据提供以下详细信息
-
线程名称
-
线程状态:
CREATED
、RUNNING
、PARTITIONS_REVOKED
、PARTITIONS_ASSIGNED
、PENDING_SHUTDOWN
或DEAD
-
活动任务:任务 ID 和分区
-
备用任务:任务 ID 和分区
默认情况下,仅显示全局状态 (UP
或 DOWN
)。要显示详细信息,必须将属性 management.endpoint.health.show-details
设置为 ALWAYS
或 WHEN_AUTHORIZED
。有关健康信息的更多详细信息,请参阅 Spring Boot Actuator 文档。
如果所有已注册的 Kafka 线程都处于 RUNNING 状态,则健康指标的状态为 UP 。 |
由于 Kafka Streams 绑定器中有三个单独的绑定器 (KStream
、KTable
和 GlobalKTable
),因此所有绑定器都将报告健康状态。启用 show-details
时,报告的一些信息可能是冗余的。
当同一应用程序中存在多个 Kafka Streams 处理器时,将为所有处理器报告健康检查,并按 Kafka Streams 的应用程序 ID 进行分类。
2.11. 访问 Kafka Streams 指标
Spring Cloud Stream Kafka Streams 绑定器提供 Kafka Streams 指标,这些指标可以通过 Micrometer MeterRegistry
导出。
对于 Spring Boot 2.2.x 版本,指标支持通过绑定器提供的自定义 Micrometer 指标实现来提供。对于 Spring Boot 2.3.x 版本,Kafka Streams 指标支持通过 Micrometer 本地提供。
通过 Boot 执行器端点访问指标时,请确保将 metrics
添加到属性 management.endpoints.web.exposure.include
中。然后,您可以访问 /acutator/metrics
以获取所有可用指标的列表,然后可以通过相同的 URI (/actuator/metrics/<metric-name>
) 单独访问这些指标。
2.12. 混合使用高级 DSL 和低级处理器 API
Kafka Streams 提供了两种 API 变体。它具有更高级别的 DSL 类似 API,您可以在其中链接各种操作,这些操作可能对许多函数式程序员来说很熟悉。Kafka Streams 还允许访问低级处理器 API。处理器 API 虽然功能非常强大,并且能够在更低的级别控制事物,但本质上是命令式的。Kafka Streams 绑定器 for Spring Cloud Stream 允许您使用高级 DSL 或混合使用 DSL 和处理器 API。混合使用这两种变体为您提供了许多选项来控制应用程序中的各种用例。应用程序可以使用 transform
或 process
方法 API 调用来访问处理器 API。
以下是如何在使用 process
API 的 Spring Cloud Stream 应用程序中组合 DSL 和处理器 API 的示例。
@Bean
public Consumer<KStream<Object, String>> process() {
return input ->
input.process(() -> new Processor<Object, String>() {
@Override
@SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public void process(Object key, String value) {
//business logic
}
@Override
public void close() {
});
}
以下是如何使用 transform
API 的示例。
@Bean
public Consumer<KStream<Object, String>> process() {
return (input, a) ->
input.transform(() -> new Transformer<Object, String, KeyValue<Object, String>>() {
@Override
public void init(ProcessorContext context) {
}
@Override
public void close() {
}
@Override
public KeyValue<Object, String> transform(Object key, String value) {
// business logic - return transformed KStream;
}
});
}
process
API 方法调用是终止操作,而 transform
API 是非终止操作,它为您提供了一个可能已转换的 KStream
,您可以使用它继续使用 DSL 或处理器 API 进行进一步处理。
2.13. 出站的分区支持
Kafka Streams 处理器通常将处理后的输出发送到出站 Kafka 主题。如果出站主题已分区,并且处理器需要将传出数据发送到特定分区,则应用程序需要提供类型为 StreamPartitioner
的 bean。有关更多详细信息,请参阅 StreamPartitioner。让我们看一些示例。
这是我们之前多次看到的同一个处理器,
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> process() {
...
}
这是输出绑定目标
spring.cloud.stream.bindings.process-out-0.destination: outputTopic
如果主题 outputTopic
有 4 个分区,如果您不提供分区策略,Kafka Streams 将使用默认分区策略,这可能不是您想要的输出,具体取决于特定用例。假设您希望将任何与 spring
匹配的键发送到分区 0、cloud
发送到分区 1、stream
发送到分区 2,以及其他所有内容发送到分区 3。您需要在应用程序中执行以下操作。
@Bean
public StreamPartitioner<String, WordCount> streamPartitioner() {
return (t, k, v, n) -> {
if (k.equals("spring")) {
return 0;
}
else if (k.equals("cloud")) {
return 1;
}
else if (k.equals("stream")) {
return 2;
}
else {
return 3;
}
};
}
这是一个基本的实现,但是,您可以访问记录的键/值、主题名称和分区总数。因此,如果需要,您可以实现复杂的分区策略。
您还需要将此 bean 名称与应用程序配置一起提供。
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.streamPartitionerBeanName: streamPartitioner
应用程序中的每个输出主题都需要像这样单独配置。
2.14. StreamsBuilderFactoryBean 自定义器
通常需要自定义创建 KafkaStreams
对象的 StreamsBuilderFactoryBean
。基于 Spring Kafka 提供的底层支持,绑定器允许您自定义 StreamsBuilderFactoryBean
。您可以使用 StreamsBuilderFactoryBeanCustomizer
自定义 StreamsBuilderFactoryBean
本身。然后,一旦通过此自定义程序获得对 StreamsBuilderFactoryBean
的访问权限,就可以使用 KafkaStreamsCustomzier
自定义相应的 KafkaStreams
。这两个自定义程序都是 Spring for Apache Kafka 项目的一部分。
以下是如何使用 StreamsBuilderFactoryBeanCustomizer
的示例。
@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
return sfb -> sfb.setStateListener((newState, oldState) -> {
//Do some action here!
});
}
以上显示的是您可以执行的自定义 StreamsBuilderFactoryBean
操作的示例。您基本上可以调用 StreamsBuilderFactoryBean
中的任何可用变异操作来自定义它。此自定义程序将在工厂 bean 启动之前由绑定器调用。
获得对 StreamsBuilderFactoryBean
的访问权限后,还可以自定义底层的 KafkaStreams
对象。以下是如何执行此操作的蓝图。
@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
return factoryBean -> {
factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
@Override
public void customize(KafkaStreams kafkaStreams) {
kafkaStreams.setUncaughtExceptionHandler((t, e) -> {
});
}
});
};
}
KafkaStreamsCustomizer
将由 StreamsBuilderFactoryBeabn
在底层的 KafkaStreams
启动之前调用。
整个应用程序中只能有一个 StreamsBuilderFactoryBeanCustomizer
。那么,如何考虑多个 Kafka Streams 处理器,因为它们每个都由单独的 StreamsBuilderFactoryBean
对象支持?在这种情况下,如果这些处理器的自定义需要不同,则应用程序需要根据应用程序 ID 应用一些过滤器。
例如,
@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
return factoryBean -> {
if (factoryBean.getStreamsConfiguration().getProperty(StreamsConfig.APPLICATION_ID_CONFIG)
.equals("processor1-application-id")) {
factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
@Override
public void customize(KafkaStreams kafkaStreams) {
kafkaStreams.setUncaughtExceptionHandler((t, e) -> {
});
}
});
}
};
2.14.1. 使用自定义器注册全局状态存储
如上所述,绑定器没有提供将全局状态存储作为功能注册的第一类方法。为此,您需要使用自定义程序。以下是如何执行此操作。
@Bean
public StreamsBuilderFactoryBeanCustomizer customizer() {
return fb -> {
try {
final StreamsBuilder streamsBuilder = fb.getObject();
streamsBuilder.addGlobalStore(...);
}
catch (Exception e) {
}
};
}
同样,如果您有多个处理器,则需要通过使用上面概述的应用程序 ID 过滤掉其他 StreamsBuilderFactoryBean
对象,将全局状态存储附加到正确的 StreamsBuilder
。
2.14.2. 使用自定义器注册生产异常处理程序
在错误处理部分,我们指出绑定器没有提供处理生产异常的第一类方法。虽然情况确实如此,但您仍然可以使用 StreamsBuilderFacotryBean
自定义程序来注册生产异常处理程序。请参见下文。
@Bean
public StreamsBuilderFactoryBeanCustomizer customizer() {
return fb -> {
fb.getStreamsConfiguration().put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
CustomProductionExceptionHandler.class);
};
}
再次,如果您有多个处理器,您可能希望针对正确的 StreamsBuilderFactoryBean
适当地设置它。您也可以使用配置属性添加此类生产异常处理程序(有关详细信息,请参阅下文),但这是一种选择,如果您选择使用编程方法。
2.15. 时间戳提取器
Kafka Streams 允许您根据各种时间戳概念控制消费者记录的处理。默认情况下,Kafka Streams 提取嵌入在消费者记录中的时间戳元数据。您可以通过为每个输入绑定提供不同的 TimestampExtractor
实现来更改此默认行为。以下是如何执行此操作的一些详细信息。
@Bean
public Function<KStream<Long, Order>,
Function<KTable<Long, Customer>,
Function<GlobalKTable<Long, Product>, KStream<Long, Order>>>> process() {
return orderStream ->
customers ->
products -> orderStream;
}
@Bean
public TimestampExtractor timestampExtractor() {
return new WallclockTimestampExtractor();
}
然后,您为每个消费者绑定设置上述 TimestampExtractor
bean 名称。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.timestampExtractorBeanName=timestampExtractor
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.timestampExtractorBeanName=timestampExtractor
spring.cloud.stream.kafka.streams.bindings.process-in-2.consumer.timestampExtractorBeanName=timestampExtractor"
如果您跳过输入消费者绑定以设置自定义时间戳提取器,则该消费者将使用默认设置。
2.16. 基于 Kafka Streams 的绑定器和常规 Kafka 绑定器的多绑定器
您可以构建一个应用程序,其中包含基于常规 Kafka 绑定器的函数/消费者/生产者和基于 Kafka Streams 的处理器。但是,您不能在一个函数或消费者中混合使用这两种组件。
以下是一个示例,其中包含在同一个应用程序中使用基于绑定器的组件。
@Bean
public Function<String, String> process() {
return s -> s;
}
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> kstreamProcess() {
return input -> input;
}
这是配置中的相关部分。
spring.cloud.stream.function.definition=process;kstreamProcess
spring.cloud.stream.bindings.process-in-0.destination=foo
spring.cloud.stream.bindings.process-out-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-in-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-out-0.destination=foobar
如果您有与上述相同的应用程序,但它处理两个不同的 Kafka 集群,情况就会变得稍微复杂一些,例如,常规的 process
同时作用于 Kafka 集群 1 和集群 2(从集群 1 接收数据并发送到集群 2),而 Kafka Streams 处理器作用于 Kafka 集群 2。然后,您必须使用 Spring Cloud Stream 提供的 多绑定器 功能。
以下是在这种情况下配置可能发生的变化。
# multi binder configuration
spring.cloud.stream.binders.kafka1.type: kafka
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-1} #Replace kafkaCluster-1 with the approprate IP of the cluster
spring.cloud.stream.binders.kafka2.type: kafka
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2} #Replace kafkaCluster-2 with the approprate IP of the cluster
spring.cloud.stream.binders.kafka3.type: kstream
spring.cloud.stream.binders.kafka3.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2} #Replace kafkaCluster-2 with the approprate IP of the cluster
spring.cloud.stream.function.definition=process;kstreamProcess
# From cluster 1 to cluster 2 with regular process function
spring.cloud.stream.bindings.process-in-0.destination=foo
spring.cloud.stream.bindings.process-in-0.binder=kafka1 # source from cluster 1
spring.cloud.stream.bindings.process-out-0.destination=bar
spring.cloud.stream.bindings.process-out-0.binder=kafka2 # send to cluster 2
# Kafka Streams processor on cluster 2
spring.cloud.stream.bindings.kstreamProcess-in-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-in-0.binder=kafka3
spring.cloud.stream.bindings.kstreamProcess-out-0.destination=foobar
spring.cloud.stream.bindings.kstreamProcess-out-0.binder=kafka3
请注意上述配置。我们有两种绑定器,但总共有 3 个绑定器,第一个是基于集群 1 的常规 Kafka 绑定器 (kafka1
),然后是基于集群 2 的另一个 Kafka 绑定器 (kafka2
),最后是 kstream
绑定器 (kafka3
)。应用程序中的第一个处理器从 kafka1
接收数据并发布到 kafka2
,这两个绑定器都基于常规 Kafka 绑定器,但使用不同的集群。第二个处理器是 Kafka Streams 处理器,它从 kafka3
消费数据,kafka3
与 kafka2
使用相同的集群,但绑定器类型不同。
由于 Kafka Streams 绑定器系列中有三种不同的绑定器类型 - kstream
、ktable
和 globalktable
- 如果您的应用程序有多个基于这些绑定器中的任何一个的绑定,则需要显式提供绑定器类型。
例如,如果您有如下处理器:
@Bean
public Function<KStream<Long, Order>,
Function<KTable<Long, Customer>,
Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> enrichOrder() {
...
}
那么,在多绑定器场景中,必须将其配置如下。请注意,这仅在您拥有真正的多绑定器场景时才需要,在这种场景中,单个应用程序中有多个处理器处理多个集群。在这种情况下,需要使用绑定及其绑定器类型和集群来明确区分不同处理器的绑定器。
spring.cloud.stream.binders.kafka1.type: kstream
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.binders.kafka2.type: ktable
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.binders.kafka3.type: globalktable
spring.cloud.stream.binders.kafka3.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.bindings.enrichOrder-in-0.binder=kafka1 #kstream
spring.cloud.stream.bindings.enrichOrder-in-1.binder=kafka2 #ktablr
spring.cloud.stream.bindings.enrichOrder-in-2.binder=kafka3 #globalktable
spring.cloud.stream.bindings.enrichOrder-out-0.binder=kafka1 #kstream
# rest of the configuration is omitted.
2.17. 状态清理
默认情况下,当绑定停止时,会调用 Kafkastreams.cleanup()
方法。请参阅 Spring Kafka 文档。要修改此行为,只需向应用程序上下文添加单个 CleanupConfig
@Bean
(配置为在启动、停止或都不清理时清理);该 bean 将被检测到并连接到工厂 bean。
2.18. Kafka Streams 拓扑可视化
Kafka Streams 绑定器提供以下执行器端点,用于检索拓扑描述,您可以使用外部工具可视化该拓扑。
/actuator/kafkastreamstopology
/actuator/kafkastreamstopology/<application-id of the processor>
您需要包含来自 Spring Boot 的执行器和 Web 依赖项才能访问这些端点。此外,您还需要将 kafkastreamstopology
添加到 management.endpoints.web.exposure.include
属性中。默认情况下,kafkastreamstopology
端点处于禁用状态。
2.19. 配置选项
本节包含 Kafka Streams 绑定器使用的配置选项。
有关与绑定器相关的常见配置选项和属性,请参阅 核心文档。
2.19.1. Kafka Streams 绑定器属性
以下属性在绑定器级别可用,并且必须以 spring.cloud.stream.kafka.streams.binder.
为前缀。
- configuration
-
包含与 Apache Kafka Streams API 相关的属性的键/值对映射。此属性必须以
spring.cloud.stream.kafka.streams.binder.
为前缀。以下是一些使用此属性的示例。
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000
有关可能包含在流配置中的所有属性的更多信息,请参阅 Apache Kafka Streams 文档中的 StreamsConfig
JavaDocs。您可以通过此属性设置所有可从 StreamsConfig
设置的配置。使用此属性时,它适用于整个应用程序,因为这是一个绑定器级别的属性。如果应用程序中有多个处理器,则所有处理器都将获取这些属性。对于 application.id
等属性,这将成为问题,因此您必须仔细检查如何使用此绑定器级别的 configuration
属性映射来自 StreamsConfig
的属性。
- functions.<function-bean-name>.applicationId
-
仅适用于函数式处理器。这可用于为应用程序中的每个函数设置应用程序 ID。在多个函数的情况下,这是一种方便的方式来设置应用程序 ID。
- functions.<function-bean-name>.configuration
-
仅适用于函数式处理器。包含与 Apache Kafka Streams API 相关的属性的键/值对映射。这类似于上面描述的绑定器级别的
configuration
属性,但此级别的configuration
属性仅限于命名函数。当您有多个处理器并且想要根据特定函数限制对配置的访问时,您可能希望使用此方法。所有StreamsConfig
属性都可在此处使用。 - brokers
-
代理 URL
默认值:
localhost
- zkNodes
-
Zookeeper URL
默认值:
localhost
- deserializationExceptionHandler
-
反序列化错误处理程序类型。此处理程序应用于绑定器级别,因此应用于应用程序中的所有输入绑定。有一种方法可以在消费者绑定级别以更细粒度的方式控制它。可能的值包括 -
logAndContinue
、logAndFail
或sendToDlq
默认值:
logAndFail
- applicationId
-
一种方便的方法,可以在绑定器级别全局设置 Kafka Streams 应用程序的
application.id
。如果应用程序包含多个函数或StreamListener
方法,则应以不同的方式设置应用程序 ID。请参阅上面详细讨论设置应用程序 ID 的部分。默认值:应用程序将生成一个静态应用程序 ID。有关更多详细信息,请参阅应用程序 ID 部分。
- stateStoreRetry.maxAttempts
-
尝试连接到状态存储的最大次数。
默认值:1
- stateStoreRetry.backoffPeriod
-
重试时尝试连接到状态存储的回退时间段。
默认值:1000 毫秒
- consumerProperties
-
绑定器级别的任意消费者属性。
- producerProperties
-
绑定器级别的任意生产者属性。
2.19.2. Kafka Streams 生产者属性
以下属性仅适用于 Kafka Streams 生产者,并且必须以 spring.cloud.stream.kafka.streams.bindings.<binding name>.producer.
为前缀。为方便起见,如果有多个输出绑定并且它们都需要一个公共值,则可以通过使用前缀 spring.cloud.stream.kafka.streams.default.producer.
进行配置。
- keySerde
-
要使用的键序列化器/反序列化器
默认值:请参阅上面关于消息序列化/反序列化的讨论
- valueSerde
-
要使用的值序列化器/反序列化器
默认值:请参阅上面关于消息序列化/反序列化的讨论
- useNativeEncoding
-
启用/禁用原生编码的标志
默认值:
true
。
streamPartitionerBeanName:要在消费者处使用的自定义输出分区程序 bean 名称。应用程序可以提供自定义 StreamPartitioner
作为 Spring bean,并且可以将此 bean 的名称提供给生产者以代替默认分区程序。
+ 默认值:请参阅上面关于输出分区支持的讨论。
- producedAs
-
处理器要生产到的接收器组件的自定义名称。
默认值:
none
(由 Kafka Streams 生成)
2.19.3. Kafka Streams 消费者属性
以下属性适用于 Kafka Streams 消费者,并且必须以 spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer.
为前缀。为方便起见,如果有多个输入绑定并且它们都需要一个公共值,则可以通过使用前缀 spring.cloud.stream.kafka.streams.default.consumer.
进行配置。
- applicationId
-
为每个输入绑定设置
application.id
。这仅适用于基于StreamListener
的处理器,对于基于函数的处理器,请参阅上面概述的其他方法。默认值:请参阅上面。
- keySerde
-
要使用的键序列化器/反序列化器
默认值:请参阅上面关于消息序列化/反序列化的讨论
- valueSerde
-
要使用的值序列化器/反序列化器
默认值:请参阅上面关于消息序列化/反序列化的讨论
- materializedAs
-
使用传入的 KTable 类型时要物化的状态存储
默认值:
none
。 - useNativeDecoding
-
启用/禁用原生解码的标志
默认值:
true
。 - dlqName
-
DLQ 主题名称。
默认值:请参阅上面关于错误处理和 DLQ 的讨论。
- startOffset
-
如果没有已提交的偏移量可供消费,则要从中开始的偏移量。这主要用于消费者第一次从主题中消费数据时。Kafka Streams 使用
earliest
作为默认策略,绑定器使用相同的默认策略。可以使用此属性将其覆盖为latest
。默认值:
earliest
。
注意:在消费者上使用 resetOffsets
对 Kafka Streams 绑定器没有任何影响。与基于消息通道的绑定器不同,Kafka Streams 绑定器不会根据需要跳转到开头或结尾。
- deserializationExceptionHandler
-
反序列化错误处理程序类型。此处理程序应用于每个消费者绑定,而不是之前描述的绑定器级别属性。可能的值包括 -
logAndContinue
、logAndFail
或sendToDlq
默认值:
logAndFail
- timestampExtractorBeanName
-
要在消费者处使用的特定时间戳提取器 bean 名称。应用程序可以提供
TimestampExtractor
作为 Spring bean,并且可以将此 bean 的名称提供给消费者以代替默认提取器。默认值:请参阅上面关于时间戳提取器的讨论。
- eventTypes
-
此绑定的支持事件类型的逗号分隔列表。
默认值:
none
- eventTypeHeaderKey
-
通过此绑定传入的每个记录上的事件类型标头键。
默认值:
event_type
- consumedAs
-
处理器要从中消费数据的源组件的自定义名称。
默认值:
none
(由 Kafka Streams 生成)
2.19.4. 并发性的特别说明
在 Kafka Streams 中,您可以使用 num.stream.threads
属性控制处理器可以创建的线程数。您可以使用上面在绑定器、函数、生产者或消费者级别描述的各种 configuration
选项来执行此操作。您还可以为此目的使用核心 Spring Cloud Stream 提供的 concurrency
属性。使用此属性时,您需要在消费者上使用它。当您在函数或 StreamListener
中有多个输入绑定时,请在第一个输入绑定上设置此属性。例如,当设置 spring.cloud.stream.bindings.process-in-0.consumer.concurrency
时,绑定器会将其转换为 num.stream.threads
。如果您有多个处理器,并且一个处理器定义了绑定级别的并发性,而其他处理器没有,那么那些没有绑定级别的并发性的处理器将默认为通过 spring.cloud.stream.kafka.streams.binder.configuration.num.stream.threads
指定的绑定器范围属性。如果此绑定器配置不可用,则应用程序将使用 Kafka Streams 设置的默认值。
附录
附录 A:构建
A.1. 基本编译和测试
要构建源代码,您需要安装 JDK 1.7。
构建使用 Maven wrapper,因此您无需安装特定版本的 Maven。要启用测试,您应该在构建之前运行 Kafka 服务器 0.9 或更高版本。有关运行服务器的更多信息,请参见下文。
主要的构建命令是
$ ./mvnw clean install
您也可以添加 '-DskipTests',以避免运行测试。
您也可以自己安装 Maven(>=3.3.3)并在以下示例中使用 mvn 命令替换 ./mvnw 。如果您这样做,您可能还需要添加 -P spring ,如果您的本地 Maven 设置不包含 Spring 预发布构件的存储库声明。 |
请注意,您可能需要增加 Maven 可用的内存量,方法是设置一个 MAVEN_OPTS 环境变量,其值类似于 -Xmx512m -XX:MaxPermSize=128m 。我们尝试在 .mvn 配置中涵盖这一点,因此,如果您发现必须这样做才能使构建成功,请提交工单以将设置添加到源代码管理。 |
需要中间件的项目通常包含一个 docker-compose.yml
,因此请考虑使用 Docker Compose 在 Docker 容器中运行中间件服务器。
A.2. 文档
有一个“完整”配置文件将生成文档。
A.3. 使用代码
如果您没有 IDE 偏好,我们建议您在处理代码时使用 Spring Tools Suite 或 Eclipse。我们使用 m2eclipe Eclipse 插件来支持 Maven。其他 IDE 和工具也应该可以正常工作。
A.3.1. 使用 m2eclipse 导入到 eclipse 中
我们建议在使用 Eclipse 时使用 m2eclipe Eclipse 插件。如果您尚未安装 m2eclipse,则可以从“Eclipse 市场”获取。
不幸的是,m2e 尚未支持 Maven 3.3,因此,一旦将项目导入到 Eclipse 中,您还需要告诉 m2eclipse 使用项目的 .settings.xml
文件。如果不这样做,您可能会看到许多与项目中的 POM 相关的不同错误。打开您的 Eclipse 首选项,展开 Maven 首选项,然后选择用户设置。在“用户设置”字段中,单击“浏览”并导航到您导入的 Spring Cloud 项目,在该项目中选择 .settings.xml
文件。单击“应用”,然后单击“确定”以保存首选项更改。
或者,您可以将存储库设置从 .settings.xml 复制到您自己的 ~/.m2/settings.xml 中。 |
A.3.2. 在没有 m2eclipse 的情况下导入到 eclipse 中
如果您不想使用 m2eclipse,您可以使用以下命令生成 Eclipse 项目元数据
$ ./mvnw eclipse:eclipse
生成的 Eclipse 项目可以通过从“文件”菜单中选择“导入现有项目”来导入。
[[contributing] == 贡献
Spring Cloud 在非限制性 Apache 2.0 许可证下发布,并遵循非常标准的 Github 开发流程,使用 Github 跟踪器来处理问题并将拉取请求合并到 master 分支。如果您想贡献任何东西,即使是很小的东西,也请不要犹豫,但请遵循以下准则。
A.4. 签署贡献者许可协议
在我们接受非微不足道的补丁或拉取请求之前,我们将需要您签署 贡献者协议。签署贡献者协议不会授予任何人对主存储库的提交权限,但它确实意味着我们可以接受您的贡献,并且如果您这样做,您将获得作者署名。可能会要求活跃的贡献者加入核心团队,并赋予他们合并拉取请求的能力。
A.5. 代码约定和日常管理
这些都不是拉取请求所必需的,但它们都会有所帮助。它们也可以在原始拉取请求之后但在合并之前添加。
-
使用 Spring Framework 代码格式约定。如果您使用 Eclipse,则可以使用来自 Spring Cloud Build 项目的
eclipse-code-formatter.xml
文件导入格式化程序设置。如果使用 IntelliJ,则可以使用 Eclipse Code Formatter Plugin 导入相同的文件。 -
确保所有新的
.java
文件都具有简单的 Javadoc 类注释,至少包含一个@author
标记来识别您,最好至少包含一段关于该类的用途的文字。 -
将 ASF 许可证头注释添加到所有新的
.java
文件中(从项目中的现有文件中复制)。 -
将您自己添加为
@author
到您大幅修改(超过了 cosmetic 更改)的 .java 文件中。 -
添加一些 Javadoc,如果您更改了命名空间,则添加一些 XSD 文档元素。
-
一些单元测试也会有很大帮助——总得有人去做。
-
如果没有人使用您的分支,请将其重新定位到当前的 master 分支(或主项目中的其他目标分支)。
-
在编写提交消息时,请遵循 这些约定,如果您正在修复现有问题,请在提交消息的末尾添加
Fixes gh-XXXX
(其中 XXXX 是问题编号)。