Spring 的数据集成之旅简史

Spring 的数据集成之旅始于 Spring Integration。凭借其编程模型,它提供了一致的开发体验,用于构建能够采纳 Enterprise Integration Patterns 以连接数据库、消息代理等外部系统的应用。

快进到云时代,微服务已在企业环境中变得举足轻重。Spring Boot 改变了开发者构建应用的方式。借助 Spring 的编程模型和 Spring Boot 处理的运行时职责,开发独立的、生产级 Spring 微服务变得无缝便捷。

为了将此扩展到数据集成工作负载,Spring Integration 和 Spring Boot 被整合到了一个新项目中。Spring Cloud Stream 应运而生。

使用 Spring Cloud Stream,开发者可以:

  • 独立构建、测试和部署以数据为中心的应用。

  • 应用现代微服务架构模式,包括通过消息进行组合。

  • 以事件为中心进行思考,解耦应用职责。事件可以代表时间中发生的某个事情,下游消费者应用可以对此作出反应,而无需知道事件的来源或生产者的身份。

  • 将业务逻辑移植到消息代理上(如 RabbitMQ、Apache Kafka、Amazon Kinesis)。

  • 对于常见用例,可以依赖框架的自动内容类型支持。也可以扩展支持不同的数据转换类型。

  • 以及更多功能...

快速入门

在深入了解任何细节之前,您可以通过遵循这个三步指南,在不到 5 分钟内尝试 Spring Cloud Stream。

我们将向您展示如何创建一个 Spring Cloud Stream 应用,该应用接收来自您选择的消息中间件(稍后会详细介绍)的消息,并将接收到的消息记录到控制台。我们将其命名为 `LoggingConsumer`。虽然不是很实用,但它很好地介绍了 Spring Cloud Stream 的一些主要概念和抽象,使您更容易理解本用户指南的其余部分。

这三个步骤如下:

使用 Spring Initializr 创建示例应用

开始之前,请访问 Spring Initializr。在那里,您可以生成我们的 `LoggingConsumer` 应用。具体操作如下:

  1. Dependencies 部分,开始输入 `stream`。当出现“Cloud Stream”选项时,选中它。

  2. 开始输入 'kafka' 或 'rabbit'。

  3. 选择“Kafka”或“RabbitMQ”。

    基本上,您选择的是您的应用将绑定的消息中间件。我们建议使用您已安装或更熟悉安装和运行的中间件。此外,正如您在 Initializr 屏幕上看到的,还有一些其他选项可供选择。例如,您可以选择 Gradle 作为构建工具而不是 Maven(默认)。

  4. Artifact 字段中,输入 'logging-consumer'。

    Artifact 字段的值将成为应用名称。如果您为中间件选择了 RabbitMQ,您的 Spring Initializr 现在应该如下所示:

spring initializr
  1. 点击 Generate Project 按钮。

    这样做会将生成的项目的 zip 版本下载到您的硬盘驱动器上。

  2. 将文件解压到您希望用作项目目录的文件夹中。

我们鼓励您探索 Spring Initializr 中提供的许多可能性。它可以让您创建各种不同的 Spring 应用。

将项目导入 IDE

现在您可以将项目导入到您的 IDE 中。请记住,根据不同的 IDE,您可能需要遵循特定的导入步骤。例如,根据项目的生成方式(Maven 或 Gradle),您可能需要遵循特定的导入步骤(例如,在 Eclipse 或 STS 中,您需要使用 File → Import → Maven → Existing Maven Project)。

导入后,项目不应该有任何错误。此外,`src/main/java` 目录应该包含 `com.example.loggingconsumer.LoggingConsumerApplication` 文件。

理论上,此时您可以运行应用的主类。它已经是一个有效的 Spring Boot 应用。但是,它不会执行任何操作,所以我们需要添加一些代码。

添加消息处理器,构建和运行

修改 `com.example.loggingconsumer.LoggingConsumerApplication` 类,使其如下所示:

@SpringBootApplication
public class LoggingConsumerApplication {

	public static void main(String[] args) {
		SpringApplication.run(LoggingConsumerApplication.class, args);
	}

	@Bean
	public Consumer<Person> log() {
	    return person -> {
	        System.out.println("Received: " + person);
	    };
	}

	public static class Person {
		private String name;
		public String getName() {
			return name;
		}
		public void setName(String name) {
			this.name = name;
		}
		public String toString() {
			return this.name;
		}
	}
}

