转换器

消息转换器在实现消息生产者和消息消费者的松耦合方面扮演着非常重要的角色。无需要求每个消息生产组件都知道下一个消费者期望什么类型,您可以在这些组件之间添加转换器。通用转换器(例如将 String 转换为 XML Document 的转换器)也具有高度的可重用性。

对于某些系统而言,提供规范数据模型可能是最佳选择,但 Spring Integration 的一般理念是不要求特定的格式。相反,为了实现最大的灵活性,Spring Integration 旨在提供最简单的扩展模型。与其他端点类型一样,在 XML 或 Java 注解中使用声明式配置可以将简单的 POJO 适配为消息转换器的角色。本章的其余部分将介绍这些配置选项。

为了最大限度地提高灵活性,Spring 不要求基于 XML 的消息负载。尽管如此,如果基于 XML 的负载确实是您应用程序的正确选择,该框架确实提供了一些方便的转换器来处理它们。有关这些转换器的更多信息,请参阅XML 支持 - 处理 XML Payload

使用 Java 和其他 DSL 配置转换器

对于简单的 Java 和注解配置,Spring bean POJO 方法必须使用 @Transformer 注解标记,当从输入通道消费消息时,框架会调用它

public class SomeService {

    @Transformer(inputChannel = "transformChannel", outputChannel = "nextServiceChannel")
    public OutputData exampleTransformer(InputData payload) {
        ...
    }

}

有关更多信息,请参阅注解支持

对于 Java、Groovy 或 Kotlin DSL,IntegrationFlow.transform() 操作符表示一个转换器端点

  • Java DSL

  • Kotlin DSL

  • Groovy DSL

@Bean
public IntegrationFlow someFlow() {
    return IntegrationFlow
             .from("transformChannel")
             .transform(someService, "exampleTransformer")
             .channel("nextServiceChannel")
             .get();
}
@Bean
fun someFlow() =
    integrationFlow("transformChannel") {
        transform(someService, "exampleTransformer")
        channel("nextServiceChannel")
    }
@Bean
someFlow() {
    integrationFlow 'transformChannel',
            {
                transform someService, 'exampleTransformer'
                channel 'nextServiceChannel'
            }
}

有关 DSL 的更多信息,请参阅相应的章节

使用 XML 配置转换器

<transformer> 元素用于创建消息转换端点。除了 input-channeloutput-channel 属性外,它还需要一个 ref 属性。ref 可以指向包含单个方法上带有 @Transformer 注解的对象(参见使用注解配置转换器),也可以与 method 属性中提供的显式方法名值结合使用。

<int:transformer id="testTransformer" ref="testTransformerBean" input-channel="inChannel"
             method="transform" output-channel="outChannel"/>
<beans:bean id="testTransformerBean" class="org.foo.TestTransformer" />

如果自定义转换器处理程序实现可以在其他 <transformer> 定义中重用,通常建议使用 ref 属性。但是,如果自定义转换器处理程序实现应限于单个 <transformer> 定义,您可以定义内部 bean 定义,如下例所示

<int:transformer id="testTransformer" input-channel="inChannel" method="transform"
                output-channel="outChannel">
  <beans:bean class="org.foo.TestTransformer"/>
</transformer>
在同一个 <transformer> 配置中同时使用 ref 属性和内部处理程序定义是不允许的,因为它会创建歧义条件并导致抛出异常。
如果 ref 属性引用了扩展 AbstractMessageProducingHandler 的 bean(例如框架本身提供的转换器),则通过将输出通道直接注入处理程序来优化配置。在这种情况下,每个 ref 必须指向一个单独的 bean 实例(或 prototype 作用域的 bean),或者使用内部 <bean/> 配置类型。如果不小心从多个 bean 引用了同一个消息处理程序,您将收到配置异常。

使用 POJO 时,用于转换的方法可以期望 Message 类型或入站消息的 payload 类型。它还可以分别使用 @Header@Headers 参数注解来接受单个消息头值或完整的 Map。方法的返回值可以是任何类型。如果返回值本身是 Message,则将其传递到转换器的输出通道。

