配置消息通道

要创建消息通道实例,您可以使用 XML 配置中的 <channel/> 元素或 Java 配置中的 DirectChannel 实例,如下所示

  • Java

  • XML

@Bean
public MessageChannel exampleChannel() {
    return new DirectChannel();
}
<int:channel id="exampleChannel"/>

当您使用不带任何子元素的 <channel/> 元素时,它会创建一个 DirectChannel 实例(一个 SubscribableChannel)。

要创建发布-订阅通道,请使用 <publish-subscribe-channel/> 元素(Java 中的 PublishSubscribeChannel),如下所示

  • Java

  • XML

@Bean
public MessageChannel exampleChannel() {
    return new PublishSubscribeChannel();
}
<int:publish-subscribe-channel id="exampleChannel"/>

您也可以提供各种 <queue/> 子元素来创建任何可轮询通道类型(如 消息通道实现 中所述)。以下各节展示了每种通道类型的示例。

DirectChannel 配置

如前所述,DirectChannel 是默认类型。以下列表展示了如何定义一个

  • Java

  • XML

@Bean
public MessageChannel directChannel() {
    return new DirectChannel();
}
<int:channel id="directChannel"/>

默认通道具有轮询式负载均衡器,并且启用了故障转移(详见 DirectChannel)。要禁用其中一个或两者,请添加一个 <dispatcher/> 子元素(DirectChannel 的一个 LoadBalancingStrategy 构造函数),并按如下方式配置属性

  • Java

  • XML

@Bean
public MessageChannel failFastChannel() {
    DirectChannel channel = new DirectChannel();
    channel.setFailover(false);
    return channel;
}

@Bean
public MessageChannel failFastChannel() {
    return new DirectChannel(null);
}
<int:channel id="failFastChannel">
    <int:dispatcher failover="false"/>
</channel>

<int:channel id="channelWithFixedOrderSequenceFailover">
    <int:dispatcher load-balancer="none"/>
</int:channel>

从版本 6.3 开始,所有基于 UnicastingDispatcherMessageChannel 实现都可以配置一个 Predicate<Exception> failoverStrategy,而不是使用简单的 failover 选项。这个谓词根据当前 MessageHandler 抛出的异常来决定是否故障转移到下一个 MessageHandler。更复杂的错误分析应该使用 ErrorMessageExceptionTypeRouter 来完成。

数据类型通道配置

有时,消费者只能处理特定类型的载荷,这就要求您确保输入消息的载荷类型。首先想到的可能是使用消息过滤器。然而,消息过滤器只能过滤掉不符合消费者要求的消息。另一种方法是使用基于内容的路由器,将具有不兼容数据类型的消息路由到特定的转换器,以强制转换为所需的数据类型。这种方法可行,但实现相同目标的更简单方法是应用 数据类型通道 (Datatype Channel) 模式。您可以为每种特定的载荷数据类型使用单独的数据类型通道。

要创建一个仅接受包含特定载荷类型消息的数据类型通道,请在通道元素的 datatype 属性中提供该数据类型的完全限定类名,如下例所示

  • Java

  • XML

@Bean
public MessageChannel numberChannel() {
    DirectChannel channel = new DirectChannel();
    channel.setDatatypes(Number.class);
    return channel;
}
<int:channel id="numberChannel" datatype="java.lang.Number"/>

请注意,类型检查对于任何可分配给通道数据类型的类型都将通过。换句话说,前例中的 numberChannel 将接受载荷为 java.lang.Integerjava.lang.Double 的消息。可以以逗号分隔的列表形式提供多种类型,如下例所示

  • Java

  • XML

@Bean
public MessageChannel numberChannel() {
    DirectChannel channel = new DirectChannel();
    channel.setDatatypes(String.class, Number.class);
    return channel;
}
<int:channel id="stringOrNumberChannel" datatype="java.lang.String,java.lang.Number"/>

因此,前例中的 'numberChannel' 只接受数据类型为 java.lang.Number 的消息。但是,如果消息的载荷不是所需类型,会发生什么?这取决于您是否定义了一个名为 integrationConversionService 的 bean,它是 Spring 的 转换服务 (Conversion Service) 实例。如果未定义,则会立即抛出 Exception。但是,如果您定义了一个 integrationConversionService bean,它将尝试用于将消息的载荷转换为可接受的类型。

您甚至可以注册自定义转换器。例如,假设您向上面配置的 'numberChannel' 发送一条载荷为 String 的消息。您可能会按如下方式处理该消息

