注解支持

除了使用 XML 命名空间来配置消息端点,您还可以使用注解。首先,Spring 集成提供类级别的 @MessageEndpoint 作为一种原型注解,这意味着它本身使用 Spring 的 @Component 注解进行注解,因此 Spring 的组件扫描会自动将其识别为 bean 定义。

更重要的是各种方法级别的注解。它们表明被注解的方法能够处理消息。以下示例演示了类级别和方法级别的注解。

@MessageEndpoint
public class FooService {

    @ServiceActivator
    public void processMessage(Message message) {
        ...
    }
}

方法“处理”消息的具体含义取决于特定的注解。Spring Integration 中可用的注解包括

如果您将 XML 配置与注解结合使用,则不需要 @MessageEndpoint 注解。如果您想从 <service-activator/> 元素的 ref 属性配置 POJO 引用,您只需提供方法级别的注解。在这种情况下,即使 <service-activator/> 元素上不存在方法级别的属性,注解也能防止歧义。

在大多数情况下,被注解的处理方法不需要 Message 类型作为其参数。相反,方法参数类型可以与消息的有效负载类型匹配,如下例所示

public class ThingService {

    @ServiceActivator
    public void bar(Thing thing) {
        ...
    }

}

当方法参数应该从 MessageHeaders 中的值映射时,另一个选择是使用参数级别的 @Header 注解。一般来说,用 Spring Integration 注解注解的方法可以接受 Message 本身、消息有效负载或头值(使用 @Header)作为参数。实际上,该方法可以接受组合,如下例所示

public class ThingService {

    @ServiceActivator
    public void otherThing(String payload, @Header("x") int valueX, @Header("y") int valueY) {
        ...
    }

}

您还可以使用 @Headers 注解提供所有消息头作为 Map,如下例所示

public class ThingService {

    @ServiceActivator
    public void otherThing(String payload, @Headers Map<String, Object> headerMap) {
        ...
    }

}
注解的值也可以是 SpEL 表达式(例如,someHeader.toUpperCase()),这在您希望在注入头值之前对其进行操作时非常有用。它还提供了一个可选的 required 属性,该属性指定属性值是否必须在头文件中可用。required 属性的默认值为 true

对于这些注解中的几个,当消息处理方法返回非空值时,端点尝试发送回复。这在两种配置选项(命名空间和注解)中是一致的,因为这样的端点的输出通道将被使用(如果可用),并且 REPLY_CHANNEL 消息头值将用作回退。

端点上的输出通道组合和回复通道消息头支持管道方法,其中多个组件具有输出通道,而最终组件允许将回复消息转发到回复通道(如原始请求消息中指定)。换句话说,最终组件依赖于原始发送者提供的信息,因此可以动态支持任意数量的客户端。这是一个 返回地址 模式的示例。

除了这里显示的示例之外,这些注释还支持 inputChanneloutputChannel 属性,如下例所示

@Service
public class ThingService {

    @ServiceActivator(inputChannel="input", outputChannel="output")
    public void otherThing(String payload, @Headers Map<String, Object> headerMap) {
        ...
    }

}

处理这些注释会创建与相应的 XML 组件相同的 bean——AbstractEndpoint 实例和 MessageHandler 实例(或用于入站通道适配器的 MessageSource 实例)。参见 @Bean 方法上的注释。bean 名称根据以下模式生成:[componentName].[methodName].[decapitalizedAnnotationClassShortName]。在前面的示例中,bean 名称是 thingService.otherThing.serviceActivator(用于 AbstractEndpoint),并且对于 MessageHandlerMessageSource)bean,该名称后面会附加 .handler.source)后缀。可以使用 @EndpointId 注释以及这些消息传递注释来自定义此类名称。MessageHandler 实例(MessageSource 实例)也有资格被 消息历史记录 跟踪。

