消息网关

网关隐藏了 Spring Integration 提供的消息处理 API。它使你的应用程序业务逻辑无需了解 Spring Integration API。通过使用通用网关,你的代码只需与一个简单接口交互。

引入 GatewayProxyFactoryBean

如前所述,最好不依赖 Spring Integration API,包括网关类。因此,Spring Integration 提供了 GatewayProxyFactoryBean,它可以为任何接口生成代理,并在内部调用如下所示的网关方法。通过使用依赖注入,你可以将该接口暴露给你的业务方法。

以下示例展示了一个可用于与 Spring Integration 交互的接口

public interface Cafe {

    void placeOrder(Order order);

}

网关 XML Namespace 支持

还提供了 Namespace 支持。它允许你将接口配置为服务,如下例所示

<int:gateway id="cafeService"
         service-interface="org.cafeteria.Cafe"
         default-request-channel="requestChannel"
         default-reply-timeout="10000"
         default-reply-channel="replyChannel"/>

定义此配置后,cafeService 现在可以注入到其他 bean 中,并且调用 Cafe 接口该代理实例上的方法的代码无需了解 Spring Integration API。请参阅“示例”附录,其中包含一个使用 gateway 元素的示例(在 Cafe demo 中)。

前述配置中的默认设置适用于网关接口上的所有方法。如果未指定回复超时时间,则调用线程将等待回复 30 秒。请参阅网关在没有回复时的行为

可以针对单个方法覆盖默认设置。请参阅使用注解和 XML 配置网关

设置默认回复通道

通常,你无需指定 default-reply-channel,因为网关会自动创建一个临时的、匿名的回复通道,并在其中监听回复。但是,在某些情况下,你可能需要定义 default-reply-channel(或对于适配器网关,如 HTTP、JMS 等,定义 reply-channel)。

为了提供一些背景信息,我们简要讨论一下网关的一些内部工作原理。网关会创建一个临时的点对点回复通道。它是匿名的,并以 replyChannel 名称添加到消息头中。当提供一个显式的 default-reply-channel(远程适配器网关使用 reply-channel)时,你可以指向一个发布-订阅通道,之所以这样命名是因为你可以向其添加多个订阅者。在内部,Spring Integration 在临时 replyChannel 和显式定义的 default-reply-channel 之间创建一个桥接。

假设你希望回复不仅发送到网关,还发送到其他一些消费者。在这种情况下,你需要两件事

  • 一个你可以订阅的命名通道

  • 该通道是一个发布-订阅通道

网关使用的默认策略不能满足这些需求,因为添加到消息头中的回复通道是匿名的点对点通道。这意味着没有其他订阅者可以获取其句柄,即使可以,该通道的点对点行为也只会导致只有一个订阅者收到消息。通过定义 default-reply-channel,你可以指向一个你选择的通道。在这种情况下,它是一个 publish-subscribe-channel。网关会从该通道桥接到存储在消息头中的临时匿名回复通道。

你可能还希望通过拦截器(例如,wiretap)显式提供一个回复通道用于监控或审计。要配置通道拦截器,你需要一个命名通道。

从 5.4 版本开始,当网关方法的返回类型为 void 时,如果未显式提供 replyChannel 头,框架会将其填充为 nullChannel bean 引用。这允许下游流可能产生的任何回复都被丢弃,满足单向网关的契约。

使用注解和 XML 配置网关

考虑以下示例,它在之前的 Cafe 接口示例基础上添加了 @Gateway 注解

public interface Cafe {

    @Gateway(requestChannel="orders")
    void placeOrder(Order order);

}

@Header 注解允许你添加将被解释为消息头的值,如下例所示

public interface FileWriter {

    @Gateway(requestChannel="filesOut")
    void write(byte[] content, @Header(FileHeaders.FILENAME) String filename);

}

如果你倾向于使用 XML 方法来配置网关方法,你可以在网关配置中添加 method 元素,如下例所示

<int:gateway id="myGateway" service-interface="org.foo.bar.TestGateway"
      default-request-channel="inputC">
  <int:default-header name="calledMethod" expression="#gatewayMethod.name"/>
  <int:method name="echo" request-channel="inputA" reply-timeout="2" request-timeout="200"/>
  <int:method name="echoUpperCase" request-channel="inputB"/>
  <int:method name="echoViaDefault"/>
</int:gateway>