MessageChannel inChannel = context.getBean("numberChannel", MessageChannel.class);
inChannel.send(new GenericMessage<String>("5"));

通常,这是一个完全合法的操作。然而,由于我们使用了数据类型通道,此类操作的结果会生成类似如下的异常

Exception in thread "main" org.springframework.integration.MessageDeliveryException:
Channel 'numberChannel'
expected one of the following datataypes [class java.lang.Number],
but received [class java.lang.String]
…

发生异常是因为我们要求载荷类型是 Number,但我们发送的是 String。因此,我们需要一些东西来将 String 转换为 Number。为此,我们可以实现一个类似于以下示例的转换器

public static class StringToIntegerConverter implements Converter<String, Integer> {
    public Integer convert(String source) {
        return Integer.parseInt(source);
    }
}

然后我们可以将其注册为集成转换服务 (Integration Conversion Service) 的一个转换器,如下例所示

  • Java

  • XML

@Bean
@IntegrationConverter
public StringToIntegerConverter strToInt {
    return new StringToIntegerConverter();
}
<int:converter ref="strToInt"/>

<bean id="strToInt" class="org.springframework.integration.util.Demo.StringToIntegerConverter"/>

或者在 StringToIntegerConverter 类上标记 @Component 注解以进行自动扫描。

解析 'converter' 元素时,如果尚未定义 integrationConversionService bean,则会创建它。有了该转换器,send 操作现在就会成功,因为数据类型通道使用该转换器将 String 载荷转换为 Integer

有关载荷类型转换的更多信息,请参阅 载荷类型转换

从版本 4.0 开始,integrationConversionServiceDefaultDatatypeChannelMessageConverter 调用,后者在应用程序上下文中查找转换服务。要使用不同的转换技术,您可以在通道上指定 message-converter 属性。这必须是对 MessageConverter 实现的引用。仅使用 fromMessage 方法。它为转换器提供对消息头的访问权(以防转换可能需要来自消息头的信息,例如 content-type)。该方法只能返回转换后的载荷或完整的 Message 对象。如果是后者,转换器必须注意复制入站消息中的所有消息头。

或者,您可以声明一个 ID 为 datatypeChannelMessageConverter、类型为 MessageConverter<bean/>,并且该转换器将被所有带有 datatype 属性的通道使用。

QueueChannel 配置

要创建一个 QueueChannel,请使用 <queue/> 子元素。您可以按如下方式指定通道的容量

  • Java

  • XML

@Bean
public PollableChannel queueChannel() {
    return new QueueChannel(25);
}
<int:channel id="queueChannel">
    <queue capacity="25"/>
</int:channel>
如果您未在此 <queue/> 子元素上为 'capacity' 属性提供值,则生成的队列是无界的。为避免内存不足等问题,我们强烈建议您为有界队列设置一个明确的值。

持久化 QueueChannel 配置

由于 QueueChannel 提供了缓冲消息的能力,但默认情况下仅在内存中进行,这也引入了系统发生故障时消息可能丢失的可能性。为了降低这种风险,QueueChannel 可以由 MessageGroupStore 策略接口的持久化实现作为后端支持。有关 MessageGroupStoreMessageStore 的更多详细信息,请参阅 消息存储

当使用 message-store 属性时,不允许使用 capacity 属性。

QueueChannel 收到 Message 时,它将消息添加到消息存储中。当从 QueueChannel 轮询 Message 时,它将从消息存储中移除。

默认情况下,QueueChannel 将其消息存储在内存队列中,这可能导致前面提到的消息丢失场景。然而,Spring Integration 提供了持久化存储,例如 JdbcChannelMessageStore

您可以通过添加 message-store 属性来为任何 QueueChannel 配置消息存储,如下例所示

<int:channel id="dbBackedChannel">
    <int:queue message-store="channelStore"/>
</int:channel>

<bean id="channelStore" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
    <property name="dataSource" ref="dataSource"/>
    <property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
</bean>

(有关 Java/Kotlin 配置选项,请参见下面的示例。)

Spring Integration JDBC 模块还为一些流行的数据库提供了模式数据定义语言 (DDL)。这些模式位于该模块(spring-integration-jdbc)的 org.springframework.integration.jdbc.store.channel 包中。

一个重要的特性是,对于任何事务性持久存储(例如 JdbcChannelMessageStore),只要轮询器配置了事务,从存储中移除的消息只有在事务成功完成后才能被永久移除。否则,事务将回滚,且 Message 不会丢失。

