消息端点

本章第一部分介绍了一些背景理论,并揭示了驱动 Spring Integration 各种消息组件的底层 API 的许多内容。如果您想真正理解幕后发生的事情,这些信息会很有帮助。但是,如果您想快速开始使用基于简化命名空间的各种元素配置,现在可以跳到端点命名空间支持部分。

如概述中所述,消息端点负责将各种消息组件连接到通道。接下来的几章中,我们将介绍许多不同的消息消费者组件。其中一些组件也能够发送回复消息。发送消息相当简单。如前文消息通道中所示,您可以将消息发送到消息通道。然而,接收消息则稍微复杂一些。主要原因是消费者有两种类型:轮询消费者事件驱动消费者

在这两者中,事件驱动消费者要简单得多。它们无需管理和调度单独的轮询线程,本质上是带有回调方法的监听器。当连接到 Spring Integration 的可订阅消息通道时,这种简单选项工作得很好。然而,当连接到缓冲、可轮询消息通道时,某些组件必须调度和管理轮询线程。Spring Integration 提供了两种不同的端点实现来适应这两种类型的消费者。因此,消费者本身只需实现回调接口即可。需要轮询时,端点充当消费者实例的容器。其好处类似于使用容器托管消息驱动 Bean,但由于这些消费者是运行在 ApplicationContext 中的 Spring 管理对象,它更接近于 Spring 自己的 MessageListener 容器。

消息处理器

Spring Integration 的 MessageHandler 接口由框架内的许多组件实现。换句话说,这不是公共 API 的一部分,您通常不会直接实现 MessageHandler。然而,它被消息消费者用于实际处理消费的消息,因此了解此策略接口有助于理解消费者扮演的整体角色。该接口定义如下

public interface MessageHandler {

    void handleMessage(Message<?> message);

}

尽管接口简单,但它为后续章节中介绍的大多数组件(路由器、转换器、分割器、聚合器、服务激活器等)提供了基础。这些组件处理消息的功能各不相同,但实际接收消息的要求是相同的,并且轮询和事件驱动行为的选择也是相同的。Spring Integration 提供了两种端点实现,它们托管这些基于回调的处理程序,并使它们能够连接到消息通道。

事件驱动消费者

由于事件驱动消费者是两者中较简单的一个,我们首先介绍它。您可能记得 SubscribableChannel 接口提供了一个 subscribe() 方法,并且该方法接受一个 MessageHandler 参数(如SubscribableChannel中所示)。以下列表显示了 subscribe 方法的定义

subscribableChannel.subscribe(messageHandler);

由于订阅到通道的处理程序不必主动轮询该通道,这是一个事件驱动消费者,并且 Spring Integration 提供的实现接受 SubscribableChannelMessageHandler,如下例所示

SubscribableChannel channel = context.getBean("subscribableChannel", SubscribableChannel.class);

EventDrivenConsumer consumer = new EventDrivenConsumer(channel, exampleHandler);

轮询消费者

Spring Integration 也提供了 PollingConsumer,其实例化方式与 EventDrivenConsumer 相同,只是通道必须实现 PollableChannel,如下例所示

PollableChannel channel = context.getBean("pollableChannel", PollableChannel.class);

PollingConsumer consumer = new PollingConsumer(channel, exampleHandler);
有关轮询消费者的更多信息,请参阅通道适配器通道适配器

轮询消费者还有许多其他配置选项。以下示例展示了如何设置触发器

PollingConsumer consumer = new PollingConsumer(channel, handler);

consumer.setTrigger(new PeriodicTrigger(Duration.ofSeconds(30)));

PeriodicTrigger 通常使用简单的间隔(Duration)定义,但也支持 initialDelay 属性和布尔型 fixedRate 属性(默认为 false,即无固定延迟)。以下示例同时设置了这两个属性

PeriodicTrigger trigger = new PeriodicTrigger(Duration.ofSeconds(1));
trigger.setInitialDelay(Duration.ofSeconds(5));
trigger.setFixedRate(true);

前例中三个设置的结果是,触发器等待五秒钟,然后每秒触发一次。

CronTrigger 需要有效的 cron 表达式。详情请参阅 Javadoc。以下示例设置了一个新的 CronTrigger

CronTrigger trigger = new CronTrigger("*/10 * * * * MON-FRI");

前例中定义的触发器的结果是,触发器在周一到周五每十秒触发一次。

