4.0.5

序言

Spring 数据集成之旅的简史

Spring 的数据集成之旅始于 Spring 集成。通过其编程模型,它提供了一致的开发者体验,用于构建应用程序,这些应用程序可以采用 企业集成模式 来连接到外部系统,例如数据库、消息代理等。

快进到云时代,微服务在企业环境中变得突出。Spring Boot 改变了开发者构建应用程序的方式。通过 Spring 的编程模型和由 Spring Boot 处理的运行时职责,开发独立的、生产级的基于 Spring 的微服务变得无缝。

为了将其扩展到数据集成工作负载,Spring 集成和 Spring Boot 被整合到一个新项目中。Spring Cloud Stream 诞生了。

使用 Spring Cloud Stream,开发者可以

  • 独立构建、测试和部署以数据为中心的应用程序。

  • 应用现代微服务架构模式,包括通过消息传递进行组合。

  • 使用事件为中心思维解耦应用程序职责。事件可以表示在时间中发生的事情,下游使用者应用程序可以对其做出反应,而无需知道其来源或生产者的身份。

  • 将业务逻辑移植到消息代理(例如 RabbitMQ、Apache Kafka、Amazon Kinesis)。

  • 依靠框架对常见用例的自动内容类型支持。可以扩展到不同的数据转换类型。

  • 等等。

快速入门

按照此三步指南,您甚至可以在了解任何详细信息之前,在不到 5 分钟的时间内试用 Spring Cloud Stream。

我们向您展示如何创建一个 Spring Cloud Stream 应用程序,该应用程序接收来自您选择的邮件中间件(稍后详细介绍)的消息,并将接收到的消息记录到控制台。我们称之为 LoggingConsumer。虽然不太实用,但它很好地介绍了一些主要概念和抽象,使您更容易理解本用户指南的其余部分。

三个步骤如下

使用 Spring Initializr 创建示例应用程序

要开始,请访问 Spring Initializr。在那里,您可以生成我们的 LoggingConsumer 应用程序。为此

  1. 依赖项部分,开始输入 stream。当“Cloud Stream”选项出现时,选择它。

  2. 开始输入“kafka”或“rabbit”。

  3. 选择“Kafka”或“RabbitMQ”。

    基本上,您选择应用程序绑定的邮件中间件。我们建议使用您已安装的或感觉更方便安装和运行的中间件。此外,正如您从 Initilaizer 屏幕中看到的那样,您还可以选择其他一些选项。例如,您可以选择 Gradle 作为构建工具,而不是 Maven(默认值)。

  4. 工件字段中,输入“logging-consumer”。

    工件字段的值变为应用程序名称。如果您为中间件选择了 RabbitMQ,那么您的 Spring Initializr 现在应该是这样的

spring initializr
  1. 单击生成项目按钮。

    这样做会将已生成项目的压缩版本下载到你的硬盘。

  2. 将文件解压到你想用作项目目录的文件夹中。

我们鼓励你探索 Spring Initializr 中提供的多种可能性。它让你可以创建多种不同类型的 Spring 应用程序。

将项目导入到你的 IDE

现在你可以将项目导入到你的 IDE。请记住,根据 IDE 的不同,你可能需要遵循特定的导入过程。例如,根据项目生成的类型(Maven 或 Gradle),你可能需要遵循特定的导入过程(例如,在 Eclipse 或 STS 中,你需要使用文件 → 导入 → Maven → 现有 Maven 项目)。

导入后,项目不应该有任何类型的错误。此外,src/main/java 应包含 com.example.loggingconsumer.LoggingConsumerApplication

从技术上讲,此时你可以运行应用程序的主类。它已经是一个有效的 Spring Boot 应用程序。但是,它没有任何作用,所以我们想添加一些代码。

添加消息处理程序、构建和运行

修改 com.example.loggingconsumer.LoggingConsumerApplication 类,使其如下所示

@SpringBootApplication
public class LoggingConsumerApplication {

	public static void main(String[] args) {
		SpringApplication.run(LoggingConsumerApplication.class, args);
	}

	@Bean
	public Consumer<Person> log() {
	    return person -> {
	        System.out.println("Received: " + person);
	    };
	}

	public static class Person {
		private String name;
		public String getName() {
			return name;
		}
		public void setName(String name) {
			this.name = name;
		}
		public String toString() {
			return this.name;
		}
	}
}

正如你从前面的清单中看到的

  • 我们使用函数式编程模型(参见 Spring Cloud Function 支持)将单个消息处理程序定义为 Consumer

  • 我们依赖框架约定将此类处理程序绑定到绑定器公开的输入目的地绑定。

这样做还可以让你看到框架的一个核心功能:它尝试自动将传入的消息有效负载转换为类型 Person

你现在有一个完全功能的 Spring Cloud Stream 应用程序,它可以监听消息。为了简单起见,我们假设你在 第一步 中选择了 RabbitMQ。假设你已经安装并运行了 RabbitMQ,你可以通过在 IDE 中运行其 main 方法来启动应用程序。

你应该看到以下输出

	--- [ main] c.s.b.r.p.RabbitExchangeQueueProvisioner : declaring queue for inbound: input.anonymous.CbMIwdkJSBO1ZoPDOtHtCg, bound to: input
	--- [ main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
	--- [ main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#2a3a299:0/SimpleConnection@66c83fc8. . .
	. . .
	--- [ main] o.s.i.a.i.AmqpInboundChannelAdapter      : started inbound.input.anonymous.CbMIwdkJSBO1ZoPDOtHtCg
	. . .
	--- [ main] c.e.l.LoggingConsumerApplication         : Started LoggingConsumerApplication in 2.531 seconds (JVM running for 2.897)

转到 RabbitMQ 管理控制台或任何其他 RabbitMQ 客户端,然后向 input.anonymous.CbMIwdkJSBO1ZoPDOtHtCg 发送消息。anonymous.CbMIwdkJSBO1ZoPDOtHtCg 部分表示组名称并且是生成的,因此在您的环境中必定不同。对于更可预测的内容,您可以通过设置 spring.cloud.stream.bindings.input.group=hello(或您喜欢的任何名称)来使用显式组名称。

消息的内容应为 Person 类的 JSON 表示,如下所示

{"name":"Sam Spade"}

然后,在您的控制台中,您应该看到

已收到:Sam Spade

您还可以将应用程序构建并打包到引导 JAR 中(通过使用 ./mvnw clean install),并使用 java -jar 命令运行构建的 JAR。

现在,您拥有一个可用的(尽管非常基本)Spring Cloud Stream 应用程序。

流数据上下文的 Spring 表达式语言 (SpEL)

在整个参考手册中,您会遇到许多功能和示例,您可以在其中利用 Spring 表达式语言 (SpEL)。在使用它时了解某些限制非常重要。

SpEL 使您可以访问当前消息以及您正在运行的应用程序上下文。但是,了解 SpEL 可以看到哪种类型的数据非常重要,尤其是在传入消息的上下文中。从代理接收的消息采用 byte[] 的形式。然后,它由绑定程序转换为 Message<byte[]>,正如您所看到的,消息的有效负载保持其原始形式。消息的标头为 <String, Object>,其中值通常是另一个基元或基元的集合/数组,因此为 Object。这是因为绑定程序不知道所需的输入类型,因为它无法访问用户代码(函数)。因此,绑定程序有效地传递了一个信封,其中包含有效负载和一些可读元数据(采用消息标头的形式),就像通过邮件传递的信件一样。这意味着虽然可以访问消息的有效负载,但您只能以原始数据(即 byte[])的形式访问它。虽然开发人员非常普遍地要求能够以具体类型(例如,Foo、Bar 等)访问有效负载对象的字段的 SpEL,但您会发现实现起来有多么困难甚至不可能。这里有一个示例来说明这个问题;假设您有一个路由表达式,可以根据有效负载类型路由到不同的函数。此要求意味着将有效负载从 byte[] 转换为特定类型,然后应用 SpEL。但是,为了执行此类转换,我们需要知道要传递给转换器的实际类型,而这来自我们不知道哪一个的函数签名。解决此要求的更好方法是将类型信息作为消息标头传递(例如,application/json;type=foo.bar.Baz )。您将获得一个清晰可读的字符串值,可以在一年后访问和评估,并且易于阅读 SpEL 表达式。

此外,将有效负载用于路由决策被认为是一种非常糟糕的做法,因为有效负载被认为是特权数据 - 仅由其最终收件人读取的数据。同样,使用邮件传递类比,您不希望邮递员打开您的信封并阅读信件内容以做出一些传递决策。同样的概念也适用于这里,尤其是在生成消息时包含此类信息相对容易时。它强制执行与通过网络传输的数据的设计相关的特定级别的纪律,以及哪些数据可以被视为公共数据,哪些是特权数据。

介绍 Spring Cloud Stream

Spring Cloud Stream 是一个用于构建消息驱动的微服务应用程序的框架。Spring Cloud Stream 构建在 Spring Boot 之上,以创建独立的生产级 Spring 应用程序,并使用 Spring Integration 提供与消息代理的连接。它提供了对来自多个供应商的中间件的观点化配置,引入了持久发布-订阅语义、消费者组和分区等概念。

通过将 spring-cloud-stream 依赖项添加到应用程序的类路径,您可以立即连接到由提供的 spring-cloud-stream 绑定程序公开的消息代理(稍后会详细介绍),并且可以实现您的功能性要求,该要求由 java.util.function.Function 根据传入消息运行。

以下列表显示了一个快速示例

@SpringBootApplication
public class SampleApplication {

	public static void main(String[] args) {
		SpringApplication.run(SampleApplication.class, args);
	}

    @Bean
	public Function<String, String> uppercase() {
	    return value -> value.toUpperCase();
	}
}

以下列表显示了相应的测试

@SpringBootTest(classes =  SampleApplication.class)
@Import({TestChannelBinderConfiguration.class})
class BootTestStreamApplicationTests {

	@Autowired
	private InputDestination input;

	@Autowired
	private OutputDestination output;

	@Test
	void contextLoads() {
		input.send(new GenericMessage<byte[]>("hello".getBytes()));
		assertThat(output.receive().getPayload()).isEqualTo("HELLO".getBytes());
	}
}

主要概念

Spring Cloud Stream 提供了许多抽象和基本元素,简化了消息驱动的微服务应用程序的编写。本节概述了以下内容

应用程序模型

Spring Cloud Stream 应用程序由中间件中立核心组成。应用程序通过在外部代理公开的目标与代码中的输入/输出参数之间建立绑定来与外部世界通信。建立绑定所需的特定的代理详细信息由特定于中间件的绑定程序实现处理。

SCSt with binder
图 1. Spring Cloud Stream 应用程序

Fat JAR

Spring Cloud Stream 应用程序可以在独立模式下从你的 IDE 中运行,以进行测试。要运行生产环境中的 Spring Cloud Stream 应用程序,你可以使用为 Maven 或 Gradle 提供的标准 Spring Boot 工具创建可执行(或“胖”)JAR。有关更多详细信息,请参阅Spring Boot 参考指南

Binder 抽象

Spring Cloud Stream 为KafkaRabbit MQ提供 Binder 实现。该框架还包括一个测试 Binder,用于对你的应用程序进行集成测试,作为 Spring Cloud Stream 应用程序。有关更多详细信息,请参阅测试部分。

Binder 抽象也是该框架的扩展点之一,这意味着你可以在 Spring Cloud Stream 之上实现自己的 Binder。在如何从头开始创建 Spring Cloud Stream Binder帖子中,社区成员详细记录了一组实现自定义 Binder 所需的步骤,并附带了一个示例。这些步骤也在实现自定义 Binder部分中突出显示。

Spring Cloud Stream 使用 Spring Boot 进行配置,而 Binder 抽象使得 Spring Cloud Stream 应用程序在连接到中间件时的灵活性变得可能。例如,部署人员可以在运行时动态选择外部目标(例如 Kafka 主题或 RabbitMQ 交换)与消息处理程序的输入和输出(例如函数的输入参数及其返回参数)之间的映射。此类配置可以通过外部配置属性和 Spring Boot 支持的任何形式(包括应用程序参数、环境变量以及application.ymlapplication.properties文件)提供。在介绍 Spring Cloud Stream部分的接收器示例中,将spring.cloud.stream.bindings.input.destination应用程序属性设置为raw-sensor-data会导致它从raw-sensor-data Kafka 主题或绑定到raw-sensor-data RabbitMQ 交换的队列中读取。

Spring Cloud Stream 自动检测并使用在类路径中找到的 Binder。你可以使用相同代码的不同类型的中间件。为此,请在构建时包含不同的 Binder。对于更复杂的使用案例,你还可以将多个 Binder 与你的应用程序打包在一起,并在运行时让它选择 Binder(甚至是否为不同的绑定使用不同的 Binder)。

持久发布-订阅支持

应用程序之间的通信遵循发布-订阅模型,其中数据通过共享主题进行广播。这可以在下图中看到,该图显示了一组交互式 Spring Cloud Stream 应用程序的典型部署。

SCSt sensors
图 2. Spring Cloud Stream 发布-订阅

传感器通过 HTTP 端点报告的数据将发送到名为 raw-sensor-data 的公共目标。从目标开始,它将由一个计算时间窗口平均值的微服务应用程序和一个将原始数据导入 HDFS(Hadoop 分布式文件系统)的微服务应用程序独立处理。为了处理数据,这两个应用程序在运行时都将主题声明为其输入。

发布-订阅通信模型降低了生产者和消费者的复杂性,并允许在不中断现有流程的情况下将新应用程序添加到拓扑中。例如,在计算平均值的应用程序的下游,您可以添加一个计算最高温度值以进行显示和监控的应用程序。然后,您可以添加另一个应用程序来解释相同的平均值流以进行故障检测。通过共享主题而不是点对点队列进行所有通信可以减少微服务之间的耦合。

虽然发布-订阅消息传递的概念并不新鲜,但 Spring Cloud Stream 采取了进一步措施,使其成为其应用程序模型的意见选择。通过使用本机中间件支持,Spring Cloud Stream 还简化了跨不同平台使用发布-订阅模型。

消费者组

虽然发布-订阅模型使通过共享主题连接应用程序变得容易,但通过创建给定应用程序的多个实例来扩展的能力同样重要。这样做时,应用程序的不同实例将被置于竞争消费者关系中,其中只有一个实例有望处理给定消息。

Spring Cloud Stream 通过消费者组的概念对这种行为进行建模。(Spring Cloud Stream 消费者组类似于 Kafka 消费者组,并受到其启发。)每个消费者绑定都可以使用 spring.cloud.stream.bindings.<bindingName>.group 属性指定组名。对于下图中显示的消费者,此属性将设置为 spring.cloud.stream.bindings.<bindingName>.group=hdfsWritespring.cloud.stream.bindings.<bindingName>.group=average

SCSt groups
图 3. Spring Cloud Stream 消费者组

订阅给定目标的所有组都会收到已发布数据的副本,但每个组的只有一个成员会收到该目标的给定消息。默认情况下,当未指定组时,Spring Cloud Stream 会将应用程序分配给一个匿名且独立的单成员消费者组,该组与所有其他消费者组处于发布-订阅关系中。

消费者类型

支持两种类型的消费者

  • 消息驱动的(有时称为异步)

  • 轮询(有时称为同步)

在 2.0 版本之前,仅支持异步消费者。消息在可用且有线程可用于处理它时立即传递。

当您希望控制处理消息的速度时,您可能想要使用同步消费者。

持久性

与 Spring Cloud Stream 的固执己见的应用程序模型一致,消费者组订阅是持久的。也就是说,绑定器实现确保组订阅是持久的,并且一旦为组创建了至少一个订阅,该组就会收到消息,即使在组中的所有应用程序都已停止时也是如此。

匿名订阅本质上是不持久的。对于某些绑定器实现(例如 RabbitMQ),可以有非持久的组订阅。

通常,在将应用程序绑定到给定目标时,最好始终指定消费者组。在扩展 Spring Cloud Stream 应用程序时,您必须为其每个输入绑定指定一个消费者组。这样做可以防止应用程序的实例接收重复的消息(除非需要这种行为,这种情况不常见)。

分区支持

Spring Cloud Stream 为给定应用程序的多个实例之间的数据分区提供支持。在分区场景中,物理通信媒介(如代理主题)被视为被构建成多个分区。一个或多个生产者应用程序实例向多个消费者应用程序实例发送数据,并确保由公共特征标识的数据由同一个消费者实例处理。

Spring Cloud Stream 为以统一方式实现分区处理用例提供了一个公共抽象。因此,无论代理本身是否天然分区(例如,Kafka)或不分区(例如,RabbitMQ),都可以使用分区。

SCSt partitioning
图 4. Spring Cloud Stream 分区

分区是有状态处理中的一个关键概念,在有状态处理中,确保所有相关数据一起处理至关重要(出于性能或一致性原因)。例如,在时间窗口平均计算示例中,由任何给定传感器产生的所有测量值由同一个应用程序实例处理非常重要。

要设置分区处理场景,您必须配置数据生成端和数据消耗端。

编程模型

要了解编程模型,您应该熟悉以下核心概念

  • 目标绑定:负责提供与外部消息系统集成的组件。

  • 绑定:外部消息系统与应用程序提供的消息生产者消费者(由目标绑定创建)之间的桥梁。

  • 消息:生产者和消费者用于与目标绑定(以及通过外部消息系统与其他应用程序)通信的规范数据结构。

SCSt overview

目标绑定

目标绑定是 Spring Cloud Stream 的扩展组件,负责提供必要的配置和实现,以促进与外部消息系统的集成。此集成负责与生产者和消费者之间建立连接、委派和路由消息、数据类型转换、调用用户代码等。

绑定处理了很多原本会落在您肩上的样板职责。但是,为了实现这一点,绑定仍然需要用户提供一些极简但必需的指令,这些指令通常以某种类型的绑定配置的形式出现。

虽然讨论所有可用的粘合剂和粘合配置选项超出了本节的范围(手册的其余部分对此进行了广泛的介绍),但粘合作为一个概念确实需要特别注意。下一节将对此进行详细讨论。

粘合

如前所述,粘合在外部消息系统(例如队列、主题等)和应用程序提供的生产者消费者之间提供了一个桥梁。

以下示例显示了一个配置齐全且功能正常的 Spring Cloud Stream 应用程序,该应用程序将消息的有效负载接收为 String 类型(请参阅内容类型协商部分),将其记录到控制台,然后在将其转换为大写后将其向下发送。

@SpringBootApplication
public class SampleApplication {

	public static void main(String[] args) {
		SpringApplication.run(SampleApplication.class, args);
	}

	@Bean
	public Function<String, String> uppercase() {
	    return value -> {
	        System.out.println("Received: " + value);
	        return value.toUpperCase();
	    };
	}
}

上述示例看起来与任何普通的 Spring Boot 应用程序没有什么不同。它定义了一个类型为 Function 的单一 bean,仅此而已。那么,它如何成为 Spring Cloud Stream 应用程序呢?它成为 Spring Cloud Stream 应用程序仅仅是因为类路径上存在 Spring Cloud Stream 和粘合剂依赖项以及自动配置类,从而有效地将引导应用程序的上下文设置为 Spring Cloud Stream 应用程序。在此上下文中,类型为 SupplierFunctionConsumer 的 bean 被视为事实上的消息处理程序,触发对由提供的粘合剂公开的目标的绑定,遵循某些命名约定和规则以避免额外的配置。

粘合和粘合名称

粘合是一个抽象概念,表示粘合剂和用户代码公开的源和目标之间的桥梁,此抽象概念有一个名称,虽然我们尽最大努力限制运行 Spring Cloud Stream 应用程序所需的配置,但了解此类名称对于需要额外按粘合配置的情况是必要的。

在整个手册中,您将看到配置属性的示例,例如 spring.cloud.stream.bindings.input.destination=myQueue。此属性名称中的 input 段是我们所说的粘合名称,它可以通过多种机制派生。以下小节将描述 Spring Cloud Stream 用于控制粘合名称的命名约定和配置元素。

如果您的绑定名称包含特殊字符,例如 . 字符,则需要使用方括号 ([]) 将绑定键括起来,然后用引号将其引起来。例如 spring.cloud.stream.bindings."[my.output.binding.key]".destination
函数绑定名称

与 spring-cloud-stream 早期版本中使用的基于注释的支持(旧版)所需的显式命名不同,函数式编程模型在绑定名称方面默认采用了一个简单的约定,从而极大地简化了应用程序配置。我们来看第一个示例

@SpringBootApplication
public class SampleApplication {

	@Bean
	public Function<String, String> uppercase() {
	    return value -> value.toUpperCase();
	}
}

在前面的示例中,我们有一个应用程序,其中包含一个作为消息处理程序的函数。作为 Function,它有一个输入和一个输出。用于命名输入和输出绑定的命名约定如下

  • 输入 - <functionName> + -in- + <index>

  • 输出 - <functionName> + -out- + <index>

inout 对应于绑定类型(例如 inputoutput)。index 是输入或输出绑定的索引。对于典型的单输入/输出函数,它始终为 0,因此它仅与 具有多个输入和输出参数的函数 相关。

因此,例如,如果您想将此函数的输入映射到名为“my-topic”的远程目标(例如,主题、队列等),则可以使用以下属性来执行此操作

--spring.cloud.stream.bindings.uppercase-in-0.destination=my-topic

请注意如何将 uppercase-in-0 用作属性名称中的一个片段。uppercase-out-0 也是如此。

描述性绑定名称

有时为了提高可读性,您可能希望为您的绑定指定一个更具描述性的名称(例如“account”、“orders”等)。另一种看待它的方式是,您可以将一个隐式绑定名称映射到一个显式绑定名称。您可以使用 spring.cloud.stream.function.bindings.<binding-name> 属性来实现此目的。此属性还为依赖于需要显式名称的基于自定义接口的绑定的现有应用程序提供了一个迁移路径。

例如,

--spring.cloud.stream.function.bindings.uppercase-in-0=input

在前面的示例中,您将 uppercase-in-0 绑定名称映射并有效地重命名为 input。现在,所有配置属性都可以引用 input 绑定名称(例如,--spring.cloud.stream.bindings.input.destination=my-topic)。

虽然描述性绑定名称可以增强配置的可读性,但它们也通过将隐式绑定名称映射到显式绑定名称来创建另一个级别的错误引导。并且由于所有后续配置属性都将使用显式绑定名称,因此您必须始终参考此“bindings”属性以关联它实际对应的函数。我们认为,对于大多数情况(函数组合 除外),这可能是一种矫枉过正,因此,我们建议完全避免使用它,特别是因为它不使用它提供了绑定程序目标和绑定名称之间的清晰路径,例如 spring.cloud.stream.bindings.uppercase-in-0.destination=sample-topic,您在此处清楚地将 uppercase 函数的输入与 sample-topic 目标相关联。

有关属性和其他配置选项的更多信息,请参阅 配置选项 部分。

显式绑定创建

在上一部分中,我们解释了如何由应用程序提供的 FunctionSupplierConsumer bean 的名称隐式驱动创建绑定。但是,有时您可能需要显式创建绑定,其中绑定不与任何函数相关联。这通常通过 StreamBridge 来完成与其他框架的集成。

Spring Cloud Stream 允许您通过 spring.cloud.stream.input-bindingsspring.cloud.stream.output-bindings 属性显式定义输入和输出绑定。注意属性名称中的复数形式允许您使用 ; 作为分隔符来定义多个绑定。只需查看以下测试用例作为示例

@Test
public void testExplicitBindings() {
	try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
		TestChannelBinderConfiguration.getCompleteConfiguration(EmptyConfiguration.class))
				.web(WebApplicationType.NONE)
				.run("--spring.jmx.enabled=false",
					"--spring.cloud.stream.input-bindings=fooin;barin",
					"--spring.cloud.stream.output-bindings=fooout;barout")) {


	. . .
	}
}

