4.0.5

前言

Spring 数据集成历程简史

Spring 的数据集成历程始于 Spring Integration。凭借其编程模型,它提供了稳定一致的开发体验,用于构建可以采用 企业集成模式 来连接外部系统(例如数据库、消息代理等)的应用。

快进到云时代,微服务在企业环境中变得越来越重要。Spring Boot 改变了开发者构建应用的方式。凭借 Spring 的编程模型以及 Spring Boot 处理的运行时职责,开发基于 Spring 的独立、生产级微服务变得无缝顺畅。

为了将此扩展到数据集成工作负载,Spring Integration 和 Spring Boot 被整合到一个新项目中。Spring Cloud Stream 就此诞生。

借助 Spring Cloud Stream,开发者可以:

  • 独立构建、测试和部署以数据为中心的应用。

  • 应用现代微服务架构模式,包括通过消息传递进行组合。

  • 通过以事件为中心的思维方式解耦应用职责。事件可以代表在某个时间点发生的事情,下游消费者应用可以对此作出反应,而无需知道事件的来源或生产者的身份。

  • 将业务逻辑迁移到消息代理上(例如 RabbitMQ, Apache Kafka, Amazon Kinesis)。

  • 依赖框架对常见用例的自动内容类型支持。也可以扩展支持不同的数据转换类型。

  • 还有更多功能。 . .

快速入门

您可以按照这个三步指南,在不到 5 分钟的时间内尝试 Spring Cloud Stream,甚至在深入了解任何细节之前。

我们将向您展示如何创建一个 Spring Cloud Stream 应用,该应用接收来自您选择的消息中间件的消息(稍后会详细介绍),并将收到的消息记录到控制台。我们称之为 LoggingConsumer。虽然不是很实用,但它很好地介绍了 Spring Cloud Stream 的一些主要概念和抽象,使您更容易理解本用户指南的其余部分。

这三个步骤如下:

使用 Spring Initializr 创建示例应用

要开始使用,请访问 Spring Initializr。在那里,您可以生成我们的 LoggingConsumer 应用。具体步骤如下:

  1. Dependencies 部分,开始输入 stream。当出现“Cloud Stream”选项时,选择它。

  2. 开始输入 'kafka' 或 'rabbit'。

  3. 选择“Kafka”或“RabbitMQ”。

    基本上,您选择应用要绑定的消息中间件。我们建议使用您已安装或更熟悉安装和运行的中间件。此外,正如您从 Initializr 屏幕上看到的,还有其他一些选项可供选择。例如,您可以选择 Gradle 作为构建工具,而不是 Maven(默认)。

  4. Artifact 字段中,输入 'logging-consumer'。

    Artifact 字段的值将成为应用名称。如果您选择 RabbitMQ 作为中间件,您的 Spring Initializr 现在应该如下所示:

spring initializr
  1. 点击 Generate Project 按钮。

    这样做会将生成的项目的压缩版本下载到您的硬盘。

  2. 将文件解压缩到您想要用作项目目录的文件夹中。

我们鼓励您探索 Spring Initializr 中的众多可能性。它允许您创建多种不同类型的 Spring 应用。

将项目导入您的 IDE

现在您可以将项目导入您的 IDE。请记住,根据不同的 IDE,您可能需要遵循特定的导入过程。例如,根据项目的生成方式(Maven 或 Gradle),您可能需要遵循特定的导入过程(例如,在 Eclipse 或 STS 中,您需要使用 File → Import → Maven → Existing Maven Project)。

导入后,项目不应有任何错误。此外,src/main/java 目录应包含 com.example.loggingconsumer.LoggingConsumerApplication 文件。

技术上讲,此时您可以运行应用的主类。它已经是一个有效的 Spring Boot 应用。但是,它目前没有任何功能,所以我们需要添加一些代码。

添加消息处理器、构建和运行

修改 com.example.loggingconsumer.LoggingConsumerApplication 类,使其看起来如下所示:

@SpringBootApplication
public class LoggingConsumerApplication {

	public static void main(String[] args) {
		SpringApplication.run(LoggingConsumerApplication.class, args);
	}

	@Bean
	public Consumer<Person> log() {
	    return person -> {
	        System.out.println("Received: " + person);
	    };
	}

	public static class Person {
		private String name;
		public String getName() {
			return name;
		}
		public void setName(String name) {
			this.name = name;
		}
		public String toString() {
			return this.name;
		}
	}
}

正如您从上面的代码清单中看到的:

  • 我们正在使用函数式编程模型(参见 Spring Cloud Function 支持)将单个消息处理器定义为一个 Consumer

  • 我们依赖于框架约定,将该处理器绑定到 Binder 公开的输入目标绑定。

这样做还可以让您看到框架的核心功能之一:它尝试自动将传入消息的有效载荷转换为 Person 类型。

现在您有了一个功能齐全的 Spring Cloud Stream 应用,它可以监听消息。为了简单起见,我们假设您在第一步中选择了 RabbitMQ。假设您已经安装并运行了 RabbitMQ,您可以在 IDE 中运行其 main 方法来启动应用。

您应该看到以下输出:

	--- [ main] c.s.b.r.p.RabbitExchangeQueueProvisioner : declaring queue for inbound: input.anonymous.CbMIwdkJSBO1ZoPDOtHtCg, bound to: input
	--- [ main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
	--- [ main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#2a3a299:0/SimpleConnection@66c83fc8. . .
	. . .
	--- [ main] o.s.i.a.i.AmqpInboundChannelAdapter      : started inbound.input.anonymous.CbMIwdkJSBO1ZoPDOtHtCg
	. . .
	--- [ main] c.e.l.LoggingConsumerApplication         : Started LoggingConsumerApplication in 2.531 seconds (JVM running for 2.897)

转到 RabbitMQ 管理控制台或任何其他 RabbitMQ 客户端,向 input.anonymous.CbMIwdkJSBO1ZoPDOtHtCg 发送消息。anonymous.CbMIwdkJSBO1ZoPDOtHtCg 部分代表组名,它是生成的,因此在您的环境中肯定会不同。为了更具可预测性,您可以通过设置 spring.cloud.stream.bindings.input.group=hello(或您喜欢的任何名称)来使用显式的组名。

消息内容应是 Person 类的 JSON 表示形式,如下所示:

{"name":"Sam Spade"}

然后,在您的控制台中,您应该会看到:

Received: Sam Spade

您还可以将应用构建并打包成一个可执行 JAR(使用 ./mvnw clean install 命令),然后使用 java -jar 命令运行构建好的 JAR。

现在您有了一个可以工作的(尽管非常基础的)Spring Cloud Stream 应用。

流式数据上下文中的 Spring 表达式语言 (SpEL)

在本参考手册中,您将遇到许多可以利用 Spring 表达式语言 (SpEL) 的功能和示例。在使用它时,了解某些限制非常重要。

SpEL 允许您访问当前 Message 以及您正在运行的 Application Context。但是,了解 SpEL 能够看到的数据类型非常重要,特别是在接收到的 Message 的上下文中。从消息代理传来的消息是以 byte[] 形式到达的。然后由 Binder 将其转换为 Message<byte[]>,您可以看到消息的有效载荷保持其原始形式。消息的头是 <String, Object> 类型,其中值通常是另一个基本类型或基本类型的集合/数组,因此是 Object。这是因为 Binder 不知道所需的输入类型,因为它无法访问用户代码(函数)。所以,实际上,Binder 传递了一个包含有效载荷和一些以消息头形式存在的、可读的元数据的信封,就像邮件递送的信件一样。这意味着,虽然可以访问消息的有效载荷,但您只能以原始数据(即 byte[])的形式访问它。虽然开发者经常要求 SpEL 能够以具体类型(例如 Foo, Bar 等)访问有效载荷对象的字段,但您可以看到这有多么困难甚至不可能实现。这里有一个例子来说明这个问题;想象一下您有一个路由表达式,根据有效载荷类型将消息路由到不同的函数。这个需求意味着需要将有效载荷从 byte[] 转换为特定类型,然后应用 SpEL。然而,为了执行这种转换,我们需要知道要传递给转换器的实际类型,而这来源于函数的签名,我们不知道是哪个签名。解决这个需求的更好方法是将类型信息作为消息头传递(例如,application/json;type=foo.bar.Baz)。您将获得一个清晰可读的字符串值,可以在一个易于理解的 SpEL 表达式中访问和评估。

此外,将有效载荷用于路由决策被认为是非常糟糕的做法,因为有效载荷被视为特权数据——只能由最终接收者读取的数据。再次使用邮件递送的类比,您不会希望邮递员打开您的信封并阅读信件内容来做出一些递送决策。同样的概念也适用于此,特别是在生成 Message 时包含此类信息相对容易的情况下。这强制要求在设计通过网络传输的数据时遵循一定程度的规范,明确哪些数据片段可以被视为公共的,哪些是特权的。

Spring Cloud Stream 介绍

Spring Cloud Stream 是一个用于构建消息驱动微服务应用的框架。Spring Cloud Stream 构建于 Spring Boot 之上,用于创建独立的、生产级的 Spring 应用,并使用 Spring Integration 提供与消息代理的连接。它提供了对多个供应商中间件的规定性配置,引入了持久化发布/订阅语义、消费者组和分区等概念。

通过将 spring-cloud-stream 依赖添加到您的应用 classpath 中,您可以立即连接到由提供的 spring-cloud-stream Binder 公开的消息代理(稍后将详细介绍),并且您可以实现您的功能需求,该需求由 java.util.function.Function(根据传入消息)运行。

以下代码清单展示了一个快速示例:

@SpringBootApplication
public class SampleApplication {

	public static void main(String[] args) {
		SpringApplication.run(SampleApplication.class, args);
	}

    @Bean
	public Function<String, String> uppercase() {
	    return value -> value.toUpperCase();
	}
}

以下代码清单展示了相应的测试:

@SpringBootTest(classes =  SampleApplication.class)
@Import({TestChannelBinderConfiguration.class})
class BootTestStreamApplicationTests {

	@Autowired
	private InputDestination input;

	@Autowired
	private OutputDestination output;

	@Test
	void contextLoads() {
		input.send(new GenericMessage<byte[]>("hello".getBytes()));
		assertThat(output.receive().getPayload()).isEqualTo("HELLO".getBytes());
	}
}

主要概念

Spring Cloud Stream 提供了一些抽象和原语,简化了消息驱动微服务应用的编写。本节概述了以下内容:

应用模型

Spring Cloud Stream 应用由一个中间件无关的核心组成。应用通过在外部消息代理暴露的目标与代码中的输入/输出参数之间建立 绑定 来与外部世界通信。建立绑定所需的特定于消息代理的详细信息由特定于中间件的 Binder 实现处理。

SCSt with binder
图 1. Spring Cloud Stream 应用

Fat JAR(可执行 JAR 包)

Spring Cloud Stream 应用可以在您的 IDE 中以独立模式运行以进行测试。要在生产环境中运行 Spring Cloud Stream 应用,您可以使用 Maven 或 Gradle 提供的标准 Spring Boot 工具创建一个可执行的(或称为“胖”)JAR 包。有关更多详细信息,请参见Spring Boot 参考指南

Binder 抽象

Spring Cloud Stream 提供了针对 KafkaRabbit MQ 的 Binder 实现。框架还包含一个用于对您的应用进行集成测试的测试 Binder,作为 Spring Cloud Stream 应用。有关更多详细信息,请参见测试部分。

Binder 抽象也是框架的扩展点之一,这意味着您可以在 Spring Cloud Stream 之上实现自己的 Binder。在 如何从头开始创建 Spring Cloud Stream Binder 这篇文章中,一位社区成员详细记录了实现自定义 Binder 所需的一系列步骤,并提供了示例。这些步骤也在 实现自定义 Binder 部分中有所强调。

Spring Cloud Stream 使用 Spring Boot 进行配置,Binder 抽象使得 Spring Cloud Stream 应用在连接到中间件方面具有灵活性。例如,部署者可以在运行时动态选择外部目标(如 Kafka 主题或 RabbitMQ 交换机)与消息处理器的输入和输出(如函数的输入参数及其返回值)之间的映射关系。此类配置可以通过外部配置属性提供,并且可以是 Spring Boot 支持的任何形式(包括应用参数、环境变量以及 application.ymlapplication.properties 文件)。在Spring Cloud Stream 介绍部分中的 Sink 示例中,将 spring.cloud.stream.bindings.input.destination 应用属性设置为 raw-sensor-data 将使其从 raw-sensor-data Kafka 主题或绑定到 raw-sensor-data RabbitMQ 交换机的队列读取。

Spring Cloud Stream 会自动检测并使用 classpath 中找到的 Binder。您可以使用相同的代码与不同类型的中间件进行交互。为此,只需在构建时包含不同的 Binder 即可。对于更复杂的用例,您还可以将多个 Binder 打包到您的应用中,并在运行时选择 Binder(甚至为不同的绑定使用不同的 Binder)。

持久化发布/订阅支持

应用之间的通信遵循发布/订阅模型,其中数据通过共享主题进行广播。这可以在下图中看到,该图显示了一组交互式 Spring Cloud Stream 应用的典型部署。

SCSt sensors
图 2. Spring Cloud Stream 发布/订阅

传感器报告到 HTTP 端点的数据被发送到一个名为 raw-sensor-data 的公共目标。从该目标,它被一个计算时间窗口平均值的微服务应用以及另一个将原始数据导入 HDFS (Hadoop 分布式文件系统) 的微服务应用独立处理。为了处理数据,这两个应用都在运行时将该主题声明为其输入。

发布/订阅通信模型降低了生产者和消费者的复杂性,并允许在不中断现有流程的情况下向拓扑结构添加新应用。例如,在计算平均值的应用下游,您可以添加一个应用来计算最高温度值以供显示和监控。然后,您可以添加另一个应用来解释相同的平均值流以进行故障检测。通过共享主题而不是点对点队列进行所有通信,可以减少微服务之间的耦合。

虽然发布/订阅消息传递的概念并不新颖,但 Spring Cloud Stream 更进一步,将其作为其应用模型的规定性选择。通过使用原生中间件支持,Spring Cloud Stream 还简化了在不同平台中使用发布/订阅模型的方式。

消费者组

虽然发布/订阅模型使得通过共享主题连接应用变得容易,但通过创建给定应用的多个实例来实现横向扩展同样重要。在这种情况下,应用的各个实例之间处于竞争消费者关系,其中只有一个实例预期会处理给定的消息。

Spring Cloud Stream 通过消费者组的概念来模拟这种行为。(Spring Cloud Stream 的消费者组类似于 Kafka 的消费者组,并受到其启发。)每个消费者绑定都可以使用 spring.cloud.stream.bindings.<bindingName>.group 属性来指定组名。对于下图中显示的消费者,该属性将设置为 spring.cloud.stream.bindings.<bindingName>.group=hdfsWritespring.cloud.stream.bindings.<bindingName>.group=average

SCSt groups
图 3. Spring Cloud Stream 消费者组

所有订阅给定目标的组都会收到发布数据的一个副本,但每个组中只有一名成员会收到来自该目标的给定消息。默认情况下,当未指定组时,Spring Cloud Stream 会将应用分配给一个匿名且独立的单成员消费者组,该组与所有其他消费者组处于发布/订阅关系中。

消费者类型

支持两种类型的消费者:

  • 消息驱动型(有时称为异步型)

  • 轮询型(有时称为同步型)

在版本 2.0 之前,仅支持异步消费者。只要消息可用且有线程可处理,消息就会被立即传递。

当您希望控制消息处理的速度时,您可能希望使用同步消费者。

持久性

与 Spring Cloud Stream 的规定性应用模型一致,消费者组订阅是持久化的。也就是说,Binder 实现确保组订阅是持久的,并且一旦为一个组创建了至少一个订阅,该组就会收到消息,即使在组中的所有应用都停止时发送的消息也能收到。

匿名订阅本质上是非持久化的。对于某些 Binder 实现(例如 RabbitMQ),可以有非持久化的组订阅。

通常,在将应用绑定到给定目标时,最好始终指定一个消费者组。横向扩展 Spring Cloud Stream 应用时,必须为其每个输入绑定指定一个消费者组。这样做可以防止应用的实例接收重复的消息(除非需要这种行为,但这并不常见)。

分区支持

Spring Cloud Stream 支持在给定应用的多个实例之间对数据进行分区。在分区场景中,物理通信媒介(如消息代理主题)被视为结构化为多个分区。一个或多个生产者应用实例将数据发送到多个消费者应用实例,并确保具有共同特征的数据由同一个消费者实例处理。

Spring Cloud Stream 提供了一个通用抽象,用于以统一的方式实现分区处理用例。因此,无论消息代理本身是否天然支持分区(例如 Kafka)或不支持(例如 RabbitMQ),都可以使用分区功能。

SCSt partitioning
图 4. Spring Cloud Stream 分区

分区是状态处理中的一个关键概念,在状态处理中,确保所有相关数据一起处理至关重要(无论是出于性能还是一致性原因)。例如,在时间窗口平均值计算示例中,来自任何给定传感器的所有测量数据都由同一个应用实例处理非常重要。

要设置分区处理场景,您必须同时配置数据生产端和数据消费端。

编程模型

要理解编程模型,您应该熟悉以下核心概念:

  • 目标 Binder: 负责提供与外部消息系统集成的组件。

  • 绑定: 连接外部消息系统与应用提供的消息 生产者消费者(由目标 Binder 创建)之间的桥梁。

  • 消息: 生产者和消费者用于与目标 Binder(以及通过外部消息系统与其他应用)通信的规范数据结构。

SCSt overview

目标 Binder

目标 Binder 是 Spring Cloud Stream 的扩展组件,负责提供必要的配置和实现,以促进与外部消息系统的集成。这种集成负责连接、消息到生产者和消费者的委托和路由、数据类型转换、用户代码调用等等。

Binder 处理了许多本应由您承担的样板职责。然而,为了实现这一点,Binder 仍然需要用户提供一些帮助,以最小但必需的指令集形式,这通常以某种类型的 绑定 配置出现。

虽然本节的范围不包括讨论所有可用的 Binder 和绑定配置选项(手册的其余部分将广泛介绍它们),但 绑定 作为概念确实需要特别关注。下一节将详细讨论它。

绑定

如前所述,绑定 提供了外部消息系统(例如队列、主题等)与应用提供的消息 生产者消费者 之间的桥梁。

以下示例展示了一个完整配置并正常运行的 Spring Cloud Stream 应用,该应用接收消息的有效载荷(payload)为 String 类型(参见 内容类型协商 部分),将其记录到控制台,并在将其转换为大写后发送到下游。

@SpringBootApplication
public class SampleApplication {

	public static void main(String[] args) {
		SpringApplication.run(SampleApplication.class, args);
	}

	@Bean
	public Function<String, String> uppercase() {
	    return value -> {
	        System.out.println("Received: " + value);
	        return value.toUpperCase();
	    };
	}
}

上述示例看起来与任何普通的 Spring Boot 应用没有什么不同。它定义了一个 Function 类型的 bean,仅此而已。那么,它是如何成为一个 Spring Cloud Stream 应用的呢?它之所以成为 Spring Cloud Stream 应用,仅仅是因为 classpath 中存在 Spring Cloud Stream 和 Binder 的依赖及自动配置类,从而有效地将您的 Boot 应用上下文设置为一个 Spring Cloud Stream 应用。在这种上下文下,SupplierFunctionConsumer 类型的 bean 被视为实际的消息处理器,根据特定的命名约定和规则触发与提供的 Binder 公开的目标的绑定,从而避免额外的配置。

绑定和绑定名称

绑定是一个抽象,它代表了 Binder 公开的源和目标与用户代码之间的桥梁。这个抽象有一个名称,虽然我们尽最大努力限制运行 Spring Cloud Stream 应用所需的配置,但在需要额外的按绑定配置的情况下,了解此类名称是必要的。

在本手册中,您将看到诸如 spring.cloud.stream.bindings.input.destination=myQueue 之类的配置属性示例。此属性名称中的 input 部分就是我们所说的 绑定名称,它可以通过多种机制派生而来。以下子节将描述 Spring Cloud Stream 用于控制绑定名称的命名约定和配置元素。

如果您的绑定名称包含特殊字符,例如 . 字符,您需要用方括号 ([]) 将绑定键括起来,然后再用引号包裹。例如:spring.cloud.stream.bindings."[my.output.binding.key]".destination
函数式绑定名称

与 Spring Cloud Stream 早期版本中使用的基于注解的支持(遗留方式)所需的显式命名不同,函数式编程模型在绑定命名方面默认采用简单的约定,从而极大地简化了应用配置。让我们看第一个例子:

@SpringBootApplication
public class SampleApplication {

	@Bean
	public Function<String, String> uppercase() {
	    return value -> value.toUpperCase();
	}
}

在前面的例子中,我们有一个应用,其中包含一个充当消息处理器的函数。作为一个 Function,它有一个输入和一个输出。用于命名输入和输出绑定的命名约定如下:

  • 输入 - <函数名> + -in- + <索引>

  • 输出 - <函数名> + -out- + <索引>

inout 对应于绑定的类型(例如 输入输出)。index 是输入或输出绑定的索引。对于典型的单输入/输出函数,它总是 0,因此它只与 具有多个输入和输出参数的函数 相关。

因此,如果例如您想将此函数的输入映射到一个名为 "my-topic" 的远程目标(例如主题、队列等),您将使用以下属性进行配置:

--spring.cloud.stream.bindings.uppercase-in-0.destination=my-topic

注意 uppercase-in-0 如何用作属性名称的一部分。uppercase-out-0 也一样。

描述性绑定名称

有时为了提高可读性,您可能希望给您的绑定一个更具描述性的名称(例如 'account', 'orders' 等)。另一种看待它的方式是,您可以将一个 隐式绑定名称 映射到一个 显式绑定名称。您可以使用 spring.cloud.stream.function.bindings.<绑定名称> 属性来实现这一点。此属性也为依赖需要显式名称的自定义基于接口的绑定的现有应用提供了迁移路径。

例如,

--spring.cloud.stream.function.bindings.uppercase-in-0=input

在前面的示例中,您映射并有效地将绑定名称 uppercase-in-0 重命名为 input。现在所有配置属性都可以引用 input 绑定名称(例如,--spring.cloud.stream.bindings.input.destination=my-topic)。

虽然描述性的绑定名称可以增强配置的可读性,但它们通过将隐式绑定名称映射到显式绑定名称,也引入了另一层误导。并且由于所有后续配置属性都将使用显式绑定名称,您必须始终引用此 'bindings' 属性来关联它实际对应哪个函数。我们认为在大多数情况下(函数式组合除外),这可能是过度设计,因此,我们建议完全避免使用它,特别是因为不使用它可以在绑定器目标和绑定名称之间提供清晰的路径,例如 spring.cloud.stream.bindings.uppercase-in-0.destination=sample-topic,您可以通过它清楚地将 uppercase 函数的输入与 sample-topic 目标关联起来。

有关属性和其他配置选项的更多信息,请参阅配置选项部分。

显式绑定创建

在上一节中,我们解释了如何根据您的应用程序提供的 FunctionSupplierConsumer bean 的名称隐式创建绑定。然而,有时您可能需要显式创建绑定,这些绑定不与任何函数关联。这通常是为了通过 StreamBridge 支持与其他框架的集成。

Spring Cloud Stream 允许您通过 spring.cloud.stream.input-bindingsspring.cloud.stream.output-bindings 属性显式定义输入和输出绑定。请注意属性名称中的复数形式,这允许您通过简单地使用 ; 作为分隔符来定义多个绑定。请看下面的测试用例作为示例。

@Test
public void testExplicitBindings() {
	try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
		TestChannelBinderConfiguration.getCompleteConfiguration(EmptyConfiguration.class))
				.web(WebApplicationType.NONE)
				.run("--spring.jmx.enabled=false",
					"--spring.cloud.stream.input-bindings=fooin;barin",
					"--spring.cloud.stream.output-bindings=fooout;barout")) {


	. . .
	}
}