轮询端点的默认触发器是一个 PeriodicTrigger 实例,具有 1 秒的固定延迟周期。

除了触发器之外,您还可以指定另外两个与轮询相关的配置属性:maxMessagesPerPollreceiveTimeout。以下示例展示了如何设置这两个属性

PollingConsumer consumer = new PollingConsumer(channel, handler);

consumer.setMaxMessagesPerPoll(10);
consumer.setReceiveTimeout(5000);

maxMessagesPerPoll 属性指定在给定轮询操作中接收的最大消息数。这意味着轮询器会持续调用 receive() 而不等待,直到返回 null 或达到最大值。例如,如果一个轮询器有一个十秒间隔的触发器和 maxMessagesPerPoll 设置为 25,并且它正在轮询一个队列中有 100 条消息的通道,那么所有 100 条消息可以在 40 秒内被检索。它会获取 25 条,等待十秒,然后获取接下来的 25 条,依此类推。如果 maxMessagesPerPoll 配置为负值,则在单个轮询周期内会持续调用 MessageSource.receive(),直到它返回 null。从 5.5 版本开始,值 0 具有特殊含义——完全跳过 MessageSource.receive() 调用,这可以被视为暂停此轮询端点,直到稍后(例如通过 Control Bus)将 maxMessagesPerPoll 更改为非零值。

receiveTimeout 属性指定当轮询器调用接收操作时,如果没有可用消息应等待的时间。例如,考虑两种表面看起来相似但实际上非常不同的选项:第一种的间隔触发器为 5 秒,接收超时为 50 毫秒;而第二种的间隔触发器为 50 毫秒,接收超时为 5 秒。第一种配置可能会比消息到达通道后延迟最多 4950 毫秒才接收到该消息(如果该消息在其一次轮询调用返回后立即到达)。另一方面,第二种配置的消息延迟不会超过 50 毫秒。区别在于第二种选项需要一个线程等待。然而,结果是它能够更快地响应到达的消息。这种技术被称为“长轮询”,可以用来模拟轮询源上的事件驱动行为。

轮询消费者也可以委托给 Spring 的 TaskExecutor,如下例所示

PollingConsumer consumer = new PollingConsumer(channel, handler);

TaskExecutor taskExecutor = context.getBean("exampleExecutor", TaskExecutor.class);
consumer.setTaskExecutor(taskExecutor);

此外,PollingConsumer 有一个名为 adviceChain 的属性。此属性允许您指定一个 AOP 通知 List,用于处理包括事务在内的附加横切关注点。这些通知应用于 doPoll() 方法周围。有关更深入的信息,请参阅端点命名空间支持下的 AOP 通知链和事务支持部分。另请参阅 @Poller 注解的 Javadoc 以及相关的消息注解支持部分。Java DSL 也提供了 .poller() 端点配置选项及其相应的 Pollers 工厂。

前面的示例展示了依赖查找。但是,请记住这些消费者通常被配置为 Spring bean 定义。实际上,Spring Integration 还提供了一个名为 ConsumerEndpointFactoryBeanFactoryBean,它根据通道类型创建适当的消费者类型。此外,Spring Integration 具有完整的 XML 命名空间支持,可以进一步隐藏这些细节。本指南在介绍每种组件类型时都会重点介绍基于命名空间的配置。

许多 MessageHandler 实现可以生成回复消息。如前所述,与接收消息相比,发送消息微不足道。然而,何时发送以及发送多少回复消息取决于处理程序类型。例如,聚合器会等待一定数量的消息到达,并且通常被配置为分割器的下游消费者,分割器可以为它处理的每条消息生成多个回复。使用命名空间配置时,您不需要严格了解所有细节。但是,了解其中一些组件共享一个公共基类 AbstractReplyProducingMessageHandler,并且它提供了一个 setOutputChannel(..) 方法,可能仍然值得。

端点命名空间支持

在本参考手册中,您可以找到端点元素的具体配置示例,例如路由器、转换器、服务激活器等。其中大多数支持 input-channel 属性,许多支持 output-channel 属性。解析后,这些端点元素会生成 PollingConsumerEventDrivenConsumer 的实例,具体取决于引用的 input-channel 类型:分别是 PollableChannelSubscribableChannel。当通道是可轮询的时,轮询行为基于端点元素的 poller 子元素及其属性。