随着与“NoSQL”数据存储相关的 Spring 项目数量不断增加,许多其他消息存储的实现也应运而生,为这些存储提供了底层支持。如果您找不到符合您特定需求的实现,也可以提供自己的 MessageGroupStore 接口实现。

自版本 4.0 以来,我们建议在可能的情况下配置 QueueChannel 实例使用 ChannelMessageStore。与通用消息存储相比,这些通常为此用途进行了优化。如果 ChannelMessageStore 是一个 ChannelPriorityMessageStore,则消息将按照优先级顺序以 FIFO 方式接收。优先级的概念由消息存储实现决定。例如,以下示例显示了 MongoDB 通道消息存储 (MongoDB Channel Message Store) 的 Java 配置

  • Java

  • Java DSL

  • Kotlin DSL

@Bean
public BasicMessageGroupStore mongoDbChannelMessageStore(MongoDbFactory mongoDbFactory) {
    MongoDbChannelMessageStore store = new MongoDbChannelMessageStore(mongoDbFactory);
    store.setPriorityEnabled(true);
    return store;
}

@Bean
public PollableChannel priorityQueue(BasicMessageGroupStore mongoDbChannelMessageStore) {
    return new PriorityChannel(new MessageGroupQueue(mongoDbChannelMessageStore, "priorityQueue"));
}
@Bean
public IntegrationFlow priorityFlow(PriorityCapableChannelMessageStore mongoDbChannelMessageStore) {
    return IntegrationFlow.from((Channels c) ->
            c.priority("priorityChannel", mongoDbChannelMessageStore, "priorityGroup"))
            ....
            .get();
}
@Bean
fun priorityFlow(mongoDbChannelMessageStore: PriorityCapableChannelMessageStore) =
    integrationFlow {
        channel { priority("priorityChannel", mongoDbChannelMessageStore, "priorityGroup") }
    }
请注意 MessageGroupQueue 类。这是使用 MessageGroupStore 操作的 BlockingQueue 实现。

定制 QueueChannel 环境的另一个选项由 <int:queue> 子元素的 ref 属性或其特定构造函数提供。该属性提供对任何 java.util.Queue 实现的引用。例如,可以按如下方式配置 Hazelcast 分布式 IQueue

@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance(new Config()
                                           .setProperty("hazelcast.logging.type", "log4j"));
}

@Bean
public PollableChannel distributedQueue() {
    return new QueueChannel(hazelcastInstance()
                              .getQueue("springIntegrationQueue"));
}

PublishSubscribeChannel 配置

要创建一个 PublishSubscribeChannel,请使用 <publish-subscribe-channel/> 元素。使用此元素时,您还可以指定用于发布消息的 task-executor(如果未指定,则在发送者的线程中发布),如下所示

  • Java

  • XML

@Bean
public MessageChannel pubsubChannel() {
    return new PublishSubscribeChannel(someExecutor());
}
<int:publish-subscribe-channel id="pubsubChannel" task-executor="someExecutor"/>

如果您在 PublishSubscribeChannel 的下游提供重排序器或聚合器,您可以将通道上的 'apply-sequence' 属性设置为 true。这样做表示通道在传递消息之前应该设置 sequence-sizesequence-number 消息头以及 关联 ID (correlation ID)。例如,如果有五个订阅者,sequence-size 将设置为 5,并且消息将具有介于 15 之间的 sequence-number 头值。

除了 Executor 之外,您还可以配置一个 ErrorHandler。默认情况下,PublishSubscribeChannel 使用 MessagePublishingErrorHandler 实现将错误发送到 errorChannel 头中的 MessageChannel 或发送到全局 errorChannel 实例。如果未配置 Executor,则 ErrorHandler 会被忽略,异常将直接抛给调用者的线程。

如果您在 PublishSubscribeChannel 的下游提供 ResequencerAggregator,您可以将通道上的 'apply-sequence' 属性设置为 true。这样做表示通道在传递消息之前应该设置 sequence-size 和 sequence-number 消息头以及 correlation ID。例如,如果有五个订阅者,sequence-size 将设置为 5,并且消息将具有介于 15 之间的 sequence-number 头值。

以下示例展示了如何将 apply-sequence 头设置为 true

  • Java

  • XML

