技巧、窍门和配方

Kafka 简单 DLQ

问题陈述

作为开发者,我想编写一个从 Kafka 主题处理记录的消费者应用。但是,如果在处理过程中发生错误,我不希望应用完全停止。相反,我希望将出错的记录发送到 DLT(死信主题),然后继续处理新的记录。

解决方案

解决此问题的方法是使用 Spring Cloud Stream 中的 DLQ 功能。为了本次讨论的目的,我们假设以下是我们的处理器函数。

@Bean
public Consumer<byte[]> processData() {
  return s -> {
     throw new RuntimeException();
  };
}

这是一个非常简单的函数,它对其处理的所有记录都抛出异常,但您可以采用此函数并将其扩展到任何其他类似情况。

为了将出错的记录发送到 DLT,我们需要提供以下配置。

spring.cloud.stream:
  bindings:
   processData-in-0:
     group: my-group
     destination: input-topic
 kafka:
   bindings:
     processData-in-0:
       consumer:
         enableDlq: true
         dlqName: input-topic-dlq

为了激活 DLQ,应用必须提供一个组名。匿名消费者无法使用 DLQ 功能。我们还需要通过将 Kafka 消费者绑定的 enableDLQ 属性设置为 true 来启用 DLQ。最后,我们可以选择通过在 Kafka 消费者绑定上提供 dlqName 来指定 DLT 名称,否则在此例中它将默认为 error.input-topic.my-group

请注意,在上面提供的示例消费者中,负载的类型是 byte[]。默认情况下,Kafka Binder 中的 DLQ 生产者期望负载类型为 byte[]。如果不是这种情况,那么我们需要提供适当的序列化器配置。例如,让我们将消费者函数重写如下:

@Bean
public Consumer<String> processData() {
  return s -> {
     throw new RuntimeException();
  };
}

现在,我们需要告诉 Spring Cloud Stream,当写入 DLT 时,我们希望如何序列化数据。以下是针对此场景修改后的配置:

spring.cloud.stream:
  bindings:
   processData-in-0:
     group: my-group
     destination: input-topic
 kafka:
   bindings:
     processData-in-0:
       consumer:
         enableDlq: true
         dlqName: input-topic-dlq
         dlqProducerProperties:
           configuration:
             value.serializer: org.apache.kafka.common.serialization.StringSerializer

带高级重试选项的 DLQ

问题陈述

这与上面的配方类似,但作为开发者,我希望配置重试的处理方式。

解决方案

如果您遵循了上面的配方,那么当处理遇到错误时,您将获得内置于 Kafka Binder 中的默认重试选项。

默认情况下,Binder 会重试最多 3 次,初始延迟为一秒,每次回退的乘数为 2.0,最大延迟为 10 秒。您可以按如下方式更改所有这些配置:

spring.cloud.stream.bindings.processData-in-0.consumer.maxAttempts
spring.cloud.stream.bindings.processData-in-0.consumer.backOffInitialInterval
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMultiplier
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMaxInterval

如果您愿意,还可以通过提供一个布尔值映射来提供可重试异常的列表。例如:

spring.cloud.stream.bindings.processData-in-0.consumer.retryableExceptions.java.lang.IllegalStateException=true
spring.cloud.stream.bindings.processData-in-0.consumer.retryableExceptions.java.lang.IllegalArgumentException=false

默认情况下,任何未在上述映射中列出的异常都将重试。如果不需要这样,则可以通过提供以下配置来禁用它:

spring.cloud.stream.bindings.processData-in-0.consumer.defaultRetryable=false

您还可以提供自己的 RetryTemplate,并将其标记为 @StreamRetryTemplate,Binder 将会扫描并使用它。这对于您想要更复杂的重试策略和策略时很有用。

如果您有多个 @StreamRetryTemplate bean,则可以使用以下属性指定您的绑定需要哪一个:

spring.cloud.stream.bindings.processData-in-0.consumer.retry-template-name=<your-retry-template-bean-name>

处理带有 DLQ 的反序列化错误

问题陈述

我的处理器在 Kafka 消费者中遇到了反序列化异常。我期望 Spring Cloud Stream DLQ 机制能够捕获这种情况,但它没有。我该如何处理?

