消息网关

网关隐藏了 Spring Integration 提供的消息传递 API。它允许您的应用程序业务逻辑不知道 Spring Integration API。通过使用通用网关,您的代码仅与一个简单的接口交互。

进入 GatewayProxyFactoryBean

如前所述,最好不要依赖 Spring Integration API,包括网关类。为此,Spring Integration 提供了 GatewayProxyFactoryBean,它为任何接口生成代理,并在内部调用下面显示的网关方法。通过使用依赖注入,您可以将接口公开给您的业务方法。

以下示例显示了一个可用于与 Spring Integration 交互的接口

public interface Cafe {

    void placeOrder(Order order);

}

网关 XML 命名空间支持

还提供了命名空间支持。它允许您将接口配置为服务,如下面的示例所示

<int:gateway id="cafeService"
         service-interface="org.cafeteria.Cafe"
         default-request-channel="requestChannel"
         default-reply-timeout="10000"
         default-reply-channel="replyChannel"/>

定义了这种配置后,cafeService 现在可以注入到其他 Bean 中,而调用该代理 Cafe 接口方法的代码并不知道 Spring Integration API。有关使用 gateway 元素(在 Cafe 演示中)的示例,请参见 “示例” 附录。

前面的配置中的默认值将应用于网关接口上的所有方法。如果未指定回复超时,则调用线程将等待 30 秒的回复。请参见 没有响应时的网关行为

可以为各个方法覆盖默认值。请参见 使用注解和 XML 进行网关配置

设置默认回复通道

通常,您不需要指定 default-reply-channel,因为网关会自动创建一个临时匿名回复通道,并在其中监听回复。但是,在某些情况下,您可能需要定义 default-reply-channel(或使用适配器网关(如 HTTP、JMS 等)的 reply-channel)。

为了更好地理解,我们简要讨论一下网关的一些内部工作原理。网关创建一个临时点对点回复通道。它是匿名的,并使用名称 replyChannel 添加到消息头中。当提供显式的 default-reply-channel(使用远程适配器网关的 reply-channel)时,您可以指向发布-订阅通道,之所以这样命名是因为您可以向其中添加多个订阅者。在内部,Spring Integration 在临时 replyChannel 和显式定义的 default-reply-channel 之间创建了一个桥梁。

假设您希望回复不仅发送到网关,还发送到其他消费者。在这种情况下,您需要两件事

  • 一个可以订阅的命名通道

  • 该通道是一个发布-订阅通道

网关使用的默认策略无法满足这些需求,因为添加到头部的回复通道是匿名的,并且是点对点的。这意味着没有其他订阅者可以获取其句柄,即使可以获取,该通道也具有点对点行为,因此只有一个订阅者会收到消息。通过定义 default-reply-channel,您可以指向您选择的通道。在这种情况下,它是一个 publish-subscribe-channel。网关从它创建一个桥梁到存储在头部的临时匿名回复通道。

您可能还想通过拦截器(例如,窃听)显式提供一个回复通道,用于监控或审核。要配置通道拦截器,您需要一个命名通道。

从 5.4 版本开始,当网关方法返回类型为 void 时,如果未显式提供此类头,则框架会将 replyChannel 头填充为 nullChannel bean 引用。这允许来自下游流的任何可能的回复被丢弃,从而满足单向网关契约。

使用注解和 XML 进行网关配置

考虑以下示例,它在之前的Cafe接口示例的基础上添加了@Gateway注解,进行了扩展。

public interface Cafe {

    @Gateway(requestChannel="orders")
    void placeOrder(Order order);

}

@Header注解允许您添加被解释为消息头的值,如下例所示。

public interface FileWriter {

    @Gateway(requestChannel="filesOut")
    void write(byte[] content, @Header(FileHeaders.FILENAME) String filename);

}

如果您更喜欢使用 XML 方法来配置网关方法,您可以将method元素添加到网关配置中,如下例所示。

<int:gateway id="myGateway" service-interface="org.foo.bar.TestGateway"
      default-request-channel="inputC">
  <int:default-header name="calledMethod" expression="#gatewayMethod.name"/>
  <int:method name="echo" request-channel="inputA" reply-timeout="2" request-timeout="200"/>
  <int:method name="echoUpperCase" request-channel="inputB"/>
  <int:method name="echoViaDefault"/>
</int:gateway>