自 Spring Integration 2.0 起,消息转换器的转换方法不能再返回 null。返回 null 会导致异常,因为消息转换器应始终期望将每个源消息转换为有效的目标消息。换句话说,不应将消息转换器用作消息过滤器,因为为此提供了专用的 <filter> 选项。但是,如果您确实需要这种行为(组件可能返回 null 且不应视为错误),则可以使用服务激活器。其 requires-reply 值默认为 false,但可以将其设置为 true,以便对 null 返回值抛出异常,就像转换器一样。

转换器与 Spring 表达式语言 (SpEL)

与路由器、聚合器和其他组件一样,自 Spring Integration 2.0 起,当转换逻辑相对简单时,转换器也可以受益于SpEL 支持。以下示例展示了如何使用 SpEL 表达式

<int:transformer input-channel="inChannel"
	output-channel="outChannel"
	expression="payload.toUpperCase() + '- [' + T(System).currentTimeMillis() + ']'"/>

上面的示例在不编写自定义转换器的情况下转换了 payload。我们的 payload(假定为 String)被转换为大写,与当前时间戳连接,并应用了一些格式。

常用转换器

Spring Integration 提供了一些转换器实现。

Object-to-String 转换器

由于使用 ObjecttoString() 表示形式相当常见,Spring Integration 提供了一个 ObjectToStringTransformer(另请参见 Transformers 工厂),其中输出是一个带有 String payloadMessage。该 String 是对入站 Message 的 payload 调用 toString() 操作的结果。以下示例展示了如何声明 object-to-string 转换器的实例

  • Java DSL

  • Kotlin DSL

  • Groovy DSL

  • XML

@Bean
public IntegrationFlow someFlow() {
    return IntegrationFlow
             .from("in")
             .transform(Transformers.objectToString())
             .channel("out")
             .get();
}
@Bean
fun someFlow() =
    integrationFlow("in") {
        transform(Transformers.objectToString())
        channel("out")
    }
@Bean
someFlow() {
    integrationFlow 'in',
            {
                transform Transformers.objectToString()
                channel 'out'
            }
}
<int:object-to-string-transformer input-channel="in" output-channel="out"/>

此转换器的一种潜在用途是将任意对象发送到 file 命名空间中的 'outbound-channel-adapter'。由于该通道适配器默认仅支持 String、byte-array 或 java.io.File payloads,因此在适配器之前立即添加此转换器可以处理必要的转换。只要 toString() 调用的结果是您想要写入文件的数据,这种方法就可以正常工作。否则,您可以使用前面显示的通用 'transformer' 元素提供基于 POJO 的自定义转换器。

调试时,通常不需要此转换器,因为 logging-channel-adapter 能够记录消息 payload。有关详细信息,请参见Wire Tap

object-to-string 转换器非常简单。它对入站 payload 调用 toString()。自 Spring Integration 3.0 起,此规则有两个例外:

  • 如果 payload 是 char[],则调用 new String(payload)

  • 如果 payload 是 byte[],则调用 new String(payload, charset),其中 charset 默认为 UTF-8。可以通过在转换器上提供 charset 属性来修改 charset

对于更复杂的功能(例如在运行时动态选择 charset),您可以改用基于 SpEL 表达式的转换器,如下例所示

  • Java DSL

  • XML

@Bean
public IntegrationFlow someFlow() {
    return IntegrationFlow
             .from("in")
             .transform("new String(payload, headers['myCharset']")
             .channel("out")
             .get();
}
<int:transformer input-channel="in" output-channel="out"
       expression="new String(payload, headers['myCharset']" />

如果您需要将 Object 序列化为 byte 数组或将 byte 数组反序列化回 Object,Spring Integration 提供了对称的序列化转换器。这些转换器默认使用标准的 Java 序列化,但您可以通过分别使用 serializerdeserializer 属性来提供 Spring SerializerDeserializer 策略的实现。另请参见 Transformers 工厂类。以下示例展示了如何使用 Spring 的序列化器和反序列化器

  • Java DSL

  • XML

