消息通道实现

Spring Integration 提供了不同的消息通道实现。以下部分简要描述了每种实现。

PublishSubscribeChannel

PublishSubscribeChannel 实现将其接收到的任何 Message 广播给所有订阅的处理器。这最常用于发送事件消息,其主要作用是通知(而不是文档消息,文档消息通常旨在由单个处理器处理)。请注意,PublishSubscribeChannel 仅用于发送。由于它在其 send(Message) 方法被调用时直接广播给其订阅者,消费者无法轮询消息(它不实现 PollableChannel,因此没有 receive() 方法)。相反,任何订阅者本身必须是一个 MessageHandler,并且订阅者的 handleMessage(Message) 方法会被依次调用。

在版本 3.0 之前,在没有订阅者的 PublishSubscribeChannel 上调用 send 方法会返回 false。与 MessagingTemplate 结合使用时,会抛出 MessageDeliveryException。从版本 3.0 开始,行为已更改,只要存在至少最低数量的订阅者(并且成功处理消息),发送就始终被视为成功。可以通过设置 minSubscribers 属性来修改此行为,该属性默认为 0

如果您使用 TaskExecutor,仅使用正确数量的订阅者是否存在来确定此行为,因为消息的实际处理是异步执行的。

QueueChannel

QueueChannel 实现包装了一个队列。与 PublishSubscribeChannel 不同,QueueChannel 具有点对点语义。换句话说,即使通道有多个消费者,发送到该通道的任何 Message 也只应由其中一个接收。它提供了一个默认的无参构造函数(提供了实际上无限容量的 Integer.MAX_VALUE),以及一个接受队列容量的构造函数,如下所示

public QueueChannel(int capacity)

未达到容量限制的通道将其消息存储在内部队列中,即使没有接收方准备好处理消息,send(Message<?>) 方法也会立即返回。如果队列已满,发送方会阻塞直到队列中有可用空间。或者,如果您使用带额外超时参数的 send 方法,队列会阻塞直到有可用空间或超时时间结束,取两者先发生者。类似地,如果队列中有可用消息,receive() 调用会立即返回;但如果队列为空,receive 调用可能会阻塞直到有消息可用或超时时间(如果提供)结束。在任何一种情况下,都可以通过传递 0 的超时值来强制立即返回,而不管队列的状态如何。然而,请注意,不带 timeout 参数的 send()receive() 调用会无限期阻塞。

PriorityChannel

QueueChannel 强制执行先进先出(FIFO)顺序,而 PriorityChannel 是另一种实现,它允许消息根据优先级在通道内排序。默认情况下,优先级由每条消息中的 priority 头部确定。但是,对于自定义的优先级确定逻辑,可以向 PriorityChannel 构造函数提供一个类型为 Comparator<Message<?>> 的比较器。

RendezvousChannel

RendezvousChannel 支持“直接交接”场景,其中发送方会阻塞直到另一方调用通道的 receive() 方法。接收方会阻塞直到发送方发送消息。在内部,此实现与 QueueChannel 非常相似,不同之处在于它使用了 SynchronousQueue(一个容量为零的 BlockingQueue 实现)。这在发送方和接收方在不同线程中操作,但不适合将消息异步放入队列的情况下效果很好。换句话说,使用 RendezvousChannel,发送方知道有接收方已经接受了消息,而使用 QueueChannel,消息会被存储到内部队列中,并且可能永远不会被接收。

请记住,默认情况下,所有这些基于队列的通道都仅将消息存储在内存中。需要持久化时,您可以在 'queue' 元素内提供 'message-store' 属性来引用持久化的 MessageStore 实现,或者用由持久化 broker 支持的通道替换本地通道,例如 JMS 支持的通道或通道适配器。后一种选项允许您利用任何 JMS 提供程序的消息持久化实现,如JMS 支持中所述。但是,当不需要队列缓冲时,最简单的方法是依赖于下一节讨论的 DirectChannel

RendezvousChannel 对于实现请求-回复操作也很有用。发送方可以创建一个临时的、匿名的 RendezvousChannel 实例,然后在构建 Message 时将其设置为 'replyChannel' 头部。发送该 Message 后,发送方可以立即调用 receive(可选提供超时值),以便在等待回复 Message 时阻塞。这与 Spring Integration 许多请求-回复组件内部使用的实现非常相似。