从 4.0 版本开始,所有消息传递注释都提供 SmartLifecycle 选项(autoStartupphase)以允许在应用程序上下文初始化时控制端点生命周期。它们分别默认为 true0。要更改端点的状态(例如 start()stop()),可以使用 BeanFactory(或自动装配)获取对端点 bean 的引用并调用方法。或者,您可以向 Control Bus 发送一条命令消息(参见 控制总线)。为此,您应该使用前面段落中提到的 beanName

在解析上述注释后自动创建的通道(当没有配置特定通道 bean 时)以及相应的消费者端点,在上下文初始化结束时声明为 bean。这些 bean **可以** 在其他服务中自动装配,但必须用 @Lazy 注释标记,因为在正常的自动装配处理过程中,定义通常还不可用。

@Autowired
@Lazy
@Qualifier("someChannel")
MessageChannel someChannel;
...

@Bean
Thing1 dependsOnSPCA(@Qualifier("someInboundAdapter") @Lazy SourcePollingChannelAdapter someInboundAdapter) {
    ...
}

从 6.0 版本开始,所有消息注解现在都是 @Repeatable,因此可以在同一个服务方法上声明多个相同类型的注解,其含义是创建与重复注解数量相同的端点。

@Transformer(inputChannel = "inputChannel1", outputChannel = "outputChannel1")
@Transformer(inputChannel = "inputChannel2", outputChannel = "outputChannel2")
public String transform(String input) {
    return input.toUpperCase();
}

使用 @Poller 注解

在 Spring Integration 4.0 之前,消息注解要求 inputChannel 是对 SubscribableChannel 的引用。对于 PollableChannel 实例,需要一个 <int:bridge/> 元素来配置 <int:poller/> 并使复合端点成为 PollingConsumer。4.0 版本引入了 @Poller 注解,允许直接在消息注解上配置 poller 属性,如下例所示。

public class AnnotationService {

    @Transformer(inputChannel = "input", outputChannel = "output",
        poller = @Poller(maxMessagesPerPoll = "${poller.maxMessagesPerPoll}", fixedDelay = "${poller.fixedDelay}"))
    public String handle(String payload) {
        ...
    }
}

@Poller 注解仅提供简单的 PollerMetadata 选项。您可以使用属性占位符配置 @Poller 注解的属性(maxMessagesPerPollfixedDelayfixedRatecron)。此外,从 5.1 版本开始,还提供了 PollingConsumerreceiveTimeout 选项。如果需要提供更多轮询选项(例如,transactionadvice-chainerror-handler 等),则应将 PollerMetadata 配置为通用 bean,并将它的 bean 名称用作 @Pollervalue 属性。在这种情况下,不允许使用其他属性(必须在 PollerMetadata bean 上指定)。请注意,如果 inputChannelPollableChannel 且未配置 @Poller,则使用默认 PollerMetadata(如果它存在于应用程序上下文中)。要使用 @Configuration 注解声明默认轮询器,请使用类似于以下示例的代码。

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
    PollerMetadata pollerMetadata = new PollerMetadata();
    pollerMetadata.setTrigger(new PeriodicTrigger(10));
    return pollerMetadata;
}

以下示例展示了如何使用默认轮询器。

public class AnnotationService {

    @Transformer(inputChannel = "aPollableChannel", outputChannel = "output")
    public String handle(String payload) {
        ...
    }
}

以下示例展示了如何使用命名轮询器。

@Bean
public PollerMetadata myPoller() {
    PollerMetadata pollerMetadata = new PollerMetadata();
    pollerMetadata.setTrigger(new PeriodicTrigger(1000));
    return pollerMetadata;
}

以下示例展示了使用默认轮询器的端点。

public class AnnotationService {

    @Transformer(inputChannel = "aPollableChannel", outputChannel = "output"
                           poller = @Poller("myPoller"))
    public String handle(String payload) {
         ...
    }
}

从 4.3.3 版本开始,@Poller 注解具有 errorChannel 属性,以便更轻松地配置底层的 MessagePublishingErrorHandler。此属性在 <poller> XML 组件中起着与 error-channel 相同的作用。有关更多信息,请参见 端点命名空间支持