@EnableAutoConfiguration
@Configuration
public static class EmptyConfiguration {
}

正如您所见,我们声明了两个输入绑定和两个输出绑定,而我们的配置中没有定义任何函数,但我们仍然能够成功创建这些绑定并访问它们相应的通道。

生产和消费消息

您只需编写函数并将其公开为 @Bean,即可编写 Spring Cloud Stream 应用程序。您还可以使用基于 Spring Integration 注解的配置或基于 Spring Cloud Stream 注解的配置,尽管从 spring-cloud-stream 3.x 开始,我们推荐使用函数式实现。

Spring Cloud Function 支持

概述

自 Spring Cloud Stream v2.1 起,定义流处理器的另一种替代方法是使用对Spring Cloud Function的内置支持,它们可以表示为类型为 java.util.function.[Supplier/Function/Consumer] 的 bean。

要指定将哪个函数式 bean 绑定到绑定公开的外部目标,您必须提供 spring.cloud.function.definition 属性。

如果您只有类型为 java.util.function.[Supplier/Function/Consumer] 的单个 bean,则可以跳过 spring.cloud.function.definition 属性,因为此类函数式 bean 将被自动发现。然而,最佳实践是使用此属性以避免任何混淆。有时这种自动发现可能会带来麻烦,因为类型为 java.util.function.[Supplier/Function/Consumer] 的单个 bean 可能用于消息处理以外的目的,但由于是单个 bean,它会被自动发现并自动绑定。对于这些罕见情况,您可以通过提供 spring.cloud.stream.function.autodetect 属性并将其值设置为 false 来禁用自动发现。

以下是应用程序将消息处理器公开为 java.util.function.Function 的示例,通过充当数据的消费者和生产者,有效支持*直通*语义。

@SpringBootApplication
public class MyFunctionBootApp {

	public static void main(String[] args) {
		SpringApplication.run(MyFunctionBootApp.class);
	}

	@Bean
	public Function<String, String> toUpperCase() {
		return s -> s.toUpperCase();
	}
}

在前面的示例中,我们定义了一个名为 toUpperCasejava.util.function.Function 类型的 bean,它充当消息处理器,其 'input' 和 'output' 必须绑定到提供的目标绑定器公开的外部目标。默认情况下,'input' 和 'output' 绑定名称将是 toUpperCase-in-0toUpperCase-out-0。有关用于建立绑定名称的命名约定的详细信息,请参阅函数式绑定名称部分。

以下是支持其他语义的简单函数式应用程序示例

以下是将*源*语义公开为 java.util.function.Supplier 的示例

@SpringBootApplication
public static class SourceFromSupplier {

	@Bean
	public Supplier<Date> date() {
		return () -> new Date(12345L);
	}
}

以下是将*接收器语义*公开为 java.util.function.Consumer 的示例

@SpringBootApplication
public static class SinkFromConsumer {

	@Bean
	public Consumer<String> sink() {
		return System.out::println;
	}
}
提供者 (源)

就触发方式而言,FunctionConsumer 都相当直接。它们是根据发送到它们绑定目标的数据(事件)触发的。换句话说,它们是经典的事件驱动组件。

然而,Supplier 在触发方面属于其独有的类别。因为它根据定义是数据的来源(起点),所以它不订阅任何入站目标,因此必须由其他机制触发。还有一个 Supplier 实现的问题,它可以是*命令式*或*响应式*的,这与此类提供者的触发直接相关。

考虑以下示例

@SpringBootApplication
public static class SupplierConfiguration {

	@Bean
	public Supplier<String> stringSupplier() {
		return () -> "Hello from Supplier";
	}
}

前面的 Supplier bean 在其 get() 方法被调用时生成一个字符串。然而,谁调用这个方法以及调用频率如何?框架提供了一个默认的轮询机制(回答了“谁?”的问题),该机制将触发提供者的调用,默认情况下它每秒都会执行一次(回答了“频率如何?”的问题)。换句话说,以上配置每秒生成一条消息,并且每条消息都发送到绑定器公开的 output 目标。要了解如何自定义轮询机制,请参阅轮询配置属性部分。

考虑另一个示例

@SpringBootApplication
public static class SupplierConfiguration {

    @Bean
    public Supplier<Flux<String>> stringSupplier() {
        return () -> Flux.fromStream(Stream.generate(new Supplier<String>() {
            @Override
            public String get() {
                try {
                    Thread.sleep(1000);
                    return "Hello from Supplier";
                } catch (Exception e) {
                    // ignore
                }
            }
        })).subscribeOn(Schedulers.elastic()).share();
    }
}

前面的 Supplier bean 采用了响应式编程风格。通常,与命令式提供者不同,它应该只被触发一次,因为其 get() 方法的调用会产生(提供)连续的消息流而不是单个消息。

框架识别出编程风格的差异,并保证此类提供者只被触发一次。

然而,想象一下这样的用例:您想轮询某个数据源并返回表示结果集的有限数据流。响应式编程风格是此类提供者的完美机制。然而,考虑到生成流的有限性,此类提供者仍然需要定期调用。

考虑以下示例,它通过生成有限数据流来模拟此类用例

@SpringBootApplication
public static class SupplierConfiguration {

	@PollableBean
	public Supplier<Flux<String>> stringSupplier() {
		return () -> Flux.just("hello", "bye");
	}
}

该 bean 本身使用 PollableBean 注解进行标注(它是 @Bean 的一个子集),从而向框架表明,尽管此类提供者的实现是响应式的,但仍需要进行轮询。

PollableBean 中定义了一个 splittable 属性,它向此注解的后处理器发出信号,表明被注解组件产生的结果必须被拆分,并且默认设置为 true。这意味着框架将拆分返回的结果,并将每个项作为单独的消息发送出去。如果这不是期望的行为,您可以将其设置为 false,此时此类提供者将简单地返回生成的 Flux 而不进行拆分。
提供者与线程
正如您现在了解到的,与由事件触发(它们有输入数据)的 FunctionConsumer 不同,Supplier 没有任何输入,因此由不同的机制 - *轮询器* - 触发,而轮询器可能具有不可预测的线程机制。虽然线程机制的细节在大多数情况下与函数的下游执行无关,但在某些情况下可能会出现问题,特别是在与可能对线程亲和性有特定期望的集成框架配合使用时。例如,依赖于存储在线程本地中的跟踪数据的Spring Cloud Sleuth。对于这些情况,我们提供了另一种通过 StreamBridge 的机制,用户可以对线程机制有更多控制。您可以在发送任意数据到输出(例如,外部事件驱动源)部分获取更多详细信息。
消费者 (响应式)

响应式 Consumer 有点特别,因为它返回类型是 void,使得框架无法获得用于订阅的引用。您很可能不需要编写 Consumer<Flux<?>>,而是将其编写为 Function<Flux<?>, Mono<Void>>,并在流上将 then 运算符作为最后一个运算符调用。

例如

public Function<Flux<?>, Mono<Void>> consumer() {
	return flux -> flux.map(..).filter(..).then();
}

但是如果您确实需要编写显式的 Consumer<Flux<?>>,请记住订阅输入的 Flux。

此外,请记住,在混合响应式和命令式函数时,同样的规则也适用于函数组合。Spring Cloud Function 确实支持响应式函数与命令式函数组合,但您必须注意某些限制。例如,假设您将响应式函数与命令式消费者组合。这种组合的结果是响应式 Consumer。然而,正如本节前面讨论的,无法订阅此类消费者,因此此限制只能通过使您的消费者成为响应式的并手动订阅(如前所述),或将您的函数更改为命令式来解决。

轮询配置属性

Spring Cloud Stream 公开了以下属性,这些属性以 spring.integration.poller. 为前缀。

fixedDelay

默认轮询器的固定延迟,单位为毫秒。

默认值:1000L。

maxMessagesPerPoll

默认轮询器每次轮询事件的最大消息数。

默认值:1L。

cron

Cron Trigger 的 Cron 表达式值。

默认值:无。

initialDelay

周期性触发器的初始延迟。

默认值:0。

timeUnit

应用于延迟值的 TimeUnit。

默认值:MILLISECONDS。

例如,--spring.integration.poller.fixed-delay=2000 将轮询器间隔设置为每两秒轮询一次。

每绑定轮询配置

上一节展示了如何配置一个应用于所有绑定的单个默认轮询器。虽然这与 Spring Cloud Stream 设计的微服务模型非常契合(其中每个微服务代表一个组件,例如 Supplier,因此默认轮询配置就足够了),但在某些极端情况下,您可能有多个组件需要不同的轮询配置。

对于此类情况,请使用每绑定配置轮询器的方式。例如,假设您有一个输出绑定 supply-out-0。在这种情况下,您可以使用 spring.cloud.stream.bindings.supply-out-0.producer.poller.. 前缀为此类绑定配置轮询器(例如,spring.cloud.stream.bindings.supply-out-0.producer.poller.fixed-delay=2000)。

发送任意数据到输出(例如,外部事件驱动源)

在某些情况下,数据的实际来源可能来自非绑定器的外部系统。例如,数据的来源可能是一个经典的 REST 端点。我们如何将此类源与 spring-cloud-stream 使用的函数机制桥接起来?

Spring Cloud Stream 提供了两种机制,让我们详细了解它们

在这里,对于这两个示例,我们将使用一个标准的 MVC 端点方法 delegateToSupplier,它绑定到根 Web 上下文,通过 StreamBridge 机制将传入请求委托给流。

@SpringBootApplication
@Controller
public class WebSourceApplication {

	public static void main(String[] args) {
		SpringApplication.run(WebSourceApplication.class, "--spring.cloud.stream.output-bindings=toStream");
	}

