消息

Spring Integration 的Message是用于数据的通用容器。任何对象都可以作为有效负载提供,每个Message实例都包含包含用户可扩展属性的键值对的标头。

Message 接口

下面的列表显示了Message接口的定义。

public interface Message<T> {

    T getPayload();

    MessageHeaders getHeaders();

}

Message接口是API的核心部分。通过将数据封装在通用包装器中,消息系统可以在不知道数据类型的情况下传递数据。随着应用程序发展以支持新类型,或者当类型本身被修改或扩展时,消息系统不会受到影响。另一方面,当消息系统中的某些组件确实需要访问有关Message的信息时,此类元数据通常可以存储到消息标头中的元数据中并从中检索。

消息标头

正如Spring Integration允许任何Object用作Message的有效负载一样,它也支持任何Object类型作为标头值。事实上,MessageHeaders类实现了java.util.Map接口,如下面的类定义所示。

public final class MessageHeaders implements Map<String, Object>, Serializable {
  ...
}
即使MessageHeaders类实现了Map,它实际上也是一个只读实现。任何尝试在Map中put值的尝试都会导致UnsupportedOperationExceptionremoveclear也是如此。由于消息可能会传递给多个消费者,因此无法修改Map的结构。同样,在初始创建之后,也不能set消息的有效负载Object。但是,标头值本身(或有效负载Object)的可变性有意留给框架用户决定。

作为Map的实现,可以通过使用标头的名称调用get(..)来检索标头。或者,您可以提供预期的Class作为附加参数。更好的是,当检索预定义值之一时,可以使用方便的getter。以下示例显示了这三种选项中的每一种。

Object someValue = message.getHeaders().get("someKey");

CustomerId customerId = message.getHeaders().get("customerId", CustomerId.class);

Long timestamp = message.getHeaders().getTimestamp();

下表描述了预定义的消息标头。

表1. 预定义的消息标头
标头名称 标头类型 用法
 MessageHeaders.ID
 java.util.UUID

此消息实例的标识符。每次消息发生变异时都会更改。

 MessageHeaders.
TIMESTAMP
 java.lang.Long

创建消息的时间。每次消息发生变异时都会更改。

 MessageHeaders.
REPLY_CHANNEL
 java.lang.Object
(String or
MessageChannel)

当没有显式输出通道配置且没有ROUTING_SLIPROUTING_SLIP已用尽时,回复(如果有)发送到的通道。如果值为String,则它必须表示bean名称或由ChannelRegistry生成。

 MessageHeaders.
ERROR_CHANNEL
 java.lang.Object
(String or
MessageChannel)

发送错误的通道。如果值为String,则它必须表示bean名称或由ChannelRegistry生成。

许多入站和出站适配器实现也提供或需要某些标头,您可以配置其他用户定义的标头。这些标头的常量可以在存在此类标头的模块中找到,例如AmqpHeadersJmsHeaders等等。

MessageHeaderAccessor API

从Spring Framework 4.0和Spring Integration 4.0开始,核心消息传递抽象已移动到spring-messaging模块,并且引入了MessageHeaderAccessor API以提供对消息传递实现的额外抽象。所有(核心)Spring Integration特定的消息标头常量现在都在IntegrationMessageHeaderAccessor类中声明。下表描述了预定义的消息标头。

表2. 预定义的消息标头
标头名称 标头类型 用法
 IntegrationMessageHeaderAccessor.
CORRELATION_ID
 java.lang.Object

用于关联两条或多条消息。

 IntegrationMessageHeaderAccessor.
SEQUENCE_NUMBER
 java.lang.Integer

通常是一组具有SEQUENCE_SIZE的消息的序列号,但也可以在<resequencer/>中使用来重新排序无限组的消息。

 IntegrationMessageHeaderAccessor.
SEQUENCE_SIZE
 java.lang.Integer

一组相关消息中的消息数量。

 IntegrationMessageHeaderAccessor.
EXPIRATION_DATE
 java.lang.Long

指示消息何时过期。框架不直接使用,但可以使用标头增强器设置,并用于配置了UnexpiredMessageSelector<filter/>

 IntegrationMessageHeaderAccessor.
PRIORITY
 java.lang.Integer

消息优先级,例如,在PriorityChannel内。

 IntegrationMessageHeaderAccessor.
DUPLICATE_MESSAGE
 java.lang.Boolean

如果幂等接收器拦截器检测到消息为重复消息,则为真。参见幂等接收器企业集成模式

 IntegrationMessageHeaderAccessor.
CLOSEABLE_RESOURCE
 java.io.Closeable