DirectChannel

DirectChannel 具有点对点语义,但在其他方面更类似于 PublishSubscribeChannel,而不是之前描述的任何基于队列的通道实现。它实现了 SubscribableChannel 接口而不是 PollableChannel 接口,因此它将消息直接分派给订阅者。然而,作为点对点通道,它与 PublishSubscribeChannel 的区别在于它将每条 Message 发送到一个订阅的 MessageHandler

除了是最简单的点对点通道选项外,其最重要的特性之一是它允许单个线程执行通道“两端”的操作。例如,如果处理器订阅了 DirectChannel,那么向该通道发送 Message 会直接在发送方的线程中触发该处理器的 handleMessage(Message) 方法的调用,然后 send() 方法调用才能返回。

提供具有这种行为的通道实现的关键动机是支持必须跨通道的事务,同时仍然受益于通道提供的抽象和松耦合。如果在事务范围内调用 send(),处理程序调用的结果(例如,更新数据库记录)将影响该事务的最终结果(提交或回滚)。

由于 DirectChannel 是最简单的选项,并且不增加调度和管理轮询器线程所需的任何额外开销,因此它是 Spring Integration 中的默认通道类型。一般的思路是为应用程序定义通道,考虑其中哪些需要提供缓冲或限制输入,并将这些通道修改为基于队列的 PollableChannels。同样,如果通道需要广播消息,它不应该是 DirectChannel,而应该是 PublishSubscribeChannel。稍后,我们将展示如何配置这些通道中的每一个。

DirectChannel 内部将消息分派委托给一个消息调度器来调用其订阅的消息处理器,该调度器可以通过 load-balancerload-balancer-ref 属性(互斥)公开负载均衡策略。当多个消息处理器订阅同一通道时,消息调度器使用负载均衡策略来帮助确定如何在消息处理器之间分发消息。为了方便起见,load-balancer 属性公开了一个枚举值列表,指向预先存在的 LoadBalancingStrategy 实现。round-robin(循环负载均衡)和 none(用于明确禁用负载均衡的情况)是仅有的可用值。将来版本可能会添加其他策略实现。然而,自版本 3.0 起,您可以通过使用 load-balancer-ref 属性提供自己的 LoadBalancingStrategy 实现并注入它,该属性应指向一个实现 LoadBalancingStrategy 的 bean,如下例所示

FixedSubscriberChannel 是一个 SubscribableChannel,它仅支持一个不可取消订阅的 MessageHandler 订阅者。这对于高吞吐量性能用例非常有用,当没有其他订阅者参与且不需要通道拦截器时。

<int:channel id="lbRefChannel">
  <int:dispatcher load-balancer-ref="lb"/>
</int:channel>

<bean id="lb" class="foo.bar.SampleLoadBalancingStrategy"/>

请注意,load-balancerload-balancer-ref 属性是互斥的。

负载均衡也与布尔值属性 failover 协同工作。如果 failover 的值为 true(默认值),当先前的处理器抛出异常时,调度器会回退到任何后续处理器(如有必要)。顺序由处理器自身定义的可选顺序值决定,如果不存在此值,则由处理器订阅的顺序决定。

如果特定情况要求调度器在每次发生错误时始终尝试调用第一个处理器,然后按照相同的固定顺序序列回退,则不应提供负载均衡策略。换句话说,即使未启用负载均衡,调度器仍然支持 failover 布尔属性。然而,在没有负载均衡的情况下,处理器的调用总是从第一个开始,按照它们的顺序。例如,当对主要、次要、第三等有明确定义时,这种方法效果很好。使用命名空间支持时,任何端点上的 order 属性决定顺序。

请记住,负载均衡和 failover 仅适用于通道有多个订阅的消息处理器的情况。使用命名空间支持时,这意味着多个端点共享在 input-channel 属性中定义的同一个通道引用。

从版本 5.2 开始,当 failover 为 true 时,当前处理器的失败以及失败的消息会分别记录在 debuginfo 日志级别下(如果已配置)。

ExecutorChannel