@EnableAutoConfiguration
@Configuration
public static class EmptyConfiguration {
}

如您所见,我们声明了两个输入绑定和两个输出绑定,而我们的配置没有定义任何函数,但我们能够成功创建这些绑定并访问其相应的通道。

生成和使用消息

您可以通过编写函数并将其作为 @Bean s 暴露来编写 Spring Cloud Stream 应用程序。您还可以使用基于 Spring Integration 注释的配置或基于 Spring Cloud Stream 注释的配置,尽管从 spring-cloud-stream 3.x 开始,我们建议使用函数式实现。

Spring Cloud Function 支持

概述

自 Spring Cloud Stream v2.1 以来,定义 流处理程序 的另一种选择是使用对 Spring Cloud Function 的内置支持,其中它们可以表示为 java.util.function.[Supplier/Function/Consumer] 类型的 bean。

要指定要绑定到绑定公开的外部目标的函数式 bean,您必须提供 spring.cloud.function.definition 属性。

如果您仅有一个类型为 java.util.function.[Supplier/Function/Consumer] 的单一 Bean,则可以跳过 spring.cloud.function.definition 属性,因为此类功能 Bean 将被自动发现。但是,最好使用此类属性以避免混淆。有时,这种自动发现可能会造成障碍,因为类型为 java.util.function.[Supplier/Function/Consumer] 的单一 Bean 可能用于处理消息以外的其他目的,但由于它是单一的,因此会被自动发现和自动绑定。对于这些罕见的情况,您可以通过提供值设置为 falsespring.cloud.stream.function.autodetect 属性来禁用自动发现。

以下是如何将消息处理程序公开为 java.util.function.Function 的应用程序示例,该示例通过充当数据的使用者和生成者来有效支持直通语义。

@SpringBootApplication
public class MyFunctionBootApp {

	public static void main(String[] args) {
		SpringApplication.run(MyFunctionBootApp.class);
	}

	@Bean
	public Function<String, String> toUpperCase() {
		return s -> s.toUpperCase();
	}
}

在前面的示例中,我们定义了一个类型为 java.util.function.Function 的 Bean,称为 toUpperCase,它充当消息处理程序,其“输入”和“输出”必须绑定到由提供的目标绑定程序公开的外部目标。默认情况下,“输入”和“输出”绑定名称将为 toUpperCase-in-0toUpperCase-out-0。有关用于建立绑定名称的命名约定的详细信息,请参阅功能绑定名称部分。

以下是支持其他语义的简单功能应用程序示例

以下是如何公开为 java.util.function.Supplier语义的示例

@SpringBootApplication
public static class SourceFromSupplier {

	@Bean
	public Supplier<Date> date() {
		return () -> new Date(12345L);
	}
}

以下是如何公开为 java.util.function.Consumer接收器语义的示例

@SpringBootApplication
public static class SinkFromConsumer {

	@Bean
	public Consumer<String> sink() {
		return System.out::println;
	}
}
供应商(源)

在触发其调用方式方面,FunctionConsumer 非常直接。它们根据发送到其绑定的目标的数据(事件)触发。换句话说,它们是经典的事件驱动组件。

但是,Supplier 在触发方面属于其自己的类别。由于它根据定义是数据的源(起源),因此它不会订阅任何入站目标,因此必须由某些其他机制触发。还有 Supplier 实现的问题,它可以是命令式的响应式的,并且与此类供应商的触发直接相关。

考虑以下示例

@SpringBootApplication
public static class SupplierConfiguration {

	@Bean
	public Supplier<String> stringSupplier() {
		return () -> "Hello from Supplier";
	}
}

前面的 Supplier bean 每当调用其 get() 方法时都会生成一个字符串。但是,谁调用此方法以及多久调用一次?该框架提供了一个默认轮询机制(回答“谁?”的问题),该机制将触发对供应商的调用,并且默认情况下每秒执行一次(回答“多久一次?”的问题)。换句话说,上述配置每秒生成一个消息,并且每个消息都发送到由绑定器公开的 output 目标。要了解如何自定义轮询机制,请参阅 轮询配置属性 部分。

考虑一个不同的示例

@SpringBootApplication
public static class SupplierConfiguration {

    @Bean
    public Supplier<Flux<String>> stringSupplier() {
        return () -> Flux.fromStream(Stream.generate(new Supplier<String>() {
            @Override
            public String get() {
                try {
                    Thread.sleep(1000);
                    return "Hello from Supplier";
                } catch (Exception e) {
                    // ignore
                }
            }
        })).subscribeOn(Schedulers.elastic()).share();
    }
}

前面的 Supplier bean 采用响应式编程风格。通常,与命令式供应商不同,它应该只触发一次,因为调用其 get() 方法会生成(提供)连续的消息流,而不是单个消息。

该框架识别编程风格的差异,并保证此类供应商仅触发一次。

但是,想象一下您希望轮询某个数据源并返回表示结果集的有限数据流的用例。响应式编程风格是此类供应商的完美机制。但是,鉴于生成流的有限性,此类供应商仍需要定期调用。

考虑以下示例,它通过生成有限的数据流来模拟此类用例

@SpringBootApplication
public static class SupplierConfiguration {

	@PollableBean
	public Supplier<Flux<String>> stringSupplier() {
		return () -> Flux.just("hello", "bye");
	}
}

bean 本身使用 PollableBean 注释(@Bean 的子集)进行注释,从而向框架发出信号,表明尽管此类供应商的实现是响应式的,但仍需要对其进行轮询。

PollableBean 中定义了一个 splittable 属性,它向此注释的后处理器发出信号,表明注释组件生成的结果必须拆分,并且默认设置为 true。这意味着框架将拆分返回,将每个项目作为单个消息发送出去。如果这不是所需的行为,则可以将其设置为 false,此时此类供应商将简单地返回生成的 Flux,而不将其拆分。
供应商和线程
正如你现在所了解的,与由事件触发的FunctionConsumer(它们有输入数据)不同,Supplier没有任何输入,因此由不同的机制触发 - 轮询器,它可能有一个不可预测的线程机制。虽然大多数情况下线程机制的详细信息与函数的下游执行无关,但在某些情况下,它可能会出现问题,尤其是在可能对线程关联性有某些期望的集成框架中。例如,Spring Cloud Sleuth依赖于存储在本地线程中的跟踪数据。对于这些情况,我们通过StreamBridge提供了另一种机制,用户可以在其中对线程机制有更多控制。你可以在向输出发送任意数据(例如外部事件驱动源)部分中获取更多详细信息。
消费者(反应式)

反应式Consumer有点特殊,因为它有一个void返回类型,框架没有引用可以订阅。你很可能不需要编写Consumer<Flux<?>>,而是将其写为Function<Flux<?>, Mono<Void>>,将then运算符作为流上的最后一个运算符调用。

例如

public Function<Flux<?>, Mono<Void>> consumer() {
	return flux -> flux.map(..).filter(..).then();
}

但是,如果你确实需要编写一个显式的Consumer<Flux<?>>,请记住订阅传入的Flux。

此外,请记住,在混合反应式函数和命令式函数时,函数组合也适用相同的规则。Spring Cloud Function确实支持将反应式函数与命令式函数组合,但你必须意识到某些限制。例如,假设你已经将反应式函数与命令式消费者组合。这种组合的结果是一个反应式Consumer。但是,没有办法订阅这样的消费者,如本节前面所讨论的,因此这个限制只能通过使你的消费者反应并手动订阅(如前面所讨论的)或将你的函数更改为命令式来解决。

轮询配置属性

Spring Cloud Stream公开了以下属性,并以spring.integration.poller.为前缀。

fixedDelay

默认轮询器的固定延迟(以毫秒为单位)。

默认值:1000L。

maxMessagesPerPoll

默认轮询器的每个轮询事件的最大消息数。

默认值:1L。

cron

Cron 触发器的 Cron 表达式值。

默认值:无。

initialDelay

周期性触发器的初始延迟。

默认值:0。

timeUnit

应用于延迟值的 TimeUnit。

默认值:MILLISECONDS。

例如,--spring.integration.poller.fixed-delay=2000 将轮询器间隔设置为每两秒轮询一次。

按绑定轮询配置

上一节展示了如何配置将应用于所有绑定的单个默认轮询器。虽然它非常适合 Spring Cloud Stream 设计的微服务模型,其中每个微服务表示一个组件(例如,Supplier),因此默认轮询器配置就足够了,但在某些情况下,您可能有多个需要不同轮询配置的组件

对于此类情况,请使用按绑定方式配置轮询器。例如,假设您有一个输出绑定 supply-out-0。在这种情况下,您可以使用前缀 spring.cloud.stream.bindings.supply-out-0.producer.poller.. 为此类绑定配置轮询器(例如,spring.cloud.stream.bindings.supply-out-0.producer.poller.fixed-delay=2000)。

向输出发送任意数据(例如,外部事件驱动的源)

在某些情况下,实际数据源可能来自外部(外部)系统,而不是 Binder。例如,数据源可能是经典的 REST 端点。我们如何使用 Spring Cloud Stream 使用的功能机制桥接此类源?

Spring Cloud Stream 提供了两种机制,让我们详细了解一下

在这里,对于这两个示例,我们将使用一个标准的 MVC 端点方法,称为 delegateToSupplier,该方法绑定到根 Web 上下文,通过 StreamBridge 机制将传入请求委派到流。

@SpringBootApplication
@Controller
public class WebSourceApplication {

	public static void main(String[] args) {
		SpringApplication.run(WebSourceApplication.class, "--spring.cloud.stream.output-bindings=toStream");
	}

	@Autowired
	private StreamBridge streamBridge;