如果消息与在消息处理完成后应关闭的Closeable相关联,则存在此标头。一个示例是使用FTP、SFTP等进行流式文件传输时关联的Session

 IntegrationMessageHeaderAccessor.
DELIVERY_ATTEMPT
 java.lang.
AtomicInteger

如果消息驱动的通道适配器支持RetryTemplate的配置,则此标头包含当前的传递尝试次数。

 IntegrationMessageHeaderAccessor.
ACKNOWLEDGMENT_CALLBACK
 o.s.i.support.
Acknowledgment
Callback

如果入站端点支持,则回调以接受、拒绝或重新排队消息。参见延迟确认轮询消息源MQTT手动确认

IntegrationMessageHeaderAccessor类提供了一些这些标头的便捷类型化getter,如下例所示。

IntegrationMessageHeaderAccessor accessor = new IntegrationMessageHeaderAccessor(message);
int sequenceNumber = accessor.getSequenceNumber();
Object correlationId = accessor.getCorrelationId();
...

下表描述了也出现在IntegrationMessageHeaderAccessor中但通常不用于用户代码的标头(即,它们通常由Spring Integration的内部部分使用——此处包含它们是为了完整性)。

表3. 预定义的消息标头
标头名称 标头类型 用法
 IntegrationMessageHeaderAccessor.
SEQUENCE_DETAILS
 java.util.
List<List<Object>>

当需要嵌套相关性时使用的相关数据堆栈(例如,splitter→…​→splitter→…​→aggregator→…​→aggregator)。

 IntegrationMessageHeaderAccessor.
ROUTING_SLIP
 java.util.
Map<List<Object>, Integer>

参见路由单

消息ID生成

当消息在应用程序中转换时,每次消息发生变异(例如,通过转换器)都会分配一个新的消息ID。消息ID是一个UUID。从Spring Integration 3.0开始,用于ID生成的默认策略比以前的java.util.UUID.randomUUID()实现更有效率。它使用基于安全随机种子简单的随机数,而不是每次都创建一个安全的随机数。

可以通过在应用程序上下文中声明实现org.springframework.util.IdGenerator的bean来选择不同的UUID生成策略。

一个类加载器只能使用一种UUID生成策略。这意味着,如果两个或多个应用程序上下文在同一个类加载器中运行,它们共享相同的策略。如果其中一个上下文更改了策略,则所有上下文都将使用该策略。如果同一个类加载器中的两个或多个上下文声明类型为org.springframework.util.IdGenerator的bean,则它们都必须是同一个类的实例。否则,尝试替换自定义策略的上下文将无法初始化。如果策略相同,但参数化,则使用第一个初始化的上下文中的策略。

除了默认策略外,还提供了两个额外的IdGeneratorsorg.springframework.util.JdkIdGenerator使用之前的UUID.randomUUID()机制。当不需要UUID且简单的递增值就足够时,您可以使用o.s.i.support.IdGenerators.SimpleIncrementingIdGenerator

只读标头

MessageHeaders.IDMessageHeaders.TIMESTAMP是只读标头,不能被覆盖。

从4.3.2版开始,MessageBuilder提供readOnlyHeaders(String…​ readOnlyHeaders) API来定制不应该从上游Message复制的标头列表。默认情况下,只有MessageHeaders.IDMessageHeaders.TIMESTAMP是只读的。全局spring.integration.readOnly.headers属性(参见全局属性)用于为框架组件定制DefaultMessageBuilderFactory。当您不想填充某些开箱即用的标头(例如,ObjectToJsonTransformercontentType)时,这很有用(参见JSON转换器)。

当您尝试使用MessageBuilder构建新消息时,此类标头将被忽略,并且会向日志发出特定的INFO消息。

从5.0版开始,消息网关标头增强器内容增强器标头过滤器在使用DefaultMessageBuilderFactory时不允许配置MessageHeaders.IDMessageHeaders.TIMESTAMP标头名称,并且会抛出BeanInitializationException

标头传播

当消息由消息生产端点(例如服务激活器)处理(和修改)时,通常,入站标头会传播到出站消息。一个例外是转换器,当完整的消息返回给框架时。在这种情况下,用户代码负责整个出站消息。当转换器只返回有效负载时,会传播入站标头。此外,只有当标头不存在于出站消息中时,才会传播标头,允许您根据需要更改标头值。

从4.3.10版开始,您可以配置消息处理程序(修改消息并产生输出)以抑制特定标头的传播。要配置不想复制的标头,请在MessageProducingMessageHandler抽象类上调用setNotPropagatedHeaders()addNotPropagatedHeaders()方法。

