消息头

0.11.0.0 客户端引入了消息头支持。从 2.0 版本开始,Spring for Apache Kafka 现在支持将这些头映射到 spring-messagingMessageHeaders,反之亦然。

之前的版本将 ConsumerRecordProducerRecord 映射到 spring-messaging 的 Message<?>,其中 value 属性映射到 payload,而其他属性(如 topicpartition 等)则映射到头。这种情况仍然存在,但现在可以映射额外的(任意)头。

Apache Kafka 的头具有简单的 API,如以下接口定义所示

public interface Header {

    String key();

    byte[] value();

}

提供了 KafkaHeaderMapper 策略,用于在 Kafka 的 HeadersMessageHeaders 之间映射头条目。其接口定义如下

public interface KafkaHeaderMapper {

    void fromHeaders(MessageHeaders headers, Headers target);

    void toHeaders(Headers source, Map<String, Object> target);

}

SimpleKafkaHeaderMapper 将原始头映射为 byte[],并提供配置选项以转换为 String 值。

DefaultKafkaHeaderMapper 将键映射到 MessageHeaders 的头名称,并且为了支持出站消息的丰富头类型,会执行 JSON 转换。一个“特殊”的头(键为 spring_json_header_types)包含一个 <key>:<type> 的 JSON 映射。此头在入站侧用于将每个头值适当转换回原始类型。

在入站侧,所有 Kafka 的 Header 实例都映射到 MessageHeaders。在出站侧,默认情况下,除了 idtimestamp 以及映射到 ConsumerRecord 属性的头之外,所有 MessageHeaders 都被映射。

你可以通过向映射器提供模式来指定要映射哪些出站消息的头。以下列表显示了一些示例映射

public DefaultKafkaHeaderMapper() { (1)
    ...
}

public DefaultKafkaHeaderMapper(ObjectMapper objectMapper) { (2)
    ...
}

public DefaultKafkaHeaderMapper(String... patterns) { (3)
    ...
}

public DefaultKafkaHeaderMapper(ObjectMapper objectMapper, String... patterns) { (4)
    ...
}
1 使用默认的 Jackson ObjectMapper,并映射大多数头,如示例之前所述。
2 使用提供的 Jackson ObjectMapper,并映射大多数头,如示例之前所述。
3 使用默认的 Jackson ObjectMapper,并根据提供的模式映射头。
4 使用提供的 Jackson ObjectMapper,并根据提供的模式映射头。

模式相当简单,可以包含前导通配符 (*)、尾部通配符或两者都有(例如 *.cat.*)。你可以使用前导 ! 来否定模式。第一个匹配到头名称的模式(无论是正向还是负向)将生效。

当你提供自己的模式时,我们建议包含 !id!timestamp,因为这些头在入站侧是只读的。

默认情况下,映射器只反序列化 java.langjava.util 中的类。你可以通过 addTrustedPackages 方法添加受信任的包来信任其他(或所有)包。如果你从不受信任的源接收消息,你可能只想添加你信任的那些包。要信任所有包,可以使用 mapper.addTrustedPackages("*")
当与不了解映射器 JSON 格式的系统通信时,以原始形式映射 String 头值非常有用。

从 2.2.5 版本开始,你可以指定某些字符串值的头不应使用 JSON 进行映射,而是使用原始的 byte[] 进行映射。AbstractKafkaHeaderMapper 有新的属性;当 mapAllStringsOut 设置为 true 时,所有字符串值的头将使用 charset 属性(默认为 UTF-8)转换为 byte[]。此外,还有一个属性 rawMappedHeaders,它是一个 header name : boolean 的映射;如果映射中包含一个头名称,并且该头包含一个 String 值,它将使用字符集被映射为原始的 byte[]。此映射也用于将原始的入站 byte[] 头使用字符集映射到 String,当且仅当映射中的布尔值为 true 时。如果布尔值为 false,或者头名称不在映射中且值为 true,则入站头将简单地被映射为原始的未映射头。

以下测试用例说明了此机制。