	@RequestMapping
	@ResponseStatus(HttpStatus.ACCEPTED)
	public void delegateToSupplier(@RequestBody String body) {
		System.out.println("Sending " + body);
		streamBridge.send("toStream", body);
	}
}

在这里,我们自动装配了一个 StreamBridge Bean,它允许我们将数据发送到输出绑定,从而有效地将非流应用程序与 Spring Cloud Stream 桥接起来。请注意,前面的示例没有定义任何源函数(例如,Supplier Bean),从而使框架没有触发器来预先创建源绑定,这对于包含函数 Bean 的配置来说是典型的。这很好,因为 StreamBridge 将在第一次调用其 send(..) 操作时为不存在的绑定启动输出绑定的创建(以及必要的目标自动配置),并将其缓存起来以供后续重用(有关更多详细信息,请参见 StreamBridge 和动态目标)。

但是,如果您希望在初始化(启动)时预先创建输出绑定,您可以受益于 spring.cloud.stream.output-bindings 属性,您可以在其中声明源的名称。提供的名称将用作创建源绑定的触发器。您可以使用 ; 表示多个源(多个输出绑定)(例如,--spring.cloud.stream.output-bindings=foo;bar

另外,请注意 streamBridge.send(..) 方法为数据采用 Object。这意味着您可以向其发送 POJO 或 Message,并且在发送输出时,它将经历与任何 Function 或 Supplier 相同的例程,提供与函数相同的级别的一致性。这意味着输出类型转换、分区等将被视为来自函数产生的输出。

StreamBridge 和动态目标

StreamBridge 还可以用于在输出目标在类似于 路由到消费者 部分中描述的用例中提前未知的情况下。

我们来看一个示例

@SpringBootApplication
@Controller
public class WebSourceApplication {

	public static void main(String[] args) {
		SpringApplication.run(WebSourceApplication.class, args);
	}

	@Autowired
	private StreamBridge streamBridge;

	@RequestMapping
	@ResponseStatus(HttpStatus.ACCEPTED)
	public void delegateToSupplier(@RequestBody String body) {
		System.out.println("Sending " + body);
		streamBridge.send("myDestination", body);
	}
}

如您所见,前面的示例与上一个示例非常相似,只是通过 spring.cloud.stream.output-bindings 属性(未提供)提供了显式的绑定指令。在这里,我们正在向不存在作为绑定的 myDestination 名称发送数据。因此,此类名称将被视为动态目标,如 路由到消费者 部分中所述。

在前面的示例中,我们使用 ApplicationRunner 作为外部源来提供流。

一个更实际的示例,其中外部源是 REST 端点。

@SpringBootApplication
@Controller
public class WebSourceApplication {

	public static void main(String[] args) {
		SpringApplication.run(WebSourceApplication.class);
	}

	@Autowired
	private StreamBridge streamBridge;

	@RequestMapping
	@ResponseStatus(HttpStatus.ACCEPTED)
	public void delegateToSupplier(@RequestBody String body) {
		streamBridge.send("myBinding", body);
	}
}

如您在 delegateToSupplier 方法中所见,我们正在使用 StreamBridge 将数据发送到 myBinding 绑定。在这里,您还受益于 StreamBridge 的动态特性,即如果 myBinding 不存在,它将自动创建并缓存,否则将使用现有绑定。

缓存动态目标(绑定)可能会导致在存在许多动态目标时出现内存泄漏。为了具有一定程度的控制,我们为输出绑定提供了一种自驱逐缓存机制,默认缓存大小为 10。这意味着如果您的动态目标大小超过该数字,则现有绑定可能会被驱逐,因此需要重新创建,这可能会导致轻微的性能下降。您可以通过 spring.cloud.stream.dynamic-destination-cache-size 属性将其设置为所需值来增加缓存大小。
curl -H "Content-Type: text/plain" -X POST -d "hello from the other side" https://127.0.0.1:8080/

通过展示两个示例,我们希望强调这种方法适用于任何类型的外部源。

如果您正在使用 Solace PubSub+ 绑定器,Spring Cloud Stream 保留了 scst_targetDestination 标头(可通过 BinderHeaders.TARGET_DESTINATION 检索),该标头允许将消息从其绑定的配置目标重定向到此标头指定的目标目标。这允许绑定器管理发布到动态目标所需的资源,从而减轻了框架的负担,并避免了前面注意中提到的缓存问题。更多信息 在此处
使用 StreamBridge 输出内容类型

如有必要,您还可以使用以下方法签名提供特定内容类型:public boolean send(String bindingName, Object data, MimeType outputContentType)。或者,如果您将数据作为 Message 发送,则会保留其内容类型。

使用 StreamBridge 的特定绑定器类型

Spring Cloud Stream 支持多种绑定器场景。例如,您可能从 Kafka 接收数据并将其发送到 RabbitMQ。

有关多个绑定器场景的更多信息,请参阅 绑定器 部分,特别是 类路径上的多个绑定器

如果您计划使用 StreamBridge 并在应用程序中配置了多个绑定器,则还必须告诉 StreamBridge 使用哪个绑定器。为此,还有两种 send 方法变体

public boolean send(String bindingName, @Nullable String binderType, Object data)

public boolean send(String bindingName, @Nullable String binderType, Object data, MimeType outputContentType)

如您所见,您可以提供一个附加参数 - binderType,它会告诉 BindingService 在创建动态绑定时使用哪个绑定器。

对于使用 spring.cloud.stream.output-bindings 属性或已在不同绑定器下创建绑定的情况,binderType 参数将不起作用。
使用 StreamBridge 的通道拦截器

由于 StreamBridge 使用 MessageChannel 建立输出绑定,因此您可以在通过 StreamBridge 发送数据时激活通道拦截器。应用程序决定在 StreamBridge 上应用哪些通道拦截器。除非使用 @GlobalChannelInterceptor(patterns = "*") 对其进行注释,否则 Spring Cloud Stream 不会将检测到的所有通道拦截器注入到 StreamBridge 中。

让我们假设您在应用程序中具有以下两个不同的 StreamBridge 绑定。

streamBridge.send("foo-out-0", message);

streamBridge.send("bar-out-0", message);

现在,如果您希望在两个 StreamBridge 绑定上应用通道拦截器,则可以声明以下 GlobalChannelInterceptor bean。

@Bean
@GlobalChannelInterceptor(patterns = "*")
public ChannelInterceptor customInterceptor() {
    return new ChannelInterceptor() {
        @Override
        public Message<?> preSend(Message<?> message, MessageChannel channel) {
            ...
        }
    };
}

但是,如果您不喜欢上述全局方法,并且希望为每个绑定设置一个专用拦截器,则可以执行以下操作。

@Bean
@GlobalChannelInterceptor(patterns = "foo-*")
public ChannelInterceptor fooInterceptor() {
    return new ChannelInterceptor() {
        @Override
        public Message<?> preSend(Message<?> message, MessageChannel channel) {
            ...
        }
    };
}

@Bean
@GlobalChannelInterceptor(patterns = "bar-*")
public ChannelInterceptor barInterceptor() {
    return new ChannelInterceptor() {
        @Override
        public Message<?> preSend(Message<?> message, MessageChannel channel) {
            ...
        }
    };
}

您可以灵活地根据业务需求使模式更严格或更定制。

通过这种方法,应用程序能够决定注入到 StreamBridge 中的拦截器,而不是应用所有可用的拦截器。

StreamBridge 通过 StreamOperations 接口提供了一个契约,该接口包含 StreamBridge 的所有 send 方法。因此,应用程序可以选择使用 StreamOperations 进行自动装配。在使用 StreamBridge 的单元测试代码时,这非常方便,因为它为 StreamOperations 接口提供了模拟或类似机制。
反应式函数支持

由于 Spring Cloud Function 构建在 Project Reactor 之上,因此在实现 SupplierFunctionConsumer 时,无需做太多工作即可受益于反应式编程模型。

例如

@SpringBootApplication
public static class SinkFromConsumer {

	@Bean
	public Function<Flux<String>, Flux<String>> reactiveUpperCase() {
		return flux -> flux.map(val -> val.toUpperCase());
	}
}

在选择反应式或命令式编程模型时,必须理解一些重要事项。

完全反应式还是仅限 API?

使用反应式 API 并不一定意味着你可以受益于此类 API 的所有反应式特性。换句话说,诸如反压和其他高级特性之类的功能仅在与兼容系统(例如 Reactive Kafka 绑定器)配合使用时才有效。如果你使用常规 Kafka 或 Rabbit 或任何其他非反应式绑定器,那么你只能受益于反应式 API 本身的便利性,而不能受益于其高级特性,因为流的实际来源或目标不是反应式的。

错误处理和重试

在整个手册中,你将看到对基于框架的错误处理、重试和其他特性的若干引用,以及与它们关联的配置属性。重要的是要理解,它们只影响命令式函数,对于反应式函数,你不应该有同样的期望。原因如下。反应式函数和命令式函数之间存在根本区别。命令式函数是一个消息处理程序,由框架在接收到的每条消息上调用。因此,对于 N 条消息,将调用此类函数 N 次,因此我们可以包装此类函数并添加其他功能,例如错误处理、重试等。反应式函数是初始化函数。它仅被调用一次,以获取用户提供的 Flux/Mono 的引用,以便与框架提供的 Flux/Mono 连接。在此之后,我们(框架)对流完全没有可见性或控制权。因此,对于反应式函数,在错误处理和重试方面,你必须依赖反应式 API 的丰富性(即 doOnError().onError*() 等)。

函数式组合

使用函数式编程模型,你还可以受益于函数式组合,你可以在其中从一组简单函数动态组合复杂的处理程序。作为一个示例,让我们将以下函数 bean 添加到上面定义的应用程序中

@Bean
public Function<String, String> wrapInQuotes() {
	return s -> "\"" + s + "\"";
}

并修改 spring.cloud.function.definition 属性,以反映你打算从“toUpperCase”和“wrapInQuotes”组合一个新函数的意图。为此,Spring Cloud Function 依赖于 |(管道)符号。因此,为了完成我们的示例,我们的属性现在将如下所示

--spring.cloud.function.definition=toUpperCase|wrapInQuotes
Spring Cloud Function 提供的函数式组合支持的一大好处是,你可以组合反应式命令式函数。

组合的结果是一个单一函数,正如你所猜测的,它可能有一个非常长且难以理解的名称(例如,foo|bar|baz|xyz. . .),在涉及其他配置属性时会带来极大的不便。这就是功能绑定名称部分中描述的描述性绑定名称功能可以提供帮助的地方。

例如,如果我们想为toUpperCase|wrapInQuotes提供一个更具描述性的名称,我们可以使用以下属性spring.cloud.stream.function.bindings.toUpperCase|wrapInQuotes-in-0=quotedUpperCaseInput,允许其他配置属性引用该绑定名称(例如,spring.cloud.stream.bindings.quotedUpperCaseInput.destination=myDestination)。

功能组合和横切关注点

函数组合实际上允许你通过将复杂性分解为一组简单且可单独管理/测试的组件来解决复杂性,这些组件在运行时仍可以表示为一个组件。但这并不是唯一的优点。

你还可以使用组合来解决某些横切非功能性关注点,例如内容丰富。例如,假设你有一个可能缺少某些标头或某些标头不在你的业务功能所期望的确切状态的传入消息。你现在可以实现一个单独的函数来解决这些问题,然后将其与主业务函数组合起来。

我们来看一个示例

@SpringBootApplication
public class DemoStreamApplication {

	public static void main(String[] args) {
		SpringApplication.run(DemoStreamApplication.class,
				"--spring.cloud.function.definition=enrich|echo",
				"--spring.cloud.stream.function.bindings.enrich|echo-in-0=input",
				"--spring.cloud.stream.bindings.input.destination=myDestination",
				"--spring.cloud.stream.bindings.input.group=myGroup");

	}

	@Bean
	public Function<Message<String>, Message<String>> enrich() {
		return message -> {
			Assert.isTrue(!message.getHeaders().containsKey("foo"), "Should NOT contain 'foo' header");
			return MessageBuilder.fromMessage(message).setHeader("foo", "bar").build();
		};
	}

	@Bean
	public Function<Message<String>, Message<String>> echo() {
		return message -> {
			Assert.isTrue(message.getHeaders().containsKey("foo"), "Should contain 'foo' header");
			System.out.println("Incoming message " + message);
			return message;
		};
	}
}

虽然微不足道,但此示例演示了一个函数如何使用附加标头(非功能性关注点)丰富传入消息,因此另一个函数 - echo - 可以从中受益。echo函数保持简洁,只关注业务逻辑。你还可以看到spring.cloud.stream.function.bindings属性的用法,以简化组合绑定名称。

具有多个输入和输出参数的函数

从版本 3.0 开始,spring-cloud-stream 为具有多个输入和/或多个输出(返回值)的函数提供支持。这实际上意味着什么,它针对哪种类型的用例?

  • 大数据:想象一下你正在处理的数据源非常混乱,包含各种类型的数据元素(例如,订单、交易等),而你实际上需要对它进行整理。

  • 数据聚合:另一个用例可能要求你合并来自 2 个以上传入的数据元素.

上述内容仅描述了可能需要使用单个函数来接受和/或生成多个数据的一些用例。而这正是我们在此针对的用例类型。

另外,请注意此处关于概念的略微不同的重点。假设只有在允许访问数据实际流(而不是各个元素)时,此类函数才具有价值。因此,我们依赖于 Project Reactor(即 FluxMono)提供的抽象,它已作为 spring-cloud-functions 引入的依赖项的一部分在类路径上可用。

另一个重要方面是多个输入和输出的表示。虽然 java 提供了各种不同的抽象来表示多个某物,但这些抽象a) 无界b) 缺乏元数,并且c) 缺乏类型信息,而这些在该上下文中都很重要。例如,让我们看看 Collection 或数组,它只允许我们描述单个类型的多个或将所有内容向上转型为 Object,从而影响 spring-cloud-stream 的透明类型转换功能,依此类推。

因此,为了满足所有这些要求,最初的支持依赖于利用 Project Reactor 提供的另一个抽象 - 元组的签名。但是,我们正在努力允许更灵活的签名。

请参阅 绑定和绑定名称 部分,以了解用于建立此类应用程序使用的绑定名称的命名约定。

让我们看几个示例

@SpringBootApplication
public class SampleApplication {

	@Bean
	public Function<Tuple2<Flux<String>, Flux<Integer>>, Flux<String>> gather() {
		return tuple -> {
			Flux<String> stringStream = tuple.getT1();
			Flux<String> intStream = tuple.getT2().map(i -> String.valueOf(i));
			return Flux.merge(stringStream, intStream);
		};
	}
}

上述示例演示了获取两个输入(第一个为 String 类型,第二个为 Integer 类型)并生成一个 String 类型的单个输出的函数。

因此,对于上述示例,两个输入绑定将是 gather-in-0gather-in-1,为了保持一致性,输出绑定也遵循相同的约定,并命名为 gather-out-0

了解这一点将允许您设置特定于绑定的属性。例如,以下内容将覆盖 gather-in-0 绑定的内容类型

--spring.cloud.stream.bindings.gather-in-0.content-type=text/plain
@SpringBootApplication
public class SampleApplication {

	@Bean
	public static Function<Flux<Integer>, Tuple2<Flux<String>, Flux<String>>> scatter() {
		return flux -> {
			Flux<Integer> connectedFlux = flux.publish().autoConnect(2);
			UnicastProcessor even = UnicastProcessor.create();
			UnicastProcessor odd = UnicastProcessor.create();
			Flux<Integer> evenFlux = connectedFlux.filter(number -> number % 2 == 0).doOnNext(number -> even.onNext("EVEN: " + number));
			Flux<Integer> oddFlux = connectedFlux.filter(number -> number % 2 != 0).doOnNext(number -> odd.onNext("ODD: " + number));

			return Tuples.of(Flux.from(even).doOnSubscribe(x -> evenFlux.subscribe()), Flux.from(odd).doOnSubscribe(x -> oddFlux.subscribe()));
		};
	}
}

上述示例与前一个示例有点相反,演示了获取 Integer 类型的单个输入并生成两个输出(均为 String 类型)的函数。

因此,对于上述示例,输入绑定为 scatter-in-0,输出绑定为 scatter-out-0scatter-out-1

您可以使用以下代码对其进行测试