@Bean
public IntegrationFlow someFlow() {
    return IntegrationFlow
             .from("objectsIn")
             .transform(Transformers.serializer())
             .channel("bytesOut")
             .channel("bytesIn")
             .transform(Transformers.deserializer("com.mycom.*", "com.yourcom.*"))
             .channel("objectsOut")
             .get();
}
<int:payload-serializing-transformer input-channel="objectsIn" output-channel="bytesOut"/>

<int:payload-deserializing-transformer input-channel="bytesIn" output-channel="objectsOut"
    allow-list="com.mycom.*,com.yourcom.*"/>
从不受信任的源反序列化数据时,应考虑添加包和类模式的允许列表(allow-list)。默认情况下,所有类都被反序列化。

Object-to-MapMap-to-Object 转换器

Spring Integration 还提供了 Object-to-MapMap-to-Object 转换器,它们使用 JSON 来序列化和反序列化对象图。对象层次结构被内省到最原始的类型(Stringint 等)。到达此类型的路径用 SpEL 描述,它成为转换后的 Map 中的 key。原始类型成为 value。

考虑以下示例

public class Parent{
    private Child child;
    private String name;
    // setters and getters are omitted
}

public class Child{
    private String name;
    private List<String> nickNames;
    // setters and getters are omitted
}

上例中的两个类被转换为以下 Map

{person.name=George, person.child.name=Jenna, person.child.nickNames[0]=Jen ...}

基于 JSON 的 Map 允许您在不共享实际类型的情况下描述对象结构,这使您能够将对象图恢复并重建为不同类型的对象图,只要保持结构即可。

例如,通过使用 Map-to-Object 转换器,可以将上述结构恢复回以下对象图

public class Father {
    private Kid child;
    private String name;
    // setters and getters are omitted
}

public class Kid {
    private String name;
    private List<String> nickNames;
    // setters and getters are omitted
}

如果需要创建“结构化”的 map,可以提供 flatten 属性。默认值为 'true'。如果将其设置为 'false',则结构是一个 Map 的 Map 对象。

考虑以下示例

public class Parent {
	private Child child;
	private String name;
	// setters and getters are omitted
}

public class Child {
	private String name;
	private List<String> nickNames;
	// setters and getters are omitted
}

上例中的两个类被转换为以下 Map

{name=George, child={name=Jenna, nickNames=[Bimbo, ...]}}

为了配置这些转换器,Spring Integration 提供了相应的 XML 组件和 Java DSL 工厂

  • Java DSL

  • XML

@Bean
public IntegrationFlow someFlow() {
    return IntegrationFlow
             .from("directInput")
             .transform(Transformers.toMap())
             .channel("output")
             .get();
}
<int:object-to-map-transformer input-channel="directInput" output-channel="output"/>

您还可以将 flatten 属性设置为 false,如下所示

  • Java DSL

  • XML

@Bean
public IntegrationFlow someFlow() {
    return IntegrationFlow
             .from("directInput")
             .transform(Transformers.toMap(false))
             .channel("output")
             .get();
}
<int:object-to-map-transformer input-channel="directInput" output-channel="output" flatten="false"/>

Spring Integration 为 Map-to-Object 提供了 XML 命名空间支持,并且 Java DSL 工厂具有 fromMap() 方法,如下例所示

  • Java DSL

  • XML

@Bean
public IntegrationFlow someFlow() {
    return IntegrationFlow
             .from("input")
             .transform(Transformers.fromMap(org.something.Person.class))
             .channel("output")
             .get();
}
<int:map-to-object-transformer input-channel="input"
                         output-channel="output"
                         type="org.something.Person"/>

或者,您可以使用 ref 属性和 prototype 作用域的 bean,如下例所示

  • Java DSL

  • XML

@Bean
IntegrationFlow someFlow() {
    return IntegrationFlow
             .from("inputA")
             .transform(Transformers.fromMap("person"))
             .channel("outputA")
             .get();
}