您还可以通过在META-INF/spring.integration.properties中将readOnlyHeaders属性设置为标头的逗号分隔列表来全局抑制特定消息标头的传播。

从5.0版本开始,AbstractMessageProducingHandler上的setNotPropagatedHeaders()实现应用简单的模式(xxx*xxx*xxxxxx*yyy)来过滤具有公共后缀或前缀的报头。更多信息,请参见PatternMatchUtils Javadoc。当其中一个模式为*(星号)时,不会传播任何报头。所有其他模式都会被忽略。在这种情况下,服务激活器的工作方式与转换器相同,任何必需的报头都必须在服务方法返回的Message中提供。notPropagatedHeaders()选项在Java DSL的ConsumerEndpointSpec中可用,也可作为<service-activator>组件的not-propagated-headers属性用于XML配置。

报头传播抑制不适用于不修改消息的端点,例如桥接器路由器

消息实现

Message接口的基本实现是GenericMessage<T>,它提供了两个构造函数,如下所示:

new GenericMessage<T>(T payload);

new GenericMessage<T>(T payload, Map<String, Object> headers)

创建Message时,会生成一个随机的唯一ID。接受Map报头的构造函数会将提供的报头复制到新创建的Message

还有一个方便的Message实现,用于通信错误条件。此实现采用Throwable对象作为其有效负载,如下例所示:

ErrorMessage message = new ErrorMessage(someThrowable);

Throwable t = message.getPayload();

请注意,此实现利用了GenericMessage基类是参数化的这一事实。因此,如两个示例所示,在检索Message有效负载Object时,无需进行强制类型转换。

MessageBuilder辅助类

您可能会注意到,Message接口定义了其有效负载和报头的检索方法,但没有提供设置器。这是因为Message在其初始创建后不能被修改。因此,当Message实例发送到多个消费者(例如,通过发布-订阅通道)时,如果其中一个消费者需要发送具有不同有效负载类型的回复,则必须创建一个新的Message。这样,其他消费者就不会受到这些更改的影响。请记住,多个消费者可能访问相同的有效负载实例或报头值,并且此类实例本身是否不可变取决于您。换句话说,Message实例的约定类似于不可修改的Collection,而MessageHeaders映射进一步说明了这一点。即使MessageHeaders类实现了java.util.Map,任何尝试对MessageHeaders实例调用put操作(或“remove”或“clear”)都会导致UnsupportedOperationException

Spring Integration 提供了一种更方便的方式来构建消息,而不是需要创建和填充一个 Map 来传递给 GenericMessage 构造函数:MessageBuilderMessageBuilder 提供了两种工厂方法,用于从现有的 Message 或使用有效负载 Object 创建 Message 实例。从现有 Message 构建时,该 Message 的报头和有效负载将被复制到新的 Message,如下例所示:

Message<String> message1 = MessageBuilder.withPayload("test")
        .setHeader("foo", "bar")
        .build();

Message<String> message2 = MessageBuilder.fromMessage(message1).build();

assertEquals("test", message2.getPayload());
assertEquals("bar", message2.getHeaders().get("foo"));

如果您需要创建一个具有新有效负载的Message,但仍然希望从现有Message复制报头,可以使用其中一个“复制”方法,如下例所示:

Message<String> message3 = MessageBuilder.withPayload("test3")
        .copyHeaders(message1.getHeaders())
        .build();

Message<String> message4 = MessageBuilder.withPayload("test4")
        .setHeader("foo", 123)
        .copyHeadersIfAbsent(message1.getHeaders())
        .build();

assertEquals("bar", message3.getHeaders().get("foo"));
assertEquals(123, message4.getHeaders().get("foo"));

请注意,copyHeadersIfAbsent方法不会覆盖现有值。此外,在上例中,您可以看到如何使用setHeader设置任何用户定义的报头。最后,还有一些可用于预定义报头的set方法,以及一种用于设置任何报头的非破坏性方法(MessageHeaders也为预定义的报头名称定义了常量)。

您还可以使用MessageBuilder设置消息的优先级,如下例所示:

Message<Integer> importantMessage = MessageBuilder.withPayload(99)
        .setPriority(5)
        .build();

assertEquals(5, importantMessage.getHeaders().getPriority());

Message<Integer> lessImportantMessage = MessageBuilder.fromMessage(importantMessage)
        .setHeaderIfAbsent(IntegrationMessageHeaderAccessor.PRIORITY, 2)
        .build();

assertEquals(2, lessImportantMessage.getHeaders().getPriority());

仅当使用PriorityChannel(如下一章所述)时,才会考虑priority报头。它被定义为java.lang.Integer