技巧、窍门和配方
Kafka 简单 DLQ
问题陈述
作为开发者,我想编写一个从 Kafka 主题处理记录的消费者应用。但是,如果在处理过程中发生错误,我不希望应用完全停止。相反,我希望将出错的记录发送到 DLT(死信主题),然后继续处理新的记录。
解决方案
解决此问题的方法是使用 Spring Cloud Stream 中的 DLQ 功能。为了本次讨论的目的,我们假设以下是我们的处理器函数。
@Bean
public Consumer<byte[]> processData() {
return s -> {
throw new RuntimeException();
};
}
这是一个非常简单的函数,它对其处理的所有记录都抛出异常,但您可以采用此函数并将其扩展到任何其他类似情况。
为了将出错的记录发送到 DLT,我们需要提供以下配置。
spring.cloud.stream:
bindings:
processData-in-0:
group: my-group
destination: input-topic
kafka:
bindings:
processData-in-0:
consumer:
enableDlq: true
dlqName: input-topic-dlq
为了激活 DLQ,应用必须提供一个组名。匿名消费者无法使用 DLQ 功能。我们还需要通过将 Kafka 消费者绑定的 enableDLQ
属性设置为 true
来启用 DLQ。最后,我们可以选择通过在 Kafka 消费者绑定上提供 dlqName
来指定 DLT 名称,否则在此例中它将默认为 error.input-topic.my-group
。
请注意,在上面提供的示例消费者中,负载的类型是 byte[]
。默认情况下,Kafka Binder 中的 DLQ 生产者期望负载类型为 byte[]
。如果不是这种情况,那么我们需要提供适当的序列化器配置。例如,让我们将消费者函数重写如下:
@Bean
public Consumer<String> processData() {
return s -> {
throw new RuntimeException();
};
}
现在,我们需要告诉 Spring Cloud Stream,当写入 DLT 时,我们希望如何序列化数据。以下是针对此场景修改后的配置:
spring.cloud.stream:
bindings:
processData-in-0:
group: my-group
destination: input-topic
kafka:
bindings:
processData-in-0:
consumer:
enableDlq: true
dlqName: input-topic-dlq
dlqProducerProperties:
configuration:
value.serializer: org.apache.kafka.common.serialization.StringSerializer
带高级重试选项的 DLQ
解决方案
如果您遵循了上面的配方,那么当处理遇到错误时,您将获得内置于 Kafka Binder 中的默认重试选项。
默认情况下,Binder 会重试最多 3 次,初始延迟为一秒,每次回退的乘数为 2.0,最大延迟为 10 秒。您可以按如下方式更改所有这些配置:
spring.cloud.stream.bindings.processData-in-0.consumer.maxAttempts
spring.cloud.stream.bindings.processData-in-0.consumer.backOffInitialInterval
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMultiplier
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMaxInterval
如果您愿意,还可以通过提供一个布尔值映射来提供可重试异常的列表。例如:
spring.cloud.stream.bindings.processData-in-0.consumer.retryableExceptions.java.lang.IllegalStateException=true
spring.cloud.stream.bindings.processData-in-0.consumer.retryableExceptions.java.lang.IllegalArgumentException=false
默认情况下,任何未在上述映射中列出的异常都将重试。如果不需要这样,则可以通过提供以下配置来禁用它:
spring.cloud.stream.bindings.processData-in-0.consumer.defaultRetryable=false
您还可以提供自己的 RetryTemplate
,并将其标记为 @StreamRetryTemplate
,Binder 将会扫描并使用它。这对于您想要更复杂的重试策略和策略时很有用。
如果您有多个 @StreamRetryTemplate
bean,则可以使用以下属性指定您的绑定需要哪一个:
spring.cloud.stream.bindings.processData-in-0.consumer.retry-template-name=<your-retry-template-bean-name>
处理带有 DLQ 的反序列化错误
解决方案
当 Kafka 消费者抛出不可恢复的反序列化异常时,Spring Cloud Stream 提供的常规 DLQ 机制将无济于事。这是因为此异常甚至在消费者方法的 poll()
返回之前就发生了。Spring for Apache Kafka 项目提供了一些很好的方法来帮助 Binder 处理这种情况。让我们来探讨一下。
假设这是我们的函数:
@Bean
public Consumer<String> functionName() {
return s -> {
System.out.println(s);
};
}
这是一个采用 String
参数的简单函数。
我们希望绕过 Spring Cloud Stream 提供的消息转换器,转而使用原生的反序列化器。对于 String
类型,这样做意义不大,但对于 AVRO 等更复杂的类型,您必须依赖外部反序列化器,因此希望将转换委托给 Kafka。
现在当消费者接收到数据时,假设有一个导致反序列化错误的坏记录,例如有人传递了一个 Integer
而不是 String
。在这种情况下,如果您不在应用中做任何处理,异常将沿着调用链传播,最终导致您的应用退出。
为了处理这种情况,您可以添加一个 ListenerContainerCustomizer
的 @Bean
,它配置了一个 DefaultErrorHandler
。这个 DefaultErrorHandler
配置了一个 DeadLetterPublishingRecoverer
。我们还需要为消费者配置一个 ErrorHandlingDeserializer
。听起来很复杂,但实际上,在这种情况下,它归结为这 3 个 bean:
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(DefaultErrorHandler errorHandler) {
return (container, dest, group) -> {
container.setCommonErrorHandler(errorHandler);
};
}
@Bean
public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
return new DefaultErrorHandler(deadLetterPublishingRecoverer);
}
@Bean
public DeadLetterPublishingRecoverer publisher(KafkaOperations bytesTemplate) {
return new DeadLetterPublishingRecoverer(bytesTemplate);
}
让我们分析一下它们。第一个是 ListenerContainerCustomizer
bean,它接受一个 DefaultErrorHandler
。容器现在会使用特定的错误处理器进行定制。您可以在这里了解更多关于容器定制的信息。
第二个 bean 是 DefaultErrorHandler
,它配置为发布到 DLT
。有关 DefaultErrorHandler
的更多详细信息,请参阅这里。
第三个 bean 是 DeadLetterPublishingRecoverer
,它最终负责发送到 DLT
。默认情况下,DLT
主题的名称为 ORIGINAL_TOPIC_NAME.DLT。不过您可以更改它。有关更多详细信息,请参阅文档。
我们还需要通过应用配置来配置一个ErrorHandlingDeserializer。
ErrorHandlingDeserializer
将委托给实际的反序列化器。如果发生错误,它将记录的 key/value 设置为 null,并包含消息的原始字节。然后它会在 header 中设置异常,并将此记录传递给监听器,监听器随后会调用注册的错误处理器。
以下是所需的配置:
spring.cloud.stream:
function:
definition: functionName
bindings:
functionName-in-0:
group: group-name
destination: input-topic
consumer:
use-native-decoding: true
kafka:
bindings:
functionName-in-0:
consumer:
enableDlq: true
dlqName: dlq-topic
dlqProducerProperties:
configuration:
value.serializer: org.apache.kafka.common.serialization.StringSerializer
configuration:
value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
我们通过绑定上的 configuration
属性提供了 ErrorHandlingDeserializer
。我们还指出实际委托的反序列化器是 StringDeserializer
。
请记住,上面的任何 dlq 属性都与本配方中的讨论无关。它们纯粹是为了处理任何应用级别的错误。
Kafka Binder 中的基本 offset 管理
解决方案
我们鼓励您阅读关于此主题的文档部分,以获得全面的理解。
以下是其要点:
Kafka 默认支持两种类型的起始 offset - earliest
和 latest
。它们的语义从名称中就可以看出。
假设您是第一次运行消费者。如果在您的 Spring Cloud Stream 应用中缺少 group.id,那么它就会成为一个匿名消费者。无论何时您有一个匿名消费者,Spring Cloud Stream 应用默认都会从主题分区中可用的 latest
offset 开始。另一方面,如果您明确指定了 group.id,则 Spring Cloud Stream 应用默认会从主题分区中可用的 earliest
offset 开始。
在上述两种情况(具有显式组的消费者和匿名组的消费者)下,可以使用属性 spring.cloud.stream.kafka.bindings.<binding-name>.consumer.startOffset
将起始 offset 切换为 earliest
或 latest
。
现在,假设您之前已经运行过消费者,现在再次启动它。在这种情况下,上述情况中的起始 offset 语义不再适用,因为消费者会找到一个已经为消费者组提交的 offset(对于匿名消费者,尽管应用没有提供 group.id,但 Binder 会自动为您生成一个)。它只会从最后提交的 offset 开始。即使您提供了 startOffset
值,情况也是如此。
但是,您可以使用 resetOffsets
属性来覆盖消费者从上次提交的 offset 开始的默认行为。要做到这一点,将属性 spring.cloud.stream.kafka.bindings.<binding-name>.consumer.resetOffsets
设置为 true
(默认为 false
)。然后确保您提供了 startOffset
值(可以是 earliest
或 latest
)。当您这样做并启动消费者应用时,每次启动都会像第一次启动一样,忽略分区的所有已提交 offset。
在 Kafka 中定位到任意 offset
问题陈述
使用 Kafka Binder,我知道它可以将 offset 设置为 earliest
或 latest
,但我需要将 offset 定位到中间的某个任意 offset。Spring Cloud Stream Kafka Binder 有什么办法实现这一点吗?
解决方案
之前我们看到了 Kafka Binder 如何处理基本的 offset 管理。默认情况下,Binder 不允许您回退到任意 offset,至少通过我们在该配方中看到的机制不行。但是,Binder 提供了一些底层策略来实现此用例。让我们来探讨一下。
首先,当您想要重置到除 earliest
或 latest
之外的任意 offset 时,请确保将 resetOffsets
配置保留其默认值 false
。然后,您必须提供一个类型为 KafkaBindingRebalanceListener
的自定义 bean,它将被注入到所有消费者绑定中。这是一个接口,带有一些默认方法,但我们关注的是以下方法:
/**
* Invoked when partitions are initially assigned or after a rebalance. Applications
* might only want to perform seek operations on an initial assignment. While the
* 'initial' argument is true for each thread (when concurrency is greater than 1),
* implementations should keep track of exactly which partitions have been sought.
* There is a race in that a rebalance could occur during startup and so a topic/
* partition that has been sought on one thread may be re-assigned to another
* thread and you may not wish to re-seek it at that time.
* @param bindingName the name of the binding.
* @param consumer the consumer.
* @param partitions the partitions.
* @param initial true if this is the initial assignment on the current thread.
*/
default void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
Collection<TopicPartition> partitions, boolean initial) {
// do nothing
}
让我们看看详细信息。
本质上,此方法将在主题分区的初始分配期间或再均衡后每次调用。为了更好地说明,假设我们的主题是 foo
,它有 4 个分区。最初,我们只在该组中启动一个消费者,此消费者将消费所有分区。当消费者第一次启动时,所有 4 个分区都会获得初始分配。但是,我们不想让分区从默认值(因为我们定义了组,所以是 earliest
)开始消费,而是希望每个分区在定位到任意 offset 后开始消费。想象一下,您有一个业务场景需要从以下特定 offset 开始消费:
Partition start offset
0 1000
1 2000
2 2000
3 1000
这可以通过如下实现上述方法来实现:
@Override
public void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions, boolean initial) {
Map<TopicPartition, Long> topicPartitionOffset = new HashMap<>();
topicPartitionOffset.put(new TopicPartition("foo", 0), 1000L);
topicPartitionOffset.put(new TopicPartition("foo", 1), 2000L);
topicPartitionOffset.put(new TopicPartition("foo", 2), 2000L);
topicPartitionOffset.put(new TopicPartition("foo", 3), 1000L);
if (initial) {
partitions.forEach(tp -> {
if (topicPartitionOffset.containsKey(tp)) {
final Long offset = topicPartitionOffset.get(tp);
try {
consumer.seek(tp, offset);
}
catch (Exception e) {
// Handle exceptions carefully.
}
}
});
}
}
这只是一个简单的实现。实际的用例比这复杂得多,您需要根据情况进行调整,但这无疑为您提供了一个基本框架。当消费者 seek
失败时,可能会抛出一些运行时异常,您需要决定在这些情况下如何处理。
[[what-if-we-start-a-second-consumer-with-the-same-group-id?]] === 如果我们启动第二个具有相同组 ID 的消费者怎么办?
当我们添加第二个消费者时,会发生再均衡,并且一些分区将移位。假设新消费者获得分区 2
和 3
。当这个新的 Spring Cloud Stream 消费者调用 onPartitionsAssigned
方法时,它会看到这是此消费者上分区 2
和 3
的初始分配。因此,由于对 initial
参数进行了条件检查,它将执行 seek 操作。对于第一个消费者,它现在只有分区 0
和 1
。但是,对于此消费者而言,这仅仅是一个再均衡事件,不被视为初始分配。因此,由于对 initial
参数进行了条件检查,它将不会重新定位到给定的 offset。
[[how-do-i-manually-acknowledge-using-kafka-binder?]] == 如何使用 Kafka Binder 手动确认?
解决方案
默认情况下,Kafka Binder 委托给 Spring for Apache Kafka 项目中的默认提交设置。Spring Kafka 中的默认 ackMode
是 batch
。有关更多详细信息,请参阅这里。
在某些情况下,您需要禁用此默认提交行为,并依赖手动提交。以下步骤允许您这样做。
将属性 spring.cloud.stream.kafka.bindings.<binding-name>.consumer.ackMode
设置为 MANUAL
或 MANUAL_IMMEDIATE
。当这样设置时,消费者方法接收到的消息中将包含一个名为 kafka_acknowledgment
(来自 KafkaHeaders.ACKNOWLEDGMENT
)的 header。
例如,假设这是您的消费者方法:
@Bean
public Consumer<Message<String>> myConsumer() {
return msg -> {
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
if (acknowledgment != null) {
System.out.println("Acknowledgment provided");
acknowledgment.acknowledge();
}
};
}
然后将属性 spring.cloud.stream.kafka.bindings.myConsumer-in-0.consumer.ackMode
设置为 MANUAL
或 MANUAL_IMMEDIATE
。
[[how-do-i-override-the-default-binding-names-in-spring-cloud-stream?]] == 如何在 Spring Cloud Stream 中覆盖默认绑定名称?
解决方案
假设以下是您的函数签名:
@Bean
public Function<String, String> uppercase(){
...
}
默认情况下,Spring Cloud Stream 将按如下方式创建绑定:
-
uppercase-in-0
-
uppercase-out-0
您可以使用以下属性将这些绑定覆盖为其他名称:
spring.cloud.stream.function.bindings.uppercase-in-0=my-transformer-in
spring.cloud.stream.function.bindings.uppercase-out-0=my-transformer-out
在此之后,所有绑定属性都必须在新名称 my-transformer-in
和 my-transformer-out
上设置。
这是另一个带有 Kafka Streams 和多个输入的示例:
@Bean
public BiFunction<KStream<String, Order>, KTable<String, Account>, KStream<String, EnrichedOrder>> processOrder() {
...
}
默认情况下,Spring Cloud Stream 将为此函数创建三个不同的绑定名称:
-
processOrder-in-0
-
processOrder-in-1
-
processOrder-out-0
每次要对此绑定设置某些配置时,都必须使用这些绑定名称。您不喜欢这样,并且希望使用更符合领域友好的、更易读的绑定名称,例如:
-
orders
-
accounts
-
enrichedOrders
您只需设置这三个属性即可轻松实现:
-
spring.cloud.stream.function.bindings.processOrder-in-0=orders
-
spring.cloud.stream.function.bindings.processOrder-in-1=accounts
-
spring.cloud.stream.function.bindings.processOrder-out-0=enrichedOrders
完成此操作后,它将覆盖默认的绑定名称,您要设置的任何属性都必须基于这些新的绑定名称。
[[how-do-i-send-a-message-key-as-part-of-my-record?]] == 如何在我的记录中包含消息 key?
解决方案
这非常容易做到。以下是实现此目的的基本蓝图,但您可能需要根据您的具体用例进行调整。
以下是示例生产者方法(即 Supplier
):
@Bean
public Supplier<Message<String>> supplier() {
return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build();
}
这是一个简单的函数,它发送一个带有 String
payload 但也带有 key 的消息。请注意,我们使用 KafkaHeaders.MESSAGE_KEY
将 key 设置为消息 header。
如果您想更改默认的 key(默认为 kafka_messageKey
),则需要在配置中指定此属性:
spring.cloud.stream.kafka.bindings.supplier-out-0.producer.messageKeyExpression=headers['my-special-key']
请注意,我们使用绑定名称 supplier-out-0
,因为这是我们的函数名称,请相应更新。
然后,我们在生成消息时使用这个新的 key。
[[how-do-i-use-native-serializer-and-deserializer-instead-of-message-conversion-done-by-spring-cloud-stream?]] == 如何使用原生的序列化器和反序列化器,而不是 Spring Cloud Stream 的消息转换?
问题陈述
我不想使用 Spring Cloud Stream 中的消息转换器,而是想使用 Kafka 中的原生 Serializer 和 Deserializer。默认情况下,Spring Cloud Stream 使用其内置的消息转换器来处理此转换。我如何绕过它并将责任委托给 Kafka?
解决方案
这真的很容易做到。
您只需提供以下属性即可启用原生序列化:
spring.cloud.stream.kafka.bindings.<binding-name>.producer.useNativeEncoding: true
然后,您还需要设置序列化器。有两种方法可以做到这一点。
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.key.serializer: org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.value.serializer: org.apache.kafka.common.serialization.StringSerializer
或者使用 Binder 配置:
spring.cloud.stream.kafka.binder.configuration.key.serializer: org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.binder.configuration.value.serializer: org.apache.kafka.common.serialization.StringSerializer
使用 Binder 方式时,它应用于所有绑定,而在绑定级别设置它们则针对每个绑定。
在反序列化方面,您只需将反序列化器作为配置提供即可。
例如:
spring.cloud.stream.kafka.bindings.<binding-name>.consumer.configuration.key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
您也可以在 Binder 级别设置它们。
有一个可选属性,您可以设置它来强制进行原生解码。
spring.cloud.stream.kafka.bindings.<binding-name>.consumer.useNativeDecoding: true
但是,对于 Kafka Binder,这是不必要的,因为它到达 Binder 时,Kafka 已经使用配置的反序列化器对其进行了反序列化。
解释 Kafka Streams Binder 中的 offset 重置如何工作
问题陈述
默认情况下,Kafka Streams Binder 对于新消费者始终从 earliest
offset 开始。有时,应用程序需要从 latest
offset 开始,这可能是有益的或必需的。Kafka Streams Binder 允许您做到这一点。
解决方案
在我们查看解决方案之前,让我们看看以下场景。
@Bean
public BiConsumer<KStream<Object, Object>, KTable<Object, Object>> myBiConsumer{
(s, t) -> s.join(t, ...)
...
}
我们有一个需要两个输入绑定的 BiConsumer
bean。在这种情况下,第一个绑定用于 KStream
,第二个绑定用于 KTable
。当第一次运行此应用程序时,默认情况下,两个绑定都从 earliest
offset 开始。如果由于某些要求,我只想从 latest
offset 开始怎么办?您可以通过启用以下属性来实现此目的。
spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-0.consumer.startOffset: latest
spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-1.consumer.startOffset: latest
如果您只想让一个绑定从 latest
offset 开始,而另一个绑定从默认的 earliest
开始,则将后一个绑定排除在配置之外即可。
请记住,一旦存在已提交的 offset,这些设置就将 **不** 会生效,并且已提交的 offset 具有优先权。
跟踪 Kafka 中记录成功发送(生产)的情况
解决方案
假设我们在应用程序中有以下 Supplier:
@Bean
public Supplier<Message<String>> supplier() {
return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build();
}
然后,我们需要定义一个新的 MessageChannel
bean 来捕获所有成功的发送信息。
@Bean
public MessageChannel fooRecordChannel() {
return new DirectChannel();
}
接下来,在应用程序配置中定义此属性,为 recordMetadataChannel
提供 bean 名称。
spring.cloud.stream.kafka.bindings.supplier-out-0.producer.recordMetadataChannel: fooRecordChannel
此时,成功的发送信息将发送到 fooRecordChannel
。
您可以按如下方式编写 IntegrationFlow
以查看信息。
@Bean
public IntegrationFlow integrationFlow() {
return f -> f.channel("fooRecordChannel")
.handle((payload, messageHeaders) -> payload);
}
在 handle
方法中,payload 是发送到 Kafka 的内容,消息 header 中包含一个特殊 key kafka_recordMetadata
。其值是一个 RecordMetadata
,包含主题分区、当前 offset 等信息。
在 Kafka 中添加自定义 header mapper
解决方案
正常情况下,这应该没有问题。
想象一下,您有以下生产者:
@Bean
public Supplier<Message<String>> supply() {
return () -> MessageBuilder.withPayload("foo").setHeader("foo", "bar").build();
}
在消费者端,您仍然应该看到 header "foo",并且以下代码不应该有任何问题:
@Bean
public Consumer<Message<String>> consume() {
return s -> {
final String foo = (String)s.getHeaders().get("foo");
System.out.println(foo);
};
}
如果您在应用程序中提供了自定义 header mapper,那么这将不起作用。假设您在应用程序中有一个空的 KafkaHeaderMapper
:
@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
return new KafkaHeaderMapper() {
@Override
public void fromHeaders(MessageHeaders headers, Headers target) {
}
@Override
public void toHeaders(Headers source, Map<String, Object> target) {
}
};
}
如果这是您的实现,那么您在消费者端将丢失 foo
header。很可能,您在这些 KafkaHeaderMapper
方法内部有一些逻辑。您需要以下代码来填充 foo
header:
@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
return new KafkaHeaderMapper() {
@Override
public void fromHeaders(MessageHeaders headers, Headers target) {
final String foo = (String) headers.get("foo");
target.add("foo", foo.getBytes());
}
@Override
public void toHeaders(Headers source, Map<String, Object> target) {
final Header foo = source.lastHeader("foo");
target.put("foo", new String(foo.value()));
}
}
这将正确地将 foo
header 从生产者端传递到消费者端。
关于 id header 的特别说明
在 Spring Cloud Stream 中,id
header 是一个特殊的 header,但有些应用程序可能希望有特殊的自定义 id header - 例如 custom-id
或 ID
或 Id
。第一个 (custom-id
) 将在没有自定义 header mapper 的情况下从生产者传播到消费者。但是,如果您使用框架保留的 id
header 的变体(例如 ID
, Id
, iD
等)进行生产,则会遇到框架内部的问题。有关此用例的更多上下文,请参阅此Stack Overflow 帖子。在这种情况下,您必须使用自定义 KafkaHeaderMapper
来映射区分大小写的 id header。例如,假设您有以下生产者:
@Bean
public Supplier<Message<String>> supply() {
return () -> MessageBuilder.withPayload("foo").setHeader("Id", "my-id").build();
}
上面示例中的 Id
header 将会从消费端丢失,因为它与框架的 id
header 冲突。您可以提供一个自定义 KafkaHeaderMapper
来解决此问题。
@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper1() {
return new KafkaHeaderMapper() {
@Override
public void fromHeaders(MessageHeaders headers, Headers target) {
final String myId = (String) headers.get("Id");
target.add("Id", myId.getBytes());
}
@Override
public void toHeaders(Headers source, Map<String, Object> target) {
final Header Id = source.lastHeader("Id");
target.put("Id", new String(Id.value()));
}
};
}
通过这样做,id
和 Id
header 都将从生产者端可用于消费者端。
事务中生产到多个主题
解决方案
使用 Kafka Binder 中的事务支持进行事务处理,然后提供一个 AfterRollbackProcessor
。为了生产到多个主题,使用 StreamBridge
API。
以下是实现此目的的代码片段:
@Autowired
StreamBridge bridge;
@Bean
Consumer<String> input() {
return str -> {
System.out.println(str);
this.bridge.send("left", str.toUpperCase());
this.bridge.send("right", str.toLowerCase());
if (str.equals("Fail")) {
throw new RuntimeException("test");
}
};
}
@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(BinderFactory binders) {
return (container, dest, group) -> {
ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
MessageChannel.class)).getTransactionalProducerFactory();
KafkaTemplate<byte[], byte[]> template = new KafkaTemplate<>(pf);
DefaultAfterRollbackProcessor rollbackProcessor = rollbackProcessor(template);
container.setAfterRollbackProcessor(rollbackProcessor);
};
}
DefaultAfterRollbackProcessor rollbackProcessor(KafkaTemplate<byte[], byte[]> template) {
return new DefaultAfterRollbackProcessor<>(
new DeadLetterPublishingRecoverer(template), new FixedBackOff(2000L, 2L), template, true);
}
所需配置
spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix: tx-
spring.cloud.stream.kafka.binder.required-acks=all
spring.cloud.stream.bindings.input-in-0.group=foo
spring.cloud.stream.bindings.input-in-0.destination=input
spring.cloud.stream.bindings.left.destination=left
spring.cloud.stream.bindings.right.destination=right
spring.cloud.stream.kafka.bindings.input-in-0.consumer.maxAttempts=1
为了测试,您可以使用以下配置:
@Bean
public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
return args -> {
System.in.read();
template.send("input", "Fail".getBytes());
template.send("input", "Good".getBytes());
};
}
一些重要说明
请确保您的应用程序配置中没有 DLQ 设置,因为我们手动配置 DLT(默认情况下它将发布到名为 input.DLT
的主题,基于初始消费者函数)。此外,将消费者绑定的 maxAttempts
重置为 1
,以避免 Binder 进行重试。在上面的示例中,总共会尝试最多 3 次(初始尝试 + FixedBackoff
中的 2 次尝试)。
有关如何测试此代码的更多详细信息,请参阅Stack Overflow 帖子。如果您正在使用 Spring Cloud Stream 通过添加更多消费者函数来测试它,请确保将消费者绑定上的 isolation-level
设置为 read-committed
。
这篇Stack Overflow 帖子也与此讨论相关。
运行多个 pollable 消费者时要避免的陷阱
解决方案
假设我有以下定义:
spring.cloud.stream.pollable-source: foo
spring.cloud.stream.bindings.foo-in-0.group: my-group
运行应用程序时,Kafka 消费者会生成一个 client.id(类似于 consumer-my-group-1
)。对于运行的每个应用程序实例,此 client.id
将是相同的,从而导致意外问题。
为了解决此问题,您可以在应用程序的每个实例上添加以下属性
spring.cloud.stream.kafka.bindings.foo-in-0.consumer.configuration.client.id=${client.id}
有关更多详细信息,请参阅此 GitHub 问题。