路由器实现

由于基于内容的路由通常需要一些领域特定的逻辑,因此大多数用例需要 Spring Integration 提供使用 XML 命名空间支持或注解将委托给 POJO 的选项。稍后将讨论这两者。但是,我们首先介绍几种满足常见需求的实现。

PayloadTypeRouter

PayloadTypeRouter 根据负载类型映射将消息发送到通道,示例如下

<bean id="payloadTypeRouter"
      class="org.springframework.integration.router.PayloadTypeRouter">
    <property name="channelMapping">
        <map>
            <entry key="java.lang.String" value-ref="stringChannel"/>
            <entry key="java.lang.Integer" value-ref="integerChannel"/>
        </map>
    </property>
</bean>

Spring Integration 提供的命名空间(参见 命名空间支持)也支持 PayloadTypeRouter 的配置,它通过将 <router/> 配置及其相应的实现(使用 <bean/> 元素定义)组合成一个更简洁的配置元素来简化配置。以下示例显示了一个等同于上述配置但使用命名空间支持的 PayloadTypeRouter 配置

<int:payload-type-router input-channel="routingChannel">
    <int:mapping type="java.lang.String" channel="stringChannel" />
    <int:mapping type="java.lang.Integer" channel="integerChannel" />
</int:payload-type-router>

以下示例显示了使用 Java 配置的等效路由器

@ServiceActivator(inputChannel = "routingChannel")
@Bean
public PayloadTypeRouter router() {
    PayloadTypeRouter router = new PayloadTypeRouter();
    router.setChannelMapping(String.class.getName(), "stringChannel");
    router.setChannelMapping(Integer.class.getName(), "integerChannel");
    return router;
}

使用 Java DSL 时,有两种选项。

首先,您可以定义路由器对象,如上例所示

@Bean
public IntegrationFlow routerFlow1() {
    return IntegrationFlow.from("routingChannel")
            .route(router())
            .get();
}

public PayloadTypeRouter router() {
    PayloadTypeRouter router = new PayloadTypeRouter();
    router.setChannelMapping(String.class.getName(), "stringChannel");
    router.setChannelMapping(Integer.class.getName(), "integerChannel");
    return router;
}

请注意,路由器可以是一个 @Bean,但不是必须的。如果它不是 @Bean,流程将注册它。

其次,您可以在 DSL 流本身内部定义路由函数,示例如下

@Bean
public IntegrationFlow routerFlow2() {
    return IntegrationFlow.from("routingChannel")
            .<Object, Class<?>>route(Object::getClass, m -> m
                    .channelMapping(String.class, "stringChannel")
                    .channelMapping(Integer.class, "integerChannel"))
            .get();
}

HeaderValueRouter

HeaderValueRouter 根据单个消息头值映射将消息发送到通道。创建 HeaderValueRouter 时,会使用要评估的消息头名称进行初始化。消息头的值可以是以下两种情况之一

  • 任意值

  • 通道名称

如果它是任意值,则需要将这些消息头值附加映射到通道名称。否则,无需额外的配置。

Spring Integration 提供了一个简单的基于命名空间的 XML 配置来配置 HeaderValueRouter。以下示例演示了当需要将消息头值映射到通道时 HeaderValueRouter 的配置

<int:header-value-router input-channel="routingChannel" header-name="testHeader">
    <int:mapping value="someHeaderValue" channel="channelA" />
    <int:mapping value="someOtherHeaderValue" channel="channelB" />
</int:header-value-router>

在解析过程中,上面示例中定义的路由器可能会遇到通道解析失败,从而引发异常。如果您想抑制此类异常并将未解析的消息发送到默认输出通道(由 default-output-channel 属性标识),请将 resolution-required 设置为 false

通常,消息头值未明确映射到通道的消息会发送到 default-output-channel。但是,当消息头值已映射到通道名称但无法解析该通道时,将 resolution-required 属性设置为 false 会将此类消息路由到 default-output-channel

以下示例显示了使用 Java 配置的等效路由器

@ServiceActivator(inputChannel = "routingChannel")
@Bean
public HeaderValueRouter router() {
    HeaderValueRouter router = new HeaderValueRouter("testHeader");
    router.setChannelMapping("someHeaderValue", "channelA");
    router.setChannelMapping("someOtherHeaderValue", "channelB");
    return router;
}