@Bean
public MessageChannel pubsubChannel() {
    PublishSubscribeChannel channel = new PublishSubscribeChannel();
    channel.setApplySequence(true);
    return channel;
}
<int:publish-subscribe-channel id="pubsubChannel" apply-sequence="true"/>
apply-sequence 的值默认为 false,以便发布-订阅通道可以将完全相同的消息实例发送到多个出站通道。由于 Spring Integration 强制执行载荷和消息头引用的不可变性,当此标志设置为 true 时,通道会创建新的 Message 实例,这些实例具有相同的载荷引用,但消息头值不同。

从版本 5.4.3 开始,PublishSubscribeChannel 还可以配置其 BroadcastingDispatcherrequireSubscribers 选项,以指示当通道没有订阅者时,不会静默地忽略消息。当没有订阅者且此选项设置为 true 时,会抛出带有 Dispatcher has no subscribers 消息的 MessageDispatchingException

ExecutorChannel

要创建 ExecutorChannel,请添加带有 task-executor 属性的 <dispatcher> 子元素。该属性的值可以引用上下文中的任何 TaskExecutor。例如,这样做可以配置线程池以将消息分发给订阅的处理程序。如前所述,这样做会打破发送者和接收者之间的单线程执行上下文,以便任何活动的事务上下文不会被处理程序的调用共享(即,处理程序可能会抛出 Exception,但 send 调用已经成功返回)。以下示例展示了如何使用 dispatcher 元素并在 task-executor 属性中指定执行器

  • Java

  • XML

@Bean
public MessageChannel executorChannel() {
    return new ExecutorChannel(someExecutor());
}
<int:channel id="executorChannel">
    <int:dispatcher task-executor="someExecutor"/>
</int:channel>

load-balancerfailover 选项也在 <dispatcher/> 子元素上可用,如前面的 DirectChannel 配置 中所述。应用相同的默认值。因此,除非为其中一个或两个属性提供了明确的配置,否则通道具有启用了故障转移的轮询式负载均衡策略,如下例所示

<int:channel id="executorChannelWithoutFailover">
    <int:dispatcher task-executor="someExecutor" failover="false"/>
</int:channel>

PriorityChannel 配置

要创建 PriorityChannel,请使用 <priority-queue/> 子元素,如下例所示

  • Java

  • XML

@Bean
public PollableChannel priorityChannel() {
    return new PriorityChannel(20);
}
<int:channel id="priorityChannel">
    <int:priority-queue capacity="20"/>
</int:channel>

默认情况下,通道会查阅消息的 priority 头。但是,您可以改为提供自定义的 Comparator 引用。另外请注意,PriorityChannel(与其他类型一样)确实支持 datatype 属性。与 QueueChannel 一样,它也支持 capacity 属性。以下示例演示了所有这些配置

  • Java

  • XML

@Bean
public PollableChannel priorityChannel() {
    PriorityChannel channel = new PriorityChannel(20, widgetComparator());
    channel.setDatatypes(example.Widget.class);
    return channel;
}
<int:channel id="priorityChannel" datatype="example.Widget">
    <int:priority-queue comparator="widgetComparator"
                    capacity="10"/>
</int:channel>

自版本 4.0 以来,priority-channel 子元素支持 message-store 选项(在这种情况下不允许使用 comparatorcapacity)。消息存储必须是 PriorityCapableChannelMessageStore。目前为 RedisJDBCMongoDB 提供了 PriorityCapableChannelMessageStore 的实现。有关更多信息,请参阅 QueueChannel 配置消息存储。您可以在 支持的消息通道 (Backing Message Channels) 中找到示例配置。

RendezvousChannel 配置

当队列子元素是 <rendezvous-queue> 时,会创建一个 RendezvousChannel。它不提供任何前面描述的额外配置选项,并且其队列不接受任何容量值,因为它是一个容量为零的直接切换 (direct handoff) 队列。以下示例展示了如何声明一个 RendezvousChannel

  • Java

  • XML

@Bean
public PollableChannel rendezvousChannel() {
    return new RendezvousChannel();
}
<int:channel id="rendezvousChannel"/>
    <int:rendezvous-queue/>
</int:channel>

作用域通道配置

任何通道都可以配置 scope 属性,如下例所示

<int:channel id="threadLocalChannel" scope="thread"/>

通道拦截器配置

消息通道也可以有拦截器,如 通道拦截器 中所述。可以将 <interceptors/> 子元素添加到 <channel/>(或更具体的元素类型)中。您可以提供 ref 属性来引用任何实现了 ChannelInterceptor 接口的 Spring 管理对象,如下例所示

