使用应用事件

为了尽可能地保持应用模块之间的解耦,它们之间主要的交互方式应该是事件发布和消费。这避免了源模块需要了解所有潜在的感兴趣方,这是启用应用模块集成测试的关键方面(参见 集成测试应用模块)。

通常我们会发现应用组件是这样定义的

  • Java

  • Kotlin

@Service
@RequiredArgsConstructor
public class OrderManagement {

  private final InventoryManagement inventory;

  @Transactional
  public void complete(Order order) {

    // State transition on the order aggregate go here

    // Invoke related functionality
    inventory.updateStockFor(order);
  }
}
@Service
class OrderManagement(val inventory: InventoryManagement) {

  @Transactional
  fun complete(order: Order) {
    inventory.updateStockFor(order)
  }
}

complete(…) 方法在某种意义上创建了功能上的“重力”,它吸引了相关的功能,从而与其他应用模块中定义的 Spring Bean 进行交互。这尤其使得组件更难测试,因为我们需要有那些被依赖的 Bean 的实例可用,才能创建一个 OrderManagement 的实例(参见 处理传出依赖)。这也意味着,每当我们想要将更多功能与业务事件订单完成集成时,都必须修改该类。

我们可以按如下方式更改应用模块交互

通过 Spring 的 ApplicationEventPublisher 发布应用事件
  • Java

  • Kotlin

@Service
@RequiredArgsConstructor
public class OrderManagement {

  private final ApplicationEventPublisher events;
  private final OrderInternal dependency;

  @Transactional
  public void complete(Order order) {

    // State transition on the order aggregate go here

    events.publishEvent(new OrderCompleted(order.getId()));
  }
}
@Service
class OrderManagement(val events: ApplicationEventPublisher, val dependency: OrderInternal) {

  @Transactional
  fun complete(order: Order) {
    events.publishEvent(OrderCompleted(order.id))
  }
}

请注意,我们没有依赖其他应用模块的 Spring Bean,而是在完成主聚合的状态转换后,使用 Spring 的 ApplicationEventPublisher 发布域事件。有关更侧重于聚合的事件发布方法,请参见 Spring Data 的应用事件发布机制 以获取详细信息。由于事件发布默认情况下是同步发生的,因此整体安排的事务语义与上面的示例相同。既有好处,因为我们得到了一个非常简单的一致性模型(订单的状态更改和库存更新要么都成功,要么都不成功),但也存在坏处,因为更多触发的相关功能会扩大事务边界,并可能导致整个事务失败,即使导致错误的功能并不关键。

另一种方法是在事务提交时将事件消费移动到异步处理,并将辅助功能视为就是这样。

一个异步的事务事件监听器
  • Java

  • Kotlin

@Component
class InventoryManagement {

  @Async
  @TransactionalEventListener
  void on(OrderCompleted event) { /* … */ }
}
@Component
class InventoryManagement {

  @Async
  @TransactionalEventListener
  fun on(event: OrderCompleted) { /* … */ }
}

现在这有效地将原始事务与监听器的执行解耦了。虽然这避免了原始业务事务的扩展,但也带来了一种风险:如果监听器因任何原因失败,则事件发布将丢失,除非每个监听器都实现了自身的保护措施。更糟糕的是,这甚至不能完全奏效,因为系统可能在方法被调用之前就失败了。

应用模块监听器

要在事务本身中运行事务事件监听器,则需要反过来使用 @Transactional 注解。

在事务本身中运行的异步事务事件监听器
  • Java

  • Kotlin

@Component
class InventoryManagement {

  @Async
  @Transactional(propagation = Propagation.REQUIRES_NEW)
  @TransactionalEventListener
  void on(OrderCompleted event) { /* … */ }
}
@Component
class InventoryManagement {

  @Async
  @Transactional(propagation = Propagation.REQUIRES_NEW)
  @TransactionalEventListener
  fun on(event: OrderCompleted) { /* … */ }
}

为了简化描述通过事件集成模块的默认方式,Spring Modulith 提供了 @ApplicationModuleListener 作为快捷方式。

应用模块监听器
  • Java

  • Kotlin

@Component
class InventoryManagement {

  @ApplicationModuleListener
  void on(OrderCompleted event) { /* … */ }
}
@Component
class InventoryManagement {