以下列出了 poller 的所有可用配置选项

<int:poller cron=""                                  (1)
            default="false"                          (2)
            error-channel=""                         (3)
            fixed-delay=""                           (4)
            fixed-rate=""                            (5)
            initial-delay=""                         (6)
            id=""                                    (7)
            max-messages-per-poll=""                 (8)
            receive-timeout=""                       (9)
            ref=""                                   (10)
            task-executor=""                         (11)
            time-unit="MILLISECONDS"                 (12)
            trigger="">                              (13)
            <int:advice-chain />                     (14)
            <int:transactional />                    (15)
</int:poller>
1 提供了使用 Cron 表达式配置轮询器的能力。底层实现使用了 org.springframework.scheduling.support.CronTrigger。如果设置了此属性,则不能指定以下任何属性:fixed-delaytriggerfixed-rateref
2 通过将此属性设置为 true,您可以定义恰好一个全局默认轮询器。如果在应用上下文中定义了多个默认轮询器,则会引发异常。连接到 PollableChannelPollingConsumer)的任何端点或任何未显式配置轮询器的 SourcePollingChannelAdapter 都将使用全局默认轮询器。它默认为 false。可选。
3 指定当此轮询器调用发生故障时发送错误消息的通道。要完全抑制异常,您可以提供对 nullChannel 的引用。可选。
4 固定延迟触发器底层使用了 PeriodicTrigger。数值以 time-unit 为单位,或可以采用持续时间格式(从 6.2 版本开始),例如 PT10SP1D。如果设置了此属性,则不能指定以下任何属性:fixed-ratetriggercronref
5 固定速率触发器底层使用了 PeriodicTrigger。数值以 time-unit 为单位,或可以采用持续时间格式(从 6.2 版本开始),例如 PT10SP1D。如果设置了此属性,则不能指定以下任何属性:fixed-delaytriggercronref
6 PeriodicTrigger 的初始延迟(从 6.2 版本开始)。数值以 time-unit 为单位,或可以采用持续时间格式,例如 PT10SP1D
7 指向轮询器底层 bean 定义的 ID,其类型为 org.springframework.integration.scheduling.PollerMetadata。顶级轮询器元素必须指定 id 属性,除非它是默认轮询器(default="true")。
8 有关更多信息,请参阅配置入站通道适配器。如果未指定,默认值取决于上下文。如果您使用 PollingConsumer,此属性默认为 -1。然而,如果您使用 SourcePollingChannelAdaptermax-messages-per-poll 属性默认为 1。可选。
9 该值设置在底层类 PollerMetadata 上。如果未指定,默认为 1000(毫秒)。可选。
10 对另一个顶级轮询器的 bean 引用。顶级 poller 元素上不得出现 ref 属性。然而,如果设置了此属性,则不能指定以下任何属性:fixed-ratetriggercronfixed-delay
11 提供了引用自定义任务执行器的能力。有关更多信息,请参阅TaskExecutor 支持。可选。
12 此属性指定了底层 org.springframework.scheduling.support.PeriodicTrigger 上的 java.util.concurrent.TimeUnit 枚举值。因此,此属性只能与 fixed-delayfixed-rate 属性结合使用。如果与 crontrigger 引用属性结合使用,则会导致失败。PeriodicTrigger 支持的最小粒度是毫秒。因此,唯一可用的选项是毫秒和秒。如果未提供此值,则任何 fixed-delayfixed-rate 值都将被解释为毫秒。基本上,此枚举提供了秒级间隔触发器值的便利。对于按小时、按天和按月的设置,我们建议使用 cron 触发器。
13 对实现了 org.springframework.scheduling.Trigger 接口的任何 Spring 配置的 bean 的引用。然而,如果设置了此属性,则不能指定以下任何属性:fixed-delayfixed-ratecronref。可选。
14 允许指定额外的 AOP 通知来处理附加的横切关注点。有关更多信息,请参阅事务。可选。
15 轮询器可以设置为事务性的。有关更多信息,请参阅AOP 通知链。可选。

示例

一个简单的、基于间隔的轮询器,间隔为 1 秒,可以按如下方式配置

<int:transformer input-channel="pollable"
    ref="transformer"
    output-channel="output">
    <int:poller fixed-rate="1000"/>
</int:transformer>

作为使用 fixed-rate 属性的替代方案,您也可以使用 fixed-delay 属性。