<int:channel id="exampleChannel">
    <int:interceptors>
        <ref bean="trafficMonitoringInterceptor"/>
    </int:interceptors>
</int:channel>

通常,我们建议在单独的位置定义拦截器实现,因为它们通常提供可在多个通道中重用的通用行为。

全局通道拦截器配置

通道拦截器提供了一种简洁明了的方式来为每个单独的通道应用横切行为。如果同一行为需要应用于多个通道,则为每个通道配置同一组拦截器并不是最有效的方式。为了避免重复配置并使拦截器能够应用于多个通道,Spring Integration 提供了全局拦截器。考虑以下两个示例

<int:channel-interceptor pattern="input*, thing2*, thing1, !cat*" order="3">
    <bean class="thing1.thing2SampleInterceptor"/>
</int:channel-interceptor>
<int:channel-interceptor ref="myInterceptor" pattern="input*, thing2*, thing1, !cat*" order="3"/>

<bean id="myInterceptor" class="thing1.thing2SampleInterceptor"/>

每个 <channel-interceptor/> 元素允许您定义一个全局拦截器,它应用于所有与 pattern 属性定义的任何模式匹配的通道。在前面的例子中,全局拦截器应用于 'thing1' 通道以及所有以 'thing2' 或 'input' 开头但 *不* 以 'thing3' 开头的其他通道(自版本 5.0 起)。

在模式中添加此语法可能会导致一个潜在(尽管可能不太可能)的问题。如果您有一个名为 !thing1 的 bean,并且在通道拦截器的 pattern 模式中包含 !thing1 模式,它将不再匹配。该模式现在匹配所有 *不* 名为 thing1 的 bean。在这种情况下,您可以在模式中使用 \ 转义 !。模式 \!thing1 匹配名为 !thing1 的 bean。

当给定通道上有多个拦截器时,order 属性让您管理此拦截器注入的位置。例如,通道 'inputChannel' 可以通过本地配置的方式(见下文)拥有单独的拦截器,如以下示例所示:

<int:channel id="inputChannel">
  <int:interceptors>
    <int:wire-tap channel="logger"/>
  </int:interceptors>
</int:channel>

一个合理的问题是“全局拦截器相对于通过本地或其他全局拦截器定义配置的其他拦截器如何注入?”目前的实现提供了一个简单的机制来定义拦截器的执行顺序。order 属性中的正数确保拦截器在任何现有拦截器 *之后* 注入,而负数确保拦截器在现有拦截器 *之前* 注入。这意味着,在前面的示例中,全局拦截器在本地配置的 'wire-tap' 拦截器 *之后* 注入(因为其 order 大于 0)。如果存在另一个具有匹配 pattern 的全局拦截器,其顺序将通过比较两个拦截器的 order 属性值来确定。要在现有拦截器 *之前* 注入全局拦截器,请使用 order 属性的负值。

请注意,orderpattern 属性都是可选的。order 的默认值是 0,pattern 的默认值是 '*'(匹配所有通道)。

监听(Wire Tap)

如前所述,Spring Integration 提供了一个简单的监听拦截器。您可以在 <interceptors/> 元素内的任何通道上配置监听器。这样做对于调试特别有用,并且可以与 Spring Integration 的日志通道适配器结合使用,如下所示:

<int:channel id="in">
    <int:interceptors>
        <int:wire-tap channel="logger"/>
    </int:interceptors>
</int:channel>

<int:logging-channel-adapter id="logger" level="DEBUG"/>
'logging-channel-adapter' 也接受一个 'expression' 属性,以便您可以对 'payload' 和 'headers' 变量评估 SpEL 表达式。或者,要日志完整消息的 toString() 结果,请为 'log-full-message' 属性提供值 true。默认情况下,它为 false,因此只日志 payload。将其设置为 true 可以日志除 payload 外的所有 headers。'expression' 选项提供了最大的灵活性(例如,expression="payload.user.name")。