@Test
public void testSingleInputMultiOutput() {
	try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
			TestChannelBinderConfiguration.getCompleteConfiguration(
					SampleApplication.class))
							.run("--spring.cloud.function.definition=scatter")) {

		InputDestination inputDestination = context.getBean(InputDestination.class);
		OutputDestination outputDestination = context.getBean(OutputDestination.class);

		for (int i = 0; i < 10; i++) {
			inputDestination.send(MessageBuilder.withPayload(String.valueOf(i).getBytes()).build());
		}

		int counter = 0;
		for (int i = 0; i < 5; i++) {
			Message<byte[]> even = outputDestination.receive(0, 0);
			assertThat(even.getPayload()).isEqualTo(("EVEN: " + String.valueOf(counter++)).getBytes());
			Message<byte[]> odd = outputDestination.receive(0, 1);
			assertThat(odd.getPayload()).isEqualTo(("ODD: " + String.valueOf(counter++)).getBytes());
		}
	}
}
单个应用程序中的多个函数

可能还需要在单个应用程序中对多个消息处理程序进行分组。您可以通过定义多个函数来执行此操作。

@SpringBootApplication
public class SampleApplication {

	@Bean
	public Function<String, String> uppercase() {
		return value -> value.toUpperCase();
	}

	@Bean
	public Function<String, String> reverse() {
		return value -> new StringBuilder(value).reverse().toString();
	}
}

在上面的示例中,我们有配置,其中定义了两个函数 uppercasereverse。因此,首先,如前所述,我们需要注意到存在冲突(多个函数),因此我们需要通过提供指向我们想要绑定的实际函数的 spring.cloud.function.definition 属性来解决它。除了这里我们将使用 ; 分隔符来指向这两个函数(参见下面的测试用例)。

对于具有多个输入/输出的函数,请参阅 绑定和绑定名称 部分,以了解用于建立此类应用程序使用的 绑定名称 的命名约定。

您可以使用以下代码对其进行测试

@Test
public void testMultipleFunctions() {
	try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
			TestChannelBinderConfiguration.getCompleteConfiguration(
					ReactiveFunctionConfiguration.class))
							.run("--spring.cloud.function.definition=uppercase;reverse")) {

		InputDestination inputDestination = context.getBean(InputDestination.class);
		OutputDestination outputDestination = context.getBean(OutputDestination.class);

		Message<byte[]> inputMessage = MessageBuilder.withPayload("Hello".getBytes()).build();
		inputDestination.send(inputMessage, "uppercase-in-0");
		inputDestination.send(inputMessage, "reverse-in-0");

		Message<byte[]> outputMessage = outputDestination.receive(0, "uppercase-out-0");
		assertThat(outputMessage.getPayload()).isEqualTo("HELLO".getBytes());

		outputMessage = outputDestination.receive(0, "reverse-out-1");
		assertThat(outputMessage.getPayload()).isEqualTo("olleH".getBytes());
	}
}
批量消费者

在使用支持批量侦听器的 MessageChannelBinder 并且为消费者绑定启用了该功能时,可以将 spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode 设置为 true,以启用将整个消息批次作为 List 传递给函数。

@Bean
public Function<List<Person>, Person> findFirstPerson() {
    return persons -> persons.get(0);
}
批量生产者

您还可以在生产者端使用批处理的概念,方法是返回一个消息集合,该集合有效地提供了相反的效果,其中集合中的每条消息都将由绑定器单独发送。

考虑以下函数

@Bean
public Function<String, List<Message<String>>> batch() {
	return p -> {
		List<Message<String>> list = new ArrayList<>();
		list.add(MessageBuilder.withPayload(p + ":1").build());
		list.add(MessageBuilder.withPayload(p + ":2").build());
		list.add(MessageBuilder.withPayload(p + ":3").build());
		list.add(MessageBuilder.withPayload(p + ":4").build());
		return list;
	};
}

返回列表中的每条消息都将单独发送,从而导致四条消息发送到输出目的地。

Spring Integration 流作为函数

在实现函数时,您可能会有复杂的要求,这些要求适合 企业集成模式 (EIP) 类别。最好使用 Spring Integration (SI) 等框架来处理这些问题,它是 EIP 的参考实现。

值得庆幸的是,SI 已经通过 集成流作为网关 提供了将集成流公开为函数的支持,请考虑以下示例

@SpringBootApplication
public class FunctionSampleSpringIntegrationApplication {

	public static void main(String[] args) {
		SpringApplication.run(FunctionSampleSpringIntegrationApplication.class, args);
	}

	@Bean
	public IntegrationFlow uppercaseFlow() {
		return IntegrationFlows.from(MessageFunction.class, "uppercase")
				.<String, String>transform(String::toUpperCase)
				.logAndReply(LoggingHandler.Level.WARN);
	}

	public interface MessageFunction extends Function<Message<String>, Message<String>> {

	}
}

对于熟悉 SI 的人,您可以看到我们定义了一个类型为 IntegrationFlow 的 bean,其中我们声明了一个集成流,我们希望将其公开为 Function<String, String>(使用 SI DSL)称为 uppercaseMessageFunction 接口允许我们明确声明输入和输出的类型以进行适当的类型转换。有关类型转换的更多信息,请参阅 内容类型协商 部分。

要接收原始输入,可以使用 from(Function.class, …​)

生成的函数绑定到目标绑定器公开的输入和输出目的地。

请参阅 绑定和绑定名称 部分,以了解用于建立此类应用程序使用的绑定名称的命名约定。

有关 Spring Integration 和 Spring Cloud Stream 的互操作性(特别是围绕函数式编程模型)的更多详细信息,您可能会发现这篇文章非常有趣,因为它深入探讨了您可以通过合并 Spring Integration 和 Spring Cloud Stream/Functions 的最佳功能而应用的各种模式。

使用轮询使用者

概述

使用轮询使用者时,您可以按需轮询 PollableMessageSource。要定义轮询使用者的绑定,您需要提供 spring.cloud.stream.pollable-source 属性。

考虑以下轮询使用者绑定的示例

--spring.cloud.stream.pollable-source=myDestination

前一个示例中的轮询来源名称 myDestination 将导致 myDestination-in-0 绑定名称保持与函数式编程模型一致。

鉴于前一个示例中的轮询使用者,您可能会像下面这样使用它

@Bean
public ApplicationRunner poller(PollableMessageSource destIn, MessageChannel destOut) {
    return args -> {
        while (someCondition()) {
            try {
                if (!destIn.poll(m -> {
                    String newPayload = ((String) m.getPayload()).toUpperCase();
                    destOut.send(new GenericMessage<>(newPayload));
                })) {
                    Thread.sleep(1000);
                }
            }
            catch (Exception e) {
                // handle failure
            }
        }
    };
}

一种不太手动且更像 Spring 的替代方法是配置一个计划任务 Bean。例如,

@Scheduled(fixedDelay = 5_000)
public void poll() {
	System.out.println("Polling...");
	this.source.poll(m -> {
		System.out.println(m.getPayload());

	}, new ParameterizedTypeReference<Foo>() { });
}

PollableMessageSource.poll() 方法采用 MessageHandler 参数(通常是 lambda 表达式,如这里所示)。如果消息被接收并成功处理,它将返回 true

与消息驱动的使用者一样,如果 MessageHandler 抛出异常,消息将发布到错误通道,如 错误处理 中所述。

通常,当 MessageHandler 退出时,poll() 方法确认消息。如果该方法异常退出,消息将被拒绝(不会重新排队),但请参阅 处理错误。您可以通过承担确认的责任来覆盖该行为,如下例所示

@Bean
public ApplicationRunner poller(PollableMessageSource dest1In, MessageChannel dest2Out) {
    return args -> {
        while (someCondition()) {
            if (!dest1In.poll(m -> {
                StaticMessageHeaderAccessor.getAcknowledgmentCallback(m).noAutoAck();
                // e.g. hand off to another thread which can perform the ack
                // or acknowledge(Status.REQUEUE)

            })) {
                Thread.sleep(1000);
            }
        }
    };
}
您必须在某个时刻 ack(或 nack)消息,以避免资源泄漏。
一些消息传递系统(例如 Apache Kafka)在日志中维护一个简单的偏移量。如果传递失败并使用 StaticMessageHeaderAccessor.getAcknowledgmentCallback(m).acknowledge(Status.REQUEUE); 重新排队,任何稍后成功确认的消息都将重新传递。

还有一个重载的 poll 方法,其定义如下

poll(MessageHandler handler, ParameterizedTypeReference<?> type)

type 是一个转换提示,允许转换传入的消息有效负载,如下例所示

boolean result = pollableSource.poll(received -> {
			Map<String, Foo> payload = (Map<String, Foo>) received.getPayload();
            ...

		}, new ParameterizedTypeReference<Map<String, Foo>>() {});
处理错误

默认情况下,轮询源配置了一个错误通道;如果回调抛出异常,则会向错误通道(<destination>.<group>.errors)发送一个 ErrorMessage;此错误通道还桥接到全局 Spring Integration errorChannel

你可以使用 @ServiceActivator 订阅任一错误通道以处理错误;如果没有订阅,则错误将被简单地记录,并且消息将被确认成功。如果错误通道服务激活器抛出异常,则消息将被拒绝(默认情况下)并且不会重新发送。如果服务激活器抛出 RequeueCurrentMessageException,则消息将在代理中重新排队,并且将在后续轮询中再次检索。

如果侦听器直接抛出 RequeueCurrentMessageException,则消息将如上所述重新排队,并且不会发送到错误通道。

事件路由

在 Spring Cloud Stream 的上下文中,事件路由是指将事件路由到特定事件订阅者或将事件订阅者产生的事件路由到特定目标的能力。在这里,我们将称之为路由“到”和路由“从”。

路由到消费者

可以通过依赖 Spring Cloud Function 3.0 中提供的 RoutingFunction 来实现路由。你所需要做的就是通过 --spring.cloud.stream.function.routing.enabled=true 应用程序属性启用它,或者提供 spring.cloud.function.routing-expression 属性。启用后,RoutingFunction 将绑定到接收所有消息的输入目标,并根据提供的指令将它们路由到其他函数。

为了绑定,路由目标的名称是 functionRouter-in-0(参见 RoutingFunction.FUNCTION_NAME 和绑定命名约定 功能绑定名称)。

可以使用各个消息以及应用程序属性提供指令。

这里有一些示例

使用消息头
@SpringBootApplication
public class SampleApplication {

	public static void main(String[] args) {
		SpringApplication.run(SampleApplication.class,
                       "--spring.cloud.stream.function.routing.enabled=true");
	}

	@Bean
	public Consumer<String> even() {
		return value -> {
			System.out.println("EVEN: " + value);
		};
	}

	@Bean
	public Consumer<String> odd() {
		return value -> {
			System.out.println("ODD: " + value);
		};
    }
}

通过将消息发送到由绑定程序公开的 functionRouter-in-0 目标(即 rabbit、kafka),此消息将被路由到适当的(“偶数”或“奇数”)消费者。

默认情况下,RoutingFunction 将查找 spring.cloud.function.definitionspring.cloud.function.routing-expression(对于使用 SpEL 的更动态场景)头,如果找到,则其值将被视为路由指令。

例如,将 spring.cloud.function.routing-expression 头设置为值 T(java.lang.System).currentTimeMillis() % 2 == 0 ? 'even' : 'odd' 将最终以半随机的方式将请求路由到 oddeven 函数。此外,对于 SpEL,评估上下文的根对象Message,因此你也可以对各个头(或消息)进行评估,例如 …​.routing-expression=headers['type']

使用应用程序属性