消息注解上的 poller() 属性与 reactive() 属性互斥。有关更多信息,请参见下一节。

使用 @Reactive 注解

ReactiveStreamsConsumer 自 5.0 版本起就已存在,但它仅在端点的输入通道为 FluxMessageChannel(或任何 org.reactivestreams.Publisher 实现)时才适用。从 5.3 版本开始,当目标消息处理程序为 ReactiveMessageHandler 时,无论输入通道类型如何,框架也会创建其实例。@Reactive 子注解(类似于上面提到的 @Poller)已从 5.5 版本开始引入到所有消息传递注解中。它接受一个可选的 Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>> bean 引用,并且无论输入通道类型和消息处理程序如何,都会将目标端点转换为 ReactiveStreamsConsumer 实例。该函数从 Flux.transform() 运算符中使用,用于对来自输入通道的反应式流源进行一些自定义(publishOn()doOnNext()log()retry() 等)。

以下示例演示了如何独立于最终订阅者和生产者,从输入通道更改发布线程到该 DirectChannel

@Bean
public Function<Flux<?>, Flux<?>> publishOnCustomizer() {
    return flux -> flux.publishOn(Schedulers.parallel());
}

@ServiceActivator(inputChannel = "directChannel", reactive = @Reactive("publishOnCustomizer"))
public void handleReactive(String payload) {
    ...
}

消息传递注解上的 reactive() 属性与 poller() 属性互斥。有关更多信息,请参见 使用 @Poller 注解反应式流支持

使用 @InboundChannelAdapter 注解

版本 4.0 引入了 @InboundChannelAdapter 方法级注解。它基于 MethodInvokingMessageSource 为带注解的方法生成 SourcePollingChannelAdapter 集成组件。此注解是 <int:inbound-channel-adapter> XML 组件的类似物,具有相同的限制:该方法不能有参数,返回值类型不能为 void。它有两个属性:value(必需的 MessageChannel bean 名称)和 poller(一个可选的 @Poller 注解,如 前面所述)。如果您需要提供一些 MessageHeaders,请使用 Message<?> 返回类型,并使用 MessageBuilder 来构建 Message<?>。使用 MessageBuilder 可以配置 MessageHeaders。以下示例演示了如何使用 @InboundChannelAdapter 注解

@InboundChannelAdapter("counterChannel")
public Integer count() {
    return this.counter.incrementAndGet();
}

@InboundChannelAdapter(value = "fooChannel", poller = @Poller(fixed-rate = "5000"))
public String foo() {
    return "foo";
}

版本 4.3 引入了 channel 作为 value 注解属性的别名,以提供更好的源代码可读性。此外,目标 MessageChannel bean 在 SourcePollingChannelAdapter 中通过提供的名称(由 outputChannelName 选项设置)在第一次 receive() 调用时解析,而不是在初始化阶段解析。它允许“延迟绑定”逻辑:从消费者角度来看,目标 MessageChannel bean 的创建和注册时间略晚于 @InboundChannelAdapter 解析阶段。

第一个示例要求默认轮询器已在应用程序上下文的其他地方声明。

使用 `@MessagingGateway` 注解

使用 `@IntegrationComponentScan` 注解

标准的 Spring 框架 `@ComponentScan` 注解不会扫描接口以查找 `@Component` 注解。为了克服这个限制并允许配置 `@MessagingGateway`(参见 @MessagingGateway 注解),我们引入了 `@IntegrationComponentScan` 机制。此注解必须与 `@Configuration` 注解一起使用,并进行自定义以定义其扫描选项,例如 `basePackages` 和 `basePackageClasses`。在这种情况下,所有发现的带有 `@MessagingGateway` 注解的接口都会被解析并注册为 `GatewayProxyFactoryBean` 实例。所有其他基于类的组件都会被标准的 `@ComponentScan` 解析。