  @ApplicationModuleListener
  fun on(event: OrderCompleted) { /* … */ }
}

事件发布注册表

Spring Modulith 带有一个事件发布注册表,该注册表挂接到 Spring Framework 的核心事件发布机制。在事件发布时,它会找出将接收事件的事务事件监听器,并在原始业务事务的一部分中将每个监听器的条目(深蓝色)写入事件发布日志。

event publication registry start
图 1. 执行前的事务事件监听器安排

每个事务事件监听器都包装在一个方面中,如果监听器的执行成功,则该方面会将日志条目标记为已完成。如果监听器失败,则日志条目保持不变,以便根据应用程序的需求部署重试机制。可以通过 spring.modulith.republish-outstanding-events-on-restart 属性启用事件的自动重新发布。

event publication registry end
图 2. 执行后的事务事件监听器安排

Spring Boot 事件注册表启动器

使用事务事件发布日志需要将工件组合添加到您的应用程序中。为了简化此任务,Spring Modulith 提供了以 持久化技术 为中心的启动器 POM,并默认为基于 Jackson 的 EventSerializer 实现。以下启动器可用

持久化技术 工件 描述

JPA

spring-modulith-starter-jpa

使用 JPA 作为持久化技术。

JDBC

spring-modulith-starter-jdbc

使用 JDBC 作为持久化技术。也可用于基于 JPA 的应用程序,但会绕过您的 JPA 提供程序进行实际的事件持久化。

MongoDB

spring-modulith-starter-mongodb

使用 MongoDB 作为持久化技术。还启用了 MongoDB 事务,并需要服务器的副本集设置才能进行交互。可以通过将 spring.modulith.events.mongobd.transaction-management.enabled 属性设置为 false 来禁用事务自动配置。

Neo4j

spring-modulith-starter-neo4j

在 Spring Data Neo4j 后面使用 Neo4j。

管理事件发布

在应用程序的运行时,可能需要以各种方式管理事件发布。不完整的发布可能必须在给定的时间后重新提交给相应的监听器。另一方面,已完成的发布可能必须从数据库中清除或移动到归档存储中。由于此类维护的需求在不同的应用程序之间差异很大,因此 Spring Modulith 提供了一个 API 来处理这两种类型的发布。该 API 可通过 spring-modulith-events-api 工件获得,您可以将其添加到您的应用程序中

使用 Spring Modulith 事件 API 工件
  • Maven

  • Gradle

<dependency>
  <groupId>org.springframework.modulith</groupId>
  <artifactId>spring-modulith-events-api</artifactId>
  <version>1.2.5</version>
</dependency>
dependencies {
  implementation 'org.springframework.modulith:spring-modulith-events-api:1.2.5'
}

此工件包含两个主要抽象,它们作为 Spring Bean 可供应用程序代码使用

  • CompletedEventPublications——此接口允许访问所有已完成的事件发布,并提供一个 API 以立即从数据库中清除所有已完成的事件发布,或清除比给定持续时间(例如,1 分钟)更旧的已完成事件发布。

  • IncompleteEventPublications——此接口允许访问所有未完成的事件发布,以便重新提交与给定谓词匹配的事件发布或比原始发布日期早于给定 Duration 的事件发布。

事件发布存储库

为了实际写入事件发布日志,Spring Modulith 公开了 EventPublicationRepository SPI 和支持事务的流行持久化技术的实现,如 JPA、JDBC 和 MongoDB。您可以通过将相应的 JAR 添加到 Spring Modulith 应用程序中来选择要使用的持久化技术。我们已经准备了专门的 启动器 来简化此任务。

当相应的配置属性(spring.modulith.events.jdbc.schema-initialization.enabled)设置为 true 时,基于 JDBC 的实现可以为事件发布日志创建一个专用的表。有关详细信息,请参阅附录中的 架构概述

事件序列化器

每个日志条目都包含以序列化形式表示的原始事件。EventSerializer抽象包含在spring-modulith-events-core中,允许插入不同的策略,用于将事件实例转换为适合数据存储的格式。Spring Modulith 通过spring-modulith-events-jackson工件提供了一个基于 Jackson 的 JSON 实现,该工件默认情况下通过标准 Spring Boot 自动配置注册一个使用ObjectMapperJacksonEventSerializer