spring.cloud.function.routing-expression 和/或 spring.cloud.function.definition 可以作为应用程序属性传递(例如,spring.cloud.function.routing-expression=headers['type']

@SpringBootApplication
public class RoutingStreamApplication {

  public static void main(String[] args) {
      SpringApplication.run(RoutingStreamApplication.class,
	  "--spring.cloud.function.routing-expression="
	  + "T(java.lang.System).nanoTime() % 2 == 0 ? 'even' : 'odd'");
  }
  @Bean
  public Consumer<Integer> even() {
    return value -> System.out.println("EVEN: " + value);
  }

  @Bean
  public Consumer<Integer> odd() {
    return value -> System.out.println("ODD: " + value);
  }
}
通过应用程序属性传递指令对于响应式函数尤为重要,因为响应式函数仅调用一次以传递发布者,因此对各个项目的访问受到限制。
路由函数和输出绑定

RoutingFunction 是一个 Function,因此与任何其他函数没有不同。嗯... 几乎。

RoutingFunction 路由到另一个 Function 时,它的输出将被发送到 RoutingFunction 的输出绑定,即预期的 functionRouter-in-0。但是,如果 RoutingFunction 路由到 Consumer 怎么办?换句话说,调用 RoutingFunction 的结果可能不会产生任何要发送到输出绑定的内容,因此甚至没有必要有一个。因此,我们在创建绑定时对 RoutingFunction 的处理略有不同。即使这对您作为用户来说是透明的(实际上您无需执行任何操作),但了解一些机制将有助于您了解其内部工作原理。

因此,规则是;我们从不为 RoutingFunction 创建输出绑定,只创建输入绑定。因此,当您路由到 Consumer 时,RoutingFunction 通过不具有任何输出绑定而有效地成为 Consumer。但是,如果 RoutingFunction 碰巧路由到另一个产生输出的 Function,则 RoutingFunction 的输出绑定将在动态创建,此时 RoutingFunction 将在绑定方面充当常规 Function(具有输入和输出绑定)。

从 Consumer 路由

除了静态目标之外,Spring Cloud Stream 允许应用程序将消息发送到动态绑定的目标。例如,当需要在运行时确定目标目标时,这非常有用。应用程序可以通过两种方式之一进行此操作。

spring.cloud.stream.sendto.destination

您还可以委托框架通过指定 spring.cloud.stream.sendto.destination 标头(设置为要解析的目标的名称)来动态解析输出目标。

考虑以下示例

@SpringBootApplication
@Controller
public class SourceWithDynamicDestination {

    @Bean
	public Function<String, Message<String>> destinationAsPayload() {
		return value -> {
			return MessageBuilder.withPayload(value)
				.setHeader("spring.cloud.stream.sendto.destination", value).build();};
	}
}

尽管很琐碎,但您可以在此示例中清楚地看到,我们的输出是一个消息,其中 spring.cloud.stream.sendto.destination 标头设置为输入参数的值。框架将咨询此标头,并将尝试创建或发现具有该名称的目标并向其发送输出。

如果目标名称是预先已知的,您可以像使用任何其他目标一样配置生产者属性。或者,如果您注册了一个 NewDestinationBindingCallback<> bean,它将在创建绑定之前被调用。回调采用绑定程序使用的扩展生产者属性的泛型类型。它有一个方法

void configure(String destinationName, MessageChannel channel, ProducerProperties producerProperties,
        T extendedProducerProperties);

以下示例演示如何使用 RabbitMQ 绑定程序

@Bean
public NewDestinationBindingCallback<RabbitProducerProperties> dynamicConfigurer() {
    return (name, channel, props, extended) -> {
        props.setRequiredGroups("bindThisQueue");
        extended.setQueueNameGroupOnly(true);
        extended.setAutoBindDlq(true);
        extended.setDeadLetterQueueName("myDLQ");
    };
}
如果您需要使用多种绑定程序类型支持动态目标,请使用 Object 作为泛型类型,并根据需要强制转换 extended 参数。

此外,请参阅 [使用 StreamBridge] 部分,了解如何将另一种选项(StreamBridge)用于类似的情况。

后处理(在发送消息后)

一旦调用函数,其结果就会由框架发送到目标目标,从而有效地完成函数调用周期。

然而,在完成此周期之后执行一些其他任务之前,从业务角度来看,此周期可能不会完全完成。虽然这可以通过 ConsumerStreamBridge 的简单组合来完成,如 此 Stack Overflow 帖子 中所述,但从 4.0.3 版本开始,框架提供了一种更惯用的方法来通过 Spring Cloud Function 项目提供的 PostProcessingFunction 解决此问题。PostProcessingFunction 是一个特殊的半标记函数,它包含一个附加方法 postProcess(Message>),旨在为实现此类后处理任务提供一个位置。

package org.springframework.cloud.function.context
. . .
public interface PostProcessingFunction<I, O> extends Function<I, O> {
	default void postProcess(Message<O> result) {
	}
}

所以,现在您有两个选择。

选项 1:您可以将您的函数实现为 PostProcessingFunction,并通过实现其 postProcess(Message>) 方法来包含额外的后处理行为。

private static class Uppercase implements PostProcessingFunction<String, String> {

	@Override
	public String apply(String input) {
		return input.toUpperCase();
	}

	@Override
	public void postProcess(Message<String> result) {
		System.out.println("Function Uppercase has been successfully invoked and its result successfully sent to target destination");
	}
}
. . .
@Bean
public Function<String, String> uppercase() {
	return new Uppercase();
}

选项 2:如果您已经有一个现有函数,并且不想更改其实现或希望将您的函数保留为 POJO,则可以只实现 postProcess(Message>) 方法,并将此新的后处理函数与您的其他函数组合。

private static class Logger implements PostProcessingFunction<?, String> {

	@Override
	public void postProcess(Message<String> result) {
		System.out.println("Function has been successfully invoked and its result successfully sent to target destination");
	}
}
. . .
@Bean
public Function<String, String> uppercase() {
	return v -> v.toUpperCase();
}
@Bean
public Function<String, String> logger() {
	return new Logger();
}
. . .
//  and then have your function definition as such `uppercase|logger`

注意:在函数组合的情况下,只有 PostProcessingFunction 的最后一个实例(如果存在)才会生效。例如,假设您有以下函数定义 - foo|bar|baz,并且 foobaz 都是 PostProcessingFunction 的实例。只有 baz.postProcess(Message>) 将被调用。如果 baz 不是 PostProcessingFunction 的实例,则不会执行任何后处理功能。

有人可能会争辩说,你可以通过函数组合轻松地通过将后处理器简单地作为另一个Function进行组合来实现这一点。这确实是一种可能性,但这种情况下的后处理功能将在调用前一个函数后立即调用,并在将消息发送到目标目的地之前调用,即在函数调用周期完成之前。

错误处理

在本节中,我们将解释框架提供的错误处理机制背后的总体思路。我们将使用 Rabbit 绑定器作为示例,因为各个绑定器为某些受支持的机制定义了不同的属性集,这些机制特定于底层代理功能(例如 Kafka 绑定器)。

错误时有发生,Spring Cloud Stream 提供了几种灵活的机制来处理它们。请注意,这些技术取决于绑定器实现和底层消息传递中间件以及编程模型的功能(稍后会详细介绍)。

每当消息处理程序(函数)抛出异常时,它就会传播回绑定器,此时绑定器将使用 Spring Retry 库提供的 RetryTemplate 尝试重新发送同一消息(默认情况下为 3 次)。如果重试不成功,则取决于错误处理机制,它可能会丢弃消息、重新排队消息以进行重新处理或将失败的消息发送到 DLQ

Rabbit 和 Kafka 都支持这些概念(尤其是 DLQ)。但是,其他绑定器可能不支持,因此请参阅各个绑定器的文档以了解受支持的错误处理选项的详细信息。

但是请记住,反应式函数不符合消息处理程序的资格,因为它不处理单个消息,而是提供了一种将框架提供的流(即 Flux)与用户提供的流连接起来的方法。这为什么重要?这是因为稍后在本节中你读到的任何有关重试模板、丢弃失败消息、重试、DLQ 和帮助实现所有这些功能的配置属性适用于消息处理程序(即命令式函数)。

反应式 API 提供了一个非常丰富的库,其中包含自己的运算符和机制,以帮助你处理针对各种反应式用例的错误,这些用例比简单的消息处理程序用例复杂得多,因此请使用它们,例如你可以在 reactor.core.publisher.Flux 中找到的 public final Flux<T> retryWhen(Retry retrySpec);

@Bean
public Function<Flux<String>, Flux<String>> uppercase() {
	return flux -> flux
			.retryWhen(Retry.backoff(3, Duration.ofMillis(1000)))
			.map(v -> v.toUpperCase());
}

丢弃失败的消息

默认情况下,系统提供错误处理程序。第一个错误处理程序将简单地记录错误消息。第二个错误处理程序是特定于绑定程序的错误处理程序,负责在特定消息传递系统的上下文中处理错误消息(例如,发送到 DLQ)。但由于(在此当前场景中)未提供其他错误处理配置,此处理程序将不会执行任何操作。因此,在记录后,消息将被丢弃。

虽然在某些情况下是可以接受的,但大多数情况下不是,我们需要一些恢复机制来避免消息丢失。

处理错误消息

在上一节中,我们提到默认情况下,导致错误的消息将被有效地记录并丢弃。该框架还公开了一种机制,供您提供自定义错误处理程序(即发送通知或写入数据库等)。您可以通过添加专门设计为接受 ErrorMessageConsumer 来实现此目的,除了有关错误的所有信息(例如,堆栈跟踪等)之外,它还包含原始消息(触发错误的消息)。注意:自定义错误处理程序与框架提供的错误处理程序(即日志记录和绑定程序错误处理程序 - 请参阅上一节)互斥,以确保它们不会相互干扰。

@Bean
public Consumer<ErrorMessage> myErrorHandler() {
	return v -> {
		// send SMS notification code
	};
}

要将此类使用者标识为错误处理程序,您只需提供指向函数名称的 error-handler-definition 属性 - spring.cloud.stream.bindings.<binding-name>.error-handler-definition=myErrorHandler

例如,对于绑定名称 uppercase-in-0,属性将如下所示

spring.cloud.stream.bindings.uppercase-in-0.error-handler-definition=myErrorHandler

如果您使用特殊的映射指令将绑定映射到更具可读性的名称 - spring.cloud.stream.function.bindings.uppercase-in-0=upper,则此属性将如下所示

spring.cloud.stream.bindings.upper.error-handler-definition=myErrorHandler.
如果您意外地将此类处理程序声明为 Function,它仍然可以工作,但不会对其输出执行任何操作。但是,鉴于此类处理程序仍然依赖于 Spring Cloud Function 提供的功能,因此您还可以从函数组合中受益,以防您的处理程序具有一些您希望通过函数组合解决的复杂性(但不太可能)。

默认错误处理程序

如果您希望为所有函数 bean 拥有一个错误处理程序,则可以使用标准 spring-cloud-stream 机制定义默认属性 spring.cloud.stream.default.error-handler-definition=myErrorHandler

DLQ - 死信队列

DLQ 可能是最常见的机制,它允许将失败的消息发送到特殊目的地:死信队列

配置后,失败消息将发送到此目标,以便进行后续重新处理或审计和对账。

考虑以下示例

@SpringBootApplication
public class SimpleStreamApplication {

	public static void main(String[] args) throws Exception {
		SpringApplication.run(SimpleStreamApplication.class,
		  "--spring.cloud.function.definition=uppercase",
		  "--spring.cloud.stream.bindings.uppercase-in-0.destination=uppercase",
		  "--spring.cloud.stream.bindings.uppercase-in-0.group=myGroup",
		  "--spring.cloud.stream.rabbit.bindings.uppercase-in-0.consumer.auto-bind-dlq=true"
		);
	}

	@Bean
	public Function<Person, Person> uppercase() {
		return personIn -> {
		   throw new RuntimeException("intentional");
	      });
		};
	}
}

在此示例中,uppercase-in-0 属性段对应于输入目标绑定的名称,请注意这一点。consumer 段表示它是一个消费者属性。

使用 DLQ 时,至少必须提供 group 属性才能正确命名 DLQ 目标。但是,group 通常与 destination 属性一起使用,如我们的示例所示。

除了某些标准属性之外,我们还设置了 auto-bind-dlq,以指示绑定器为 uppercase-in-0 绑定创建和配置 DLQ 目标,该绑定对应于 uppercase 目标(请参见相应的属性),这将产生一个名为 uppercase.myGroup.dlq 的附加 Rabbit 队列(请参见 Kafka 文档以了解特定于 Kafka 的 DLQ 属性)。

配置后,所有失败消息都将路由到此目标,同时保留原始消息以供采取进一步操作。

您可以看到错误消息包含更多与原始错误相关的信息,如下所示

. . . .
x-exception-stacktrace:	org.springframework.messaging.MessageHandlingException: nested exception is
      org.springframework.messaging.MessagingException: has an error, failedMessage=GenericMessage [payload=byte[15],
      headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_receivedRoutingKey=input.hello, amqp_deliveryTag=1,
      deliveryAttempt=3, amqp_consumerQueue=input.hello, amqp_redelivered=false, id=a15231e6-3f80-677b-5ad7-d4b1e61e486e,
      amqp_consumerTag=amq.ctag-skBFapilvtZhDsn0k3ZmQg, contentType=application/json, timestamp=1522327846136}]
      at org.spring...integ...han...MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:107)
      at. . . . .
Payload: blah

您还可以通过将 max-attempts 设置为“1”来促进立即分派到 DLQ(无需重试)。例如,

--spring.cloud.stream.bindings.uppercase-in-0.consumer.max-attempts=1

重试模板

在本节中,我们将介绍与重试功能配置相关的配置属性。

RetryTemplateSpring Retry 库的一部分。虽然涵盖 RetryTemplate 的所有功能超出了本文档的范围,但我们将提及以下与 RetryTemplate 特别相关的消费者属性

maxAttempts

处理消息的尝试次数。

默认值:3。

backOffInitialInterval

重试时的初始退避间隔。

默认值:1000 毫秒。

backOffMaxInterval

最大退避间隔。

默认值:10000 毫秒。

backOffMultiplier

退避乘数。

默认值:2.0。

defaultRetryable

未在 retryableExceptions 中列出的侦听器引发的异常是否可重试。

默认值:true

retryableExceptions

键中为 Throwable 类名,值中为布尔值的映射。指定将或将不会重试的那些异常(和子类)。另请参见 defaultRetriable。示例:spring.cloud.stream.bindings.input.consumer.retryable-exceptions.java.lang.IllegalStateException=false

默认值:空。

虽然上述设置足以满足大多数定制要求,但它们可能无法满足某些复杂的要求,此时您可能希望提供您自己的 RetryTemplate 实例。为此,请在您的应用程序配置中将其配置为一个 bean。应用程序提供的实例将覆盖框架提供的实例。此外,为避免冲突,您必须将您希望绑定器使用的 RetryTemplate 实例限定为 @StreamRetryTemplate。例如,

@StreamRetryTemplate
public RetryTemplate myRetryTemplate() {
    return new RetryTemplate();
}

正如您从上面的示例中看到的那样,您不需要使用 @Bean 对其进行注释,因为 @StreamRetryTemplate 是一个合格的 @Bean

如果您需要更精确地使用 RetryTemplate,则可以在 ConsumerProperties 中按名称指定 bean,以将特定的重试 bean 与每个绑定关联。

spring.cloud.stream.bindings.<foo>.consumer.retry-template-name=<your-retry-template-bean-name>

绑定器

Spring Cloud Stream 提供了一个绑定器抽象,用于连接到外部中间件中的物理目标。本部分提供了有关 Binder SPI 背后的主要概念、其主要组件和特定于实现的详细信息的信息。

生产者和消费者

下图显示了生产者和消费者的总体关系

producers consumers
图 5. 生产者和消费者

生产者是向绑定目标发送消息的任何组件。绑定目标可以通过该代理的 Binder 实现绑定到外部消息代理。调用 bindProducer() 方法时,第一个参数是代理中目标的名称,第二个参数是生产者向其发送消息的本地目标的实例,第三个参数包含将在为该绑定目标创建的适配器中使用的属性(例如分区键表达式)。

消费者是接收来自绑定目标的消息的任何组件。与生产者一样,消费者可以绑定到外部消息代理。调用 bindConsumer() 方法时,第一个参数是目标名称,第二个参数提供消费者逻辑组的名称。由给定目标的消费者绑定表示的每个组都会收到生产者发送到该目标的每条消息的副本(即,它遵循正常的发布-订阅语义)。如果有多个消费者实例绑定到同一组名称,则消息将在这些消费者实例之间进行负载平衡,以便生产者发送的每条消息仅由每个组中的单个消费者实例使用(即,它遵循正常的队列语义)。

绑定器 SPI

绑定器 SPI 包含许多接口、开箱即用的实用程序类和发现策略,这些策略提供了一种可插入的机制来连接到外部中间件。

SPI 的关键点是 Binder 接口,它是一种将输入和输出连接到外部中间件的策略。以下列表显示了 Binder 接口的定义

public interface Binder<T, C extends ConsumerProperties, P extends ProducerProperties> {
    Binding<T> bindConsumer(String bindingName, String group, T inboundBindTarget, C consumerProperties);

    Binding<T> bindProducer(String bindingName, T outboundBindTarget, P producerProperties);
}

该接口是参数化的,提供了许多扩展点

  • 输入和输出绑定目标。

  • 扩展的使用者和生产者属性,允许特定的 Binder 实现添加可在类型安全方式下支持的补充属性。

典型的 Binder 实现包含以下内容

  • 实现 Binder 接口的类;

  • 创建类型为 Binder 的 bean 以及中间件连接基础设施的 Spring @Configuration 类。

  • 在类路径上找到的 META-INF/spring.binders 文件,包含一个或多个 Binder 定义,如下例所示

    kafka:\
    org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration
如前所述,Binder 抽象也是框架的扩展点之一。因此,如果您在前面的列表中找不到合适的 Binder,您可以在 Spring Cloud Stream 之上实现自己的 Binder。在 如何从头开始创建 Spring Cloud Stream Binder 帖子中,社区成员详细记录了一组实现自定义 Binder 所需的步骤,并附有示例。这些步骤也在 实现自定义 Binder 部分中突出显示。

Binder 检测

Spring Cloud Stream 依赖 Binder SPI 的实现来执行将用户代码连接(绑定)到消息代理的任务。每个 Binder 实现通常连接到一种消息系统。

类路径检测

默认情况下,Spring Cloud Stream 依赖 Spring Boot 的自动配置来配置绑定过程。如果在类路径上找到单个 Binder 实现,Spring Cloud Stream 会自动使用它。例如,一个仅旨在绑定到 RabbitMQ 的 Spring Cloud Stream 项目可以添加以下依赖项

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

有关其他 Binder 依赖项的特定 Maven 坐标,请参阅该 Binder 实现的文档。

类路径上的多个 Binder

当类路径上存在多个 Binder 时,应用程序必须指示每个目标绑定要使用哪个 Binder。每个 Binder 配置包含一个 META-INF/spring.binders 文件,这是一个简单的属性文件,如下例所示

rabbit:\
org.springframework.cloud.stream.binder.rabbit.config.RabbitServiceAutoConfiguration

其他提供的 Binder 实现(如 Kafka)也有类似的文件,并且自定义 Binder 实现也应提供这些文件。键表示 Binder 实现的标识名称,而值是逗号分隔的配置类列表,每个类仅包含一个类型为 org.springframework.cloud.stream.binder.Binder 的 bean 定义。

Binder 选择可以通过使用 spring.cloud.stream.defaultBinder 属性(例如,spring.cloud.stream.defaultBinder=rabbit)全局执行,也可以通过在每个绑定上配置 Binder 来单独执行。例如,一个处理器应用程序(分别具有名为 inputoutput 的绑定用于读取和写入)从 Kafka 读取并写入 RabbitMQ,可以指定以下配置

spring.cloud.stream.bindings.input.binder=kafka
spring.cloud.stream.bindings.output.binder=rabbit

连接到多个系统

默认情况下,绑定器共享应用程序的 Spring Boot 自动配置,这样就会创建类路径上找到的每个绑定器的一个实例。如果您的应用程序应该连接到多个同类型的代理,您可以指定多个绑定器配置,每个配置具有不同的环境设置。

启用显式绑定器配置会完全禁用默认绑定器配置过程。如果您这样做,则使用中的所有绑定器都必须包含在配置中。打算透明地使用 Spring Cloud Stream 的框架可能会创建可按名称引用的绑定器配置,但它们不会影响默认绑定器配置。要做到这一点,绑定器配置可以将其 defaultCandidate 标志设置为 false(例如,spring.cloud.stream.binders.<configurationName>.defaultCandidate=false)。这表示独立于默认绑定器配置过程存在的配置。

以下示例显示了连接到两个 RabbitMQ 代理实例的处理器应用程序的典型配置

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: thing1
          binder: rabbit1
        output:
          destination: thing2
          binder: rabbit2
      binders:
        rabbit1:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: <host1>
        rabbit2:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: <host2>
特定绑定器的 environment 属性也可用于任何 Spring Boot 属性,包括此 spring.main.sources,这对于为特定绑定器添加其他配置(例如,覆盖自动配置的 Bean)很有用。

例如;

environment:
    spring:
        main:
           sources: com.acme.config.MyCustomBinderConfiguration

要为特定绑定器环境激活特定配置文件,您应该使用 spring.profiles.active 属性

environment:
    spring:
        profiles:
           active: myBinderProfile

在多绑定器应用程序中自定义绑定器

当应用程序中有多个绑定器并希望自定义绑定器时,可以通过提供 BinderCustomizer 实现来实现。对于具有单个绑定器的应用程序,此特殊自定义器不是必需的,因为绑定器上下文可以直接访问自定义 Bean。但是,在多绑定器场景中并非如此,因为不同的绑定器存在于不同的应用程序上下文中。通过提供 BinderCustomizer 接口的实现,绑定器(尽管驻留在不同的应用程序上下文中)将接收自定义。Spring Cloud Stream 确保在应用程序开始使用绑定器之前进行自定义。用户必须检查绑定器类型,然后应用必要的自定义。