你还可以使用 XML 为每个方法调用提供独立的头信息。这在你想设置的头是静态的,并且不希望通过使用 @Header 注解将其嵌入到网关的方法签名中时非常有用。例如,在贷款代理示例中,我们想根据启动的请求类型(单个报价或所有报价)来影响贷款报价的聚合方式。通过评估调用了哪个网关方法来确定请求类型,虽然可能,但会违反关注点分离范式(方法是 Java 工件)。然而,在消息处理架构中,在消息头中表达你的意图(元信息)是很自然的。以下示例展示了如何为两个方法添加不同的消息头

<int:gateway id="loanBrokerGateway"
         service-interface="org.springframework.integration.loanbroker.LoanBrokerGateway">
  <int:method name="getLoanQuote" request-channel="loanBrokerPreProcessingChannel">
    <int:header name="RESPONSE_TYPE" value="BEST"/>
  </int:method>
  <int:method name="getAllLoanQuotes" request-channel="loanBrokerPreProcessingChannel">
    <int:header name="RESPONSE_TYPE" value="ALL"/>
  </int:method>
</int:gateway>

在前面的示例中,根据网关的方法为 'RESPONSE_TYPE' 头设置了不同的值。

例如,如果你在 <int:method/> 中以及在 @Gateway 注解中都指定了 requestChannel,则注解的值优先。
如果在 XML 中指定了一个无参数网关,并且接口方法同时具有 @Payload@Gateway 注解(在 <int:method/> 元素中带有 payloadExpressionpayload-expression),则 @Payload 值将被忽略。

表达式和“全局”头信息

<header/> 元素支持使用 expression 作为 value 的替代方案。SpEL 表达式会被评估以确定头的值。从 5.2 版本开始,评估上下文的 #root 对象是一个 MethodArgsHolder,具有 getMethod()getArgs() 访问器。例如,如果你希望根据简单的方法名进行路由,你可以添加一个带有以下表达式的头:method.name

java.reflect.Method 是不可序列化的。如果以后序列化消息,带有表达式 method 的头将丢失。因此,在这些情况下,你可能希望使用 method.namemethod.toString()toString() 方法提供了该方法的字符串表示形式,包括参数和返回类型。

从 3.0 版本开始,可以定义 <default-header/> 元素,以便将头添加到网关生成的所有消息中,无论调用了哪个方法。为特定方法定义的头优先于默认头。此处为特定方法定义的头会覆盖服务接口中的任何 @Header 注解。但是,默认头不会覆盖服务接口中的任何 @Header 注解。

网关现在也支持 default-payload-expression,它适用于所有方法(除非被覆盖)。

将方法参数映射到消息

使用上一节中的配置技术可以控制方法参数如何映射到消息元素(载荷和头)。在没有使用显式配置时,会使用某些约定来执行映射。在某些情况下,这些约定无法确定哪个参数是载荷,哪个应该映射到头。考虑以下示例

public String send1(Object thing1, Map thing2);

public String send2(Map thing1, Map thing2);

在第一种情况下,约定是将第一个参数映射到载荷(只要它不是 Map),第二个参数的内容成为头。

在第二种情况下(或者当参数 thing1 的实参是 Map 时),框架无法确定哪个参数应该作为载荷。因此,映射会失败。这通常可以通过使用 payload-expression@Payload 注解或 @Headers 注解来解决。

或者(以及当约定失效时),你可以完全负责将方法调用映射到消息。为此,实现一个 MethodArgsMessageMapper 并通过使用 mapper 属性将其提供给 <gateway/>。该 mapper 映射一个 MethodArgsHolder,它是一个简单的类,封装了 java.reflect.Method 实例和一个包含参数的 Object[]。提供自定义 mapper 时,网关上不允许使用 default-payload-expression 属性和 <default-header/> 元素。类似地,任何 <method/> 元素上也不允许使用 payload-expression 属性和 <header/> 元素。

映射方法参数

以下示例展示了如何将方法参数映射到消息,并展示了一些无效配置的示例

public interface MyGateway {

    void payloadAndHeaderMapWithoutAnnotations(String s, Map<String, Object> map);

    void payloadAndHeaderMapWithAnnotations(@Payload String s, @Headers Map<String, Object> map);

    void headerValuesAndPayloadWithAnnotations(@Header("k1") String x, @Payload String s, @Header("k2") String y);

    void mapOnly(Map<String, Object> map); // the payload is the map and no custom headers are added

    void twoMapsAndOneAnnotatedWithPayload(@Payload Map<String, Object> payload, Map<String, Object> headers);

    @Payload("args[0] + args[1] + '!'")
    void payloadAnnotationAtMethodLevel(String a, String b);

    @Payload("@someBean.exclaim(args[0])")
    void payloadAnnotationAtMethodLevelUsingBeanResolver(String s);

    void payloadAnnotationWithExpression(@Payload("toUpperCase()") String s);