	@Autowired
	private StreamBridge streamBridge;

	@RequestMapping
	@ResponseStatus(HttpStatus.ACCEPTED)
	public void delegateToSupplier(@RequestBody String body) {
		System.out.println("Sending " + body);
		streamBridge.send("toStream", body);
	}
}

这里我们自动注入了一个 StreamBridge bean,它允许我们将数据发送到输出绑定,有效地将非流应用程序与 spring-cloud-stream 连接起来。请注意,前面的示例没有定义任何源函数(例如 Supplier bean),这使得框架没有预先创建源绑定的触发器,而这在配置包含函数 bean 的情况下是典型的。这没有问题,因为 StreamBridge 会在首次调用其 send(..) 操作时启动为不存在的绑定创建输出绑定(如果需要,还会自动配置目标),并对其进行缓存以供后续重用(有关更多详细信息,请参阅StreamBridge 和动态目标)。

然而,如果您想在初始化(启动)时预先创建输出绑定,您可以利用 spring.cloud.stream.output-bindings 属性,在该属性中声明源的名称。提供的名称将用作创建源绑定的触发器。您可以使用 ; 表示多个源(多个输出绑定)(例如,--spring.cloud.stream.output-bindings=foo;bar)。

此外,请注意 streamBridge.send(..) 方法接受一个 Object 作为数据。这意味着您可以向其发送 POJO 或 Message,它在发送输出时会经历与来自任何 Function 或 Supplier 时相同的流程,从而提供与函数相同的级别的一致性。这意味着输出类型转换、分区等都将像函数产生的输出一样被尊重和处理。

StreamBridge 与动态目标

StreamBridge 也可以用于输出目标事先未知的情况,这类似于从消费者路由部分描述的用例。

让我们看一个示例

@SpringBootApplication
@Controller
public class WebSourceApplication {

	public static void main(String[] args) {
		SpringApplication.run(WebSourceApplication.class, args);
	}

	@Autowired
	private StreamBridge streamBridge;

	@RequestMapping
	@ResponseStatus(HttpStatus.ACCEPTED)
	public void delegateToSupplier(@RequestBody String body) {
		System.out.println("Sending " + body);
		streamBridge.send("myDestination", body);
	}
}

正如您所见,前面的示例与前一个示例非常相似,不同之处在于没有通过 spring.cloud.stream.output-bindings 属性提供显式绑定指令(此处未提供)。这里我们将数据发送到名称为 myDestination 的目标,该名称作为绑定不存在。因此,此类名称将被视为动态目标,如从消费者路由部分所述。

在前面的示例中,我们使用 ApplicationRunner 作为*外部源*来为流提供数据。

一个更实际的示例,其中外部源是 REST 端点。

@SpringBootApplication
@Controller
public class WebSourceApplication {

	public static void main(String[] args) {
		SpringApplication.run(WebSourceApplication.class);
	}

	@Autowired
	private StreamBridge streamBridge;

	@RequestMapping
	@ResponseStatus(HttpStatus.ACCEPTED)
	public void delegateToSupplier(@RequestBody String body) {
		streamBridge.send("myBinding", body);
	}
}

正如您所见,在 delegateToSupplier 方法内部,我们使用 StreamBridge 将数据发送到 myBinding 绑定。在这里,您也受益于 StreamBridge 的动态特性,如果 myBinding 不存在,它将自动创建并缓存,否则将使用现有绑定。

缓存动态目标(绑定)可能导致内存泄漏,如果存在许多动态目标。为了进行一定程度的控制,我们为输出绑定提供了一个自淘汰缓存机制,默认缓存大小为 10。这意味着如果您的动态目标数量超过此数值,则有可能现有绑定会被淘汰,从而需要重新创建,这可能会导致轻微的性能下降。您可以通过 spring.cloud.stream.dynamic-destination-cache-size 属性将缓存大小设置为所需的值来增加缓存大小。
curl -H "Content-Type: text/plain" -X POST -d "hello from the other side" http://localhost:8080/

通过展示这两个示例,我们想强调这种方法适用于任何类型的外部源。

如果您使用的是 Solace PubSub+ 绑定器,Spring Cloud Stream 保留了 scst_targetDestination 头部(可通过 BinderHeaders.TARGET_DESTINATION 获取),该头部允许将消息从其绑定配置的目标重定向到此头部指定的目标。这使得绑定器能够管理发布到动态目标所需的资源,从而减轻了框架的负担,并避免了之前注释中提到的缓存问题。更多信息请见此处
使用 StreamBridge 的输出内容类型

如果需要,您还可以使用以下方法签名 public boolean send(String bindingName, Object data, MimeType outputContentType) 提供特定的内容类型。或者,如果您将数据作为 Message 发送,其内容类型将得到尊重。

使用 StreamBridge 指定绑定器类型

Spring Cloud Stream 支持多种绑定器场景。例如,您可能从 Kafka 接收数据并将其发送到 RabbitMQ。

有关多种绑定器场景的更多信息,请参阅绑定器部分,特别是Classpath 上的多种绑定器

如果您计划使用 StreamBridge 并在应用程序中配置了多个绑定器,则还必须告诉 StreamBridge 使用哪个绑定器。为此,send 方法还有两个变体

public boolean send(String bindingName, @Nullable String binderType, Object data)

public boolean send(String bindingName, @Nullable String binderType, Object data, MimeType outputContentType)

正如您所见,您可以提供一个附加参数 - binderType,用于告诉 BindingService 在创建动态绑定时使用哪个绑定器。

对于使用 spring.cloud.stream.output-bindings 属性的情况,或者绑定已在不同的绑定器下创建的情况,binderType 参数将无效。
使用 StreamBridge 的通道拦截器

由于 StreamBridge 使用 MessageChannel 建立输出绑定,因此您可以在通过 StreamBridge 发送数据时激活通道拦截器。由应用程序决定哪些通道拦截器应用于 StreamBridge。Spring Cloud Stream 不会将所有检测到的通道拦截器注入到 StreamBridge 中,除非它们使用 @GlobalChannelInterceptor(patterns = "*") 进行标注。

假设您的应用程序中有以下两个不同的 StreamBridge 绑定。

streamBridge.send("foo-out-0", message);

以及

streamBridge.send("bar-out-0", message);

现在,如果您想让通道拦截器应用于这两个 StreamBridge 绑定,那么您可以声明以下 GlobalChannelInterceptor bean。

@Bean
@GlobalChannelInterceptor(patterns = "*")
public ChannelInterceptor customInterceptor() {
    return new ChannelInterceptor() {
        @Override
        public Message<?> preSend(Message<?> message, MessageChannel channel) {
            ...
        }
    };
}

但是,如果您不喜欢上面的全局方法,并且想为每个绑定设置一个专用的拦截器,那么您可以这样做。

@Bean
@GlobalChannelInterceptor(patterns = "foo-*")
public ChannelInterceptor fooInterceptor() {
    return new ChannelInterceptor() {
        @Override
        public Message<?> preSend(Message<?> message, MessageChannel channel) {
            ...
        }
    };
}

以及

@Bean
@GlobalChannelInterceptor(patterns = "bar-*")
public ChannelInterceptor barInterceptor() {
    return new ChannelInterceptor() {
        @Override
        public Message<?> preSend(Message<?> message, MessageChannel channel) {
            ...
        }
    };
}

您可以灵活地使模式更严格或根据您的业务需求进行定制。

通过这种方法,应用程序可以决定将哪些拦截器注入到 StreamBridge 中,而不是应用所有可用的拦截器。

StreamBridge 通过 StreamOperations 接口提供了一个契约,该接口包含了 StreamBridge 的所有 send 方法。因此,应用程序可以选择使用 StreamOperations 进行自动注入。这对于通过为 StreamOperations 接口提供模拟或类似机制来单元测试使用 StreamBridge 的代码非常方便。
响应式函数支持

由于 *Spring Cloud Function* 构建在 Project Reactor 之上,因此在实现 SupplierFunctionConsumer 时,您无需做太多即可受益于响应式编程模型。

例如

@SpringBootApplication
public static class SinkFromConsumer {

	@Bean
	public Function<Flux<String>, Flux<String>> reactiveUpperCase() {
		return flux -> flux.map(val -> val.toUpperCase());
	}
}

在选择响应式或命令式编程模型时,必须理解一些重要事项。

完全响应式还是仅 API?

使用响应式 API 不一定意味着您可以从该 API 的所有响应式特性中受益。换句话说,背压等高级特性只有在与兼容系统(例如 Reactive Kafka 绑定器)一起使用时才能发挥作用。如果您使用的是常规 Kafka、Rabbit 或任何其他非响应式绑定器,您只能从响应式 API 本身的便利性中受益,而不能从其高级特性中受益,因为流的实际源或目标不是响应式的。

错误处理和重试

在本手册中,您将看到一些关于基于框架的错误处理、重试及其他特性以及与其相关的配置属性的引用。重要的是要理解,它们仅影响命令式函数,对于响应式函数,您不应抱有同样的期望。原因如下:响应式函数和命令式函数之间存在根本区别。命令式函数是一个由框架在接收到每条消息时调用的*消息处理器*。因此,对于 N 条消息,此类函数将被调用 N 次,正因为如此,我们可以包装此类函数并添加额外的功能,如错误处理、重试等。响应式函数是*初始化函数*。它只被调用一次,以获取用户提供的 Flux/Mono 的引用,该引用将与框架提供的引用连接起来。之后,我们(框架)对流完全没有可见性或控制权。因此,对于响应式函数,在错误处理和重试方面(即 doOnError(), .onError*() 等),您必须依赖于响应式 API 的丰富性。

函数式组合

使用函数式编程模型,您还可以受益于函数式组合,通过它您可以从一组简单函数动态组合复杂的处理器。例如,让我们将以下函数 bean 添加到上面定义的应用程序中

@Bean
public Function<String, String> wrapInQuotes() {
	return s -> "\"" + s + "\"";
}

并修改 spring.cloud.function.definition 属性以反映您从 ‘toUpperCase’ 和 ‘wrapInQuotes’ 组合一个新函数的意图。为此,Spring Cloud Function 依赖于 |(管道)符号。因此,为了完成我们的示例,我们的属性现在看起来像这样

--spring.cloud.function.definition=toUpperCase|wrapInQuotes
Spring Cloud Function 提供的函数式组合支持的一个巨大优势是,您可以组合*响应式*和*命令式*函数。

组合的结果是一个单一函数,正如您可能猜到的那样,它可能有一个非常长且相当难以理解的名称(例如,foo|bar|baz|xyz. . .),这在涉及其他配置属性时会带来很大的不便。这就是函数式绑定名称部分描述的*描述性绑定名称*特性可以提供帮助的地方。

例如,如果我们想给我们的 toUpperCase|wrapInQuotes 一个更具描述性的名称,我们可以使用以下属性来实现:spring.cloud.stream.function.bindings.toUpperCase|wrapInQuotes-in-0=quotedUpperCaseInput,从而允许其他配置属性引用该绑定名称(例如,spring.cloud.stream.bindings.quotedUpperCaseInput.destination=myDestination)。

函数式组合与横切关注点

函数组合有效地允许您通过将其分解为一组简单且可单独管理/测试的组件来解决复杂性,这些组件在运行时仍然可以表示为一个整体。但这并不是唯一的好处。

您还可以使用组合来处理某些横切的非功能性关注点,例如内容丰富。例如,假设您收到一条传入消息,该消息可能缺少某些头部,或者某些头部不处于您的业务函数所期望的精确状态。现在,您可以实现一个单独的函数来解决这些问题,然后将其与主要的业务函数组合起来。

让我们看一个示例

@SpringBootApplication
public class DemoStreamApplication {

	public static void main(String[] args) {
		SpringApplication.run(DemoStreamApplication.class,
				"--spring.cloud.function.definition=enrich|echo",
				"--spring.cloud.stream.function.bindings.enrich|echo-in-0=input",
				"--spring.cloud.stream.bindings.input.destination=myDestination",
				"--spring.cloud.stream.bindings.input.group=myGroup");

	}

	@Bean
	public Function<Message<String>, Message<String>> enrich() {
		return message -> {
			Assert.isTrue(!message.getHeaders().containsKey("foo"), "Should NOT contain 'foo' header");
			return MessageBuilder.fromMessage(message).setHeader("foo", "bar").build();
		};
	}

	@Bean
	public Function<Message<String>, Message<String>> echo() {
		return message -> {
			Assert.isTrue(message.getHeaders().containsKey("foo"), "Should contain 'foo' header");
			System.out.println("Incoming message " + message);
			return message;
		};
	}
}

虽然简单,但此示例展示了一个函数如何用附加头部(非功能性关注点)丰富传入的消息,以便另一个函数 - echo - 可以从中受益。echo 函数保持简洁,只专注于业务逻辑。您还可以看到使用 spring.cloud.stream.function.bindings 属性来简化组合绑定名称。

具有多个输入和输出参数的函数

从 3.0 版本开始,Spring Cloud Stream 支持具有多个输入和/或多个输出(返回值)的函数。这到底意味着什么,以及它针对哪类用例?

  • 大数据:想象一下您正在处理的数据源高度无组织,包含各种类型的数据元素(例如订单、交易等),您需要有效地对其进行整理。

  • 数据聚合:另一个用例可能需要您合并来自 2 个或更多传入_流的数据元素。.

上面描述的只是少数几个用例,您可能需要在这些用例中使用单个函数来接收和/或产生多个数据*流*。这就是我们在这里针对的用例类型。

此外,请注意此处对*流*概念略有不同的强调。假设此类函数只有在能够访问实际的数据流(而非单个元素)时才具有价值。因此,为此我们依赖于Project Reactor提供的抽象(即 FluxMono),这些抽象作为 spring-cloud-functions 引入的依赖项的一部分已在 classpath 中可用。

另一个重要方面是多个输入和输出的表示。虽然 Java 提供了各种不同的抽象来表示*多个事物*,但这些抽象是 *a) 无界的*、*b) 缺乏元数*,并且 *c) 缺乏类型信息*,这些在此上下文中都很重要。例如,让我们看看 Collection 或数组,它们只允许我们描述单个类型的*多个*或将所有内容向上转换为 Object,这会影响 Spring Cloud Stream 的透明类型转换特性等等。

因此,为了满足所有这些要求,初始支持依赖于使用 Project Reactor 提供的另一种抽象 - Tuples 的签名。然而,我们正在努力允许更灵活的签名。

请参阅绑定与绑定名称部分,了解此类应用程序用于建立*绑定名称*的命名约定。

让我们看几个示例

@SpringBootApplication
public class SampleApplication {

	@Bean
	public Function<Tuple2<Flux<String>, Flux<Integer>>, Flux<String>> gather() {
		return tuple -> {
			Flux<String> stringStream = tuple.getT1();
			Flux<String> intStream = tuple.getT2().map(i -> String.valueOf(i));
			return Flux.merge(stringStream, intStream);
		};
	}
}

上面的示例演示了一个函数,它接受两个输入(第一个类型为 String,第二个类型为 Integer)并产生一个类型为 String 的输出。

因此,对于上面的示例,两个输入绑定将是 gather-in-0gather-in-1,为了保持一致性,输出绑定也遵循相同的约定,名称为 gather-out-0

了解这一点将允许您设置绑定特定的属性。例如,以下配置将覆盖 gather-in-0 绑定的内容类型

--spring.cloud.stream.bindings.gather-in-0.content-type=text/plain
@SpringBootApplication
public class SampleApplication {

	@Bean
	public static Function<Flux<Integer>, Tuple2<Flux<String>, Flux<String>>> scatter() {
		return flux -> {
			Flux<Integer> connectedFlux = flux.publish().autoConnect(2);
			UnicastProcessor even = UnicastProcessor.create();
			UnicastProcessor odd = UnicastProcessor.create();
			Flux<Integer> evenFlux = connectedFlux.filter(number -> number % 2 == 0).doOnNext(number -> even.onNext("EVEN: " + number));
			Flux<Integer> oddFlux = connectedFlux.filter(number -> number % 2 != 0).doOnNext(number -> odd.onNext("ODD: " + number));

			return Tuples.of(Flux.from(even).doOnSubscribe(x -> evenFlux.subscribe()), Flux.from(odd).doOnSubscribe(x -> oddFlux.subscribe()));
		};
	}
}

上面的示例与前一个示例有些相反,它演示了一个函数,该函数接受一个类型为 Integer 的输入并产生两个输出(都类型为 String)。

因此,对于上面的示例,输入绑定是 scatter-in-0,输出绑定是 scatter-out-0scatter-out-1

您可以使用以下代码进行测试