@Bean
@Scope("prototype")
Person person() {
    return new Person();
}
<int:map-to-object-transformer input-channel="inputA"
                               output-channel="outputA"
                               ref="person"/>
<bean id="person" class="org.something.Person" scope="prototype"/>
‘ref’ 和 ‘type’ 属性是互斥的。此外,如果您使用 ‘ref’ 属性,则必须指向一个 ‘prototype’ 作用域的 bean。否则,会抛出 BeanCreationException

从 5.0 版本开始,您可以为 ObjectToMapTransformer 提供定制的 JsonObjectMapper — 当您需要特殊格式来处理日期或空集合的 null 值(以及其他用途)时。有关 JsonObjectMapper 实现的更多信息,请参见JSON 转换器

Stream 转换器

StreamTransformerInputStream payloads 转换为 byte[](如果提供了 charset,则转换为 String)。

以下示例展示了如何在 XML 中使用 stream-transformer 元素

  • Java DSL

  • XML

@Bean
public IntegrationFlow someFlow() {
    return IntegrationFlow
             .from("input")
             .transform(Transformers.fromStream("UTF-8"))
             .channel("output")
             .get();
}
<int:stream-transformer input-channel="directInput" output-channel="output"/> <!-- byte[] -->

<int:stream-transformer id="withCharset" charset="UTF-8"
    input-channel="charsetChannel" output-channel="output"/> <!-- String -->

以下示例展示了如何使用 StreamTransformer 类和 @Transformer 注解在 Java 中配置 stream 转换器

@Bean
@Transformer(inputChannel = "stream", outputChannel = "data")
public StreamTransformer streamToBytes() {
    return new StreamTransformer(); // transforms to byte[]
}

@Bean
@Transformer(inputChannel = "stream", outputChannel = "data")
public StreamTransformer streamToString() {
    return new StreamTransformer("UTF-8"); // transforms to String
}

JSON 转换器

Spring Integration 提供了 Object-to-JSON 和 JSON-to-Object 转换器。以下两对示例展示了如何在 XML 中声明它们

<int:object-to-json-transformer input-channel="objectMapperInput"/>

<int:json-to-object-transformer input-channel="objectMapperInput"
    type="foo.MyDomainObject"/>

默认情况下,上面列表中的转换器使用普通的 JsonObjectMapper。它基于 classpath 中的实现。您可以提供自己的带有适当选项或基于所需库(如 GSON)的自定义 JsonObjectMapper 实现,如下例所示

<int:json-to-object-transformer input-channel="objectMapperInput"
    type="something.MyDomainObject" object-mapper="customObjectMapper"/>

从 3.0 版本开始,object-mapper 属性引用了一个新的策略接口:JsonObjectMapper 的实例。这个抽象允许使用 JSON 映射器的多种实现。提供了封装 Jackson 2 的实现,其版本在 classpath 中检测。该类分别是 Jackson2JsonObjectMapper

您可能需要考虑使用 FactoryBean 或工厂方法来创建具有所需特性的 JsonObjectMapper。以下示例展示了如何使用这样的工厂

public class ObjectMapperFactory {

    public static Jackson2JsonObjectMapper getMapper() {
        ObjectMapper mapper = new ObjectMapper();
        mapper.configure(JsonParser.Feature.ALLOW_COMMENTS, true);
        return new Jackson2JsonObjectMapper(mapper);
    }
}

以下示例展示了如何在 XML 中执行相同的操作

<bean id="customObjectMapper" class="something.ObjectMapperFactory"
            factory-method="getMapper"/>

从 2.2 版本开始,如果输入消息尚未包含该头部,object-to-json-transformer 会默认将 content-type 头部设置为 application/json

如果您希望将 content-type 头部设置为其他值,或显式覆盖现有头部(包括 application/json),请使用 content-type 属性。如果您希望抑制头部的设置,请将 content-type 属性设置为空字符串("")。这样做会生成一个没有 content-type 头部的消息,除非输入消息中已存在该头部。