解决方案

当 Kafka 消费者抛出不可恢复的反序列化异常时,Spring Cloud Stream 提供的常规 DLQ 机制将无济于事。这是因为此异常甚至在消费者方法的 poll() 返回之前就发生了。Spring for Apache Kafka 项目提供了一些很好的方法来帮助 Binder 处理这种情况。让我们来探讨一下。

假设这是我们的函数:

@Bean
public Consumer<String> functionName() {
    return s -> {
        System.out.println(s);
    };
}

这是一个采用 String 参数的简单函数。

我们希望绕过 Spring Cloud Stream 提供的消息转换器,转而使用原生的反序列化器。对于 String 类型,这样做意义不大,但对于 AVRO 等更复杂的类型,您必须依赖外部反序列化器,因此希望将转换委托给 Kafka。

现在当消费者接收到数据时,假设有一个导致反序列化错误的坏记录,例如有人传递了一个 Integer 而不是 String。在这种情况下,如果您不在应用中做任何处理,异常将沿着调用链传播,最终导致您的应用退出。

为了处理这种情况,您可以添加一个 ListenerContainerCustomizer@Bean,它配置了一个 DefaultErrorHandler。这个 DefaultErrorHandler 配置了一个 DeadLetterPublishingRecoverer。我们还需要为消费者配置一个 ErrorHandlingDeserializer。听起来很复杂,但实际上,在这种情况下,它归结为这 3 个 bean:

	@Bean
	public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(DefaultErrorHandler errorHandler) {
		return (container, dest, group) -> {
			container.setCommonErrorHandler(errorHandler);
		};
	}
	@Bean
	public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
		return new DefaultErrorHandler(deadLetterPublishingRecoverer);
	}
	@Bean
	public DeadLetterPublishingRecoverer publisher(KafkaOperations bytesTemplate) {
		return new DeadLetterPublishingRecoverer(bytesTemplate);
	}

让我们分析一下它们。第一个是 ListenerContainerCustomizer bean,它接受一个 DefaultErrorHandler。容器现在会使用特定的错误处理器进行定制。您可以在这里了解更多关于容器定制的信息。

第二个 bean 是 DefaultErrorHandler,它配置为发布到 DLT。有关 DefaultErrorHandler 的更多详细信息,请参阅这里

第三个 bean 是 DeadLetterPublishingRecoverer,它最终负责发送到 DLT。默认情况下,DLT 主题的名称为 ORIGINAL_TOPIC_NAME.DLT。不过您可以更改它。有关更多详细信息,请参阅文档

我们还需要通过应用配置来配置一个ErrorHandlingDeserializer

ErrorHandlingDeserializer 将委托给实际的反序列化器。如果发生错误,它将记录的 key/value 设置为 null,并包含消息的原始字节。然后它会在 header 中设置异常,并将此记录传递给监听器,监听器随后会调用注册的错误处理器。

以下是所需的配置:

spring.cloud.stream:
  function:
    definition: functionName
  bindings:
    functionName-in-0:
      group: group-name
      destination: input-topic
      consumer:
       use-native-decoding: true
  kafka:
    bindings:
      functionName-in-0:
        consumer:
          enableDlq: true
          dlqName: dlq-topic
          dlqProducerProperties:
            configuration:
              value.serializer: org.apache.kafka.common.serialization.StringSerializer
          configuration:
            value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
            spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer

我们通过绑定上的 configuration 属性提供了 ErrorHandlingDeserializer。我们还指出实际委托的反序列化器是 StringDeserializer

请记住,上面的任何 dlq 属性都与本配方中的讨论无关。它们纯粹是为了处理任何应用级别的错误。

Kafka Binder 中的基本 offset 管理

问题陈述

我想编写一个 Spring Cloud Stream Kafka 消费者应用,但不确定它是如何管理 Kafka 消费者 offset 的。您能解释一下吗?

解决方案

我们鼓励您阅读关于此主题的文档部分,以获得全面的理解。

以下是其要点:

Kafka 默认支持两种类型的起始 offset - earliestlatest。它们的语义从名称中就可以看出。