对于基于 Cron 表达式的轮询器,请改用 cron 属性,如下例所示

<int:transformer input-channel="pollable"
    ref="transformer"
    output-channel="output">
    <int:poller cron="*/10 * * * * MON-FRI"/>
</int:transformer>

如果输入通道是 PollableChannel,则需要配置轮询器。具体来说,如前所述,triggerPollingConsumer 类的一个必需属性。因此,如果您遗漏轮询消费者端点配置的 poller 子元素,可能会抛出异常。如果您尝试在连接到非可轮询通道的元素上配置轮询器,也可能会抛出异常。

也可以创建顶级轮询器,在这种情况下只需要 ref 属性,如下例所示

<int:poller id="weekdayPoller" cron="*/10 * * * * MON-FRI"/>

<int:transformer input-channel="pollable"
    ref="transformer"
    output-channel="output">
    <int:poller ref="weekdayPoller"/>
</int:transformer>
ref 属性仅允许在内部轮询器定义上使用。在顶级轮询器上定义此属性会导致在应用上下文初始化期间抛出配置异常。

全局默认轮询器

为了进一步简化配置,您可以定义一个全局默认轮询器。XML DSL 中的单个顶级轮询器组件可以将 default 属性设置为 true。对于 Java 配置,在这种情况下必须声明一个名称为 PollerMetadata.DEFAULT_POLLERPollerMetadata bean。在这种情况下,同一 ApplicationContext 中定义的任何输入通道为 PollableChannel 且未显式配置 poller 的端点都会使用该默认设置。以下示例展示了这样一个轮询器以及使用它的转换器

  • Java DSL

  • Java

  • Kotlin DSL

  • XML

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
    PollerMetadata pollerMetadata = new PollerMetadata();
    pollerMetadata.setMaxMessagesPerPoll(5);
    pollerMetadata.setTrigger(new PeriodicTrigger(3000));
    return pollerMetadata;
}

// No 'poller' attribute because there is a default global poller
@Bean
public IntegrationFlow transformFlow(MyTransformer transformer) {
    return IntegrationFlow.from(MessageChannels.queue("pollable"))
                           .transform(transformer) // No 'poller' attribute because there is a default global poller
                           .channel("output")
                           .get();
}
@Bean(PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
    PollerMetadata pollerMetadata = new PollerMetadata();
    pollerMetadata.setMaxMessagesPerPoll(5);
    pollerMetadata.setTrigger(new PeriodicTrigger(3000));
    return pollerMetadata;
}

@Bean
public QueueChannel pollable() {
   return new QueueChannel();
}
// No 'poller' attribute because there is a default global poller
@Transformer(inputChannel = "pollable", outputChannel = "output")
public Object transform(Object payload) {
    ...
}
@Bean(PollerMetadata.DEFAULT_POLLER)
fun defaultPoller() =
    PollerMetadata()
        .also {
            it.maxMessagesPerPoll = 5
            it.trigger = PeriodicTrigger(3000)
        }

@Bean
fun convertFlow() =
    integrationFlow(MessageChannels.queue("pollable")) {
    	transform(transformer) // No 'poller' attribute because there is a default global poller
    	channel("output")
    }
<int:poller id="defaultPoller" default="true" max-messages-per-poll="5" fixed-delay="3000"/>

<!-- No <poller/> sub-element is necessary, because there is a default -->
<int:transformer input-channel="pollable"
                 ref="transformer"
                 output-channel="output"/>

事务支持

Spring Integration 还为轮询器提供事务支持,以便每个接收并转发操作都可以作为一个原子工作单元执行。要为轮询器配置事务,请添加 <transactional/> 子元素。以下示例展示了可用属性

<int:poller fixed-delay="1000">
    <int:transactional transaction-manager="txManager"
                       propagation="REQUIRED"
                       isolation="REPEATABLE_READ"
                       timeout="10000"
                       read-only="false"/>
</int:poller>

有关更多信息,请参阅轮询器事务支持

AOP 通知链

由于 Spring 事务支持依赖于代理机制,通过 TransactionInterceptor(AOP 通知)来处理由轮询器(poller)启动的消息流的事务行为,因此有时您必须提供额外的通知(advice)来处理与轮询器相关的其他横切行为。为此,poller 定义了一个 advice-chain 元素,允许您添加更多实现了 MethodInterceptor 接口的类作为通知。以下示例展示了如何为 poller 定义一个 advice-chain