    void payloadAnnotationWithExpressionUsingBeanResolver(@Payload("@someBean.sum(#this)") String s); //  (1)

    // invalid
    void twoMapsWithoutAnnotations(Map<String, Object> m1, Map<String, Object> m2);

    // invalid
    void twoPayloads(@Payload String s1, @Payload String s2);

    // invalid
    void payloadAndHeaderAnnotationsOnSameParameter(@Payload @Header("x") String s);

    // invalid
    void payloadAndHeadersAnnotationsOnSameParameter(@Payload @Headers Map<String, Object> map);

}
1 请注意,在此示例中,SpEL 变量 #this 指的是参数,在此处为 s 的值。

相应的 XML 配置略有不同,因为方法参数没有 #this 上下文。然而,表达式可以通过使用 MethodArgsHolder 根对象的 args 属性来引用方法参数(有关详细信息,请参阅表达式和“全局”头信息),如下例所示

<int:gateway id="myGateway" service-interface="org.something.MyGateway">
  <int:method name="send1" payload-expression="args[0] + 'thing2'"/>
  <int:method name="send2" payload-expression="@someBean.sum(args[0])"/>
  <int:method name="send3" payload-expression="method"/>
  <int:method name="send4">
    <int:header name="thing1" expression="args[2].toUpperCase()"/>
  </int:method>
</int:gateway>

@MessagingGateway 注解

从 4.0 版本开始,网关服务接口可以使用 @MessagingGateway 注解进行标记,而无需定义 <gateway /> XML 元素进行配置。以下两个示例比较了配置同一网关的两种方法

<int:gateway id="myGateway" service-interface="org.something.TestGateway"
      default-request-channel="inputC">
  <int:default-header name="calledMethod" expression="#gatewayMethod.name"/>
  <int:method name="echo" request-channel="inputA" reply-timeout="2" request-timeout="200"/>
  <int:method name="echoUpperCase" request-channel="inputB">
    <int:header name="thing1" value="thing2"/>
  </int:method>
  <int:method name="echoViaDefault"/>
</int:gateway>
@MessagingGateway(name = "myGateway", defaultRequestChannel = "inputC",
		  defaultHeaders = @GatewayHeader(name = "calledMethod",
		                           expression="#gatewayMethod.name"))
public interface TestGateway {

   @Gateway(requestChannel = "inputA", replyTimeout = 2, requestTimeout = 200)
   String echo(String payload);

   @Gateway(requestChannel = "inputB", headers = @GatewayHeader(name = "thing1", value="thing2"))
   String echoUpperCase(String payload);

   String echoViaDefault(String payload);

}
与 XML 版本类似,当 Spring Integration 在组件扫描期间发现这些注解时,它会使用其消息处理基础设施创建 proxy 实现。要执行此扫描并在应用程序上下文中注册 BeanDefinition,请将 @IntegrationComponentScan 注解添加到 @Configuration 类。标准的 @ComponentScan 基础设施不处理接口。因此,我们引入了自定义的 @IntegrationComponentScan 逻辑来查找接口上的 @MessagingGateway 注解并为其注册 GatewayProxyFactoryBean 实例。另请参阅注解支持

除了 @MessagingGateway 注解之外,你还可以使用 @Profile 注解标记服务接口,以便在该 Profile 未激活时避免创建 bean。

从 6.0 版本开始,带有 @MessagingGateway 的接口也可以标记 @Primary 注解以实现相应的配置逻辑,这与任何 Spring @Component 定义一样。

从 6.0 版本开始,@MessagingGateway 接口可以在标准的 Spring @Import 配置中使用。这可以作为 @IntegrationComponentScan 或手动 AnnotationGatewayProxyFactoryBean bean 定义的替代方案。

自 6.0 版本以来,@MessagingGateway 使用 @MessageEndpoint 进行了元注解,并且 name() 属性本质上是 @Compnent.value() 的别名。这样,网关代理的 bean 名称生成策略就与扫描和导入组件的标准 Spring 注解配置重新对齐了。可以通过 AnnotationConfigUtils.CONFIGURATION_BEAN_NAME_GENERATOR 全局覆盖默认的 AnnotationBeanNameGenerator,或通过 @IntegrationComponentScan.nameGenerator() 属性覆盖。

如果你没有 XML 配置,则至少在一个 @Configuration 类上需要 @EnableIntegration 注解。有关详细信息,请参阅配置和 @EnableIntegration

调用无参数方法

当调用网关接口上没有任何参数的方法时,默认行为是从 PollableChannel 接收 Message

