幂等接收器企业集成模式

从版本 4.1 开始,Spring Integration 提供了 幂等接收器 企业集成模式的实现。它是一个功能性模式,所有幂等性逻辑都应该在应用程序中实现。然而,为了简化决策过程,提供了 IdempotentReceiverInterceptor 组件。这是一个 AOP Advice,应用于 MessageHandler.handleMessage() 方法,可以根据其配置 过滤 请求消息或将其标记为 重复

以前,您可以通过在 <filter/> 中使用自定义的 MessageSelector 来实现此模式(请参阅 Filter),例如。但是,由于此模式真正定义了端点的行为,而不是自身作为一个端点,因此幂等接收器实现不提供端点组件。相反,它应用于在应用程序中声明的端点。

IdempotentReceiverInterceptor 的逻辑基于提供的 MessageSelector,如果消息不被该选择器接受,则会使用 duplicateMessage 标头设置为 true 来丰富该消息。目标 MessageHandler(或下游流)可以查询此标头以实现正确的幂等性逻辑。如果 IdempotentReceiverInterceptor 配置了 discardChannelthrowExceptionOnRejection = true,则重复消息不会发送到目标 MessageHandler.handleMessage()。相反,它被丢弃。如果要丢弃(对)重复消息不做任何操作,则应将 discardChannel 配置为 NullChannel,例如默认的 nullChannel bean。

为了在消息之间保持状态并提供比较消息以实现幂等性的能力,我们提供了 MetadataStoreSelector。它接受一个 MessageProcessor 实现(根据 Message 创建查找键)和一个可选的 ConcurrentMetadataStoreMetadata Store)。有关更多信息,请参阅 MetadataStoreSelector Javadoc。您还可以通过使用额外的 MessageProcessor 来自定义 ConcurrentMetadataStorevalue。默认情况下,MetadataStoreSelector 使用 timestamp 消息标头。

通常,如果没有键的现有值,选择器会选择一条消息以接受。在某些情况下,比较键的当前值和新值以确定是否应接受消息很有用。从版本 5.3 开始,提供了 compareValues 属性,它引用了一个 BiPredicate<String, String>;第一个参数是旧值;返回 true 表示接受消息并在 MetadataStore 中用新值替换旧值。这对于减少键的数量很有用;例如,在处理文件中的行时,可以将文件名存储在键中,将当前行号存储在值中。然后,重新启动后,可以跳过已处理的行。请参阅 幂等下游处理拆分文件 以获取示例。

为了方便起见,MetadataStoreSelector 选项可以直接在 <idempotent-receiver> 组件上配置。以下列表显示了所有可能的属性

<idempotent-receiver
        id=""  (1)
        endpoint=""  (2)
        selector=""  (3)
        discard-channel=""  (4)
        metadata-store=""  (5)
        key-strategy=""  (6)
        key-expression=""  (7)
        value-strategy=""  (8)
        value-expression=""  (9)
        compare-values="" (10)
        throw-exception-on-rejection="" />  (11)
1 IdempotentReceiverInterceptor bean 的 ID。可选。
2 此拦截器应用到的消费者端点名称或模式。用逗号 (,) 分隔名称(模式),例如 endpoint="aaa, bbb*, ccc, *ddd, eee*fff"。匹配这些模式的端点 bean 名称将用于检索目标端点的 MessageHandler bean(使用其 .handler 后缀),并且 IdempotentReceiverInterceptor 将应用于这些 bean。必填。
3 一个 MessageSelector bean 引用。与 metadata-storekey-strategy (key-expression) 互斥。当未提供 selector 时,需要 key-strategykey-strategy-expression 之一。
4 标识当 IdempotentReceiverInterceptor 不接受消息时将消息发送到的通道。省略时,重复消息将转发到处理器,并带有 duplicateMessage 标头。可选。
5 一个 ConcurrentMetadataStore 引用。由底层的 MetadataStoreSelector 使用。与 selector 互斥。可选。默认的 MetadataStoreSelector 使用一个内部 SimpleMetadataStore,它不会在应用程序执行之间保持状态。
6 一个 MessageProcessor 引用。由底层的 MetadataStoreSelector 使用。从请求消息评估 idempotentKey。与 selectorkey-expression 互斥。当未提供 selector 时,需要 key-strategykey-strategy-expression 之一。
7 用于填充 ExpressionEvaluatingMessageProcessor 的 SpEL 表达式。由底层的 MetadataStoreSelector 使用。使用请求消息作为评估上下文根对象来评估 idempotentKey。与 selectorkey-strategy 互斥。当未提供 selector 时,需要 key-strategykey-strategy-expression 之一。
8 一个 MessageProcessor 引用。由底层的 MetadataStoreSelector 使用。从请求消息中评估 idempotentKeyvalue。与 selectorvalue-expression 互斥。默认情况下,'MetadataStoreSelector' 使用 'timestamp' 消息标头作为元数据 'value'。
9 一个 SpEL 表达式,用于填充 ExpressionEvaluatingMessageProcessor。由底层的 MetadataStoreSelector 使用。使用请求消息作为评估上下文根对象来评估 idempotentKeyvalue。与 selectorvalue-strategy 互斥。默认情况下,'MetadataStoreSelector' 使用 'timestamp' 消息标头作为元数据 'value'。
10 BiPredicate<String, String> bean 的引用,该 bean 允许您通过比较键的旧值和新值来有选择地选择消息;默认为 null
11 如果 IdempotentReceiverInterceptor 拒绝消息是否抛出异常。默认为 false。无论是否提供了 discard-channel,它都会被应用。

对于 Java 配置,Spring Integration 提供了方法级别的 @IdempotentReceiver 注解。它用于标记带有消息注解(@ServiceActivator@Router 等)的 方法,以指定哪些 IdempotentReceiverInterceptor 对象应用于此端点。以下示例展示了如何使用 @IdempotentReceiver 注解

@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
   return new IdempotentReceiverInterceptor(new MetadataStoreSelector(m ->
                                                    m.getHeaders().get(INVOICE_NBR_HEADER)));
}

@Bean
@ServiceActivator(inputChannel = "input", outputChannel = "output")
@IdempotentReceiver("idempotentReceiverInterceptor")
public MessageHandler myService() {
    ....
}

当您使用 Java DSL 时,可以将拦截器添加到端点的 advice 链中,如下例所示

@Bean
public IntegrationFlow flow() {
    ...
        .handle("someBean", "someMethod",
            e -> e.advice(idempotentReceiverInterceptor()))
    ...
}
IdempotentReceiverInterceptor 仅设计用于 MessageHandler.handleMessage(Message<?>) 方法。从版本 4.3.1 开始,它实现了 HandleMessageAdvice,以 AbstractHandleMessageAdvice 作为基类,以实现更好的解耦。有关更多信息,请参阅 处理消息 Advice
© . This site is unofficial and not affiliated with VMware.