假设您是第一次运行消费者。如果在您的 Spring Cloud Stream 应用中缺少 group.id,那么它就会成为一个匿名消费者。无论何时您有一个匿名消费者,Spring Cloud Stream 应用默认都会从主题分区中可用的 latest offset 开始。另一方面,如果您明确指定了 group.id,则 Spring Cloud Stream 应用默认会从主题分区中可用的 earliest offset 开始。

在上述两种情况(具有显式组的消费者和匿名组的消费者)下,可以使用属性 spring.cloud.stream.kafka.bindings.<binding-name>.consumer.startOffset 将起始 offset 切换为 earliestlatest

现在,假设您之前已经运行过消费者,现在再次启动它。在这种情况下,上述情况中的起始 offset 语义不再适用,因为消费者会找到一个已经为消费者组提交的 offset(对于匿名消费者,尽管应用没有提供 group.id,但 Binder 会自动为您生成一个)。它只会从最后提交的 offset 开始。即使您提供了 startOffset 值,情况也是如此。

但是,您可以使用 resetOffsets 属性来覆盖消费者从上次提交的 offset 开始的默认行为。要做到这一点,将属性 spring.cloud.stream.kafka.bindings.<binding-name>.consumer.resetOffsets 设置为 true(默认为 false)。然后确保您提供了 startOffset 值(可以是 earliestlatest)。当您这样做并启动消费者应用时,每次启动都会像第一次启动一样,忽略分区的所有已提交 offset。

在 Kafka 中定位到任意 offset

问题陈述

使用 Kafka Binder,我知道它可以将 offset 设置为 earliestlatest,但我需要将 offset 定位到中间的某个任意 offset。Spring Cloud Stream Kafka Binder 有什么办法实现这一点吗?

解决方案

之前我们看到了 Kafka Binder 如何处理基本的 offset 管理。默认情况下,Binder 不允许您回退到任意 offset,至少通过我们在该配方中看到的机制不行。但是,Binder 提供了一些底层策略来实现此用例。让我们来探讨一下。

首先,当您想要重置到除 earliestlatest 之外的任意 offset 时,请确保将 resetOffsets 配置保留其默认值 false。然后,您必须提供一个类型为 KafkaBindingRebalanceListener 的自定义 bean,它将被注入到所有消费者绑定中。这是一个接口,带有一些默认方法,但我们关注的是以下方法:

/**
	 * Invoked when partitions are initially assigned or after a rebalance. Applications
	 * might only want to perform seek operations on an initial assignment. While the
	 * 'initial' argument is true for each thread (when concurrency is greater than 1),
	 * implementations should keep track of exactly which partitions have been sought.
	 * There is a race in that a rebalance could occur during startup and so a topic/
	 * partition that has been sought on one thread may be re-assigned to another
	 * thread and you may not wish to re-seek it at that time.
	 * @param bindingName the name of the binding.
	 * @param consumer the consumer.
	 * @param partitions the partitions.
	 * @param initial true if this is the initial assignment on the current thread.
	 */
	default void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
			Collection<TopicPartition> partitions, boolean initial) {
		// do nothing
	}

让我们看看详细信息。

本质上,此方法将在主题分区的初始分配期间或再均衡后每次调用。为了更好地说明,假设我们的主题是 foo,它有 4 个分区。最初,我们只在该组中启动一个消费者,此消费者将消费所有分区。当消费者第一次启动时,所有 4 个分区都会获得初始分配。但是,我们不想让分区从默认值(因为我们定义了组,所以是 earliest)开始消费,而是希望每个分区在定位到任意 offset 后开始消费。想象一下,您有一个业务场景需要从以下特定 offset 开始消费:

Partition   start offset

0           1000
1           2000
2           2000
3           1000

这可以通过如下实现上述方法来实现:

@Override
public void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions, boolean initial) {

    Map<TopicPartition, Long> topicPartitionOffset = new HashMap<>();
    topicPartitionOffset.put(new TopicPartition("foo", 0), 1000L);
    topicPartitionOffset.put(new TopicPartition("foo", 1), 2000L);
    topicPartitionOffset.put(new TopicPartition("foo", 2), 2000L);
    topicPartitionOffset.put(new TopicPartition("foo", 3), 1000L);

    if (initial) {
        partitions.forEach(tp -> {
            if (topicPartitionOffset.containsKey(tp)) {
                final Long offset = topicPartitionOffset.get(tp);
                try {
                    consumer.seek(tp, offset);
                }
                catch (Exception e) {
                    // Handle exceptions carefully.
                }
            }
        });
    }
}

这只是一个简单的实现。实际的用例比这复杂得多,您需要根据情况进行调整,但这无疑为您提供了一个基本框架。当消费者 seek 失败时,可能会抛出一些运行时异常,您需要决定在这些情况下如何处理。

[[what-if-we-start-a-second-consumer-with-the-same-group-id?]] === 如果我们启动第二个具有相同组 ID 的消费者怎么办?

当我们添加第二个消费者时,会发生再均衡,并且一些分区将移位。假设新消费者获得分区 23。当这个新的 Spring Cloud Stream 消费者调用 onPartitionsAssigned 方法时,它会看到这是此消费者上分区 23 的初始分配。因此,由于对 initial 参数进行了条件检查,它将执行 seek 操作。对于第一个消费者,它现在只有分区 01。但是,对于此消费者而言,这仅仅是一个再均衡事件,不被视为初始分配。因此,由于对 initial 参数进行了条件检查,它将不会重新定位到给定的 offset。

[[how-do-i-manually-acknowledge-using-kafka-binder?]] == 如何使用 Kafka Binder 手动确认?

问题陈述

使用 Kafka Binder,我想在我的消费者中手动确认消息。我该怎么做?

解决方案

默认情况下,Kafka Binder 委托给 Spring for Apache Kafka 项目中的默认提交设置。Spring Kafka 中的默认 ackModebatch。有关更多详细信息,请参阅这里

在某些情况下,您需要禁用此默认提交行为,并依赖手动提交。以下步骤允许您这样做。

将属性 spring.cloud.stream.kafka.bindings.<binding-name>.consumer.ackMode 设置为 MANUALMANUAL_IMMEDIATE。当这样设置时,消费者方法接收到的消息中将包含一个名为 kafka_acknowledgment(来自 KafkaHeaders.ACKNOWLEDGMENT)的 header。

例如,假设这是您的消费者方法:

@Bean
public Consumer<Message<String>> myConsumer() {
    return msg -> {
        Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
        if (acknowledgment != null) {
         System.out.println("Acknowledgment provided");
         acknowledgment.acknowledge();
        }
    };
}

然后将属性 spring.cloud.stream.kafka.bindings.myConsumer-in-0.consumer.ackMode 设置为 MANUALMANUAL_IMMEDIATE

[[how-do-i-override-the-default-binding-names-in-spring-cloud-stream?]] == 如何在 Spring Cloud Stream 中覆盖默认绑定名称?

问题陈述

Spring Cloud Stream 根据函数定义和签名创建默认绑定,但如何覆盖这些名称以使用更符合领域友好的名称?

解决方案

假设以下是您的函数签名:

@Bean
public Function<String, String> uppercase(){
...
}

默认情况下,Spring Cloud Stream 将按如下方式创建绑定:

  1. uppercase-in-0

  2. uppercase-out-0

您可以使用以下属性将这些绑定覆盖为其他名称:

spring.cloud.stream.function.bindings.uppercase-in-0=my-transformer-in
spring.cloud.stream.function.bindings.uppercase-out-0=my-transformer-out

在此之后,所有绑定属性都必须在新名称 my-transformer-inmy-transformer-out 上设置。

这是另一个带有 Kafka Streams 和多个输入的示例:

@Bean
public BiFunction<KStream<String, Order>, KTable<String, Account>, KStream<String, EnrichedOrder>> processOrder() {
...
}

默认情况下,Spring Cloud Stream 将为此函数创建三个不同的绑定名称:

  1. processOrder-in-0

  2. processOrder-in-1

  3. processOrder-out-0

每次要对此绑定设置某些配置时,都必须使用这些绑定名称。您不喜欢这样,并且希望使用更符合领域友好的、更易读的绑定名称,例如:

  1. orders

  2. accounts

  3. enrichedOrders