<int:service-activator id="advicedSa" input-channel="goodInputWithAdvice" ref="testBean"
		method="good" output-channel="output">
	<int:poller max-messages-per-poll="1" fixed-rate="10000">
		 <int:advice-chain>
			<ref bean="adviceA" />
			<beans:bean class="org.something.SampleAdvice" />
			<ref bean="txAdvice" />
		</int:advice-chain>
	</int:poller>
</int:service-activator>

有关如何实现 MethodInterceptor 接口的更多信息,请参阅 Spring Framework Reference Guide 的 AOP 部分。通知链也可以应用于没有任何事务配置的轮询器,从而让您增强由轮询器启动的消息流的行为。

使用通知链时,不能指定 <transactional/> 子元素。取而代之的是,声明一个 <tx:advice/> bean 并将其添加到 <advice-chain/> 中。完整的配置详情请参见 轮询器事务支持

TaskExecutor 支持

轮询线程可以由 Spring 的 TaskExecutor 抽象的任何实例执行。这使得端点或端点组能够实现并发。自 Spring 3.0 起,核心 Spring Framework 提供了一个 task 命名空间,其 <executor/> 元素支持创建简单的线程池执行器。该元素接受常见并发设置的属性,例如 pool-size 和 queue-capacity。配置一个线程池执行器可以显著影响端点在负载下的性能。这些设置对每个端点都可用,因为端点的性能是需要考虑的主要因素之一(另一个主要因素是端点订阅的通道的预期消息量)。为了启用 XML 命名空间支持配置的轮询端点的并发性,在其 <poller/> 元素上提供 task-executor 引用,然后提供以下示例中显示的一个或多个属性。

<int:poller task-executor="pool" fixed-rate="1000"/>

<task:executor id="pool"
               pool-size="5-25"
               queue-capacity="20"
               keep-alive="120"/>

如果您不提供任务执行器,消费者的处理程序将在调用者的线程中调用。请注意,调用者通常是默认的 TaskScheduler(参见 配置 Task Scheduler)。您还应该记住,task-executor 属性可以通过指定 bean 名称来引用 Spring 的 TaskExecutor 接口的任何实现。前面显示的 executor 元素是为了方便而提供的。

正如前面在轮询消费者背景部分提到的,您也可以配置轮询消费者,使其模拟事件驱动行为。通过较长的接收超时时间和较短的触发器间隔,即使是在轮询的消息源上,您也可以确保对到达消息作出非常及时的反应。请注意,这仅适用于具有阻塞等待调用并带超时的源。例如,文件轮询器不阻塞。每次 receive() 调用会立即返回,要么包含新文件,要么不包含。因此,即使轮询器包含较长的 receive-timeout,该值在这种情况下也永远不会被使用。另一方面,当使用 Spring Integration 自有的基于队列的通道时,超时值确实有机会发挥作用。以下示例展示了轮询消费者如何几乎即时地接收消息。

<int:service-activator input-channel="someQueueChannel"
    output-channel="output">
    <int:poller receive-timeout="30000" fixed-rate="10"/>

</int:service-activator>

使用这种方法不会产生太多开销,因为在内部,它不过是一个定时等待线程,它并不像(例如)一个无休止的忙等(thrashing, infinite while loop)那样需要大量 CPU 资源。

运行时改变轮询速率

当使用 fixed-delayfixed-rate 属性配置轮询器时,默认实现使用 PeriodicTrigger 实例。PeriodicTrigger 是核心 Spring Framework 的一部分。它只接受间隔作为构造函数参数。因此,它不能在运行时改变。

但是,您可以定义自己的 org.springframework.scheduling.Trigger 接口实现。您甚至可以使用 PeriodicTrigger 作为起点。然后您可以为间隔(周期)添加一个 setter,或者您甚至可以在触发器本身中嵌入自己的节流逻辑。period 属性在每次调用 nextExecutionTime 时用于安排下一次轮询。要在轮询器中使用此自定义触发器,请在您的应用程序上下文中声明自定义触发器的 bean 定义,并使用 trigger 属性将依赖项注入到您的轮询器配置中,该属性引用自定义触发器 bean 实例。现在,您可以获取触发器 bean 的引用,并在轮询之间更改轮询间隔。