从上面的列表可以看出:

  • 我们使用函数式编程模型(参见 [Spring Cloud Function support])将一个消息处理器定义为一个 `Consumer`。

  • 我们依赖框架约定,将此类处理器绑定到由 binder 公开的输入目标绑定。

这样做还可以让您看到框架的一个核心功能:它会尝试自动将传入消息的 payload 转换为 `Person` 类型。

现在您拥有一个功能齐全的 Spring Cloud Stream 应用,它可以监听消息。在此基础上,为了简单起见,我们假设您在 第一个步骤 中选择了 RabbitMQ。如果您已安装并运行 RabbitMQ,您可以在 IDE 中运行其 `main` 方法来启动应用。

您应该会看到以下输出:

	--- [ main] c.s.b.r.p.RabbitExchangeQueueProvisioner : declaring queue for inbound: input.anonymous.CbMIwdkJSBO1ZoPDOtHtCg, bound to: input
	--- [ main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
	--- [ main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#2a3a299:0/SimpleConnection@66c83fc8. . .
	. . .
	--- [ main] o.s.i.a.i.AmqpInboundChannelAdapter      : started inbound.input.anonymous.CbMIwdkJSBO1ZoPDOtHtCg
	. . .
	--- [ main] c.e.l.LoggingConsumerApplication         : Started LoggingConsumerApplication in 2.531 seconds (JVM running for 2.897)

转到 RabbitMQ 管理控制台或任何其他 RabbitMQ 客户端,向 `input.anonymous.CbMIwdkJSBO1ZoPDOtHtCg` 发送一条消息。`anonymous.CbMIwdkJSBO1ZoPDOtHtCg` 部分表示组名,它是自动生成的,因此在您的环境中会有所不同。为了获得更可预测的名称,您可以通过设置 `spring.cloud.stream.bindings.input.group=hello`(或您喜欢的任何名称)来使用一个显式的组名。

消息的内容应该是 `Person` 类的 JSON 表示形式,如下所示:

{"name":"Sam Spade"}

然后,在您的控制台中,您应该会看到:

收到:Sam Spade

您还可以将应用构建并打包成一个 boot jar(使用 `./mvnw clean install` 命令),然后使用 `java -jar` 命令运行构建好的 JAR 文件。

现在您拥有了一个可工作的(尽管非常基础的)Spring Cloud Stream 应用。

流式数据上下文中的 Spring Expression Language (SpEL)

在本参考手册中,您会遇到许多可以使用 Spring Expression Language (SpEL) 的特性和示例。理解使用它时的一些限制非常重要。

SpEL 让您可以访问当前的消息 (Message) 以及您正在运行的应用上下文 (Application Context)。然而,理解 SpEL 能够看到的数据类型非常重要,尤其是在传入消息的上下文中。从消息代理接收的消息是以 `byte[]` 数组的形式到达的。然后,binder 将其转换为 `Message`,您可以看到消息的 payload 保持其原始形式。消息的 headers 是 `` 类型,其中值通常是另一个原始类型或原始类型的集合/数组,因此是 Object。这是因为 binder 不知道所需的输入类型,因为它无法访问用户代码(函数)。所以,实际上 binder 传递了一个带有 payload 和一些可读元数据(以消息 headers 的形式)的信封,就像邮件投递的信件一样。这意味着,虽然可以访问消息的 payload,但您只能以原始数据(即 `byte[]`)的形式访问它。开发者可能经常希望 SpEL 能以具体类型(例如 Foo, Bar 等)访问 payload 对象的字段,但您可以看到这有多么困难甚至不可能实现。这里有一个例子来演示这个问题;假设您有一个路由表达式,根据 payload 类型路由到不同的函数。这个要求意味着需要将 payload 从 `byte[]` 转换为特定类型,然后再应用 SpEL。然而,为了执行这种转换,我们需要知道要传递给转换器 (converter) 的实际类型,而这个类型来自函数的签名 (function’s signature),我们不知道是哪个签名。解决这个问题的更好方法是将类型信息作为消息 header 传递(例如,`application/json;type=foo.bar.Baz`)。您将获得一个清晰可读的 String 值,该值可以在易于读取的 SpEL 表达式中进行访问和评估。

此外,使用 payload 进行路由决策被认为是非常糟糕的做法,因为 payload 被认为是特权数据——只有最终接收者才能读取的数据。再次以邮件投递类比,您不会希望邮递员打开您的信封并阅读信件内容来决定如何投递。同样的道理也适用于此处,特别是当生成消息时包含此类信息相对容易时。这有助于强制执行与网络传输数据设计相关的某些纪律,明确哪些数据部分可以被视为公共的,哪些是特权的。