您只需设置这三个属性即可轻松实现:

  1. spring.cloud.stream.function.bindings.processOrder-in-0=orders

  2. spring.cloud.stream.function.bindings.processOrder-in-1=accounts

  3. spring.cloud.stream.function.bindings.processOrder-out-0=enrichedOrders

完成此操作后,它将覆盖默认的绑定名称,您要设置的任何属性都必须基于这些新的绑定名称。

[[how-do-i-send-a-message-key-as-part-of-my-record?]] == 如何在我的记录中包含消息 key?

问题陈述

我需要与记录的 payload 一起发送一个 key,Spring Cloud Stream 有办法做到这一点吗?

解决方案

这非常容易做到。以下是实现此目的的基本蓝图,但您可能需要根据您的具体用例进行调整。

以下是示例生产者方法(即 Supplier):

@Bean
public Supplier<Message<String>> supplier() {
    return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build();
}

这是一个简单的函数,它发送一个带有 String payload 但也带有 key 的消息。请注意,我们使用 KafkaHeaders.MESSAGE_KEY 将 key 设置为消息 header。

如果您想更改默认的 key(默认为 kafka_messageKey),则需要在配置中指定此属性:

spring.cloud.stream.kafka.bindings.supplier-out-0.producer.messageKeyExpression=headers['my-special-key']

请注意,我们使用绑定名称 supplier-out-0,因为这是我们的函数名称,请相应更新。

然后,我们在生成消息时使用这个新的 key。

[[how-do-i-use-native-serializer-and-deserializer-instead-of-message-conversion-done-by-spring-cloud-stream?]] == 如何使用原生的序列化器和反序列化器,而不是 Spring Cloud Stream 的消息转换?

问题陈述

我不想使用 Spring Cloud Stream 中的消息转换器,而是想使用 Kafka 中的原生 Serializer 和 Deserializer。默认情况下,Spring Cloud Stream 使用其内置的消息转换器来处理此转换。我如何绕过它并将责任委托给 Kafka?

解决方案

这真的很容易做到。

您只需提供以下属性即可启用原生序列化:

spring.cloud.stream.kafka.bindings.<binding-name>.producer.useNativeEncoding: true

然后,您还需要设置序列化器。有两种方法可以做到这一点。

spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.key.serializer: org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.value.serializer: org.apache.kafka.common.serialization.StringSerializer

或者使用 Binder 配置:

spring.cloud.stream.kafka.binder.configuration.key.serializer: org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.binder.configuration.value.serializer: org.apache.kafka.common.serialization.StringSerializer

使用 Binder 方式时,它应用于所有绑定,而在绑定级别设置它们则针对每个绑定。

在反序列化方面,您只需将反序列化器作为配置提供即可。

例如:

spring.cloud.stream.kafka.bindings.<binding-name>.consumer.configuration.key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.value.deserializer: org.apache.kafka.common.serialization.StringDeserializer

您也可以在 Binder 级别设置它们。

有一个可选属性,您可以设置它来强制进行原生解码。

spring.cloud.stream.kafka.bindings.<binding-name>.consumer.useNativeDecoding: true

但是,对于 Kafka Binder,这是不必要的,因为它到达 Binder 时,Kafka 已经使用配置的反序列化器对其进行了反序列化。

解释 Kafka Streams Binder 中的 offset 重置如何工作

问题陈述

默认情况下,Kafka Streams Binder 对于新消费者始终从 earliest offset 开始。有时,应用程序需要从 latest offset 开始,这可能是有益的或必需的。Kafka Streams Binder 允许您做到这一点。

解决方案

在我们查看解决方案之前,让我们看看以下场景。

@Bean
public BiConsumer<KStream<Object, Object>, KTable<Object, Object>> myBiConsumer{
    (s, t) -> s.join(t, ...)
    ...
}

我们有一个需要两个输入绑定的 BiConsumer bean。在这种情况下,第一个绑定用于 KStream,第二个绑定用于 KTable。当第一次运行此应用程序时,默认情况下,两个绑定都从 earliest offset 开始。如果由于某些要求,我只想从 latest offset 开始怎么办?您可以通过启用以下属性来实现此目的。

spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-0.consumer.startOffset: latest
spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-1.consumer.startOffset: latest

如果您只想让一个绑定从 latest offset 开始,而另一个绑定从默认的 earliest 开始,则将后一个绑定排除在配置之外即可。

请记住,一旦存在已提交的 offset,这些设置就将 **不** 会生效,并且已提交的 offset 具有优先权。

跟踪 Kafka 中记录成功发送(生产)的情况

问题陈述

我有一个 Kafka 生产者应用,我想跟踪所有成功的发送。

解决方案

假设我们在应用程序中有以下 Supplier:

@Bean
	public Supplier<Message<String>> supplier() {
		return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build();
	}

然后,我们需要定义一个新的 MessageChannel bean 来捕获所有成功的发送信息。

@Bean
	public MessageChannel fooRecordChannel() {
		return new DirectChannel();
	}

接下来,在应用程序配置中定义此属性,为 recordMetadataChannel 提供 bean 名称。

spring.cloud.stream.kafka.bindings.supplier-out-0.producer.recordMetadataChannel: fooRecordChannel

此时,成功的发送信息将发送到 fooRecordChannel

您可以按如下方式编写 IntegrationFlow 以查看信息。

@Bean
public IntegrationFlow integrationFlow() {
    return f -> f.channel("fooRecordChannel")
                 .handle((payload, messageHeaders) -> payload);
}

handle 方法中,payload 是发送到 Kafka 的内容,消息 header 中包含一个特殊 key kafka_recordMetadata。其值是一个 RecordMetadata,包含主题分区、当前 offset 等信息。

在 Kafka 中添加自定义 header mapper

问题陈述

我有一个 Kafka 生产者应用程序,它设置了一些 header,但在消费者应用程序中它们不见了。这是为什么?

解决方案

正常情况下,这应该没有问题。

想象一下,您有以下生产者:

@Bean
public Supplier<Message<String>> supply() {
    return () -> MessageBuilder.withPayload("foo").setHeader("foo", "bar").build();
}

在消费者端,您仍然应该看到 header "foo",并且以下代码不应该有任何问题:

@Bean
public Consumer<Message<String>> consume() {
    return s -> {
        final String foo = (String)s.getHeaders().get("foo");
        System.out.println(foo);
    };
}

如果您在应用程序中提供了自定义 header mapper,那么这将不起作用。假设您在应用程序中有一个空的 KafkaHeaderMapper

@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
    return new KafkaHeaderMapper() {
        @Override
        public void fromHeaders(MessageHeaders headers, Headers target) {

        }

        @Override
        public void toHeaders(Headers source, Map<String, Object> target) {

        }
    };
}

如果这是您的实现,那么您在消费者端将丢失 foo header。很可能,您在这些 KafkaHeaderMapper 方法内部有一些逻辑。您需要以下代码来填充 foo header:

@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
    return new KafkaHeaderMapper() {
        @Override
        public void fromHeaders(MessageHeaders headers, Headers target) {
            final String foo = (String) headers.get("foo");
            target.add("foo", foo.getBytes());
        }

        @Override
        public void toHeaders(Headers source, Map<String, Object> target) {
            final Header foo = source.lastHeader("foo");
			target.put("foo", new String(foo.value()));
        }
    }

这将正确地将 foo header 从生产者端传递到消费者端。

关于 id header 的特别说明

在 Spring Cloud Stream 中,id header 是一个特殊的 header,但有些应用程序可能希望有特殊的自定义 id header - 例如 custom-idIDId。第一个 (custom-id) 将在没有自定义 header mapper 的情况下从生产者传播到消费者。但是,如果您使用框架保留的 id header 的变体(例如 ID, Id, iD 等)进行生产,则会遇到框架内部的问题。有关此用例的更多上下文,请参阅此Stack Overflow 帖子。在这种情况下,您必须使用自定义 KafkaHeaderMapper 来映射区分大小写的 id header。例如,假设您有以下生产者:

@Bean
public Supplier<Message<String>> supply() {
    return () -> MessageBuilder.withPayload("foo").setHeader("Id", "my-id").build();
}