@Test
public void testSingleInputMultiOutput() {
	try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
			TestChannelBinderConfiguration.getCompleteConfiguration(
					SampleApplication.class))
							.run("--spring.cloud.function.definition=scatter")) {

		InputDestination inputDestination = context.getBean(InputDestination.class);
		OutputDestination outputDestination = context.getBean(OutputDestination.class);

		for (int i = 0; i < 10; i++) {
			inputDestination.send(MessageBuilder.withPayload(String.valueOf(i).getBytes()).build());
		}

		int counter = 0;
		for (int i = 0; i < 5; i++) {
			Message<byte[]> even = outputDestination.receive(0, 0);
			assertThat(even.getPayload()).isEqualTo(("EVEN: " + String.valueOf(counter++)).getBytes());
			Message<byte[]> odd = outputDestination.receive(0, 1);
			assertThat(odd.getPayload()).isEqualTo(("ODD: " + String.valueOf(counter++)).getBytes());
		}
	}
}
单个应用程序中的多个函数

也可能需要在单个应用程序中将多个消息处理器进行分组。您可以通过定义多个函数来实现这一点。

@SpringBootApplication
public class SampleApplication {

	@Bean
	public Function<String, String> uppercase() {
		return value -> value.toUpperCase();
	}

	@Bean
	public Function<String, String> reverse() {
		return value -> new StringBuilder(value).reverse().toString();
	}
}

在上面的示例中,我们有一个配置,它定义了两个函数 uppercasereverse。所以首先,如前所述,我们需要注意到存在冲突(函数多于一个),因此需要通过提供指向我们希望绑定的实际函数的 spring.cloud.function.definition 属性来解决它。不同之处在于,在这里我们将使用 ; 分隔符来指向这两个函数(参见下面的测试用例)。

与具有多个输入/输出的函数一样,请参阅绑定与绑定名称部分,了解此类应用程序用于建立*绑定名称*的命名约定。

您可以使用以下代码进行测试

@Test
public void testMultipleFunctions() {
	try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
			TestChannelBinderConfiguration.getCompleteConfiguration(
					ReactiveFunctionConfiguration.class))
							.run("--spring.cloud.function.definition=uppercase;reverse")) {

		InputDestination inputDestination = context.getBean(InputDestination.class);
		OutputDestination outputDestination = context.getBean(OutputDestination.class);

		Message<byte[]> inputMessage = MessageBuilder.withPayload("Hello".getBytes()).build();
		inputDestination.send(inputMessage, "uppercase-in-0");
		inputDestination.send(inputMessage, "reverse-in-0");

		Message<byte[]> outputMessage = outputDestination.receive(0, "uppercase-out-0");
		assertThat(outputMessage.getPayload()).isEqualTo("HELLO".getBytes());

		outputMessage = outputDestination.receive(0, "reverse-out-1");
		assertThat(outputMessage.getPayload()).isEqualTo("olleH".getBytes());
	}
}
批量消费者

使用支持批处理监听器的 MessageChannelBinder 时,如果消费者绑定启用了该特性,可以将 spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode 设置为 true,以将整批消息以 List 的形式传递给函数。

@Bean
public Function<List<Person>, Person> findFirstPerson() {
    return persons -> persons.get(0);
}
批处理生产者

您也可以在生产者端使用批处理的概念,通过返回一个消息集合来实现。这实际上提供了一种相反的效果,即集合中的每条消息都将由绑定器单独发送。

考虑以下函数

@Bean
public Function<String, List<Message<String>>> batch() {
	return p -> {
		List<Message<String>> list = new ArrayList<>();
		list.add(MessageBuilder.withPayload(p + ":1").build());
		list.add(MessageBuilder.withPayload(p + ":2").build());
		list.add(MessageBuilder.withPayload(p + ":3").build());
		list.add(MessageBuilder.withPayload(p + ":4").build());
		return list;
	};
}

返回列表中的每条消息都将单独发送,最终发送四条消息到输出目标。

将 Spring Integration 流作为函数

当您实现一个函数时,您可能有一些复杂的、符合企业集成模式 (EIP) 类别要求。使用 Spring Integration (SI) 这样的框架最适合处理这些需求,SI 是 EIP 的参考实现。

值得庆幸的是,SI 已经提供了通过集成流作为网关来将集成流暴露为函数的功能。考虑以下示例

@SpringBootApplication
public class FunctionSampleSpringIntegrationApplication {

	public static void main(String[] args) {
		SpringApplication.run(FunctionSampleSpringIntegrationApplication.class, args);
	}

	@Bean
	public IntegrationFlow uppercaseFlow() {
		return IntegrationFlows.from(MessageFunction.class, "uppercase")
				.<String, String>transform(String::toUpperCase)
				.logAndReply(LoggingHandler.Level.WARN);
	}

	public interface MessageFunction extends Function<Message<String>, Message<String>> {

	}
}

对于熟悉 SI 的用户,您可以看到我们定义了一个类型为 IntegrationFlow 的 bean,在此我们声明了一个希望将其暴露为名为 uppercaseFunction<String, String>(使用 SI DSL)的集成流。MessageFunction 接口允许我们明确声明输入和输出的类型以便进行正确的类型转换。有关类型转换的更多信息,请参阅内容类型协商部分。

要接收原始输入,您可以使用 from(Function.class, …​)

生成的函数绑定到目标绑定器所暴露的输入和输出目标。

请参阅绑定与绑定名称部分,了解此类应用程序用于建立*绑定名称*的命名约定。

有关 Spring Integration 和 Spring Cloud Stream 之间互操作性的更多详细信息,特别是围绕函数式编程模型,您可能会发现这篇博文非常有趣,因为它深入探讨了通过融合 Spring Integration 和 Spring Cloud Stream/Functions 的最佳特性可以应用的各种模式。

使用轮询消费者

概述

使用轮询消费者时,您可以按需轮询 PollableMessageSource。要定义轮询消费者的绑定,您需要提供 spring.cloud.stream.pollable-source 属性。

考虑以下轮询消费者绑定的示例

--spring.cloud.stream.pollable-source=myDestination

前例中的轮询源名称 myDestination 将生成 myDestination-in-0 作为绑定名称,以与函数式编程模型保持一致。

鉴于前例中的轮询消费者,您可以按如下方式使用它

@Bean
public ApplicationRunner poller(PollableMessageSource destIn, MessageChannel destOut) {
    return args -> {
        while (someCondition()) {
            try {
                if (!destIn.poll(m -> {
                    String newPayload = ((String) m.getPayload()).toUpperCase();
                    destOut.send(new GenericMessage<>(newPayload));
                })) {
                    Thread.sleep(1000);
                }
            }
            catch (Exception e) {
                // handle failure
            }
        }
    };
}

一种不那么手动化、更像 Spring 的替代方案是配置一个计划任务 bean。例如,

@Scheduled(fixedDelay = 5_000)
public void poll() {
	System.out.println("Polling...");
	this.source.poll(m -> {
		System.out.println(m.getPayload());

	}, new ParameterizedTypeReference<Foo>() { });
}

PollableMessageSource.poll() 方法接受一个 MessageHandler 参数(通常是一个 lambda 表达式,如这里所示)。如果消息已被接收并成功处理,它将返回 true

与消息驱动的消费者一样,如果 MessageHandler 抛出异常,消息将被发布到错误通道,如 错误处理 中所述。

通常,当 MessageHandler 退出时,poll() 方法会确认消息。如果方法异常退出,消息将被拒绝(不会重新入队),但请参阅处理错误。您可以通过自己负责确认来覆盖该行为,如下例所示

@Bean
public ApplicationRunner poller(PollableMessageSource dest1In, MessageChannel dest2Out) {
    return args -> {
        while (someCondition()) {
            if (!dest1In.poll(m -> {
                StaticMessageHeaderAccessor.getAcknowledgmentCallback(m).noAutoAck();
                // e.g. hand off to another thread which can perform the ack
                // or acknowledge(Status.REQUEUE)

            })) {
                Thread.sleep(1000);
            }
        }
    };
}
您必须在某个时候 ack(或 nack)消息,以避免资源泄漏。
一些消息系统(如 Apache Kafka)在日志中维护一个简单的偏移量。如果投递失败并使用 StaticMessageHeaderAccessor.getAcknowledgmentCallback(m).acknowledge(Status.REQUEUE); 重新入队,任何稍后成功确认的消息都会被重新投递。

还有一个重载的 poll 方法,其定义如下

poll(MessageHandler handler, ParameterizedTypeReference<?> type)

type 是一个转换提示,允许对传入消息的有效载荷进行转换,如下例所示

boolean result = pollableSource.poll(received -> {
			Map<String, Foo> payload = (Map<String, Foo>) received.getPayload();
            ...

		}, new ParameterizedTypeReference<Map<String, Foo>>() {});
错误处理

默认情况下,为轮询源配置了一个错误通道;如果回调抛出异常,一个 ErrorMessage 将被发送到错误通道 (`<destination>.<group>.errors`);这个错误通道也被桥接到全局的 Spring Integration errorChannel

您可以使用 `@ServiceActivator` 订阅任何一个错误通道来处理错误;如果没有订阅,错误将简单地被记录日志,并且消息将被确认为成功。如果错误通道服务激活器抛出异常,消息将被拒绝(默认情况下)并且不会被重新投递。如果服务激活器抛出 RequeueCurrentMessageException,消息将在消息代理处重新入队,并在后续轮询时再次检索。

如果监听器直接抛出 RequeueCurrentMessageException,消息将按上述方式重新入队,并且不会发送到错误通道。

事件路由

事件路由在 Spring Cloud Stream 的上下文中是指 a) 将事件路由到特定的事件订阅者,或 b) 将事件订阅者产生的事件路由到特定的目标。这里我们将其称为“路由到”和“路由自”。

路由到消费者

路由可以通过依赖 Spring Cloud Function 3.0 中提供的 RoutingFunction 来实现。您只需通过 --spring.cloud.stream.function.routing.enabled=true 应用属性启用它,或提供 spring.cloud.function.routing-expression 属性。启用后,RoutingFunction 将绑定到接收所有消息的输入目标,并根据提供的指令将它们路由到其他函数。

为了绑定的目的,路由目标的名称是 functionRouter-in-0(请参阅 RoutingFunction.FUNCTION_NAME 和绑定命名约定 函数式绑定名称)。

指令可以通过单个消息或应用属性提供。

以下是几个示例

使用消息头
@SpringBootApplication
public class SampleApplication {

	public static void main(String[] args) {
		SpringApplication.run(SampleApplication.class,
                       "--spring.cloud.stream.function.routing.enabled=true");
	}

	@Bean
	public Consumer<String> even() {
		return value -> {
			System.out.println("EVEN: " + value);
		};
	}

	@Bean
	public Consumer<String> odd() {
		return value -> {
			System.out.println("ODD: " + value);
		};
    }
}

通过将消息发送到绑定器(即 rabbit, kafka)暴露的 functionRouter-in-0 目标,该消息将被路由到相应的(“even”或“odd”)消费者。

默认情况下,RoutingFunction 将查找 spring.cloud.function.definitionspring.cloud.function.routing-expression(对于更动态的 SpEL 场景)头。如果找到,其值将被视为路由指令。

例如,将 spring.cloud.function.routing-expression 头的值设置为 T(java.lang.System).currentTimeMillis() % 2 == 0 ? 'even' : 'odd' 最终会将请求半随机地路由到 oddeven 函数。此外,对于 SpEL,评估上下文的根对象是 Message,因此您也可以对单个头(或消息)进行评估,例如 …​.routing-expression=headers['type']

使用应用属性

spring.cloud.function.routing-expression 和/或 spring.cloud.function.definition 可以作为应用属性传递(例如,spring.cloud.function.routing-expression=headers['type'])。

@SpringBootApplication
public class RoutingStreamApplication {

  public static void main(String[] args) {
      SpringApplication.run(RoutingStreamApplication.class,
	  "--spring.cloud.function.routing-expression="
	  + "T(java.lang.System).nanoTime() % 2 == 0 ? 'even' : 'odd'");
  }
  @Bean
  public Consumer<Integer> even() {
    return value -> System.out.println("EVEN: " + value);
  }

  @Bean
  public Consumer<Integer> odd() {
    return value -> System.out.println("ODD: " + value);
  }
}
通过应用属性传递指令对于响应式函数尤其重要,因为响应式函数只调用一次以传递 Publisher,因此对单个项的访问是受限的。
路由函数和输出绑定

RoutingFunction 是一个 Function,因此与其他任何函数没有区别。嗯……差不多。

RoutingFunction 路由到另一个 Function 时,其输出会发送到 RoutingFunction 的输出绑定,该绑定按预期是 functionRouter-in-0。但是如果 RoutingFunction 路由到一个 Consumer 呢?换句话说,调用 RoutingFunction 的结果可能不会产生任何需要发送到输出绑定的内容,因此甚至有输出绑定也不必要。因此,我们在创建绑定时对 RoutingFunction 进行了一些不同的处理。尽管这对用户是透明的(您实际上无需做任何事情),但了解一些机制将有助于您理解其内部工作原理。

因此,规则是:我们从不为 RoutingFunction 创建输出绑定,只创建输入绑定。所以当您路由到 Consumer 时,RoutingFunction 通过没有任何输出绑定,实际上成为了一个 Consumer。然而,如果 RoutingFunction 恰好路由到另一个会产生输出的 Function,那么 RoutingFunction 的输出绑定将动态创建,此时 RoutingFunction 将在绑定方面表现得像一个常规的 Function(同时拥有输入和输出绑定)。

路由自消费者

除了静态目标外,Spring Cloud Stream 还允许应用程序将消息发送到动态绑定的目标。这非常有用,例如当目标需要在运行时确定时。应用程序可以通过以下两种方式之一实现。

spring.cloud.stream.sendto.destination

您还可以委托框架通过指定 spring.cloud.stream.sendto.destination 头来动态解析输出目标,该头的值设置为要解析的目标名称。

考虑以下示例

@SpringBootApplication
@Controller
public class SourceWithDynamicDestination {

    @Bean
	public Function<String, Message<String>> destinationAsPayload() {
		return value -> {
			return MessageBuilder.withPayload(value)
				.setHeader("spring.cloud.stream.sendto.destination", value).build();};
	}
}

尽管微不足道,但在此示例中您可以清楚地看到,我们的输出是一个消息,其中 spring.cloud.stream.sendto.destination 头被设置为输入参数的值。框架将查阅此头,并尝试创建或发现具有该名称的目标,然后将输出发送到该目标。

如果目标名称是预先知道的,您可以像配置其他任何目标一样配置生产者属性。或者,如果您注册了一个 NewDestinationBindingCallback<> bean,它会在绑定创建之前被调用。该回调函数接收绑定器使用的扩展生产者属性的泛型类型。它有一个方法

void configure(String destinationName, MessageChannel channel, ProducerProperties producerProperties,
        T extendedProducerProperties);

以下示例展示了如何使用 RabbitMQ 绑定器

@Bean
public NewDestinationBindingCallback<RabbitProducerProperties> dynamicConfigurer() {
    return (name, channel, props, extended) -> {
        props.setRequiredGroups("bindThisQueue");
        extended.setQueueNameGroupOnly(true);
        extended.setAutoBindDlq(true);
        extended.setDeadLetterQueueName("myDLQ");
    };
}
如果您需要支持具有多种绑定器类型的动态目标,请使用 Object 作为泛型类型,并根据需要强制转换 extended 参数。

此外,请参阅[使用 StreamBridge] 部分,了解如何利用另一种选项 (StreamBridge) 来处理类似情况。

后处理(发送消息后)

函数被调用后,其结果由框架发送到目标目的地,这实际上完成了函数调用周期。

然而,从业务角度来看,只有在此周期完成后执行一些附加任务,该周期才算完全完成。虽然这可以通过此 Stack Overflow 文章中描述的 ConsumerStreamBridge 的简单组合来实现,但自 4.0.3 版本以来,框架通过 Spring Cloud Function 项目提供的 PostProcessingFunction 提供了一种更规范的方法来解决此问题。PostProcessingFunction 是一个特殊的半标记函数,它包含一个额外的 postProcess(Message>) 方法,旨在为实现此类后处理任务提供一个位置。

package org.springframework.cloud.function.context
. . .
public interface PostProcessingFunction<I, O> extends Function<I, O> {
	default void postProcess(Message<O> result) {
	}
}

因此,您现在有两个选择。

选项 1:您可以将您的函数实现为 PostProcessingFunction,并通过实现其 postProcess(Message>) 方法来包含额外的后处理行为。

private static class Uppercase implements PostProcessingFunction<String, String> {

	@Override
	public String apply(String input) {
		return input.toUpperCase();
	}

	@Override
	public void postProcess(Message<String> result) {
		System.out.println("Function Uppercase has been successfully invoked and its result successfully sent to target destination");
	}
}
. . .
@Bean
public Function<String, String> uppercase() {
	return new Uppercase();
}