您也可以使用 XML 为每次方法调用提供单独的标头。如果要设置的标头本质上是静态的,并且您不想使用@Header注解将它们嵌入到网关的方法签名中,这将很有用。例如,在贷款经纪人示例中,我们希望根据启动的请求类型(单一报价或所有报价)来影响贷款报价的聚合方式。通过评估调用了哪个网关方法来确定请求类型,虽然可行,但会违反关注点分离原则(该方法是 Java 工件)。但是,在消息传递架构中,在消息头中表达您的意图(元信息)是自然的。以下示例显示了如何为两种方法中的每一种添加不同的消息头。

<int:gateway id="loanBrokerGateway"
         service-interface="org.springframework.integration.loanbroker.LoanBrokerGateway">
  <int:method name="getLoanQuote" request-channel="loanBrokerPreProcessingChannel">
    <int:header name="RESPONSE_TYPE" value="BEST"/>
  </int:method>
  <int:method name="getAllLoanQuotes" request-channel="loanBrokerPreProcessingChannel">
    <int:header name="RESPONSE_TYPE" value="ALL"/>
  </int:method>
</int:gateway>

在前面的示例中,根据网关的方法,为“RESPONSE_TYPE”标头设置了不同的值。

如果您在<int:method/>中以及在@Gateway注解中指定了requestChannel,则注解值将获胜。
如果在 XML 中指定了无参数网关,并且接口方法同时具有@Payload@Gateway注解(在<int:method/>元素中具有payloadExpressionpayload-expression),则@Payload值将被忽略。

表达式和“全局”标头

<header/>元素支持expression作为value的替代方案。SpEL 表达式将被评估以确定标头的值。从版本 5.2 开始,评估上下文的#root对象是MethodArgsHolder,它具有getMethod()getArgs()访问器。例如,如果您希望根据简单的方法名称进行路由,您可能会添加一个带有以下表达式的标头:method.name

java.reflect.Method不可序列化。如果您稍后序列化消息,则带有method表达式的标头将丢失。因此,在这些情况下,您可能希望使用method.namemethod.toString()toString()方法提供了方法的String表示形式,包括参数和返回类型。

从版本 3.0 开始,可以定义<default-header/>元素,以便将标头添加到网关生成的所有消息中,而不管调用了哪个方法。为方法定义的特定标头优先于默认标头。此处为方法定义的特定标头会覆盖服务接口中的任何@Header注解。但是,默认标头不会覆盖服务接口中的任何@Header注解。

网关现在还支持default-payload-expression,它适用于所有方法(除非被覆盖)。

将方法参数映射到消息

使用上一节中的配置技术可以控制如何将方法参数映射到消息元素(有效负载和标头)。当没有使用显式配置时,会使用某些约定来执行映射。在某些情况下,这些约定无法确定哪个参数是有效负载,哪个参数应该映射到标头。请考虑以下示例

public String send1(Object thing1, Map thing2);

public String send2(Map thing1, Map thing2);

在第一种情况下,约定是将第一个参数映射到有效负载(只要它不是 Map),而第二个参数的内容将成为标头。

在第二种情况下(或者当参数 thing1Map 时),框架无法确定哪个参数应该是有效负载。因此,映射失败。这通常可以通过使用 payload-expression@Payload 注解或 @Headers 注解来解决。

或者(以及在约定失效时),您可以承担将方法调用映射到消息的全部责任。为此,请实现一个 MethodArgsMessageMapper 并使用 mapper 属性将其提供给 <gateway/>。映射器映射一个 MethodArgsHolder,它是一个简单的类,包装了 java.reflect.Method 实例和一个包含参数的 Object[]。当提供自定义映射器时,default-payload-expression 属性和 <default-header/> 元素在网关上是不允许的。类似地,payload-expression 属性和 <header/> 元素在任何 <method/> 元素上是不允许的。

映射方法参数

以下示例展示了如何将方法参数映射到消息,并展示了一些无效配置的示例

public interface MyGateway {

    void payloadAndHeaderMapWithoutAnnotations(String s, Map<String, Object> map);

    void payloadAndHeaderMapWithAnnotations(@Payload String s, @Headers Map<String, Object> map);

    void headerValuesAndPayloadWithAnnotations(@Header("k1") String x, @Payload String s, @Header("k2") String y);

    void mapOnly(Map<String, Object> map); // the payload is the map and no custom headers are added

    void twoMapsAndOneAnnotatedWithPayload(@Payload Map<String, Object> payload, Map<String, Object> headers);

    @Payload("args[0] + args[1] + '!'")
    void payloadAnnotationAtMethodLevel(String a, String b);

    @Payload("@someBean.exclaim(args[0])")
    void payloadAnnotationAtMethodLevelUsingBeanResolver(String s);

    void payloadAnnotationWithExpression(@Payload("toUpperCase()") String s);

    void payloadAnnotationWithExpressionUsingBeanResolver(@Payload("@someBean.sum(#this)") String s); //  (1)