以下是一个提供 BinderCustomizer Bean 的示例。

@Bean
public BinderCustomizer binderCustomizer() {
    return (binder, binderName) -> {
        if (binder instanceof KafkaMessageChannelBinder kafkaMessageChannelBinder) {
            kafkaMessageChannelBinder.setRebalanceListener(...);
        }
        else if (binder instanceof KStreamBinder) {
            ...
        }
        else if (binder instanceof RabbitMessageChannelBinder) {
            ...
        }
    };
}

请注意,当同一类型的绑定有多个实例时,可以使用绑定名称来筛选自定义。

绑定可视化和控制

Spring Cloud Stream 支持通过 Actuator 端点以及编程方式对绑定进行可视化和控制。

编程方式

从 3.1 版本开始,我们公开 org.springframework.cloud.stream.binding.BindingsLifecycleController,它注册为 Bean,一旦注入,就可以用于控制各个绑定的生命周期

例如,查看其中一个测试用例中的片段。如你所见,我们从 Spring 应用程序上下文中检索 BindingsLifecycleController,并执行各个方法来控制 echo-in-0 绑定的生命周期。

BindingsLifecycleController bindingsController = context.getBean(BindingsLifecycleController.class);
Binding binding = bindingsController.queryState("echo-in-0");
assertThat(binding.isRunning()).isTrue();
bindingsController.changeState("echo-in-0", State.STOPPED);
//Alternative way of changing state. For convenience we expose start/stop and pause/resume operations.
//bindingsController.stop("echo-in-0")
assertThat(binding.isRunning()).isFalse();

Actuator

由于 actuator 和 web 是可选的,因此你必须首先添加一个 web 依赖项,并手动添加 actuator 依赖项。以下示例展示了如何添加 Web 框架的依赖项

<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-web</artifactId>
</dependency>

以下示例展示了如何添加 WebFlux 框架的依赖项

<dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

你可以按如下方式添加 Actuator 依赖项

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
要在 Cloud Foundry 中运行 Spring Cloud Stream 2.0 应用程序,你必须将 spring-boot-starter-webspring-boot-starter-actuator 添加到类路径。否则,由于运行状况检查失败,应用程序将无法启动。

你还必须通过设置以下属性来启用 bindings actuator 端点:--management.endpoints.web.exposure.include=bindings

满足这些先决条件后,你应该在应用程序启动时在日志中看到以下内容

: Mapped "{[/actuator/bindings/{name}],methods=[POST]. . .
: Mapped "{[/actuator/bindings],methods=[GET]. . .
: Mapped "{[/actuator/bindings/{name}],methods=[GET]. . .

要可视化当前绑定,请访问以下 URL:http://<host>:<port>/actuator/bindings

或者,要查看单个绑定,请访问类似于以下内容的 URL 之一:http://<host>:<port>/actuator/bindings/<bindingName>;

你还可以通过将 state 参数作为 JSON 发布到同一 URL,来停止、启动、暂停和恢复各个绑定,如下面的示例所示