选项 2:如果您已经有一个现有函数,并且不想更改其实现或想将您的函数保留为 POJO,您可以只实现 postProcess(Message>) 方法,并将这个新的后处理函数与您的其他函数进行组合。

private static class Logger implements PostProcessingFunction<?, String> {

	@Override
	public void postProcess(Message<String> result) {
		System.out.println("Function has been successfully invoked and its result successfully sent to target destination");
	}
}
. . .
@Bean
public Function<String, String> uppercase() {
	return v -> v.toUpperCase();
}
@Bean
public Function<String, String> logger() {
	return new Logger();
}
. . .
//  and then have your function definition as such `uppercase|logger`

注意:在函数组合的情况下,只有最后一个 PostProcessingFunction 实例(如果存在)会生效。例如,假设您有以下函数定义 - foo|bar|baz,并且 foobaz 都是 PostProcessingFunction 的实例。只有 baz.postProcess(Message>) 会被调用。如果 baz 不是 PostProcessingFunction 的实例,则不会执行任何后处理功能。

有人可能会争辩说,您可以通过函数组合轻松实现这一点,只需将后处理器作为另一个 Function 进行组合即可。这确实是一种可能性,但在这种情况下,后处理功能将在前一个函数调用之后立即被调用,并在消息发送到目标目的地之前,而这发生在函数调用周期完成之前。

错误处理

在本节中,我们将解释框架提供的错误处理机制背后的总体思路。我们将使用 Rabbit 绑定器作为示例,因为不同的绑定器为底层消息代理的特定能力(例如 Kafka 绑定器)支持的某些机制定义了不同的属性集。

错误总是会发生,Spring Cloud Stream 提供了几种灵活的机制来处理它们。请注意,这些技术取决于绑定器的实现、底层消息中间件的能力以及编程模型(稍后会详细介绍)。

每当消息处理程序(函数)抛出异常时,异常会传播回绑定器,此时绑定器将使用 Spring Retry 库提供的 RetryTemplate 多次尝试重试同一条消息(默认重试 3 次)。如果重试失败,则取决于错误处理机制来决定是丢弃消息、将消息重新入队以便重新处理,还是将失败的消息发送到 DLQ

Rabbit 和 Kafka 都支持这些概念(尤其是 DLQ)。但是,其他绑定器可能不支持,因此请参阅您所使用的绑定器的文档,以了解支持的错误处理选项的详细信息。

然而,请记住,响应式函数不属于消息处理程序,因为它不处理单个消息,而是提供了一种将框架提供的流(即 Flux)与用户提供的流连接起来的方式。这为什么很重要?这是因为您在本节后面关于 Retry Template、丢弃失败消息、重试、DLQ 以及有助于实现所有这些的配置属性的任何内容,适用于消息处理程序(即命令式函数)。

响应式 API 提供了非常丰富的自身操作符和机制库,可以帮助您处理各种响应式用例特有的错误,这些用例比简单的消息处理程序用例复杂得多。因此,请使用它们,例如您可以在 reactor.core.publisher.Flux 中找到的 public final Flux<T> retryWhen(Retry retrySpec);

@Bean
public Function<Flux<String>, Flux<String>> uppercase() {
	return flux -> flux
			.retryWhen(Retry.backoff(3, Duration.ofMillis(1000)))
			.map(v -> v.toUpperCase());
}

丢弃失败的消息

默认情况下,系统提供了错误处理程序。第一个错误处理程序将简单地记录错误消息。第二个错误处理程序是特定于绑定器的错误处理程序,负责在特定消息系统(例如,发送到 DLQ)的上下文中处理错误消息。但由于未提供额外的错误处理配置(在此当前场景中),此处理程序将不会执行任何操作。因此,本质上在记录日志后,消息将被丢弃。

尽管在某些情况下可以接受,但在大多数情况下,这是不可接受的,我们需要某种恢复机制来避免消息丢失。

处理错误消息

在上一节中,我们提到默认情况下,导致错误的消息实际上会被记录日志并丢弃。框架还提供了一种机制,允许您提供自定义错误处理程序(例如,发送通知或写入数据库等)。您可以通过添加一个专门设计用于接受 ErrorMessageConsumer 来实现,该 ErrorMessage 除了包含有关错误的所有信息(例如堆栈跟踪等)之外,还包含原始消息(触发错误的那条消息)。注意:自定义错误处理程序与框架提供的错误处理程序(即日志记录和绑定器错误处理程序 - 请参阅上一节)是互斥的,以确保它们不会相互干扰。

@Bean
public Consumer<ErrorMessage> myErrorHandler() {
	return v -> {
		// send SMS notification code
	};
}

要将此类消费者标识为错误处理程序,您只需提供指向函数名称的 error-handler-definition 属性 - spring.cloud.stream.bindings.<binding-name>.error-handler-definition=myErrorHandler

例如,对于绑定名称 uppercase-in-0,属性将如下所示

spring.cloud.stream.bindings.uppercase-in-0.error-handler-definition=myErrorHandler

如果您使用特殊的映射指令将绑定映射到更易读的名称 - spring.cloud.stream.function.bindings.uppercase-in-0=upper,那么该属性将如下所示

spring.cloud.stream.bindings.upper.error-handler-definition=myErrorHandler.
如果您不小心将此类处理程序声明为 Function,它仍然会工作,但其输出将不会做任何处理。然而,考虑到此类处理程序仍然依赖于 Spring Cloud Function 提供的功能,如果您的处理程序有一些您希望通过函数组合来解决的复杂性(尽管不太可能),您也可以从中受益于函数组合。

默认错误处理程序

如果您想为所有函数 bean 设置单个错误处理程序,可以使用标准的 spring-cloud-stream 机制来定义默认属性 spring.cloud.stream.default.error-handler-definition=myErrorHandler

DLQ - 死信队列

DLQ 可能是最常用的机制,它允许将失败的消息发送到特殊目标:死信队列。

配置后,失败的消息将被发送到此目标,以便后续重新处理或审计和对账。

考虑以下示例

@SpringBootApplication
public class SimpleStreamApplication {

	public static void main(String[] args) throws Exception {
		SpringApplication.run(SimpleStreamApplication.class,
		  "--spring.cloud.function.definition=uppercase",
		  "--spring.cloud.stream.bindings.uppercase-in-0.destination=uppercase",
		  "--spring.cloud.stream.bindings.uppercase-in-0.group=myGroup",
		  "--spring.cloud.stream.rabbit.bindings.uppercase-in-0.consumer.auto-bind-dlq=true"
		);
	}

	@Bean
	public Function<Person, Person> uppercase() {
		return personIn -> {
		   throw new RuntimeException("intentional");
	      });
		};
	}
}

提醒一下,在此示例中,属性的 uppercase-in-0 部分对应于输入目标绑定的名称。consumer 部分表示这是一个消费者属性。

使用 DLQ 时,至少必须提供 group 属性,以便正确命名 DLQ 目标。然而,`group` 通常与 `destination` 属性一起使用,如我们的示例所示。

除了一些标准属性外,我们还设置了 auto-bind-dlq,指示绑定器为 uppercase-in-0 绑定创建并配置 DLQ 目标,该绑定对应于 uppercase 目标(请参阅相应属性),这会导致一个名为 uppercase.myGroup.dlq 的额外 Rabbit 队列(有关 Kafka 特定的 DLQ 属性,请参阅 Kafka 文档)。

配置完成后,所有失败的消息都将被路由到此目标,并保留原始消息以供进一步处理。

您可以看到错误消息包含与原始错误相关的更多信息,如下所示

. . . .
x-exception-stacktrace:	org.springframework.messaging.MessageHandlingException: nested exception is
      org.springframework.messaging.MessagingException: has an error, failedMessage=GenericMessage [payload=byte[15],
      headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_receivedRoutingKey=input.hello, amqp_deliveryTag=1,
      deliveryAttempt=3, amqp_consumerQueue=input.hello, amqp_redelivered=false, id=a15231e6-3f80-677b-5ad7-d4b1e61e486e,
      amqp_consumerTag=amq.ctag-skBFapilvtZhDsn0k3ZmQg, contentType=application/json, timestamp=1522327846136}]
      at org.spring...integ...han...MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:107)
      at. . . . .
Payload: blah

您还可以通过将 max-attempts 设置为 '1' 来促进立即分派到 DLQ(无需重试)。例如,

--spring.cloud.stream.bindings.uppercase-in-0.consumer.max-attempts=1

重试模板

在本节中,我们介绍与配置重试能力相关的配置属性。

RetryTemplateSpring Retry 库的一部分。虽然本文档的范围不包括 RetryTemplate 的所有功能,但我们将提及以下与 RetryTemplate 特别相关的消费者属性

maxAttempts

处理消息的尝试次数。

默认值:3。

backOffInitialInterval

重试时的退避初始间隔。

默认值:1000 毫秒。

backOffMaxInterval

最大退避间隔。

默认值:10000 毫秒。

backOffMultiplier

退避乘数。

默认值:2.0。

defaultRetryable

监听器抛出的未列在 retryableExceptions 中的异常是否可重试。

默认值:true

retryableExceptions

一个以 Throwable 类名为键、布尔值为值的映射。指定哪些异常(及其子类)将或不会被重试。另请参阅 defaultRetriable。示例:spring.cloud.stream.bindings.input.consumer.retryable-exceptions.java.lang.IllegalStateException=false

默认值:空。

虽然前面的设置对于大多数定制需求来说已经足够,但它们可能无法满足某些复杂的要求,此时您可能希望提供自己的 RetryTemplate 实例。为此,请在您的应用程序配置中将其配置为一个 bean。应用程序提供的实例将覆盖框架提供的实例。此外,为避免冲突,您必须将您希望由绑定器使用的 RetryTemplate 实例限定为 `@StreamRetryTemplate`。例如,

@StreamRetryTemplate
public RetryTemplate myRetryTemplate() {
    return new RetryTemplate();
}

正如您从上述示例中看到的,您不需要使用 `@Bean` 注解它,因为 `@StreamRetryTemplate` 是一个限定的 `@Bean`。

如果您需要对您的 RetryTemplate 更精确地控制,可以在您的 ConsumerProperties 中通过名称指定 bean,从而为每个绑定关联特定的重试 bean。

spring.cloud.stream.bindings.<foo>.consumer.retry-template-name=<your-retry-template-bean-name>

绑定器

Spring Cloud Stream 提供了一个 Binder 抽象层,用于连接外部中间件上的物理目标。本节提供了关于 Binder SPI 的主要概念、其主要组件以及实现特定细节的信息。

生产者和消费者

下图显示了生产者和消费者的一般关系

producers consumers
图 5. 生产者和消费者

生产者是任何向绑定目标发送消息的组件。绑定目标可以通过针对该消息代理的 Binder 实现绑定到外部消息代理。调用 bindProducer() 方法时,第一个参数是消息代理内部目标的名称,第二个参数是生产者发送消息的本地目标实例,第三个参数包含用于为该绑定目标创建的适配器的属性(例如分区键表达式)。

消费者是任何从绑定目标接收消息的组件。与生产者一样,消费者可以绑定到外部消息代理。调用 bindConsumer() 方法时,第一个参数是目标名称,第二个参数提供了一组逻辑消费者的名称。对于给定目标,由消费者绑定表示的每个组都会收到生产者发送到该目标的每条消息的副本(即,遵循正常的发布-订阅语义)。如果存在多个具有相同组名绑定的消费者实例,则消息会在这些消费者实例之间进行负载均衡,以便生产者发送的每条消息仅由每个组内的一个消费者实例消费(即,遵循正常的队列语义)。

Binder SPI

Binder SPI 由多个接口、开箱即用的实用程序类和发现策略组成,它们提供了一种可插拔的机制来连接外部中间件。

SPI 的关键在于 Binder 接口,它是连接输入和输出到外部中间件的策略。以下清单显示了 Binder 接口的定义

public interface Binder<T, C extends ConsumerProperties, P extends ProducerProperties> {
    Binding<T> bindConsumer(String bindingName, String group, T inboundBindTarget, C consumerProperties);

    Binding<T> bindProducer(String bindingName, T outboundBindTarget, P producerProperties);
}

该接口是参数化的,提供了许多扩展点

  • 输入和输出绑定目标。

  • 扩展的消费者和生产者属性,允许特定的 Binder 实现添加可以以类型安全方式支持的补充属性。

典型的绑定器实现包括以下内容

  • 一个实现 Binder 接口的类;

  • 一个 Spring `@Configuration` 类,它会创建 Binder 类型的 bean 以及中间件连接基础架构。

  • 在 classpath 上找到一个 META-INF/spring.binders 文件,其中包含一个或多个绑定器定义,如下例所示

    kafka:\
    org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration
如前所述,Binder 抽象也是框架的扩展点之一。因此,如果您在前面的列表中找不到合适的绑定器,您可以在 Spring Cloud Stream 之上实现自己的绑定器。在如何从零开始创建 Spring Cloud Stream Binder 一文中,社区成员详细记录了实现自定义绑定器所需的一系列步骤,并提供了示例。这些步骤也在 实现自定义绑定器 部分中突出显示。

绑定器检测

Spring Cloud Stream 依靠 Binder SPI 的实现来执行连接(绑定)用户代码到消息代理的任务。每个 Binder 实现通常连接到一种类型的消息系统。

Classpath 检测

默认情况下,Spring Cloud Stream 依赖于 Spring Boot 的自动配置来配置绑定过程。如果在 classpath 上找到一个 Binder 实现,Spring Cloud Stream 会自动使用它。例如,一个只打算绑定到 RabbitMQ 的 Spring Cloud Stream 项目可以添加以下依赖项

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

有关其他绑定器依赖项的具体 Maven 坐标,请参阅该绑定器实现的文档。

Classpath 上的多个绑定器

当 classpath 上存在多个绑定器时,应用程序必须指示每个目标绑定使用哪个绑定器。每个绑定器配置都包含一个 META-INF/spring.binders 文件,这是一个简单的属性文件,如下例所示

rabbit:\
org.springframework.cloud.stream.binder.rabbit.config.RabbitServiceAutoConfiguration

其他提供的绑定器实现(如 Kafka)也存在类似的文件,自定义绑定器实现也应该提供它们。键表示绑定器实现的标识名称,而值是逗号分隔的配置类列表,每个配置类仅包含一个类型为 org.springframework.cloud.stream.binder.Binder 的 bean 定义。

绑定器选择可以全局执行,使用 spring.cloud.stream.defaultBinder 属性(例如,spring.cloud.stream.defaultBinder=rabbit),也可以单独执行,通过在每个绑定上配置绑定器。例如,一个从 Kafka 读取并写入 RabbitMQ 的处理器应用程序(分别具有名为 inputoutput 的读写绑定)可以指定以下配置

spring.cloud.stream.bindings.input.binder=kafka
spring.cloud.stream.bindings.output.binder=rabbit

连接到多个系统

默认情况下,绑定器共享应用程序的 Spring Boot 自动配置,以便创建 classpath 上找到的每个绑定器的一个实例。如果您的应用程序需要连接到多个相同类型的消息代理,您可以指定多个绑定器配置,每个配置具有不同的环境设置。

启用显式绑定器配置会完全禁用默认的绑定器配置过程。如果这样做,所有正在使用的绑定器都必须包含在配置中。旨在透明使用 Spring Cloud Stream 的框架可以创建可通过名称引用的绑定器配置,但它们不影响默认的绑定器配置。为此,绑定器配置可以将其 defaultCandidate 标志设置为 false(例如,spring.cloud.stream.binders.<configurationName>.defaultCandidate=false)。这表示一个独立于默认绑定器配置过程而存在的配置。

以下示例显示了一个连接到两个 RabbitMQ 消息代理实例的处理器应用程序的典型配置

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: thing1
          binder: rabbit1
        output:
          destination: thing2
          binder: rabbit2
      binders:
        rabbit1:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: <host1>
        rabbit2:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: <host2>
特定绑定器的 environment 属性也可以用于任何 Spring Boot 属性,包括 spring.main.sources,这对于为特定绑定器添加额外配置非常有用,例如覆盖自动配置的 bean。

例如;

environment:
    spring:
        main:
           sources: com.acme.config.MyCustomBinderConfiguration

要为特定绑定器环境激活特定配置文件,您应该使用 spring.profiles.active 属性

environment:
    spring:
        profiles:
           active: myBinderProfile

在多绑定器应用程序中定制绑定器

当应用程序中包含多个绑定器并希望对绑定器进行定制时,可以通过提供 BinderCustomizer 实现来实现。在单绑定器应用程序的情况下,不需要此特殊的定制器,因为绑定器上下文可以直接访问定制 bean。然而,在多绑定器场景中并非如此,因为各种绑定器位于不同的应用程序上下文中。通过提供 BinderCustomizer 接口的实现,绑定器尽管位于不同的应用程序上下文中,但将接收到定制。Spring Cloud Stream 确保在应用程序开始使用绑定器之前进行定制。用户必须检查绑定器类型,然后应用必要的定制。

