消息

Spring Integration Message 是一个通用的数据容器。任何对象都可以作为负载 (payload),并且每个 Message 实例都包含带有用户可扩展属性的头 (headers),这些属性以键值对的形式存储。

Message 接口

以下列表展示了 Message 接口的定义

public interface Message<T> {

    T getPayload();

    MessageHeaders getHeaders();

}

Message 接口是 API 的核心部分。通过将数据封装在通用包装器中,消息系统可以在不了解数据类型的情况下传递数据。随着应用程序演进以支持新类型,或者类型本身被修改或扩展时,消息系统不会受到影响。另一方面,当消息系统中的某些组件确实需要访问有关 Message 的信息时,此类元数据通常可以存储到消息头中并从中检索。

消息头 (Message Headers)

正如 Spring Integration 允许任何 Object 用作 Message 的负载一样,它也支持任何 Object 类型作为头值。实际上,MessageHeaders 类实现了 java.util.Map_ interface,如下面的类定义所示

public final class MessageHeaders implements Map<String, Object>, Serializable {
  ...
}
尽管 MessageHeaders 类实现了 Map,但它实际上是一个只读实现。任何尝试在 Map 中 put 值都会导致 UnsupportedOperationException。对于 removeclear 也是如此。由于消息可能传递给多个消费者,因此不能修改 Map 的结构。同样,消息的负载 Object 在初次创建后不能被 set。然而,头值本身(或负载 Object)的可变性有意留给框架用户自行决定。

作为 Map 的实现,可以通过调用带有头名称的 get(..) 来检索头。或者,您可以提供预期的 Class 作为附加参数。更好的是,当检索预定义值时,提供了方便的 getter 方法。以下示例展示了这三种选项

Object someValue = message.getHeaders().get("someKey");

CustomerId customerId = message.getHeaders().get("customerId", CustomerId.class);

Long timestamp = message.getHeaders().getTimestamp();

下表描述了预定义的消息头

表 1. 预定义消息头
头名称 头类型 用途
 MessageHeaders.ID
 java.util.UUID

此消息实例的标识符。每次消息被修改时都会改变。

 MessageHeaders.
TIMESTAMP
 java.lang.Long

消息创建的时间。每次消息被修改时都会改变。

 MessageHeaders.
REPLY_CHANNEL
 java.lang.Object
(String or
MessageChannel)

当未配置显式输出通道,且没有 ROUTING_SLIPROUTING_SLIP 已用尽时,回复(如果有)发送到的通道。如果值是 String,它必须表示一个 Bean 名称或由 ChannelRegistry 生成。

 MessageHeaders.
ERROR_CHANNEL
 java.lang.Object
(String or
MessageChannel)

错误发送到的通道。如果值是 String,它必须表示一个 Bean 名称或由 ChannelRegistry 生成。

许多入站和出站适配器实现也提供或期望某些头,并且您可以配置额外的用户定义头。这些头的常量可以在存在这些头的模块中找到——例如 AmqpHeadersJmsHeaders 等等。

MessageHeaderAccessor API

从 Spring Framework 4.0 和 Spring Integration 4.0 开始,核心消息抽象已移至 spring-messaging 模块,并引入了 MessageHeaderAccessor API,以提供对消息实现的额外抽象。所有 (核心) Spring Integration 特定消息头常量现在都在 IntegrationMessageHeaderAccessor 类中声明。下表描述了预定义的消息头

表 2. 预定义消息头
头名称 头类型 用途
 IntegrationMessageHeaderAccessor.
CORRELATION_ID
 java.lang.Object

用于关联两个或更多消息。

 IntegrationMessageHeaderAccessor.
SEQUENCE_NUMBER
 java.lang.Integer

通常是带有 SEQUENCE_SIZE 的消息组的序列号,但也可以在 <resequencer/> 中使用,以对无界消息组进行重排序。

 IntegrationMessageHeaderAccessor.
SEQUENCE_SIZE
 java.lang.Integer

一组关联消息中的消息数量。

 IntegrationMessageHeaderAccessor.
EXPIRATION_DATE
 java.lang.Long

指示消息何时过期。框架不直接使用,但可以通过 Header Enricher 设置,并在配置了 UnexpiredMessageSelector<filter/> 中使用。

 IntegrationMessageHeaderAccessor.
PRIORITY
 java.lang.Integer

消息优先级——例如,在 PriorityChannel 中。

 IntegrationMessageHeaderAccessor.