关于监听器和其他类似组件(消息发布配置)的常见误解之一是它们本质上是自动异步的。默认情况下,作为组件的监听器不是异步调用的。相反,Spring Integration 专注于一种统一的方法来配置异步行为:消息通道。使消息流的某些部分同步或异步的是在该流中配置的 Message Channel 类型。这是消息通道抽象的主要优点之一。从框架创建之初,我们就一直强调消息通道作为框架的一等公民的需求和价值。它不仅仅是 EIP 模式的一种内部、隐式实现。它作为可配置组件完全暴露给最终用户。因此,监听器组件仅负责执行以下任务:

  • 通过监听一个通道(例如,channelA)来拦截消息流

  • 捕获每条消息

  • 将消息发送到另一个通道(例如,channelB

它本质上是桥接模式的一种变体,但它被封装在通道定义中(因此更容易启用和禁用而不会中断流)。此外,与桥接不同的是,它基本是派生出另一个消息流。该流是同步还是异步的?答案取决于 'channelB' 是哪种类型的消息通道。我们有以下选项:direct channel(直接通道)、pollable channel(可轮询通道)和 executor channel(执行器通道)。后两者打破了线程边界,使通过此类通道的通信变为异步,因为消息从该通道分派到其订阅的处理程序所使用的线程与将消息发送到该通道所使用的线程不同。这就是决定您的监听流是同步还是异步的因素。这与框架中的其他组件(例如消息发布器)一致,并通过避免您提前担心(除了编写线程安全的代码)特定代码段是应实现为同步还是异步,增加了统一性和简单性。通过消息通道连接两个代码段(例如,组件 A 和组件 B)的实际方式决定了它们的协作是同步还是异步的。您甚至可能希望将来从同步改为异步,而消息通道让您无需更改代码即可快速完成。

关于监听器的最后一点是,尽管上面提供了默认不异步的理由,您应该记住通常期望尽快将消息传递出去。因此,使用异步通道选项作为监听器的出站通道会非常常见。然而,异步行为并非默认强制执行。如果我们这样做,许多用例将会中断,包括您可能不想破坏事务边界。也许您使用监听器模式进行审计,并且确实希望审计消息在原始事务中发送。例如,您可能将监听器连接到 JMS 出站通道适配器。这样,您就可以兼得两全其美:1)发送 JMS 消息可以在事务内发生,同时 2)它仍然是“即发即忘”的操作,从而防止对主消息流造成任何明显的延迟。

从版本 4.0 开始,当拦截器(例如 WireTap)引用通道时,重要的是避免循环引用。您需要从当前拦截器拦截的通道中排除此类通道。这可以通过适当的模式或编程方式完成。如果您有一个引用 channel 的自定义 ChannelInterceptor,请考虑实现 VetoCapableInterceptor。这样,框架会根据提供的模式询问拦截器是否可以拦截每个候选通道。您还可以在拦截器方法中添加运行时保护,以确保通道不是被拦截器引用的通道。WireTap 使用了这两种技术。

从版本 4.3 开始,WireTap 增加了接受 channelName 而不是 MessageChannel 实例的构造函数。这对于 Java 配置和使用通道自动创建逻辑时非常方便。目标 MessageChannel bean 会在稍后,在与拦截器的第一次交互时,从提供的 channelName 中解析出来。

通道解析需要 BeanFactory,因此监听器实例必须是 Spring 管理的 bean。

这种延迟绑定方法还允许简化使用 Java DSL 配置的典型监听模式,如以下示例所示:

@Bean
public PollableChannel myChannel() {
    return MessageChannels.queue()
            .wireTap("loggingFlow.input")
            .get();
}

@Bean
public IntegrationFlow loggingFlow() {
    return f -> f.log();
}

条件监听

通过使用 selectorselector-expression 属性,可以使监听器具有条件性。selector 引用一个 MessageSelector bean,它可以在运行时决定消息是否应发送到监听通道。类似地,selector-expression 是一个布尔类型的 SpEL 表达式,其目的相同:如果表达式评估结果为 true,则将消息发送到监听通道。

全局监听配置

可以将全局监听器配置为全局通道拦截器配置的一种特殊情况。为此,配置一个顶层 wire-tap 元素。现在,除了正常的 wire-tap 命名空间支持外,还支持 patternorder 属性,它们的工作方式与 channel-interceptor 完全相同。以下示例展示了如何配置全局监听器:

  • Java

  • XML

@Bean
@GlobalChannelInterceptor(patterns = "input*,thing2*,thing1", order = 3)
public WireTap wireTap(MessageChannel wiretapChannel) {
    return new WireTap(wiretapChannel);
}
<int:wire-tap pattern="input*, thing2*, thing1" order="3" channel="wiretapChannel"/>
全局监听器提供了一种方便的方式,可以在外部配置单个通道监听器,而无需修改现有通道配置。为此,将 pattern 属性设置为目标通道名称。例如,您可以使用此技术配置测试用例以验证通道上的消息。