消息转换器
AmqpTemplate
还定义了几个用于发送和接收消息的方法,这些方法委托给 MessageConverter
。MessageConverter
为每个方向提供一个方法:一个用于转换**到** Message
,另一个用于转换**从** Message
。注意,在转换为 Message
时,除了对象外,还可以提供属性。object
参数通常对应于 Message 体。以下清单显示了 MessageConverter
接口定义:
public interface MessageConverter {
Message toMessage(Object object, MessageProperties messageProperties)
throws MessageConversionException;
Object fromMessage(Message message) throws MessageConversionException;
}
AmqpTemplate
上相关的消息发送方法比我们之前讨论的方法更简单,因为它们不需要 Message
实例。相反,MessageConverter
负责通过将提供的对象转换为 Message 体的字节数组,然后添加任何提供的 MessageProperties
来“创建”每个 Message
。以下清单显示了各种方法的定义:
void convertAndSend(Object message) throws AmqpException;
void convertAndSend(String routingKey, Object message) throws AmqpException;
void convertAndSend(String exchange, String routingKey, Object message)
throws AmqpException;
void convertAndSend(Object message, MessagePostProcessor messagePostProcessor)
throws AmqpException;
void convertAndSend(String routingKey, Object message,
MessagePostProcessor messagePostProcessor) throws AmqpException;
void convertAndSend(String exchange, String routingKey, Object message,
MessagePostProcessor messagePostProcessor) throws AmqpException;
在接收端,只有两个方法:一个接受队列名称,另一个依赖于模板的“queue”属性已设置。以下清单显示了这两个方法的定义:
Object receiveAndConvert() throws AmqpException;
Object receiveAndConvert(String queueName) throws AmqpException;
异步消费者中提到的 MessageListenerAdapter 也使用 MessageConverter 。 |
SimpleMessageConverter
MessageConverter
策略的默认实现称为 SimpleMessageConverter
。如果您没有明确配置替代方案,RabbitTemplate
实例将使用此转换器。它处理基于文本的内容、序列化的 Java 对象和字节数组。
从 Message
转换
如果输入 Message
的内容类型以 "text" 开头(例如,"text/plain"),它还会检查 content-encoding 属性,以确定将 Message
体字节数组转换为 Java String
时使用的字符集。如果输入 Message
上没有设置 content-encoding 属性,则默认使用 UTF-8 字符集。如果您需要覆盖此默认设置,可以配置一个 SimpleMessageConverter
实例,设置其 defaultCharset
属性,并将其注入到 RabbitTemplate
实例中。
如果输入 Message
的 content-type 属性值设置为 "application/x-java-serialized-object",SimpleMessageConverter
会尝试将字节数组反序列化(重构)为 Java 对象。虽然这对于简单的原型设计可能很有用,但我们不建议依赖 Java 序列化,因为它会导致生产者和消费者之间紧密耦合。当然,这也排除了在任何一方使用非 Java 系统。鉴于 AMQP 是一个线级协议,因这些限制而失去大部分优势将是不幸的。在接下来的两节中,我们将探讨一些无需依赖 Java 序列化即可传递丰富领域对象内容的替代方案。
对于所有其他内容类型,SimpleMessageConverter
直接返回 Message
体内容作为字节数组。
请参阅Java 反序列化以获取重要信息。
SerializerMessageConverter
此转换器类似于 SimpleMessageConverter
,不同之处在于它可以配置其他 Spring Framework 的 Serializer
和 Deserializer
实现,用于 application/x-java-serialized-object
转换。
请参阅Java 反序列化以获取重要信息。
Jackson2JsonMessageConverter
本节介绍如何使用 Jackson2JsonMessageConverter
在 Message
之间进行转换。它包含以下部分:
转换为 Message
正如上一节所述,通常不建议依赖 Java 序列化。一种更灵活且可跨不同语言和平台移植的常见替代方案是 JSON(JavaScript Object Notation)。该转换器可以在任何 RabbitTemplate
实例上配置,以覆盖其对 SimpleMessageConverter
默认的用法。Jackson2JsonMessageConverter
使用 com.fasterxml.jackson
2.x 库。以下示例配置了 Jackson2JsonMessageConverter
:
<bean class="org.springframework.amqp.rabbit.core.RabbitTemplate">
<property name="connectionFactory" ref="rabbitConnectionFactory"/>
<property name="messageConverter">
<bean class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter">
<!-- if necessary, override the DefaultClassMapper -->
<property name="classMapper" ref="customClassMapper"/>
</bean>
</property>
</bean>
如上所示,Jackson2JsonMessageConverter
默认使用 DefaultClassMapper
。类型信息被添加到(并从)MessageProperties
中检索。如果入站消息在 MessageProperties
中不包含类型信息,但您知道预期的类型,则可以使用 defaultType
属性配置静态类型,如下例所示:
<bean id="jsonConverterWithDefaultType"
class="o.s.amqp.support.converter.Jackson2JsonMessageConverter">
<property name="classMapper">
<bean class="org.springframework.amqp.support.converter.DefaultClassMapper">
<property name="defaultType" value="thing1.PurchaseOrder"/>
</bean>
</property>
</bean>
此外,您可以提供从 TypeId
header 中的值到自定义映射。以下示例显示了如何进行操作:
@Bean
public Jackson2JsonMessageConverter jsonMessageConverter() {
Jackson2JsonMessageConverter jsonConverter = new Jackson2JsonMessageConverter();
jsonConverter.setClassMapper(classMapper());
return jsonConverter;
}
@Bean
public DefaultClassMapper classMapper() {
DefaultClassMapper classMapper = new DefaultClassMapper();
Map<String, Class<?>> idClassMapping = new HashMap<>();
idClassMapping.put("thing1", Thing1.class);
idClassMapping.put("thing2", Thing2.class);
classMapper.setIdClassMapping(idClassMapping);
return classMapper;
}
现在,如果发送系统将 header 设置为 thing1
,转换器将创建一个 Thing1
对象,依此类推。有关从非 Spring 应用转换消息的完整讨论,请参阅从非 Spring 应用接收 JSON示例应用。
从版本 2.4.3 开始,如果 supportedMediaType
包含 charset
参数,转换器将不再添加 contentEncoding
消息属性;此参数也用于编码。添加了一个新方法 setSupportedMediaType
:
String utf16 = "application/json; charset=utf-16";
converter.setSupportedContentType(MimeTypeUtils.parseMimeType(utf16));
从 Message
转换
入站消息根据发送系统添加到 header 中的类型信息转换为对象。
从版本 2.4.3 开始,如果不存在 contentEncoding
消息属性,转换器将尝试在 contentType
消息属性中检测 charset
参数并使用它。如果两者都不存在,如果 supportedMediaType
包含 charset
参数,它将被用于解码,最终回退到 defaultCharset
属性。添加了一个新方法 setSupportedMediaType
:
String utf16 = "application/json; charset=utf-16";
converter.setSupportedContentType(MimeTypeUtils.parseMimeType(utf16));
在 1.6 版本之前,如果不存在类型信息,转换将失败。从 1.6 版本开始,如果类型信息丢失,转换器将使用 Jackson 默认值(通常是 map)转换 JSON。
此外,从版本 1.6 开始,当您使用 @RabbitListener
注解(在方法上)时,推断出的类型信息会被添加到 MessageProperties
中。这使得转换器能够转换为目标方法的参数类型。这仅适用于只有一个参数且没有注解或只有一个带有 @Payload
注解的参数的情况。类型为 Message
的参数在分析期间会被忽略。
默认情况下,推断出的类型信息将覆盖入站的 TypeId 和发送系统创建的相关 header。这使得接收系统可以自动转换为不同的领域对象。这仅适用于参数类型是具体类型(非抽象或接口)或来自 java.util 包的情况。在所有其他情况下,将使用 TypeId 和相关 header。在某些情况下,您可能希望覆盖默认行为并始终使用 TypeId 信息。例如,假设您有一个 @RabbitListener 方法,它接受一个 Thing1 参数,但消息包含一个 Thing2 ,它是 Thing1 的子类(并且是具体的)。推断出的类型将不正确。为了处理这种情况,将 Jackson2JsonMessageConverter 上的 TypePrecedence 属性设置为 TYPE_ID ,而不是默认的 INFERRED 。(该属性实际上在转换器的 DefaultJackson2JavaTypeMapper (DefaultClassMapper )上,但为了方便,转换器提供了 setter)。如果您注入了自定义类型映射器,则应在该映射器上设置属性。 |
从 Message 转换时,传入的 MessageProperties.getContentType() 必须符合 JSON 规范(使用 contentType.contains("json") 进行检查)。从版本 2.2 开始,如果没有 contentType 属性,或者其值为默认值 application/octet-stream ,则假定为 application/json 。要恢复到之前的行为(返回未转换的 byte[] ),请将转换器的 assumeSupportedContentType 属性设置为 false 。如果内容类型不受支持,将发出一条 WARN 级别的日志消息 Could not convert incoming message with content-type […] ,并按原样返回 message.getBody() - 作为 byte[] 。因此,为了满足消费者端 Jackson2JsonMessageConverter 的要求,生产者必须添加 contentType 消息属性 - 例如,设置为 application/json 或 text/x-json ,或者使用 Jackson2JsonMessageConverter ,它会自动设置该 header。以下清单显示了许多转换器调用: |
@RabbitListener
public void thing1(Thing1 thing1) {...}
@RabbitListener
public void thing1(@Payload Thing1 thing1, @Header("amqp_consumerQueue") String queue) {...}
@RabbitListener
public void thing1(Thing1 thing1, o.s.amqp.core.Message message) {...}
@RabbitListener
public void thing1(Thing1 thing1, o.s.messaging.Message<Foo> message) {...}
@RabbitListener
public void thing1(Thing1 thing1, String bar) {...}
@RabbitListener
public void thing1(Thing1 thing1, o.s.messaging.Message<?> message) {...}
在前面清单中的前四种情况下,转换器尝试转换为 Thing1
类型。第五个示例无效,因为我们无法确定哪个参数应接收消息负载。对于第六个示例,由于泛型类型是 WildcardType
,Jackson 的默认值适用。
但是,您可以创建一个自定义转换器,并使用 targetMethod
消息属性来决定将 JSON 转换为哪种类型。
只有当 @RabbitListener 注解声明在方法级别时,才能实现此类型推断。对于类级别的 @RabbitListener ,转换后的类型用于选择要调用的 @RabbitHandler 方法。因此,基础设施提供了 targetObject 消息属性,您可以在自定义转换器中使用它来确定类型。 |
从版本 1.6.11 开始,Jackson2JsonMessageConverter 以及因此而来的 DefaultJackson2JavaTypeMapper (DefaultClassMapper ) 提供了 trustedPackages 选项,以克服序列化 Gadgets 漏洞。默认情况下,为了向后兼容,Jackson2JsonMessageConverter 信任所有包 - 也就是说,它对该选项使用 * 。 |
从版本 2.4.7 开始,可以将转换器配置为在 Jackson 反序列化消息体返回 null
后返回 Optional.empty()
。这使得 @RabbitListener
可以通过两种方式接收 null 负载:
@RabbitListener(queues = "op.1")
void listen(@Payload(required = false) Thing payload) {
handleOptional(payload); // payload might be null
}
@RabbitListener(queues = "op.2")
void listen(Optional<Thing> optional) {
handleOptional(optional.orElse(this.emptyThing));
}
要启用此功能,请将 setNullAsOptionalEmpty
设置为 true
;当设置为 false
(默认值)时,转换器将回退到原始消息体(byte[]
)。
@Bean
Jackson2JsonMessageConverter converter() {
Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
converter.setNullAsOptionalEmpty(true);
return converter;
}
反序列化抽象类
在版本 2.2.8 之前,如果 @RabbitListener
的推断类型是抽象类(包括接口),转换器会回退到查找 header 中的类型信息,如果存在,则使用该信息;如果不存在,它会尝试创建抽象类。这导致了问题,当使用了配置了自定义反序列化器以处理抽象类的自定义 ObjectMapper
,但传入消息具有无效的类型 header 时。
从版本 2.2.8 开始,默认保留以前的行为。如果您有这样一个自定义 ObjectMapper
,并且希望忽略类型 header 并始终使用推断类型进行转换,请将 alwaysConvertToInferredType
设置为 true
。这是为了向后兼容并避免在(使用标准 ObjectMapper
)转换会失败时尝试转换的开销。
使用 Spring Data Projection 接口
从版本 2.2 开始,您可以将 JSON 转换为 Spring Data Projection 接口,而不是具体类型。这允许对数据进行非常有选择性和低耦合的绑定,包括从 JSON 文档内部的多个位置查找值。例如,可以将以下接口定义为消息负载类型:
interface SomeSample {
@JsonPath({ "$.username", "$.user.name" })
String getUsername();
}
@RabbitListener(queues = "projection")
public void projection(SomeSample in) {
String username = in.getUsername();
...
}
访问器方法将默认用于将属性名称作为字段在接收到的 JSON 文档中查找。@JsonPath
表达式允许自定义值查找,甚至可以定义多个 JSON path 表达式,从多个位置查找值,直到某个表达式返回实际值。
要启用此功能,请在消息转换器上将 useProjectionForInterfaces
设置为 true
。您还必须将 spring-data:spring-data-commons
和 com.jayway.jsonpath:json-path
添加到类路径中。
当用作 @RabbitListener
方法的参数时,接口类型会自动作为普通类型传递给转换器。
使用 RabbitTemplate
从 Message
转换
如前所述,类型信息通过消息 header 传递,以帮助转换器从消息进行转换。这在大多数情况下都能正常工作。然而,当使用泛型类型时,它只能转换简单对象和已知的“容器”对象(列表、数组和 map)。从版本 2.0 开始,Jackson2JsonMessageConverter
实现了 SmartMessageConverter
,这使得它可以与接受 ParameterizedTypeReference
参数的新 RabbitTemplate
方法一起使用。这允许转换复杂的泛型类型,如下例所示:
Thing1<Thing2<Cat, Hat>> thing1 =
rabbitTemplate.receiveAndConvert(new ParameterizedTypeReference<Thing1<Thing2<Cat, Hat>>>() { });
从版本 2.1 开始,AbstractJsonMessageConverter 类已被删除。它不再是 Jackson2JsonMessageConverter 的基类。它已被 AbstractJackson2MessageConverter 替换。 |
MarshallingMessageConverter
另一个选项是 MarshallingMessageConverter
。它委托给 Spring OXM 库的 Marshaller
和 Unmarshaller
策略接口的实现。您可以在此处阅读更多关于该库的信息。在配置方面,通常只需要提供构造函数参数,因为大多数 Marshaller
的实现也实现了 Unmarshaller
。以下示例显示了如何配置 MarshallingMessageConverter
:
<bean class="org.springframework.amqp.rabbit.core.RabbitTemplate">
<property name="connectionFactory" ref="rabbitConnectionFactory"/>
<property name="messageConverter">
<bean class="org.springframework.amqp.support.converter.MarshallingMessageConverter">
<constructor-arg ref="someImplemenationOfMarshallerAndUnmarshaller"/>
</bean>
</property>
</bean>
Jackson2XmlMessageConverter
此类在版本 2.1 中引入,可用于将消息从 XML 和转换为 XML。
Jackson2XmlMessageConverter
和 Jackson2JsonMessageConverter
具有相同的基类:AbstractJackson2MessageConverter
。
引入 AbstractJackson2MessageConverter 类是为了替换已删除的类:AbstractJsonMessageConverter 。 |
Jackson2XmlMessageConverter
使用 com.fasterxml.jackson
2.x 库。
您可以像使用 Jackson2JsonMessageConverter
一样使用它,只是它支持 XML 而不是 JSON。以下示例配置了 Jackson2JsonMessageConverter
:
<bean id="xmlConverterWithDefaultType"
class="org.springframework.amqp.support.converter.Jackson2XmlMessageConverter">
<property name="classMapper">
<bean class="org.springframework.amqp.support.converter.DefaultClassMapper">
<property name="defaultType" value="foo.PurchaseOrder"/>
</bean>
</property>
</bean>
有关更多信息,请参阅Jackson2JsonMessageConverter。
从版本 2.2 开始,如果没有 contentType 属性,或者其值为默认值 application/octet-stream ,则假定为 application/xml 。要恢复到之前的行为(返回未转换的 byte[] ),请将转换器的 assumeSupportedContentType 属性设置为 false 。 |
ContentTypeDelegatingMessageConverter
此类在版本 1.4.2 中引入,允许根据 MessageProperties
中的内容类型属性委托给特定的 MessageConverter
。默认情况下,如果不存在 contentType
属性或其值与所有配置的转换器都不匹配,则委托给 SimpleMessageConverter
。以下示例配置了 ContentTypeDelegatingMessageConverter
:
<bean id="contentTypeConverter" class="ContentTypeDelegatingMessageConverter">
<property name="delegates">
<map>
<entry key="application/json" value-ref="jsonMessageConverter" />
<entry key="application/xml" value-ref="xmlMessageConverter" />
</map>
</property>
</bean>
Java 反序列化
本节介绍如何反序列化 Java 对象。
从不受信任的源反序列化 Java 对象时可能存在漏洞。 如果您接受来自不受信任源的 默认情况下,允许列表为空,这意味着不会反序列化任何类。 您可以设置模式列表,例如 模式按顺序检查,直到找到匹配项。如果没有匹配项,则抛出 您可以使用这些转换器上的 |
消息属性转换器
MessagePropertiesConverter
策略接口用于在 Rabbit Client BasicProperties
和 Spring AMQP MessageProperties
之间进行转换。默认实现(DefaultMessagePropertiesConverter
)通常足以满足大多数目的,但如果需要,您可以实现自己的转换器。默认属性转换器会将类型为 LongString
的 BasicProperties
元素转换为 String
实例,当其大小不大于 1024
字节时。较大的 LongString
实例不会被转换(参见下一段)。此限制可以通过构造函数参数覆盖。
从版本 1.6 开始,默认情况下,DefaultMessagePropertiesConverter
将长于长字符串限制(默认值:1024)的 header 保留为 LongString
实例。您可以通过 getBytes[]
、toString()
或 getStream()
方法访问其内容。
以前,DefaultMessagePropertiesConverter
会将此类 header“转换”为 DataInputStream
(实际上它只是引用了 LongString
实例的 DataInputStream
)。在输出时,此 header 未被转换(除了转换为 String,例如通过调用 stream 的 toString()
转换为 java.io.DataInputStream@1d057a39
)。
大的入站 LongString
header 现在在输出时也能正确“转换”(默认情况下)。
提供了一个新的构造函数,允许您配置转换器,使其与以前一样工作。以下清单显示了该方法的 Javadoc 注释和声明:
/**
* Construct an instance where LongStrings will be returned
* unconverted or as a java.io.DataInputStream when longer than this limit.
* Use this constructor with 'true' to restore pre-1.6 behavior.
* @param longStringLimit the limit.
* @param convertLongLongStrings LongString when false,
* DataInputStream when true.
* @since 1.6
*/
public DefaultMessagePropertiesConverter(int longStringLimit, boolean convertLongLongStrings) { ... }
同样从版本 1.6 开始,向 MessageProperties
中添加了一个名为 correlationIdString
的新属性。以前,在转换为和从 RabbitMQ 客户端使用的 BasicProperties
时,会执行不必要的 byte[] <→ String
转换,因为 MessageProperties.correlationId
是一个 byte[]
,而 BasicProperties
使用一个 String
。(最终,RabbitMQ 客户端使用 UTF-8 将 String
转换为字节以放入协议消息中)。
为了提供最大的向后兼容性,向 DefaultMessagePropertiesConverter
中添加了一个名为 correlationIdPolicy
的新属性。它接受一个 DefaultMessagePropertiesConverter.CorrelationIdPolicy
枚举参数。默认情况下,它设置为 BYTES
,这复制了以前的行为。
对于入站消息:
-
STRING
:仅映射correlationIdString
属性 -
BYTES
:仅映射correlationId
属性 -
BOTH
:映射两个属性
对于出站消息:
-
STRING
:仅映射correlationIdString
属性 -
BYTES
:仅映射correlationId
属性 -
BOTH
:考虑两个属性,其中String
属性优先
同样从版本 1.6 开始,入站 deliveryMode
属性不再映射到 MessageProperties.deliveryMode
。它被映射到 MessageProperties.receivedDeliveryMode
。此外,入站 userId
属性不再映射到 MessageProperties.userId
。它被映射到 MessageProperties.receivedUserId
。这些更改是为了避免如果同一个 MessageProperties
对象用于出站消息时意外传播这些属性。
从版本 2.2 开始,DefaultMessagePropertiesConverter
会将任何值类型为 Class<?>
的自定义 header 使用 getName()
而不是 toString()
进行转换;这避免了消费端应用程序需要从 toString()
表示形式中解析类名。对于滚动升级,您可能需要更改您的消费者,使其能够理解两种格式,直到所有生产者都升级完成。