@Test
public void testSpecificStringConvert() {
    DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
    Map<String, Boolean> rawMappedHeaders = new HashMap<>();
    rawMappedHeaders.put("thisOnesAString", true);
    rawMappedHeaders.put("thisOnesBytes", false);
    mapper.setRawMappedHeaders(rawMappedHeaders);
    Map<String, Object> headersMap = new HashMap<>();
    headersMap.put("thisOnesAString", "thing1");
    headersMap.put("thisOnesBytes", "thing2");
    headersMap.put("alwaysRaw", "thing3".getBytes());
    MessageHeaders headers = new MessageHeaders(headersMap);
    Headers target = new RecordHeaders();
    mapper.fromHeaders(headers, target);
    assertThat(target).containsExactlyInAnyOrder(
            new RecordHeader("thisOnesAString", "thing1".getBytes()),
            new RecordHeader("thisOnesBytes", "thing2".getBytes()),
            new RecordHeader("alwaysRaw", "thing3".getBytes()));
    headersMap.clear();
    mapper.toHeaders(target, headersMap);
    assertThat(headersMap).contains(
            entry("thisOnesAString", "thing1"),
            entry("thisOnesBytes", "thing2".getBytes()),
            entry("alwaysRaw", "thing3".getBytes()));
}

默认情况下,两个头映射器都映射所有入站头。从 2.8.8 版本开始,模式也可以应用于入站映射。要创建用于入站映射的映射器,请使用相应映射器上的静态方法之一

public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
}

public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(ObjectMapper objectMapper, String... patterns) {
}

public static SimpleKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
}

例如

DefaultKafkaHeaderMapper inboundMapper = DefaultKafkaHeaderMapper.forInboundOnlyWithMatchers("!abc*", "*");

这将排除所有以 abc 开头的头,并包含所有其他头。

默认情况下,只要 Jackson 在类路径上,DefaultKafkaHeaderMapper 就会在 MessagingMessageConverterBatchMessagingMessageConverter 中使用。

对于批处理转换器,转换后的头在 KafkaHeaders.BATCH_CONVERTED_HEADERS 中以 List<Map<String, Object>> 的形式提供,其中列表中某个位置的映射对应于有效载荷中的数据位置。

如果没有转换器(可能是因为没有 Jackson,或者它被明确设置为 null),则消费者记录中的头会在 KafkaHeaders.NATIVE_HEADERS 头中以未转换的形式提供。此头是一个 Headers 对象(对于批处理转换器来说是 List<Headers>),其中列表中某个位置对应于有效载荷中的数据位置。

某些类型不适合进行 JSON 序列化,对于这些类型,可能更倾向于简单的 toString() 序列化。DefaultKafkaHeaderMapper 有一个名为 addToStringClasses() 的方法,允许你提供在出站映射时应以这种方式处理的类名。在入站映射期间,它们被映射为 String。默认情况下,只有 org.springframework.util.MimeTypeorg.springframework.http.MediaType 是以这种方式映射的。
从 2.3 版本开始,对字符串值头的处理得到简化。默认情况下,此类头不再进行 JSON 编码(即不再添加封闭的 "...")。类型仍然添加到 JSON_TYPES 头中,以便接收系统可以转换回 String(从 byte[])。映射器可以处理(解码)由旧版本产生的头(它检查是否有前导的 ");这样,使用 2.3 的应用程序可以消费旧版本产生的记录。
为了与早期版本兼容,如果使用 2.3 版本生成的记录可能被使用早期版本的应用程序消费,请将 encodeStrings 设置为 true。当所有应用程序都使用 2.3 或更高版本时,可以将此属性保持其默认值 false
@Bean
MessagingMessageConverter converter() {
    MessagingMessageConverter converter = new MessagingMessageConverter();
    DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
    mapper.setEncodeStrings(true);
    converter.setHeaderMapper(mapper);
    return converter;
}

如果使用 Spring Boot,它将自动将此转换器 bean 配置到自动配置的 KafkaTemplate 中;否则,你应该手动将此转换器添加到模板中。