    // invalid
    void twoMapsWithoutAnnotations(Map<String, Object> m1, Map<String, Object> m2);

    // invalid
    void twoPayloads(@Payload String s1, @Payload String s2);

    // invalid
    void payloadAndHeaderAnnotationsOnSameParameter(@Payload @Header("x") String s);

    // invalid
    void payloadAndHeadersAnnotationsOnSameParameter(@Payload @Headers Map<String, Object> map);

}
1 请注意,在本例中,SpEL 变量 #this 指的是参数——在本例中,是 s 的值。

XML 等效项看起来略有不同,因为方法参数没有 `#this` 上下文。但是,表达式可以通过使用 `MethodArgsHolder` 根对象的 `args` 属性来引用方法参数(有关更多信息,请参见 表达式和“全局”标头),如下例所示

<int:gateway id="myGateway" service-interface="org.something.MyGateway">
  <int:method name="send1" payload-expression="args[0] + 'thing2'"/>
  <int:method name="send2" payload-expression="@someBean.sum(args[0])"/>
  <int:method name="send3" payload-expression="method"/>
  <int:method name="send4">
    <int:header name="thing1" expression="args[2].toUpperCase()"/>
  </int:method>
</int:gateway>

@MessagingGateway 注解

从 4.0 版本开始,网关服务接口可以使用 `@MessagingGateway` 注解标记,而无需定义 `<gateway />` xml 元素进行配置。以下示例对使用两种方法配置相同网关进行了比较

<int:gateway id="myGateway" service-interface="org.something.TestGateway"
      default-request-channel="inputC">
  <int:default-header name="calledMethod" expression="#gatewayMethod.name"/>
  <int:method name="echo" request-channel="inputA" reply-timeout="2" request-timeout="200"/>
  <int:method name="echoUpperCase" request-channel="inputB">
    <int:header name="thing1" value="thing2"/>
  </int:method>
  <int:method name="echoViaDefault"/>
</int:gateway>
@MessagingGateway(name = "myGateway", defaultRequestChannel = "inputC",
		  defaultHeaders = @GatewayHeader(name = "calledMethod",
		                           expression="#gatewayMethod.name"))
public interface TestGateway {

   @Gateway(requestChannel = "inputA", replyTimeout = 2, requestTimeout = 200)
   String echo(String payload);

   @Gateway(requestChannel = "inputB", headers = @GatewayHeader(name = "thing1", value="thing2"))
   String echoUpperCase(String payload);

   String echoViaDefault(String payload);

}
与 XML 版本类似,当 Spring Integration 在组件扫描期间发现这些注解时,它会使用其消息传递基础设施创建 `proxy` 实现。要执行此扫描并在应用程序上下文中注册 `BeanDefinition`,请将 `@IntegrationComponentScan` 注解添加到 `@Configuration` 类中。标准 `@ComponentScan` 基础设施不处理接口。因此,我们引入了自定义 `@IntegrationComponentScan` 逻辑来查找接口上的 `@MessagingGateway` 注解,并为它们注册 `GatewayProxyFactoryBean` 实例。另请参见 注解支持.

除了 `@MessagingGateway` 注解之外,您还可以使用 `@Profile` 注解标记服务接口,以避免创建 bean,如果此类配置文件未处于活动状态。

从 6.0 版本开始,带有 `@MessagingGateway` 的接口也可以使用 `@Primary` 注解标记,以实现相应的配置逻辑,就像使用任何 Spring `@Component` 定义一样。

从 6.0 版本开始,`@MessagingGateway` 接口可以在标准 Spring `@Import` 配置中使用。这可以用作 `@IntegrationComponentScan` 或手动 `AnnotationGatewayProxyFactoryBean` bean 定义的替代方法。

从 `6.0` 版本开始,`@MessagingGateway` 使用 `@MessageEndpoint` 进行元注解,并且 `name()` 属性本质上是 `@Compnent.value()` 的别名。这样,网关代理的 bean 名称生成策略就与扫描和导入组件的标准 Spring 注解配置保持一致。默认的 `AnnotationBeanNameGenerator` 可以通过 `AnnotationConfigUtils.CONFIGURATION_BEAN_NAME_GENERATOR` 在全局范围内覆盖,也可以作为 `@IntegrationComponentScan.nameGenerator()` 属性覆盖。

如果您没有 XML 配置,则至少需要在 `@Configuration` 类上使用 `@EnableIntegration` 注解。有关更多信息,请参见 配置和 `@EnableIntegration`.

调用无参数方法

当在没有参数的 Gateway 接口上调用方法时,默认行为是从 PollableChannel 中接收一个 Message