使用 Java DSL 时,有两种选项。首先,您可以定义路由器对象,如上例所示

@Bean
public IntegrationFlow routerFlow1() {
    return IntegrationFlow.from("routingChannel")
            .route(router())
            .get();
}

public HeaderValueRouter router() {
    HeaderValueRouter router = new HeaderValueRouter("testHeader");
    router.setChannelMapping("someHeaderValue", "channelA");
    router.setChannelMapping("someOtherHeaderValue", "channelB");
    return router;
}

请注意,路由器可以是一个 @Bean,但不是必须的。如果它不是 @Bean,流程将注册它。

其次,您可以在 DSL 流本身内部定义路由函数,示例如下

@Bean
public IntegrationFlow routerFlow2() {
    return IntegrationFlow.from("routingChannel")
            .route(Message.class, m -> m.getHeaders().get("testHeader", String.class),
                    m -> m
                        .channelMapping("someHeaderValue", "channelA")
                        .channelMapping("someOtherHeaderValue", "channelB"),
                e -> e.id("headerValueRouter"))
            .get();
}

配置不需要将消息头值映射到通道名称的情况,因为消息头值本身就代表通道名称。以下示例显示了一个不需要将消息头值映射到通道名称的路由器

<int:header-value-router input-channel="routingChannel" header-name="testHeader"/>

自 Spring Integration 2.1 起,解析通道的行为更加明确。例如,如果您省略了 default-output-channel 属性,并且路由器无法解析至少一个有效通道,并且通过将 resolution-required 设置为 false 忽略了任何通道名称解析失败,那么将抛出 MessageDeliveryException

基本上,默认情况下,路由器必须能够成功地将消息路由到至少一个通道。如果您确实想丢弃消息,则必须将 default-output-channel 设置为 nullChannel

RecipientListRouter

RecipientListRouter 将每条收到的消息发送到一个静态定义的消息通道列表。以下示例创建了一个 RecipientListRouter

<bean id="recipientListRouter"
      class="org.springframework.integration.router.RecipientListRouter">
    <property name="channels">
        <list>
            <ref bean="channel1"/>
            <ref bean="channel2"/>
            <ref bean="channel3"/>
        </list>
    </property>
</bean>

Spring Integration 也为 RecipientListRouter 配置提供了命名空间支持(参见 命名空间支持),示例如下

<int:recipient-list-router id="customRouter" input-channel="routingChannel"
        timeout="1234"
        ignore-send-failures="true"
        apply-sequence="true">
  <int:recipient channel="channel1"/>
  <int:recipient channel="channel2"/>
</int:recipient-list-router>

以下示例显示了使用 Java 配置的等效路由器

@ServiceActivator(inputChannel = "routingChannel")
@Bean
public RecipientListRouter router() {
    RecipientListRouter router = new RecipientListRouter();
    router.setSendTimeout(1_234L);
    router.setIgnoreSendFailures(true);
    router.setApplySequence(true);
    router.addRecipient("channel1");
    router.addRecipient("channel2");
    router.addRecipient("channel3");
    return router;
}

以下示例显示了使用 Java DSL 配置的等效路由器

@Bean
public IntegrationFlow routerFlow() {
    return IntegrationFlow.from("routingChannel")
            .routeToRecipients(r -> r
                    .applySequence(true)
                    .ignoreSendFailures(true)
                    .recipient("channel1")
                    .recipient("channel2")
                    .recipient("channel3")
                    .sendTimeout(1_234L))
            .get();
}
此处的 'apply-sequence' 标志与 publish-subscribe-channel 的效果相同,并且与 publish-subscribe-channel 一样,在 recipient-list-router 上默认禁用。有关详细信息,请参见 PublishSubscribeChannel 配置

配置 RecipientListRouter 的另一个便捷选项是使用 Spring Expression Language (SpEL) 支持作为各个接收通道的选择器。这样做类似于在“链”的开头使用过滤器充当“选择性消费者”。然而,在这种情况下,它都相当简洁地组合在路由器的配置中,示例如下

<int:recipient-list-router id="customRouter" input-channel="routingChannel">
    <int:recipient channel="channel1" selector-expression="payload.equals('foo')"/>
    <int:recipient channel="channel2" selector-expression="headers.containsKey('bar')"/>
</int:recipient-list-router>