DUPLICATE_MESSAGE
 java.lang.Boolean

如果消息被幂等接收者拦截器检测为重复。参见 幂等接收者企业集成模式

 IntegrationMessageHeaderAccessor.
CLOSEABLE_RESOURCE
 java.io.Closeable

如果消息与一个处理完成后应关闭的 Closeable 相关联,则存在此头。例如,使用 FTP、SFTP 等进行流式文件传输时关联的 Session

 IntegrationMessageHeaderAccessor.
DELIVERY_ATTEMPT
 java.lang.
AtomicInteger

如果消息驱动通道适配器支持配置 RetryTemplate,则此头包含当前的投递尝试次数。

 IntegrationMessageHeaderAccessor.
ACKNOWLEDGMENT_CALLBACK
 o.s.i.support.
Acknowledgment
Callback

如果入站端点支持,则会有一个回调函数用于接受、拒绝或重新入队消息。参见 延迟确认可轮询消息源MQTT 手动确认

IntegrationMessageHeaderAccessor 类提供了其中一些头的便捷类型化 getter,如下例所示

IntegrationMessageHeaderAccessor accessor = new IntegrationMessageHeaderAccessor(message);
int sequenceNumber = accessor.getSequenceNumber();
Object correlationId = accessor.getCorrelationId();
...

下表描述了也出现在 IntegrationMessageHeaderAccessor 中但通常不由用户代码使用的头(即,它们通常由 Spring Integration 的内部部分使用——此处包含它们是为了完整性)

表 3. 预定义消息头
头名称 头类型 用途
 IntegrationMessageHeaderAccessor.
SEQUENCE_DETAILS
 java.util.
List<List<Object>>

当需要嵌套关联时使用的关联数据堆栈(例如,分发器→…​→分发器→…​→聚合器→…​→聚合器)。

 IntegrationMessageHeaderAccessor.
ROUTING_SLIP
 java.util.
Map<List<Object>, Integer>

参见 路由单

消息 ID 生成

当消息通过应用程序转换时,每次被修改(例如,通过转换器)都会分配一个新的消息 ID。消息 ID 是一个 UUID。从 Spring Integration 3.0 开始,用于 ID 生成的默认策略比以前的 java.util.UUID.randomUUID() 实现更高效。它基于安全的随机种子使用简单的随机数,而不是每次都创建安全的随机数。

可以通过在应用程序上下文中声明一个实现 org.springframework.util.IdGenerator 接口的 Bean 来选择不同的 UUID 生成策略。

在一个 ClassLoader 中只能使用一种 UUID 生成策略。这意味着,如果在同一个 ClassLoader 中运行两个或更多应用程序上下文,它们会共享相同的策略。如果其中一个上下文更改了策略,所有上下文都会使用它。如果在同一个 ClassLoader 中的两个或更多上下文声明了类型为 org.springframework.util.IdGenerator 的 Bean,它们必须是同一类的实例。否则,尝试替换自定义策略的上下文将无法初始化。如果策略相同,但参数化不同,则使用第一个被初始化的上下文中的策略。

除了默认策略外,还提供了另外两个 IdGeneratorsorg.springframework.util.JdkIdGenerator 使用之前的 UUID.randomUUID() 机制。当实际上不需要 UUID 并且简单的递增值就足够时,您可以使用 o.s.i.support.IdGenerators.SimpleIncrementingIdGenerator

只读头

MessageHeaders.IDMessageHeaders.TIMESTAMP 是只读头,不能被覆盖。

从版本 4.3.2 开始,MessageBuilder 提供了 readOnlyHeaders(String…​ readOnlyHeaders) API,用于自定义不应从上游 Message 复制的头列表。默认情况下,只有 MessageHeaders.IDMessageHeaders.TIMESTAMP 是只读的。提供了全局属性 spring.integration.readOnly.headers(参见 全局属性)来为框架组件自定义 DefaultMessageBuilderFactory。当您不想让 ObjectToJsonTransformer 等组件填充某些现成的头(例如 contentType)时,这非常有用(参见 JSON 转换器)。

当您尝试使用 MessageBuilder 构建新消息时,此类头将被忽略,并且会在日志中发出特定的 INFO 消息。

从版本 5.0 开始,当使用 DefaultMessageBuilderFactory 时,消息网关Header Enricher内容增强器Header Filter 不允许您配置 MessageHeaders.IDMessageHeaders.TIMESTAMP 头名称,并且会抛出 BeanInitializationException

头传播

