使用应用事件

为了让应用模块之间尽可能地解耦,它们主要的交互方式应该是事件的发布和消费。这避免了发起模块了解所有可能感兴趣的各方,这是实现应用模块集成测试(参见 应用模块集成测试)的关键方面。

我们通常会发现应用组件的定义如下

  • Java

  • Kotlin

@Service
@RequiredArgsConstructor
public class OrderManagement {

  private final InventoryManagement inventory;

  @Transactional
  public void complete(Order order) {

    // State transition on the order aggregate go here

    // Invoke related functionality
    inventory.updateStockFor(order);
  }
}
@Service
class OrderManagement(val inventory: InventoryManagement) {

  @Transactional
  fun complete(order: Order) {
    inventory.updateStockFor(order)
  }
}

complete(…) 方法会在某种程度上产生功能引力,它会吸引相关的功能,从而与在其他应用模块中定义的 Spring bean 进行交互。这尤其使得该组件更难测试,因为仅仅创建 OrderManagement 的实例就需要那些依赖的 bean 的可用实例(参见 处理外向依赖)。这也意味着无论何时我们希望与业务事件的订单完成进一步集成功能,都必须修改该类。

我们可以按如下方式改变应用模块的交互

通过 Spring 的 ApplicationEventPublisher 发布应用事件
  • Java

  • Kotlin

@Service
@RequiredArgsConstructor
public class OrderManagement {

  private final ApplicationEventPublisher events;
  private final OrderInternal dependency;

  @Transactional
  public void complete(Order order) {

    // State transition on the order aggregate go here

    events.publishEvent(new OrderCompleted(order.getId()));
  }
}
@Service
class OrderManagement(val events: ApplicationEventPublisher, val dependency: OrderInternal) {

  @Transactional
  fun complete(order: Order) {
    events.publishEvent(OrderCompleted(order.id))
  }
}

注意,我们没有依赖其他应用模块的 Spring bean,而是使用 Spring 的 ApplicationEventPublisher 在主聚合上完成状态转换后发布一个领域事件。有关更面向聚合的事件发布方法,请参阅 Spring Data 的应用事件发布机制了解详情。由于事件发布默认是同步发生的,因此整体安排的事务语义与上述示例相同。这既有好处,因为我们获得了一个非常简单的最终一致性模型(订单的状态更改和库存更新要么都成功,要么都不成功),但也有坏处,因为更多触发的相关功能将扩大事务边界,并可能导致整个事务失败,即使导致错误的功能并不关键。

另一种方法是将事件消费转移到事务提交时的异步处理,并将次要功能正是如此对待

一个异步的事务性事件监听器
  • Java

  • Kotlin

@Component
class InventoryManagement {

  @Async
  @TransactionalEventListener
  void on(OrderCompleted event) { /* … */ }
}
@Component
class InventoryManagement {

  @Async
  @TransactionalEventListener
  fun on(event: OrderCompleted) { /* … */ }
}

这有效地将原始事务与监听器的执行解耦。虽然这避免了原始业务事务的扩展,但也带来了一个风险:如果监听器因任何原因失败,事件发布将丢失,除非每个监听器都实现了自己的安全网。更糟糕的是,这甚至不能完全奏效,因为系统可能在方法被调用之前就失败了。

应用模块监听器

要在事务中运行事务性事件监听器,它需要相应地用 @Transactional 进行注解。

一个运行在自身事务中的异步事务性事件监听器
  • Java

  • Kotlin

@Component
class InventoryManagement {

  @Async
  @Transactional(propagation = Propagation.REQUIRES_NEW)
  @TransactionalEventListener
  void on(OrderCompleted event) { /* … */ }
}
@Component
class InventoryManagement {

  @Async
  @Transactional(propagation = Propagation.REQUIRES_NEW)
  @TransactionalEventListener
  fun on(event: OrderCompleted) { /* … */ }
}

为了简化通过事件集成模块的默认方式的声明,Spring Modulith 提供了 @ApplicationModuleListener 作为快捷方式。

一个应用模块监听器
  • Java

  • Kotlin

@Component
class InventoryManagement {

  @ApplicationModuleListener
  void on(OrderCompleted event) { /* … */ }
}
@Component
class InventoryManagement {

  @ApplicationModuleListener
  fun on(event: OrderCompleted) { /* … */ }
}

事件发布注册表

Spring Modulith 附带了一个事件发布注册表,它与 Spring Framework 的核心事件发布机制挂钩。在事件发布时,它会发现哪些事务性事件监听器将接收到事件,并在原始业务事务中为每个监听器(深蓝色)写入一个条目到事件发布日志中。

event publication registry start
图 1. 执行前的事务性事件监听器安排