有关示例,请参阅 Spring Integration Samples 项目。它包含一个名为 dynamic-poller 的示例,该示例使用自定义触发器并演示了在运行时更改轮询间隔的能力。

该示例提供了一个实现 org.springframework.scheduling.Trigger 接口的自定义触发器。该示例的触发器基于 Spring 的 PeriodicTrigger 实现。然而,自定义触发器的字段不是 final 的,并且属性具有显式的 getter 和 setter,允许您在运行时动态更改轮询周期。

但是,重要的是要注意,由于 Trigger 方法是 nextExecutionTime(),因此对动态触发器的任何更改都不会立即生效,而是基于现有配置在下一次轮询时才会生效。不可能强制触发器在其当前配置的下一次执行时间之前触发。

载荷类型转换

在本参考手册中,您还可以看到各种端点的特定配置和实现示例,这些端点接受消息或任意 Object 作为输入参数。对于 Object 的情况,此类参数被映射到消息载荷或载荷的一部分或消息头(当使用 Spring 表达式语言时)。但是,端点方法的输入参数类型有时与载荷或其部分的类型不匹配。在这种情况下,我们需要执行类型转换。Spring Integration 提供了一种便捷的方式来在其自身的名为 integrationConversionService 的转换服务 bean 实例中注册类型转换器(通过使用 Spring 的 ConversionService)。一旦通过 Spring Integration 基础设施定义了第一个转换器,就会自动创建该 bean。要注册转换器,您可以实现 org.springframework.core.convert.converter.Converterorg.springframework.core.convert.converter.GenericConverterorg.springframework.core.convert.converter.ConverterFactory

Converter 实现是最简单的,用于从单一类型转换为另一种类型。为了更复杂的需求,例如转换为类层次结构,您可以实现 GenericConverter,并且可能实现 ConditionalConverter。这些接口让您可以完全访问 fromto 类型描述符,从而实现复杂的转换。例如,如果您有一个名为 Something 的抽象类作为您转换的目标(参数类型、通道数据类型等),并且有两个具体实现 Thing1Thing,您希望根据输入类型转换为其中一个,那么 GenericConverter 将是一个很好的选择。有关更多信息,请参阅这些接口的 Javadoc。

实现转换器后,您可以使用便捷的命名空间支持注册它,如下例所示:

<int:converter ref="sampleConverter"/>

<bean id="sampleConverter" class="foo.bar.TestConverter"/>

或者,您可以使用内部 bean,如下例所示:

<int:converter>
    <bean class="o.s.i.config.xml.ConverterParserTests$TestConverter3"/>
</int:converter>

从 Spring Integration 4.0 开始,您可以使用注解来创建上述配置,如下例所示:

@Component
@IntegrationConverter
public class TestConverter implements Converter<Boolean, Number> {

	public Number convert(Boolean source) {
		return source ? 1 : 0;
	}

}

或者,您可以使用 @Configuration 注解,如下例所示:

@Configuration
@EnableIntegration
public class ContextConfiguration {

	@Bean
	@IntegrationConverter
	public SerializingConverter serializingConverter() {
		return new SerializingConverter();
	}

}

配置应用程序上下文时,Spring Framework 允许您添加一个 conversionService bean(参见 配置 ConversionService 一章)。在 bean 创建和配置期间需要时,此服务用于执行适当的转换。

相比之下,integrationConversionService 用于运行时转换。这两种用途截然不同。用于连接 bean 构造函数参数和属性的转换器,如果在运行时用于 Spring Integration 表达式评估(针对数据类型通道中的消息、载荷类型转换器等),可能会产生意外结果。

但是,如果您确实希望将 Spring 的 conversionService 用作 Spring Integration 的 integrationConversionService,您可以在应用程序上下文中配置一个别名,如下例所示:

<alias name="conversionService" alias="integrationConversionService"/>

在这种情况下,conversionService 提供的转换器将可用于 Spring Integration 的运行时转换。

内容类型转换

从 5.0 版本开始,默认情况下,方法调用机制基于 org.springframework.messaging.handler.invocation.InvocableHandlerMethod 基础设施。它的 HandlerMethodArgumentResolver 实现(如 PayloadArgumentResolverMessageMethodArgumentResolver)可以使用 MessageConverter 抽象将传入的 payload 转换为目标方法参数类型。转换可以基于 contentType 消息头。为此,Spring Integration 提供了 ConfigurableCompositeMessageConverter,它会将委托给一个注册的转换器列表,直到其中一个返回非空结果。默认情况下,此转换器提供(按严格顺序):