上面示例中的 Id header 将会从消费端丢失,因为它与框架的 id header 冲突。您可以提供一个自定义 KafkaHeaderMapper 来解决此问题。

@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper1() {
    return new KafkaHeaderMapper() {
        @Override
        public void fromHeaders(MessageHeaders headers, Headers target) {
            final String myId = (String) headers.get("Id");
			target.add("Id", myId.getBytes());
        }

        @Override
        public void toHeaders(Headers source, Map<String, Object> target) {
            final Header Id = source.lastHeader("Id");
			target.put("Id", new String(Id.value()));
        }
    };
}

通过这样做,idId header 都将从生产者端可用于消费者端。

事务中生产到多个主题

问题陈述

如何将事务性消息生产到多个 Kafka 主题?

有关更多上下文,请参阅此Stack Overflow 问题

解决方案

使用 Kafka Binder 中的事务支持进行事务处理,然后提供一个 AfterRollbackProcessor。为了生产到多个主题,使用 StreamBridge API。

以下是实现此目的的代码片段:

@Autowired
StreamBridge bridge;

@Bean
Consumer<String> input() {
    return str -> {
        System.out.println(str);
        this.bridge.send("left", str.toUpperCase());
        this.bridge.send("right", str.toLowerCase());
        if (str.equals("Fail")) {
            throw new RuntimeException("test");
        }
    };
}

@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(BinderFactory binders) {
    return (container, dest, group) -> {
        ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
                MessageChannel.class)).getTransactionalProducerFactory();
        KafkaTemplate<byte[], byte[]> template = new KafkaTemplate<>(pf);
        DefaultAfterRollbackProcessor rollbackProcessor = rollbackProcessor(template);
        container.setAfterRollbackProcessor(rollbackProcessor);
    };
}

DefaultAfterRollbackProcessor rollbackProcessor(KafkaTemplate<byte[], byte[]> template) {
    return new DefaultAfterRollbackProcessor<>(
            new DeadLetterPublishingRecoverer(template), new FixedBackOff(2000L, 2L), template, true);
}

所需配置

spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix: tx-
spring.cloud.stream.kafka.binder.required-acks=all
spring.cloud.stream.bindings.input-in-0.group=foo
spring.cloud.stream.bindings.input-in-0.destination=input
spring.cloud.stream.bindings.left.destination=left
spring.cloud.stream.bindings.right.destination=right

spring.cloud.stream.kafka.bindings.input-in-0.consumer.maxAttempts=1

为了测试,您可以使用以下配置:

@Bean
public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
    return args -> {
        System.in.read();
        template.send("input", "Fail".getBytes());
        template.send("input", "Good".getBytes());
    };
}

一些重要说明

请确保您的应用程序配置中没有 DLQ 设置,因为我们手动配置 DLT(默认情况下它将发布到名为 input.DLT 的主题,基于初始消费者函数)。此外,将消费者绑定的 maxAttempts 重置为 1,以避免 Binder 进行重试。在上面的示例中,总共会尝试最多 3 次(初始尝试 + FixedBackoff 中的 2 次尝试)。

有关如何测试此代码的更多详细信息,请参阅Stack Overflow 帖子。如果您正在使用 Spring Cloud Stream 通过添加更多消费者函数来测试它,请确保将消费者绑定上的 isolation-level 设置为 read-committed

这篇Stack Overflow 帖子也与此讨论相关。

运行多个 pollable 消费者时要避免的陷阱

问题陈述

如何运行多个 pollable 消费者实例并为每个实例生成唯一的 client.id

解决方案

假设我有以下定义:

spring.cloud.stream.pollable-source: foo
spring.cloud.stream.bindings.foo-in-0.group: my-group

运行应用程序时,Kafka 消费者会生成一个 client.id(类似于 consumer-my-group-1)。对于运行的每个应用程序实例,此 client.id 将是相同的,从而导致意外问题。

为了解决此问题,您可以在应用程序的每个实例上添加以下属性

spring.cloud.stream.kafka.bindings.foo-in-0.consumer.configuration.client.id=${client.id}

有关更多详细信息,请参阅此 GitHub 问题