以下是提供 BinderCustomizer bean 的示例。

@Bean
public BinderCustomizer binderCustomizer() {
    return (binder, binderName) -> {
        if (binder instanceof KafkaMessageChannelBinder kafkaMessageChannelBinder) {
            kafkaMessageChannelBinder.setRebalanceListener(...);
        }
        else if (binder instanceof KStreamBinder) {
            ...
        }
        else if (binder instanceof RabbitMessageChannelBinder) {
            ...
        }
    };
}

请注意,当存在多个相同类型的绑定器实例时,可以使用绑定器名称来过滤定制。

绑定可视化和控制

Spring Cloud Stream 通过 Actuator 端点和编程式方式支持绑定的可视化和控制。

编程式方式

自 3.1 版本起,我们暴露了 org.springframework.cloud.stream.binding.BindingsLifecycleController,它被注册为一个 bean,一旦注入即可用于控制单个绑定的生命周期

例如,看看其中一个测试用例的片段。正如您所见,我们从 Spring 应用程序上下文中检索 BindingsLifecycleController 并执行单个方法来控制 echo-in-0 绑定的生命周期。

BindingsLifecycleController bindingsController = context.getBean(BindingsLifecycleController.class);
Binding binding = bindingsController.queryState("echo-in-0");
assertThat(binding.isRunning()).isTrue();
bindingsController.changeState("echo-in-0", State.STOPPED);
//Alternative way of changing state. For convenience we expose start/stop and pause/resume operations.
//bindingsController.stop("echo-in-0")
assertThat(binding.isRunning()).isFalse();

Actuator

由于 Actuator 和 Web 是可选的,您必须首先手动添加一个 Web 依赖项以及 Actuator 依赖项。以下示例展示了如何为 Web 框架添加依赖项

<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-web</artifactId>
</dependency>

以下示例展示了如何为 WebFlux 框架添加依赖项

<dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

您可以按如下方式添加 Actuator 依赖项

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
要在 Cloud Foundry 中运行 Spring Cloud Stream 2.0 应用程序,您必须将 spring-boot-starter-webspring-boot-starter-actuator 添加到 classpath 中。否则,由于健康检查失败,应用程序将无法启动。

您还必须通过设置以下属性来启用 bindings Actuator 端点:--management.endpoints.web.exposure.include=bindings

满足这些先决条件后。应用程序启动时,您应该在日志中看到以下内容

: Mapped "{[/actuator/bindings/{name}],methods=[POST]. . .
: Mapped "{[/actuator/bindings],methods=[GET]. . .
: Mapped "{[/actuator/bindings/{name}],methods=[GET]. . .

要可视化当前的绑定,请访问以下 URL:http://<host>:<port>/actuator/bindings

或者,要查看单个绑定,请访问类似于以下 URL 的一个:http://<host>:<port>/actuator/bindings/<bindingName>;

您还可以通过向同一 URL 发送请求,并在 JSON 中提供一个 state 参数来停止、启动、暂停和恢复单个绑定,如下例所示