每个事务性事件监听器都被包装在一个切面中,如果监听器执行成功,该切面将该日志条目标记为已完成。如果监听器失败,日志条目保持不变,以便可以根据应用的需要部署重试机制。可以通过 spring.modulith.events.republish-outstanding-events-on-restart 属性启用事件的自动重新发布。

event publication registry end
图 2. 执行后的事务性事件监听器安排

Spring Boot 事件注册表 Starter

使用事务性事件发布日志需要向您的应用添加一组构件。为了简化这项任务,Spring Modulith 提供了围绕要使用的持久化技术而设计的 starter POM,并默认使用基于 Jackson 的 EventSerializer 实现。以下 starter 可用

持久化技术 构件 描述

JPA

spring-modulith-starter-jpa

使用 JPA 作为持久化技术。

JDBC

spring-modulith-starter-jdbc

使用 JDBC 作为持久化技术。也可用于基于 JPA 的应用,但绕过您的 JPA 提供程序进行实际的事件持久化。

MongoDB

spring-modulith-starter-mongodb

使用 MongoDB 作为持久化技术。也启用 MongoDB 事务,并需要服务器配置为副本集才能进行交互。通过将 spring.modulith.events.mongodb.transaction-management.enabled 属性设置为 false 可以禁用事务自动配置。

Neo4j

spring-modulith-starter-neo4j

在 Spring Data Neo4j 后面使用 Neo4j。

管理事件发布

事件发布可能需要在应用运行时以多种方式进行管理。未完成的发布可能需要在一定时间后重新提交给相应的监听器。另一方面,已完成的发布很可能需要从数据库中清除或移动到归档存储中。由于这类清理工作的需求因应用而异,Spring Modulith 提供了一个 API 来处理这两种类型的发布。该 API 通过您可以添加到应用的 spring-modulith-events-api 构件提供

使用 Spring Modulith Events API 构件
  • Maven

  • Gradle

<dependency>
  <groupId>org.springframework.modulith</groupId>
  <artifactId>spring-modulith-events-api</artifactId>
  <version>1.3.5</version>
</dependency>
dependencies {
  implementation 'org.springframework.modulith:spring-modulith-events-api:1.3.5'
}

此构件包含两个主要抽象,它们作为 Spring Bean 可供应用代码使用

  • CompletedEventPublications — 该接口允许访问所有已完成的事件发布,并提供一个 API,用于立即将它们全部从数据库中清除,或清除早于指定时长(例如,1 分钟)的已完成发布。

  • IncompleteEventPublications — 该接口允许访问所有未完成的事件发布,以便重新提交符合指定谓词或早于相对于原始发布日期的指定 Duration 的发布。

事件发布完成

当事务性或 @ApplicationModuleListener 执行成功完成后,事件发布会被标记为已完成。默认情况下,完成通过在 EventPublication 上设置完成日期来注册。这意味着已完成的发布将保留在事件发布注册表中,以便可以通过上面描述的 CompletedEventPublications 接口进行检查。这样做的一个后果是,您需要编写一些代码来定期清除旧的、已完成的 EventPublication。否则,它们的持久化抽象(例如关系数据库表)将无限增长,并且与存储进行交互创建和完成新的 EventPublication 可能会变慢。

Spring Modulith 1.3 引入了一个配置属性 spring.modulith.events.completion-mode 来支持另外两种完成模式。它默认为 UPDATE,由上面描述的策略支持。或者,可以将完成模式设置为 DELETE,这将更改注册表的持久化机制,改为在完成时删除 EventPublication。这意味着 CompletedEventPublications 将不再返回任何发布,但同时,您也不必再手动从持久化存储中清除已完成的事件。

第三种选项是 ARCHIVE 模式,它将条目复制到归档表、集合或节点中。对于该归档条目,将设置完成日期并移除原始条目。与 DELETE 模式相反,已完成的事件发布仍然可以通过 CompletedEventPublications 抽象进行访问。

事件发布仓库

为了实际写入事件发布日志,Spring Modulith 暴露了一个 EventPublicationRepository SPI 以及支持事务的流行持久化技术的实现,如 JPA、JDBC 和 MongoDB。通过将相应的 JAR 添加到您的 Spring Modulith 应用来选择要使用的持久化技术。我们已准备好专用的 starter 来简化该任务。

基于 JDBC 的实现可以在设置相应的配置属性 (spring.modulith.events.jdbc.schema-initialization.enabled) 为 true 时,为事件发布日志创建一个专用表。详情请查阅附录中的 Schema 概览

事件序列化器