然而,有时你可能希望触发无参数方法,以便与不需要用户提供的参数的下游其他组件交互,例如触发无参数的 SQL 调用或存储过程。

为了实现发送和接收语义,你必须提供载荷。为了生成载荷,接口上的方法参数并非必需。你可以使用 @Payload 注解,或者在 XML 的 method 元素上使用 payload-expression 属性。以下列表包含了一些载荷可以是哪些内容的示例

  • 字面量字符串

  • #gatewayMethod.name

  • new java.util.Date()

  • @someBean.someMethod() 的返回值

以下示例展示了如何使用 @Payload 注解

public interface Cafe {

    @Payload("new java.util.Date()")
    List<Order> retrieveOpenOrders();

}

你也可以使用 @Gateway 注解。

public interface Cafe {

    @Gateway(payloadExpression = "new java.util.Date()")
    List<Order> retrieveOpenOrders();

}
如果两个注解都存在(并且提供了 payloadExpression),则 @Gateway 优先。

如果一个方法没有参数、没有返回值,但包含一个载荷表达式,它将被视为仅发送操作。

调用 default 方法

网关代理的接口也可以有 default 方法,从 5.3 版本开始,框架会向代理注入一个 DefaultMethodInvokingMethodInterceptor,用于使用 java.lang.invoke.MethodHandle 方法调用 default 方法,而不是通过代理方式。JDK 中的接口,例如 java.util.function.Function,仍然可以用于网关代理,但由于针对 JDK 类的 MethodHandles.Lookup 实例化的内部 Java 安全原因,它们的 default 方法无法调用。这些方法也可以通过在方法上显式使用 @Gateway 注解,或者在 @MessagingGateway 注解或 <gateway> XML 组件上使用 proxyDefaultMethods 来进行代理(会丢失其实现逻辑,同时恢复之前的网关代理行为)。

错误处理

网关调用可能导致错误。默认情况下,下游发生的任何错误都会在网关方法调用时“原样”重新抛出。例如,考虑以下简单流程

gateway -> service-activator

如果服务激活器调用的服务抛出 MyException(例如),框架会将其包装在 MessagingException 中,并将传递给服务激活器的消息附加到 failedMessage 属性中。因此,框架执行的任何日志记录都具有完整的失败上下文。默认情况下,当网关捕获异常时,MyException 会被解包并抛给调用者。你可以在网关方法声明上配置一个 throws 子句来匹配 cause 链中特定的异常类型。例如,如果你想捕获包含下游错误原因所有消息信息的整个 MessagingException,你的网关方法应该类似于以下内容

public interface MyGateway {

    void performProcess() throws MessagingException;

}

由于我们鼓励 POJO 编程,你可能不想让调用者接触消息处理基础设施。

如果你的网关方法没有 throws 子句,网关会遍历 cause 树,查找不是 MessagingExceptionRuntimeException。如果找不到,框架会抛出 MessagingException。如果在前面讨论中的 MyException 的 cause 是 SomeOtherException,并且你的方法 throws SomeOtherException,网关会进一步解包并将其抛给调用者。

当网关声明时没有指定 service-interface,将使用内部框架接口 RequestReplyExchanger

考虑以下示例

public interface RequestReplyExchanger {

	Message<?> exchange(Message<?> request) throws MessagingException;

}

在 5.0 版本之前,这个 exchange 方法没有 throws 子句,因此异常会被解包。如果你使用此接口并想恢复之前的解包行为,请改用自定义的 service-interface 或自行访问 MessagingExceptioncause

然而,你可能希望记录错误而不是传播它,或者你可能希望将异常视为一个有效的回复(通过将其映射到符合调用者理解的某种“错误消息”契约的消息)。为了实现这一点,网关通过包含对 error-channel 属性的支持,提供了专门用于错误的通道支持。在以下示例中,一个 'transformer' 从 Exception 创建一个回复 Message

<int:gateway id="sampleGateway"
    default-request-channel="gatewayChannel"
    service-interface="foo.bar.SimpleGateway"
    error-channel="exceptionTransformationChannel"/>

<int:transformer input-channel="exceptionTransformationChannel"
        ref="exceptionTransformer" method="createErrorResponse"/>

exceptionTransformer 可以是一个简单的 POJO,它知道如何创建预期的错误响应对象。这将成为发送回调用者的载荷。如果需要,你可以在这样的“错误流”中执行更多复杂的操作。它可能涉及路由器(包括 Spring Integration 的 ErrorMessageExceptionTypeRouter)、过滤器等。然而,大多数情况下,一个简单的 'transformer' 应该足够了。

