消息
Spring Integration Message
是一个通用的数据容器。任何对象都可以作为负载 (payload),并且每个 Message
实例都包含带有用户可扩展属性的头 (headers),这些属性以键值对的形式存储。
Message
接口
以下列表展示了 Message
接口的定义
public interface Message<T> {
T getPayload();
MessageHeaders getHeaders();
}
Message
接口是 API 的核心部分。通过将数据封装在通用包装器中,消息系统可以在不了解数据类型的情况下传递数据。随着应用程序演进以支持新类型,或者类型本身被修改或扩展时,消息系统不会受到影响。另一方面,当消息系统中的某些组件确实需要访问有关 Message
的信息时,此类元数据通常可以存储到消息头中并从中检索。
消息头 (Message Headers)
正如 Spring Integration 允许任何 Object
用作 Message
的负载一样,它也支持任何 Object
类型作为头值。实际上,MessageHeaders
类实现了 java.util.Map_ interface
,如下面的类定义所示
public final class MessageHeaders implements Map<String, Object>, Serializable {
...
}
尽管 MessageHeaders 类实现了 Map ,但它实际上是一个只读实现。任何尝试在 Map 中 put 值都会导致 UnsupportedOperationException 。对于 remove 和 clear 也是如此。由于消息可能传递给多个消费者,因此不能修改 Map 的结构。同样,消息的负载 Object 在初次创建后不能被 set 。然而,头值本身(或负载 Object)的可变性有意留给框架用户自行决定。 |
作为 Map
的实现,可以通过调用带有头名称的 get(..)
来检索头。或者,您可以提供预期的 Class
作为附加参数。更好的是,当检索预定义值时,提供了方便的 getter 方法。以下示例展示了这三种选项
Object someValue = message.getHeaders().get("someKey");
CustomerId customerId = message.getHeaders().get("customerId", CustomerId.class);
Long timestamp = message.getHeaders().getTimestamp();
下表描述了预定义的消息头
头名称 | 头类型 | 用途 |
---|---|---|
MessageHeaders.ID |
java.util.UUID |
此消息实例的标识符。每次消息被修改时都会改变。 |
MessageHeaders. TIMESTAMP |
java.lang.Long |
消息创建的时间。每次消息被修改时都会改变。 |
MessageHeaders. REPLY_CHANNEL |
java.lang.Object (String or MessageChannel) |
当未配置显式输出通道,且没有 |
MessageHeaders. ERROR_CHANNEL |
java.lang.Object (String or MessageChannel) |
错误发送到的通道。如果值是 |
许多入站和出站适配器实现也提供或期望某些头,并且您可以配置额外的用户定义头。这些头的常量可以在存在这些头的模块中找到——例如 AmqpHeaders
、JmsHeaders
等等。
MessageHeaderAccessor
API
从 Spring Framework 4.0 和 Spring Integration 4.0 开始,核心消息抽象已移至 spring-messaging
模块,并引入了 MessageHeaderAccessor
API,以提供对消息实现的额外抽象。所有 (核心) Spring Integration 特定消息头常量现在都在 IntegrationMessageHeaderAccessor
类中声明。下表描述了预定义的消息头
头名称 | 头类型 | 用途 |
---|---|---|
IntegrationMessageHeaderAccessor. CORRELATION_ID |
java.lang.Object |
用于关联两个或更多消息。 |
IntegrationMessageHeaderAccessor. SEQUENCE_NUMBER |
java.lang.Integer |
通常是带有 |
IntegrationMessageHeaderAccessor. SEQUENCE_SIZE |
java.lang.Integer |
一组关联消息中的消息数量。 |
IntegrationMessageHeaderAccessor. EXPIRATION_DATE |
java.lang.Long |
指示消息何时过期。框架不直接使用,但可以通过 Header Enricher 设置,并在配置了 |
IntegrationMessageHeaderAccessor. PRIORITY |
java.lang.Integer |
消息优先级——例如,在 |
IntegrationMessageHeaderAccessor. DUPLICATE_MESSAGE |
java.lang.Boolean |
如果消息被幂等接收者拦截器检测为重复。参见 幂等接收者企业集成模式。 |
IntegrationMessageHeaderAccessor. CLOSEABLE_RESOURCE |
java.io.Closeable |
如果消息与一个处理完成后应关闭的 |
IntegrationMessageHeaderAccessor. DELIVERY_ATTEMPT |
java.lang. AtomicInteger |
如果消息驱动通道适配器支持配置 |
IntegrationMessageHeaderAccessor. ACKNOWLEDGMENT_CALLBACK |
o.s.i.support. Acknowledgment Callback |
如果入站端点支持,则会有一个回调函数用于接受、拒绝或重新入队消息。参见 延迟确认可轮询消息源 和 MQTT 手动确认。 |
IntegrationMessageHeaderAccessor
类提供了其中一些头的便捷类型化 getter,如下例所示
IntegrationMessageHeaderAccessor accessor = new IntegrationMessageHeaderAccessor(message);
int sequenceNumber = accessor.getSequenceNumber();
Object correlationId = accessor.getCorrelationId();
...
下表描述了也出现在 IntegrationMessageHeaderAccessor
中但通常不由用户代码使用的头(即,它们通常由 Spring Integration 的内部部分使用——此处包含它们是为了完整性)
头名称 | 头类型 | 用途 |
---|---|---|
IntegrationMessageHeaderAccessor. SEQUENCE_DETAILS |
java.util. List<List<Object>> |
当需要嵌套关联时使用的关联数据堆栈(例如, |
IntegrationMessageHeaderAccessor. ROUTING_SLIP |
java.util. Map<List<Object>, Integer> |
参见 路由单。 |
消息 ID 生成
当消息通过应用程序转换时,每次被修改(例如,通过转换器)都会分配一个新的消息 ID。消息 ID 是一个 UUID
。从 Spring Integration 3.0 开始,用于 ID 生成的默认策略比以前的 java.util.UUID.randomUUID()
实现更高效。它基于安全的随机种子使用简单的随机数,而不是每次都创建安全的随机数。
可以通过在应用程序上下文中声明一个实现 org.springframework.util.IdGenerator
接口的 Bean 来选择不同的 UUID 生成策略。
在一个 ClassLoader 中只能使用一种 UUID 生成策略。这意味着,如果在同一个 ClassLoader 中运行两个或更多应用程序上下文,它们会共享相同的策略。如果其中一个上下文更改了策略,所有上下文都会使用它。如果在同一个 ClassLoader 中的两个或更多上下文声明了类型为 org.springframework.util.IdGenerator 的 Bean,它们必须是同一类的实例。否则,尝试替换自定义策略的上下文将无法初始化。如果策略相同,但参数化不同,则使用第一个被初始化的上下文中的策略。 |
除了默认策略外,还提供了另外两个 IdGenerators
。org.springframework.util.JdkIdGenerator
使用之前的 UUID.randomUUID()
机制。当实际上不需要 UUID 并且简单的递增值就足够时,您可以使用 o.s.i.support.IdGenerators.SimpleIncrementingIdGenerator
。
只读头
MessageHeaders.ID
和 MessageHeaders.TIMESTAMP
是只读头,不能被覆盖。
从版本 4.3.2 开始,MessageBuilder
提供了 readOnlyHeaders(String… readOnlyHeaders)
API,用于自定义不应从上游 Message
复制的头列表。默认情况下,只有 MessageHeaders.ID
和 MessageHeaders.TIMESTAMP
是只读的。提供了全局属性 spring.integration.readOnly.headers
(参见 全局属性)来为框架组件自定义 DefaultMessageBuilderFactory
。当您不想让 ObjectToJsonTransformer
等组件填充某些现成的头(例如 contentType
)时,这非常有用(参见 JSON 转换器)。
当您尝试使用 MessageBuilder
构建新消息时,此类头将被忽略,并且会在日志中发出特定的 INFO
消息。
从版本 5.0 开始,当使用 DefaultMessageBuilderFactory
时,消息网关、Header Enricher、内容增强器 和 Header Filter 不允许您配置 MessageHeaders.ID
和 MessageHeaders.TIMESTAMP
头名称,并且会抛出 BeanInitializationException
。
头传播
当消息被产生消息的端点(例如 服务激活器)处理(和修改)时,通常,入站头会传播到出站消息。一个例外是 转换器,当完整的消息返回给框架时。在这种情况下,用户代码负责整个出站消息。当转换器只返回负载时,入站头会被传播。此外,只有当出站消息中不存在该头时,头才会被传播,从而允许您根据需要更改头值。
从版本 4.3.10 开始,您可以配置消息处理器(它们修改消息并产生输出)以抑制特定头的传播。要配置您不想复制的头,请在 MessageProducingMessageHandler
抽象类上调用 setNotPropagatedHeaders()
或 addNotPropagatedHeaders()
方法。
您还可以通过在 META-INF/spring.integration.properties
中将 readOnlyHeaders
属性设置为逗号分隔的头列表来全局抑制特定消息头的传播。
从版本 5.0 开始,AbstractMessageProducingHandler
上的 setNotPropagatedHeaders()
实现应用简单的模式 (xxx*
, *xxx
, *xxx*
, 或 xxx*yyy
) 来允许过滤具有共同后缀或前缀的头。有关更多信息,请参阅 PatternMatchUtils
Javadoc。当其中一个模式是 *
(星号)时,不传播任何头。所有其他模式都被忽略。在这种情况下,服务激活器的行为与转换器相同,任何必需的头必须在服务方法返回的 Message
中提供。notPropagatedHeaders()
选项在 Java DSL 的 ConsumerEndpointSpec
中可用。它也作为 <service-activator>
组件的 XML 配置中的 not-propagated-headers
属性可用。
消息实现
Message
接口的基本实现是 GenericMessage<T>
,它提供两个构造函数,如下列表所示
new GenericMessage<T>(T payload);
new GenericMessage<T>(T payload, Map<String, Object> headers)
创建 Message
时,会生成一个随机的唯一 ID。接受 Map
作为头的构造函数会将提供的头复制到新创建的 Message
中。
还有一个方便的 Message
实现,用于表示错误条件。此实现接受一个 Throwable
对象作为其负载,如下例所示
ErrorMessage message = new ErrorMessage(someThrowable);
Throwable t = message.getPayload();
请注意,此实现利用了 GenericMessage
基类是参数化的这一事实。因此,如两个示例所示,检索 Message
负载 Object
时无需进行强制转换。
前面提到的 Message
类实现是不可变的。在某些情况下,当可变性不是问题且应用程序逻辑设计良好以避免并发修改时,可以使用 MutableMessage
。
MessageBuilder
帮助类
您可能注意到 Message
接口定义了用于检索其负载和头的方法,但没有提供 setter。这是因为 Message
在初次创建后不能被修改。因此,当一个 Message
实例被发送到多个消费者(例如,通过发布-订阅通道)时,如果其中一个消费者需要发送带有不同负载类型的回复,它必须创建一个新的 Message
。结果是,其他消费者不受这些更改的影响。请记住,多个消费者可能访问相同的负载实例或头值,并且此类实例本身是否可变是留给您决定的。换句话说,Message
实例的契约类似于不可变的 Collection
,MessageHeaders
map 进一步例证了这一点。尽管 MessageHeaders
类实现了 java.util.Map
,但任何尝试在 MessageHeaders
实例上调用 put
操作(或 'remove' 或 'clear')都会导致 UnsupportedOperationException
。
Spring Integration 提供了一种更方便的方式来构建 Message,而不是要求创建和填充 Map 然后传递给 GenericMessage 构造函数:MessageBuilder
。MessageBuilder
提供了两个工厂方法,用于从现有 Message
或使用负载 Object
创建 Message
实例。从现有 Message
构建时,该 Message
的头和负载会被复制到新的 Message
中,如下例所示
Message<String> message1 = MessageBuilder.withPayload("test")
.setHeader("foo", "bar")
.build();
Message<String> message2 = MessageBuilder.fromMessage(message1).build();
assertEquals("test", message2.getPayload());
assertEquals("bar", message2.getHeaders().get("foo"));
如果您需要创建一个带有新负载但仍想从现有 Message
中复制头的 Message
,您可以使用 'copy' 方法之一,如下例所示
Message<String> message3 = MessageBuilder.withPayload("test3")
.copyHeaders(message1.getHeaders())
.build();
Message<String> message4 = MessageBuilder.withPayload("test4")
.setHeader("foo", 123)
.copyHeadersIfAbsent(message1.getHeaders())
.build();
assertEquals("bar", message3.getHeaders().get("foo"));
assertEquals(123, message4.getHeaders().get("foo"));
请注意,copyHeadersIfAbsent
方法不会覆盖现有值。此外,在上面的示例中,您可以看到如何使用 setHeader
设置任何用户定义的头。最后,对于预定义头和设置任何头的非破坏性方法也提供了 set
方法(MessageHeaders
也定义了预定义头名称的常量)。
您还可以使用 MessageBuilder
设置消息的优先级,如下例所示
Message<Integer> importantMessage = MessageBuilder.withPayload(99)
.setPriority(5)
.build();
assertEquals(5, importantMessage.getHeaders().getPriority());
Message<Integer> lessImportantMessage = MessageBuilder.fromMessage(importantMessage)
.setHeaderIfAbsent(IntegrationMessageHeaderAccessor.PRIORITY, 2)
.build();
assertEquals(2, lessImportantMessage.getHeaders().getPriority());
只有在使用 PriorityChannel
时才会考虑 priority
头(如下一章所述)。它被定义为 java.lang.Integer
。
提供了 MutableMessageBuilder
来处理 MutableMessage
实例。此类逻辑是创建 MutableMessage
或保留原样,并通过构建器方法修改其内容。这样可以在运行应用程序中获得轻微的性能提升,前提是不可变性不是消息交换的关注点。
从版本 6.4 开始,从 MessageBuilder 中提取了一个 BaseMessageBuilder 类,以简化默认消息构建逻辑的扩展。例如,结合自定义的 MessageBuilderFactory ,可以在应用程序上下文中全局使用自定义的 BaseMessageBuilder 实现来提供自定义的 Message 实例。特别是,可以覆盖 GenericMessage.toString() 方法,以便在记录此类消息时隐藏负载和头中的敏感信息。 |
MessageBuilderFactory
抽象
带有 IntegrationUtils.INTEGRATION_MESSAGE_BUILDER_FACTORY_BEAN_NAME
名称的 MessageBuilderFactory
bean 会全局注册到应用程序上下文中,并在框架中随处用于创建 Message
实例。默认情况下,它是一个 DefaultMessageBuilderFactory
实例。开箱即用,框架还提供了一个 MutableMessageBuilderFactory
,用于在框架组件中创建 MutableMessage
实例。要自定义 Message
实例的创建,必须在目标应用程序上下文中提供一个带有 IntegrationUtils.INTEGRATION_MESSAGE_BUILDER_FACTORY_BEAN_NAME
名称的 MessageBuilderFactory
bean,以覆盖默认的。例如,可以为 BaseMessageBuilder
的实现注册一个自定义的 MessageBuilderFactory
,我们希望在该实现中提供一个 GenericMessage
的扩展,并覆盖 toString()
方法,以便在记录此类消息时隐藏负载和头中的敏感信息。
这些类的一些快速实现以演示个人可识别信息的缓解措施可以像这样
class PiiMessageBuilderFactory implements MessageBuilderFactory {
@Override
public <T> PiiMessageBuilder<T> fromMessage(Message<T> message) {
return new PiiMessageBuilder<>(message.getPayload(), message);
}
@Override
public <T> PiiMessageBuilder<T> withPayload(T payload) {
return new PiiMessageBuilder<>(payload, null);
}
}
class PiiMessageBuilder<P> extends BaseMessageBuilder<P, PiiMessageBuilder<P>> {
public PiiMessageBuilder(P payload, @Nullable Message<P> originalMessage) {
super(payload, originalMessage);
}
@Override
public Message<P> build() {
return new PiiMessage<>(getPayload(), getHeaders());
}
}
class PiiMessage<P> extends GenericMessage<P> {
@Serial
private static final long serialVersionUID = -354503673433669578L;
public PiiMessage(P payload, Map<String, Object> headers) {
super(payload, headers);
}
@Override
public String toString() {
return "PiiMessage [payload=" + getPayload() + ", headers=" + maskHeaders(getHeaders()) + ']';
}
private static Map<String, Object> maskHeaders(Map<String, Object> headers) {
return headers.entrySet()
.stream()
.map((entry) -> entry.getKey().equals("password") ? Map.entry(entry.getKey(), "******") : entry)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
}
这个 PiiMessageBuilderFactory
可以注册为一个 Bean,并且无论何时框架记录消息(例如,在 errorChannel
的情况下),password
头都将被遮盖。