每个日志条目都包含序列化形式的原始事件。包含在 spring-modulith-events-core 中的 EventSerializer 抽象允许插入不同的策略,用于将事件实例转换为适合数据存储的格式。Spring Modulith 通过 spring-modulith-events-jackson 构件提供了一个基于 Jackson 的 JSON 实现,该构件默认通过标准 Spring Boot 自动配置注册一个消费 ObjectMapperJacksonEventSerializer

自定义事件发布日期

默认情况下,事件发布注册表将使用 Clock.systemUTC() 返回的日期作为事件发布日期。如果您想自定义,请在应用上下文注册一个 Clock 类型的 bean

@Configuration
class MyConfiguration {

  @Bean
  Clock myCustomClock() {
    return … // Your custom Clock instance created here.
  }
}

外部化事件

应用模块之间交换的一些事件可能对外部系统感兴趣。Spring Modulith 允许将选定的事件发布到各种消息代理。要使用该支持,您需要执行以下步骤

  1. 将特定于代理的 Spring Modulith 构件添加到您的项目中。

  2. 通过使用 Spring Modulith 或 jMolecules 的 @Externalized 注解来标记要外部化的事件类型。

  3. 在注解的值中指定特定于代理的路由目标。

要了解如何使用其他方式选择要外部化的事件,或自定义它们在代理内的路由,请查阅 事件外部化基础知识

支持的基础设施

代理 构件 描述

Kafka

spring-modulith-events-kafka

使用 Spring Kafka 与代理交互。逻辑路由键将用作 Kafka 的 Topic 和 Message Key。

AMQP

spring-modulith-events-amqp

使用 Spring AMQP 与任何兼容代理交互。例如,需要显式声明对 Spring Rabbit 的依赖。逻辑路由键将用作 AMQP 路由键。

JMS

spring-modulith-events-jms

使用 Spring 的核心 JMS 支持。不支持路由键。

SQS

spring-modulith-events-aws-sqs

已弃用(详情参见 此处)。使用 Spring Cloud AWS SQS 支持。逻辑路由键将用作 SQS 消息组 ID。当设置路由键时,要求 SQS 队列配置为 FIFO 队列。

SNS

spring-modulith-events-aws-sns

已弃用(详情参见 此处)。使用 Spring Cloud AWS SNS 支持。逻辑路由键将用作 SNS 消息组 ID。当设置路由键时,要求 SNS 配置为启用基于内容的去重功能的 FIFO Topic。

Spring Messaging

spring-modulith-events-messaging

使用 Spring 的核心 MessageMessageChannel 支持。根据 Externalized 注解中的目标值,通过 bean 名称解析目标 MessageChannel。将路由信息作为名为 springModulith_routingTarget 的 Header 转发,以便下游组件以任何方式处理,通常是在 Spring Integration 的 IntegrationFlow 中。

事件外部化基础知识

事件外部化对发布的每个应用事件执行三个步骤。

  1. 确定事件是否应该被外部化——我们称之为“事件选择”。默认情况下,只有位于 Spring Boot 自动配置包内并用支持的 @Externalized 注解之一标记的事件类型才会被选择进行外部化。

  2. 准备消息(可选)——默认情况下,事件由相应的代理基础设施按原样序列化。可选的映射步骤允许开发者自定义或甚至完全替换原始事件,使用适合外部方的 Payload。对于 Kafka 和 AMQP,开发者还可以向要发布的消息添加 Header。

  3. 确定路由目标——消息代理客户端需要一个逻辑目标来发布消息。目标通常标识物理基础设施(Topic、Exchange 或 Queue,取决于代理),并且通常静态地从事件类型派生。除非在 @Externalized 注解中明确定义,Spring Modulith 使用应用本地类型名称作为目标。换句话说,在一个基础包为 com.acme.app 的 Spring Boot 应用中,事件类型 com.acme.app.sample.SampleEvent 将发布到 sample.SampleEvent

    一些代理还允许定义一个更具动态性的路由键,它用于实际目标内的不同目的。默认情况下,不使用路由键。

基于注解的事件外部化配置

要通过 @Externalized 注解定义自定义路由键,可以在各个注解中的 target/value 属性使用 $target::$key 模式。target 和 key 都可以是一个 SpEL 表达式,事件实例将被配置为根对象。

通过 SpEL 表达式定义动态路由键
  • Java

  • Kotlin

@Externalized("customer-created::#{#this.getLastname()}") (2)
class CustomerCreated {

  String getLastname() { (1)
    // …
  }
}
@Externalized("customer-created::#{#this.getLastname()}") (2)
class CustomerCreated {
  fun getLastname(): String { (1)
    // …
  }
}

CustomerCreated 事件通过 accessor 方法暴露客户的姓氏。然后该方法通过目标声明的 :: 分隔符后的 key 表达式 #this.getLastname() 使用。