当消息被产生消息的端点(例如 服务激活器)处理(和修改)时,通常,入站头会传播到出站消息。一个例外是 转换器,当完整的消息返回给框架时。在这种情况下,用户代码负责整个出站消息。当转换器只返回负载时,入站头会被传播。此外,只有当出站消息中不存在该头时,头才会被传播,从而允许您根据需要更改头值。

从版本 4.3.10 开始,您可以配置消息处理器(它们修改消息并产生输出)以抑制特定头的传播。要配置您不想复制的头,请在 MessageProducingMessageHandler 抽象类上调用 setNotPropagatedHeaders()addNotPropagatedHeaders() 方法。

您还可以通过在 META-INF/spring.integration.properties 中将 readOnlyHeaders 属性设置为逗号分隔的头列表来全局抑制特定消息头的传播。

从版本 5.0 开始,AbstractMessageProducingHandler 上的 setNotPropagatedHeaders() 实现应用简单的模式 (xxx*, *xxx, *xxx*, 或 xxx*yyy) 来允许过滤具有共同后缀或前缀的头。有关更多信息,请参阅 PatternMatchUtils Javadoc。当其中一个模式是 *(星号)时,不传播任何头。所有其他模式都被忽略。在这种情况下,服务激活器的行为与转换器相同,任何必需的头必须在服务方法返回的 Message 中提供。notPropagatedHeaders() 选项在 Java DSL 的 ConsumerEndpointSpec 中可用。它也作为 <service-activator> 组件的 XML 配置中的 not-propagated-headers 属性可用。

头传播抑制不适用于不修改消息的端点,例如 桥接器路由器

消息实现

Message 接口的基本实现是 GenericMessage<T>,它提供两个构造函数,如下列表所示

new GenericMessage<T>(T payload);

new GenericMessage<T>(T payload, Map<String, Object> headers)

创建 Message 时,会生成一个随机的唯一 ID。接受 Map 作为头的构造函数会将提供的头复制到新创建的 Message 中。

还有一个方便的 Message 实现,用于表示错误条件。此实现接受一个 Throwable 对象作为其负载,如下例所示

ErrorMessage message = new ErrorMessage(someThrowable);

Throwable t = message.getPayload();

请注意,此实现利用了 GenericMessage 基类是参数化的这一事实。因此,如两个示例所示,检索 Message 负载 Object 时无需进行强制转换。

前面提到的 Message 类实现是不可变的。在某些情况下,当可变性不是问题且应用程序逻辑设计良好以避免并发修改时,可以使用 MutableMessage

MessageBuilder 帮助类

您可能注意到 Message 接口定义了用于检索其负载和头的方法,但没有提供 setter。这是因为 Message 在初次创建后不能被修改。因此,当一个 Message 实例被发送到多个消费者(例如,通过发布-订阅通道)时,如果其中一个消费者需要发送带有不同负载类型的回复,它必须创建一个新的 Message。结果是,其他消费者不受这些更改的影响。请记住,多个消费者可能访问相同的负载实例或头值,并且此类实例本身是否可变是留给您决定的。换句话说,Message 实例的契约类似于不可变的 CollectionMessageHeaders map 进一步例证了这一点。尽管 MessageHeaders 类实现了 java.util.Map,但任何尝试在 MessageHeaders 实例上调用 put 操作(或 'remove' 或 'clear')都会导致 UnsupportedOperationException

Spring Integration 提供了一种更方便的方式来构建 Message,而不是要求创建和填充 Map 然后传递给 GenericMessage 构造函数:MessageBuilderMessageBuilder 提供了两个工厂方法,用于从现有 Message 或使用负载 Object 创建 Message 实例。从现有 Message 构建时,该 Message 的头和负载会被复制到新的 Message 中,如下例所示

Message<String> message1 = MessageBuilder.withPayload("test")
        .setHeader("foo", "bar")
        .build();

Message<String> message2 = MessageBuilder.fromMessage(message1).build();

assertEquals("test", message2.getPayload());
assertEquals("bar", message2.getHeaders().get("foo"));

如果您需要创建一个带有新负载但仍想从现有 Message 中复制头的 Message,您可以使用 'copy' 方法之一,如下例所示

Message<String> message3 = MessageBuilder.withPayload("test3")
        .copyHeaders(message1.getHeaders())
        .build();

Message<String> message4 = MessageBuilder.withPayload("test4")
        .setHeader("foo", 123)
        .copyHeadersIfAbsent(message1.getHeaders())
        .build();