有关它们的用途和适当的 contentType 转换值,请参阅 Javadoc(链接在前述列表中)。使用 ConfigurableCompositeMessageConverter 是因为它可以提供任何其他 MessageConverter 实现,包括或不包括前面提到的默认转换器。它也可以在应用程序上下文中注册为适当的 bean,覆盖默认转换器,如下例所示:

@Bean(name = IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME)
public ConfigurableCompositeMessageConverter compositeMessageConverter() {
    List<MessageConverter> converters =
        Arrays.asList(new MarshallingMessageConverter(jaxb2Marshaller()),
                 new JavaSerializationMessageConverter());
    return new ConfigurableCompositeMessageConverter(converters);
}

这两个新的转换器在复合转换器中注册,并且位于默认转换器之前。您也可以不使用 ConfigurableCompositeMessageConverter,而是通过注册一个名为 integrationArgumentResolverMessageConverter(通过设置 IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME 属性)的 bean 来提供您自己的 MessageConverter

基于 MessageConverter 的(包括 contentType 头)转换在使用 SpEL 方法调用时不可用。在这种情况下,只有上面在 载荷类型转换 中提到的常规类到类转换可用。

异步轮询

如果您希望轮询是异步的,轮询器可以选择指定一个 task-executor 属性,该属性指向任何现有的 TaskExecutor bean 实例(Spring 3.0 通过 task 命名空间提供便捷的命名空间配置)。但是,在配置带有 TaskExecutor 的轮询器时,您必须了解一些事项。

问题在于存在两种配置:轮询器和 TaskExecutor。它们必须相互协调。否则,您可能会导致人为的内存泄漏。

考虑以下配置:

<int:channel id="publishChannel">
    <int:queue />
</int:channel>

<int:service-activator input-channel="publishChannel" ref="myService">
	<int:poller receive-timeout="5000" task-executor="taskExecutor" fixed-rate="50" />
</int:service-activator>

<task:executor id="taskExecutor" pool-size="20" />

上述配置展示了一种不协调的配置。

默认情况下,任务执行器具有无界任务队列。轮询器会不断调度新任务,即使所有线程都在阻塞,等待新消息到达或超时过期。考虑到有 20 个线程执行任务,超时时间为五秒,它们的执行速率为每秒 4 次。但是,新任务的调度速率为每秒 20 次,因此任务执行器的内部队列以每秒 16 次的速度增长(当进程空闲时),所以我们存在内存泄漏。

处理这个问题的一种方法是设置任务执行器的 queue-capacity 属性。即使是 0 也是一个合理的值。您也可以通过设置 Task Executor 的 rejection-policy 属性(例如,设置为 DISCARD)来管理如何处理无法排队的消​​息。换句话说,在配置 TaskExecutor 时,您必须理解某些细节。有关此主题的更多详细信息,请参阅 Spring 参考手册中的“任务执行和调度”

端点内部 Bean

许多端点是复合 Bean。这包括所有消费者和所有轮询入站通道适配器。消费者(轮询或事件驱动)委托给 MessageHandler。轮询适配器通过委托给 MessageSource 获取消息。通常,获取委托 bean 的引用很有用,也许是为了在运行时更改配置或用于测试。这些 bean 可以通过众所周知的名称从 ApplicationContext 中获取。MessageHandler 实例在应用程序上下文中注册的 bean ID 类似于 someConsumer.handler(其中 'consumer' 是端点 id 属性的值)。MessageSource 实例注册的 bean ID 类似于 somePolledAdapter.source,其中 'somePolledAdapter' 是适配器的 ID。

上述内容仅适用于框架组件本身。您可以改用内部 bean 定义,如下例所示:

<int:service-activator id="exampleServiceActivator" input-channel="inChannel"
            output-channel = "outChannel" method="foo">
    <beans:bean class="org.foo.ExampleServiceActivator"/>
</int:service-activator>

该 bean 被视为声明的任何内部 bean,并且未在应用程序上下文中注册。如果您希望以其他方式访问此 bean,请将其声明在顶层并提供一个 id,然后改用 ref 属性。有关更多信息,请参阅 Spring 文档