消息发布
(面向切面编程) AOP 消息发布功能允许您在方法调用完成后构建并发送消息作为其副产品。例如,想象您有一个组件,并且每次该组件的状态发生变化时,您都希望通过消息收到通知。发送此类通知的最简单方法是将消息发送到专用通道,但是如何将更改对象状态的方法调用连接到消息发送过程,以及通知消息应该如何构建呢?AOP 消息发布功能通过配置驱动的方法处理这些职责。
消息发布配置
Spring Integration 提供了两种方法:XML 配置和注解驱动 (Java) 配置。
使用 @Publisher
注解的注解驱动配置
注解驱动的方法允许您使用 @Publisher
注解来标记任何方法,以指定一个 'channel' 属性。从 5.1 版本开始,要启用此功能,您必须在某个 @Configuration
类上使用 @EnablePublisher
注解。有关更多信息,请参阅配置和 @EnableIntegration
。消息由方法调用的返回值构建,并发送到 'channel' 属性指定的通道。为了进一步管理消息结构,您还可以结合使用 @Payload
和 @Header
注解。
在内部,Spring Integration 的此消息发布功能通过定义 PublisherAnnotationAdvisor
来使用 Spring AOP,并使用 Spring 表达式语言 (SpEL),这使您可以对发布的 Message
结构进行极大的灵活性和控制。
PublisherAnnotationAdvisor
定义并绑定以下变量
-
#return
:绑定到返回值,允许您引用它或其属性(例如,#return.something
,其中 'something' 是绑定到#return
的对象的属性) -
#exception
:如果方法调用抛出异常,则绑定到该异常 -
#args
:绑定到方法参数,以便您可以按名称提取单个参数(例如,#args.fname
)
请考虑以下示例
@Publisher
public String defaultPayload(String fname, String lname) {
return fname + " " + lname;
}
在前面的示例中,消息按以下结构构建
-
消息负载是方法的返回类型和值。这是默认行为。
-
构建的新消息被发送到默认的发布者通道,该通道已配置了注解后处理器(本节后面将介绍)。
以下示例与前面的示例相同,但它不使用默认的发布通道
@Publisher(channel="testChannel")
public String defaultPayload(String fname, @Header("last") String lname) {
return fname + " " + lname;
}
我们没有使用默认的发布通道,而是通过设置 @Publisher
注解的 'channel' 属性来指定发布通道。我们还添加了一个 @Header
注解,这导致名为 'last' 的消息头与 'lname' 方法参数具有相同的值。该消息头被添加到新构建的消息中。
以下示例与前面的示例几乎相同
@Publisher(channel="testChannel")
@Payload
public String defaultPayloadButExplicitAnnotation(String fname, @Header String lname) {
return fname + " " + lname;
}
唯一的区别在于我们在方法上使用了 @Payload
注解,明确指定方法的返回值应作为消息的负载。
以下示例通过在 @Payload
注解中使用 Spring 表达式语言,进一步扩展了先前的配置,以指导框架如何构建消息。
@Publisher(channel="testChannel")
@Payload("#return + #args.lname")
public String setName(String fname, String lname, @Header("x") int num) {
return fname + " " + lname;
}
在前面的示例中,消息是方法调用的返回值与 'lname' 输入参数的拼接。名为 'x' 的消息头的值由 'num' 输入参数确定。该消息头被添加到新构建的消息中。
@Publisher(channel="testChannel")
public String argumentAsPayload(@Payload String fname, @Header String lname) {
return fname + " " + lname;
}
在前面的示例中,您看到了 @Payload
注解的另一种用法。在这里,我们标记了一个方法参数,该参数成为新构建消息的负载。
与 Spring 中大多数其他注解驱动的功能一样,您需要注册一个后处理器 (PublisherAnnotationBeanPostProcessor
)。以下示例展示了如何做到这一点。
<bean class="org.springframework.integration.aop.PublisherAnnotationBeanPostProcessor"/>
为了更简洁的配置,您可以使用命名空间支持,如下例所示。
<int:annotation-config>
<int:enable-publisher default-publisher-channel="defaultChannel"/>
</int:annotation-config>
对于 Java 配置,您必须使用 @EnablePublisher
注解,如下例所示。
@Configuration
@EnableIntegration
@EnablePublisher("defaultChannel")
public class IntegrationConfiguration {
...
}
从 5.1.3 版本开始,<int:enable-publisher>
组件以及 @EnablePublisher
注解拥有 proxy-target-class
和 order
属性,用于调整 ProxyFactory
配置。
与其他 Spring 注解(如 @Component
、@Scheduled
等)类似,您也可以将 @Publisher
用作元注解。这意味着您可以定义自己的注解,它们将与 @Publisher
本身以相同的方式处理。以下示例展示了如何做到这一点。
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Publisher(channel="auditChannel")
public @interface Audit {
...
}
在前面的示例中,我们定义了 @Audit
注解,它本身被 @Publisher
注解标记。另请注意,您可以在元注解上定义一个 channel
属性,以封装此注解内部发送消息的位置。现在,您可以使用 @Audit
注解标记任何方法,如下例所示。
@Audit
public String test() {
return "Hello";
}
在前面的示例中,每次调用 test()
方法都会产生一条消息,其负载由其返回值创建。每条消息都被发送到名为 auditChannel
的通道。这种技术的好处之一是,您可以避免在多个注解中重复使用相同的通道名称。您还可以为您自己的、可能特定于领域的注解与框架提供的注解之间提供一定程度的间接性。
您还可以注解类,这允许您将此注解的属性应用于该类的每个公共方法,如下例所示。
@Audit
static class BankingOperationsImpl implements BankingOperations {
public String debit(String amount) {
. . .
}
public String credit(String amount) {
. . .
}
}
使用 <publishing-interceptor>
元素的基于 XML 的方法
基于 XML 的方法允许您将相同的基于 AOP 的消息发布功能配置为 MessagePublishingInterceptor
的命名空间配置。它肯定比注解驱动的方法有一些优势,因为它允许您使用 AOP 切入点表达式,从而可能一次拦截多个方法,或者拦截和发布您没有源代码的方法。
要使用 XML 配置消息发布,您只需执行以下两项操作
-
使用
<publishing-interceptor>
XML 元素提供MessagePublishingInterceptor
的配置。 -
提供 AOP 配置以将
MessagePublishingInterceptor
应用于托管对象。
以下示例展示了如何配置 publishing-interceptor
元素。
<aop:config>
<aop:advisor advice-ref="interceptor" pointcut="bean(testBean)" />
</aop:config>
<publishing-interceptor id="interceptor" default-channel="defaultChannel">
<method pattern="echo" payload="'Echoing: ' + #return" channel="echoChannel">
<header name="things" value="something"/>
</method>
<method pattern="repl*" payload="'Echoing: ' + #return" channel="echoChannel">
<header name="things" expression="'something'.toUpperCase()"/>
</method>
<method pattern="echoDef*" payload="#return"/>
</publishing-interceptor>
<publishing-interceptor>
配置看起来与基于注解的方法非常相似,它也使用了 Spring 表达式语言的强大功能。
在前面的示例中,执行 testBean
的 echo
方法会生成一个具有以下结构的 Message
。
-
Message
的负载类型为String
,内容如下:Echoing: [value]
,其中value
是被执行方法返回的值。 -
Message
有一个名为things
的消息头,其值为something
。 -
Message
被发送到echoChannel
。
第二种方法与第一种非常相似。在这里,每个以 'repl' 开头的方法都会生成一个具有以下结构的 Message
。
-
Message
负载与前面的示例相同。 -
Message
有一个名为things
的消息头,其值是 SpEL 表达式'something'.toUpperCase()
的结果。 -
Message
被发送到echoChannel
。
第二种方法,映射任何以 echoDef
开头的方法执行,会产生一个具有以下结构的 Message
。
-
Message
负载是被执行方法返回的值。 -
由于没有提供
channel
属性,Message
被发送到由publisher
定义的defaultChannel
。
对于简单的映射规则,您可以依赖 publisher
的默认设置,如下例所示。
<publishing-interceptor id="anotherInterceptor"/>
前面的示例将匹配切入点表达式的每个方法的返回值映射为负载,并发送到 default-channel
。如果您没有指定 defaultChannel
(就像前面的示例那样),则消息会发送到全局 nullChannel
(相当于 /dev/null
)。
异步发布
发布与您的组件执行在同一线程中发生。因此,默认情况下它是同步的。这意味着整个消息流必须等待发布者的流完成。然而,开发人员通常想要完全相反的效果:使用此消息发布功能来启动异步流。例如,您可能托管一个服务(HTTP、WS 等),它接收远程请求。您可能希望将此请求内部发送到一个可能需要一段时间才能完成的进程。但是,您也可能希望立即回复用户。因此,您可以不将入站请求发送到输出通道进行处理(传统方式),而是使用 'output-channel' 或 'replyChannel' 头部向调用者发送一个简单的确认式回复,同时使用消息发布功能启动复杂的流。
以下示例中的服务接收一个复杂的负载(需要进一步发送进行处理),但它也需要向调用者回复一个简单的确认。
public String echo(Object complexPayload) {
return "ACK";
}
因此,我们没有将复杂流连接到输出通道,而是使用了消息发布功能。我们配置它使用服务方法(如前面的示例所示)的输入参数来创建一个新消息,并将其发送到 'localProcessChannel'。为了确保此流是异步的,我们只需将其发送到任何类型的异步通道(下一个示例中的 ExecutorChannel
)。以下示例展示了如何配置异步 publishing-interceptor
。
<int:service-activator input-channel="inputChannel" output-channel="outputChannel" ref="sampleservice"/>
<bean id="sampleService" class="test.SampleService"/>
<aop:config>
<aop:advisor advice-ref="interceptor" pointcut="bean(sampleService)" />
</aop:config>
<int:publishing-interceptor id="interceptor" >
<int:method pattern="echo" payload="#args[0]" channel="localProcessChannel">
<int:header name="sample_header" expression="'some sample value'"/>
</int:method>
</int:publishing-interceptor>
<int:channel id="localProcessChannel">
<int:dispatcher task-executor="executor"/>
</int:channel>
<task:executor id="executor" pool-size="5"/>
处理此类场景的另一种方法是使用线控(wire-tap)。请参阅线控(Wire Tap)。
基于调度触发器生成和发布消息
在前面的章节中,我们研究了消息发布功能,该功能在方法调用完成后构建并发布消息作为其副产品。但是,在这些情况下,您仍然负责调用方法。Spring Integration 2.0 添加了对调度消息生产者和发布者的支持,通过在 'inbound-channel-adapter' 元素上新增了 expression
属性。您可以基于几种触发器进行调度,其中任何一种都可以在 'poller' 元素上配置。目前,我们支持 cron
、fixed-rate
、fixed-delay
以及您实现的任何自定义触发器(通过 'trigger' 属性值引用)。
如前所述,通过 <inbound-channel-adapter>
XML 元素提供对调度生产者和发布者的支持。请考虑以下示例。
<int:inbound-channel-adapter id="fixedDelayProducer"
expression="'fixedDelayTest'"
channel="fixedDelayChannel">
<int:poller fixed-delay="1000"/>
</int:inbound-channel-adapter>
前面的示例创建了一个入站通道适配器,它构造一个 Message
,其负载是 expression
属性中定义的表达式的结果。每当 fixed-delay
属性指定的延迟发生时,就会创建并发送此类消息。
以下示例与前面的示例类似,但它使用了 fixed-rate
属性。
<int:inbound-channel-adapter id="fixedRateProducer"
expression="'fixedRateTest'"
channel="fixedRateChannel">
<int:poller fixed-rate="1000"/>
</int:inbound-channel-adapter>
fixed-rate
属性允许您以固定的速率(从每个任务的开始时间测量)发送消息。
以下示例展示了如何应用 Cron 触发器,并在 cron
属性中指定值。
<int:inbound-channel-adapter id="cronProducer"
expression="'cronTest'"
channel="cronChannel">
<int:poller cron="7 6 5 4 3 ?"/>
</int:inbound-channel-adapter>
以下示例展示了如何向消息中插入附加的消息头。
<int:inbound-channel-adapter id="headerExpressionsProducer"
expression="'headerExpressionsTest'"
channel="headerExpressionsChannel"
auto-startup="false">
<int:poller fixed-delay="5000"/>
<int:header name="foo" expression="6 * 7"/>
<int:header name="bar" value="x"/>
</int:inbound-channel-adapter>
附加的消息头可以采用标量值或评估 Spring 表达式的结果。
如果您需要实现自己的自定义触发器,可以使用 trigger
属性提供对实现 org.springframework.scheduling.Trigger
接口的任何 Spring 配置的 Bean 的引用。以下示例展示了如何做到这一点。
<int:inbound-channel-adapter id="triggerRefProducer"
expression="'triggerRefTest'" channel="triggerRefChannel">
<int:poller trigger="customTrigger"/>
</int:inbound-channel-adapter>
<beans:bean id="customTrigger" class="o.s.scheduling.support.PeriodicTrigger">
<beans:constructor-arg value="9999"/>
</beans:bean>