自定义事件发布日期

默认情况下,事件发布注册表将使用Clock.systemUTC()返回的日期作为事件发布日期。如果要自定义此日期,请在应用程序上下文中注册一个时钟类型的 Bean。

@Configuration
class MyConfiguration {

  @Bean
  Clock myCustomClock() {
    return … // Your custom Clock instance created here.
  }
}

事件外部化

应用程序模块之间交换的一些事件可能对外部系统很有趣。Spring Modulith 允许将选定的事件发布到各种消息代理。要使用此支持,您需要执行以下步骤

  1. 特定于代理的 Spring Modulith 工件添加到您的项目中。

  2. 通过使用 Spring Modulith 或 jMolecules 的@Externalized注释来选择要外部化的事件类型。

  3. 在注释的值中指定特定于代理的路由目标。

要了解如何使用其他选择要外部化的事件的方法或自定义它们在代理中的路由,请查看事件外部化的基础知识

支持的基础设施

代理 工件 描述

Kafka

spring-modulith-events-kafka

使用 Spring Kafka 与代理进行交互。逻辑路由键将用作 Kafka 的主题和消息键。

AMQP

spring-modulith-events-amqp

使用 Spring AMQP 与任何兼容的代理进行交互。例如,需要为 Spring Rabbit 明确声明依赖关系。逻辑路由键将用作 AMQP 路由键。

JMS

spring-modulith-events-jms

使用 Spring 的核心 JMS 支持。不支持路由键。

SQS

spring-modulith-events-aws-sqs

使用 Spring Cloud AWS SQS 支持。逻辑路由键将用作 SQS 消息组 ID。当设置路由键时,需要将 SQS 队列配置为 FIFO 队列。

SNS

spring-modulith-events-aws-sns

使用 Spring Cloud AWS SNS 支持。逻辑路由键将用作 SNS 消息组 ID。当设置路由键时,需要将 SNS 配置为具有基于内容的重复数据删除功能的 FIFO 主题。

事件外部化的基础知识

事件外部化在发布的每个应用程序事件上执行三个步骤。

  1. 确定是否应该外部化事件 — 我们将其称为“事件选择”。默认情况下,仅选择位于 Spring Boot 自动配置包中并用支持的@Externalized注释之一进行注释的事件类型进行外部化。

  2. 映射事件(可选) — 默认情况下,事件使用应用程序中存在的 Jackson ObjectMapper序列化为 JSON 并按原样发布。映射步骤允许开发人员自定义表示形式,甚至用适合外部方的表示形式完全替换原始事件。请注意,映射步骤先于要发布的对象的实际序列化。

  3. 确定路由目标 — 消息代理客户端需要一个逻辑目标来将消息发布到该目标。目标通常标识物理基础设施(主题、交换机或队列,具体取决于代理),并且通常从事件类型静态派生。除非在@Externalized注释中专门定义,否则 Spring Modulith 使用应用程序本地类型名称作为目标。换句话说,在基本包为com.acme.app的 Spring Boot 应用程序中,事件类型com.acme.app.sample.SampleEvent将发布到sample.SampleEvent

    一些代理还允许定义一个相当动态的路由键,该路由键用于实际目标内的不同目的。默认情况下,不使用路由键。

基于注释的事件外部化配置

要通过@Externalized注释定义自定义路由键,可以使用$target::$key模式作为每个特定注释中可用的目标/值属性。键可以是 SpEL 表达式,它将获取配置为根对象的事件实例。

通过 SpEL 表达式定义动态路由键
  • Java

  • Kotlin

@Externalized("customer-created::#{#this.getLastname()}") (2)
class CustomerCreated {

  String getLastname() { (1)
    // …
  }
}
@Externalized("customer-created::#{#this.getLastname()}") (2)
class CustomerCreated {
  fun getLastname(): String { (1)
    // …
  }
}

CustomerCreated事件通过访问器方法公开客户的姓氏。然后,在目标声明的::分隔符后面的键表达式中使用该方法通过#this.getLastname()表达式。

如果键计算变得更复杂,建议将其委托给一个 Spring Bean,该 Bean 将事件作为参数。

调用 Spring Bean 来计算路由键
  • Java

  • Kotlin