但是,有时您可能希望触发无参数方法,以便您可以与下游不需要用户提供参数的其他组件进行交互,例如触发无参数的 SQL 调用或存储过程。

为了实现发送和接收语义,您必须提供一个有效载荷。为了生成有效载荷,接口上的方法参数不是必需的。您可以使用 @Payload 注解或 XML 中 method 元素上的 payload-expression 属性。以下列表包含有效载荷可能是什么的一些示例

  • 一个文字字符串

  • #gatewayMethod.name

  • new java.util.Date()

  • @someBean.someMethod() 的返回值

以下示例展示了如何使用 @Payload 注解

public interface Cafe {

    @Payload("new java.util.Date()")
    List<Order> retrieveOpenOrders();

}

您也可以使用 @Gateway 注解。

public interface Cafe {

    @Gateway(payloadExpression = "new java.util.Date()")
    List<Order> retrieveOpenOrders();

}
如果两个注解都存在(并且提供了 payloadExpression),@Gateway 优先。

如果一个方法没有参数和返回值,但包含一个有效载荷表达式,则它被视为一个只发送操作。

调用 default 方法

用于网关代理的接口也可以包含 default 方法,从 5.3 版本开始,框架将 DefaultMethodInvokingMethodInterceptor 注入代理,以使用 java.lang.invoke.MethodHandle 方法而不是代理来调用 default 方法。来自 JDK 的接口,例如 java.util.function.Function,仍然可以用于网关代理,但由于针对 JDK 类实例化 MethodHandles.Lookup 的内部 Java 安全原因,它们的 default 方法无法被调用。这些方法也可以通过在方法上显式使用 @Gateway 注解,或在 @MessagingGateway 注解或 <gateway> XML 组件上使用 proxyDefaultMethods 来代理(丢失它们的实现逻辑,同时恢复之前的网关代理行为)。

错误处理

网关调用可能会导致错误。默认情况下,任何在下游发生的错误都会在网关方法调用时“原样”重新抛出。例如,考虑以下简单流程

gateway -> service-activator

如果服务激活器调用的服务抛出MyException(例如),框架会将其包装在MessagingException中,并将传递给服务激活器的消息附加到failedMessage属性中。因此,框架执行的任何日志记录都具有故障的完整上下文。默认情况下,当网关捕获异常时,MyException会被解包并抛给调用者。您可以在网关方法声明上配置一个throws子句来匹配原因链中的特定异常类型。例如,如果您想捕获包含所有下游错误原因消息信息的MessagingException,则应该有一个类似于以下内容的网关方法

public interface MyGateway {

    void performProcess() throws MessagingException;

}

由于我们鼓励 POJO 编程,您可能不希望将调用者暴露给消息传递基础设施。

如果您的网关方法没有throws子句,网关会遍历原因树,查找不是MessagingExceptionRuntimeException。如果找不到,框架会抛出MessagingException。如果前面的讨论中的MyException的原因是SomeOtherException,并且您的方法throws SomeOtherException,网关会进一步解包并将其抛给调用者。

当网关声明没有service-interface时,将使用内部框架接口RequestReplyExchanger

考虑以下示例

public interface RequestReplyExchanger {

	Message<?> exchange(Message<?> request) throws MessagingException;

}

在 5.0 版本之前,此exchange方法没有throws子句,因此异常被解包。如果您使用此接口并希望恢复之前的解包行为,请使用自定义service-interface或自己访问MessagingExceptioncause

但是,您可能希望记录错误而不是传播错误,或者您可能希望将异常视为有效回复(通过将其映射到符合调用者理解的某些“错误消息”契约的消息)。为了实现这一点,网关通过支持error-channel属性,提供了用于错误的专用消息通道的支持。在以下示例中,'transformer' 从Exception创建回复Message

<int:gateway id="sampleGateway"
    default-request-channel="gatewayChannel"
    service-interface="foo.bar.SimpleGateway"
    error-channel="exceptionTransformationChannel"/>

<int:transformer input-channel="exceptionTransformationChannel"
        ref="exceptionTransformer" method="createErrorResponse"/>

exceptionTransformer可以是一个简单的POJO,它知道如何创建预期的错误响应对象。这成为发送回调用者的有效负载。如果需要,您可以在这样的“错误流”中执行更多复杂的操作。它可能涉及路由器(包括Spring Integration的ErrorMessageExceptionTypeRouter)、过滤器等等。然而,大多数情况下,一个简单的'transformer'就足够了。