或者,你可能只希望记录异常(或将其异步发送到某个地方)。如果你提供一个单向流,则不会有任何内容发送回调用者。如果你想完全抑制异常,可以提供对全局 nullChannel 的引用(本质上是 /dev/null 方法)。最后,如上所述,如果未定义 error-channel,则异常会照常传播。

当你使用 @MessagingGateway 注解时(请参阅@MessagingGateway 注解),你可以使用 errorChannel 属性。

从 5.0 版本开始,当你使用返回类型为 void 的网关方法(单向流)时,提供的 error-channel 引用会被填充到每条发送消息的标准 errorChannel 头中。此功能允许基于标准 ExecutorChannel 配置(或 QueueChannel)的下游异步流覆盖默认的全局 errorChannel 异常发送行为。之前,你必须使用 @GatewayHeader 注解或 <header> 元素手动指定 errorChannel 头。对于具有异步流的 void 方法,error-channel 属性会被忽略。相反,错误消息会被发送到默认的 errorChannel

通过简单的 POJO 网关暴露消息系统带来了好处,但“隐藏”底层消息系统的现实也付出了代价,因此您应该考虑某些事情。我们希望 Java 方法尽快返回,而不是在调用者等待它返回(无论是 void、返回值还是抛出的 Exception)时无限期地挂起。当常规方法用作消息系统前面的代理时,我们必须考虑底层消息传递潜在的异步性质。这意味着网关发起的消息可能会被过滤器丢弃,永远无法到达负责生成回复的组件。某些服务激活器方法可能会导致异常,从而没有提供回复(因为我们不生成空消息)。换句话说,多种情况可能导致回复消息永远不会到达。这在消息系统中是非常自然的。然而,考虑一下这对网关方法的影响。网关方法的输入参数被封装到消息中并发送到下游。回复消息将被转换为网关方法的返回值。因此,您可能希望确保对于每次网关调用,总会有一个回复消息。否则,如果 reply-timeout 设置为负值,您的网关方法可能永远不会返回并无限期挂起。处理这种情况的一种方法是使用异步网关(本节稍后解释)。另一种处理方法是依靠默认的 reply-timeout,例如 30 秒。这样,网关就不会比 reply-timeout 指定的时间挂起更长时间,并且如果超时到期则返回 'null'。最后,您可能想考虑在服务激活器上设置下游标志,例如 'requires-reply',或者在过滤器上设置 'throw-exceptions-on-rejection'。本章的最后一节将更详细地讨论这些选项。
如果下游流返回一个 ErrorMessage,其 payload(一个 Throwable)将被视为常规的下游错误。如果配置了 error-channel,则会发送到错误流。否则,payload 会抛给网关的调用者。类似地,如果在 error-channel 上的错误流返回一个 ErrorMessage,其 payload 会抛给调用者。这也适用于任何带有 Throwable payload 的消息。这在异步情况下很有用,当您需要将 Exception 直接传播给调用者时。为此,您可以返回一个 Exception(作为某个服务的 reply)或抛出它。通常,即使是异步流,框架也会负责将下游流抛出的异常传播回网关。TCP Client-Server Multiplex 示例演示了将异常返回给调用者的两种技术。它通过使用带有 group-timeoutaggregator(参见 Aggregator and Group Timeout)并在丢弃流上返回 MessagingTimeoutException 来模拟套接字 IO 错误到等待线程。

网关超时

网关有两个超时属性:requestTimeoutreplyTimeout。请求超时仅在通道可能阻塞(例如,已满的有界 QueueChannel)时适用。replyTimeout 的值是网关等待回复或返回 null 的时间长度。它默认为无限。

超时可以设置为网关上所有方法的默认值(defaultRequestTimeoutdefaultReplyTimeout),或在 MessagingGateway 接口注解上设置。单个方法可以覆盖这些默认值(在 <method/> 子元素中)或在 @Gateway 注解上设置。

从 5.0 版本开始,超时可以定义为表达式,如下例所示

@Gateway(payloadExpression = "args[0]", requestChannel = "someChannel",
        requestTimeoutExpression = "args[1]", replyTimeoutExpression = "args[2]")
String lateReply(String payload, long requestTimeout, long replyTimeout);

评估上下文具有一个 BeanResolver(使用 @someBean 引用其他 bean),并且 `#root` 对象的 args 数组属性可用。有关此 root 对象的更多信息,请参见 表达式与“全局”头部。使用 XML 配置时,超时属性可以是 long 值或 SpEL 表达式,如下例所示

<method name="someMethod" request-channel="someRequestChannel"
                      payload-expression="args[0]"
                      request-timeout="1000"
                      reply-timeout="args[1]">
</method>

异步网关