在上述配置中,由 selector-expression 属性标识的 SpEL 表达式会被评估,以确定该接收者是否应包含在给定输入消息的接收者列表中。表达式的评估结果必须是 boolean 类型。如果未定义此属性,则该通道始终位于接收者列表中。

RecipientListRouterManagement

从 4.1 版本开始,RecipientListRouter 提供了几个操作,可以在运行时动态操纵接收者。这些管理操作通过 @ManagedResource 注解由 RecipientListRouterManagement 提供。它们可以通过 控制总线 以及 JMX 使用,示例如下

<control-bus input-channel="controlBus"/>

<recipient-list-router id="simpleRouter" input-channel="routingChannelA">
   <recipient channel="channel1"/>
</recipient-list-router>

<channel id="channel2"/>
Message<?> addRecipientCommandMessage =
                     MessageBuilder.withPayload("'simpleRouter.handler'.addRecipient")
                            .setHeader(IntegrationMessageHeaderAccessor.CONTROL_BUS_ARGUMENTS, List.of("channel2"))
                            .build();

从应用启动开始,simpleRouter 只有一个 channel1 接收者。但在执行 addRecipient 命令后,添加了 channel2 接收者。这是一个“注册对消息中某部分感兴趣”的用例,当我们可能在某个时间段对来自路由器的消息感兴趣时,因此我们订阅了 recipient-list-router,并在某个时候决定取消订阅。

由于 <recipient-list-router> 的运行时管理操作,它可以从一开始就不配置任何 <recipient>。在这种情况下,当消息没有匹配的接收者时,RecipientListRouter 的行为与有匹配接收者时相同。如果配置了 defaultOutputChannel,消息将被发送到那里。否则,将抛出 MessageDeliveryException

XPath 路由器

XPath 路由器是 XML 模块的一部分。参见 使用 XPath 路由 XML 消息

路由和错误处理

Spring Integration 还提供了一种特殊的基于类型的路由器,称为 ErrorMessageExceptionTypeRouter,用于路由错误消息(定义为负载为 Throwable 实例的消息)。ErrorMessageExceptionTypeRouterPayloadTypeRouter 类似。事实上,它们几乎相同。唯一的区别是,虽然 PayloadTypeRouter 导航负载实例的实例层级结构(例如,payload.getClass().getSuperclass())以找到最具体的类型和通道映射,但 ErrorMessageExceptionTypeRouter 导航“异常原因”的层级结构(例如,payload.getCause())以找到最具体的 Throwable 类型或通道映射,并使用 mappingClass.isInstance(cause)cause 与类或任何超类匹配。

在这种情况下,通道映射的顺序很重要。因此,如果需要获取 IllegalArgumentException 的映射而不是 RuntimeException 的映射,则必须先在路由器上配置后者。
自版本 4.3 起,ErrorMessageExceptionTypeRouter 在初始化阶段加载所有映射类,以便在发生 ClassNotFoundException 时快速失败。

以下示例显示了 ErrorMessageExceptionTypeRouter 的示例配置

  • Java DSL

  • Kotlin DSL

  • Groovy DSL

  • XML DSL

@Bean
public IntegrationFlow someFlow() {
    return f -> f
            .routeByException(r -> r
                 .channelMapping(IllegalArgumentException.class, "illegalChannel")
                 .channelMapping(NullPointerException.class, "npeChannel")
                 .defaultOutputChannel("defaultChannel"));
}
@Bean
fun someFlow() =
    integrationFlow {
        routeByException {
                    channelMapping(IllegalArgumentException::class.java, "illegalChannel")
                    channelMapping(NullPointerException::class.java, "npeChannel")
                    defaultOutputChannel("defaultChannel")
                }
    }
@Bean
someFlow() {
    integrationFlow {
        routeByException {
            channelMapping IllegalArgumentException, 'illegalChannel'
            channelMapping NullPointerException, 'npeChannel'
            defaultOutputChannel 'defaultChannel'
        }
    }
}
<int:exception-type-router input-channel="inputChannel"
                           default-output-channel="defaultChannel">
    <int:mapping exception-type="java.lang.IllegalArgumentException"
                 channel="illegalChannel"/>
    <int:mapping exception-type="java.lang.NullPointerException"
                 channel="npeChannel"/>
</int:exception-type-router>

<int:channel id="illegalChannel" />
<int:channel id="npeChannel" />