从 3.0 版本开始,ObjectToJsonTransformer 会向消息添加反映源类型的头部。类似地,JsonToObjectTransformer 在将 JSON 转换为对象时可以使用这些类型头部。这些头部在 AMQP 适配器中进行映射,以便它们与 Spring-AMQP JsonMessageConverter 完全兼容。

这使得以下流程无需任何特殊配置即可工作

  • …​→amqp-outbound-adapter---→

  • ---→amqp-inbound-adapter→json-to-object-transformer→…​

    其中 outbound 适配器配置了 JsonMessageConverter,而 inbound 适配器使用默认的 SimpleMessageConverter

  • …​→object-to-json-transformer→amqp-outbound-adapter---→

  • ---→amqp-inbound-adapter→…​

    其中 outbound 适配器配置了 SimpleMessageConverter,而 inbound 适配器使用默认的 JsonMessageConverter

  • …​→object-to-json-transformer→amqp-outbound-adapter---→

  • ---→amqp-inbound-adapter→json-to-object-transformer→

    其中两个适配器都配置了 SimpleMessageConverter

使用头部确定类型时,不应提供 class 属性,因为它优先于头部。

除了 JSON 转换器,Spring Integration 还提供了一个内置的 #jsonPath SpEL 函数用于表达式。有关更多信息,请参阅Spring 表达式语言 (SpEL)

自 3.0 版本以来,Spring Integration 还提供了一个内置的 #xpath SpEL 函数用于表达式。有关更多信息,请参阅#xpath SpEL 函数

从 4.0 版本开始,ObjectToJsonTransformer 支持 resultType 属性,用于指定节点的 JSON 表示形式。结果节点树的表示取决于所提供的 JsonObjectMapper 的实现。默认情况下,ObjectToJsonTransformer 使用 Jackson2JsonObjectMapper 并将对象转换为节点树的委托给 ObjectMapper#valueToTree 方法。当下游消息流使用 SpEL 表达式访问 JSON 数据属性时,节点 JSON 表示形式提高了使用 JsonPropertyAccessor 的效率。有关更多信息,请参阅属性访问器

从 5.1 版本开始,resultType 可以配置为 BYTES,以便生成带有 byte[] payload 的消息,方便与处理此数据类型的下游处理程序一起工作。

从 5.2 版本开始,JsonToObjectTransformer 可以配置 ResolvableType,以在目标 JSON 处理器进行反序列化时支持泛型。此外,此组件现在首先检查请求消息头部是否存在 JsonHeaders.RESOLVABLE_TYPEJsonHeaders.TYPE_ID,否则回退到配置的类型。ObjectToJsonTransformer 现在还会根据请求消息 payload 填充 JsonHeaders.RESOLVABLE_TYPE 头部,以应对任何可能的下游场景。

从 5.2.6 版本开始,JsonToObjectTransformer 可以提供 valueTypeExpression,用于在运行时根据请求消息解析要从 JSON 转换的 payload 的 ResolvableType。默认情况下,它会查询请求消息中的 JsonHeaders。如果此表达式返回 nullResolvableType 构建抛出 ClassNotFoundException,则转换器会回退到提供的 targetType。此逻辑作为表达式存在是因为 JsonHeaders 可能没有实际的类值,而是需要根据外部注册表映射到目标类的一些类型 ID。

Apache Avro 转换器

5.2 版本添加了简单的转换器用于与 Apache Avro 之间进行转换。

它们不够复杂,因为没有 schema 注册表;转换器只是使用从 Avro schema 生成的 SpecificRecord 实现中嵌入的 schema。

发送到 SimpleToAvroTransformer 的消息必须具有实现 SpecificRecord 的 payload;转换器可以处理多种类型。SimpleFromAvroTransformer 必须配置 SpecificRecord 类,该类用作默认的反序列化类型。您还可以使用 setTypeExpression 方法指定一个 SpEL 表达式来确定反序列化的类型。默认的 SpEL 表达式是 headers[avro_type] (AvroHeaders.TYPE),SimpleToAvroTransformer 默认使用源类的完全限定类名来填充它。如果表达式返回 null,则使用 defaultType