ExecutorChannel 是一个点对点通道,它支持与 DirectChannel 相同的调度器配置(负载均衡策略和 failover 布尔属性)。这两种调度通道类型之间的关键区别在于,ExecutorChannel 将分派委托给一个 TaskExecutor 实例来执行。这意味着 send 方法通常不会阻塞,但也意味着处理程序的调用可能不会发生在发送方的线程中。因此,它不支持跨越发送方和接收处理器的事务。

发送方有时可能会阻塞。例如,当使用具有节流客户端的拒绝策略(例如 ThreadPoolExecutor.CallerRunsPolicy)的 TaskExecutor 时,在线程池达到最大容量且执行器的amp;apos;工作队列已满的情况下,发送方的线程随时可以执行该方法。由于这种情况只会在不可预测的情况下发生,因此不应依赖它进行事务处理。

PartitionedChannel

从版本 6.1 开始,提供了 PartitionedChannel 实现。这是 AbstractExecutorChannel 的扩展,表示点对点分派逻辑,其中实际的消费在特定线程上处理,该线程由发送到此通道的消息评估出的分区键确定。此通道与上面提到的 ExecutorChannel 类似,但不同之处在于具有相同分区键的消息始终在同一个线程中处理,从而保留顺序。它不需要外部的 TaskExecutor,但可以使用自定义的 ThreadFactory 进行配置(例如 Thread.ofVirtual().name("partition-", 0).factory())。此工厂用于为每个分区向 MessageDispatcher 委托填充单线程执行器。默认情况下,IntegrationMessageHeaderAccessor.CORRELATION_ID 消息头用作分区键。此通道可以配置为一个简单的 bean

@Bean
PartitionedChannel somePartitionedChannel() {
    return new PartitionedChannel(3, (message) -> message.getHeaders().get("partitionKey"));
}

该通道将有 3 个分区 - 专用线程;将使用 partitionKey 头部来确定消息将在哪个分区中处理。有关详细信息,请参阅 PartitionedChannel 类的 Javadocs。

FluxMessageChannel

FluxMessageChannel 是一个 org.reactivestreams.Publisher 实现,用于将发送的消息“沉入”内部的 reactor.core.publisher.Flux 中,供下游的响应式订阅者按需消费。此通道实现既不是 SubscribableChannel,也不是 PollableChannel,因此只能使用 org.reactivestreams.Subscriber 实例从此通道消费,同时遵守响应式流的背压特性。另一方面,FluxMessageChannel 实现了 ReactiveStreamsSubscribableChannel 接口,其 subscribeTo(Publisher<Message<?>>) 契约允许接收来自响应式源发布者的事件,将响应式流桥接到集成流中。为了实现整个集成流的完全响应式行为,必须在流中的所有端点之间放置此类通道。

有关与 Reactive Streams 交互的更多信息,请参阅Reactive Streams 支持

Scoped Channel

Spring Integration 1.0 提供了一个 ThreadLocalChannel 实现,但在 2.0 版本中已删除。现在,处理相同需求更通用的方法是向通道添加 scope 属性。该属性的值可以是上下文中可用的作用域名称。例如,在 Web 环境中,某些作用域是可用的,并且任何自定义作用域实现都可以向上下文注册。以下示例展示了将线程本地作用域应用于通道,包括作用域本身的注册

<int:channel id="threadScopedChannel" scope="thread">
     <int:queue />
</int:channel>

<bean class="org.springframework.beans.factory.config.CustomScopeConfigurer">
    <property name="scopes">
        <map>
            <entry key="thread" value="org.springframework.context.support.SimpleThreadScope" />
        </map>
    </property>
</bean>

上例中定义的通道内部也委托给一个队列,但该通道绑定到当前线程,因此队列的内容也类似地绑定。这样,发送消息到通道的线程稍后可以接收相同的消息,而其他线程则无法访问它们。虽然线程作用域的通道很少需要,但在使用 DirectChannel 实例来强制执行单个线程操作,但任何回复消息应发送到“终端”通道的情况下,它们可能会很有用。如果该终端通道是线程作用域的,则原始发送线程可以从终端通道收集其回复。

现在,由于任何通道都可以设置作用域,除了线程本地作用域外,您还可以定义自己的作用域。