curl -d '{"state":"STOPPED"}' -H "Content-Type: application/json" -X POST http://<host>:<port>/actuator/bindings/myBindingName
curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST http://<host>:<port>/actuator/bindings/myBindingName
curl -d '{"state":"PAUSED"}' -H "Content-Type: application/json" -X POST http://<host>:<port>/actuator/bindings/myBindingName
curl -d '{"state":"RESUMED"}' -H "Content-Type: application/json" -X POST http://<host>:<port>/actuator/bindings/myBindingName
PAUSEDRESUMED 仅在相应的绑定器及其底层技术支持时才起作用。否则,您会在日志中看到警告消息。目前,只有 Kafka 和 [Solace](https://github.com/SolaceProducts/solace-spring-cloud/tree/master/solace-spring-cloud-starters/solace-spring-cloud-stream-starter#consumer-bindings-pauseresume) 绑定器支持 PAUSEDRESUMED 状态。

绑定器配置属性

自定义绑定器配置时,可以使用以下属性。这些属性通过 org.springframework.cloud.stream.config.BinderProperties 公开

它们必须以 spring.cloud.stream.binders.<configurationName> 为前缀。

type

绑定器类型。它通常引用类路径上找到的某个绑定器——特别是 META-INF/spring.binders 文件中的键。

默认情况下,它与配置名称的值相同。

inheritEnvironment

配置是否继承应用程序本身的环境。

默认值:true

environment

可用于自定义绑定器环境的一组属性的根。设置此属性后,创建绑定器的上下文不是应用程序上下文的子级。此设置允许绑定器组件和应用程序组件完全分离。

默认值:empty

defaultCandidate

绑定器配置是否候选为默认绑定器或仅在明确引用时使用。此设置允许添加绑定器配置,而不会干扰默认处理。

默认值:true

实现自定义绑定器

为了实现自定义 Binder,您需要

  • 添加必需的依赖项

  • 提供 ProvisioningProvider 实现

  • 提供 MessageProducer 实现

  • 提供 MessageHandler 实现

  • 提供 Binder 实现

  • 创建绑定器配置

  • 在 META-INF/spring.binders 中定义您的绑定器

添加必需的依赖项

spring-cloud-stream 依赖项添加到您的项目中(例如,对于 Maven)

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
    <version>${spring.cloud.stream.version}</version>
</dependency>

提供 ProvisioningProvider 实现

ProvisioningProvider 负责提供消费者和生产者目标,并且需要将 application.yml 或 application.properties 文件中包含的逻辑目标转换为物理目标引用。

下面是一个 ProvisioningProvider 实现的示例,它只修剪通过输入/输出绑定配置提供的目标

public class FileMessageBinderProvisioner implements ProvisioningProvider<ConsumerProperties, ProducerProperties> {

    @Override
    public ProducerDestination provisionProducerDestination(
            final String name,
            final ProducerProperties properties) {

        return new FileMessageDestination(name);
    }

    @Override
    public ConsumerDestination provisionConsumerDestination(
            final String name,
            final String group,
            final ConsumerProperties properties) {

        return new FileMessageDestination(name);
    }

    private class FileMessageDestination implements ProducerDestination, ConsumerDestination {

        private final String destination;

        private FileMessageDestination(final String destination) {
            this.destination = destination;
        }

        @Override
        public String getName() {
            return destination.trim();
        }

        @Override
        public String getNameForPartition(int partition) {
            throw new UnsupportedOperationException("Partitioning is not implemented for file messaging.");
        }

    }

}

提供 MessageProducer 实现

MessageProducer 负责使用事件并将其作为消息处理到已配置为使用此类事件的客户端应用程序。

以下是 MessageProducer 实现的示例,它扩展了 MessageProducerSupport 抽象以轮询与修剪后的目标名称匹配且位于项目路径中的文件,同时还存档已读取的消息并丢弃后续的相同消息

public class FileMessageProducer extends MessageProducerSupport {

    public static final String ARCHIVE = "archive.txt";
    private final ConsumerDestination destination;
    private String previousPayload;

    public FileMessageProducer(ConsumerDestination destination) {
        this.destination = destination;
    }

    @Override
    public void doStart() {
        receive();
    }

    private void receive() {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);

        executorService.scheduleWithFixedDelay(() -> {
            String payload = getPayload();

            if(payload != null) {
                Message<String> receivedMessage = MessageBuilder.withPayload(payload).build();
                archiveMessage(payload);
                sendMessage(receivedMessage);
            }

        }, 0, 50, MILLISECONDS);
    }

    private String getPayload() {
        try {
            List<String> allLines = Files.readAllLines(Paths.get(destination.getName()));
            String currentPayload = allLines.get(allLines.size() - 1);

            if(!currentPayload.equals(previousPayload)) {
                previousPayload = currentPayload;
                return currentPayload;
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

        return null;
    }

    private void archiveMessage(String payload) {
        try {
            Files.write(Paths.get(ARCHIVE), (payload + "\n").getBytes(), CREATE, APPEND);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

}
在实现自定义绑定时,此步骤不是严格必需的,因为您可以始终求助于使用已有的 MessageProducer 实现!

提供 MessageHandler 实现

MessageHandler 提供生成事件所需的逻辑。

以下是 MessageHandler 实现的示例

public class FileMessageHandler implements MessageHandler{

    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        //write message to file
    }

}
在实现自定义绑定时,此步骤不是严格必需的,因为您可以始终求助于使用已有的 MessageHandler 实现!

提供 Binder 实现

您现在可以提供 Binder 抽象的自己的实现。这可以通过以下方式轻松完成

  • 扩展 AbstractMessageChannelBinder

  • 将您的 ProvisioningProvider 指定为 AbstractMessageChannelBinder 的泛型参数

  • 覆盖 createProducerMessageHandlercreateConsumerEndpoint 方法

例如。

public class FileMessageBinder extends AbstractMessageChannelBinder<ConsumerProperties, ProducerProperties, FileMessageBinderProvisioner> {

    public FileMessageBinder(
            String[] headersToEmbed,
            FileMessageBinderProvisioner provisioningProvider) {

        super(headersToEmbed, provisioningProvider);
    }

    @Override
    protected MessageHandler createProducerMessageHandler(
            final ProducerDestination destination,
            final ProducerProperties producerProperties,
            final MessageChannel errorChannel) throws Exception {

        return message -> {
            String fileName = destination.getName();
            String payload = new String((byte[])message.getPayload()) + "\n";

            try {
                Files.write(Paths.get(fileName), payload.getBytes(), CREATE, APPEND);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        };
    }

    @Override
    protected MessageProducer createConsumerEndpoint(
            final ConsumerDestination destination,
            final String group,
            final ConsumerProperties properties) throws Exception {

        return new FileMessageProducer(destination);
    }

}

创建绑定器配置

您必须创建 Spring 配置来初始化绑定实现的 bean (以及您可能需要的其他所有 bean)

@Configuration
public class FileMessageBinderConfiguration {

    @Bean
    @ConditionalOnMissingBean
    public FileMessageBinderProvisioner fileMessageBinderProvisioner() {
        return new FileMessageBinderProvisioner();
    }

    @Bean
    @ConditionalOnMissingBean
    public FileMessageBinder fileMessageBinder(FileMessageBinderProvisioner fileMessageBinderProvisioner) {
        return new FileMessageBinder(null, fileMessageBinderProvisioner);
    }

}

在 META-INF/spring.binders 中定义您的绑定器

最后,您必须在类路径上的 META-INF/spring.binders 文件中定义您的绑定,同时指定绑定名称和 Binder 配置类的完全限定名称

myFileBinder:\
com.example.springcloudstreamcustombinder.config.FileMessageBinderConfiguration

配置选项

Spring Cloud Stream 支持常规配置选项以及绑定和绑定器的配置。一些绑定器允许其他绑定属性支持特定于中间件的功能。

可以通过 Spring Boot 支持的任何机制向 Spring Cloud Stream 应用程序提供配置选项。这包括应用程序参数、环境变量以及 YAML 或 .properties 文件。

绑定服务属性

这些属性通过 org.springframework.cloud.stream.config.BindingServiceProperties 暴露

spring.cloud.stream.instanceCount

应用程序的已部署实例数。必须设置以在生产者端进行分区。在使用 RabbitMQ 时必须在消费者端设置,如果 autoRebalanceEnabled=false,则在使用 Kafka 时也必须设置。

默认值:1

spring.cloud.stream.instanceIndex

应用程序的实例索引:从 0instanceCount - 1 的数字。用于与 RabbitMQ 分区,如果 autoRebalanceEnabled=false,则用于与 Kafka 分区。在 Cloud Foundry 中自动设置以匹配应用程序的实例索引。

spring.cloud.stream.dynamicDestinations

可以动态绑定的目标列表(例如,在动态路由场景中)。如果设置,则只能绑定列出的目标。

默认值:空(允许绑定任何目标)。

spring.cloud.stream.defaultBinder

如果配置了多个绑定器,则使用默认绑定器。请参阅类路径上的多个绑定器

默认值:空。

spring.cloud.stream.overrideCloudConnectors

此属性仅在激活cloud配置文件且 Spring Cloud Connectors 与应用程序一起提供时才适用。如果属性为false(默认值),则绑定器将检测合适的绑定服务(例如,在 Cloud Foundry 中为 RabbitMQ 绑定器绑定的 RabbitMQ 服务),并使用它来创建连接(通常通过 Spring Cloud Connectors)。当设置为true时,此属性指示绑定器完全忽略绑定的服务,并依赖于 Spring Boot 属性(例如,依赖于环境中为 RabbitMQ 绑定器提供的spring.rabbitmq.*属性)。此属性的典型用法是嵌套在自定义环境中连接到多个系统时

默认值:false

spring.cloud.stream.bindingRetryInterval

例如,当绑定器不支持延迟绑定且代理(例如,Apache Kafka)关闭时,在重试创建绑定之间的间隔(以秒为单位)。将其设置为零以将此类条件视为致命错误,从而阻止应用程序启动。

默认值:30

绑定属性

使用spring.cloud.stream.bindings.<bindingName>.<property>=<value>的格式提供绑定属性。<bindingName>表示要配置的绑定的名称。

例如,对于以下函数

@Bean
public Function<String, String> uppercase() {
	return v -> v.toUpperCase();
}

有两个名为uppercase-in-0(用于输入)和uppercase-out-0(用于输出)的绑定。有关更多详细信息,请参阅绑定和绑定名称

为了避免重复,Spring Cloud Stream 支持以spring.cloud.stream.default.<property>=<value>spring.cloud.stream.default.<producer|consumer>.<property>=<value>的格式为所有绑定设置值,以用于公共绑定属性。

当涉及避免为扩展绑定属性重复时,应使用此格式 - spring.cloud.stream.<binder-type>.default.<producer|consumer>.<property>=<value>

公共绑定属性

这些属性通过org.springframework.cloud.stream.config.BindingProperties公开

以下绑定属性同时适用于输入和输出绑定,并且必须以 spring.cloud.stream.bindings.<bindingName>. 为前缀(例如,spring.cloud.stream.bindings.uppercase-in-0.destination=ticktock)。

可以使用 spring.cloud.stream.default 前缀设置默认值(例如 spring.cloud.stream.default.contentType=application/json)。

destination

绑定在绑定中间件上的目标目的地(例如,RabbitMQ 交换或 Kafka 主题)。如果绑定表示使用者绑定(输入),则它可以绑定到多个目的地,并且可以将目的地名称指定为以逗号分隔的 String 值。如果没有,则使用实际绑定名称。此属性的默认值不能被覆盖。

group

绑定的使用者组。仅适用于入站绑定。请参阅 使用者组

默认值:null(表示匿名使用者)。

contentType

此绑定的内容类型。请参阅 内容类型协商

默认值:application/json

binder

此绑定使用的绑定器。有关详细信息,请参阅 类路径上的多个绑定器

默认值:null(如果存在,则使用默认绑定器)。

使用者属性

这些属性通过 org.springframework.cloud.stream.binder.ConsumerProperties 公开

以下绑定属性仅适用于输入绑定,并且必须以 spring.cloud.stream.bindings.<bindingName>.consumer. 为前缀(例如,spring.cloud.stream.bindings.input.consumer.concurrency=3)。

可以使用 spring.cloud.stream.default.consumer 前缀设置默认值(例如,spring.cloud.stream.default.consumer.headerMode=none)。

autoStartup

指示此使用者是否需要自动启动

默认值:true

concurrency

入站使用者的并发性。

默认值:1

partitioned

使用者是否从分区生产者接收数据。

默认值:false

headerMode

设置为 none 时,禁用输入上的标头解析。仅对本机不支持消息标头且需要标头嵌入的消息传递中间件有效。当从不支持本机标头的非 Spring Cloud Stream 应用程序使用数据时,此选项非常有用。设置为 headers 时,它使用中间件的本机标头机制。设置为 embeddedHeaders 时,它将标头嵌入到消息有效负载中。

默认值:取决于绑定器实现。

maxAttempts

如果处理失败,则处理消息的尝试次数(包括第一次)。设置为 1 以禁用重试。

默认值:3

backOffInitialInterval

重试时的初始退避间隔。

默认值:1000

backOffMaxInterval

最大退避间隔。

默认值:10000

backOffMultiplier

退避乘数。

默认值:2.0

defaultRetryable

未在 retryableExceptions 中列出的侦听器引发的异常是否可重试。

默认值:true

instanceCount

当设置为大于或等于零的值时,它允许自定义此消费者的实例数(如果不同于spring.cloud.stream.instanceCount)。当设置为负值时,它默认为spring.cloud.stream.instanceCount。有关更多信息,请参阅实例索引和实例数

默认值:-1

instanceIndex

当设置为大于或等于零的值时,它允许自定义此消费者的实例索引(如果不同于spring.cloud.stream.instanceIndex)。当设置为负值时,它默认为spring.cloud.stream.instanceIndex。如果提供了instanceIndexList,则忽略。有关更多信息,请参阅实例索引和实例数

默认值:-1

instanceIndexList

与不支持原生分区(例如 RabbitMQ)的绑定器一起使用;允许应用程序实例从多个分区使用。

默认值:空。

retryableExceptions

键中为 Throwable 类名,值中为布尔值的映射。指定将或将不会重试的那些异常(和子类)。另请参见 defaultRetriable。示例:spring.cloud.stream.bindings.input.consumer.retryable-exceptions.java.lang.IllegalStateException=false

默认值:空。

useNativeDecoding

当设置为true时,入站消息将直接由客户端库反序列化,该库必须相应地进行配置(例如,设置适当的 Kafka 生产者值反序列化器)。当使用此配置时,入站消息解组不基于绑定的contentType。当使用原生解码时,由生产者负责使用适当的编码器(例如,Kafka 生产者值序列化器)来序列化出站消息。此外,当使用原生编码和解码时,将忽略headerMode=embeddedHeaders属性,并且标头不会嵌入到消息中。请参阅生产者属性useNativeEncoding

默认值:false

multiplex

当设置为 true 时,底层绑定器将在同一输入绑定上原生多路复用目标。

默认值:false

高级消费者配置

对于消息驱动的消费者的底层消息侦听器容器的高级配置,请向应用程序上下文添加一个ListenerContainerCustomizer bean。在应用上述属性后,将调用它,并且可用于设置其他属性。类似地,对于轮询消费者,请添加一个MessageSourceCustomizer bean。

以下是 RabbitMQ 绑定器的示例

@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer> containerCustomizer() {
    return (container, dest, group) -> container.setAdviceChain(advice1, advice2);
}

@Bean
public MessageSourceCustomizer<AmqpMessageSource> sourceCustomizer() {
    return (source, dest, group) -> source.setPropertiesConverter(customPropertiesConverter);
}

生产者属性

这些属性通过org.springframework.cloud.stream.binder.ProducerProperties公开

以下绑定属性仅适用于输出绑定,并且必须以spring.cloud.stream.bindings.<bindingName>.producer.为前缀(例如,spring.cloud.stream.bindings.func-out-0.producer.partitionKeyExpression=headers.id)。

可以使用前缀spring.cloud.stream.default.producer设置默认值(例如,spring.cloud.stream.default.producer.partitionKeyExpression=headers.id)。

autoStartup

指示此使用者是否需要自动启动

默认值:true

partitionKeyExpression

一个 SpEL 表达式,用于确定如何对出站数据进行分区。如果设置,则对该绑定上的出站数据进行分区。partitionCount 必须设置为大于 1 的值才能生效。请参阅分区支持

默认值:null。

partitionKeyExtractorName

实现 PartitionKeyExtractorStrategy 的 bean 的名称。用于提取用于计算分区 ID 的键(参见“partitionSelector*”)。与“partitionKeyExpression”互斥。

默认值:null。

partitionSelectorName

实现 PartitionSelectorStrategy 的 bean 的名称。用于根据分区键确定分区 ID(参见“partitionKeyExtractor*”)。与“partitionSelectorExpression”互斥。

默认值:null。

partitionSelectorExpression

用于自定义分区选择的 SpEL 表达式。如果两者均未设置,则分区将选择为 hashCode(key) % partitionCount,其中 key 通过 partitionKeyExpression 计算。

默认值:null

partitionCount

如果启用了分区,则数据目标分区的数量。如果生产者已分区,则必须将其设置为大于 1 的值。在 Kafka 上,它被解释为提示。将使用此值和目标主题的分区计数中较大的一个。

默认值:1

requiredGroups

生产者即使在创建后启动(例如,通过在 RabbitMQ 中预先创建持久队列)也必须确保向其传递消息的组的逗号分隔列表。

headerMode

设置为 none 时,它将禁用输出上的标头嵌入。它仅对原生不支持消息标头且需要标头嵌入的消息传递中间件有效。当为非 Spring Cloud Stream 应用程序生成数据且不支持原生标头时,此选项很有用。设置为 headers 时,它使用中间件的原生标头机制。设置为 embeddedHeaders 时,它将标头嵌入到消息负载中。

默认值:取决于绑定器实现。

useNativeEncoding

设置为 true 时,出站消息将由客户端库直接序列化,客户端库必须相应地进行配置(例如,设置适当的 Kafka 生产者值序列化器)。使用此配置时,出站消息编组不基于绑定的 contentType。使用原生编码时,消费者有责任使用适当的解码器(例如,Kafka 消费者值反序列化器)对入站消息进行反序列化。此外,在使用原生编码和解码时,将忽略 headerMode=embeddedHeaders 属性,并且标头不会嵌入到消息中。请参见消费者属性 useNativeDecoding

默认值:false

errorChannelEnabled

设置为 true 时,如果绑定器支持异步发送结果,则发送失败将发送到目标的错误通道。有关更多信息,请参见错误处理。

默认值:false。

高级生产者配置

在某些情况下,生产者属性不足以在绑定器中正确配置生产 MessageHandler,或者你可能更喜欢在配置此类生产 MessageHandler 时采用编程方法。无论出于何种原因,spring-cloud-stream 都提供 ProducerMessageHandlerCustomizer 来实现此目的。

@FunctionalInterface
public interface ProducerMessageHandlerCustomizer<H extends MessageHandler> {

	/**
	 * Configure a {@link MessageHandler} that is being created by the binder for the
	 * provided destination name.
	 * @param handler the {@link MessageHandler} from the binder.
	 * @param destinationName the bound destination name.
	 */
	void configure(H handler, String destinationName);

}

正如你所看到的,它让你可以访问正在生产的 MessageHandler 的实际实例,你可以根据需要对其进行配置。你只需提供此策略的实现并将其配置为 @Bean 即可。

内容类型协商

数据转换是任何消息驱动的微服务架构的核心功能之一。鉴于在 Spring Cloud Stream 中,此类数据表示为 Spring Message,因此在消息到达其目的地之前,可能需要将消息转换为所需形状或大小。出于两个原因需要这样做

  1. 将传入消息的内容转换为与应用程序提供的处理程序的签名匹配。

  2. 将传出消息的内容转换为线格式。

线格式通常为 byte[](对于 Kafka 和 Rabbit 绑定器而言为真),但它受绑定器实现控制。

在 Spring Cloud Stream 中,消息转换使用 org.springframework.messaging.converter.MessageConverter 完成。

作为后续详细信息的补充,您可能还希望阅读以下 博客文章

机制

为了更好地理解内容类型协商背后的机制和必要性,我们以以下消息处理程序为例,来看一个非常简单的用例

public Function<Person, String> personFunction {..}
为简单起见,我们假设这是应用程序中唯一的处理程序函数(我们假设没有内部管道)。

前面示例中显示的处理程序期望一个 Person 对象作为参数,并生成一个 String 类型作为输出。为了使框架成功将传入的 Message 作为参数传递给此处理程序,它必须以某种方式将 Message 类型的有效负载从线格式转换为 Person 类型。换句话说,框架必须找到并应用适当的 MessageConverter。为了实现这一点,框架需要来自用户的某些指令。其中一个指令已经由处理程序方法本身的签名(Person 类型)提供。因此,理论上,这应该是(并且在某些情况下是)足够的。但是,对于大多数用例,为了选择适当的 MessageConverter,框架需要额外的信息。缺少的部分是 contentType

Spring Cloud Stream 提供了三种机制来定义 contentType(按优先级顺序)

  1. HEADERcontentType 可以通过消息本身进行通信。通过提供 contentType 头,您可以声明要用于查找和应用适当 MessageConverter 的内容类型。

  2. BINDINGcontentType 可以通过设置 spring.cloud.stream.bindings.input.content-type 属性,按目标绑定设置。

    属性名称中的 input 段对应于目标的实际名称(在本例中为“input”)。这种方法允许您逐绑定地声明用于查找和应用适当的 MessageConverter 的内容类型。
  3. 默认值:如果 contentType 不存在于 Message 头或绑定中,则使用默认 application/json 内容类型来查找和应用适当的 MessageConverter

如前所述,前面的列表还演示了在出现平局时的优先级顺序。例如,头提供的 Content-Type 优先于任何其他 Content-Type。对于逐绑定设置的 Content-Type 也是如此,它本质上允许您覆盖默认 Content-Type。但是,它还提供了明智的默认值(这是从社区反馈中确定的)。

application/json 设置为默认值的另一个原因源于分布式微服务架构驱动的互操作性要求,其中生产者和消费者不仅在不同的 JVM 中运行,还可以在不同的非 JVM 平台上运行。

当非空处理程序方法返回时,如果返回值已经是 Message,则该 Message 成为有效负载。但是,当返回值不是 Message 时,将使用返回值作为有效负载构造新的 Message,同时从输入 Message 继承头,减去 SpringIntegrationProperties.messageHandlerNotPropagatedHeaders 定义或过滤的头。默认情况下,那里只设置了一个头:contentType。这意味着新的 Message 没有设置 contentType 头,从而确保 contentType 可以演化。您始终可以选择不从处理程序方法返回 Message,您可以在其中注入任何您希望的头。

如果存在内部管道,则 Message 通过经历相同的转换过程发送到下一个处理程序。但是,如果不存在内部管道或您已达到其末尾,则 Message 将被发送回输出目标。

内容类型与参数类型

如前所述,对于框架选择适当的 MessageConverter,它需要参数类型,并且可以选择内容类型信息。选择适当 MessageConverter 的逻辑存在于参数解析器(HandlerMethodArgumentResolvers)中,该解析器在调用用户定义的处理程序方法之前触发(此时框架已知实际参数类型)。如果参数类型与当前有效负载的类型不匹配,则框架委托给预配置的 MessageConverters 堆栈,以查看其中是否有任何一个可以转换有效负载。如您所见,MessageConverter 的 Object fromMessage(Message<?> message, Class<?> targetClass); 操作将 targetClass 作为其参数之一。框架还确保提供的 Message 始终包含 contentType 头。当不存在 contentType 头时,它会注入逐绑定的 contentType 头或默认 contentType 头。contentType 参数类型的组合是框架确定消息是否可以转换为目标类型的机制。如果没有找到合适的 MessageConverter,则会抛出异常,您可以通过添加自定义 MessageConverter 来处理该异常(请参阅 用户定义的消息转换器)。

但是如果有效负载类型与处理程序方法声明的目标类型匹配,该怎么办?在这种情况下,无需进行任何转换,有效负载将按原样传递。虽然这听起来非常简单且合乎逻辑,但请记住将 Message<?>Object 作为参数的处理程序方法。通过将目标类型声明为 Object(这是 Java 中所有内容的 instanceof),你实际上放弃了转换过程。

不要指望 Message 仅基于 contentType 转换为其他类型。请记住,contentType 是对目标类型的补充。如果你愿意,可以提供一个提示,MessageConverter 可能考虑或不考虑该提示。

消息转换器

MessageConverters 定义了两种方法

Object fromMessage(Message<?> message, Class<?> targetClass);

Message<?> toMessage(Object payload, @Nullable MessageHeaders headers);

了解这些方法的契约及其用法非常重要,尤其是在 Spring Cloud Stream 的上下文中。

fromMessage 方法将传入的 Message 转换为参数类型。Message 的有效负载可以是任何类型,由 MessageConverter 的实际实现来支持多种类型。例如,某些 JSON 转换器可能支持有效负载类型为 byte[]String 等。当应用程序包含内部管道(即输入 → 处理程序 1 → 处理程序 2 →. . . → 输出)且上游处理程序的输出导致 Message 可能不是初始线格式时,这一点很重要。

但是,toMessage 方法具有更严格的契约,并且必须始终将 Message 转换为线格式:byte[]

因此,出于所有意图和目的(尤其是在实现你自己的转换器时),你将这两个方法视为具有以下签名

Object fromMessage(Message<?> message, Class<?> targetClass);

Message<byte[]> toMessage(Object payload, @Nullable MessageHeaders headers);

提供的消息转换器

如前所述,该框架已经提供了一组 MessageConverters 来处理大多数常见用例。以下列表描述了提供的 MessageConverters,按优先级顺序(使用第一个可用的 MessageConverter

  1. JsonMessageConverter:顾名思义,它支持将 Message 的有效负载转换为 POJO,反之亦然,适用于 contentTypeapplication/json(默认值)的情况。

  2. ByteArrayMessageConverter:支持将 Message 的有效负载从 byte[] 转换为 byte[],适用于 contentTypeapplication/octet-stream 的情况。它本质上是一种传递,主要用于向后兼容。

  3. ObjectStringMessageConverter:支持在 contentTypetext/plain 时将任何类型转换为 String。它调用对象的 toString() 方法,或者如果有效负载为 byte[],则调用新的 String(byte[])

如果找不到合适的转换器,框架将抛出异常。发生这种情况时,您应该检查您的代码和配置,并确保您没有遗漏任何内容(即确保您使用绑定或标头提供了 contentType)。但是,很可能您发现了某些不常见的情况(例如自定义 contentType),并且当前提供的 MessageConverters 栈不知道如何转换。如果是这种情况,您可以添加自定义 MessageConverter。请参阅 用户定义的消息转换器

用户定义的消息转换器

Spring Cloud Stream 公开了一种机制来定义和注册其他 MessageConverter。要使用它,请实现 org.springframework.messaging.converter.MessageConverter,将其配置为 @Bean。然后将其附加到现有的 MessageConverter 栈。

了解自定义 MessageConverter 实现被添加到现有栈的头部非常重要。因此,自定义 MessageConverter 实现优先于现有的实现,这使您可以覆盖现有转换器并向其添加内容。

以下示例展示了如何创建消息转换器 Bean 来支持称为 application/bar 的新内容类型

@SpringBootApplication
public static class SinkApplication {

    ...

    @Bean
    public MessageConverter customMessageConverter() {
        return new MyCustomMessageConverter();
    }
}

public class MyCustomMessageConverter extends AbstractMessageConverter {

    public MyCustomMessageConverter() {
        super(new MimeType("application", "bar"));
    }

    @Override
    protected boolean supports(Class<?> clazz) {
        return (Bar.class.equals(clazz));
    }

    @Override
    protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
        Object payload = message.getPayload();
        return (payload instanceof Bar ? payload : new Bar((byte[]) payload));
    }
}

应用程序间通信

Spring Cloud Stream 启用应用程序之间的通信。应用程序间通信是一个涉及多个关注点的复杂问题,如下面的主题中所述

连接多个应用程序实例

虽然 Spring Cloud Stream 使得各个 Spring Boot 应用程序可以轻松连接到消息系统,但 Spring Cloud Stream 的典型场景是创建多应用程序管道,其中微服务应用程序相互发送数据。您可以通过关联“相邻”应用程序的输入和输出目标来实现此场景。

假设设计要求时间源应用程序将数据发送到日志接收应用程序。您可以在两个应用程序内的绑定中使用名为 ticktock 的通用目标。

时间源(其绑定名为 output)将设置以下属性

spring.cloud.stream.bindings.output.destination=ticktock

日志接收(其绑定名为 input)将设置以下属性

spring.cloud.stream.bindings.input.destination=ticktock

实例索引和实例计数

在扩展 Spring Cloud Stream 应用程序时,每个实例都可以接收有关同一应用程序的其他实例有多少个以及其自己的实例索引是什么的信息。Spring Cloud Stream 通过 spring.cloud.stream.instanceCountspring.cloud.stream.instanceIndex 属性执行此操作。例如,如果 HDFS 接收应用程序有三个实例,则所有三个实例的 spring.cloud.stream.instanceCount 都设置为 3,并且各个应用程序的 spring.cloud.stream.instanceIndex 分别设置为 012

当 Spring Cloud Stream 应用程序通过 Spring Cloud Data Flow 部署时,这些属性会自动配置;当 Spring Cloud Stream 应用程序独立启动时,必须正确设置这些属性。默认情况下,spring.cloud.stream.instanceCount1spring.cloud.stream.instanceIndex0

在扩展的场景中,正确配置这两个属性对于解决分区行为(见下文)非常重要,并且某些绑定器(例如 Kafka 绑定器)始终需要这两个属性,以确保数据在多个使用者实例之间正确拆分。

分区

Spring Cloud Stream 中的分区包含两项任务

为分区配置输出绑定

您可以通过设置其partitionKeyExpressionpartitionKeyExtractorName属性(仅限其中一个)以及其partitionCount属性,来配置输出绑定以发送分区数据。

例如,以下是一个有效且典型的配置

spring.cloud.stream.bindings.func-out-0.producer.partitionKeyExpression=headers.id
spring.cloud.stream.bindings.func-out-0.producer.partitionCount=5

基于该示例配置,数据将使用以下逻辑发送到目标分区。

基于partitionKeyExpression,为发送到分区输出绑定的每条消息计算分区键值。partitionKeyExpression是针对出站消息(在上一个示例中,它是消息头中id的值)评估的 SpEL 表达式,用于提取分区键。

如果 SpEL 表达式无法满足您的需求,您可以通过提供org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy的实现并将其配置为 Bean(使用@Bean注释),来计算分区键值。如果您在应用程序上下文中有多个类型为org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy的 Bean,则可以通过使用partitionKeyExtractorName属性指定其名称来进一步筛选,如下例所示

--spring.cloud.stream.bindings.func-out-0.producer.partitionKeyExtractorName=customPartitionKeyExtractor
--spring.cloud.stream.bindings.func-out-0.producer.partitionCount=5
. . .
@Bean
public CustomPartitionKeyExtractorClass customPartitionKeyExtractor() {
    return new CustomPartitionKeyExtractorClass();
}
在 Spring Cloud Stream 的早期版本中,您可以通过设置spring.cloud.stream.bindings.output.producer.partitionKeyExtractorClass属性来指定org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy的实现。自 3.0 版本起,此属性已删除。

一旦计算出消息键,分区选择过程就会确定目标分区,其值为介于0partitionCount - 1之间的值。默认计算适用于大多数场景,基于以下公式:key.hashCode() % partitionCount。这可以在绑定中进行自定义,方法是设置针对“键”评估的 SpEL 表达式(通过partitionSelectorExpression属性)或通过使用 @Bean 注释将org.springframework.cloud.stream.binder.PartitionSelectorStrategy的实现配置为 Bean。与PartitionKeyExtractorStrategy类似,当应用程序上下文中有多个此类型的 Bean 时,您可以使用spring.cloud.stream.bindings.output.producer.partitionSelectorName属性进一步筛选,如下例所示

--spring.cloud.stream.bindings.func-out-0.producer.partitionSelectorName=customPartitionSelector
. . .
@Bean
public CustomPartitionSelectorClass customPartitionSelector() {
    return new CustomPartitionSelectorClass();
}
在 Spring Cloud Stream 的早期版本中,可以通过设置 spring.cloud.stream.bindings.output.producer.partitionSelectorClass 属性来指定 org.springframework.cloud.stream.binder.PartitionSelectorStrategy 的实现。从 3.0 版本开始,此属性已被移除。

配置输入绑定以进行分区

通过设置输入绑定的 partitioned 属性以及应用程序本身上的 instanceIndexinstanceCount 属性,可以配置输入绑定(绑定名称为 uppercase-in-0)以接收分区数据,如下例所示

spring.cloud.stream.bindings.uppercase-in-0.consumer.partitioned=true
spring.cloud.stream.instanceIndex=3
spring.cloud.stream.instanceCount=5

instanceCount 值表示应该在哪些应用程序实例之间对数据进行分区。instanceIndex 在多个实例中必须是唯一值,其值介于 0instanceCount - 1 之间。实例索引可帮助每个应用程序实例识别它接收数据的唯一分区。使用不支持原生分区的技术的绑定器需要它。例如,使用 RabbitMQ 时,每个分区都有一个队列,队列名称包含实例索引。使用 Kafka 时,如果 autoRebalanceEnabledtrue(默认值),Kafka 会负责在实例之间分配分区,并且不需要这些属性。如果 autoRebalanceEnabled 设置为 false,则绑定器会使用 instanceCountinstanceIndex 来确定实例订阅的分区(实例必须至少与分区一样多)。绑定器分配分区,而不是 Kafka。如果你希望特定分区的消息始终发送到同一实例,这可能很有用。当绑定器配置需要它们时,正确设置这两个值非常重要,以确保消耗所有数据,并且应用程序实例接收互斥数据集。

虽然在独立的情况下,使用多个实例进行分区数据处理的场景可能很复杂,但 Spring Cloud Dataflow 可以通过正确填充输入和输出值,并让你依赖运行时基础设施来提供有关实例索引和实例计数的信息,从而极大地简化此过程。

测试

Spring Cloud Stream 提供了在不连接到消息传递系统的情况下测试微服务应用程序的支持。

Spring Integration 测试绑定器

Spring Cloud Stream 带有一个测试绑定器,你可以使用它来测试各种应用程序组件,而不需要实际的真实绑定器实现或消息代理。

此测试绑定器充当单元集成测试之间的桥梁,并基于 Spring Integration 框架作为 JVM 内消息代理,本质上为您提供两全其美的体验 - 真正的绑定器,无需网络。

测试绑定器配置

要启用 Spring Integration 测试绑定器,您只需将其添加为依赖项即可。

添加必需的依赖项

以下是必需的 Maven POM 条目的示例。

<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-stream-test-binder</artifactId>
	<scope>test</scope>
</dependency>

或对于 build.gradle.kts

testImplementation("org.springframework.cloud:spring-cloud-stream-test-binder")

测试绑定器用法

现在,您可以将微服务作为简单的单元测试进行测试

@SpringBootTest
public class SampleStreamTests {

	@Autowired
	private InputDestination input;

	@Autowired
	private OutputDestination output;

	@Test
	public void testEmptyConfiguration() {
		this.input.send(new GenericMessage<byte[]>("hello".getBytes()));
		assertThat(output.receive().getPayload()).isEqualTo("HELLO".getBytes());
	}

	@SpringBootApplication
	@Import(TestChannelBinderConfiguration.class)
	public static class SampleConfiguration {
		@Bean
		public Function<String, String> uppercase() {
			return v -> v.toUpperCase();
		}
	}
}

如果您需要更多控制或想要在同一测试套件中测试多个配置,您还可以执行以下操作

@EnableAutoConfiguration
public static class MyTestConfiguration {
	@Bean
	public Function<String, String> uppercase() {
			return v -> v.toUpperCase();
	}
}

. . .

@Test
public void sampleTest() {
	try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
				TestChannelBinderConfiguration.getCompleteConfiguration(
						MyTestConfiguration.class))
				.run("--spring.cloud.function.definition=uppercase")) {
		InputDestination source = context.getBean(InputDestination.class);
		OutputDestination target = context.getBean(OutputDestination.class);
		source.send(new GenericMessage<byte[]>("hello".getBytes()));
		assertThat(target.receive().getPayload()).isEqualTo("HELLO".getBytes());
	}
}

对于您有多个绑定和/或多个输入和输出的情况,或者只是想明确您发送或接收到的目标的名称,InputDestinationOutputDestinationsend()receive() 方法被覆盖以允许您提供输入和输出目标的名称。

考虑以下示例

@EnableAutoConfiguration
public static class SampleFunctionConfiguration {

	@Bean
	public Function<String, String> uppercase() {
		return value -> value.toUpperCase();
	}

	@Bean
	public Function<String, String> reverse() {
		return value -> new StringBuilder(value).reverse().toString();
	}
}

和实际测试

@Test
public void testMultipleFunctions() {
	try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
			TestChannelBinderConfiguration.getCompleteConfiguration(
					SampleFunctionConfiguration.class))
							.run("--spring.cloud.function.definition=uppercase;reverse")) {

		InputDestination inputDestination = context.getBean(InputDestination.class);
		OutputDestination outputDestination = context.getBean(OutputDestination.class);

		Message<byte[]> inputMessage = MessageBuilder.withPayload("Hello".getBytes()).build();
		inputDestination.send(inputMessage, "uppercase-in-0");
		inputDestination.send(inputMessage, "reverse-in-0");

		Message<byte[]> outputMessage = outputDestination.receive(0, "uppercase-out-0");
		assertThat(outputMessage.getPayload()).isEqualTo("HELLO".getBytes());

		outputMessage = outputDestination.receive(0, "reverse-out-0");
		assertThat(outputMessage.getPayload()).isEqualTo("olleH".getBytes());
	}
}