如果键的计算变得更复杂,建议将其委托给一个接受事件作为参数的 Spring bean。

调用 Spring bean 计算路由键
  • Java

  • Kotlin

@Externalized("…::#{@beanName.someMethod(#this)}")
@Externalized("…::#{@beanName.someMethod(#this)}")

编程式事件外部化配置

spring-modulith-events-api 构件包含 EventExternalizationConfiguration,允许开发者自定义上述所有步骤。

编程式配置事件外部化
  • Java

  • Kotlin

@Configuration
class ExternalizationConfiguration {

  @Bean
  EventExternalizationConfiguration eventExternalizationConfiguration() {

    return EventExternalizationConfiguration.externalizing()                 (1)
      .select(EventExternalizationConfiguration.annotatedAsExternalized())   (2)
      .mapping(SomeEvent.class, event -> …)                                  (3)
      .headers(event -> …)                                                   (4)
      .routeKey(WithKeyProperty.class, WithKeyProperty::getKey)              (5)
      .build();
  }
}
@Configuration
class ExternalizationConfiguration {

  @Bean
  fun eventExternalizationConfiguration(): EventExternalizationConfiguration {

    EventExternalizationConfiguration.externalizing()                         (1)
      .select(EventExternalizationConfiguration.annotatedAsExternalized())    (2)
      .mapping(SomeEvent::class.java) { event -> … }                          (3)
      .headers() { event -> … }                                               (4)
      .routeKey(WithKeyProperty::class.java, WithKeyProperty::getKey)         (5)
      .build()
  }
}
1 我们首先创建一个默认的 EventExternalizationConfiguration 实例。
2 我们通过调用上一步返回的 Selector 实例上的一个或多个 select(…) 方法来定制事件选择。这一步基本上禁用了应用基础包过滤器,因为我们现在只查找注解。存在按类型、按包、按包和注解轻松选择事件的便捷方法。此外,还有一步定义选择和路由的快捷方式。
3 我们为 SomeEvent 实例定义一个映射步骤。请注意,路由仍将由原始事件实例确定,除非您额外调用 router 上的 ….routeMapped() 方法。
4 我们为要发送的消息添加自定义 Header,可以像所示的那样通用添加,也可以针对特定 Payload 类型进行添加。
5 我们最终通过定义一个方法句柄来确定路由键,以提取事件实例的一个值。另外,可以通过使用上一步调用返回的 Router 实例上的通用 route(…) 方法为单个事件生成完整的 RoutingKey

测试已发布的事件

以下部分描述了一种仅关注跟踪 Spring 应用事件的测试方法。对于使用 @ApplicationModuleListener 的模块测试的更全面的方法,请查看 Scenario API

Spring Modulith 的 @ApplicationModuleTest 使能够将 PublishedEvents 实例注入到测试方法中,以验证在被测试的业务操作过程中是否发布了特定的一组事件。

基于事件的应用模块安排集成测试
  • Java

  • Kotlin

@ApplicationModuleTest
class OrderIntegrationTests {

  @Test
  void someTestMethod(PublishedEvents events) {

    // …
    var matchingMapped = events.ofType(OrderCompleted.class)
      .matching(OrderCompleted::getOrderId, reference.getId());

    assertThat(matchingMapped).hasSize(1);
  }
}
@ApplicationModuleTest
class OrderIntegrationTests {

  @Test
  fun someTestMethod(events: PublishedEvents events) {

    // …
    val matchingMapped = events.ofType(OrderCompleted::class.java)
      .matching(OrderCompleted::getOrderId, reference.getId())

    assertThat(matchingMapped).hasSize(1)
  }
}

注意 PublishedEvents 如何公开一个 API 来选择匹配特定标准的事件。验证是通过 AssertJ 断言来完成的,该断言验证预期的元素数量。如果您本来就使用 AssertJ 进行这些断言,您也可以使用 AssertablePublishedEvents 作为测试方法参数类型,并使用通过它提供的流式断言 API。

使用 AssertablePublishedEvents 验证事件发布
  • Java

  • Kotlin

@ApplicationModuleTest
class OrderIntegrationTests {

  @Test
  void someTestMethod(AssertablePublishedEvents events) {

    // …
    assertThat(events)
      .contains(OrderCompleted.class)
      .matching(OrderCompleted::getOrderId, reference.getId());
  }
}
@ApplicationModuleTest
class OrderIntegrationTests {

  @Test
  fun someTestMethod(events: AssertablePublishedEvents) {

    // …
    assertThat(events)
      .contains(OrderCompleted::class.java)
      .matching(OrderCompleted::getOrderId, reference.getId())
  }
}

注意 assertThat(…) 表达式返回的类型如何允许直接对已发布的事件定义约束。