作为一种模式,消息网关提供了一种很好的方式来隐藏消息传递特定的代码,同时仍然暴露消息系统的全部功能。如 前所述GatewayProxyFactoryBean 提供了一种便捷的方式来通过服务接口暴露代理,从而为您提供基于 POJO 的消息系统访问(基于您自己领域中的对象、基本类型/字符串或其他对象)。然而,当通过返回值的简单 POJO 方法暴露网关时,这意味着对于每个请求消息(方法调用时生成),都必须有一个回复消息(方法返回时生成)。由于消息系统本质上是异步的,您可能无法总是保证“每个请求总会有一个回复”的契约。Spring Integration 2.0 引入了对异步网关的支持,它提供了一种便捷的方式来启动流程,当您可能不知道是否期望回复或回复需要多久才能到达时。

为了处理这些类型的场景,Spring Integration 使用 java.util.concurrent.Future 实例来支持异步网关。

从 XML 配置来看,没有任何变化,您仍然以与定义常规网关相同的方式定义异步网关,如下例所示

<int:gateway id="mathService"
     service-interface="org.springframework.integration.sample.gateway.futures.MathServiceGateway"
     default-request-channel="requestChannel"/>

然而,网关接口(服务接口)略有不同,如下所示

public interface MathServiceGateway {

  Future<Integer> multiplyByTwo(int i);

}

如上例所示,网关方法的返回类型是 Future。当 GatewayProxyFactoryBean 看到网关方法的返回类型是 Future 时,它会立即通过使用 AsyncTaskExecutor 切换到异步模式。区别仅此而已。调用此类方法总是立即返回一个 Future 实例。然后您可以按照自己的节奏与 Future 交互以获取结果、取消等等。此外,与使用任何其他 Future 实例一样,调用 get() 可能会暴露超时、执行异常等。下面的示例展示了如何使用异步网关返回的 Future

MathServiceGateway mathService = ac.getBean("mathService", MathServiceGateway.class);
Future<Integer> result = mathService.multiplyByTwo(number);
// do something else here since the reply might take a moment
int finalResult =  result.get(1000, TimeUnit.SECONDS);

有关更详细的示例,请参见 Spring Integration 示例中的 async-gateway 示例。

AsyncTaskExecutor

默认情况下,当为返回类型为 Future 的任何网关方法提交内部 AsyncInvocationTask 实例时,GatewayProxyFactoryBean 使用 org.springframework.core.task.SimpleAsyncTaskExecutor。然而,`` 元素的配置中的 async-executor 属性允许您提供对 Spring 应用程序上下文中可用的任何 java.util.concurrent.Executor 实现的引用。

(默认的)SimpleAsyncTaskExecutor 支持 FutureCompletableFuture 返回类型。参见 CompletableFuture。即使有默认执行器,提供一个外部执行器也通常很有用,这样您就可以在日志中识别其线程(使用 XML 时,线程名称基于执行器的 bean 名称),如下例所示

@Bean
public AsyncTaskExecutor exec() {
    SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = new SimpleAsyncTaskExecutor();
    simpleAsyncTaskExecutor.setThreadNamePrefix("exec-");
    return simpleAsyncTaskExecutor;
}

@MessagingGateway(asyncExecutor = "exec")
public interface ExecGateway {

    @Gateway(requestChannel = "gatewayChannel")
    Future<?> doAsync(String foo);

}

如果您希望返回不同的 Future 实现,您可以提供自定义执行器或完全禁用执行器,并在下游流的回复消息 payload 中返回 Future。要禁用执行器,请在 GatewayProxyFactoryBean 中将其设置为 null(通过使用 setAsyncTaskExecutor(null))。使用 XML 配置网关时,使用 async-executor=""。使用 `@MessagingGateway` 注解配置时,使用类似于以下的示例代码

@MessagingGateway(asyncExecutor = AnnotationConstants.NULL)
public interface NoExecGateway {

    @Gateway(requestChannel = "gatewayChannel")
    Future<?> doAsync(String foo);

}
如果返回类型是特定的具体 Future 实现或配置的执行器不支持的某个其他子接口,则流程将在调用者的线程上运行,并且流程必须在回复消息 payload 中返回所需的类型。

CompletableFuture

从 4.2 版本开始,网关方法现在可以返回 CompletableFuture<?>。返回此类型时有两种操作模式

  • 当提供了异步执行器且返回类型恰好是 CompletableFuture(不是子类)时,框架会在执行器上运行任务,并立即向调用者返回一个 CompletableFuture。使用 CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor) 创建 future。

  • 当异步执行器明确设置为 null 且返回类型是 CompletableFutureCompletableFuture 的子类时,流程会在调用者的线程上调用。在这种情况下,期望下游流返回适当类型的 CompletableFuture

