消息头
0.11.0.0 客户端引入了对消息头部的支持。从 2.0 版本开始,Spring for Apache Kafka 现在支持将这些头部映射到 spring-messaging MessageHeaders 以及从其映射。
以前的版本将 ConsumerRecord 和 ProducerRecord 映射到 spring-messaging Message<?>,其中值属性被映射到 payload 以及从其映射,其他属性(topic、partition 等)被映射到头部。这种情况仍然存在,但现在可以映射额外的(任意)头部。 |
Apache Kafka 头部有一个简单的 API,如下面的接口定义所示:
public interface Header {
String key();
byte[] value();
}
KafkaHeaderMapper 策略用于在 Kafka Headers 和 MessageHeaders 之间映射头部条目。其接口定义如下:
public interface KafkaHeaderMapper {
void fromHeaders(MessageHeaders headers, Headers target);
void toHeaders(Headers source, Map<String, Object> target);
}
SimpleKafkaHeaderMapper 将原始头部映射为 byte[],并提供转换为 String 值的配置选项。
JsonKafkaHeaderMapper 将键映射到 MessageHeaders 头部名称,为了支持出站消息的富头部类型,会执行 JSON 转换。一个“特殊”头部(键为 spring_json_header_types)包含一个 <key>:<type> 的 JSON 映射。此头部在入站端用于提供每个头部值到原始类型的适当转换。
在入站端,所有 Kafka Header 实例都映射到 MessageHeaders。在出站端,默认情况下,除了 id、timestamp 以及映射到 ConsumerRecord 属性的头部之外,所有 MessageHeaders 都被映射。
您可以通过向映射器提供模式来指定哪些头部要用于出站消息的映射。以下列表显示了一些示例映射:
public JsonKafkaHeaderMapper() { (1)
...
}
public JsonKafkaHeaderMapper(ObjectMapper objectMapper) { (2)
...
}
public JsonKafkaHeaderMapper(String... patterns) { (3)
...
}
public JsonKafkaHeaderMapper(ObjectMapper objectMapper, String... patterns) { (4)
...
}
| 1 | 使用默认的 Jackson ObjectMapper 并映射大多数头部,如示例之前所讨论。 |
| 2 | 使用提供的 Jackson ObjectMapper 并映射大多数头部,如示例之前所讨论。 |
| 3 | 使用默认的 Jackson ObjectMapper 并根据提供的模式映射头部。 |
| 4 | 使用提供的 Jackson ObjectMapper 并根据提供的模式映射头部。 |
模式相当简单,可以包含前导通配符(*)、尾随通配符或两者(例如,*.cat.*)。您可以使用前导 ! 来否定模式。第一个匹配头部名称的模式(无论是正向还是负向)获胜。
当您提供自己的模式时,我们建议包含 !id 和 !timestamp,因为这些头部在入站端是只读的。
默认情况下,映射器仅反序列化 java.lang 和 java.util 中的类。您可以通过 addTrustedPackages 方法添加受信任的包来信任其他(或所有)包。如果您从不受信任的源接收消息,您可能只希望添加您信任的那些包。要信任所有包,可以使用 mapper.addTrustedPackages("*")。 |
以原始形式映射 String 头部值在与不了解映射器 JSON 格式的系统通信时非常有用。 |
从 2.2.5 版本开始,您可以指定某些字符串值头部不应使用 JSON 映射,而应映射为/从原始 byte[]。AbstractKafkaHeaderMapper 具有新属性;当 mapAllStringsOut 设置为 true 时,所有字符串值头部都将使用 charset 属性(默认为 UTF-8)转换为 byte[]。此外,还有一个属性 rawMappedHeaders,它是一个 头部名称 : 布尔值 的映射;如果映射包含一个头部名称,并且该头部包含一个 String 值,它将使用字符集映射为原始 byte[]。此映射也用于将原始入站 byte[] 头部映射为 String,仅当映射值中的布尔值为 true 时才使用字符集。如果布尔值为 false,或者头部名称不在具有 true 值的映射中,则入站头部将简单地映射为原始未映射头部。
以下测试用例说明了此机制。
@Test
public void testSpecificStringConvert() {
JsonKafkaHeaderMapper mapper = new JsonKafkaHeaderMapper();
Map<String, Boolean> rawMappedHeaders = new HashMap<>();
rawMappedHeaders.put("thisOnesAString", true);
rawMappedHeaders.put("thisOnesBytes", false);
mapper.setRawMappedHeaders(rawMappedHeaders);
Map<String, Object> headersMap = new HashMap<>();
headersMap.put("thisOnesAString", "thing1");
headersMap.put("thisOnesBytes", "thing2");
headersMap.put("alwaysRaw", "thing3".getBytes());
MessageHeaders headers = new MessageHeaders(headersMap);
Headers target = new RecordHeaders();
mapper.fromHeaders(headers, target);
assertThat(target).containsExactlyInAnyOrder(
new RecordHeader("thisOnesAString", "thing1".getBytes()),
new RecordHeader("thisOnesBytes", "thing2".getBytes()),
new RecordHeader("alwaysRaw", "thing3".getBytes()));
headersMap.clear();
mapper.toHeaders(target, headersMap);
assertThat(headersMap).contains(
entry("thisOnesAString", "thing1"),
entry("thisOnesBytes", "thing2".getBytes()),
entry("alwaysRaw", "thing3".getBytes()));
}
默认情况下,两个头部映射器都映射所有入站头部。从 2.8.8 版本开始,模式也可以应用于入站映射。要创建用于入站映射的映射器,请使用相应映射器上的静态方法之一:
public static JsonKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
}
public static JsonKafkaHeaderMapper forInboundOnlyWithMatchers(ObjectMapper objectMapper, String... patterns) {
}
public static SimpleKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
}
例如:
JsonKafkaHeaderMapper inboundMapper = JsonKafkaHeaderMapper.forInboundOnlyWithMatchers("!abc*", "*");
这将排除所有以 abc 开头的头部,并包含所有其他头部。
默认情况下,只要 Jackson 位于类路径上,JsonKafkaHeaderMapper 就会在 MessagingMessageConverter 和 BatchMessagingMessageConverter 中使用。
对于批处理转换器,转换后的头部可在 KafkaHeaders.BATCH_CONVERTED_HEADERS 中作为 List<Map<String, Object>> 提供,其中列表中某个位置的映射对应于有效负载中的数据位置。
如果没有转换器(因为 Jackson 不存在或显式设置为 null),则来自消费者记录的头部将以未转换的形式在 KafkaHeaders.NATIVE_HEADERS 头部中提供。此头部是一个 Headers 对象(或者在批处理转换器的情况下是 List<Headers>),其中列表中的位置对应于有效负载中的数据位置。
某些类型不适合 JSON 序列化,对于这些类型,可能更喜欢简单的 toString() 序列化。JsonKafkaHeaderMapper 有一个名为 addToStringClasses() 的方法,允许您提供应该以这种方式处理出站映射的类名。在入站映射期间,它们被映射为 String。默认情况下,只有 org.springframework.util.MimeType 和 org.springframework.http.MediaType 以这种方式映射。 |
从 2.3 版本开始,字符串值头部的处理得到简化。默认情况下,此类头部不再进行 JSON 编码(即,它们不再添加封闭的 "...")。类型仍然添加到 JSON_TYPES 头部,以便接收系统可以转换回字符串(从 byte[])。映射器可以处理(解码)由旧版本生成的头部(它检查前导 ");通过这种方式,使用 2.3 的应用程序可以消费来自旧版本的记录。 |
为了与早期版本兼容,如果使用 2.3 版本的应用程序生成的记录可能被使用早期版本的应用程序消费,请将 encodeStrings 设置为 true。当所有应用程序都使用 2.3 或更高版本时,您可以将该属性保留为默认值 false。 |
@Bean
MessagingMessageConverter converter() {
MessagingMessageConverter converter = new MessagingMessageConverter();
JsonKafkaHeaderMapper mapper = new JsonKafkaHeaderMapper();
mapper.setEncodeStrings(true);
converter.setHeaderMapper(mapper);
return converter;
}
如果使用 Spring Boot,它会自动将此转换器 bean 配置到自动配置的 KafkaTemplate 中;否则,您应该将此转换器添加到模板中。
支持多值头部映射
从 4.0 版本开始,支持多值头部映射,其中同一逻辑头部键在 Kafka 记录中出现多次。
默认情况下,HeaderMapper 不会创建多个同名 Kafka 头部。相反,当它遇到集合值(例如,List<byte[]>)时,它会将整个集合序列化为一个 Kafka 头部,其值为 JSON 数组。
-
生产者端:
JsonKafkaHeaderMapper写入 JSON 字节,而SimpleKafkaHeaderMapper忽略它。 -
消费者端: 映射器将头部作为单个值暴露——最后一次出现获胜;较早的重复项会被静默丢弃。
保留每个单独的头部需要显式注册将头部指定为多值模式。
JsonKafkaHeaderMapper#setMultiValueHeaderPatterns(String… patterns) 接受一个模式列表,可以是通配符表达式或精确的头部名称。
JsonKafkaHeaderMapper mapper = new JsonKafkaHeaderMapper();
// Explicit header names
mapper.setMultiValueHeaderPatterns("test-multi-value1", "test-multi-value2");
// Wildcard patterns for test-multi-value1, test-multi-value2
mapper.setMultiValueHeaderPatterns("test-multi-*");
任何名称与所提供的模式之一匹配的头部
-
生产者端: 作为单独的 Kafka 头部写入,每个元素一个。
-
消费者端: 收集到包含单个头部值的
List<?>中;每个元素在经过配置的HeaderMapper执行的常规反序列化或类型转换后返回给应用程序。
| 不支持正则表达式;简单模式中只允许使用 * 通配符——支持直接相等和诸如:xxx*、*xxx、*xxx*、xxx*yyy 等形式。 |
|
在生产者端,当 |