幂等接收者企业集成模式
从 4.1 版本开始,Spring Integration 提供了 幂等接收者 (Idempotent Receiver) 企业集成模式的一种实现。这是一个功能性模式,所有的幂等性逻辑应在应用程序中实现。然而,为了简化决策过程,提供了 IdempotentReceiverInterceptor
组件。这是一个应用于 MessageHandler.handleMessage()
方法的 AOP Advice
,可以根据其配置来 `filter` 请求消息或将其标记为 `duplicate`。
之前,你可以通过在 <filter/>
(参见 过滤器) 中使用自定义的 MessageSelector
来实现此模式,例如。然而,由于此模式真正定义的是端点的行为而非自身就是一个端点,幂等接收者实现不提供端点组件。相反,它被应用于在应用程序中声明的端点。
IdempotentReceiverInterceptor
的逻辑基于提供的 MessageSelector
;如果消息未被该选择器接受,则会通过将 duplicateMessage
消息头设置为 true
来丰富该消息。目标 MessageHandler
(或下游流)可以查阅此消息头以实现正确的幂等性逻辑。如果 IdempotentReceiverInterceptor
配置了 discardChannel
或 throwExceptionOnRejection = true
,则重复消息不会发送到目标 MessageHandler.handleMessage()
,而是被丢弃。如果你想丢弃(对重复消息不做任何处理),discardChannel
应配置一个 NullChannel
,例如默认的 nullChannel
bean。
为了在消息之间维护状态并提供比较消息以实现幂等性的能力,我们提供了 MetadataStoreSelector
。它接受一个 MessageProcessor
实现(基于 Message
创建查找键)和一个可选的 ConcurrentMetadataStore
(元数据存储)。更多信息请参阅 MetadataStoreSelector
Javadoc。你还可以通过额外的 MessageProcessor
定制 ConcurrentMetadataStore
的 value
。默认情况下,MetadataStoreSelector
使用 timestamp
消息头。
通常,如果键不存在现有值,选择器会选择消息进行接受。在某些情况下,比较键的当前值和新值以确定是否应接受消息会很有用。从 5.3 版本开始,提供了 compareValues
属性,它引用了一个 BiPredicate<String, String>
;第一个参数是旧值;返回 true
表示接受消息并在 MetadataStore
中用新值替换旧值。这对于减少键的数量很有用;例如,处理文件中的行时,可以将文件名存储在键中,将当前行号存储在值中。然后,在重新启动后,可以跳过已经处理的行。请参阅 拆分文件中的幂等下游处理 以获取示例。
为方便起见,MetadataStoreSelector
的选项可以直接在 <idempotent-receiver>
组件上配置。以下列表显示了所有可能的属性
<idempotent-receiver
id="" (1)
endpoint="" (2)
selector="" (3)
discard-channel="" (4)
metadata-store="" (5)
key-strategy="" (6)
key-expression="" (7)
value-strategy="" (8)
value-expression="" (9)
compare-values="" (10)
throw-exception-on-rejection="" /> (11)
1 | IdempotentReceiverInterceptor bean 的 ID。可选。 |
2 | 此拦截器应用的消费者端点名称或模式。用逗号 (, ) 分隔名称(模式),例如 endpoint="aaa, bbb*, ccc, *ddd, eee*fff" 。与这些模式匹配的端点 bean 名称用于检索目标端点的 MessageHandler bean(使用其 `.handler` 后缀),并将 IdempotentReceiverInterceptor 应用于这些 bean。必需。 |
3 | 一个 MessageSelector bean 引用。与 metadata-store 和 key-strategy (key-expression) 互斥。当未提供 selector 时,key-strategy 或 key-strategy-expression 之一是必需的。 |
4 | 指定当 IdempotentReceiverInterceptor 不接受消息时将其发送到的通道。省略时,重复消息会带着 duplicateMessage 消息头转发给处理器。可选。 |
5 | 一个 ConcurrentMetadataStore 引用。由底层 MetadataStoreSelector 使用。与 selector 互斥。可选。默认的 MetadataStoreSelector 使用一个内部的 SimpleMetadataStore ,它不会在应用程序执行之间维护状态。 |
6 | 一个 MessageProcessor 引用。由底层 MetadataStoreSelector 使用。从请求消息评估 idempotentKey 。与 selector 和 key-expression 互斥。当未提供 selector 时,key-strategy 或 key-strategy-expression 之一是必需的。 |
7 | 用于填充 ExpressionEvaluatingMessageProcessor 的 SpEL 表达式。由底层 MetadataStoreSelector 使用。通过使用请求消息作为评估上下文根对象来评估 idempotentKey 。与 selector 和 key-strategy 互斥。当未提供 selector 时,key-strategy 或 key-strategy-expression 之一是必需的。 |
8 | 一个 MessageProcessor 引用。由底层 MetadataStoreSelector 使用。从请求消息评估 idempotentKey 的 value 。与 selector 和 value-expression 互斥。默认情况下,'MetadataStoreSelector' 使用 'timestamp' 消息头作为元数据 'value'。 |
9 | 用于填充 ExpressionEvaluatingMessageProcessor 的 SpEL 表达式。由底层 MetadataStoreSelector 使用。通过使用请求消息作为评估上下文根对象来评估 idempotentKey 的 value 。与 selector 和 value-strategy 互斥。默认情况下,'MetadataStoreSelector' 使用 'timestamp' 消息头作为元数据 'value'。 |
10 | 引用一个 BiPredicate<String, String> bean,它允许你通过比较键的旧值和新值来有选择地接受消息;默认情况下为 null 。 |
11 | 如果 IdempotentReceiverInterceptor 拒绝消息是否抛出异常。默认为 false 。无论是否提供了 discard-channel ,此设置都有效。 |
对于 Java 配置,Spring Integration 提供了方法级别的 @IdempotentReceiver
注解。它用于标记带有消息处理注解(如 @ServiceActivator
、@Router
等)的 method
,以指定哪些 IdempotentReceiverInterceptor
对象应用于此端点。以下示例展示了如何使用 @IdempotentReceiver
注解
@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
return new IdempotentReceiverInterceptor(new MetadataStoreSelector(m ->
m.getHeaders().get(INVOICE_NBR_HEADER)));
}
@Bean
@ServiceActivator(inputChannel = "input", outputChannel = "output")
@IdempotentReceiver("idempotentReceiverInterceptor")
public MessageHandler myService() {
....
}
使用 Java DSL 时,可以将拦截器添加到端点的 advice 链中,示例如下
@Bean
public IntegrationFlow flow() {
...
.handle("someBean", "someMethod",
e -> e.advice(idempotentReceiverInterceptor()))
...
}
IdempotentReceiverInterceptor 仅为 MessageHandler.handleMessage(Message<?>) 方法设计。从 4.3.1 版本开始,它实现了 HandleMessageAdvice ,并以 AbstractHandleMessageAdvice 作为基类,以实现更好的解耦。有关更多信息,请参阅 处理消息 Advice。 |