curl -d '{"state":"STOPPED"}' -H "Content-Type: application/json" -X POST http://<host>:<port>/actuator/bindings/myBindingName
curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST http://<host>:<port>/actuator/bindings/myBindingName
curl -d '{"state":"PAUSED"}' -H "Content-Type: application/json" -X POST http://<host>:<port>/actuator/bindings/myBindingName
curl -d '{"state":"RESUMED"}' -H "Content-Type: application/json" -X POST http://<host>:<port>/actuator/bindings/myBindingName
PAUSEDRESUMED 仅在相应的绑定器及其底层技术支持时有效。否则,您将在日志中看到警告消息。目前,只有 Kafka 和 [Solace](https://github.com/SolaceProducts/solace-spring-cloud/tree/master/solace-spring-cloud-starters/solace-spring-cloud-stream-starter#consumer-bindings-pauseresume) 绑定器支持 PAUSEDRESUMED 状态。

绑定器配置属性

自定义绑定器配置时,以下属性可用。这些属性通过 org.springframework.cloud.stream.config.BinderProperties 公开

它们必须以 spring.cloud.stream.binders.<configurationName> 为前缀。

type

绑定器类型。它通常引用类路径中找到的绑定器之一,特别是 META-INF/spring.binders 文件中的一个键。

默认情况下,它与配置名称相同。

inheritEnvironment

配置是否继承应用程序本身的环境。

默认值:true

environment

一组属性的根,可用于自定义绑定器的环境。设置此属性后,创建绑定器的上下文不是应用程序上下文的子上下文。此设置允许绑定器组件与应用程序组件完全分离。

默认值:empty

defaultCandidate

绑定器配置是否可以被视为默认绑定器,或只能在明确引用时使用。此设置允许添加绑定器配置而不干扰默认处理。

默认值:true

实现自定义绑定器

为了实现自定义 Binder,您只需要

  • 添加所需依赖

  • 提供 ProvisioningProvider 实现

  • 提供 MessageProducer 实现

  • 提供 MessageHandler 实现

  • 提供 Binder 实现

  • 创建绑定器配置

  • 在 META-INF/spring.binders 中定义您的绑定器

添加所需依赖

spring-cloud-stream 依赖添加到您的项目 *(例如,对于 Maven)*

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
    <version>${spring.cloud.stream.version}</version>
</dependency>

提供 ProvisioningProvider 实现

ProvisioningProvider 负责消费者和生产者目的地的配置,并需要将 application.yml 或 application.properties 文件中包含的逻辑目的地转换为物理目的地引用。

下面是一个 ProvisioningProvider 实现示例,它仅修剪通过输入/输出绑定配置提供的目的地

public class FileMessageBinderProvisioner implements ProvisioningProvider<ConsumerProperties, ProducerProperties> {

    @Override
    public ProducerDestination provisionProducerDestination(
            final String name,
            final ProducerProperties properties) {

        return new FileMessageDestination(name);
    }

    @Override
    public ConsumerDestination provisionConsumerDestination(
            final String name,
            final String group,
            final ConsumerProperties properties) {

        return new FileMessageDestination(name);
    }

    private class FileMessageDestination implements ProducerDestination, ConsumerDestination {

        private final String destination;

        private FileMessageDestination(final String destination) {
            this.destination = destination;
        }

        @Override
        public String getName() {
            return destination.trim();
        }

        @Override
        public String getNameForPartition(int partition) {
            throw new UnsupportedOperationException("Partitioning is not implemented for file messaging.");
        }

    }

}

提供 MessageProducer 实现

MessageProducer 负责消费事件并将其作为消息处理给配置为消费此类事件的客户端应用程序。

这里是一个 MessageProducer 实现示例,它扩展了 MessageProducerSupport 抽象,以便轮询项目路径中与修剪后的目的地名称匹配的文件,同时还会归档已读消息并丢弃后续的相同消息。

public class FileMessageProducer extends MessageProducerSupport {

    public static final String ARCHIVE = "archive.txt";
    private final ConsumerDestination destination;
    private String previousPayload;

    public FileMessageProducer(ConsumerDestination destination) {
        this.destination = destination;
    }

    @Override
    public void doStart() {
        receive();
    }

    private void receive() {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);

        executorService.scheduleWithFixedDelay(() -> {
            String payload = getPayload();

            if(payload != null) {
                Message<String> receivedMessage = MessageBuilder.withPayload(payload).build();
                archiveMessage(payload);
                sendMessage(receivedMessage);
            }

        }, 0, 50, MILLISECONDS);
    }

    private String getPayload() {
        try {
            List<String> allLines = Files.readAllLines(Paths.get(destination.getName()));
            String currentPayload = allLines.get(allLines.size() - 1);

            if(!currentPayload.equals(previousPayload)) {
                previousPayload = currentPayload;
                return currentPayload;
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

        return null;
    }

    private void archiveMessage(String payload) {
        try {
            Files.write(Paths.get(ARCHIVE), (payload + "\n").getBytes(), CREATE, APPEND);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

}
在实现自定义绑定器时,此步骤并非严格必需,因为您始终可以使用已有的 MessageProducer 实现!

提供 MessageHandler 实现

MessageHandler 提供生成事件所需的逻辑。

这里是 MessageHandler 实现示例

public class FileMessageHandler implements MessageHandler{

    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        //write message to file
    }

}
在实现自定义绑定器时,此步骤并非严格必需,因为您始终可以使用已有的 MessageHandler 实现!

提供 Binder 实现

您现在可以提供自己的 Binder 抽象实现。这可以通过以下方式轻松完成:

  • 扩展 AbstractMessageChannelBinder

  • 将您的 ProvisioningProvider 指定为 AbstractMessageChannelBinder 的泛型参数

  • 覆盖 createProducerMessageHandlercreateConsumerEndpoint 方法

例如:

public class FileMessageBinder extends AbstractMessageChannelBinder<ConsumerProperties, ProducerProperties, FileMessageBinderProvisioner> {

    public FileMessageBinder(
            String[] headersToEmbed,
            FileMessageBinderProvisioner provisioningProvider) {

        super(headersToEmbed, provisioningProvider);
    }

    @Override
    protected MessageHandler createProducerMessageHandler(
            final ProducerDestination destination,
            final ProducerProperties producerProperties,
            final MessageChannel errorChannel) throws Exception {

        return message -> {
            String fileName = destination.getName();
            String payload = new String((byte[])message.getPayload()) + "\n";

            try {
                Files.write(Paths.get(fileName), payload.getBytes(), CREATE, APPEND);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        };
    }

    @Override
    protected MessageProducer createConsumerEndpoint(
            final ConsumerDestination destination,
            final String group,
            final ConsumerProperties properties) throws Exception {

        return new FileMessageProducer(destination);
    }

}

创建绑定器配置

严格要求您创建一个 Spring 配置来初始化您的绑定器实现所需的 bean *(以及您可能需要的其他所有 bean)*

@Configuration
public class FileMessageBinderConfiguration {

    @Bean
    @ConditionalOnMissingBean
    public FileMessageBinderProvisioner fileMessageBinderProvisioner() {
        return new FileMessageBinderProvisioner();
    }

    @Bean
    @ConditionalOnMissingBean
    public FileMessageBinder fileMessageBinder(FileMessageBinderProvisioner fileMessageBinderProvisioner) {
        return new FileMessageBinder(null, fileMessageBinderProvisioner);
    }

}

在 META-INF/spring.binders 中定义您的绑定器

最后,您必须在类路径上的 META-INF/spring.binders 文件中定义您的绑定器,指定绑定器的名称以及您的绑定器配置类的完全限定名。

myFileBinder:\
com.example.springcloudstreamcustombinder.config.FileMessageBinderConfiguration

配置选项

Spring Cloud Stream 支持通用配置选项以及绑定和绑定器的配置。某些绑定器允许额外的绑定属性支持中间件特定功能。

可以通过 Spring Boot 支持的任何机制向 Spring Cloud Stream 应用程序提供配置选项。这包括应用程序参数、环境变量以及 YAML 或 .properties 文件。

绑定服务属性

这些属性通过 org.springframework.cloud.stream.config.BindingServiceProperties 公开

spring.cloud.stream.instanceCount

应用程序的部署实例数。在生产者端进行分区时必须设置。使用 RabbitMQ 和 Kafka(如果 autoRebalanceEnabled=false)时,必须在消费者端设置。

默认值:1

spring.cloud.stream.instanceIndex

应用程序的实例索引:从 0instanceCount - 1 的数字。用于 RabbitMQ 和 Kafka(如果 autoRebalanceEnabled=false)的分区。在 Cloud Foundry 中自动设置以匹配应用程序的实例索引。

spring.cloud.stream.dynamicDestinations

可以动态绑定的目的地列表(例如,在动态路由场景中)。如果设置,则只能绑定列出的目的地。

默认值:空(允许绑定任何目的地)。

spring.cloud.stream.defaultBinder

如果配置了多个绑定器,要使用的默认绑定器。参见 类路径上的多个绑定器

默认值:空。

spring.cloud.stream.overrideCloudConnectors

此属性仅在 cloud profile 激活且应用程序提供了 Spring Cloud Connectors 时适用。如果属性为 false(默认值),绑定器会检测合适的已绑定服务(例如,在 Cloud Foundry 中为 RabbitMQ 绑定器绑定的 RabbitMQ 服务)并使用它来创建连接(通常通过 Spring Cloud Connectors)。当设置为 true 时,此属性指示绑定器完全忽略已绑定服务并依赖 Spring Boot 属性(例如,对于 RabbitMQ 绑定器,依赖环境中提供的 spring.rabbitmq.* 属性)。此属性的典型用法是在自定义环境中 连接到多个系统时进行嵌套。

默认值:false

spring.cloud.stream.bindingRetryInterval

重试绑定创建的间隔(秒),例如当绑定器不支持后期绑定且 broker(例如 Apache Kafka)宕机时。将其设置为零会将此类情况视为致命错误,阻止应用程序启动。

默认值:30

绑定属性

绑定属性通过使用 spring.cloud.stream.bindings.<bindingName>.<property>=<value> 的格式提供。<bindingName> 表示正在配置的绑定的名称。

例如,对于以下函数

@Bean
public Function<String, String> uppercase() {
	return v -> v.toUpperCase();
}

对于输入,有两个绑定,名为 uppercase-in-0;对于输出,有一个绑定,名为 uppercase-out-0。更多详情请参见 绑定和绑定名称

为避免重复,Spring Cloud Stream 支持为所有绑定设置值,通用绑定属性使用 spring.cloud.stream.default.<property>=<value>spring.cloud.stream.default.<producer|consumer>.<property>=<value> 的格式。

对于扩展绑定属性,为避免重复,应使用以下格式 - spring.cloud.stream.<binder-type>.default.<producer|consumer>.<property>=<value>

通用绑定属性

这些属性通过 org.springframework.cloud.stream.config.BindingProperties 公开

以下绑定属性适用于输入和输出绑定,并且必须以 spring.cloud.stream.bindings.<bindingName>. 为前缀(例如,spring.cloud.stream.bindings.uppercase-in-0.destination=ticktock)。

可以通过使用 spring.cloud.stream.default 前缀设置默认值(例如 spring.cloud.stream.default.contentType=application/json)。

destination

绑定在已绑定中间件上的目标目的地(例如,RabbitMQ exchange 或 Kafka topic)。如果绑定代表消费者绑定(输入),它可以绑定到多个目的地,目的地名称可以指定为逗号分隔的 String 值。如果未指定,则使用实际的绑定名称。此属性的默认值无法覆盖。

group

绑定的消费者组。仅适用于入站绑定。参见 消费者组

默认值:null(表示匿名消费者)。

contentType

此绑定的内容类型。参见 内容类型协商

默认值:application/json

binder

此绑定使用的绑定器。更多详情请参见 类路径上的多个绑定器

默认值:null(如果存在,则使用默认绑定器)。

消费者属性

这些属性通过 org.springframework.cloud.stream.binder.ConsumerProperties 公开

以下绑定属性仅适用于输入绑定,并且必须以 spring.cloud.stream.bindings.<bindingName>.consumer. 为前缀(例如,spring.cloud.stream.bindings.input.consumer.concurrency=3)。

可以通过使用 spring.cloud.stream.default.consumer 前缀设置默认值(例如,spring.cloud.stream.default.consumer.headerMode=none)。

autoStartup

指示此消费者是否需要自动启动

默认值:true

concurrency

入站消费者的并发度。

默认值:1

partitioned

消费者是否接收来自已分区生产者的数据。

默认值:false

headerMode

当设置为 none 时,禁用输入的 header 解析。仅对不原生支持消息 header 且需要嵌入 header 的消息中间件有效。此选项在消费来自非 Spring Cloud Stream 应用程序的数据(当不支持原生 header 时)时很有用。当设置为 headers 时,它使用中间件的原生 header 机制。当设置为 embeddedHeaders 时,它将 header 嵌入到消息负载中。

默认值:取决于绑定器实现。

maxAttempts

如果处理失败,处理消息的尝试次数(包括第一次)。设置为 1 以禁用重试。

默认值:3

backOffInitialInterval

重试时的退避初始间隔。

默认值:1000

backOffMaxInterval

最大退避间隔。

默认值:10000

backOffMultiplier

退避乘数。

默认值:2.0

defaultRetryable

监听器抛出的未列在 retryableExceptions 中的异常是否可重试。

默认值:true

instanceCount

当设置为大于等于零的值时,允许自定义此消费者的实例数(如果与 spring.cloud.stream.instanceCount 不同)。当设置为负值时,默认使用 spring.cloud.stream.instanceCount。更多信息请参见 实例索引和实例数

默认值:-1

instanceIndex

当设置为大于等于零的值时,允许自定义此消费者的实例索引(如果与 spring.cloud.stream.instanceIndex 不同)。当设置为负值时,默认使用 spring.cloud.stream.instanceIndex。如果提供了 instanceIndexList,则忽略。更多信息请参见 实例索引和实例数

默认值:-1

instanceIndexList

与不支持原生分区(例如 RabbitMQ)的绑定器一起使用;允许应用程序实例从多个分区消费。

默认值:空。

retryableExceptions

一个以 Throwable 类名为键、布尔值为值的映射。指定哪些异常(及其子类)将或不会被重试。另请参阅 defaultRetriable。示例:spring.cloud.stream.bindings.input.consumer.retryable-exceptions.java.lang.IllegalStateException=false

默认值:空。

useNativeDecoding

当设置为 true 时,入站消息由客户端库直接反序列化,客户端库必须进行相应的配置(例如,设置适当的 Kafka 生产者值反序列化器)。使用此配置时,入站消息的解组不基于绑定的 contentType。使用原生解码时,生产者有责任使用适当的编码器(例如,Kafka 生产者值序列化器)来序列化出站消息。此外,使用原生编码和解码时,headerMode=embeddedHeaders 属性将被忽略,header 不会嵌入到消息中。参见生产者属性 useNativeEncoding

默认值:false

multiplex

当设置为 true 时,底层绑定器将在同一输入绑定上原生复用目的地。

默认值:false

高级消费者配置

对于消息驱动消费者底层消息监听容器的高级配置,请向应用程序上下文添加一个 ListenerContainerCustomizer bean。它将在应用上述属性后调用,可用于设置附加属性。类似地,对于轮询消费者,请添加一个 MessageSourceCustomizer bean。

以下是 RabbitMQ 绑定器示例

@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer> containerCustomizer() {
    return (container, dest, group) -> container.setAdviceChain(advice1, advice2);
}

@Bean
public MessageSourceCustomizer<AmqpMessageSource> sourceCustomizer() {
    return (source, dest, group) -> source.setPropertiesConverter(customPropertiesConverter);
}

生产者属性

这些属性通过 org.springframework.cloud.stream.binder.ProducerProperties 公开

以下绑定属性仅适用于输出绑定,并且必须以 spring.cloud.stream.bindings.<bindingName>.producer. 为前缀(例如,spring.cloud.stream.bindings.func-out-0.producer.partitionKeyExpression=headers.id)。

可以通过使用前缀 spring.cloud.stream.default.producer 设置默认值(例如,spring.cloud.stream.default.producer.partitionKeyExpression=headers.id)。

autoStartup

指示此消费者是否需要自动启动

默认值:true

partitionKeyExpression

一个 SpEL 表达式,用于确定如何对出站数据进行分区。如果设置,此绑定的出站数据将进行分区。partitionCount 必须设置为大于 1 的值才能生效。参见 分区支持

默认值:null。

partitionKeyExtractorName

实现 PartitionKeyExtractorStrategy 的 bean 名称。用于提取一个键,该键用于计算分区 ID(参见 'partitionSelector*')。与 'partitionKeyExpression' 互斥。

默认值:null。

partitionSelectorName

实现 PartitionSelectorStrategy 的 bean 名称。用于根据分区键确定分区 ID(参见 'partitionKeyExtractor*')。与 'partitionSelectorExpression' 互斥。

默认值:null。

partitionSelectorExpression

一个 SpEL 表达式,用于自定义分区选择。如果两者都未设置,则分区选择方式为 hashCode(key) % partitionCount,其中 key 通过 partitionKeyExpression 计算得出。

默认值:null

partitionCount

数据的目标分区数,如果启用了分区。如果生产者已分区,则必须设置为大于 1 的值。在 Kafka 上,它被解释为一个提示。将使用此值和目标 topic 分区数的较大者。

默认值:1

requiredGroups

一个逗号分隔的组列表,生产者必须确保消息传递给这些组,即使它们在生产者创建后才启动(例如,通过在 RabbitMQ 中预先创建持久队列)。

headerMode

当设置为 none 时,禁用输出的 header 嵌入。仅对不原生支持消息 header 且需要嵌入 header 的消息中间件有效。此选项在为非 Spring Cloud Stream 应用程序生成数据(当不支持原生 header 时)时很有用。当设置为 headers 时,它使用中间件的原生 header 机制。当设置为 embeddedHeaders 时,它将 header 嵌入到消息负载中。

默认值:取决于绑定器实现。

useNativeEncoding

当设置为 true 时,出站消息由客户端库直接序列化,客户端库必须进行相应的配置(例如,设置适当的 Kafka 生产者值序列化器)。使用此配置时,出站消息的编组不基于绑定的 contentType。使用原生编码时,消费者有责任使用适当的解码器(例如,Kafka 消费者值反序列化器)来反序列化入站消息。此外,使用原生编码和解码时,headerMode=embeddedHeaders 属性将被忽略,header 不会嵌入到消息中。参见消费者属性 useNativeDecoding

默认值:false

errorChannelEnabled

当设置为 true 时,如果绑定器支持异步发送结果,发送失败会发送到目标错误通道。更多信息请参见错误处理。

默认值:false。

高级生产者配置

在某些情况下,生产者属性不足以正确配置绑定器中的生产 MessageHandler,或者您可能更喜欢以编程方式配置此类生产 MessageHandler。无论原因如何,Spring Cloud Stream 都提供了 ProducerMessageHandlerCustomizer 来实现此目的。

@FunctionalInterface
public interface ProducerMessageHandlerCustomizer<H extends MessageHandler> {

	/**
	 * Configure a {@link MessageHandler} that is being created by the binder for the
	 * provided destination name.
	 * @param handler the {@link MessageHandler} from the binder.
	 * @param destinationName the bound destination name.
	 */
	void configure(H handler, String destinationName);

}

如您所见,它使您可以访问实际的生产 MessageHandler 实例,您可以根据需要对其进行配置。您只需提供此策略的实现并将其配置为 `@Bean` 即可。

内容类型协商

数据转换是任何消息驱动微服务架构的核心功能之一。鉴于在 Spring Cloud Stream 中,此类数据表示为 Spring 的 Message,消息在到达其目的地之前可能需要转换为期望的形状或大小。这出于两个原因:

  1. 将入站消息的内容转换为匹配应用程序提供的处理器的签名。

  2. 将出站消息的内容转换为线格式。

线格式通常是 byte[](对于 Kafka 和 Rabbit 绑定器确实如此),但它由绑定器实现决定。

在 Spring Cloud Stream 中,消息转换是通过 org.springframework.messaging.converter.MessageConverter 完成的。

作为后续详情的补充,您可能还想阅读以下 博客文章

机制

为了更好地理解内容类型协商的机制和必要性,我们以以下消息处理器为例,来看一个非常简单的用例:

public Function<Person, String> personFunction {..}
为简单起见,我们假设这是应用程序中唯一的处理器函数(我们假设没有内部管道)。

前例中所示的处理器期望以 Person 对象作为参数,并生成 String 类型作为输出。为了使框架成功地将入站 Message 作为参数传递给此处理器,它必须以某种方式将 Message 类型的 payload 从线格式转换为 Person 类型。换句话说,框架必须找到并应用适当的 MessageConverter。为了实现这一点,框架需要用户的一些指示。其中一个指示已经由处理器方法本身的签名(Person 类型)提供。因此,理论上,这应该(并且在某些情况下)足够了。然而,对于大多数用例,为了选择适当的 MessageConverter,框架需要额外的信息。缺失的部分就是 contentType

Spring Cloud Stream 提供了三种机制来定义 contentType(按优先级排序):

  1. HEADER:内容类型可以通过 Message 本身传递。通过提供 contentType header,您可以声明要用于查找和应用适当 MessageConverter 的内容类型。

  2. BINDING:内容类型可以按目的地绑定设置,通过设置 spring.cloud.stream.bindings.input.content-type 属性。

    属性名称中的 input 段对应于实际目的地的名称(在本例中为“input”)。这种方法允许您按绑定声明要用于查找和应用适当 MessageConverter 的内容类型。
  3. DEFAULT:如果 contentType 不存在于 Message header 或绑定中,则使用默认的 application/json 内容类型来查找和应用适当 MessageConverter

如前所述,前述列表还展示了在冲突情况下的优先级顺序。例如,由 header 提供的内容类型优先于任何其他内容类型。按绑定设置的内容类型也同样适用,它本质上允许您覆盖默认内容类型。然而,它也提供了一个合理的默认值(这是根据社区反馈确定的)。

application/json 设置为默认值的另一个原因源于分布式微服务架构带来的互操作性要求,在这种架构中,生产者和消费者不仅可以在不同的 JVM 中运行,还可以在不同的非 JVM 平台上运行。

当非 void 处理器方法返回时,如果返回值已经是 Message,则该 Message 将成为 payload。然而,当返回值不是 Message 时,将使用返回值作为 payload 构建新的 Message,同时继承来自入站 Message 的 header,但排除由 SpringIntegrationProperties.messageHandlerNotPropagatedHeaders 定义或过滤的 header。默认情况下,那里只设置了一个 header:contentType。这意味着新的 Message 没有设置 contentType header,从而确保 contentType 可以演进。您始终可以选择不从处理器方法返回 Message,在那里您可以注入任何您想要的 header。

如果存在内部管道,则 Message 将通过相同的转换过程发送到下一个处理器。然而,如果不存在内部管道或您已到达其末尾,则 Message 将发送回输出目的地。

内容类型与参数类型

如前所述,为了使框架选择适当的 MessageConverter,它需要参数类型信息,并且可选地需要内容类型信息。选择适当 MessageConverter 的逻辑位于参数解析器(HandlerMethodArgumentResolvers)中,它们在调用用户定义的处理器方法之前触发(此时框架才知道实际的参数类型)。如果参数类型与当前 payload 的类型不匹配,框架会委托给预配置 MessageConverter 栈,看是否有任何一个可以转换 payload。如您所见,MessageConverter 的 Object fromMessage(Message<?> message, Class<?> targetClass); 操作将 targetClass 作为其参数之一。框架还确保提供的 Message 始终包含 contentType header。当不存在 contentType header 时,它会注入按绑定设置的 contentType header 或默认的 contentType header。contentType 和参数类型的组合是框架确定消息是否可以转换为目标类型的机制。如果找不到适当的 MessageConverter,则会抛出异常,您可以通过添加自定义 MessageConverter 来处理(参见 用户自定义消息转换器)。

但是,如果 payload 类型与处理器方法声明的目标类型匹配怎么办?在这种情况下,无需转换,payload 会原封不动地传递。尽管这听起来相当直接且合乎逻辑,但请记住将 Message<?>Object 作为参数的处理器方法。通过将目标类型声明为 Object(它是 Java 中所有内容的 instanceof),您实际上放弃了转换过程。

不要期望仅基于 contentType 将 Message 转换为其他类型。请记住,contentType 是对目标类型的补充。如果您愿意,可以提供一个提示,MessageConverter 可能会考虑,也可能不考虑。

消息转换器

MessageConverters 定义了两个方法

Object fromMessage(Message<?> message, Class<?> targetClass);

Message<?> toMessage(Object payload, @Nullable MessageHeaders headers);

理解这些方法的契约及其用法非常重要,尤其是在 Spring Cloud Stream 的上下文中。

fromMessage 方法将入站 Message 转换为参数类型。Message 的 payload 可以是任何类型,并且 MessageConverter 的实际实现决定了是否支持多种类型。例如,某些 JSON 转换器可能支持 byte[]String 等 payload 类型。这在应用程序包含内部管道(即 input → handler1 → handler2 →. . . → output)且上游处理器的输出结果为可能不是初始线格式的 Message 时非常重要。

然而,toMessage 方法具有更严格的契约,并且必须始终将 Message 转换为线格式:byte[]

因此,为了所有目的(特别是在实现您自己的转换器时),您可以将这两个方法视为具有以下签名:

Object fromMessage(Message<?> message, Class<?> targetClass);

Message<byte[]> toMessage(Object payload, @Nullable MessageHeaders headers);

提供的消息转换器

如前所述,框架已经提供了一系列 MessageConverters 来处理最常见的用例。以下列表描述了提供的 MessageConverters,按优先级排序(第一个起作用的 MessageConverter 将被使用):

  1. JsonMessageConverter:顾名思义,当 contentTypeapplication/json(默认)时,它支持将 Message 的 payload 转换为 POJO 或从 POJO 转换。

  2. ByteArrayMessageConverter:当 contentTypeapplication/octet-stream 时,它支持将 Message 的 payload 从 byte[] 转换为 byte[]。它本质上是直通的,主要为了向后兼容而存在。

  3. ObjectStringMessageConverter:当 contentTypetext/plain 时,它支持将任何类型转换为 String。它会调用 Object 的 toString() 方法,或者如果 payload 是 byte[],则调用 new String(byte[])

如果找不到适当的转换器,框架会抛出异常。发生这种情况时,您应该检查代码和配置,确保没有遗漏任何内容(即,确保通过使用绑定或 header 提供了 contentType)。然而,最有可能的情况是,您发现了一些不常见的用例(例如,自定义 contentType),并且当前提供的 MessageConverters 栈不知道如何转换。如果是这种情况,您可以添加自定义 MessageConverter。参见 用户自定义消息转换器

用户自定义消息转换器

Spring Cloud Stream 提供了一种机制来定义和注册额外的 MessageConverter。要使用它,请实现 org.springframework.messaging.converter.MessageConverter,并将其配置为 `@Bean`。然后,它会被添加到现有的 MessageConverter 栈中。

重要的是要理解,自定义 MessageConverter 实现会添加到现有栈的头部。因此,自定义 MessageConverter 实现优先于现有实现,这使您可以覆盖现有转换器并添加新的转换器。

以下示例展示了如何创建一个消息转换器 bean 以支持名为 application/bar 的新内容类型:

@SpringBootApplication
public static class SinkApplication {

    ...

    @Bean
    public MessageConverter customMessageConverter() {
        return new MyCustomMessageConverter();
    }
}

public class MyCustomMessageConverter extends AbstractMessageConverter {

    public MyCustomMessageConverter() {
        super(new MimeType("application", "bar"));
    }

    @Override
    protected boolean supports(Class<?> clazz) {
        return (Bar.class.equals(clazz));
    }

    @Override
    protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
        Object payload = message.getPayload();
        return (payload instanceof Bar ? payload : new Bar((byte[]) payload));
    }
}

应用间通信

Spring Cloud Stream 实现了应用程序之间的通信。应用间通信是一个复杂的问题,涵盖了以下主题所述的多个方面:

连接多个应用程序实例

虽然 Spring Cloud Stream 使单个 Spring Boot 应用程序易于连接到消息系统,但 Spring Cloud Stream 的典型场景是创建多应用程序管道,其中微服务应用程序相互发送数据。您可以通过关联“相邻”应用程序的输入和输出目的地来实现此场景。

假设设计要求 Time Source 应用程序将数据发送到 Log Sink 应用程序。您可以使用名为 ticktock 的通用目的地用于两个应用程序内的绑定。

Time Source(具有名为 output 的绑定)将设置以下属性

spring.cloud.stream.bindings.output.destination=ticktock

Log Sink(具有名为 input 的绑定)将设置以下属性

spring.cloud.stream.bindings.input.destination=ticktock

实例索引和实例数

在扩展 Spring Cloud Stream 应用程序时,每个实例都可以接收关于同应用程序存在多少其他实例以及其自身的实例索引是什么的信息。Spring Cloud Stream 通过 spring.cloud.stream.instanceCountspring.cloud.stream.instanceIndex 属性来实现这一点。例如,如果一个 HDFS sink 应用程序有三个实例,所有三个实例的 spring.cloud.stream.instanceCount 都设置为 3,而各个应用程序的 spring.cloud.stream.instanceIndex 分别设置为 012

当通过 Spring Cloud Data Flow 部署 Spring Cloud Stream 应用程序时,这些属性会自动配置;当 Spring Cloud Stream 应用程序独立启动时,这些属性必须正确设置。默认情况下,spring.cloud.stream.instanceCount1spring.cloud.stream.instanceIndex0

在扩展场景中,正确配置这两个属性对于总体而言处理分区行为(参见下文)非常重要,并且某些绑定器(例如 Kafka 绑定器)总是需要这两个属性,以确保数据在多个消费者实例之间正确拆分。

分区

Spring Cloud Stream 中的分区包括两个任务:

配置输出绑定以进行分区

您可以通过设置 partitionKeyExpressionpartitionKeyExtractorName 属性中的一个且仅一个,以及 partitionCount 属性,来配置输出绑定以发送已分区数据。

例如,以下是一个有效且典型的配置:

spring.cloud.stream.bindings.func-out-0.producer.partitionKeyExpression=headers.id
spring.cloud.stream.bindings.func-out-0.producer.partitionCount=5

根据该示例配置,数据通过以下逻辑发送到目标分区。

对于发送到已分区输出绑定的每条消息,根据 partitionKeyExpression 计算分区键的值。partitionKeyExpression 是一个 SpEL 表达式,它对出站消息进行评估(在前例中是消息 header 中 id 的值),以提取分区键。

如果 SpEL 表达式不足以满足您的需求,您可以提供 org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy 的实现,并将其配置为 Bean (使用 @Bean 注解),以此来计算分区键值。如果 Application Context 中存在多个类型为 org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy 的 Bean,您可以通过指定其名称并使用 partitionKeyExtractorName 属性进一步筛选,如下例所示:

--spring.cloud.stream.bindings.func-out-0.producer.partitionKeyExtractorName=customPartitionKeyExtractor
--spring.cloud.stream.bindings.func-out-0.producer.partitionCount=5
. . .
@Bean
public CustomPartitionKeyExtractorClass customPartitionKeyExtractor() {
    return new CustomPartitionKeyExtractorClass();
}
在 Spring Cloud Stream 的早期版本中,您可以通过设置 spring.cloud.stream.bindings.output.producer.partitionKeyExtractorClass 属性来指定 org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy 的实现。自版本 3.0 起,此属性已移除。

计算出消息键后,分区选择过程会确定目标分区,其值为 0partitionCount - 1 之间。大多数场景下适用的默认计算基于以下公式:key.hashCode() % partitionCount。这可以在绑定上进行自定义,方法是设置一个针对“key”进行评估的 SpEL 表达式(通过 partitionSelectorExpression 属性),或者将 org.springframework.cloud.stream.binder.PartitionSelectorStrategy 的实现配置为 Bean(使用 @Bean 注解)。与 PartitionKeyExtractorStrategy 类似,当 Application Context 中存在多个此类型的 Bean 时,您可以使用 spring.cloud.stream.bindings.output.producer.partitionSelectorName 属性进一步筛选,如下例所示:

--spring.cloud.stream.bindings.func-out-0.producer.partitionSelectorName=customPartitionSelector
. . .
@Bean
public CustomPartitionSelectorClass customPartitionSelector() {
    return new CustomPartitionSelectorClass();
}
在 Spring Cloud Stream 的早期版本中,您可以通过设置 spring.cloud.stream.bindings.output.producer.partitionSelectorClass 属性来指定 org.springframework.cloud.stream.binder.PartitionSelectorStrategy 的实现。自版本 3.0 起,此属性已移除。

配置用于分区的输入绑定

通过设置其 partitioned 属性以及应用程序自身的 instanceIndexinstanceCount 属性,可以将输入绑定(绑定名称为 uppercase-in-0)配置为接收分区数据,如下例所示:

spring.cloud.stream.bindings.uppercase-in-0.consumer.partitioned=true
spring.cloud.stream.instanceIndex=3
spring.cloud.stream.instanceCount=5

instanceCount 值表示数据应在其中分区的应用程序实例总数。instanceIndex 在多个实例中必须是唯一值,其值在 0instanceCount - 1 之间。实例索引帮助每个应用程序实例识别其接收数据的唯一分区。需要支持本地不支持分区技术的 Binder。例如,对于 RabbitMQ,每个分区都有一个队列,队列名称包含实例索引。对于 Kafka,如果 autoRebalanceEnabledtrue(默认),Kafka 会负责在实例之间分发分区,此时无需这些属性。如果 autoRebalanceEnabled 设置为 false,Binder 将使用 instanceCountinstanceIndex 来确定实例订阅哪些分区(您必须至少拥有与实例数量一样多的分区)。Binder 代替 Kafka 分配分区。如果您希望特定分区的消息总是发送到同一实例,这可能会很有用。当 Binder 配置需要它们时,正确设置这两个值非常重要,以确保所有数据都被消费,并且应用程序实例接收到互斥的数据集。

尽管在使用多个实例进行分区数据处理的独立场景中设置可能很复杂,但 Spring Cloud Dataflow 可以通过正确填充输入和输出值,并让您依赖运行时基础设施提供有关实例索引和实例计数的信息,从而显著简化这一过程。

测试

Spring Cloud Stream 提供了对微服务应用程序进行测试的支持,无需连接到消息系统。

Spring Integration 测试 Binder

Spring Cloud Stream 提供了一个测试 Binder,您可以使用它来测试各种应用程序组件,而无需实际的真实世界 Binder 实现或消息代理。

这个测试 Binder 充当**单元测试**和**集成测试**之间的桥梁,它基于 Spring Integration 框架,本质上是一个 JVM 内的消息代理,为您提供了两全其美的优势——一个真实的 Binder,但无需网络。

测试 Binder 配置

要启用 Spring Integration 测试 Binder,您只需将其添加为依赖项即可。

添加所需的依赖项

以下是所需的 Maven POM 条目示例。

<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-stream-test-binder</artifactId>
	<scope>test</scope>
</dependency>

或者用于 build.gradle.kts

testImplementation("org.springframework.cloud:spring-cloud-stream-test-binder")

测试 Binder 用法

现在您可以将微服务作为一个简单的单元测试进行测试

@SpringBootTest
public class SampleStreamTests {

	@Autowired
	private InputDestination input;

	@Autowired
	private OutputDestination output;

	@Test
	public void testEmptyConfiguration() {
		this.input.send(new GenericMessage<byte[]>("hello".getBytes()));
		assertThat(output.receive().getPayload()).isEqualTo("HELLO".getBytes());
	}

	@SpringBootApplication
	@Import(TestChannelBinderConfiguration.class)
	public static class SampleConfiguration {
		@Bean
		public Function<String, String> uppercase() {
			return v -> v.toUpperCase();
		}
	}
}

如果您需要更多控制或想在同一个测试套件中测试多种配置,您也可以这样做:

@EnableAutoConfiguration
public static class MyTestConfiguration {
	@Bean
	public Function<String, String> uppercase() {
			return v -> v.toUpperCase();
	}
}

. . .

@Test
public void sampleTest() {
	try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
				TestChannelBinderConfiguration.getCompleteConfiguration(
						MyTestConfiguration.class))
				.run("--spring.cloud.function.definition=uppercase")) {
		InputDestination source = context.getBean(InputDestination.class);
		OutputDestination target = context.getBean(OutputDestination.class);
		source.send(new GenericMessage<byte[]>("hello".getBytes()));
		assertThat(target.receive().getPayload()).isEqualTo("HELLO".getBytes());
	}
}

对于具有多个绑定和/或多个输入和输出,或者只是想明确指定发送或接收的目标名称的场景,InputDestinationOutputDestinationsend()receive() 方法被重载,允许您提供输入和输出目标的名称。

考虑以下示例

@EnableAutoConfiguration
public static class SampleFunctionConfiguration {

	@Bean
	public Function<String, String> uppercase() {
		return value -> value.toUpperCase();
	}

	@Bean
	public Function<String, String> reverse() {
		return value -> new StringBuilder(value).reverse().toString();
	}
}

以及实际的测试

@Test
public void testMultipleFunctions() {
	try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
			TestChannelBinderConfiguration.getCompleteConfiguration(
					SampleFunctionConfiguration.class))
							.run("--spring.cloud.function.definition=uppercase;reverse")) {

		InputDestination inputDestination = context.getBean(InputDestination.class);
		OutputDestination outputDestination = context.getBean(OutputDestination.class);

		Message<byte[]> inputMessage = MessageBuilder.withPayload("Hello".getBytes()).build();
		inputDestination.send(inputMessage, "uppercase-in-0");
		inputDestination.send(inputMessage, "reverse-in-0");

		Message<byte[]> outputMessage = outputDestination.receive(0, "uppercase-out-0");
		assertThat(outputMessage.getPayload()).isEqualTo("HELLO".getBytes());

		outputMessage = outputDestination.receive(0, "reverse-out-0");
		assertThat(outputMessage.getPayload()).isEqualTo("olleH".getBytes());
	}
}