使用场景

在以下场景中,调用者线程会立即返回一个 CompletableFuture<Invoice>,当下游流回复网关(带有 Invoice 对象)时,该 future 会完成。

CompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="something.Service" default-request-channel="orders" />

在以下场景中,当下游流将 CompletableFuture<Invoice> 作为回复网关的 payload 提供时,调用者线程会返回该 CompletableFuture<Invoice>。当发票准备好时,必须由其他某个进程完成该 future。

CompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="foo.Service" default-request-channel="orders"
    async-executor="" />

在以下场景中,当下游流将 CompletableFuture<Invoice> 作为回复网关的 payload 提供时,调用者线程会返回该 CompletableFuture<Invoice>。当发票准备好时,必须由其他某个进程完成该 future。如果启用了 DEBUG 日志记录,则会发出一条日志条目,指示此场景无法使用异步执行器。

MyCompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="foo.Service" default-request-channel="orders" />

CompletableFuture 实例可用于对回复执行额外的操作,如下例所示

CompletableFuture<String> process(String data);

...

CompletableFuture result = process("foo")
    .thenApply(t -> t.toUpperCase());

...

String out = result.get(10, TimeUnit.SECONDS);

Reactor Mono

从 5.0 版本开始,GatewayProxyFactoryBean 允许在网关接口方法中使用 Project Reactor,使用 Mono<T> 返回类型。内部的 AsyncInvocationTask 被包装在 Mono.fromCallable() 中。

Mono 可用于稍后检索结果(类似于 Future<?>),或者您可以通过分发器在使用者的 Consumer 中消费它,当结果返回到网关时。

框架不会立即刷新 Mono。因此,在网关方法返回之前,底层的消息流不会启动(这与 Future<?> Executor 任务不同)。当 Mono 被订阅时,流程开始。或者,Mono(作为一个“可组合对象”)可能是 Reactor 流的一部分,此时 subscribe() 与整个 Flux 相关联。下面的示例展示了如何使用 Project Reactor 创建网关
@MessagingGateway
public interface TestGateway {

    @Gateway(requestChannel = "multiplyChannel")
    Mono<Integer> multiply(Integer value);

}

@ServiceActivator(inputChannel = "multiplyChannel")
public Integer multiply(Integer value) {
    return value * 2;
}

这种网关可以在处理数据 Flux 的某个服务中使用

@Autowired
TestGateway testGateway;

public void hadnleFlux() {
    Flux.just("1", "2", "3", "4", "5")
            .map(Integer::parseInt)
            .flatMap(this.testGateway::multiply)
            .collectList()
            .subscribe(System.out::println);
}

使用 Project Reactor 的另一个示例是一个简单的回调场景,如下例所示

Mono<Invoice> mono = service.process(myOrder);

mono.subscribe(invoice -> handleInvoice(invoice));

调用线程继续执行,当流程完成时会调用 handleInvoice()

另请参见 Kotlin 协程 以获取更多信息。

下游流返回异步类型

如上文 AsyncTaskExecutor 部分所述,如果您希望某个下游组件返回带有异步 payload(FutureMono 等)的消息,您必须明确将异步执行器设置为 null(使用 XML 配置时设置为 "")。然后流程会在调用线程上调用,并且可以稍后检索结果。

异步 void 返回类型

消息网关方法可以这样声明

@MessagingGateway
public interface MyGateway {

    @Gateway(requestChannel = "sendAsyncChannel")
    @Async
    void sendAsync(String payload);

}

但下游异常不会传播回调用者。为了确保下游流程调用和异常传播到调用者的异步行为,从 6.0 版本开始,框架支持 `Future<Void>` 和 `Mono<Void>` 返回类型。用例类似于之前描述的简单 `void` 返回类型的“发送即忘”行为,但不同之处在于流程执行是异步发生的,并且返回的 `Future`(或 `Mono`)会根据 `send` 操作结果完成,结果为 `null` 或异常。

如果 Future<Void> 是下游流程的确切回复,则网关的 asyncExecutor 选项必须设置为 null(对于 @MessagingGateway 配置为 AnnotationConstants.NULL),并且 send 部分在生产者线程上执行。回复取决于下游流程配置。这样,正确产生 Future<Void> 回复的任务就取决于目标应用程序。Mono 的用例已经超出了框架的线程控制范围,因此将 asyncExecutor 设置为 null 没有意义。作为请求-回复网关操作结果的 Mono<Void> 必须配置为网关方法的 Mono<?> 返回类型。

没有回复到达时的网关行为