SimpleToAvroTransformer 也有 setTypeExpression 方法。这允许生产者和消费者解耦,发送者可以将头部设置为表示类型的某个标记,然后消费者将该标记映射到类型。

Protocol Buffers 转换器

6.1 版本增加了对与 Protocol Buffers 数据内容之间进行转换的支持。

ToProtobufTransformercom.google.protobuf.Message 消息 payloads 转换为原生 byte 数组或 json 文本 payloads。默认使用的 application/x-protobuf 内容类型会产生 byte 数组输出 payload。如果内容类型是 application/json 并且在 classpath 中找到 com.google.protobuf:protobuf-java-util,则输出是文本 json payload。如果未设置 content type 头部,ToProtobufTransformer 默认为 application/x-protobuf

FromProtobufTransformer 将字节数组或文本 protobuf 负载(取决于内容类型)转换回 com.google.protobuf.Message 实例。FromProtobufTransformer 应该显式指定一个预期的类类型(使用 setExpectedType 方法),或者使用 SpEL 表达式通过 setExpectedTypeExpression 方法确定要反序列化的类型。默认的 SpEL 表达式是 headers[proto_type] (ProtoHeaders.TYPE),它由 ToProtobufTransformer 填充了源 com.google.protobuf.Message 类的全限定类名。

例如,编译以下 IDL

syntax = "proto2";
package tutorial;

option java_multiple_files = true;
option java_package = "org.example";
option java_outer_classname = "MyProtos";

message MyMessageClass {
  optional string foo = 1;
  optional string bar = 2;
}

将生成一个新的 org.example.MyMessageClass 类。

然后使用

// Transforms a MyMessageClass instance into a byte array.
ToProtobufTransformer toTransformer = new ToProtobufTransformer();

MyMessageClass test = MyMessageClass.newBuilder()
                                .setFoo("foo")
                                .setBar("bar")
                                .build();
// message1 payload is byte array protocol buffer wire format.
Message message1 = toTransformer.transform(new GenericMessage<>(test));

// Transforms a byte array payload into a MyMessageClass instance.
FromProtobufTransformer fromTransformer = new FromProtobufTransformer();

// message2 payload == test
Message message2 =  fromTransformer.transform(message1);

使用注解配置转换器

您可以将 @Transformer 注解添加到期望接收 Message 类型或消息负载类型的方法上。返回值的处理方式与前面描述 <transformer> 元素的章节中描述的完全相同。以下示例展示了如何使用 @Transformer 注解将一个 String 转换为一个 Order

@Transformer
Order generateOrder(String productId) {
    return new Order(productId);
}

转换器方法也可以接受 @Header@Headers 注解,正如 注解支持 中所记录的。以下示例展示了如何使用 @Header 注解

@Transformer
Order generateOrder(String productId, @Header("customerName") String customer) {
    return new Order(productId, customer);
}

另请参阅 使用注解增强端点

头部过滤器

有时,您的转换用例可能就像移除几个头部一样简单。对于这样的用例,Spring Integration 提供了一个头部过滤器,让您可以指定应该从输出消息中移除的特定头部名称(例如,出于安全原因或只需要临时使用的值而移除头部)。基本上,头部过滤器与头部丰富器相反。后者在 头部丰富器 中讨论。以下示例定义了一个头部过滤器

  • Java DSL

  • XML

@Bean
public IntegrationFlow someFlow() {
    return IntegrationFlow
             .from("inputChannel")
             .headerFilter("lastName", "state")
             .channel("outputChannel")
             .get();
}
<int:header-filter input-channel="inputChannel"
		output-channel="outputChannel" header-names="lastName, state"/>

如您所见,头部过滤器的配置非常简单。它是一个典型的端点,带有输入和输出通道以及一个 header-names 属性。该属性接受需要被移除的头部名称(如果多个则用逗号分隔)。因此,在前面的示例中,名为 ‘lastName’ 和 ‘state’ 的头部不会出现在出站消息中。

基于 Codec 的转换器

参见 Codec