对于您有其他映射属性(例如 destination)的情况,您应该使用这些名称。例如,考虑前一个测试的不同版本,其中我们将 uppercase 函数的输入和输出显式映射到 myInputmyOutput 绑定名称

@Test
public void testMultipleFunctions() {
	try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
			TestChannelBinderConfiguration.getCompleteConfiguration(
					SampleFunctionConfiguration.class))
							.run(
							"--spring.cloud.function.definition=uppercase;reverse",
							"--spring.cloud.stream.bindings.uppercase-in-0.destination=myInput",
							"--spring.cloud.stream.bindings.uppercase-out-0.destination=myOutput"
							)) {

		InputDestination inputDestination = context.getBean(InputDestination.class);
		OutputDestination outputDestination = context.getBean(OutputDestination.class);

		Message<byte[]> inputMessage = MessageBuilder.withPayload("Hello".getBytes()).build();
		inputDestination.send(inputMessage, "myInput");
		inputDestination.send(inputMessage, "reverse-in-0");

		Message<byte[]> outputMessage = outputDestination.receive(0, "myOutput");
		assertThat(outputMessage.getPayload()).isEqualTo("HELLO".getBytes());

		outputMessage = outputDestination.receive(0, "reverse-out-0");
		assertThat(outputMessage.getPayload()).isEqualTo("olleH".getBytes());
	}
}

测试绑定器和 PollableMessageSource

Spring Integration 测试绑定器还允许您在使用 PollableMessageSource 时编写测试(有关更多详细信息,请参阅 使用轮询使用者)。

但需要理解的重要一点是,轮询不是事件驱动的,并且 PollableMessageSource 是一种策略,它公开操作以生成(轮询)一条消息(单数)。您多久轮询一次,使用多少个线程,或者从哪里轮询(消息队列或文件系统)完全取决于您;换句话说,配置轮询器、线程或消息的实际来源是您的责任。幸运的是,Spring 有很多抽象可以准确地配置这一点。

我们来看一个示例

@Test
public void samplePollingTest() {
	ApplicationContext context = new SpringApplicationBuilder(SamplePolledConfiguration.class)
				.web(WebApplicationType.NONE)
				.run("--spring.jmx.enabled=false", "--spring.cloud.stream.pollable-source=myDestination");
	OutputDestination destination = context.getBean(OutputDestination.class);
	System.out.println("Message 1: " + new String(destination.receive().getPayload()));
	System.out.println("Message 2: " + new String(destination.receive().getPayload()));
	System.out.println("Message 3: " + new String(destination.receive().getPayload()));
}

@Import(TestChannelBinderConfiguration.class)
@EnableAutoConfiguration
public static class SamplePolledConfiguration {
	@Bean
	public ApplicationRunner poller(PollableMessageSource polledMessageSource, StreamBridge output, TaskExecutor taskScheduler) {
		return args -> {
			taskScheduler.execute(() -> {
				for (int i = 0; i < 3; i++) {
					try {
						if (!polledMessageSource.poll(m -> {
							String newPayload = ((String) m.getPayload()).toUpperCase();
							output.send("myOutput", newPayload);
						})) {
							Thread.sleep(2000);
						}
					}
					catch (Exception e) {
						// handle failure
					}
				}
			});
		};
	}
}

上述(非常基本的)示例将在 2 秒间隔内生成 3 条消息,将它们发送到此绑定器发送到 OutputDestinationSource 的输出目标,我们在此处检索它们(用于任何断言)。目前,它打印以下内容

Message 1: POLLED DATA
Message 2: POLLED DATA
Message 3: POLLED DATA

如您所见,数据是相同的。这是因为此绑定器定义了实际 MessageSource 的默认实现 - 使用 poll() 操作轮询消息的源。虽然对于大多数测试场景来说已经足够,但在某些情况下,您可能希望定义自己的 MessageSource。为此,只需在测试配置中配置类型为 MessageSource 的 bean,提供您自己的消息来源实现即可。

以下是示例

@Bean
public MessageSource<?> source() {
	return () -> new GenericMessage<>("My Own Data " + UUID.randomUUID());
}

呈现以下输出;

Message 1: MY OWN DATA 1C180A91-E79F-494F-ABF4-BA3F993710DA
Message 2: MY OWN DATA D8F3A477-5547-41B4-9434-E69DA7616FEE
Message 3: MY OWN DATA 20BF2E64-7FF4-4CB6-A823-4053D30B5C74
不要将此 bean 命名为 messageSource,因为它将与 Spring Boot 出于无关原因提供的同名 bean(不同类型)冲突。

有关混合测试绑定程序和常规中间件绑定程序进行测试的特别说明

基于 Spring Integration 的测试绑定程序用于在不涉及实际基于中间件的绑定程序(例如 Kafka 或 RabbitMQ 绑定程序)的情况下测试应用程序。如上文所述,测试绑定程序通过依赖于内存中的 Spring Integration 通道,帮助您快速验证应用程序行为。当测试绑定程序存在于测试类路径中时,Spring Cloud Stream 将尝试在所有测试目的中使用此绑定程序,无论何时需要绑定程序进行通信。换句话说,您不能在同一模块中混合使用测试绑定程序和常规中间件绑定程序进行测试。在使用测试绑定程序测试应用程序后,如果您想继续使用实际中间件绑定程序进行进一步的集成测试,建议将使用实际绑定程序的那些测试添加到一个单独的模块中,以便这些测试可以与实际中间件建立适当的连接,而不是依赖于测试绑定程序提供的内存中通道。

健康指示器

Spring Cloud Stream 为绑定程序提供了一个健康指示器。它在名称 binders 下注册,并且可以通过设置 management.health.binders.enabled 属性来启用或禁用它。

要启用运行状况检查,您首先需要通过包含其依赖项来启用“web”和“actuator”(请参阅绑定可视化和控制

如果应用程序未显式设置 management.health.binders.enabled,则 management.health.defaults.enabled 将匹配为 true,并且将启用绑定程序运行状况指示器。如果您想完全禁用运行状况指示器,则必须将 management.health.binders.enabled 设置为 false

您可以使用 Spring Boot actuator 运行状况端点来访问运行状况指示器 - /actuator/health。默认情况下,当您点击上述端点时,您只会收到顶级应用程序状态。为了从绑定程序特定运行状况指示器接收完整详细信息,您需要在应用程序中包含属性 management.endpoint.health.show-details,其值为 ALWAYS

运行状况指示器是特定于绑定程序的,某些绑定程序实现不一定提供运行状况指示器。

如果您想完全禁用所有开箱即用的健康指标,而提供您自己的健康指标,您可以通过将属性 management.health.binders.enabled 设置为 false 来实现,然后在您的应用程序中提供您自己的 HealthIndicator bean。在这种情况下,Spring Boot 中的健康指标基础设施仍将获取这些自定义 bean。即使您没有禁用绑定器健康指标,您仍然可以通过在开箱即用的健康检查之外提供您自己的 HealthIndicator bean 来增强健康检查。

当您在同一应用程序中有多个绑定器时,健康指标默认启用,除非应用程序通过将 management.health.binders.enabled 设置为 false 来关闭它们。在这种情况下,如果用户想禁用对绑定器子集的健康检查,那么应该在多绑定器配置的环境中将 management.health.binders.enabled 设置为 false。有关如何提供特定于环境的属性的详细信息,请参阅 连接到多个系统

如果类路径中存在多个绑定器,但并非所有绑定器都在应用程序中使用,这可能会在健康指标的上下文中导致一些问题。关于如何执行健康检查,可能存在特定于实现的详细信息。例如,如果绑定器未注册任何目标,Kafka 绑定器可能会将状态确定为 DOWN

让我们来看一个具体的情况。假设类路径中同时存在 Kafka 和 Kafka Streams 绑定器,但只在应用程序代码中使用 Kafka Streams 绑定器,即只使用 Kafka Streams 绑定器提供绑定。由于 Kafka 绑定器未被使用,并且它有特定的检查来查看是否注册了任何目标,因此绑定器健康检查将失败。顶级应用程序健康检查状态将报告为 DOWN。在这种情况下,由于您没有使用 Kafka 绑定器,您可以简单地从您的应用程序中移除 Kafka 绑定器的依赖项。

示例

有关 Spring Cloud Stream 示例,请参阅 GitHub 上的 spring-cloud-stream-samples 存储库。

在 CloudFoundry 上部署流应用程序

在 CloudFoundry 上,服务通常通过一个名为 VCAP_SERVICES 的特殊环境变量公开。

在配置您的绑定器连接时,您可以使用环境变量中的值,如 数据流 Cloud Foundry 服务器 文档中所述。

Binder 实现

以下为可用 binder 实现的列表

如前所述,Binder 抽象也是框架的扩展点之一。因此,如果您在前面的列表中找不到合适的 Binder,您可以在 Spring Cloud Stream 之上实现自己的 Binder。在 如何从头开始创建 Spring Cloud Stream Binder 帖子中,社区成员详细记录了一组实现自定义 Binder 所需的步骤,并附有示例。这些步骤也在 实现自定义 Binder 部分中突出显示。