对于具有额外映射属性(如 destination)的场景,您应该使用这些名称。例如,考虑前面测试的一个不同版本,我们在其中将 uppercase 函数的输入和输出显式地映射到 myInputmyOutput 绑定名称:

@Test
public void testMultipleFunctions() {
	try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
			TestChannelBinderConfiguration.getCompleteConfiguration(
					SampleFunctionConfiguration.class))
							.run(
							"--spring.cloud.function.definition=uppercase;reverse",
							"--spring.cloud.stream.bindings.uppercase-in-0.destination=myInput",
							"--spring.cloud.stream.bindings.uppercase-out-0.destination=myOutput"
							)) {

		InputDestination inputDestination = context.getBean(InputDestination.class);
		OutputDestination outputDestination = context.getBean(OutputDestination.class);

		Message<byte[]> inputMessage = MessageBuilder.withPayload("Hello".getBytes()).build();
		inputDestination.send(inputMessage, "myInput");
		inputDestination.send(inputMessage, "reverse-in-0");

		Message<byte[]> outputMessage = outputDestination.receive(0, "myOutput");
		assertThat(outputMessage.getPayload()).isEqualTo("HELLO".getBytes());

		outputMessage = outputDestination.receive(0, "reverse-out-0");
		assertThat(outputMessage.getPayload()).isEqualTo("olleH".getBytes());
	}
}

测试 Binder 和 PollableMessageSource

Spring Integration 测试 Binder 也允许您在使用 PollableMessageSource 时编写测试(更多详细信息请参阅 使用轮询消费者)。

然而,需要理解的重要一点是,轮询不是事件驱动的,并且 PollableMessageSource 是一种策略,它暴露了用于生产(轮询)**一个**消息(Message,单数)的操作。您多久轮询一次、使用多少线程或者从哪里轮询(消息队列还是文件系统)完全取决于您;换句话说,配置轮询器 (Poller)、线程或消息 (Message) 的实际源是您的责任。幸运的是,Spring 有许多抽象来精确地配置这些。

让我们看一个示例

@Test
public void samplePollingTest() {
	ApplicationContext context = new SpringApplicationBuilder(SamplePolledConfiguration.class)
				.web(WebApplicationType.NONE)
				.run("--spring.jmx.enabled=false", "--spring.cloud.stream.pollable-source=myDestination");
	OutputDestination destination = context.getBean(OutputDestination.class);
	System.out.println("Message 1: " + new String(destination.receive().getPayload()));
	System.out.println("Message 2: " + new String(destination.receive().getPayload()));
	System.out.println("Message 3: " + new String(destination.receive().getPayload()));
}

@Import(TestChannelBinderConfiguration.class)
@EnableAutoConfiguration
public static class SamplePolledConfiguration {
	@Bean
	public ApplicationRunner poller(PollableMessageSource polledMessageSource, StreamBridge output, TaskExecutor taskScheduler) {
		return args -> {
			taskScheduler.execute(() -> {
				for (int i = 0; i < 3; i++) {
					try {
						if (!polledMessageSource.poll(m -> {
							String newPayload = ((String) m.getPayload()).toUpperCase();
							output.send("myOutput", newPayload);
						})) {
							Thread.sleep(2000);
						}
					}
					catch (Exception e) {
						// handle failure
					}
				}
			});
		};
	}
}

上面的(非常基本的)示例将以 2 秒的间隔产生 3 条消息,并将它们发送到 Source 的输出目标,Binder 将这些消息发送到 OutputDestination,我们在那里检索它们(进行断言)。目前,它会打印以下内容:

Message 1: POLLED DATA
Message 2: POLLED DATA
Message 3: POLLED DATA

如您所见,数据是相同的。这是因为该 Binder 定义了实际的 MessageSource 的默认实现——消息通过 poll() 操作从中轮询的源。虽然对于大多数测试场景来说足够了,但在某些情况下您可能想定义自己的 MessageSource。为此,只需在测试配置中配置一个类型为 MessageSource 的 Bean,提供您自己的消息源实现。

示例如下:

@Bean
public MessageSource<?> source() {
	return () -> new GenericMessage<>("My Own Data " + UUID.randomUUID());
}

生成以下输出:

Message 1: MY OWN DATA 1C180A91-E79F-494F-ABF4-BA3F993710DA
Message 2: MY OWN DATA D8F3A477-5547-41B4-9434-E69DA7616FEE
Message 3: MY OWN DATA 20BF2E64-7FF4-4CB6-A823-4053D30B5C74
**请勿**将此 Bean 命名为 messageSource,因为它会与 Spring Boot 为无关原因提供的同名(不同类型)的 Bean 冲突。

关于在测试中混合使用测试 Binder 和常规中间件 Binder 的特别说明

基于 Spring Integration 的测试 Binder 用于在不涉及实际中间件 Binder(如 Kafka 或 RabbitMQ Binder)的情况下测试应用程序。如上文所述,测试 Binder 通过依赖内存中的 Spring Integration 通道帮助您快速验证应用程序的行为。当测试类路径中存在测试 Binder 时,Spring Cloud Stream 将尝试使用此 Binder 进行所有需要 Binder 进行通信的测试目的。换句话说,您不能在同一模块中混合使用测试 Binder 和常规中间件 Binder 进行测试。在使用测试 Binder 测试应用程序后,如果您想继续使用实际中间件 Binder 进行进一步的集成测试,建议将使用实际 Binder 的测试放在单独的模块中,以便这些测试能够与实际中间件建立正确的连接,而不是依赖测试 Binder 提供的内存通道。

健康指示器

Spring Cloud Stream 为 Binder 提供了健康指示器。它注册在名称 binders 下,可以通过设置 management.health.binders.enabled 属性来启用或禁用。

要启用健康检查,首先需要通过包含其依赖项来同时启用“web”和“actuator”(参阅 绑定可视化和控制)。

如果应用程序未显式设置 management.health.binders.enabled,则 management.health.defaults.enabled 将被匹配为 true,Binder 健康指示器将启用。如果您想完全禁用健康指示器,则必须将 management.health.binders.enabled 设置为 false

您可以使用 Spring Boot actuator 健康端点访问健康指示器 - /actuator/health。默认情况下,当您访问上述端点时,只会收到顶层应用程序状态。为了接收 Binder 特定健康指示器的完整详细信息,您需要在应用程序中包含属性 management.endpoint.health.show-details,并将其值设置为 ALWAYS

健康指示器是 Binder 特定的,某些 Binder 实现可能不一定提供健康指示器。

如果您想完全禁用所有开箱即用的健康指示器并提供自己的健康指示器,可以通过将属性 management.health.binders.enabled 设置为 false,然后在应用程序中提供自己的 HealthIndicator Bean 来实现。在这种情况下,Spring Boot 的健康指示器基础设施仍然会拾取这些自定义 Bean。即使您没有禁用 Binder 健康指示器,您仍然可以通过在开箱即用健康检查之外提供自己的 HealthIndicator Bean 来增强健康检查。

当同一个应用程序中有多个 Binder 时,除非应用程序通过将 management.health.binders.enabled 设置为 false 来关闭它们,否则默认会启用健康指示器。在这种情况下,如果用户想禁用部分 Binder 的健康检查,则应该在多 Binder 配置的环境中将 management.health.binders.enabled 设置为 false。有关如何提供特定于环境的属性的详细信息,请参阅 连接到多个系统

如果在类路径中存在多个 Binder,但并非所有 Binder 都被应用程序使用,这可能会在健康指示器方面引起一些问题。具体的实现细节可能因 Binder 如何执行健康检查而异。例如,Kafka Binder 可能在没有 Binder 注册任何目标 (destinations) 的情况下判定状态为 DOWN

举一个具体的例子。假设您的类路径中同时存在 Kafka 和 Kafka Streams Binder,但在应用程序代码中只使用了 Kafka Streams Binder,即只提供了使用 Kafka Streams Binder 的绑定。由于 Kafka Binder 未被使用,并且它有特定的检查来查看是否注册了任何目标,因此该 Binder 的健康检查将失败。顶层应用程序健康检查状态将报告为 DOWN。在这种情况下,由于您没有使用 Kafka Binder,可以直接从应用程序中移除对它的依赖。

示例

有关 Spring Cloud Stream 示例,请参阅 GitHub 上的 spring-cloud-stream-samples 仓库。

在 CloudFoundry 上部署流应用程序

在 CloudFoundry 上,服务通常通过一个名为 VCAP_SERVICES 的特殊环境变量暴露。

配置 Binder 连接时,您可以按照 dataflow Cloud Foundry Server 文档中的说明,使用环境变量中的值。

Binder 实现

以下是可用的 Binder 实现列表:

如前所述,Binder 抽象也是框架的扩展点之一。因此,如果您在前面的列表中找不到合适的绑定器,您可以在 Spring Cloud Stream 之上实现自己的绑定器。在如何从零开始创建 Spring Cloud Stream Binder 一文中,社区成员详细记录了实现自定义绑定器所需的一系列步骤,并提供了示例。这些步骤也在 实现自定义绑定器 部分中突出显示。