@Externalized("…::#{@beanName.someMethod(#this)}")
@Externalized("…::#{@beanName.someMethod(#this)}")

编程事件外部化配置

spring-modulith-events-api工件包含EventExternalizationConfiguration,允许开发人员自定义上述所有步骤。

以编程方式配置事件外部化
  • Java

  • Kotlin

@Configuration
class ExternalizationConfiguration {

  @Bean
  EventExternalizationConfiguration eventExternalizationConfiguration() {

    return EventExternalizationConfiguration.externalizing()                 (1)
      .select(EventExternalizationConfiguration.annotatedAsExternalized())   (2)
      .mapping(SomeEvent.class, event -> …)                                  (3)
      .routeKey(WithKeyProperty.class, WithKeyProperty::getKey)              (4)
      .build();
  }
}
@Configuration
class ExternalizationConfiguration {

  @Bean
  fun eventExternalizationConfiguration(): EventExternalizationConfiguration {

    EventExternalizationConfiguration.externalizing()                         (1)
      .select(EventExternalizationConfiguration.annotatedAsExternalized())    (2)
      .mapping(SomeEvent::class.java) { event -> … }                           (3)
      .routeKey(WithKeyProperty::class.java, WithKeyProperty::getKey)         (4)
      .build()
  }
}
1 我们首先创建EventExternalizationConfiguration的默认实例。
2 我们通过在先前调用返回的Selector实例上调用一个select(…)方法来自定义事件选择。此步骤从根本上禁用应用程序基本包过滤器,因为我们现在只查找注释。方便的方法可以轻松地按类型、按包、包和注释选择事件。此外,还有一个快捷方式可以在一步中定义选择和路由。
3 我们为SomeEvent实例定义一个映射步骤。请注意,路由仍将由原始事件实例确定,除非您另外在路由器上调用….routeMapped()
4 我们最终通过定义一个方法句柄来提取事件实例的值来确定路由键。或者,可以通过使用先前调用返回的Router实例上的通用route(…)方法为各个事件生成完整的RoutingKey

测试已发布的事件

以下部分描述了一种仅专注于跟踪 Spring 应用程序事件的测试方法。有关测试使用@ApplicationModuleListener的模块的更全面方法,请查看Scenario API

Spring Modulith 的@ApplicationModuleTest使能够将PublishedEvents实例注入到测试方法中,以验证在被测业务操作过程中是否已发布特定的一组事件。

应用程序模块安排的基于事件的集成测试
  • Java

  • Kotlin

@ApplicationModuleTest
class OrderIntegrationTests {

  @Test
  void someTestMethod(PublishedEvents events) {

    // …
    var matchingMapped = events.ofType(OrderCompleted.class)
      .matching(OrderCompleted::getOrderId, reference.getId());

    assertThat(matchingMapped).hasSize(1);
  }
}
@ApplicationModuleTest
class OrderIntegrationTests {

  @Test
  fun someTestMethod(events: PublishedEvents events) {

    // …
    val matchingMapped = events.ofType(OrderCompleted::class.java)
      .matching(OrderCompleted::getOrderId, reference.getId())

    assertThat(matchingMapped).hasSize(1)
  }
}

请注意,PublishedEvents如何公开一个 API 来选择匹配特定条件的事件。验证以 AssertJ 断言结束,该断言验证预期元素的数量。如果您无论如何都在使用 AssertJ 进行这些断言,您也可以使用AssertablePublishedEvents作为测试方法参数类型,并使用通过该类型提供的流畅断言 API。

使用AssertablePublishedEvents验证事件发布
  • Java

  • Kotlin

@ApplicationModuleTest
class OrderIntegrationTests {

  @Test
  void someTestMethod(AssertablePublishedEvents events) {

    // …
    assertThat(events)
      .contains(OrderCompleted.class)
      .matching(OrderCompleted::getOrderId, reference.getId());
  }
}
@ApplicationModuleTest
class OrderIntegrationTests {

  @Test
  fun someTestMethod(events: AssertablePublishedEvents) {

    // …
    assertThat(events)
      .contains(OrderCompleted::class.java)
      .matching(OrderCompleted::getOrderId, reference.getId())
  }
}

请注意,assertThat(…)表达式返回的类型允许直接对已发布的事件定义约束。