assertEquals("bar", message3.getHeaders().get("foo"));
assertEquals(123, message4.getHeaders().get("foo"));

请注意,copyHeadersIfAbsent 方法不会覆盖现有值。此外,在上面的示例中,您可以看到如何使用 setHeader 设置任何用户定义的头。最后,对于预定义头和设置任何头的非破坏性方法也提供了 set 方法(MessageHeaders 也定义了预定义头名称的常量)。

您还可以使用 MessageBuilder 设置消息的优先级,如下例所示

Message<Integer> importantMessage = MessageBuilder.withPayload(99)
        .setPriority(5)
        .build();

assertEquals(5, importantMessage.getHeaders().getPriority());

Message<Integer> lessImportantMessage = MessageBuilder.fromMessage(importantMessage)
        .setHeaderIfAbsent(IntegrationMessageHeaderAccessor.PRIORITY, 2)
        .build();

assertEquals(2, lessImportantMessage.getHeaders().getPriority());

只有在使用 PriorityChannel 时才会考虑 priority 头(如下一章所述)。它被定义为 java.lang.Integer

提供了 MutableMessageBuilder 来处理 MutableMessage 实例。此类逻辑是创建 MutableMessage 或保留原样,并通过构建器方法修改其内容。这样可以在运行应用程序中获得轻微的性能提升,前提是不可变性不是消息交换的关注点。

从版本 6.4 开始,从 MessageBuilder 中提取了一个 BaseMessageBuilder 类,以简化默认消息构建逻辑的扩展。例如,结合自定义的 MessageBuilderFactory,可以在应用程序上下文中全局使用自定义的 BaseMessageBuilder 实现来提供自定义的 Message 实例。特别是,可以覆盖 GenericMessage.toString() 方法,以便在记录此类消息时隐藏负载和头中的敏感信息。

MessageBuilderFactory 抽象

带有 IntegrationUtils.INTEGRATION_MESSAGE_BUILDER_FACTORY_BEAN_NAME 名称的 MessageBuilderFactory bean 会全局注册到应用程序上下文中,并在框架中随处用于创建 Message 实例。默认情况下,它是一个 DefaultMessageBuilderFactory 实例。开箱即用,框架还提供了一个 MutableMessageBuilderFactory,用于在框架组件中创建 MutableMessage 实例。要自定义 Message 实例的创建,必须在目标应用程序上下文中提供一个带有 IntegrationUtils.INTEGRATION_MESSAGE_BUILDER_FACTORY_BEAN_NAME 名称的 MessageBuilderFactory bean,以覆盖默认的。例如,可以为 BaseMessageBuilder 的实现注册一个自定义的 MessageBuilderFactory,我们希望在该实现中提供一个 GenericMessage 的扩展,并覆盖 toString() 方法,以便在记录此类消息时隐藏负载和头中的敏感信息。

这些类的一些快速实现以演示个人可识别信息的缓解措施可以像这样

class PiiMessageBuilderFactory implements MessageBuilderFactory {

	@Override
	public <T> PiiMessageBuilder<T> fromMessage(Message<T> message) {
	    return new PiiMessageBuilder<>(message.getPayload(), message);
	}

	@Override
	public <T> PiiMessageBuilder<T> withPayload(T payload) {
	    return new PiiMessageBuilder<>(payload, null);
	}

}

class PiiMessageBuilder<P> extends BaseMessageBuilder<P, PiiMessageBuilder<P>> {

    public PiiMessageBuilder(P payload, @Nullable Message<P> originalMessage) {
        super(payload, originalMessage);
    }

    @Override
    public Message<P> build() {
        return new PiiMessage<>(getPayload(), getHeaders());
    }

}

class PiiMessage<P> extends GenericMessage<P> {

    @Serial
    private static final long serialVersionUID = -354503673433669578L;

    public PiiMessage(P payload, Map<String, Object> headers) {
        super(payload, headers);
    }

    @Override
    public String toString() {
        return "PiiMessage [payload=" + getPayload() + ", headers=" + maskHeaders(getHeaders()) + ']';
    }

    private static Map<String, Object> maskHeaders(Map<String, Object> headers) {
        return headers.entrySet()
                .stream()
                .map((entry) -> entry.getKey().equals("password") ? Map.entry(entry.getKey(), "******") : entry)
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

}

这个 PiiMessageBuilderFactory 可以注册为一个 Bean,并且无论何时框架记录消息(例如,在 errorChannel 的情况下),password 头都将被遮盖。