消息网关
网关隐藏了 Spring Integration 提供的消息传递 API。它允许应用程序的业务逻辑不了解 Spring Integration API。通过使用通用网关,您的代码仅与一个简单的接口进行交互。
进入 GatewayProxyFactoryBean
如前所述,最好不依赖于 Spring Integration API(包括网关类)。为此,Spring Integration 提供了 GatewayProxyFactoryBean
,它为任何接口生成代理,并在内部调用下面显示的网关方法。通过使用依赖注入,您可以将接口公开给您的业务方法。
以下示例显示了一个可用于与 Spring Integration 交互的接口
public interface Cafe {
void placeOrder(Order order);
}
网关 XML 命名空间支持
还提供了命名空间支持。它允许您将接口配置为服务,如下例所示
<int:gateway id="cafeService"
service-interface="org.cafeteria.Cafe"
default-request-channel="requestChannel"
default-reply-timeout="10000"
default-reply-channel="replyChannel"/>
定义此配置后,cafeService
现在可以注入到其他 Bean 中,并且在该代理的 Cafe
接口实例上调用方法的代码不了解 Spring Integration API。有关使用 gateway
元素(在 Cafe 演示中)的示例,请参阅“示例”附录。
上述配置中的默认值应用于网关接口上的所有方法。如果未指定回复超时,则调用线程将等待 30 秒的回复。请参阅无响应时网关的行为。
可以为各个方法覆盖默认值。请参阅使用注释和 XML 配置网关。
设置默认回复通道
通常,您无需指定 default-reply-channel
,因为网关会自动创建一个临时的匿名回复通道,并在其中侦听回复。但是,某些情况下可能会提示您定义 default-reply-channel
(或使用适配器网关(如 HTTP、JMS 等)的 reply-channel
)。
作为背景信息,我们将简要讨论网关的一些内部工作原理。网关创建一个临时点对点回复通道。它是匿名的,并使用名称 replyChannel
添加到消息头中。当提供显式的 default-reply-channel
(远程适配器网关的 reply-channel
)时,您可以指向发布-订阅通道,之所以这样命名是因为您可以向其中添加多个订阅者。在内部,Spring Integration 在临时 replyChannel
和显式定义的 default-reply-channel
之间创建桥梁。
假设您希望回复不仅发送到网关,还发送到其他一些使用者。在这种情况下,您需要两件事
-
您可以订阅的命名通道
-
该通道为发布-订阅通道
网关使用的默认策略无法满足这些需求,因为添加到头中的回复通道是匿名的且是点对点的。这意味着没有其他订阅者可以获取其句柄,即使可以,通道也具有点对点行为,因此只有一个订阅者会收到消息。通过定义 default-reply-channel
,您可以指向您选择的通道。在这种情况下,它是 publish-subscribe-channel
。网关会从中创建到存储在标头中的临时匿名回复通道的桥梁。
您可能还希望通过拦截器(例如,窃听)显式提供用于监控或审计的回复通道。要配置通道拦截器,您需要一个命名通道。
从 5.4 版开始,当网关方法返回类型为 void 时,如果未显式提供此标头,则框架会将 replyChannel 标头填充为 nullChannel Bean 引用。这允许丢弃下游流的任何可能的回复,从而满足单向网关契约。 |
使用注释和 XML 配置网关
考虑以下示例,它通过添加 @Gateway
注释扩展了之前的 Cafe
接口示例
public interface Cafe {
@Gateway(requestChannel="orders")
void placeOrder(Order order);
}
@Header
注释允许您添加解释为消息头的值,如下例所示
public interface FileWriter {
@Gateway(requestChannel="filesOut")
void write(byte[] content, @Header(FileHeaders.FILENAME) String filename);
}
如果您更喜欢使用 XML 方法配置网关方法,则可以向网关配置添加 method
元素,如下例所示
<int:gateway id="myGateway" service-interface="org.foo.bar.TestGateway"
default-request-channel="inputC">
<int:default-header name="calledMethod" expression="#gatewayMethod.name"/>
<int:method name="echo" request-channel="inputA" reply-timeout="2" request-timeout="200"/>
<int:method name="echoUpperCase" request-channel="inputB"/>
<int:method name="echoViaDefault"/>
</int:gateway>
您还可以使用 XML 为每次方法调用提供单独的标头。如果要设置的标头本质上是静态的,并且您不想使用 @Header
注释将它们嵌入到网关的方法签名中,这将非常有用。例如,在贷款经纪人示例中,我们希望根据启动的请求类型(单一报价或所有报价)影响贷款报价的聚合方式。通过评估调用了哪个网关方法来确定请求的类型,虽然可行,但这会违反关注点分离范式(该方法是 Java 工件)。但是,在消息传递体系结构中,在消息头中表达您的意图(元信息)是很自然的。以下示例显示了如何为两种方法中的每种方法添加不同的消息头
<int:gateway id="loanBrokerGateway"
service-interface="org.springframework.integration.loanbroker.LoanBrokerGateway">
<int:method name="getLoanQuote" request-channel="loanBrokerPreProcessingChannel">
<int:header name="RESPONSE_TYPE" value="BEST"/>
</int:method>
<int:method name="getAllLoanQuotes" request-channel="loanBrokerPreProcessingChannel">
<int:header name="RESPONSE_TYPE" value="ALL"/>
</int:method>
</int:gateway>
在上述示例中,根据网关的方法为“RESPONSE_TYPE”标头设置了不同的值。
例如,如果您在 <int:method/> 中以及在 @Gateway 注释中指定 requestChannel ,则注释值优先。 |
如果在 XML 中指定了无参数网关,并且接口方法同时具有 @Payload 和 @Gateway 注释(在 <int:method/> 元素中具有 payloadExpression 或 payload-expression ),则会忽略 @Payload 值。 |
表达式和“全局”标头
<header/>
元素支持 expression
作为 value
的替代方案。计算 SpEL 表达式以确定标头的值。从 5.2 版开始,评估上下文的 #root
对象是 MethodArgsHolder
,具有 getMethod()
和 getArgs()
访问器。例如,如果希望根据简单的方法名称进行路由,则可以添加具有以下表达式的标头:method.name
。
java.reflect.Method 不可序列化。如果您稍后序列化消息,则具有 method 表达式的标头将丢失。因此,在这些情况下,您可能希望使用 method.name 或 method.toString() 。toString() 方法提供方法的字符串表示形式,包括参数和返回类型。 |
从 3.0 版开始,可以定义 <default-header/>
元素以将标头添加到网关生成的所有消息中,而不管调用的方法是什么。为方法定义的特定标头优先于默认标头。此处为方法定义的特定标头会覆盖服务接口中的任何 @Header
注释。但是,默认标头不会覆盖服务接口中的任何 @Header
注释。
网关现在还支持 default-payload-expression
,它适用于所有方法(除非被覆盖)。
将方法参数映射到消息
使用上一节中的配置技术可以控制如何将方法参数映射到消息元素(有效负载和标头)。当未使用显式配置时,会使用某些约定来执行映射。在某些情况下,这些约定无法确定哪个参数是有效负载,哪个参数应映射到标头。考虑以下示例
public String send1(Object thing1, Map thing2);
public String send2(Map thing1, Map thing2);
在第一种情况下,约定是将第一个参数映射到有效负载(只要它不是 Map
),第二个参数的内容成为标头。
在第二种情况下(或当参数 thing1
是 Map
时),框架无法确定哪个参数应该是有效负载。因此,映射失败。这通常可以使用 payload-expression
、@Payload
注释或 @Headers
注释来解决。
或者(以及在约定失效时),您可以完全负责将方法调用映射到消息。为此,实现 MethodArgsMessageMapper
并使用 mapper
属性将其提供给 <gateway/>
。映射器映射 MethodArgsHolder
,这是一个简单的类,它包装 java.reflect.Method
实例和包含参数的 Object[]
。当提供自定义映射器时,不允许在网关上使用 default-payload-expression
属性和 <default-header/>
元素。类似地,不允许在任何 <method/>
元素上使用 payload-expression
属性和 <header/>
元素。
映射方法参数
以下示例显示了如何将方法参数映射到消息,并显示了一些无效配置的示例
public interface MyGateway {
void payloadAndHeaderMapWithoutAnnotations(String s, Map<String, Object> map);
void payloadAndHeaderMapWithAnnotations(@Payload String s, @Headers Map<String, Object> map);
void headerValuesAndPayloadWithAnnotations(@Header("k1") String x, @Payload String s, @Header("k2") String y);
void mapOnly(Map<String, Object> map); // the payload is the map and no custom headers are added
void twoMapsAndOneAnnotatedWithPayload(@Payload Map<String, Object> payload, Map<String, Object> headers);
@Payload("args[0] + args[1] + '!'")
void payloadAnnotationAtMethodLevel(String a, String b);
@Payload("@someBean.exclaim(args[0])")
void payloadAnnotationAtMethodLevelUsingBeanResolver(String s);
void payloadAnnotationWithExpression(@Payload("toUpperCase()") String s);
void payloadAnnotationWithExpressionUsingBeanResolver(@Payload("@someBean.sum(#this)") String s); // (1)
// invalid
void twoMapsWithoutAnnotations(Map<String, Object> m1, Map<String, Object> m2);
// invalid
void twoPayloads(@Payload String s1, @Payload String s2);
// invalid
void payloadAndHeaderAnnotationsOnSameParameter(@Payload @Header("x") String s);
// invalid
void payloadAndHeadersAnnotationsOnSameParameter(@Payload @Headers Map<String, Object> map);
}
1 | 请注意,在此示例中,SpEL 变量 #this 指的是参数(在本例中为 s 的值)。 |
XML 等效项看起来略有不同,因为方法参数没有 #this
上下文。但是,表达式可以通过使用 MethodArgsHolder
根对象的 args
属性来引用方法参数(有关更多信息,请参阅表达式和“全局”标头),如下例所示
<int:gateway id="myGateway" service-interface="org.something.MyGateway">
<int:method name="send1" payload-expression="args[0] + 'thing2'"/>
<int:method name="send2" payload-expression="@someBean.sum(args[0])"/>
<int:method name="send3" payload-expression="method"/>
<int:method name="send4">
<int:header name="thing1" expression="args[2].toUpperCase()"/>
</int:method>
</int:gateway>
@MessagingGateway
注释
从 4.0 版开始,可以使用 @MessagingGateway
注释标记网关服务接口,而无需为配置定义 <gateway />
xml 元素。以下两个示例比较了配置相同网关的两种方法
<int:gateway id="myGateway" service-interface="org.something.TestGateway"
default-request-channel="inputC">
<int:default-header name="calledMethod" expression="#gatewayMethod.name"/>
<int:method name="echo" request-channel="inputA" reply-timeout="2" request-timeout="200"/>
<int:method name="echoUpperCase" request-channel="inputB">
<int:header name="thing1" value="thing2"/>
</int:method>
<int:method name="echoViaDefault"/>
</int:gateway>
@MessagingGateway(name = "myGateway", defaultRequestChannel = "inputC",
defaultHeaders = @GatewayHeader(name = "calledMethod",
expression="#gatewayMethod.name"))
public interface TestGateway {
@Gateway(requestChannel = "inputA", replyTimeout = 2, requestTimeout = 200)
String echo(String payload);
@Gateway(requestChannel = "inputB", headers = @GatewayHeader(name = "thing1", value="thing2"))
String echoUpperCase(String payload);
String echoViaDefault(String payload);
}
与 XML 版本类似,当 Spring Integration 在组件扫描期间发现这些注解时,它会使用其消息基础设施创建 proxy 实现。为了执行此扫描并在应用程序上下文中注册 BeanDefinition ,请在 @Configuration 类中添加 @IntegrationComponentScan 注解。标准的 @ComponentScan 基础设施不处理接口。因此,我们引入了自定义的 @IntegrationComponentScan 逻辑来查找接口上的 @MessagingGateway 注解,并为其注册 GatewayProxyFactoryBean 实例。另请参见 注解支持。 |
除了 @MessagingGateway
注解之外,您还可以使用 @Profile
注解标记服务接口,以避免在该配置文件未激活时创建 bean。
从 6.0 版本开始,带有 @MessagingGateway
的接口也可以使用 @Primary
注解进行相应的配置逻辑,就像使用任何 Spring @Component
定义一样。
从 6.0 版本开始,@MessagingGateway
接口可以在标准 Spring @Import
配置中使用。这可以用作 @IntegrationComponentScan
或手动 AnnotationGatewayProxyFactoryBean
bean 定义的替代方案。
从 6.0
版本开始,@MessagingGateway
使用 @MessageEndpoint
进行元注解,并且 name()
属性实际上是 @Compnent.value()
的别名。这样,网关代理的 bean 名称生成策略就与扫描和导入组件的标准 Spring 注解配置保持一致。可以通过 AnnotationConfigUtils.CONFIGURATION_BEAN_NAME_GENERATOR
全局覆盖默认的 AnnotationBeanNameGenerator
,或者作为 @IntegrationComponentScan.nameGenerator()
属性。
如果您没有 XML 配置,则至少需要在一个 @Configuration 类上使用 @EnableIntegration 注解。有关更多信息,请参见 配置和 @EnableIntegration 。 |
调用无参数方法
当调用网关接口上没有任何参数的方法时,默认行为是从 PollableChannel
接收 Message
。
但是,有时您可能希望触发无参数方法,以便与下游的其他组件交互,这些组件不需要用户提供的参数,例如触发无参数的 SQL 调用或存储过程。
为了实现发送和接收语义,您必须提供有效负载。为了生成有效负载,接口上的方法参数不是必需的。您可以使用 @Payload
注解或 XML 中 method
元素上的 payload-expression
属性。以下列表包含一些有效负载可能是什么的示例
-
一个字面字符串
-
#gatewayMethod.name
-
new java.util.Date()
-
@someBean.someMethod() 的返回值
以下示例显示了如何使用 @Payload
注解
public interface Cafe {
@Payload("new java.util.Date()")
List<Order> retrieveOpenOrders();
}
您也可以使用 @Gateway
注解。
public interface Cafe {
@Gateway(payloadExpression = "new java.util.Date()")
List<Order> retrieveOpenOrders();
}
如果两个注解都存在(并且提供了 payloadExpression ),则 @Gateway 优先。 |
另请参见 使用注解和 XML 进行网关配置。
如果方法没有参数并且没有返回值,但包含有效负载表达式,则将其视为仅发送操作。
调用 default
方法
网关代理的接口也可以有 default
方法,从 5.3 版本开始,框架将 DefaultMethodInvokingMethodInterceptor
注入代理,以使用 java.lang.invoke.MethodHandle
方法而不是代理来调用 default
方法。来自 JDK 的接口(例如 java.util.function.Function
)仍然可以用于网关代理,但由于针对 JDK 类实例化 MethodHandles.Lookup
的内部 Java 安全原因,无法调用它们的 default
方法。这些方法也可以使用方法上的显式 @Gateway
注解或 @MessagingGateway
注解或 <gateway>
XML 组件上的 proxyDefaultMethods
进行代理(丢失其实现逻辑,同时恢复以前的网关代理行为)。
错误处理
网关调用可能导致错误。默认情况下,在下游发生的任何错误都会在网关的方法调用时“按原样”重新抛出。例如,考虑以下简单流程
gateway -> service-activator
如果服务激活器调用的服务抛出 MyException
(例如),框架会将其包装在 MessagingException
中,并在 failedMessage
属性中附加传递给服务激活器消息。因此,框架执行的任何日志记录都具有完整的故障上下文。默认情况下,当网关捕获异常时,MyException
会被解包并抛给调用方。您可以在网关方法声明上配置 throws
子句以匹配原因链中的特定异常类型。例如,如果您想捕获包含下游错误原因的所有消息传递信息的整个 MessagingException
,则应使用类似于以下内容的网关方法
public interface MyGateway {
void performProcess() throws MessagingException;
}
由于我们鼓励 POJO 编程,因此您可能不想将调用方暴露给消息传递基础设施。
如果您的网关方法没有 throws
子句,则网关会遍历原因树,查找不是 MessagingException
的 RuntimeException
。如果未找到,则框架会抛出 MessagingException
。如果前面讨论中的 MyException
的原因是 SomeOtherException
并且您的方法 throws SomeOtherException
,则网关会进一步解包并将其抛给调用方。
当网关声明没有 service-interface
时,将使用内部框架接口 RequestReplyExchanger
。
考虑以下示例
public interface RequestReplyExchanger {
Message<?> exchange(Message<?> request) throws MessagingException;
}
在 5.0 版本之前,此 exchange
方法没有 throws
子句,因此异常被解包。如果您使用此接口并希望恢复以前的解包行为,请改用自定义 service-interface
或自己访问 MessagingException
的 cause
。
但是,您可能希望记录错误而不是传播它,或者您可能希望将异常视为有效回复(通过将其映射到符合调用方理解的某些“错误消息”契约的消息)。为了实现这一点,网关通过支持 error-channel
属性来提供对专用于错误的消息通道的支持。在以下示例中,“转换器”从 Exception
创建回复 Message
<int:gateway id="sampleGateway"
default-request-channel="gatewayChannel"
service-interface="foo.bar.SimpleGateway"
error-channel="exceptionTransformationChannel"/>
<int:transformer input-channel="exceptionTransformationChannel"
ref="exceptionTransformer" method="createErrorResponse"/>
exceptionTransformer
可以是一个简单的 POJO,它知道如何创建预期的错误响应对象。这将成为发送回调用方的有效负载。如果需要,您可以在此类“错误流”中执行更多更复杂的操作。它可能涉及路由器(包括 Spring Integration 的 ErrorMessageExceptionTypeRouter
)、过滤器等。但是,大多数情况下,一个简单的“转换器”就足够了。
或者,您可能只想记录异常(或异步将其发送到某个地方)。如果您提供单向流,则不会将任何内容发送回调用方。如果您想完全抑制异常,则可以提供对全局 nullChannel
的引用(本质上是一种 /dev/null
方法)。最后,如上所述,如果未定义 error-channel
,则异常将照常传播。
当您使用 @MessagingGateway
注解(请参见 @MessagingGateway` 注解
) 时,您可以使用 `errorChannel
属性。
从 5.0 版本开始,当您使用具有 void
返回类型(单向流)的网关方法时,标准 errorChannel
标头中的每个发送消息都会填充 error-channel
引用(如果提供)。此功能允许基于标准 ExecutorChannel
配置(或 QueueChannel
)的下游异步流覆盖默认的全局 errorChannel
异常发送行为。以前,您必须使用 @GatewayHeader
注解或 <header>
元素手动指定 errorChannel
标头。对于具有异步流的 void
方法,error-channel
属性被忽略。相反,错误消息将发送到默认的 errorChannel
。
通过简单的 POJI 网关公开消息传递系统可以带来好处,但是“隐藏”底层消息传递系统的现实确实需要付出代价,因此您应该考虑某些事情。我们希望我们的 Java 方法尽快返回,而不是在调用方等待它返回(无论是 void、返回值还是抛出的异常)时无限期地挂起。当常规方法用作消息传递系统前面的代理时,我们必须考虑底层消息传递的潜在异步特性。这意味着由网关启动的消息可能会被过滤器丢弃,并且永远不会到达负责生成回复的组件。某些服务激活器方法可能会导致异常,因此不提供回复(因为我们不会生成空消息)。换句话说,多种情况会导致回复消息永远不会到来。这在消息传递系统中是完全正常的。但是,考虑一下对网关方法的影响。网关的方法输入参数已合并到消息中并发送到下游。回复消息将转换为网关方法的返回值。因此,您可能希望确保对于每个网关调用,始终存在回复消息。否则,如果 reply-timeout 设置为负值,则您的网关方法可能永远不会返回并无限期挂起。处理这种情况的一种方法是使用异步网关(在本节后面解释)。另一种处理方法是依赖于默认的 reply-timeout 为 30 秒。这样,网关不会挂起超过 reply-timeout 指定的时间,如果该超时时间过去则返回“null”。最后,您可能希望考虑设置下游标志,例如服务激活器上的“requires-reply”或过滤器上的“throw-exceptions-on-rejection”。这些选项将在本章的最后一部分更详细地讨论。
|
如果下游流返回一个ErrorMessage ,则其payload (一个Throwable )将被视为常规的下游错误。如果配置了error-channel ,则将其发送到错误流。否则,将有效负载抛给网关的调用方。类似地,如果error-channel 上的错误流返回一个ErrorMessage ,则其有效负载将被抛给调用方。对于任何具有Throwable 有效负载的消息,都适用此规则。这在异步情况下非常有用,当您需要将Exception 直接传播到调用方时。为此,您可以返回一个Exception (作为某个服务的reply )或抛出它。通常,即使在异步流中,框架也会负责将下游流抛出的异常传播回网关。该TCP客户端-服务器多路复用示例演示了将异常返回给调用方的两种技术。它通过使用带group-timeout 的aggregator (请参阅聚合器和组超时)和丢弃流上的MessagingTimeoutException 回复来模拟套接字IO错误到等待线程。 |
网关超时
网关有两个超时属性:requestTimeout
和replyTimeout
。请求超时仅在通道可以阻塞时适用(例如,已满的有界QueueChannel
)。replyTimeout
值是网关等待回复或返回null
的时间长度。默认为无限大。
超时可以设置为网关上所有方法的默认值(defaultRequestTimeout
和defaultReplyTimeout
)或在MessagingGateway
接口注释中设置。各个方法可以覆盖这些默认值(在<method/>
子元素中)或在@Gateway
注释中覆盖。
从版本5.0开始,超时可以定义为表达式,如下例所示
@Gateway(payloadExpression = "args[0]", requestChannel = "someChannel",
requestTimeoutExpression = "args[1]", replyTimeoutExpression = "args[2]")
String lateReply(String payload, long requestTimeout, long replyTimeout);
评估上下文具有BeanResolver
(使用@someBean
引用其他bean),并且#root
对象的args
数组属性可用。有关此根对象的更多信息,请参阅表达式和“全局”标头。在使用XML配置时,超时属性可以是长整型值或SpEL表达式,如下例所示
<method name="someMethod" request-channel="someRequestChannel"
payload-expression="args[0]"
request-timeout="1000"
reply-timeout="args[1]">
</method>
异步网关
作为一种模式,消息网关提供了一种隐藏特定于消息传递的代码的好方法,同时仍然公开消息传递系统的全部功能。如前面所述,GatewayProxyFactoryBean
提供了一种便捷的方式,可以通过服务接口公开代理,从而让您基于POJO访问消息传递系统(基于您自己域中的对象、基本类型/字符串或其他对象)。但是,当网关通过返回值的简单POJO方法公开时,这意味着对于每个请求消息(在调用方法时生成),都必须有一个回复消息(在方法返回时生成)。由于消息传递系统本质上是异步的,因此您可能无法始终保证“每个请求都将始终有回复”的契约。Spring Integration 2.0引入了对异步网关的支持,它提供了一种方便的方式来启动流,而您可能不知道是否需要回复或回复需要多长时间才能到达。
为了处理这些类型的场景,Spring Integration使用java.util.concurrent.Future
实例来支持异步网关。
从XML配置来看,没有任何变化,您仍然可以像定义常规网关一样定义异步网关,如下例所示
<int:gateway id="mathService"
service-interface="org.springframework.integration.sample.gateway.futures.MathServiceGateway"
default-request-channel="requestChannel"/>
但是,网关接口(服务接口)略有不同,如下所示
public interface MathServiceGateway {
Future<Integer> multiplyByTwo(int i);
}
如上例所示,网关方法的返回类型是Future
。当GatewayProxyFactoryBean
看到网关方法的返回类型是Future
时,它会立即通过使用AsyncTaskExecutor
切换到异步模式。这就是差异的全部内容。对这种方法的调用始终会立即返回一个Future
实例。然后,您可以按照自己的节奏与Future
交互以获取结果、取消等等。此外,与任何其他使用Future
实例一样,调用get()
可能会显示超时、执行异常等等。以下示例显示了如何使用从异步网关返回的Future
MathServiceGateway mathService = ac.getBean("mathService", MathServiceGateway.class);
Future<Integer> result = mathService.multiplyByTwo(number);
// do something else here since the reply might take a moment
int finalResult = result.get(1000, TimeUnit.SECONDS);
有关更详细的示例,请参阅Spring Integration示例中的async-gateway示例。
AsyncTaskExecutor
默认情况下,GatewayProxyFactoryBean
在提交任何返回类型为Future
的网关方法的内部AsyncInvocationTask
实例时,使用org.springframework.core.task.SimpleAsyncTaskExecutor
。但是,<gateway/>
元素配置中的async-executor
属性允许您提供对Spring应用程序上下文中可用的任何java.util.concurrent.Executor
实现的引用。
(默认)SimpleAsyncTaskExecutor
支持Future
和CompletableFuture
返回类型。请参阅CompletableFuture
。即使存在默认执行程序,提供外部执行程序通常也很有用,这样您就可以在日志中识别其线程(使用XML时,线程名称基于执行程序的bean名称),如下例所示
@Bean
public AsyncTaskExecutor exec() {
SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = new SimpleAsyncTaskExecutor();
simpleAsyncTaskExecutor.setThreadNamePrefix("exec-");
return simpleAsyncTaskExecutor;
}
@MessagingGateway(asyncExecutor = "exec")
public interface ExecGateway {
@Gateway(requestChannel = "gatewayChannel")
Future<?> doAsync(String foo);
}
如果您希望返回不同的Future
实现,您可以提供自定义执行程序或完全禁用执行程序,并在下游流的回复消息有效负载中返回Future
。要禁用执行程序,请在GatewayProxyFactoryBean
中将其设置为null
(通过使用setAsyncTaskExecutor(null)
)。在使用XML配置网关时,使用async-executor=""
。在使用@MessagingGateway
注释进行配置时,使用类似以下的代码
@MessagingGateway(asyncExecutor = AnnotationConstants.NULL)
public interface NoExecGateway {
@Gateway(requestChannel = "gatewayChannel")
Future<?> doAsync(String foo);
}
如果返回类型是特定的具体Future 实现或配置的执行程序不支持的其他一些子接口,则流将在调用方的线程上运行,并且流必须在回复消息有效负载中返回所需类型。 |
CompletableFuture
从版本4.2开始,网关方法现在可以返回CompletableFuture<?>
。返回此类型时有两种操作模式
-
当提供异步执行程序且返回类型正好是
CompletableFuture
(而不是子类)时,框架将在执行程序上运行任务并立即向调用方返回一个CompletableFuture
。CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor)
用于创建期货。 -
当异步执行程序显式设置为
null
且返回类型为CompletableFuture
或返回类型为CompletableFuture
的子类时,流将在调用方的线程上调用。在这种情况下,预期下游流将返回适当类型的CompletableFuture
。
org.springframework.util.concurrent.ListenableFuture 已从Spring Framework 6.0 开始弃用。现在建议迁移到CompletableFuture ,它提供了类似的处理功能。 |
使用场景
在以下场景中,调用方线程立即返回一个CompletableFuture<Invoice>
,当下游流回复网关(使用Invoice
对象)时,该期货将完成。
CompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="something.Service" default-request-channel="orders" />
在以下场景中,当下游流将其作为对网关的回复的有效负载提供时,调用方线程将返回一个CompletableFuture<Invoice>
。当发票准备就绪时,某些其他流程必须完成期货。
CompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="foo.Service" default-request-channel="orders"
async-executor="" />
在以下场景中,当下游流将其作为对网关的回复的有效负载提供时,调用方线程将返回一个CompletableFuture<Invoice>
。当发票准备就绪时,某些其他流程必须完成期货。如果启用了DEBUG
日志记录,则会发出日志条目,指示在这种情况下无法使用异步执行程序。
MyCompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="foo.Service" default-request-channel="orders" />
CompletableFuture
实例可用于对回复执行其他操作,如下例所示
CompletableFuture<String> process(String data);
...
CompletableFuture result = process("foo")
.thenApply(t -> t.toUpperCase());
...
String out = result.get(10, TimeUnit.SECONDS);
Reactor Mono
从版本5.0开始,GatewayProxyFactoryBean
允许使用Project Reactor和网关接口方法,使用Mono<T>
返回类型。内部AsyncInvocationTask
包装在Mono.fromCallable()
中。
可以使用Mono
稍后检索结果(类似于Future<?>
),或者您可以通过在将结果返回到网关时调用您的Consumer
来使用调度程序从中使用。
Mono 不会被框架立即刷新。因此,在网关方法返回之前不会启动底层消息流(与Future<?> Executor 任务一样)。当订阅Mono 时,流将开始。或者,Mono (作为“可组合”)可能是Reactor流的一部分,当subscribe() 与整个Flux 相关时。以下示例显示了如何使用Project Reactor创建网关 |
@MessagingGateway
public interface TestGateway {
@Gateway(requestChannel = "multiplyChannel")
Mono<Integer> multiply(Integer value);
}
@ServiceActivator(inputChannel = "multiplyChannel")
public Integer multiply(Integer value) {
return value * 2;
}
其中此类网关可用于处理Flux
数据的某些服务中
@Autowired
TestGateway testGateway;
public void hadnleFlux() {
Flux.just("1", "2", "3", "4", "5")
.map(Integer::parseInt)
.flatMap(this.testGateway::multiply)
.collectList()
.subscribe(System.out::println);
}
另一个使用Project Reactor的示例是一个简单的回调场景,如下例所示
Mono<Invoice> mono = service.process(myOrder);
mono.subscribe(invoice -> handleInvoice(invoice));
调用线程继续,在流完成时调用handleInvoice()
。
另请参阅Kotlin协程以获取更多信息。
下游流返回异步类型
如上文AsyncTaskExecutor
部分所述,如果您希望某些下游组件返回带有异步有效负载的消息(Future
、Mono
等),则必须显式将异步执行程序设置为null
(或使用XML配置时为""
)。然后在调用方线程上调用流,并且稍后可以检索结果。
异步void
返回类型
消息网关方法可以这样声明
@MessagingGateway
public interface MyGateway {
@Gateway(requestChannel = "sendAsyncChannel")
@Async
void sendAsync(String payload);
}
但是下游异常不会传播回调用方。为了确保下游流调用的异步行为以及异常传播到调用方,从版本6.0开始,框架提供了对Future<Void>
和Mono<Void>
返回类型的支持。用例类似于之前针对普通void
返回类型描述的发送并忘记行为,但不同之处在于流执行异步发生,并且返回的Future
(或Mono
)根据send
操作结果以null
或异常方式完成。
如果Future<Void> 是精确的下游流回复,则网关的asyncExecutor 选项必须设置为null(对于@MessagingGateway 配置,设置为AnnotationConstants.NULL ),并且send 部分在生产者线程上执行。回复则取决于下游流的配置。这样,目标应用程序就可以正确地生成Future<Void> 回复。Mono 的使用案例已经超出了框架线程控制的范围,因此将asyncExecutor 设置为null毫无意义。在这种情况下,作为请求-回复网关操作结果的Mono<Void> 必须配置为网关方法的Mono<?> 返回类型。 |
网关在没有收到响应时的行为
如前面所述,网关提供了一种通过POJO方法调用与消息系统交互的便捷方式。但是,典型的方法调用通常预期始终返回(即使出现异常),可能并不总是与消息交换一一对应(例如,回复消息可能不会到达——相当于方法没有返回)。
本节的其余部分涵盖了各种场景以及如何使网关的行为更可预测。可以配置某些属性以使同步网关的行为更可预测,但其中一些属性可能并不总是按预期工作。其中之一是reply-timeout
(在方法级别或网关级别的default-reply-timeout
)。我们检查reply-timeout
属性以了解它如何在各种场景中影响同步网关的行为。我们检查单线程场景(所有下游组件通过直接通道连接)和多线程场景(例如,在下游某个地方,您可能有一个可轮询或执行器通道,它打破了单线程边界)。
下游的长时间运行进程
- 同步网关,单线程
-
如果下游组件仍在运行(可能是由于无限循环或服务缓慢),设置
reply-timeout
无效,并且网关方法调用不会返回,直到下游服务退出(通过返回或抛出异常)。 - 同步网关,多线程
-
如果下游组件在一个多线程消息流中仍在运行(可能是由于无限循环或服务缓慢),设置
reply-timeout
会产生影响,因为GatewayProxyFactoryBean
会在回复通道上轮询,等待消息直到超时过期,从而允许网关方法调用返回。但是,如果在实际回复生成之前已达到超时,则可能导致网关方法返回“null”。您应该了解,回复消息(如果生成)是在网关方法调用可能已返回后发送到回复通道的,因此您必须注意这一点并在设计流程时牢记这一点。
另请参阅errorOnTimeout
属性,以便在发生超时时抛出MessageTimeoutException
而不是返回null
。
下游组件返回“null”
- 同步网关——单线程
-
如果下游组件返回“null”并且
reply-timeout
已配置为负值,则网关方法调用会无限期挂起,除非requires-reply
属性已在下游组件(例如,服务激活器)上设置,该组件可能会返回“null”。在这种情况下,将抛出异常并传播到网关。 - 同步网关——多线程
-
行为与上一个案例相同。
下游组件返回签名为“void”,而网关方法签名为非“void”
- 同步网关——单线程
-
如果下游组件返回“void”并且
reply-timeout
已配置为负值,则网关方法调用会无限期挂起。 - 同步网关——多线程
-
行为与上一个案例相同。
下游组件导致运行时异常
- 同步网关——单线程
-
如果下游组件抛出运行时异常,则该异常将通过错误消息传播回网关并重新抛出。
- 同步网关——多线程
-
行为与上一个案例相同。
您应该了解,默认情况下,reply-timeout 是无界的。因此,如果您将reply-timeout 设置为负值,则网关方法调用可能会无限期挂起。因此,为了确保您分析您的流程,如果存在任何远程可能性发生这些场景之一,您应该将reply-timeout 属性设置为“安全”值。默认值为30 秒。更好的是,您可以将下游组件的requires-reply 属性设置为“true”,以确保及时响应,因为一旦该下游组件在内部返回null,就会抛出异常。但是,您也应该意识到存在一些场景(请参阅第一个场景),其中reply-timeout 无济于事。这意味着分析消息流并确定何时使用同步网关而不是异步网关也很重要。如前面所述,后一种情况是定义返回Future 实例的网关方法的问题。然后,您可以保证收到该返回值,并且可以更精细地控制调用的结果。此外,在处理路由器时,您应该记住,将resolution-required 属性设置为“true”会导致路由器抛出异常,如果它无法解析特定的通道。同样,在处理过滤器时,您可以设置throw-exception-on-rejection 属性。在这两种情况下,生成的流程的行为都类似于它包含具有“requires-reply”属性的服务激活器。换句话说,它有助于确保网关方法调用及时响应。 |
您应该了解,计时器在线程返回网关时启动——即,流程完成或消息传递给另一个线程时。此时,调用线程开始等待回复。如果流程完全同步,则回复会立即可用。对于异步流程,线程最多等待此时间。 |
从 6.2 版开始,MessagingGatewaySupport
的内部MethodInvocationGateway
扩展的errorOnTimeout
属性在@MessagingGateway
和GatewayEndpointSpec
上公开。此选项与端点摘要章节末尾解释的任何入站网关的含义完全相同。换句话说,将此选项设置为true
会导致从发送和接收网关操作抛出MessageTimeoutException
,而不是在接收超时耗尽时返回null
。
请参阅Java DSL 章节中的IntegrationFlow
作为网关,了解通过IntegrationFlow
定义网关的选项。