前所述,网关提供了一种便捷的方式,通过 POJO 方法调用与消息系统交互。然而,通常期望总是返回(即使带有 Exception)的典型方法调用,可能并非总是与消息交换一一对应(例如,回复消息可能不会到达——这相当于方法没有返回)。

本节其余部分涵盖了各种场景以及如何使网关的行为更可预测。可以配置某些属性来使同步网关的行为更可预测,但其中一些可能并不总是按您预期的方式工作。其中之一是 reply-timeout(方法级别)或 default-reply-timeout(网关级别)。我们检查 reply-timeout 属性,以了解它在各种场景中如何影响和不能影响同步网关的行为。我们检查单线程场景(所有下游组件通过直接通道连接)和多线程场景(例如,下游某个地方可能有一个可轮询或执行器通道,打破了单线程边界)。

下游长时间运行的进程

同步网关,单线程

如果下游组件仍在运行(可能是因为无限循环或慢速服务),设置 reply-timeout 无效,网关方法调用直到下游服务退出(通过返回或抛出异常)才会返回。

同步网关,多线程

如果下游组件在多线程消息流中仍在运行(可能是因为无限循环或慢速服务),设置 reply-timeout 会产生效果,允许网关方法调用在达到超时后返回,因为 GatewayProxyFactoryBean 会在回复通道上轮询,等待消息直到超时过期。然而,如果在实际回复生成之前达到了超时,可能导致网关方法返回 'null'。您应该理解,回复消息(如果生成)是在网关方法调用可能已经返回之后发送到回复通道的,因此您必须意识到这一点并在设计流程时考虑在内。

另请参阅 errorOnTimeout 属性,以便在发生超时时抛出 MessageTimeoutException 而不是返回 null

下游组件返回 'null'

同步网关 — 单线程

如果下游组件返回 'null' 且 reply-timeout 配置为负值,网关方法调用将无限期挂起,除非在可能返回 'null' 的下游组件(例如,服务激活器)上设置了 requires-reply 属性。在这种情况下,会抛出异常并传播到网关。

同步网关 — 多线程

行为与前一种情况相同。

下游组件返回签名是 'void',而网关方法签名是非 void

同步网关 — 单线程

如果下游组件返回 'void' 且 reply-timeout 配置为负值,网关方法调用将无限期挂起。

同步网关 — 多线程

行为与前一种情况相同。

下游组件导致运行时异常

同步网关 — 单线程

如果下游组件抛出运行时异常,该异常将通过错误消息传播回网关并重新抛出。

同步网关 — 多线程

行为与前一种情况相同。

您应该理解,默认情况下,reply-timeout 是无界的。因此,如果您将 reply-timeout 设置为负值,您的网关方法调用可能会无限期挂起。因此,为确保您分析您的流程,并且即使存在这些场景之一发生的微小可能性,您也应该将 reply-timeout 属性设置为一个“安全的”值。默认是 30 秒。更好的是,您可以将下游组件的 requires-reply 属性设置为 'true' 以确保及时响应,当该下游组件内部返回 null 时,会立即抛出异常产生及时响应。然而,您还应该意识到,有些场景(参见 第一个)中 reply-timeout 没有帮助。这意味着分析您的消息流并决定何时使用同步网关而不是异步网关也很重要。如 前所述,后一种情况就是定义返回 Future 实例的网关方法。这样您就可以保证收到返回值,并且对调用的结果有更细粒度的控制。此外,在使用路由时,您应该记住,如果路由无法解析特定通道,将 resolution-required 属性设置为 'true' 会导致路由抛出异常。同样,在使用过滤器时,您可以设置 throw-exception-on-rejection 属性。在这两种情况下,生成的流程表现得就像包含了一个带有 'requires-reply' 属性的服务激活器。换句话说,这有助于确保网关方法调用的及时响应。
您应该理解,计时器在线程返回到网关时启动——即当流程完成或消息被传递到另一个线程时。此时,调用线程开始等待回复。如果流程完全同步,则回复会立即可用。对于异步流程,线程最多等待到此时间。

从 6.2 版本开始,内部 MethodInvocationGatewayMessagingGatewaySupport 的扩展)的 errorOnTimeout 属性在 @MessagingGatewayGatewayEndpointSpec 上暴露。此选项的含义与 端点摘要 章末尾解释的任何入站网关完全相同。换句话说,将此选项设置为 true 将导致在接收超时耗尽时,发送-接收网关操作抛出 MessageTimeoutException,而不是返回 null

有关通过 IntegrationFlow 定义网关的选项,请参见 Java DSL 章中的 IntegrationFlow 作为网关