或者,您可能只想记录异常(或异步将其发送到某个地方)。如果您提供单向流,则不会将任何内容发送回调用者。如果您想完全抑制异常,您可以提供对全局nullChannel的引用(本质上是/dev/null方法)。最后,如上所述,如果未定义error-channel,则异常将像往常一样传播。

当您使用@MessagingGateway注释(参见@MessagingGateway` 注释)时,您可以使用`errorChannel`属性。

从版本 5.0 开始,当您使用具有void 返回类型的网关方法(单向流)时,error-channel 引用(如果提供)将填充到每个发送消息的标准errorChannel 标头中。此功能允许基于标准ExecutorChannel 配置(或QueueChannel)的下游异步流覆盖默认全局errorChannel 异常发送行为。以前,您必须使用@GatewayHeader 注释或<header> 元素手动指定errorChannel 标头。error-channel 属性对于具有异步流的void 方法被忽略。相反,错误消息将发送到默认的errorChannel

通过简单的 POJI 网关暴露消息系统可以带来好处,但“隐藏”底层消息系统的真实情况也会付出代价,因此您需要考虑一些事项。我们希望我们的 Java 方法尽快返回,而不是在调用者等待它返回时无限期地挂起(无论是 void、返回值还是抛出的异常)。当常规方法用作消息系统的代理时,我们必须考虑底层消息的潜在异步性。这意味着,由网关发起的消息有可能被过滤器丢弃,而永远不会到达负责生成回复的组件。一些服务激活器方法可能会导致异常,从而无法提供回复(因为我们不生成空消息)。换句话说,多种情况会导致回复消息永远不会到来。这在消息系统中是完全正常的。但是,请考虑对网关方法的影响。网关的方法输入参数被合并到一条消息中并发送到下游。回复消息将被转换为网关方法的返回值。因此,您可能希望确保对于每个网关调用,始终存在一条回复消息。否则,如果 `reply-timeout` 设置为负值,您的网关方法可能永远不会返回并无限期地挂起。处理这种情况的一种方法是使用异步网关(在本节后面解释)。另一种处理方法是依赖于默认的 `reply-timeout`,例如 `30` 秒。这样,网关不会挂起超过 `reply-timeout` 指定的时间,并在超时时返回 'null'。最后,您可能希望考虑在服务激活器上设置下游标志,例如 'requires-reply',或在过滤器上设置 'throw-exceptions-on-rejection'。这些选项将在本章的最后一部分详细讨论。
如果下游流程返回一个 `ErrorMessage`,它的 `payload`(一个 `Throwable`)将被视为常规的下游错误。如果配置了 `error-channel`,它将被发送到错误流程。否则,负载将被抛出到网关的调用者。类似地,如果 `error-channel` 上的错误流程返回一个 `ErrorMessage`,它的负载将被抛出到调用者。对于任何带有 `Throwable` 负载的消息,情况也是如此。这在异步情况下很有用,当您需要将 `Exception` 直接传播到调用者时。为此,您可以返回一个 `Exception`(作为某些服务的 `reply`)或抛出它。通常,即使在异步流程中,框架也会负责将下游流程抛出的异常传播回网关。 TCP 客户端-服务器多路复用 示例演示了将异常返回给调用者的两种技术。它使用带有 `group-timeout` 的 `aggregator`(参见 聚合器和组超时)和丢弃流程上的 `MessagingTimeoutException` 回复来模拟对等待线程的套接字 IO 错误。

网关超时

网关有两个超时属性:requestTimeoutreplyTimeout。请求超时仅在通道可以阻塞时适用(例如,已满的 QueueChannel)。replyTimeout 值是网关等待回复或返回 null 的时间。默认值为无穷大。

超时可以设置为网关上所有方法的默认值(defaultRequestTimeoutdefaultReplyTimeout),也可以在 MessagingGateway 接口注解中设置。各个方法可以覆盖这些默认值(在 <method/> 子元素中)或在 @Gateway 注解中设置。

从 5.0 版本开始,超时可以定义为表达式,如下例所示

@Gateway(payloadExpression = "args[0]", requestChannel = "someChannel",
        requestTimeoutExpression = "args[1]", replyTimeoutExpression = "args[2]")
String lateReply(String payload, long requestTimeout, long replyTimeout);

评估上下文包含一个 BeanResolver(使用 @someBean 引用其他 bean),并且 #root 对象的 args 数组属性可用。有关此根对象的更多信息,请参阅 表达式和“全局”标头。在使用 XML 配置时,超时属性可以是长整型值或 SpEL 表达式,如下例所示

<method name="someMethod" request-channel="someRequestChannel"
                      payload-expression="args[0]"
                      request-timeout="1000"
                      reply-timeout="args[1]">
</method>

异步网关

作为一种模式,消息网关提供了一种很好的方法来隐藏消息特定代码,同时仍然暴露消息系统的全部功能。如 前面所述GatewayProxyFactoryBean 提供了一种便捷的方式来公开服务接口上的代理,从而使您能够以基于 POJO 的方式访问消息系统(基于您自己域中的对象、基本类型/字符串或其他对象)。但是,当网关通过返回值的简单 POJO 方法公开时,这意味着对于每个请求消息(在调用方法时生成),必须有一个回复消息(在方法返回时生成)。由于消息系统本质上是异步的,因此您可能无法始终保证“对于每个请求,都将始终有一个回复”的契约。Spring Integration 2.0 引入了对异步网关的支持,这提供了一种便捷的方式来启动流,当您可能不知道是否需要回复或回复需要多长时间时。

为了处理这些类型的场景,Spring Integration 使用 java.util.concurrent.Future 实例来支持异步网关。

从 XML 配置来看,没有任何变化,您仍然以与定义常规网关相同的方式定义异步网关,如下例所示

<int:gateway id="mathService"
     service-interface="org.springframework.integration.sample.gateway.futures.MathServiceGateway"
     default-request-channel="requestChannel"/>

但是,网关接口(服务接口)略有不同,如下所示

public interface MathServiceGateway {

  Future<Integer> multiplyByTwo(int i);

}

如前面的示例所示,网关方法的返回类型为Future。当GatewayProxyFactoryBean看到网关方法的返回类型为Future时,它会立即使用AsyncTaskExecutor切换到异步模式。这就是差异的全部。对这种方法的调用总是立即返回一个Future实例。然后,您可以按照自己的节奏与Future交互以获取结果、取消等等。此外,与任何其他使用Future实例一样,调用get()可能会显示超时、执行异常等等。以下示例展示了如何使用从异步网关返回的Future

MathServiceGateway mathService = ac.getBean("mathService", MathServiceGateway.class);
Future<Integer> result = mathService.multiplyByTwo(number);
// do something else here since the reply might take a moment
int finalResult =  result.get(1000, TimeUnit.SECONDS);

有关更详细的示例,请参阅 Spring Integration 示例中的async-gateway示例。

AsyncTaskExecutor

默认情况下,GatewayProxyFactoryBean在提交任何返回类型为Future的网关方法的内部AsyncInvocationTask实例时,使用org.springframework.core.task.SimpleAsyncTaskExecutor。但是,<gateway/>元素配置中的async-executor属性允许您提供对 Spring 应用程序上下文内可用的任何java.util.concurrent.Executor实现的引用。

(默认)SimpleAsyncTaskExecutor支持FutureCompletableFuture返回类型。请参阅CompletableFuture。即使存在默认执行器,提供外部执行器通常也很有用,这样您就可以在日志中识别其线程(使用 XML 时,线程名称基于执行器的 bean 名称),如下例所示

@Bean
public AsyncTaskExecutor exec() {
    SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = new SimpleAsyncTaskExecutor();
    simpleAsyncTaskExecutor.setThreadNamePrefix("exec-");
    return simpleAsyncTaskExecutor;
}

@MessagingGateway(asyncExecutor = "exec")
public interface ExecGateway {

    @Gateway(requestChannel = "gatewayChannel")
    Future<?> doAsync(String foo);

}

如果您希望返回不同的Future实现,您可以提供自定义执行器或完全禁用执行器,并在下游流的回复消息有效负载中返回Future。要禁用执行器,请在GatewayProxyFactoryBean中将其设置为null(使用setAsyncTaskExecutor(null))。使用 XML 配置网关时,使用async-executor=""。使用@MessagingGateway注释配置时,使用类似于以下代码的代码

@MessagingGateway(asyncExecutor = AnnotationConstants.NULL)
public interface NoExecGateway {

    @Gateway(requestChannel = "gatewayChannel")
    Future<?> doAsync(String foo);

}
如果返回类型是特定的具体Future实现或配置的执行器不支持的其他子接口,则流将在调用者的线程上运行,并且流必须在回复消息有效负载中返回所需的类型。

CompletableFuture

从 4.2 版本开始,网关方法现在可以返回 CompletableFuture<?>。返回此类型时,有两种操作模式。

  • 当提供异步执行器且返回类型恰好为 CompletableFuture(而不是子类)时,框架将在执行器上运行任务并立即将 CompletableFuture 返回给调用者。CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor) 用于创建 future。

  • 当异步执行器显式设置为 null 且返回类型为 CompletableFuture 或返回类型为 CompletableFuture 的子类时,将在调用者的线程上调用流。在这种情况下,下游流应返回适当类型的 CompletableFuture

从 Spring Framework 6.0 开始,org.springframework.util.concurrent.ListenableFuture 已被弃用。现在建议迁移到提供类似处理功能的 CompletableFuture

使用场景

在以下场景中,调用者线程立即返回 CompletableFuture<Invoice>,该 future 在下游流回复网关(使用 Invoice 对象)时完成。

CompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="something.Service" default-request-channel="orders" />

在以下场景中,当下游流将 CompletableFuture<Invoice> 作为对网关的回复的有效负载提供时,调用者线程返回 CompletableFuture<Invoice>。当发票准备就绪时,其他一些进程必须完成 future。

CompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="foo.Service" default-request-channel="orders"
    async-executor="" />

在以下场景中,当下游流将 CompletableFuture<Invoice> 作为对网关的回复的有效负载提供时,调用者线程返回 CompletableFuture<Invoice>。当发票准备就绪时,其他一些进程必须完成 future。如果启用了 DEBUG 日志记录,则会发出日志条目,指示异步执行器不能用于此场景。

MyCompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="foo.Service" default-request-channel="orders" />

CompletableFuture 实例可用于对回复执行其他操作,如下面的示例所示

CompletableFuture<String> process(String data);

...

CompletableFuture result = process("foo")
    .thenApply(t -> t.toUpperCase());

...

String out = result.get(10, TimeUnit.SECONDS);

Reactor Mono

从 5.0 版本开始,GatewayProxyFactoryBean 允许使用 Project Reactor 与网关接口方法,使用 Mono<T> 返回类型。内部 AsyncInvocationTask 被包装在 Mono.fromCallable() 中。

Mono 可用于稍后检索结果(类似于 Future<?>),或者您可以通过在结果返回网关时调用您的 Consumer 来使用调度程序从其消费。

Mono 不会被框架立即刷新。因此,在网关方法返回之前(与 Future<?> Executor 任务一样),底层消息流不会启动。当 Mono 被订阅时,流将启动。或者,Mono(作为“可组合的”)可能是 Reactor 流的一部分,当 subscribe() 与整个 Flux 相关时。以下示例展示了如何使用 Project Reactor 创建网关
@MessagingGateway
public interface TestGateway {

    @Gateway(requestChannel = "multiplyChannel")
    Mono<Integer> multiply(Integer value);

}

@ServiceActivator(inputChannel = "multiplyChannel")
public Integer multiply(Integer value) {
    return value * 2;
}

其中,此类网关可用于处理 Flux 数据的一些服务

@Autowired
TestGateway testGateway;

public void hadnleFlux() {
    Flux.just("1", "2", "3", "4", "5")
            .map(Integer::parseInt)
            .flatMap(this.testGateway::multiply)
            .collectList()
            .subscribe(System.out::println);
}

另一个使用 Project Reactor 的示例是简单的回调场景,如下面的示例所示

Mono<Invoice> mono = service.process(myOrder);

mono.subscribe(invoice -> handleInvoice(invoice));

调用线程继续执行,当流程完成时会调用handleInvoice()

另请参阅Kotlin 协程以获取更多信息。

下游流返回异步类型

如上文AsyncTaskExecutor部分所述,如果您希望某个下游组件返回带有异步有效负载的消息(FutureMono 等),则必须将异步执行器显式设置为null(或在使用 XML 配置时设置为"")。然后在调用线程上调用流,结果可以在稍后检索。

异步void 返回类型

消息网关方法可以这样声明

@MessagingGateway
public interface MyGateway {

    @Gateway(requestChannel = "sendAsyncChannel")
    @Async
    void sendAsync(String payload);

}

但下游异常不会传播回调用方。为了确保下游流调用的异步行为以及异常传播到调用方,从 6.0 版本开始,框架提供了对Future<Void>Mono<Void> 返回类型的支持。用例类似于之前针对普通void 返回类型描述的发送并忘记行为,但不同之处在于流执行是异步的,返回的Future(或Mono)根据send 操作结果以null 或异常方式完成。

如果Future<Void> 是确切的下游流回复,则网关的asyncExecutor 选项必须设置为 null(@MessagingGateway 配置的AnnotationConstants.NULL),并且send 部分在生产者线程上执行。回复取决于下游流配置。这样,目标应用程序就可以正确地生成Future<Void> 回复。Mono 用例已经超出了框架线程控制,因此将asyncExecutor 设置为 null 毫无意义。作为请求回复网关操作结果的Mono<Void> 必须配置为网关方法的Mono<?> 返回类型。

没有响应时网关的行为

正如之前解释的那样,网关提供了一种通过 POJO 方法调用与消息系统交互的便捷方式。但是,典型的方法调用通常预期始终返回(即使出现异常),可能并不总是与消息交换一一对应(例如,回复消息可能不会到达——相当于方法没有返回)。

本节的其余部分涵盖了各种场景以及如何使网关的行为更可预测。可以配置某些属性以使同步网关行为更可预测,但其中一些可能并不总是按预期工作。其中之一是 `reply-timeout`(在方法级别或 `default-reply-timeout` 在网关级别)。我们检查 `reply-timeout` 属性以了解它如何在各种场景中影响同步网关的行为。我们检查单线程场景(所有下游组件通过直接通道连接)和多线程场景(例如,在下游某个地方,您可能有一个可轮询或执行器通道,它打破了单线程边界)。

下游长时间运行的进程

同步网关,单线程

如果下游组件仍在运行(可能是由于无限循环或服务缓慢),设置 `reply-timeout` 不会有任何效果,网关方法调用不会返回,直到下游服务退出(通过返回或抛出异常)。

同步网关,多线程

如果下游组件仍在多线程消息流中运行(可能是由于无限循环或服务缓慢),设置 `reply-timeout` 会产生效果,允许网关方法调用在超时后返回,因为 `GatewayProxyFactoryBean` 会轮询回复通道,等待消息直到超时。但是,如果在实际回复产生之前已达到超时,则可能导致网关方法返回 'null'。您应该了解,回复消息(如果产生)是在网关方法调用可能已返回后发送到回复通道的,因此您必须意识到这一点并以此为基础设计您的流程。

另请参阅 `errorOnTimeout` 属性,以便在超时发生时抛出 `MessageTimeoutException` 而不是返回 `null`。

下游组件返回 'null'

同步网关 - 单线程

如果下游组件返回 'null' 并且 `reply-timeout` 已配置为负值,则网关方法调用会无限期挂起,除非 `requires-reply` 属性已设置在下游组件(例如,服务激活器)上,该组件可能返回 'null'。在这种情况下,将抛出异常并传播到网关。

同步网关 - 多线程

行为与上一个情况相同。

下游组件返回签名为 'void',而网关方法签名为非 'void'

同步网关 - 单线程

如果下游组件返回 'void' 并且 reply-timeout 已配置为负值,则网关方法调用将无限期挂起。

同步网关 - 多线程

行为与上一个情况相同。

下游组件导致运行时异常

同步网关 - 单线程

如果下游组件抛出运行时异常,则异常将通过错误消息传播回网关并重新抛出。

同步网关 - 多线程

行为与上一个情况相同。

您应该了解,默认情况下,reply-timeout 是无界的。因此,如果您将 reply-timeout 设置为负值,您的网关方法调用可能会无限期挂起。因此,为了确保您分析您的流程,并且即使存在这些情况之一发生的可能性,您也应该将 reply-timeout 属性设置为“安全”值。默认值为 30 秒。更好的是,您可以将下游组件的 requires-reply 属性设置为 'true',以确保及时响应,因为一旦该下游组件在内部返回 null,就会抛出异常。但是,您也应该意识到,有一些情况(请参阅 第一个reply-timeout 无法提供帮助。这意味着分析消息流并决定何时使用同步网关而不是异步网关也很重要。如 前面所述,后一种情况是定义返回 Future 实例的网关方法的问题。然后,您保证会收到该返回值,并且您可以更细粒度地控制调用的结果。此外,在处理路由器时,您应该记住,将 resolution-required 属性设置为 'true' 会导致路由器在无法解析特定通道时抛出异常。同样,在处理过滤器时,您可以设置 throw-exception-on-rejection 属性。在这两种情况下,结果流的行为就像它包含一个具有 'requires-reply' 属性的服务激活器一样。换句话说,它有助于确保网关方法调用的及时响应。
您应该了解,计时器在线程返回网关时启动——也就是说,在流程完成或消息传递给另一个线程时。此时,调用线程开始等待回复。如果流程完全同步,则回复会立即可用。对于异步流程,线程最多等待此时间。

从 6.2 版本开始,MessagingGatewaySupport 的内部 MethodInvocationGateway 扩展的 errorOnTimeout 属性在 @MessagingGatewayGatewayEndpointSpec 上公开。此选项与 端点摘要 章节末尾解释的任何入站网关的含义完全相同。换句话说,将此选项设置为 true 将导致 MessageTimeoutException 从发送和接收网关操作中抛出,而不是在接收超时耗尽时返回 null

有关通过 IntegrationFlow 定义网关的选项,请参阅 Java DSL 章节中的 IntegrationFlow 作为网关